diff --git a/cdc_rsync/base/fake_socket.cc b/cdc_rsync/base/fake_socket.cc index cbd42be..94898d4 100644 --- a/cdc_rsync/base/fake_socket.cc +++ b/cdc_rsync/base/fake_socket.cc @@ -39,7 +39,8 @@ absl::Status FakeSocket::Receive(void* buffer, size_t size, *bytes_received = 0; std::unique_lock lock(data_mutex_); data_cv_.wait(lock, [this, size, allow_partial_read]() { - return allow_partial_read || data_.size() >= size || shutdown_; + size_t min_size = allow_partial_read ? 1 : size; + return data_.size() >= min_size || shutdown_; }); if (shutdown_) { return absl::UnavailableError("Pipe is shut down"); diff --git a/cdc_rsync/cdc_rsync_client.cc b/cdc_rsync/cdc_rsync_client.cc index 525890c..7d0f3af 100644 --- a/cdc_rsync/cdc_rsync_client.cc +++ b/cdc_rsync/cdc_rsync_client.cc @@ -779,9 +779,9 @@ absl::Status CdcRsyncClient::StopCompressionStream() { message_pump_.FlushOutgoingQueue(); message_pump_.RedirectOutput(nullptr); - // Flush compression stream and reset. - RETURN_IF_ERROR(compression_stream_->Flush(), - "Failed to flush compression stream"); + // Finish compression stream and reset. + RETURN_IF_ERROR(compression_stream_->Finish(), + "Failed to finish compression stream"); compression_stream_.reset(); // Wait for the server ack. This must be done before sending more data. diff --git a/cdc_rsync/zstd_stream.cc b/cdc_rsync/zstd_stream.cc index 7e0e61c..6460f99 100644 --- a/cdc_rsync/zstd_stream.cc +++ b/cdc_rsync/zstd_stream.cc @@ -27,22 +27,19 @@ namespace { // trigger a flush. This happens when files with no changes are diff'ed (this // produces very low volume data). Flushing prevents that the server gets stale // and becomes overwhelmed later. -constexpr absl::Duration kMinCompressPeriod = absl::Milliseconds(500); +constexpr absl::Duration kDefaultAutoFlushPeriod = absl::Milliseconds(500); } // namespace ZstdStream::ZstdStream(Socket* socket, int level, uint32_t num_threads) - : socket_(socket), cctx_(nullptr) { + : socket_(socket), + cctx_(nullptr), + auto_flush_period_(kDefaultAutoFlushPeriod) { status_ = WrapStatus(Initialize(level, num_threads), "Failed to initialize stream compressor"); } ZstdStream::~ZstdStream() { - if (cctx_) { - ZSTD_freeCCtx(cctx_); - cctx_ = nullptr; - } - { absl::MutexLock lock(&mutex_); shutdown_ = true; @@ -50,6 +47,11 @@ ZstdStream::~ZstdStream() { if (compressor_thread_.joinable()) { compressor_thread_.join(); } + + if (cctx_) { + ZSTD_freeCCtx(cctx_); + cctx_ = nullptr; + } } absl::Status ZstdStream::Write(const void* data, size_t size) { @@ -79,7 +81,7 @@ absl::Status ZstdStream::Write(const void* data, size_t size) { return absl::OkStatus(); } -absl::Status ZstdStream::Flush() { +absl::Status ZstdStream::Finish() { absl::MutexLock lock(&mutex_); if (!status_.ok()) return status_; @@ -134,7 +136,7 @@ void ZstdStream::ThreadCompressorMain() { in_buffer_.size() == in_buffer_.capacity(); }; bool flush = - !mutex_.AwaitWithTimeout(absl::Condition(&cond), kMinCompressPeriod); + !mutex_.AwaitWithTimeout(absl::Condition(&cond), auto_flush_period_); if (shutdown_) { return; } diff --git a/cdc_rsync/zstd_stream.h b/cdc_rsync/zstd_stream.h index a757847..dbe7658 100644 --- a/cdc_rsync/zstd_stream.h +++ b/cdc_rsync/zstd_stream.h @@ -36,8 +36,13 @@ class ZstdStream { // Sends the given |data| to the compressor. absl::Status Write(const void* data, size_t size) ABSL_LOCKS_EXCLUDED(mutex_); - // Flushes all remaining data and sends the compressed data to the socket. - absl::Status Flush() ABSL_LOCKS_EXCLUDED(mutex_); + // Finishes the stream and flushes all remaining data. + absl::Status Finish() ABSL_LOCKS_EXCLUDED(mutex_); + + // Flushes internal buffers if no new data is written for longer than this + // time. This makes sure that no data is stuck in the pipeline if no new input + // is available. Default is 500 ms. + void AutoFlushAfter(absl::Duration dur) { auto_flush_period_ = dur; } private: // Initializes the compressor and related data. @@ -58,6 +63,8 @@ class ZstdStream { bool last_chunk_sent_ ABSL_GUARDED_BY(mutex_) = false; absl::Status status_ ABSL_GUARDED_BY(mutex_); std::thread compressor_thread_; + + absl::Duration auto_flush_period_; }; } // namespace cdc_ft diff --git a/cdc_rsync/zstd_stream_test.cc b/cdc_rsync/zstd_stream_test.cc index 2a10f97..4657359 100644 --- a/cdc_rsync/zstd_stream_test.cc +++ b/cdc_rsync/zstd_stream_test.cc @@ -32,7 +32,7 @@ class ZstdStreamTest : public ::testing::Test { TEST_F(ZstdStreamTest, Small) { const std::string want = "Lorem ipsum gibberisulum foobarberis"; EXPECT_OK(cstream_.Write(want.data(), want.size())); - EXPECT_OK(cstream_.Flush()); + EXPECT_OK(cstream_.Finish()); Buffer buff(1024); size_t bytes_read; @@ -43,6 +43,52 @@ TEST_F(ZstdStreamTest, Small) { EXPECT_EQ(got, want); } +TEST_F(ZstdStreamTest, AutoFlushesAfterTimeout) { + const std::string want = "Lorem ipsum gibberisulum foobarberis"; + cstream_.AutoFlushAfter(absl::Milliseconds(10)); + EXPECT_OK(cstream_.Write(want.data(), want.size())); + // Note: No flush! cstream_ will compress and send the data after 10 ms. + + // Only read as much data as we have written, or else dstream_.Read() will + // expect more data. + Buffer buff(want.size()); + size_t bytes_read; + bool eof = false; + EXPECT_OK(dstream_.Read(buff.data(), buff.size(), &bytes_read, &eof)); + EXPECT_FALSE(eof); + std::string got(buff.data(), bytes_read); + EXPECT_EQ(got, want); +} + +// Regression test for an issue in UnzstdStream, where the reader tried to read +// from the socket even though the output data was still available in internal +// buffers. +TEST_F(ZstdStreamTest, DeliversOutputBeforeReadingNewData) { + const std::string want1 = "I want"; + const std::string want2 = "to eat cookies"; + cstream_.AutoFlushAfter(absl::Milliseconds(10)); + EXPECT_OK(cstream_.Write(want1.data(), want1.size())); + EXPECT_OK(cstream_.Write(want2.data(), want2.size())); + // Note: No flush! cstream_ will compress and send the data after 10 ms. + + Buffer buff1(want1.size()); + Buffer buff2(want2.size()); + size_t bytes_read1, bytes_read2; + bool eof1 = false, eof2 = false; + EXPECT_OK(dstream_.Read(buff1.data(), buff1.size(), &bytes_read1, &eof1)); + + // There was a bug in dstream_.Read(), where the method would first try to + // read new input before uncompressing data, even though the data was already + // present in internal buffers. + EXPECT_OK(dstream_.Read(buff2.data(), buff2.size(), &bytes_read2, &eof2)); + EXPECT_FALSE(eof1); + EXPECT_FALSE(eof2); + std::string got1(buff1.data(), bytes_read1); + std::string got2(buff2.data(), bytes_read2); + EXPECT_EQ(got1, want1); + EXPECT_EQ(got2, want2); +} + TEST_F(ZstdStreamTest, Large) { Buffer want(1024 * 1024 * 10 + 12345); constexpr uint64_t prime = 919393; @@ -55,7 +101,7 @@ TEST_F(ZstdStreamTest, Large) { size_t size = std::min(kChunkSize, want.size() - pos); EXPECT_OK(cstream_.Write(want.data() + pos, size)); } - EXPECT_OK(cstream_.Flush()); + EXPECT_OK(cstream_.Finish()); bool eof = false; Buffer buff(128 * 1024); diff --git a/cdc_rsync_server/unzstd_stream.cc b/cdc_rsync_server/unzstd_stream.cc index e744ea9..d77d51c 100644 --- a/cdc_rsync_server/unzstd_stream.cc +++ b/cdc_rsync_server/unzstd_stream.cc @@ -41,20 +41,6 @@ absl::Status UnzstdStream::Read(void* out_buffer, size_t out_size, ZSTD_outBuffer output = {out_buffer, out_size, 0}; while (output.pos < output.size && !*eof) { - if (input_.pos == input_.size) { - // Read more compressed input data. - // Allow partial reads since the stream could end any time. - size_t in_size; - absl::Status status = - socket_->Receive(in_buffer_.data(), in_buffer_.size(), - /*allow_partial_read=*/true, &in_size); - if (!status.ok()) { - return WrapStatus(status, "socket_->ReceiveEx() failed"); - } - input_.pos = 0; - input_.size = in_size; - } - // Decompress. size_t ret = ZSTD_decompressStream(dctx_, &output, &input_); if (ZSTD_isError(ret)) { @@ -67,6 +53,20 @@ absl::Status UnzstdStream::Read(void* out_buffer, size_t out_size, return MakeStatus("EOF with %u bytes input data available", input_.size - input_.pos); } + + if (input_.pos == input_.size && output.pos < output.size && !*eof) { + // Read more compressed input data. + // Allow partial reads since the stream could end any time. + size_t in_size; + absl::Status status = + socket_->Receive(in_buffer_.data(), in_buffer_.size(), + /*allow_partial_read=*/true, &in_size); + if (!status.ok()) { + return WrapStatus(status, "socket_->ReceiveEx() failed"); + } + input_.pos = 0; + input_.size = in_size; + } } // Output buffer is full or eof.