From 5a909bb443cb5026ae7c4ea70c3af7f95971894d Mon Sep 17 00:00:00 2001 From: Lutz Justen Date: Tue, 31 Jan 2023 16:33:03 +0100 Subject: [PATCH] [cdc_rsync] Improve throughput for local copies (#74) On Windows, fclose() seems to be very expensive for large files, where closing a 1 GB file takes up to 5 seconds. This CL calls fclose() in background threads. This tremendously improves local syncs, e.g. copying a 4.5 GB, 300 files data set takes only 7 seconds instead of 30 seconds. Also increases the buffer size for copying from 16K to 128K (better throughput for local copies), and adds a timestamp to debug and verbose console logs (useful when comparing client and server logs). --- cdc_rsync/cdc_rsync_client.cc | 2 +- cdc_rsync_server/cdc_rsync_server.cc | 234 ++++++++++++++++----- common/BUILD | 1 + common/log.cc | 13 +- common/log.h | 2 + common/threadpool.cc | 38 +++- common/threadpool.h | 22 +- common/threadpool_test.cc | 32 +++ integration_tests/cdc_rsync/output_test.py | 4 +- 9 files changed, 275 insertions(+), 73 deletions(-) diff --git a/cdc_rsync/cdc_rsync_client.cc b/cdc_rsync/cdc_rsync_client.cc index c09f046..e7798b7 100644 --- a/cdc_rsync/cdc_rsync_client.cc +++ b/cdc_rsync/cdc_rsync_client.cc @@ -623,7 +623,7 @@ absl::Status CdcRsyncClient::SendMissingFiles() { ParallelFileOpener file_opener(&files_, missing_file_indices_); - constexpr size_t kBufferSize = 16000; + constexpr size_t kBufferSize = 128 * 1024; for (uint32_t server_index = 0; server_index < missing_file_indices_.size(); ++server_index) { uint32_t client_index = missing_file_indices_[server_index]; diff --git a/cdc_rsync_server/cdc_rsync_server.cc b/cdc_rsync_server/cdc_rsync_server.cc index c96f9ca..95d21c1 100644 --- a/cdc_rsync_server/cdc_rsync_server.cc +++ b/cdc_rsync_server/cdc_rsync_server.cc @@ -32,6 +32,13 @@ namespace cdc_ft { namespace { +// Number of files for which to call fclose() and finalize files in parallel. +constexpr size_t kNumFinalizerThreads = 8; + +// Max 16 files in the patcher and finalizer queues to prevent that too many +// files are open concurrently. +constexpr size_t kMaxQueueSize = 16; + // Suffix for the patched file created from the basis file and the diff. constexpr char kIntermediatePathSuffix[] = ".__cdc_rsync_temp__"; @@ -49,6 +56,9 @@ uint16_t kExecutableBits = // |target_filepath| match, writes an intermediate file and replaces // the file at |target_filepath| with the intermediate file when all data has // been received. +// Each PatchTask is queued twice, once to create the patched file, and once in +// a different thread pool to close and finalize the patched file. This is +// because fclose() can take a long time to finish, so it could block patching. class PatchTask : public Task { public: PatchTask(const std::string& base_filepath, @@ -57,38 +67,69 @@ class PatchTask : public Task { : base_filepath_(base_filepath), target_filepath_(target_filepath), file_(file), - cdc_(cdc) {} + cdc_(cdc), + need_intermediate_file_(target_filepath_ == base_filepath_), + patched_filepath_(target_filepath_ == base_filepath_ + ? base_filepath_ + kIntermediatePathSuffix + : target_filepath_) {} virtual ~PatchTask() = default; + PatchTask(const PatchTask& other) = delete; + PatchTask& operator=(const PatchTask& other) = delete; + const ChangedFileInfo& File() const { return file_; } const absl::Status& Status() const { return status_; } // Task: void ThreadRun(IsCancelledPredicate is_cancelled) override { - bool need_intermediate_file = target_filepath_ == base_filepath_; - std::string patched_filepath = - need_intermediate_file ? base_filepath_ + kIntermediatePathSuffix - : target_filepath_; + // Each PatchTask is queued twice, once to apply the patch and once to + // close and finalize the patched file. + switch (state_) { + case State::kPatching: + Patch(); + state_ = State::kFinalizing; + break; + case State::kFinalizing: + Finalize(); + state_ = State::kDone; + break; + default: + assert(!"Invalid state"); + } + } - absl::StatusOr patched_file = path::OpenFile(patched_filepath, "wb"); - if (!patched_file.ok()) { - status_ = patched_file.status(); + private: + void Patch() { + absl::StatusOr patched_fp = path::OpenFile(patched_filepath_, "wb"); + if (!patched_fp.ok()) { + status_ = patched_fp.status(); return; } + patched_fp_ = *patched_fp; // Receive diff stream from server and apply. - bool is_executable = false; - status_ = cdc_->ReceiveDiffAndPatch(base_filepath_, *patched_file, - &is_executable); - fclose(*patched_file); + status_ = + cdc_->ReceiveDiffAndPatch(base_filepath_, patched_fp_, &is_executable_); + + // The file is closed by Finalize() in a separate thread pool since fclose() + // takes a while on some systems. + } + + void Finalize() { + if (patched_fp_) { + fclose(patched_fp_); + patched_fp_ = nullptr; + } + if (!status_.ok()) { + // Some error occurred during Patch(). return; } // These bits are OR'ed on top of the mode bits. - uint16_t mode_or_bits = is_executable ? kExecutableBits : 0; + uint16_t mode_or_bits = is_executable_ ? kExecutableBits : 0; // Store mode from the original base path. path::Stats stats; @@ -99,13 +140,13 @@ class PatchTask : public Task { return; } - if (need_intermediate_file) { + if (need_intermediate_file_) { // Replace |base_filepath_| (==|target_filepath_|) by the intermediate // file |patched_filepath|. - status_ = path::ReplaceFile(target_filepath_, patched_filepath); + status_ = path::ReplaceFile(target_filepath_, patched_filepath_); if (!status_.ok()) { status_ = WrapStatus(status_, "ReplaceFile() for '%s' by '%s' failed", - base_filepath_, patched_filepath); + base_filepath_, patched_filepath_); return; } } else { @@ -126,11 +167,82 @@ class PatchTask : public Task { status_ = path::SetFileTime(target_filepath_, file_.client_modified_time); } + const std::string base_filepath_; + const std::string target_filepath_; + const ChangedFileInfo file_; + CdcInterface* const cdc_; + const bool need_intermediate_file_ = false; + const std::string patched_filepath_; + + FILE* patched_fp_ = nullptr; + bool is_executable_ = false; + absl::Status status_; + + // This task is queued twice, once to patch and once to close and finalize the + // patched file. + enum class State { kPatching, kFinalizing, kDone }; + State state_ = State::kPatching; +}; + +// Background task that closes a file and sets the mtime and perms. This is done +// in the background since closing a file might block for a long time. +class FinalizeCopiedFileTask : public Task { + public: + // Finalize |file| with given path |filepath|. |status| is the status from + // writing the file. On error, the file is only closed. + FinalizeCopiedFileTask(FILE* fp, FileInfo file, std::string filepath, + bool is_executable, absl::Status status) + : fp_(fp), + file_(std::move(file)), + filepath_(std::move(filepath)), + is_executable_(is_executable), + status_(status) {} + virtual ~FinalizeCopiedFileTask() = default; + + FinalizeCopiedFileTask(const FinalizeCopiedFileTask& other) = delete; + FinalizeCopiedFileTask& operator=(const FinalizeCopiedFileTask& other) = + delete; + + const absl::Status& Status() const { return status_; } + + // Task: + void ThreadRun(IsCancelledPredicate is_cancelled) override { + assert(fp_); + fclose(fp_); + + if (!status_.ok()) { + // Writing the file failed, nothing to finalize. + status_ = WrapStatus(status_, "Failed to write file %s", filepath_); + return; + } + + // Set file write time. + status_ = path::SetFileTime(filepath_, file_.modified_time); + if (!status_.ok()) { + status_ = + WrapStatus(status_, "Failed to set file mod time for %s", filepath_); + return; + } + + // Set executable bit, but just print warnings as it's not critical. + if (is_executable_) { + path::Stats stats; + status_ = path::GetStats(filepath_, &stats); + if (status_.ok()) { + status_ = path::ChangeMode(filepath_, stats.mode | kExecutableBits); + } + if (!status_.ok()) { + LOG_WARNING("Failed to set executable bit on '%s': %s", filepath_, + status_.ToString()); + } + } + } + private: - std::string base_filepath_; - std::string target_filepath_; - ChangedFileInfo file_; - CdcInterface* cdc_; + FILE* const fp_ = nullptr; + const FileInfo file_; + const std::string filepath_; + const bool is_executable_; absl::Status status_; }; @@ -550,6 +662,8 @@ absl::Status CdcRsyncServer::HandleSendMissingFileData() { } } + Threadpool finalize_pool(kNumFinalizerThreads); + for (uint32_t server_index = 0; server_index < diff_.missing_files.size(); server_index++) { const FileInfo& file = diff_.missing_files[server_index]; @@ -569,8 +683,8 @@ absl::Status CdcRsyncServer::HandleSendMissingFileData() { request.server_index(), server_index); } - // Verify that there is no directory existing with the same name. - if (path::Exists(filepath) && path::DirExists(filepath)) { + // Remove |filepath| if it is a directory. + if (path::DirExists(filepath)) { assert(!diff_.extraneous_dirs.empty()); status = path::RemoveFile(filepath); if (!status.ok()) { @@ -619,27 +733,25 @@ absl::Status CdcRsyncServer::HandleSendMissingFileData() { } status = path::StreamWriteFileContents(*fp, handler); - fclose(*fp); - if (!status.ok()) { - return WrapStatus(status, "Failed to write file %s", filepath); + finalize_pool.QueueTask(std::make_unique( + *fp, file, filepath, is_executable, status)); + finalize_pool.WaitForQueuedTasksAtMost(kMaxQueueSize); + + // Drain finalize pool for the last file. + if (server_index + 1 == diff_.missing_files.size()) { + finalize_pool.Wait(); } - // Set file write time. - status = path::SetFileTime(filepath, file.modified_time); - if (!status.ok()) { - return WrapStatus(status, "Failed to set file mod time for %s", filepath); - } - - // Set executable bit, but just print warnings as it's not critical. - if (is_executable) { - path::Stats stats; - status = path::GetStats(filepath, &stats); - if (status.ok()) { - status = path::ChangeMode(filepath, stats.mode | kExecutableBits); - } - if (!status.ok()) { - LOG_WARNING("Failed to set executable bit on '%s': %s", filepath, - status.ToString()); + // Check the results of completed tasks. + for (std::unique_ptr task = finalize_pool.TryGetCompletedTask(); + task != nullptr; task = finalize_pool.TryGetCompletedTask()) { + const FinalizeCopiedFileTask* finalize_task = + static_cast(task.get()); + if (!finalize_task->Status().ok()) { + // Close and finish files that have already been copied, so we don't + // discard several already copied files because one failed. + finalize_pool.Wait(); + return finalize_task->Status(); } } } @@ -678,11 +790,22 @@ absl::Status CdcRsyncServer::SyncChangedFiles() { CdcInterface cdc(message_pump_.get()); // Pipeline sending signatures and patching files: - // MAIN THREAD: Send signatures to client. - // Only sends to the socket. - // WORKER THREAD: Receive diffs from client and patch file. - // Only reads from the socket. - Threadpool pool(1); + // MAIN THREAD: Send signatures to client. + // Only sends to the socket. + // PATCHER THREAD: Receive diffs from client and create patch file. + // Only reads from the socket. + // FINALIZER THREADS: Close patched files and finalize them. + Threadpool patch_pool(1); + Threadpool finalize_pool(kNumFinalizerThreads); + + // Forward finished patch task immediately to finalize pool. + patch_pool.SetTaskCompletedCallback( + [&finalize_pool](std::unique_ptr task) { + // Spin if there are too many outstanding tasks, in order to limit the + // max number of outstanding tasks. + finalize_pool.QueueTask(std::move(task)); + finalize_pool.WaitForQueuedTasksAtMost(kMaxQueueSize); + }); for (uint32_t server_index = 0; server_index < diff_.changed_files.size(); server_index++) { @@ -708,25 +831,28 @@ absl::Status CdcRsyncServer::SyncChangedFiles() { } // Queue patching task. - pool.QueueTask(std::make_unique(base_filepath, target_filepath, - file, &cdc)); + patch_pool.QueueTask(std::make_unique( + base_filepath, target_filepath, file, &cdc)); - // Wait for the last file to finish. + // Drain pools for the last file. if (server_index + 1 == diff_.changed_files.size()) { - pool.Wait(); + patch_pool.Wait(); + finalize_pool.Wait(); } // Check the results of completed tasks. - std::unique_ptr task = pool.TryGetCompletedTask(); - while (task) { - PatchTask* patch_task = static_cast(task.get()); + for (std::unique_ptr task = finalize_pool.TryGetCompletedTask(); + task != nullptr; task = finalize_pool.TryGetCompletedTask()) { + const PatchTask* patch_task = static_cast(task.get()); const std::string& task_path = patch_task->File().filepath; if (!patch_task->Status().ok()) { + // Close and finish files that have already been synced, so we don't + // discard several already synced files because one failed. + finalize_pool.Wait(); return WrapStatus(patch_task->Status(), "Failed to patch file '%s'", task_path); } LOG_INFO("Finished patching file %s", task_path.c_str()); - task = pool.TryGetCompletedTask(); } } diff --git a/common/BUILD b/common/BUILD index 6a12a66..b9f39a3 100644 --- a/common/BUILD +++ b/common/BUILD @@ -158,6 +158,7 @@ cc_library( deps = [ ":clock", ":platform", + ":stopwatch", "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/synchronization", ], diff --git a/common/log.cc b/common/log.cc index 360ef66..ac8c58b 100644 --- a/common/log.cc +++ b/common/log.cc @@ -126,21 +126,22 @@ void ConsoleLog::WriteLogMessage(LogLevel level, const char* file, int line, absl::MutexLock lock(&mutex_); // Show leaner log messages in non-verbose mode. - bool show_file_func = GetLogLevel() <= LogLevel::kDebug; + bool show_time_file_func = GetLogLevel() <= LogLevel::kDebug; FILE* stdfile = level >= LogLevel::kError ? stderr : stdout; #if PLATFORM_WINDOWS HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE); SetConsoleTextAttribute(hConsole, GetConsoleColor(level)); - if (show_file_func) { - fprintf(stdfile, "%s(%i): %s(): %s\n", file, line, func, message); + if (show_time_file_func) { + fprintf(stdfile, "%0.3f %s(%i): %s(): %s\n", stopwatch_.ElapsedSeconds(), + file, line, func, message); } else { fprintf(stdfile, "%s\n", message); } SetConsoleTextAttribute(hConsole, kLightGray); #else - if (show_file_func) { - fprintf(stdfile, "%-7s %s(%i): %s(): %s\n", GetLogLevelString(level), file, - line, func, message); + if (show_time_file_func) { + fprintf(stdfile, "%-7s %0.3f %s(%i): %s(): %s\n", GetLogLevelString(level), + stopwatch_.ElapsedSeconds(), file, line, func, message); } else { fprintf(stdfile, "%-7s %s\n", GetLogLevelString(level), message); } diff --git a/common/log.h b/common/log.h index fc681df..d6d8637 100644 --- a/common/log.h +++ b/common/log.h @@ -22,6 +22,7 @@ #include "absl/strings/str_format.h" #include "absl/synchronization/mutex.h" #include "common/clock.h" +#include "common/stopwatch.h" namespace cdc_ft { @@ -120,6 +121,7 @@ class ConsoleLog : public Log { ABSL_LOCKS_EXCLUDED(mutex_); private: + Stopwatch stopwatch_; absl::Mutex mutex_; }; diff --git a/common/threadpool.cc b/common/threadpool.cc index 5c1acc8..535533e 100644 --- a/common/threadpool.cc +++ b/common/threadpool.cc @@ -45,6 +45,11 @@ void Threadpool::Shutdown() { for (auto& worker : workers_) { if (worker.joinable()) worker.join(); } + + // Discard all completed tasks. + absl::MutexLock lock(&completed_tasks_mutex_); + std::queue> empty; + std::swap(completed_tasks_, empty); } void Threadpool::QueueTask(std::unique_ptr task) { @@ -77,6 +82,21 @@ std::unique_ptr Threadpool::GetCompletedTask() { return task; } +void Threadpool::SetTaskCompletedCallback(TaskCompletedCallback cb) { + absl::MutexLock lock(&completed_tasks_mutex_); + on_task_completed_ = std::move(cb); +} + +bool Threadpool::WaitForQueuedTasksAtMost(size_t count, + absl::Duration timeout) const { + absl::MutexLock lock(&task_queue_mutex_); + auto cond = [this, count]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(task_queue_mutex_) { + return shutdown_ || outstanding_task_count_ <= count; + }; + return task_queue_mutex_.AwaitWithTimeout(absl::Condition(&cond), timeout) && + outstanding_task_count_ <= count; +} + void Threadpool::ThreadWorkerMain() { bool task_finished = false; for (;;) { @@ -85,7 +105,8 @@ void Threadpool::ThreadWorkerMain() { absl::MutexLock lock(&task_queue_mutex_); // Decrease task count here, so we don't have to lock again at the end of - // the loop. + // the loop. It is important to first push the task, then decrease this + // count. Otherwise, there's a race between Wait() and GetCompletedTask(). if (task_finished) { assert(outstanding_task_count_ > 0); --outstanding_task_count_; @@ -104,17 +125,18 @@ void Threadpool::ThreadWorkerMain() { } // Run task, but make it cancellable. - task->ThreadRun([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( - task_queue_mutex_) -> bool { return shutdown_; }); - - { + task->ThreadRun([this]() ABSL_LOCKS_EXCLUDED(task_queue_mutex_) -> bool { absl::MutexLock lock(&task_queue_mutex_); - if (shutdown_) break; - } + return shutdown_; + }); // Push task to completed queue. absl::MutexLock lock(&completed_tasks_mutex_); - completed_tasks_.push(std::move(task)); + if (on_task_completed_) { + on_task_completed_(std::move(task)); + } else { + completed_tasks_.push(std::move(task)); + } task_finished = true; } } diff --git a/common/threadpool.h b/common/threadpool.h index 27a0a61..2a9b2a2 100644 --- a/common/threadpool.h +++ b/common/threadpool.h @@ -18,7 +18,6 @@ #define COMMON_THREADPOOL_H_ #include -#include #include #include #include @@ -57,7 +56,8 @@ class Threadpool { void QueueTask(std::unique_ptr task) ABSL_LOCKS_EXCLUDED(task_queue_mutex_); - // If available, returns the next completed task. + // Returns the next completed task if available or nullptr all are either + // queued or in progress. // For a single worker thread (|num_threads| == 1), tasks are completed in // FIFO order. This is no longer the case for multiple threads // (|num_threads| > 1). Tasks that got queued later might complete first. @@ -71,6 +71,14 @@ class Threadpool { std::unique_ptr GetCompletedTask() ABSL_LOCKS_EXCLUDED(completed_tasks_mutex_); + using TaskCompletedCallback = std::function)>; + + // Set a callback that is called immediately in a background thread when a + // task is completed. The task will not be put onto the completed queue, so + // if this callback is set, do not call (Try)GetCompletedTask. + void SetTaskCompletedCallback(TaskCompletedCallback cb) + ABSL_LOCKS_EXCLUDED(completed_tasks_mutex_); + // Returns the total number of worker threads in the pool. size_t NumThreads() const { return workers_.size(); } @@ -80,6 +88,14 @@ class Threadpool { return outstanding_task_count_; } + // Block until the number of queued tasks drops below or equal to |count|, or + // until the timeout is exceeded, or until Shutdown() is called, whatever + // comes sooner. Returns true if less than or equal to |count| tasks are + // queued. + bool WaitForQueuedTasksAtMost( + size_t count, absl::Duration timeout = absl::InfiniteDuration()) const + ABSL_LOCKS_EXCLUDED(mutex_); + private: // Background thread worker method. Picks tasks and runs them. void ThreadWorkerMain() @@ -94,6 +110,8 @@ class Threadpool { absl::Mutex completed_tasks_mutex_; std::queue> completed_tasks_ ABSL_GUARDED_BY(completed_tasks_mutex_); + TaskCompletedCallback on_task_completed_ + ABSL_GUARDED_BY(completed_tasks_mutex_); std::vector workers_; }; diff --git a/common/threadpool_test.cc b/common/threadpool_test.cc index f0ce99a..5ef6140 100644 --- a/common/threadpool_test.cc +++ b/common/threadpool_test.cc @@ -151,5 +151,37 @@ TEST_F(ThreadpoolTest, GetCompletedTask) { EXPECT_EQ(completed_task.get(), task); } +TEST_F(ThreadpoolTest, SetTaskCompletedCallback) { + auto task_func = [](Task::IsCancelledPredicate) { /* empty */ }; + + Semaphore task_finished(0); + Threadpool pool(1); + std::atomic_bool finished = false; + pool.SetTaskCompletedCallback( + [&task_finished, &finished](std::unique_ptr task) { + finished = true; + task_finished.Signal(); + }); + pool.QueueTask(std::make_unique(task_func)); + task_finished.Wait(); + EXPECT_TRUE(finished); + EXPECT_FALSE(pool.TryGetCompletedTask()); +} + +TEST_F(ThreadpoolTest, WaitForQueuedTasksAtMost) { + Semaphore task_signal(0); + auto task_func = [&task_signal](Task::IsCancelledPredicate) { + task_signal.Wait(); + }; + Threadpool pool(1); + pool.QueueTask(std::make_unique(task_func)); + pool.QueueTask(std::make_unique(task_func)); + EXPECT_FALSE(pool.WaitForQueuedTasksAtMost(1, absl::Milliseconds(10))); + task_signal.Signal(); + EXPECT_TRUE(pool.WaitForQueuedTasksAtMost(1, absl::Milliseconds(5000))); + EXPECT_EQ(pool.NumQueuedTasks(), 1); + task_signal.Signal(); +} + } // namespace } // namespace cdc_ft diff --git a/integration_tests/cdc_rsync/output_test.py b/integration_tests/cdc_rsync/output_test.py index 1acc1a5..d14b225 100644 --- a/integration_tests/cdc_rsync/output_test.py +++ b/integration_tests/cdc_rsync/output_test.py @@ -115,7 +115,7 @@ class OutputTest(test_base.CdcRsyncTest): # server-side output self._assert_regex( - r'DEBUG server_socket\.cc\([0-9]+\): Receive\(\): EOF\(\) detected', + r'DEBUG \d+\.\d{3} server_socket\.cc\([0-9]+\): Receive\(\): EOF\(\) detected', output) # TODO: Add a check here, as currently the output is misleading @@ -139,7 +139,7 @@ class OutputTest(test_base.CdcRsyncTest): # server-side output self._assert_regex( - r'VERBOSE message_pump\.cc\([0-9]+\): ThreadDoReceivePacket\(\): Received packet of size', + r'VERBOSE \d+\.\d{3} message_pump\.cc\([0-9]+\): ThreadDoReceivePacket\(\): Received packet of size', output) def test_quiet(self):