mirror of
https://github.com/nestriness/cdc-file-transfer.git
synced 2026-01-30 10:35:37 +02:00
[cdc_rsync] Fix issue in UnzstdStream (#59)
Fixes an issue in UnzstdStream where the Read() method always tries to read new input data if no input data is available, instead of first trying to decompress. Since zstd maintains internal buffers, decompression might succeed even without reading more input, so trying to decompress first is faster. This bug can lead to pipeline stalls in cdc_rsync.
This commit is contained in:
@@ -39,7 +39,8 @@ absl::Status FakeSocket::Receive(void* buffer, size_t size,
|
|||||||
*bytes_received = 0;
|
*bytes_received = 0;
|
||||||
std::unique_lock<std::mutex> lock(data_mutex_);
|
std::unique_lock<std::mutex> lock(data_mutex_);
|
||||||
data_cv_.wait(lock, [this, size, allow_partial_read]() {
|
data_cv_.wait(lock, [this, size, allow_partial_read]() {
|
||||||
return allow_partial_read || data_.size() >= size || shutdown_;
|
size_t min_size = allow_partial_read ? 1 : size;
|
||||||
|
return data_.size() >= min_size || shutdown_;
|
||||||
});
|
});
|
||||||
if (shutdown_) {
|
if (shutdown_) {
|
||||||
return absl::UnavailableError("Pipe is shut down");
|
return absl::UnavailableError("Pipe is shut down");
|
||||||
|
|||||||
@@ -779,9 +779,9 @@ absl::Status CdcRsyncClient::StopCompressionStream() {
|
|||||||
message_pump_.FlushOutgoingQueue();
|
message_pump_.FlushOutgoingQueue();
|
||||||
message_pump_.RedirectOutput(nullptr);
|
message_pump_.RedirectOutput(nullptr);
|
||||||
|
|
||||||
// Flush compression stream and reset.
|
// Finish compression stream and reset.
|
||||||
RETURN_IF_ERROR(compression_stream_->Flush(),
|
RETURN_IF_ERROR(compression_stream_->Finish(),
|
||||||
"Failed to flush compression stream");
|
"Failed to finish compression stream");
|
||||||
compression_stream_.reset();
|
compression_stream_.reset();
|
||||||
|
|
||||||
// Wait for the server ack. This must be done before sending more data.
|
// Wait for the server ack. This must be done before sending more data.
|
||||||
|
|||||||
@@ -27,22 +27,19 @@ namespace {
|
|||||||
// trigger a flush. This happens when files with no changes are diff'ed (this
|
// trigger a flush. This happens when files with no changes are diff'ed (this
|
||||||
// produces very low volume data). Flushing prevents that the server gets stale
|
// produces very low volume data). Flushing prevents that the server gets stale
|
||||||
// and becomes overwhelmed later.
|
// and becomes overwhelmed later.
|
||||||
constexpr absl::Duration kMinCompressPeriod = absl::Milliseconds(500);
|
constexpr absl::Duration kDefaultAutoFlushPeriod = absl::Milliseconds(500);
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
ZstdStream::ZstdStream(Socket* socket, int level, uint32_t num_threads)
|
ZstdStream::ZstdStream(Socket* socket, int level, uint32_t num_threads)
|
||||||
: socket_(socket), cctx_(nullptr) {
|
: socket_(socket),
|
||||||
|
cctx_(nullptr),
|
||||||
|
auto_flush_period_(kDefaultAutoFlushPeriod) {
|
||||||
status_ = WrapStatus(Initialize(level, num_threads),
|
status_ = WrapStatus(Initialize(level, num_threads),
|
||||||
"Failed to initialize stream compressor");
|
"Failed to initialize stream compressor");
|
||||||
}
|
}
|
||||||
|
|
||||||
ZstdStream::~ZstdStream() {
|
ZstdStream::~ZstdStream() {
|
||||||
if (cctx_) {
|
|
||||||
ZSTD_freeCCtx(cctx_);
|
|
||||||
cctx_ = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
absl::MutexLock lock(&mutex_);
|
absl::MutexLock lock(&mutex_);
|
||||||
shutdown_ = true;
|
shutdown_ = true;
|
||||||
@@ -50,6 +47,11 @@ ZstdStream::~ZstdStream() {
|
|||||||
if (compressor_thread_.joinable()) {
|
if (compressor_thread_.joinable()) {
|
||||||
compressor_thread_.join();
|
compressor_thread_.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (cctx_) {
|
||||||
|
ZSTD_freeCCtx(cctx_);
|
||||||
|
cctx_ = nullptr;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
absl::Status ZstdStream::Write(const void* data, size_t size) {
|
absl::Status ZstdStream::Write(const void* data, size_t size) {
|
||||||
@@ -79,7 +81,7 @@ absl::Status ZstdStream::Write(const void* data, size_t size) {
|
|||||||
return absl::OkStatus();
|
return absl::OkStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
absl::Status ZstdStream::Flush() {
|
absl::Status ZstdStream::Finish() {
|
||||||
absl::MutexLock lock(&mutex_);
|
absl::MutexLock lock(&mutex_);
|
||||||
if (!status_.ok()) return status_;
|
if (!status_.ok()) return status_;
|
||||||
|
|
||||||
@@ -134,7 +136,7 @@ void ZstdStream::ThreadCompressorMain() {
|
|||||||
in_buffer_.size() == in_buffer_.capacity();
|
in_buffer_.size() == in_buffer_.capacity();
|
||||||
};
|
};
|
||||||
bool flush =
|
bool flush =
|
||||||
!mutex_.AwaitWithTimeout(absl::Condition(&cond), kMinCompressPeriod);
|
!mutex_.AwaitWithTimeout(absl::Condition(&cond), auto_flush_period_);
|
||||||
if (shutdown_) {
|
if (shutdown_) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,8 +36,13 @@ class ZstdStream {
|
|||||||
// Sends the given |data| to the compressor.
|
// Sends the given |data| to the compressor.
|
||||||
absl::Status Write(const void* data, size_t size) ABSL_LOCKS_EXCLUDED(mutex_);
|
absl::Status Write(const void* data, size_t size) ABSL_LOCKS_EXCLUDED(mutex_);
|
||||||
|
|
||||||
// Flushes all remaining data and sends the compressed data to the socket.
|
// Finishes the stream and flushes all remaining data.
|
||||||
absl::Status Flush() ABSL_LOCKS_EXCLUDED(mutex_);
|
absl::Status Finish() ABSL_LOCKS_EXCLUDED(mutex_);
|
||||||
|
|
||||||
|
// Flushes internal buffers if no new data is written for longer than this
|
||||||
|
// time. This makes sure that no data is stuck in the pipeline if no new input
|
||||||
|
// is available. Default is 500 ms.
|
||||||
|
void AutoFlushAfter(absl::Duration dur) { auto_flush_period_ = dur; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Initializes the compressor and related data.
|
// Initializes the compressor and related data.
|
||||||
@@ -58,6 +63,8 @@ class ZstdStream {
|
|||||||
bool last_chunk_sent_ ABSL_GUARDED_BY(mutex_) = false;
|
bool last_chunk_sent_ ABSL_GUARDED_BY(mutex_) = false;
|
||||||
absl::Status status_ ABSL_GUARDED_BY(mutex_);
|
absl::Status status_ ABSL_GUARDED_BY(mutex_);
|
||||||
std::thread compressor_thread_;
|
std::thread compressor_thread_;
|
||||||
|
|
||||||
|
absl::Duration auto_flush_period_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace cdc_ft
|
} // namespace cdc_ft
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ class ZstdStreamTest : public ::testing::Test {
|
|||||||
TEST_F(ZstdStreamTest, Small) {
|
TEST_F(ZstdStreamTest, Small) {
|
||||||
const std::string want = "Lorem ipsum gibberisulum foobarberis";
|
const std::string want = "Lorem ipsum gibberisulum foobarberis";
|
||||||
EXPECT_OK(cstream_.Write(want.data(), want.size()));
|
EXPECT_OK(cstream_.Write(want.data(), want.size()));
|
||||||
EXPECT_OK(cstream_.Flush());
|
EXPECT_OK(cstream_.Finish());
|
||||||
|
|
||||||
Buffer buff(1024);
|
Buffer buff(1024);
|
||||||
size_t bytes_read;
|
size_t bytes_read;
|
||||||
@@ -43,6 +43,52 @@ TEST_F(ZstdStreamTest, Small) {
|
|||||||
EXPECT_EQ(got, want);
|
EXPECT_EQ(got, want);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ZstdStreamTest, AutoFlushesAfterTimeout) {
|
||||||
|
const std::string want = "Lorem ipsum gibberisulum foobarberis";
|
||||||
|
cstream_.AutoFlushAfter(absl::Milliseconds(10));
|
||||||
|
EXPECT_OK(cstream_.Write(want.data(), want.size()));
|
||||||
|
// Note: No flush! cstream_ will compress and send the data after 10 ms.
|
||||||
|
|
||||||
|
// Only read as much data as we have written, or else dstream_.Read() will
|
||||||
|
// expect more data.
|
||||||
|
Buffer buff(want.size());
|
||||||
|
size_t bytes_read;
|
||||||
|
bool eof = false;
|
||||||
|
EXPECT_OK(dstream_.Read(buff.data(), buff.size(), &bytes_read, &eof));
|
||||||
|
EXPECT_FALSE(eof);
|
||||||
|
std::string got(buff.data(), bytes_read);
|
||||||
|
EXPECT_EQ(got, want);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Regression test for an issue in UnzstdStream, where the reader tried to read
|
||||||
|
// from the socket even though the output data was still available in internal
|
||||||
|
// buffers.
|
||||||
|
TEST_F(ZstdStreamTest, DeliversOutputBeforeReadingNewData) {
|
||||||
|
const std::string want1 = "I want";
|
||||||
|
const std::string want2 = "to eat cookies";
|
||||||
|
cstream_.AutoFlushAfter(absl::Milliseconds(10));
|
||||||
|
EXPECT_OK(cstream_.Write(want1.data(), want1.size()));
|
||||||
|
EXPECT_OK(cstream_.Write(want2.data(), want2.size()));
|
||||||
|
// Note: No flush! cstream_ will compress and send the data after 10 ms.
|
||||||
|
|
||||||
|
Buffer buff1(want1.size());
|
||||||
|
Buffer buff2(want2.size());
|
||||||
|
size_t bytes_read1, bytes_read2;
|
||||||
|
bool eof1 = false, eof2 = false;
|
||||||
|
EXPECT_OK(dstream_.Read(buff1.data(), buff1.size(), &bytes_read1, &eof1));
|
||||||
|
|
||||||
|
// There was a bug in dstream_.Read(), where the method would first try to
|
||||||
|
// read new input before uncompressing data, even though the data was already
|
||||||
|
// present in internal buffers.
|
||||||
|
EXPECT_OK(dstream_.Read(buff2.data(), buff2.size(), &bytes_read2, &eof2));
|
||||||
|
EXPECT_FALSE(eof1);
|
||||||
|
EXPECT_FALSE(eof2);
|
||||||
|
std::string got1(buff1.data(), bytes_read1);
|
||||||
|
std::string got2(buff2.data(), bytes_read2);
|
||||||
|
EXPECT_EQ(got1, want1);
|
||||||
|
EXPECT_EQ(got2, want2);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ZstdStreamTest, Large) {
|
TEST_F(ZstdStreamTest, Large) {
|
||||||
Buffer want(1024 * 1024 * 10 + 12345);
|
Buffer want(1024 * 1024 * 10 + 12345);
|
||||||
constexpr uint64_t prime = 919393;
|
constexpr uint64_t prime = 919393;
|
||||||
@@ -55,7 +101,7 @@ TEST_F(ZstdStreamTest, Large) {
|
|||||||
size_t size = std::min<size_t>(kChunkSize, want.size() - pos);
|
size_t size = std::min<size_t>(kChunkSize, want.size() - pos);
|
||||||
EXPECT_OK(cstream_.Write(want.data() + pos, size));
|
EXPECT_OK(cstream_.Write(want.data() + pos, size));
|
||||||
}
|
}
|
||||||
EXPECT_OK(cstream_.Flush());
|
EXPECT_OK(cstream_.Finish());
|
||||||
|
|
||||||
bool eof = false;
|
bool eof = false;
|
||||||
Buffer buff(128 * 1024);
|
Buffer buff(128 * 1024);
|
||||||
|
|||||||
@@ -41,20 +41,6 @@ absl::Status UnzstdStream::Read(void* out_buffer, size_t out_size,
|
|||||||
|
|
||||||
ZSTD_outBuffer output = {out_buffer, out_size, 0};
|
ZSTD_outBuffer output = {out_buffer, out_size, 0};
|
||||||
while (output.pos < output.size && !*eof) {
|
while (output.pos < output.size && !*eof) {
|
||||||
if (input_.pos == input_.size) {
|
|
||||||
// Read more compressed input data.
|
|
||||||
// Allow partial reads since the stream could end any time.
|
|
||||||
size_t in_size;
|
|
||||||
absl::Status status =
|
|
||||||
socket_->Receive(in_buffer_.data(), in_buffer_.size(),
|
|
||||||
/*allow_partial_read=*/true, &in_size);
|
|
||||||
if (!status.ok()) {
|
|
||||||
return WrapStatus(status, "socket_->ReceiveEx() failed");
|
|
||||||
}
|
|
||||||
input_.pos = 0;
|
|
||||||
input_.size = in_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decompress.
|
// Decompress.
|
||||||
size_t ret = ZSTD_decompressStream(dctx_, &output, &input_);
|
size_t ret = ZSTD_decompressStream(dctx_, &output, &input_);
|
||||||
if (ZSTD_isError(ret)) {
|
if (ZSTD_isError(ret)) {
|
||||||
@@ -67,6 +53,20 @@ absl::Status UnzstdStream::Read(void* out_buffer, size_t out_size,
|
|||||||
return MakeStatus("EOF with %u bytes input data available",
|
return MakeStatus("EOF with %u bytes input data available",
|
||||||
input_.size - input_.pos);
|
input_.size - input_.pos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (input_.pos == input_.size && output.pos < output.size && !*eof) {
|
||||||
|
// Read more compressed input data.
|
||||||
|
// Allow partial reads since the stream could end any time.
|
||||||
|
size_t in_size;
|
||||||
|
absl::Status status =
|
||||||
|
socket_->Receive(in_buffer_.data(), in_buffer_.size(),
|
||||||
|
/*allow_partial_read=*/true, &in_size);
|
||||||
|
if (!status.ok()) {
|
||||||
|
return WrapStatus(status, "socket_->ReceiveEx() failed");
|
||||||
|
}
|
||||||
|
input_.pos = 0;
|
||||||
|
input_.size = in_size;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Output buffer is full or eof.
|
// Output buffer is full or eof.
|
||||||
|
|||||||
Reference in New Issue
Block a user