diff --git a/asset_stream_manager/asset_stream_server.cc b/asset_stream_manager/asset_stream_server.cc index 4139965..1aba143 100644 --- a/asset_stream_manager/asset_stream_server.cc +++ b/asset_stream_manager/asset_stream_server.cc @@ -26,11 +26,11 @@ AssetStreamServer::AssetStreamServer(std::string src_dir, std::unique_ptr AssetStreamServer::Create( AssetStreamServerType type, std::string src_dir, DataStoreReader* data_store_reader, FileChunkMap* file_chunks, - ContentSentHandler content_sent) { + ContentSentHandler content_sent, PrioritizeAssetsHandler prio_assets) { switch (type) { case AssetStreamServerType::kGrpc: - return std::make_unique(src_dir, data_store_reader, - file_chunks, content_sent); + return std::make_unique( + src_dir, data_store_reader, file_chunks, content_sent, prio_assets); case AssetStreamServerType::kTest: return std::make_unique( src_dir, data_store_reader, file_chunks); @@ -38,4 +38,5 @@ std::unique_ptr AssetStreamServer::Create( assert(false); return nullptr; } + } // namespace cdc_ft diff --git a/asset_stream_manager/asset_stream_server.h b/asset_stream_manager/asset_stream_server.h index 7aeacfd..32c651a 100644 --- a/asset_stream_manager/asset_stream_server.h +++ b/asset_stream_manager/asset_stream_server.h @@ -33,6 +33,11 @@ namespace cdc_ft { using ContentSentHandler = std::function; +// Handles requests to prioritize the given list of assets while updating the +// manifest. |rel_paths| relative Unix paths of assets to prioritize. 
+using PrioritizeAssetsHandler = + std::function rel_paths)>; + class DataStoreReader; class FileChunkMap; @@ -49,7 +54,7 @@ class AssetStreamServer { static std::unique_ptr Create( AssetStreamServerType type, std::string src_dir, DataStoreReader* data_store_reader, FileChunkMap* file_chunks, - ContentSentHandler content_sent); + ContentSentHandler content_sent, PrioritizeAssetsHandler prio_assets); AssetStreamServer(const AssetStreamServer& other) = delete; AssetStreamServer& operator=(const AssetStreamServer& other) = delete; diff --git a/asset_stream_manager/grpc_asset_stream_server.cc b/asset_stream_manager/grpc_asset_stream_server.cc index 2f9b85c..a2399e0 100644 --- a/asset_stream_manager/grpc_asset_stream_server.cc +++ b/asset_stream_manager/grpc_asset_stream_server.cc @@ -40,6 +40,8 @@ using GetManifestIdResponse = proto::GetManifestIdResponse; using AckManifestIdReceivedRequest = proto::AckManifestIdReceivedRequest; using AckManifestIdReceivedResponse = proto::AckManifestIdReceivedResponse; using ConfigStreamService = proto::ConfigStreamService; +using ProcessAssetsRequest = proto::ProcessAssetsRequest; +using ProcessAssetsResponse = proto::ProcessAssetsResponse; } // namespace @@ -64,6 +66,8 @@ class AssetStreamServiceImpl final : public AssetStreamService::Service { std::string rel_path; uint64_t offset; size_t size; + std::string instance_id = instance_ids_->Get(context->peer()); + for (const ContentIdProto& id : request->id()) { uint32_t uint32_size; if (file_chunks_->Lookup(id, &rel_path, &offset, &uint32_size)) { @@ -77,7 +81,6 @@ class AssetStreamServiceImpl final : public AssetStreamService::Service { RETURN_GRPC_IF_ERROR( ReadFromDataStore(id, response->add_data(), &size)); } - std::string instance_id = instance_ids_->Get(context->peer()); if (content_sent_ != nullptr) { content_sent_(size, 1, instance_id); } @@ -144,8 +147,9 @@ class AssetStreamServiceImpl final : public AssetStreamService::Service { class ConfigStreamServiceImpl final : public 
ConfigStreamService::Service { public: - explicit ConfigStreamServiceImpl(InstanceIdMap* instance_ids) - : instance_ids_(instance_ids) {} + ConfigStreamServiceImpl(InstanceIdMap* instance_ids, + PrioritizeAssetsHandler prio_handler) + : instance_ids_(instance_ids), prio_handler_(std::move(prio_handler)) {} ~ConfigStreamServiceImpl() { Shutdown(); } grpc::Status GetManifestId( @@ -183,6 +187,20 @@ class ConfigStreamServiceImpl final : public ConfigStreamService::Service { return grpc::Status::OK; } + grpc::Status ProcessAssets(grpc::ServerContext* context, + const ProcessAssetsRequest* request, + ProcessAssetsResponse* response) override { + if (!prio_handler_) return grpc::Status::OK; + + std::vector rel_paths; + rel_paths.reserve(request->relative_paths().size()); + for (const std::string& rel_path : request->relative_paths()) { + rel_paths.push_back(rel_path); + } + prio_handler_(std::move(rel_paths)); + return grpc::Status::OK; + } + void SetManifestId(const ContentIdProto& id) ABSL_LOCKS_EXCLUDED(mutex_) { LOG_INFO("Updating manifest id '%s' in configuration service", ContentId::ToHexString(id)); @@ -219,6 +237,10 @@ class ConfigStreamServiceImpl final : public ConfigStreamService::Service { return id_; } + void SetPrioritizeAssetsHandler(PrioritizeAssetsHandler handler) { + prio_handler_ = handler; + } + private: // Returns false if the update process was cancelled. bool WaitForUpdate(ContentIdProto& local_id) ABSL_LOCKS_EXCLUDED(mutex_) { @@ -234,23 +256,24 @@ class ConfigStreamServiceImpl final : public ConfigStreamService::Service { mutable absl::Mutex mutex_; ContentIdProto id_ ABSL_GUARDED_BY(mutex_); bool running_ ABSL_GUARDED_BY(mutex_) = true; - InstanceIdMap* instance_ids_; + InstanceIdMap* instance_ids_ = nullptr; + PrioritizeAssetsHandler prio_handler_; // Maps instance ids to the last acknowledged manifest id. 
using AckedManifestIdsMap = std::unordered_map; AckedManifestIdsMap acked_manifest_ids_ ABSL_GUARDED_BY(mutex_); }; -GrpcAssetStreamServer::GrpcAssetStreamServer(std::string src_dir, - DataStoreReader* data_store_reader, - FileChunkMap* file_chunks, - ContentSentHandler content_sent) +GrpcAssetStreamServer::GrpcAssetStreamServer( + std::string src_dir, DataStoreReader* data_store_reader, + FileChunkMap* file_chunks, ContentSentHandler content_sent, + PrioritizeAssetsHandler prio_assets) : AssetStreamServer(src_dir, data_store_reader, file_chunks), asset_stream_service_(std::make_unique( std::move(src_dir), data_store_reader, file_chunks, &instance_ids_, content_sent)), - config_stream_service_( - std::make_unique(&instance_ids_)) {} + config_stream_service_(std::make_unique( + &instance_ids_, std::move(prio_assets))) {} GrpcAssetStreamServer::~GrpcAssetStreamServer() = default; diff --git a/asset_stream_manager/grpc_asset_stream_server.h b/asset_stream_manager/grpc_asset_stream_server.h index 619f3cc..aa9bc1f 100644 --- a/asset_stream_manager/grpc_asset_stream_server.h +++ b/asset_stream_manager/grpc_asset_stream_server.h @@ -40,7 +40,8 @@ class GrpcAssetStreamServer : public AssetStreamServer { // Creates a new asset streaming gRpc server. GrpcAssetStreamServer(std::string src_dir, DataStoreReader* data_store_reader, FileChunkMap* file_chunks, - ContentSentHandler content_sent); + ContentSentHandler content_sent, + PrioritizeAssetsHandler prio_assets); ~GrpcAssetStreamServer(); diff --git a/asset_stream_manager/local_assets_stream_manager_service_impl.h b/asset_stream_manager/local_assets_stream_manager_service_impl.h index b109695..3f5c6d5 100644 --- a/asset_stream_manager/local_assets_stream_manager_service_impl.h +++ b/asset_stream_manager/local_assets_stream_manager_service_impl.h @@ -57,15 +57,13 @@ class LocalAssetsStreamManagerServiceImpl final // if it exists. 
grpc::Status StartSession(grpc::ServerContext* context, const StartSessionRequest* request, - StartSessionResponse* response) override - ABSL_LOCKS_EXCLUDED(sessions_mutex_); + StartSessionResponse* response) override; // Stops the streaming session to the instance with id // |request->gamelet_id()|. Returns a NotFound error if no session exists. grpc::Status StopSession(grpc::ServerContext* context, const StopSessionRequest* request, - StopSessionResponse* response) override - ABSL_LOCKS_EXCLUDED(sessions_mutex_); + StopSessionResponse* response) override; private: // Convert StartSessionRequest enum to metrics enum. diff --git a/asset_stream_manager/multi_session.cc b/asset_stream_manager/multi_session.cc index 1769bee..c46acee 100644 --- a/asset_stream_manager/multi_session.cc +++ b/asset_stream_manager/multi_session.cc @@ -96,10 +96,25 @@ MultiSessionRunner::MultiSessionRunner( absl::Status MultiSessionRunner::Initialize(int port, AssetStreamServerType type, ContentSentHandler content_sent) { + // Create the manifest updater. + UpdaterConfig cfg; + cfg.num_threads = num_updater_threads_; + cfg.src_dir = src_dir_; + + assert(!manifest_updater_); + manifest_updater_ = + std::make_unique(data_store_, std::move(cfg)); + + // Let the manifest updater handle requests to prioritize certain assets. + PrioritizeAssetsHandler prio_assets = + std::bind(&ManifestUpdater::AddPriorityAssets, manifest_updater_.get(), + std::placeholders::_1); + // Start the server. assert(!server_); server_ = AssetStreamServer::Create(type, src_dir_, data_store_, - &file_chunks_, content_sent); + &file_chunks_, std::move(content_sent), + std::move(prio_assets)); assert(server_); RETURN_IF_ERROR(server_->Start(port), "Failed to start asset stream server for '%s'", src_dir_); @@ -162,12 +177,6 @@ ContentIdProto MultiSessionRunner::ManifestId() const { } void MultiSessionRunner::Run() { - // Create the manifest updater. 
- UpdaterConfig cfg; - cfg.num_threads = num_updater_threads_; - cfg.src_dir = src_dir_; - ManifestUpdater manifest_updater(data_store_, std::move(cfg)); - // Set up file watcher. // The streamed path should be a directory and exist at the beginning. FileWatcherWin watcher(src_dir_); @@ -179,28 +188,23 @@ void MultiSessionRunner::Run() { return; } - // Push an intermediate manifest containing the full directory structure, but - // potentially missing chunks. The purpose is that the FUSE can immediately - // show the structure and inode stats. FUSE will block on file reads that - // cannot be served due to missing chunks until the manifest is ready. - auto push_intermediate_manifest = [this](const ContentIdProto& manifest_id) { + // Push the intermediate manifest(s) and the final version with this handler. + auto push_handler = [this](const ContentIdProto& manifest_id) { SetManifest(manifest_id); }; // Bring the manifest up to date. LOG_INFO("Updating manifest for '%s'...", src_dir_); Stopwatch sw; - status = - manifest_updater.UpdateAll(&file_chunks_, push_intermediate_manifest); - RecordManifestUpdate(manifest_updater, sw.Elapsed(), + status = manifest_updater_->UpdateAll(&file_chunks_, push_handler); + RecordManifestUpdate(*manifest_updater_, sw.Elapsed(), metrics::UpdateTrigger::kInitUpdateAll, status); if (!status.ok()) { SetStatus( WrapStatus(status, "Failed to update manifest for '%s'", src_dir_)); return; } - RecordMultiSessionStart(manifest_updater); - SetManifest(manifest_updater.ManifestId()); + RecordMultiSessionStart(*manifest_updater_); LOG_INFO("Manifest for '%s' updated in %0.3f seconds", src_dir_, sw.ElapsedSeconds()); @@ -256,30 +260,26 @@ void MultiSessionRunner::Run() { src_dir_); modified_files.clear(); sw.Reset(); - status = manifest_updater.UpdateAll(&file_chunks_); - RecordManifestUpdate(manifest_updater, sw.Elapsed(), + status = manifest_updater_->UpdateAll(&file_chunks_, push_handler); + RecordManifestUpdate(*manifest_updater_, 
sw.Elapsed(), metrics::UpdateTrigger::kRunningUpdateAll, status); if (!status.ok()) { LOG_WARNING( "Updating manifest for '%s' after re-creating directory failed: " "'%s'", src_dir_, status.ToString()); - SetManifest(manifest_updater.DefaultManifestId()); - } else { - SetManifest(manifest_updater.ManifestId()); + SetManifest(manifest_updater_->DefaultManifestId()); } } else if (!modified_files.empty()) { ManifestUpdater::OperationList ops = GetFileOperations(modified_files); sw.Reset(); - status = manifest_updater.Update(&ops, &file_chunks_); - RecordManifestUpdate(manifest_updater, sw.Elapsed(), + status = manifest_updater_->Update(&ops, &file_chunks_, push_handler); + RecordManifestUpdate(*manifest_updater_, sw.Elapsed(), metrics::UpdateTrigger::kRegularUpdate, status); if (!status.ok()) { LOG_WARNING("Updating manifest for '%s' failed: %s", src_dir_, status.ToString()); - SetManifest(manifest_updater.DefaultManifestId()); - } else { - SetManifest(manifest_updater.ManifestId()); + SetManifest(manifest_updater_->DefaultManifestId()); } } @@ -482,15 +482,16 @@ absl::Status MultiSession::Shutdown() { sessions_.erase(instance_id); } + absl::Status status; if (runner_) { - RETURN_IF_ERROR(runner_->Shutdown()); + status = runner_->Shutdown(); } if (heartbeat_watcher_.joinable()) { heartbeat_watcher_.join(); } - return absl::OkStatus(); + return status; } absl::Status MultiSession::Status() { @@ -528,7 +529,7 @@ absl::Status MultiSession::StartSession(const std::string& instance_id, RETURN_IF_ERROR(session->Start(local_asset_stream_port_, kAssetStreamPortFirst, kAssetStreamPortLast)); - // Wait for the FUSE to receive the intermediate manifest. + // Wait for the FUSE to receive the first intermediate manifest. 
RETURN_IF_ERROR(runner_->WaitForManifestAck(instance_id, absl::Seconds(5))); sessions_[instance_id] = std::move(session); diff --git a/asset_stream_manager/multi_session.h b/asset_stream_manager/multi_session.h index a96bf6c..673f0e7 100644 --- a/asset_stream_manager/multi_session.h +++ b/asset_stream_manager/multi_session.h @@ -112,6 +112,7 @@ class MultiSessionRunner { const uint32_t num_updater_threads_; const ManifestUpdatedCb manifest_updated_cb_; std::unique_ptr server_; + std::unique_ptr manifest_updater_; // Modifications (shutdown, file changes). absl::Mutex mutex_; diff --git a/asset_stream_manager/multi_session_test.cc b/asset_stream_manager/multi_session_test.cc index 4f18bf4..c76ea24 100644 --- a/asset_stream_manager/multi_session_test.cc +++ b/asset_stream_manager/multi_session_test.cc @@ -65,6 +65,21 @@ class MetricsServiceForTest : public MultiSessionMetricsRecorder { metrics_records_.push_back(MetricsRecord(std::move(event), code)); } + // Waits until |num_events| events of type |type| have been recorded, or until + // the function times out. Returns true if the condition was met and false + // in case of a timeout. 
+ bool WaitForEvents(metrics::EventType type, int num_events = 1, + absl::Duration timeout = absl::Seconds(1)) { + absl::MutexLock lock(&mutex_); + auto cond = [this, type, num_events]() { + return std::count_if(metrics_records_.begin(), metrics_records_.end(), + [type](const MetricsRecord& mr) { + return mr.code == type; + }) >= num_events; + }; + return mutex_.AwaitWithTimeout(absl::Condition(&cond), timeout); + } + std::vector GetEventsAndClear(metrics::EventType type) ABSL_LOCKS_EXCLUDED(mutex_) { std::vector events; @@ -172,8 +187,16 @@ class MultiSessionTest : public ManifestTestBase { metrics::ManifestUpdateData* data = events[i].evt.as_manager_data->manifest_update_data.get(); EXPECT_LT(data->local_duration_ms, 60000ull); - manifests[i].local_duration_ms = data->local_duration_ms; - EXPECT_EQ(*data, manifests[i]); + EXPECT_EQ(data->status, manifests[i].status); + EXPECT_EQ(data->total_assets_added_or_updated, + manifests[i].total_assets_added_or_updated); + EXPECT_EQ(data->total_assets_deleted, manifests[i].total_assets_deleted); + EXPECT_EQ(data->total_chunks, manifests[i].total_chunks); + EXPECT_EQ(data->total_files_added_or_updated, + manifests[i].total_files_added_or_updated); + EXPECT_EQ(data->total_processed_bytes, + manifests[i].total_processed_bytes); + EXPECT_EQ(data->trigger, manifests[i].trigger); } } @@ -268,13 +291,13 @@ TEST_F(MultiSessionTest, GetCachePath_DoesNotSplitUtfCodePoints) { TEST_F(MultiSessionTest, MultiSessionRunnerOnEmpty) { cfg_.src_dir = test_dir_path_; MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_, - false /*enable_stats*/, kTimeout, kNumThreads, + /*enable_stats=*/false, kTimeout, kNumThreads, metrics_service_, [this]() { OnManifestUpdated(); }); EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); - EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout)); - // The first update is always the empty manifest, wait for the second one. 
- ASSERT_TRUE(WaitForManifestUpdated(2)); + EXPECT_TRUE(WaitForManifestUpdated(2)); + ASSERT_TRUE( + metrics_service_->WaitForEvents(metrics::EventType::kMultiSessionStart)); ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); CheckMultiSessionStartRecorded(0, 0, 0); CheckManifestUpdateRecorded(std::vector{ @@ -290,13 +313,13 @@ TEST_F(MultiSessionTest, MultiSessionRunnerNonEmptySucceeds) { // Contains a.txt, subdir/b.txt, subdir/c.txt, subdir/d.txt. cfg_.src_dir = path::Join(base_dir_, "non_empty"); MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_, - false /*enable_stats*/, kTimeout, kNumThreads, + /*enable_stats=*/false, kTimeout, kNumThreads, metrics_service_, [this]() { OnManifestUpdated(); }); EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); - EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout)); - // The first update is always the empty manifest, wait for the second one. - ASSERT_TRUE(WaitForManifestUpdated(2)); + EXPECT_TRUE(WaitForManifestUpdated(2)); + ASSERT_TRUE( + metrics_service_->WaitForEvents(metrics::EventType::kMultiSessionStart)); CheckMultiSessionStartRecorded(46, 4, 4); ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals( {"a.txt", "subdir", "subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}, @@ -309,31 +332,50 @@ TEST_F(MultiSessionTest, MultiSessionRunnerNonEmptySucceeds) { TEST_F(MultiSessionTest, MultiSessionRunnerAddFileSucceeds) { cfg_.src_dir = test_dir_path_; MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_, - false /*enable_stats*/, kTimeout, kNumThreads, + /*enable_stats=*/false, kTimeout, kNumThreads, metrics_service_, [this]() { OnManifestUpdated(); }); - EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); - EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout)); - // The first update is always the empty manifest, wait for the second one. 
- ASSERT_TRUE(WaitForManifestUpdated(2)); - ASSERT_OK(runner.Status()); - CheckMultiSessionStartRecorded(0, 0, 0); - ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); - CheckManifestUpdateRecorded(std::vector{ - GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll, - absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)}); - const std::string file_path = path::Join(test_dir_path_, "file.txt"); - EXPECT_OK(path::WriteFile(file_path, kData, kDataSize)); - // 1 file was added = incremented exp_num_manifest_updates. - ASSERT_TRUE(WaitForManifestUpdated(3)); - ASSERT_NO_FATAL_FAILURE( - ExpectManifestEquals({"file.txt"}, runner.ManifestId())); - CheckMultiSessionStartNotRecorded(); - CheckManifestUpdateRecorded(std::vector{ - GetManifestUpdateData(metrics::UpdateTrigger::kRegularUpdate, - absl::StatusCode::kOk, 1, 0, 1, 1, 0, kDataSize)}); + { + SCOPED_TRACE("Initialize."); + EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); + // No file has been added yet; 1 intermediate + 1 final manifest is pushed. + EXPECT_TRUE(WaitForManifestUpdated(2)); + EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout)); + EXPECT_TRUE(metrics_service_->WaitForEvents( + metrics::EventType::kMultiSessionStart)); + ASSERT_OK(runner.Status()); + } + + { + SCOPED_TRACE("Created base manifest for the test directory."); + + CheckMultiSessionStartRecorded(0, 0, 0); + ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); + CheckManifestUpdateRecorded(std::vector{ + GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll, + absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)}); + } + + { + SCOPED_TRACE("Added file.txt."); + + uint32_t prev_updates = num_manifest_updates_; + const std::string file_path = path::Join(test_dir_path_, "file.txt"); + EXPECT_OK(path::WriteFile(file_path, kData, kDataSize)); + // 1 file was added, 1 intermediate + 1 final manifest is pushed. 
+ EXPECT_TRUE(WaitForManifestUpdated(prev_updates + 2)); + EXPECT_TRUE( + metrics_service_->WaitForEvents(metrics::EventType::kManifestUpdated)); + ASSERT_NO_FATAL_FAILURE( + ExpectManifestEquals({"file.txt"}, runner.ManifestId())); + CheckMultiSessionStartNotRecorded(); + CheckManifestUpdateRecorded( + std::vector{GetManifestUpdateData( + metrics::UpdateTrigger::kRegularUpdate, absl::StatusCode::kOk, 1, 0, + 1, 1, 0, kDataSize)}); + } EXPECT_OK(runner.Status()); EXPECT_OK(runner.Shutdown()); } @@ -343,7 +385,7 @@ TEST_F(MultiSessionTest, MultiSessionRunnerAddFileSucceeds) { TEST_F(MultiSessionTest, MultiSessionRunnerNoDirFails) { cfg_.src_dir = path::Join(base_dir_, "non_existing"); MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_, - false /*enable_stats*/, kTimeout, kNumThreads, + /*enable_stats=*/false, kTimeout, kNumThreads, metrics_service_, [this]() { OnManifestUpdated(); }); EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); @@ -365,16 +407,16 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) { kDataSize)); MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_, - false /*enable_stats*/, kTimeout, kNumThreads, + /*enable_stats=*/false, kTimeout, kNumThreads, metrics_service_, [this]() { OnManifestUpdated(); }); EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); { SCOPED_TRACE("Originally, only the streamed directory contains file.txt."); - EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout)); - // The first update is always the empty manifest, wait for the second one. 
- ASSERT_TRUE(WaitForManifestUpdated(2)); + EXPECT_TRUE(WaitForManifestUpdated(2)); + ASSERT_TRUE(metrics_service_->WaitForEvents( + metrics::EventType::kMultiSessionStart)); CheckMultiSessionStartRecorded((uint64_t)kDataSize, 1, 1); ASSERT_NO_FATAL_FAILURE( ExpectManifestEquals({"file.txt"}, runner.ManifestId())); @@ -387,8 +429,9 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) { { SCOPED_TRACE( "Remove the streamed directory, the manifest should become empty."); + uint32_t prev_updates = num_manifest_updates_; EXPECT_OK(path::RemoveDirRec(test_dir_path_)); - ASSERT_TRUE(WaitForManifestUpdated(3)); + ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 1)); ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); CheckManifestUpdateRecorded( std::vector{GetManifestUpdateData( @@ -400,9 +443,13 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) { SCOPED_TRACE( "Create the watched directory -> an empty manifest should be " "streamed."); + uint32_t prev_updates = num_manifest_updates_; EXPECT_OK(path::CreateDirRec(test_dir_path_)); - EXPECT_TRUE(WaitForManifestUpdated(4)); + // The first update is always the empty manifest, wait for the second one. 
+ EXPECT_TRUE(WaitForManifestUpdated(prev_updates + 2)); ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); + EXPECT_TRUE( + metrics_service_->WaitForEvents(metrics::EventType::kManifestUpdated)); CheckManifestUpdateRecorded(std::vector{ GetManifestUpdateData(metrics::UpdateTrigger::kRunningUpdateAll, absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)}); @@ -410,11 +457,16 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) { { SCOPED_TRACE("Create 'new_file.txt' -> new manifest should be created."); + uint32_t prev_updates = num_manifest_updates_; EXPECT_OK(path::WriteFile(path::Join(test_dir_path_, "new_file.txt"), kData, kDataSize)); - ASSERT_TRUE(WaitForManifestUpdated(5)); + // The first update doesn't have the chunks for new_file.txt, wait for the + // second one. + ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 2)); ASSERT_NO_FATAL_FAILURE( ExpectManifestEquals({"new_file.txt"}, runner.ManifestId())); + EXPECT_TRUE( + metrics_service_->WaitForEvents(metrics::EventType::kManifestUpdated)); CheckManifestUpdateRecorded( std::vector{GetManifestUpdateData( metrics::UpdateTrigger::kRegularUpdate, absl::StatusCode::kOk, 1, 0, @@ -432,11 +484,11 @@ TEST_F(MultiSessionTest, MultiSessionRunnerFileAsStreamedDirFails) { EXPECT_OK(path::WriteFile(cfg_.src_dir, kData, kDataSize)); MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_, - false /*enable_stats*/, kTimeout, kNumThreads, + /*enable_stats=*/false, kTimeout, kNumThreads, metrics_service_, [this]() { OnManifestUpdated(); }); EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); - ASSERT_FALSE(WaitForManifestUpdated(1, absl::Milliseconds(10))); + ASSERT_FALSE(WaitForManifestUpdated(1, absl::Milliseconds(100))); CheckMultiSessionStartNotRecorded(); CheckManifestUpdateRecorded(std::vector{}); EXPECT_NOT_OK(runner.Shutdown()); @@ -452,33 +504,49 @@ TEST_F(MultiSessionTest, EXPECT_OK(path::CreateDirRec(cfg_.src_dir)); MultiSessionRunner runner(cfg_.src_dir, 
&data_store_, &process_factory_, - false /*enable_stats*/, kTimeout, kNumThreads, + /*enable_stats=*/false, kTimeout, kNumThreads, metrics_service_, [this]() { OnManifestUpdated(); }); - EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); - ASSERT_TRUE(WaitForManifestUpdated(2)); - CheckMultiSessionStartRecorded(0, 0, 0); - CheckManifestUpdateRecorded(std::vector{ - GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll, - absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)}); - ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); + { + SCOPED_TRACE("Initialize manifest in test directory."); - // Remove the streamed directory, the manifest should become empty. - EXPECT_OK(path::RemoveDirRec(cfg_.src_dir)); - ASSERT_TRUE(WaitForManifestUpdated(3)); - ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); - CheckManifestUpdateRecorded(std::vector{ - GetManifestUpdateData(metrics::UpdateTrigger::kRunningUpdateAll, - absl::StatusCode::kNotFound, 0, 0, 0, 0, 0, 0)}); + EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest)); + ASSERT_TRUE(WaitForManifestUpdated(2)); + ASSERT_TRUE(metrics_service_->WaitForEvents( + metrics::EventType::kMultiSessionStart)); + CheckMultiSessionStartRecorded(0, 0, 0); + CheckManifestUpdateRecorded(std::vector{ + GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll, + absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)}); + ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); + } - EXPECT_OK(path::WriteFile(cfg_.src_dir, kData, kDataSize)); - EXPECT_TRUE(WaitForManifestUpdated(4)); - ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); - CheckManifestUpdateRecorded( - std::vector{GetManifestUpdateData( - metrics::UpdateTrigger::kRunningUpdateAll, - absl::StatusCode::kFailedPrecondition, 0, 0, 0, 0, 0, 0)}); - CheckMultiSessionStartNotRecorded(); + { + SCOPED_TRACE("Remove the streamed directory, the manifest becomes empty."); + + uint32_t prev_updates 
= num_manifest_updates_; + EXPECT_OK(path::RemoveDirRec(cfg_.src_dir)); + ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 1)); + ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); + CheckManifestUpdateRecorded(std::vector{ + GetManifestUpdateData(metrics::UpdateTrigger::kRunningUpdateAll, + absl::StatusCode::kNotFound, 0, 0, 0, 0, 0, 0)}); + } + + { + SCOPED_TRACE("Create a file in place of the directory"); + + uint32_t prev_updates = num_manifest_updates_; + EXPECT_OK(path::WriteFile(cfg_.src_dir, kData, kDataSize)); + ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 2)); + ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId())); + metrics::ManifestUpdateData update_data = GetManifestUpdateData( + metrics::UpdateTrigger::kRunningUpdateAll, + absl::StatusCode::kFailedPrecondition, 0, 0, 0, 0, 0, 0); + CheckManifestUpdateRecorded( + std::vector{update_data, update_data}); + CheckMultiSessionStartNotRecorded(); + } EXPECT_OK(runner.Status()); EXPECT_OK(runner.Shutdown()); diff --git a/cdc_fuse_fs/BUILD b/cdc_fuse_fs/BUILD index bae6baf..6fe1356 100644 --- a/cdc_fuse_fs/BUILD +++ b/cdc_fuse_fs/BUILD @@ -50,6 +50,7 @@ cc_test( srcs = ["cdc_fuse_fs_test.cc"], deps = [ ":cdc_fuse_fs_lib_mocked", + ":mock_config_stream_client", "//common:status_test_macros", "//data_store", "//data_store:mem_data_store", @@ -130,6 +131,13 @@ cc_library( ], ) +cc_library( + name = "mock_config_stream_client", + srcs = ["mock_config_stream_client.cc"], + hdrs = ["mock_config_stream_client.h"], + deps = [":config_stream_client"], +) + filegroup( name = "all_test_sources", srcs = glob(["*_test.cc"]), diff --git a/cdc_fuse_fs/cdc_fuse_fs.cc b/cdc_fuse_fs/cdc_fuse_fs.cc index f5cc9ba..7386477 100644 --- a/cdc_fuse_fs/cdc_fuse_fs.cc +++ b/cdc_fuse_fs/cdc_fuse_fs.cc @@ -125,12 +125,28 @@ struct Inode { // Asset proto -> inode map. 
using InodeMap = std::unordered_map>; -// Queued request to open a file that has not been processed yet and should be -// processed once the manifest is updated. -struct OpenRequest { - fuse_req_t req; - fuse_ino_t ino; - struct fuse_file_info* fi; +// Queued request that cannot be processed yet and should be processed once the +// manifest is updated. +struct QueuedRequest { + // The request type that was blocked. + enum class Type { kOpen, kOpenDir, kLookup }; + Type type; + std::string rel_path; + + union { + // Only valid for type == kOpen or type == kOpenDir. + struct Open { + fuse_req_t req; + fuse_ino_t ino; + struct fuse_file_info fi; + } open; + // Only valid for type == kLookup. + struct Lookup { + fuse_req_t req; + fuse_ino_t parent_ino; + const char* name; + } lookup; + } u; }; // Global context. Fuse is based on loose callbacks, so this holds the fs state. @@ -170,12 +186,13 @@ struct CdcFuseFsContext { static thread_local Buffer buffer; // Configuration client to get configuration updates from the workstation. - std::unique_ptr config_stream_client_; + std::unique_ptr config_stream_client; - // Queue for requests to open files that have not been processed yet. - absl::Mutex queued_open_requests_mutex_; - std::vector queued_open_requests_ - ABSL_GUARDED_BY(queued_open_requests_mutex_); + // Queue for requests to open files or directories that have not been + // processed yet. + absl::Mutex queued_requests_mutex; + std::vector queued_requests + ABSL_GUARDED_BY(queued_requests_mutex); // Identifies whether FUSE consistency should be inspected after manifest // update. @@ -372,6 +389,75 @@ bool ValidateInode(fuse_req_t req, Inode& inode, fuse_ino_t ino) { } return true; } + +// Returns the full relative file path for the given |inode|. 
+std::string GetRelativePath(const Inode& inode) { + if (inode.asset.parent_ino() == FUSE_ROOT_ID) + return inode.asset.proto()->name(); + std::string rel_path = GetRelativePath(GetInode(inode.asset.parent_ino())); + absl::StrAppend(&rel_path, "/", inode.asset.proto()->name()); + return rel_path; +} + +// Asks the server to prioritize the asset at |rel_file_path|. +void PrioritizeAssetOnServer(const std::string& rel_file_path) { + std::vector assets{rel_file_path}; + LOG_INFO("Requesing server to prioritize asset '%s'", rel_file_path); + absl::Status status = + ctx->config_stream_client->ProcessAssets(std::move(assets)); + // An error is not critical, but we should log it. + if (!status.ok()) { + LOG_ERROR( + "Failed to request prioritization for asset '%s' from the server: %s", + rel_file_path, status.ToString()); + } +} + +// Queues a CdcFuseOpen request in the list of pending requests. Thread-safe. +void QueueOpenRequest(const std::string& rel_path, fuse_req_t req, + fuse_ino_t ino, struct fuse_file_info* fi) + ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) { + QueuedRequest qr{QueuedRequest::Type::kOpen, rel_path}; + qr.u.open.req = req; + qr.u.open.ino = ino; + qr.u.open.fi = *fi; + { + absl::MutexLock lock(&ctx->queued_requests_mutex); + ctx->queued_requests.emplace_back(std::move(qr)); + } + PrioritizeAssetOnServer(rel_path); +} + +// Queues a CdcFuseOpenDir request in the list of pending requests. Thread-safe. +void QueueOpenDirRequest(const std::string& rel_path, fuse_req_t req, + fuse_ino_t ino, struct fuse_file_info* fi) + ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) { + QueuedRequest qr{QueuedRequest::Type::kOpenDir, rel_path}; + qr.u.open.req = req; + qr.u.open.ino = ino; + qr.u.open.fi = *fi; + { + absl::MutexLock lock(&ctx->queued_requests_mutex); + ctx->queued_requests.emplace_back(std::move(qr)); + } + PrioritizeAssetOnServer(rel_path); +} + +// Queues a CdcFuseLookup request in the list of pending requests. Thread-safe. 
+void QueueLookupRequest(const std::string& rel_path, fuse_req_t req, + fuse_ino_t parent_ino, const char* name) + ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) { + QueuedRequest qr{QueuedRequest::Type::kLookup, rel_path}; + qr.u.lookup.req = req; + qr.u.lookup.parent_ino = parent_ino; + qr.u.lookup.name = name; + { + absl::MutexLock lock(&ctx->queued_requests_mutex); + ctx->queued_requests.emplace_back(std::move(qr)); + } + PrioritizeAssetOnServer(rel_path); +} + } // namespace // Implementation of the Fuse lookup() method. @@ -384,6 +470,17 @@ void CdcFuseLookup(fuse_req_t req, fuse_ino_t parent_ino, const char* name) if (!ValidateInode(req, parent, parent_ino)) { return; } + + if (parent.asset.proto()->in_progress()) { + // This directory has not been processed yet. Queue up the request and block + // until an updated manifest is available. + std::string rel_path = GetRelativePath(parent); + LOG_INFO("Request to open ino %u queued (file '%s' not ready)", parent_ino, + rel_path); + QueueLookupRequest(rel_path, req, parent_ino, name); + return; + } + absl::StatusOr proto = parent.asset.Lookup(name); if (!proto.ok()) { LOG_ERROR("Lookup of '%s' in ino %u failed: '%s'", name, parent_ino, @@ -468,13 +565,11 @@ void CdcFuseOpen(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) return; } - if (proto->file_size() > 0 && proto->file_chunks_size() == 0 && - proto->file_indirect_chunks_size() == 0) { + if (proto->file_size() > 0 && proto->in_progress()) { // This file has not been processed yet. Queue up the request Block until an // updated manifest is available. 
LOG_DEBUG("Request to open ino %u queued (file not ready)", ino); - absl::MutexLock lock(&ctx->queued_open_requests_mutex_); - ctx->queued_open_requests_.push_back({req, ino, fi}); + QueueOpenRequest(GetRelativePath(inode), req, ino, fi); return; } @@ -566,6 +661,15 @@ void CdcFuseOpenDir(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) fuse_reply_err(req, ENOTDIR); return; } + if (proto->in_progress()) { + // This directory has not been processed yet. Queue up the request until an + // updated manifest is available. + LOG_DEBUG("Request to open directory '%s' ino %u queued (dir not ready)", + proto->name(), ino); + QueueOpenDirRequest(GetRelativePath(inode), req, ino, fi); + return; + } + fuse_reply_open(req, fi); } @@ -1462,8 +1566,8 @@ absl::Status SetManifest(const ContentIdProto& manifest_id) absl::MutexLock inodes_lock(&ctx->inodes_mutex); for (const auto& [proto, inode] : ctx->inodes) { // Reset kUpdatedProto to kInitialized. The state was only used for - // validation. kUpdated is still needed for clearing kernel caches when a - // file is opened. + // validation. kUpdated is still needed for clearing kernel caches when + // a file is opened. assert(inode->IsValid()); if (inode->IsUpdatedProto() || inode->asset.proto()->type() == AssetProto::DIRECTORY) { @@ -1474,21 +1578,36 @@ absl::Status SetManifest(const ContentIdProto& manifest_id) } // Process outstanding open requests. Be sure to move the vector because - // CdcFuseOpen() might requeue requests. - std::vector requests; + // processing might requeue requests. 
+ std::vector requests; { - absl::MutexLock lock(&ctx->queued_open_requests_mutex_); - requests.swap(ctx->queued_open_requests_); + absl::MutexLock lock(&ctx->queued_requests_mutex); + requests.swap(ctx->queued_requests); } - for (const OpenRequest request : requests) { - LOG_DEBUG("Resuming request to open ino %u", request.ino); - CdcFuseOpen(request.req, request.ino, request.fi); + for (QueuedRequest& qr : requests) { + switch (qr.type) { + case QueuedRequest::Type::kLookup: + LOG_DEBUG("Resuming request to look up '%s' in '%s' (ino %u)", + qr.u.lookup.name, qr.rel_path, qr.u.lookup.parent_ino); + CdcFuseLookup(qr.u.lookup.req, qr.u.lookup.parent_ino, + qr.u.lookup.name); + break; + case QueuedRequest::Type::kOpen: + LOG_DEBUG("Resuming request to open file '%s' (ino %u)", qr.rel_path, + qr.u.open.ino); + CdcFuseOpen(qr.u.open.req, qr.u.open.ino, &qr.u.open.fi); + break; + case QueuedRequest::Type::kOpenDir: + LOG_DEBUG("Resuming request to open dir '%s' (ino %u)", qr.rel_path, + qr.u.open.ino); + CdcFuseOpenDir(qr.u.open.req, qr.u.open.ino, &qr.u.open.fi); + break; + } } #ifndef USE_MOCK_LIBFUSE // Acknowledge that the manifest id was received and FUSE was updated. 
- absl::Status status = - ctx->config_stream_client_->SendManifestAck(manifest_id); + absl::Status status = ctx->config_stream_client->SendManifestAck(manifest_id); if (!status.ok()) { LOG_ERROR("Failed to send ack for manifest '%s'", ContentId::ToHexString(manifest_id)); @@ -1500,16 +1619,14 @@ absl::Status SetManifest(const ContentIdProto& manifest_id) return absl::OkStatus(); } -absl::Status StartConfigClient(std::string instance, - std::shared_ptr channel) { +void SetConfigClient( + std::unique_ptr config_client) { LOG_DEBUG("Starting configuration client"); assert(ctx && ctx->initialized); - if (ctx->config_stream_client_) { - ctx->config_stream_client_.reset(); + if (ctx->config_stream_client) { + ctx->config_stream_client.reset(); } - ctx->config_stream_client_ = std::make_unique( - std::move(instance), std::move(channel)); - return absl::OkStatus(); + ctx->config_stream_client = std::move(config_client); } // Initializes FUSE with a manifest for an empty directory: @@ -1542,7 +1659,7 @@ absl::Status Run(DataStoreReader* data_store_reader, bool consistency_check) { if (res == -1) return MakeStatus("Session loop failed"); LOG_INFO("Session loop finished."); - ctx->config_stream_client_->Shutdown(); + ctx->config_stream_client->Shutdown(); #else // This code is not unit tested. #endif diff --git a/cdc_fuse_fs/cdc_fuse_fs.h b/cdc_fuse_fs/cdc_fuse_fs.h index 97ee2be..74d7092 100644 --- a/cdc_fuse_fs/cdc_fuse_fs.h +++ b/cdc_fuse_fs/cdc_fuse_fs.h @@ -62,10 +62,8 @@ namespace cdc_fuse_fs { // see fuse_common.h. absl::Status Initialize(int argc, char** argv); -// Starts a client to read configuration updates over gRPC |channel|. -// |instance| is the gamelet instance id. -absl::Status StartConfigClient(std::string instance, - std::shared_ptr channel); +// Sets the client to read configuration updates to |config_client|. 
+void SetConfigClient(std::unique_ptr config_client); // Sets the |data_store_reader| to load data from, initializes FUSE with a // manifest for an empty directory, and starts the filesystem. The call does diff --git a/cdc_fuse_fs/cdc_fuse_fs_test.cc b/cdc_fuse_fs/cdc_fuse_fs_test.cc index 73b1df0..975e827 100644 --- a/cdc_fuse_fs/cdc_fuse_fs_test.cc +++ b/cdc_fuse_fs/cdc_fuse_fs_test.cc @@ -12,11 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Bazel does not honor copts for test targets, so we need to define +// USE_MOCK_LIBFUSE in the test code itself *before* including cdc_fuse_fs.h. +#ifndef USE_MOCK_LIBFUSE +#define USE_MOCK_LIBFUSE = 1 +#endif + #include "cdc_fuse_fs/cdc_fuse_fs.h" #include #include +#include "cdc_fuse_fs/mock_config_stream_client.h" #include "cdc_fuse_fs/mock_libfuse.h" #include "common/log.h" #include "common/path.h" @@ -91,6 +98,7 @@ class CdcFuseFsTest : public ::testing::Test { CdcFuseFsTest() : builder_(&cache_) { cdc_fuse_fs::Initialize(0, nullptr).IgnoreError(); Log::Initialize(std::make_unique(LogLevel::kInfo)); + cdc_fuse_fs::SetConfigClient(std::make_unique()); } ~CdcFuseFsTest() { Log::Shutdown(); @@ -157,17 +165,26 @@ class CdcFuseFsTest : public ::testing::Test { ExpectAccessError(R_OK | W_OK | X_OK, 0 /*error*/); } - // Wipes chunks for |kFile1Name| to simulate an intermediate manifest, i.e. - // the manifest that contains all assets, but misses file chunks. + // Wipes chunks for |kFile1Name| and assets for |kSubDirName| to simulate an + // intermediate manifest, i.e. the manifest for which some files and + // directories are not processed yet. // Returns the intermediate manifest id. 
ContentIdProto CreateIntermediateManifestId() { ManifestProto manifest; EXPECT_OK(cache_.GetProto(manifest_id_, &manifest)); EXPECT_GT(manifest.root_dir().dir_assets_size(), 0); if (manifest.root_dir().dir_assets_size() == 0) return ContentIdProto(); + AssetProto* file1 = manifest.mutable_root_dir()->mutable_dir_assets(0); EXPECT_EQ(file1->name(), kFile1Name); file1->clear_file_chunks(); + file1->set_in_progress(true); + + AssetProto* subdir = manifest.mutable_root_dir()->mutable_dir_assets(1); + EXPECT_EQ(subdir->name(), kSubdirName); + subdir->clear_dir_assets(); + subdir->set_in_progress(true); + return cache_.AddProto(manifest); } @@ -270,40 +287,59 @@ TEST_F(CdcFuseFsTest, OpenFailsWriteAccess) { EXPECT_EQ(fuse_.errors[0], EACCES); } -TEST_F(CdcFuseFsTest, OpenQueuedForIntermediateManifest) { +TEST_F(CdcFuseFsTest, RequestsQueuedForIntermediateManifest) { ContentIdProto intermediate_manifest_id = CreateIntermediateManifestId(); EXPECT_OK(cdc_fuse_fs::SetManifest(intermediate_manifest_id)); + fuse_file_info fi; + auto cfg_client_ptr = std::make_unique(); + MockConfigStreamClient* cfg_client = cfg_client_ptr.get(); + cdc_fuse_fs::SetConfigClient(std::move(cfg_client_ptr)); - // Opening file1 should be queued as it contains no chunks. + // Opening file1 should be queued as it is marked as in-progress. CdcFuseLookup(req_, FUSE_ROOT_ID, kFile1Name); ASSERT_EQ(fuse_.entries.size(), 1); - fuse_file_info fi; CdcFuseOpen(req_, fuse_.entries[0].ino, &fi); - ASSERT_EQ(fuse_.open_files.size(), 0); + EXPECT_EQ(fuse_.open_files.size(), 0); + EXPECT_EQ(cfg_client->ReleasePrioritizedAssets(), + std::vector({kFile1Name})); + + // Opening subdir should be queued as it is marked as in-progress. 
+ CdcFuseLookup(req_, FUSE_ROOT_ID, kSubdirName); + ASSERT_EQ(fuse_.entries.size(), 2); + CdcFuseOpenDir(req_, fuse_.entries[1].ino, &fi); + EXPECT_EQ(fuse_.open_files.size(), 0); + EXPECT_EQ(cfg_client->ReleasePrioritizedAssets(), + std::vector({kSubdirName})); // Setting the final manifest should fulfill queued open requests. EXPECT_OK(cdc_fuse_fs::SetManifest(manifest_id_)); - ASSERT_EQ(fuse_.open_files.size(), 1); + EXPECT_EQ(fuse_.open_files.size(), 2); } -TEST_F(CdcFuseFsTest, OpenQueuedRequestsRequeue) { +TEST_F(CdcFuseFsTest, QueuedRequestsRequeue) { ContentIdProto intermediate_manifest_id = CreateIntermediateManifestId(); EXPECT_OK(cdc_fuse_fs::SetManifest(intermediate_manifest_id)); + fuse_file_info fi; - // Opening file1 should be queued as it contains no chunks. + // Opening file1 should be queued as it is marked as in-progress CdcFuseLookup(req_, FUSE_ROOT_ID, kFile1Name); ASSERT_EQ(fuse_.entries.size(), 1); - fuse_file_info fi; CdcFuseOpen(req_, fuse_.entries[0].ino, &fi); ASSERT_EQ(fuse_.open_files.size(), 0); + // Opening subdir should be queued as it is marked as in-progress. + CdcFuseLookup(req_, FUSE_ROOT_ID, kSubdirName); + ASSERT_EQ(fuse_.entries.size(), 2); + CdcFuseOpenDir(req_, fuse_.entries[1].ino, &fi); + EXPECT_EQ(fuse_.open_files.size(), 0); + // Setting the same incomplete manifest again should requeue the request. EXPECT_OK(cdc_fuse_fs::SetManifest(intermediate_manifest_id)); ASSERT_EQ(fuse_.open_files.size(), 0); // Setting the final manifest should fulfill queued open requests. 
EXPECT_OK(cdc_fuse_fs::SetManifest(manifest_id_)); - ASSERT_EQ(fuse_.open_files.size(), 1); + ASSERT_EQ(fuse_.open_files.size(), 2); } TEST_F(CdcFuseFsTest, ReadSucceeds) { diff --git a/cdc_fuse_fs/config_stream_client.cc b/cdc_fuse_fs/config_stream_client.cc index c80561c..b9cead6 100644 --- a/cdc_fuse_fs/config_stream_client.cc +++ b/cdc_fuse_fs/config_stream_client.cc @@ -33,7 +33,7 @@ using ConfigStreamService = proto::ConfigStreamService; // from the workstation. class ManifestIdReader { public: - ManifestIdReader(ConfigStreamService::Stub* stub) : stub_(stub) {} + explicit ManifestIdReader(ConfigStreamService::Stub* stub) : stub_(stub) {} // Starts a GetManifestId() request and listens to the stream of manifest ids // sent from the workstation. Calls |callback| on every manifest id received. @@ -88,21 +88,22 @@ class ManifestIdReader { std::unique_ptr reader_thread_; }; -ConfigStreamClient::ConfigStreamClient(std::string instance, - std::shared_ptr channel) +ConfigStreamGrpcClient::ConfigStreamGrpcClient( + std::string instance, std::shared_ptr channel) : instance_(std::move(instance)), stub_(ConfigStreamService::NewStub(std::move(channel))), read_client_(std::make_unique(stub_.get())) {} -ConfigStreamClient::~ConfigStreamClient() = default; +ConfigStreamGrpcClient::~ConfigStreamGrpcClient() = default; -absl::Status ConfigStreamClient::StartListeningToManifestUpdates( +absl::Status ConfigStreamGrpcClient::StartListeningToManifestUpdates( std::function callback) { LOG_INFO("Starting to listen to manifest updates"); return read_client_->StartListeningToManifestUpdates(callback); } -absl::Status ConfigStreamClient::SendManifestAck(ContentIdProto manifest_id) { +absl::Status ConfigStreamGrpcClient::SendManifestAck( + ContentIdProto manifest_id) { AckManifestIdReceivedRequest request; request.set_gamelet_id(instance_); *request.mutable_manifest_id() = std::move(manifest_id); @@ -114,7 +115,21 @@ absl::Status ConfigStreamClient::SendManifestAck(ContentIdProto 
manifest_id) { return absl::OkStatus(); } -void ConfigStreamClient::Shutdown() { +absl::Status ConfigStreamGrpcClient::ProcessAssets( + std::vector assets) { + proto::ProcessAssetsRequest request; + for (std::string& asset : assets) + request.add_relative_paths(std::move(asset)); + + grpc::ClientContext context_; + proto::ProcessAssetsResponse response; + // The caller is waiting for the updated manifest anyway, so we can just wait + // for the response. + RETURN_ABSL_IF_ERROR(stub_->ProcessAssets(&context_, request, &response)); + return absl::OkStatus(); +} + +void ConfigStreamGrpcClient::Shutdown() { LOG_INFO("Stopping to listen to manifest updates"); read_client_->Shutdown(); } diff --git a/cdc_fuse_fs/config_stream_client.h b/cdc_fuse_fs/config_stream_client.h index fa25b1a..b89a402 100644 --- a/cdc_fuse_fs/config_stream_client.h +++ b/cdc_fuse_fs/config_stream_client.h @@ -32,26 +32,45 @@ namespace cdc_ft { class ManifestIdReader; +// Interface class for the config stream client. class ConfigStreamClient { public: - // |instance| is the id of the gamelet. - // |channel| is a gRPC channel to use. - ConfigStreamClient(std::string instance, - std::shared_ptr channel); - ~ConfigStreamClient(); + ConfigStreamClient() = default; + virtual ~ConfigStreamClient() = default; // Sends a request to get a stream of manifest id updates. |callback| is // called from a background thread for every manifest id received. // Returns immediately without waiting for the first manifest id. - absl::Status StartListeningToManifestUpdates( - std::function callback); + virtual absl::Status StartListeningToManifestUpdates( + std::function callback) = 0; // Sends a message to indicate that the |manifest_id| was received and FUSE // has been updated to use the new manifest. - absl::Status SendManifestAck(ContentIdProto manifest_id); + virtual absl::Status SendManifestAck(ContentIdProto manifest_id) = 0; + + // Sends a message to prioritize processing of the pending assets in |assets|. 
+ // All assets are given as full relative Unix paths to the file or directory. + virtual absl::Status ProcessAssets(std::vector assets) = 0; // Stops listening for manifest updates. - void Shutdown(); + virtual void Shutdown() = 0; +}; + +class ConfigStreamGrpcClient : public ConfigStreamClient { + public: + // |instance| is the id of the gamelet. + // |channel| is a gRPC channel to use. + ConfigStreamGrpcClient(std::string instance, + std::shared_ptr channel); + ~ConfigStreamGrpcClient(); + + // ConfigStreamClient + + absl::Status StartListeningToManifestUpdates( + std::function callback) override; + absl::Status SendManifestAck(ContentIdProto manifest_id) override; + absl::Status ProcessAssets(std::vector assets) override; + void Shutdown() override; private: using ConfigStreamService = proto::ConfigStreamService; diff --git a/cdc_fuse_fs/main.cc b/cdc_fuse_fs/main.cc index 23e4f53..362afec 100644 --- a/cdc_fuse_fs/main.cc +++ b/cdc_fuse_fs/main.cc @@ -19,6 +19,7 @@ #include "absl/flags/parse.h" #include "absl_helper/jedec_size_flag.h" #include "cdc_fuse_fs/cdc_fuse_fs.h" +#include "cdc_fuse_fs/config_stream_client.h" #include "cdc_fuse_fs/constants.h" #include "common/gamelet_component.h" #include "common/log.h" @@ -191,10 +192,9 @@ int main(int argc, char* argv[]) { prefetch_size, dp_cleanup_timeout, dp_access_idle_timeout); - if (!cdc_ft::cdc_fuse_fs::StartConfigClient(instance, grpc_channel).ok()) { - LOG_ERROR("Could not start reading configuration updates'"); - return 1; - } + cdc_ft::cdc_fuse_fs::SetConfigClient( + std::make_unique( + std::move(instance), std::move(grpc_channel))); // Run FUSE. 
LOG_INFO("Running filesystem"); diff --git a/cdc_fuse_fs/mock_config_stream_client.cc b/cdc_fuse_fs/mock_config_stream_client.cc new file mode 100644 index 0000000..9d87aac --- /dev/null +++ b/cdc_fuse_fs/mock_config_stream_client.cc @@ -0,0 +1,44 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cdc_fuse_fs/mock_config_stream_client.h" + +namespace cdc_ft { + +std::vector MockConfigStreamClient::ReleasePrioritizedAssets() { + return std::move(prioritized_assets_); +} + +absl::Status MockConfigStreamClient::StartListeningToManifestUpdates( + std::function callback) { + return absl::OkStatus(); +} + +absl::Status MockConfigStreamClient::SendManifestAck( + ContentIdProto manifest_id) { + return absl::OkStatus(); +} + +absl::Status MockConfigStreamClient::ProcessAssets( + std::vector assets) { + prioritized_assets_.insert(prioritized_assets_.end(), assets.begin(), + assets.end()); + return absl::OkStatus(); +} + +void MockConfigStreamClient::Shutdown() { + // Do nothing. +} + +} // namespace cdc_ft diff --git a/cdc_fuse_fs/mock_config_stream_client.h b/cdc_fuse_fs/mock_config_stream_client.h new file mode 100644 index 0000000..12a004d --- /dev/null +++ b/cdc_fuse_fs/mock_config_stream_client.h @@ -0,0 +1,43 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef CDC_FUSE_FS_MOCK_CONFIG_STREAM_CLIENT_H_ +#define CDC_FUSE_FS_MOCK_CONFIG_STREAM_CLIENT_H_ + +#include "cdc_fuse_fs/config_stream_client.h" + +namespace cdc_ft { + +// Mock ConfigStreamClient implementation, used for testing only. +class MockConfigStreamClient : public ConfigStreamClient { + public: + // Returns the list of relative file paths to assets that have been + // prioritized via ProcessAssets() and clears the list. + std::vector ReleasePrioritizedAssets(); + + // ConfigStreamClient + + absl::Status StartListeningToManifestUpdates( + std::function callback) override; + absl::Status SendManifestAck(ContentIdProto manifest_id) override; + absl::Status ProcessAssets(std::vector assets) override; + void Shutdown() override; + + private: + std::vector prioritized_assets_; +}; + +} // namespace cdc_ft + +#endif // CDC_FUSE_FS_MOCK_CONFIG_STREAM_CLIENT_H_ diff --git a/common/file_watcher_win.cc b/common/file_watcher_win.cc index b77ec71..4ee1710 100644 --- a/common/file_watcher_win.cc +++ b/common/file_watcher_win.cc @@ -14,7 +14,9 @@ #include "common/file_watcher_win.h" -#define WIN32_LEAN_AND_MEAN +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN 1 +#endif #include #include @@ -98,7 +100,7 @@ class AsyncFileWatcher { ~AsyncFileWatcher() { Shutdown(); } - absl::Status GetStatus() ABSL_LOCKS_EXCLUDED(status_mutex_) const { + absl::Status GetStatus() const ABSL_LOCKS_EXCLUDED(status_mutex_) { absl::MutexLock mutex(&status_mutex_); return status_; } @@ -145,24 +147,24 @@ class AsyncFileWatcher { modified_files_.clear(); } - 
uint32_t GetEventCount() ABSL_LOCKS_EXCLUDED(modified_files_mutex_) const { + uint32_t GetEventCount() const ABSL_LOCKS_EXCLUDED(modified_files_mutex_) { absl::MutexLock mutex(&modified_files_mutex_); return event_count_; } - uint32_t GetDirRecreateEventCount() - ABSL_LOCKS_EXCLUDED(modified_files_mutex_) const { + uint32_t GetDirRecreateEventCount() const + ABSL_LOCKS_EXCLUDED(modified_files_mutex_) { absl::MutexLock mutex(&modified_files_mutex_); return dir_recreate_count_; } - bool IsWatching() ABSL_LOCKS_EXCLUDED(state_mutex) const { + bool IsWatching() const ABSL_LOCKS_EXCLUDED(state_mutex_) { absl::MutexLock mutex(&state_mutex_); return state_ != FileWatcherState::kDefault && state_ != FileWatcherState::kShuttingDown; } - bool IsShuttingDown() ABSL_LOCKS_EXCLUDED(state_mutex) const { + bool IsShuttingDown() const ABSL_LOCKS_EXCLUDED(state_mutex_) { absl::MutexLock mutex(&state_mutex_); return state_ == FileWatcherState::kShuttingDown; } @@ -186,7 +188,7 @@ class AsyncFileWatcher { void WatchDirChanges() { // TODO: Adjust also if there was no directory at the beginning. Currently, // the directory exists; otherwise, ManifestUpdater would fail. - bool prev_dir_exists = true; + bool first_run = true, prev_run_was_success = false; while (true) { ScopedHandle read_event(CreateEvent(nullptr, /* no security attributes */ TRUE, /* manual-reset event */ @@ -200,27 +202,30 @@ class AsyncFileWatcher { FILE_BASIC_INFO dir_info; absl::StatusOr status = GetValidDirHandle(&dir_info); - if (!status.ok()) { - SetStatus(status.status()); - } else { + SetStatus(status.status()); + if (status.ok()) { // The watched directory exists and its handle is valid. - if (!prev_dir_exists) { + if (!first_run) { ++dir_recreate_count_; - prev_dir_exists = true; - SetStatus(absl::OkStatus()); if (dir_recreated_cb_) dir_recreated_cb_(); } + first_run = false; + prev_run_was_success = true; + // Keep reading directory changes. 
This function only returns once it + // gets the shutdown signal, the watched directory is removed, or an + // error occurs while reading file changes. ReadDirChanges(*status, dir_info, read_event); if (IsShuttingDown()) { LOG_DEBUG("Shutting down watching '%s'.", dir_path_); return; } LOG_WARNING("Watched directory '%s' was possibly removed.", dir_path_); - ++dir_recreate_count_; ClearModifiedFiles(); + } else if (prev_run_was_success) { + prev_run_was_success = false; + ++dir_recreate_count_; if (dir_recreated_cb_) dir_recreated_cb_(); } - prev_dir_exists = false; // The shutdown event should be caught on both levels: when the // watched directory was not removed and when it was recreated. Here // the shutdown event is considered when the watched directory itself was @@ -530,7 +535,7 @@ class AsyncFileWatcher { std::thread dir_reader_; // watching thread. mutable absl::Mutex status_mutex_; - absl::Status status_ = absl::OkStatus() ABSL_GUARDED_BY(status_mutex_); + absl::Status status_ ABSL_GUARDED_BY(status_mutex_); mutable absl::Mutex modified_files_mutex_; FileMap modified_files_ ABSL_GUARDED_BY(modified_files_mutex_); @@ -538,8 +543,8 @@ class AsyncFileWatcher { uint32_t dir_recreate_count_ ABSL_GUARDED_BY(modified_files_mutex_) = 0; mutable absl::Mutex state_mutex_; - FileWatcherState state_ = FileWatcherState::kDefault ABSL_GUARDED_BY( - state_mutex_); // the current watcher state. + FileWatcherState state_ ABSL_GUARDED_BY(state_mutex_) = + FileWatcherState::kDefault; // the current watcher state. // Pointer to ReadDirectoryChangesExW function if available. 
decltype(ReadDirectoryChangesExW)* read_directory_changes_ex_ = nullptr; diff --git a/common/file_watcher_win_test.cc b/common/file_watcher_win_test.cc index de799be..46a7ba1 100644 --- a/common/file_watcher_win_test.cc +++ b/common/file_watcher_win_test.cc @@ -115,9 +115,8 @@ class FileWatcherParameterizedTest : public ::testing::TestWithParam { bool changed = false; do { auto cond = [this]() { return files_changed_; }; - files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond), - kWaitTimeout); - changed = files_changed_; + changed = files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond), + kWaitTimeout); files_changed_ = false; } while (changed && watcher_.GetEventCountForTesting() < min_event_count); return changed; @@ -128,9 +127,8 @@ class FileWatcherParameterizedTest : public ::testing::TestWithParam { bool changed = false; do { auto cond = [this]() { return dir_recreated_; }; - files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond), - kWaitTimeout); - changed = dir_recreated_; + changed = files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond), + kWaitTimeout); dir_recreated_ = false; } while (changed && watcher_.GetDirRecreateEventCountForTesting() < min_event_count); @@ -511,7 +509,10 @@ TEST_P(FileWatcherParameterizedTest, ModifiedTime) { TEST_P(FileWatcherParameterizedTest, DeleteWatchedDir) { EXPECT_OK(watcher_.StartWatching([this]() { OnFilesChanged(); }, - [this]() { OnDirRecreated(); })); + [this]() { OnDirRecreated(); }, kFWTimeout)); + + EXPECT_OK(path::WriteFile(first_file_path_, kFirstData, kFirstDataSize)); + EXPECT_TRUE(WaitForChange(2u)); // 2x modify EXPECT_OK(path::RemoveDirRec(watcher_dir_path_)); EXPECT_TRUE(WaitForDirRecreated(1u)); @@ -586,6 +587,8 @@ TEST_P(FileWatcherParameterizedTest, RecreateWatchedDirNoOldChanges) { EXPECT_OK(path::WriteFile(first_file_path_, kFirstData, kFirstDataSize)); EXPECT_OK(path::RemoveDirRec(watcher_dir_path_)); + EXPECT_TRUE(WaitForDirRecreated(1u)); + 
EXPECT_OK(path::CreateDirRec(watcher_dir_path_)); EXPECT_TRUE(WaitForDirRecreated(2u)); diff --git a/manifest/BUILD b/manifest/BUILD index 4759153..1ff8fe8 100644 --- a/manifest/BUILD +++ b/manifest/BUILD @@ -111,8 +111,14 @@ cc_library( cc_library( name = "manifest_updater", - srcs = ["manifest_updater.cc"], - hdrs = ["manifest_updater.h"], + srcs = [ + "manifest_updater.cc", + "pending_assets_queue.cc", + ], + hdrs = [ + "manifest_updater.h", + "pending_assets_queue.h", + ], deps = [ ":file_chunk_map", ":manifest_builder", diff --git a/manifest/manifest_builder.cc b/manifest/manifest_builder.cc index 04f9a3a..b55538e 100644 --- a/manifest/manifest_builder.cc +++ b/manifest/manifest_builder.cc @@ -91,15 +91,19 @@ absl::StatusOr ManifestBuilder::GetOrCreateAsset( name = parts.back(); parts.pop_back(); } - DirCreateMode create_mode = - force_create ? DirCreateMode::kForceCreate : DirCreateMode::kCreate; + DirCreateMode create_mode = type == AssetProto::UNKNOWN + ? DirCreateMode::kNoCreate + : force_create ? DirCreateMode::kForceCreate + : DirCreateMode::kCreate; AssetProto* dir; ASSIGN_OR_RETURN(dir, FindOrCreateDirPath(parts, create_mode), "Failed to create directory '%s'", JoinUnixPath(parts)); if (name.empty()) { // Special case: return the root directory for a DIRECTORY with empty name. - if (type == AssetProto::DIRECTORY) return AssetBuilder(dir, std::string()); + if (type == AssetProto::DIRECTORY || type == AssetProto::UNKNOWN) { + return AssetBuilder(dir, std::string()); + } return absl::InvalidArgumentError("Empty path given"); } @@ -108,6 +112,10 @@ absl::StatusOr ManifestBuilder::GetOrCreateAsset( AssetProto* asset = nullptr; if (result.ok()) { asset = result.value(); + // If the asset type is unknown, we return any type. + if (type == AssetProto::UNKNOWN) { + return AssetBuilder(asset, path::DirName(unix_path)); + } // Verify that both assets are of the same type. 
if (asset->type() != type) { if (force_create) { @@ -125,11 +133,15 @@ absl::StatusOr ManifestBuilder::GetOrCreateAsset( } // Create the asset if it was not found or it was deleted. if (!asset) { + if (type == AssetProto::UNKNOWN) { + return absl::NotFoundError( + absl::StrFormat("Asset '%s' does not exist.", path)); + } asset = dir->add_dir_assets(); InitNewAsset(name, type, asset); if (created) *created = true; } - return AssetBuilder(asset, path::ToUnix(path::DirName(path))); + return AssetBuilder(asset, path::DirName(unix_path)); } absl::Status ManifestBuilder::DeleteAsset(const std::string& path) { diff --git a/manifest/manifest_builder.h b/manifest/manifest_builder.h index e17ac62..693a2a0 100644 --- a/manifest/manifest_builder.h +++ b/manifest/manifest_builder.h @@ -76,6 +76,10 @@ class ManifestBuilder { // asset is removed (recursively for directories) and a new asset with the // same name is created instead. // + // When |type| is UNKNOWN, an existing asset of any type is returned, no new + // asset is created when it does not exist, nor are any of the directories + // that lead up to that asset. + // // When |created| is given, then it will be set to true if that asset was // actually added, otherwise it will be set to false. 
absl::StatusOr GetOrCreateAsset(const std::string& path, diff --git a/manifest/manifest_builder_test.cc b/manifest/manifest_builder_test.cc index 808a2ca..87cc75d 100644 --- a/manifest/manifest_builder_test.cc +++ b/manifest/manifest_builder_test.cc @@ -344,6 +344,52 @@ TEST_F(ManifestBuilderTest, FilesDirsCreatedOnlyOnce) { VerifyAssets(assets, builder.ManifestId()); } +TEST_F(ManifestBuilderTest, GetAssetsOfUnkonwnType) { + ManifestBuilder builder(cdc_params_, &cache_); + AssetMap assets; + assets["file1.txt"] = {"a"}; + assets["dir1"] = {}; + + ASSERT_OK(AddAssets(assets, &builder)); + bool created = false; + + // Get existing assets, force_create == false + EXPECT_OK(builder.GetOrCreateAsset("file1.txt", AssetProto::UNKNOWN, false, + &created)); + EXPECT_FALSE(created); + EXPECT_OK( + builder.GetOrCreateAsset("dir1", AssetProto::UNKNOWN, false, &created)); + EXPECT_FALSE(created); + + // Get existing assets, force_create == true + EXPECT_OK(builder.GetOrCreateAsset("file1.txt", AssetProto::UNKNOWN, true, + &created)); + EXPECT_FALSE(created); + EXPECT_OK( + builder.GetOrCreateAsset("dir1", AssetProto::UNKNOWN, true, &created)); + EXPECT_FALSE(created); + + // Get the root directory. + EXPECT_OK(builder.GetOrCreateAsset("", AssetProto::UNKNOWN)); + + // Get non-existing file fails, force_create = false + EXPECT_NOT_OK( + builder.GetOrCreateAsset("does_not_exist", AssetProto::UNKNOWN, false)); + + // Get non-existing file fails, force_create = true + EXPECT_NOT_OK( + builder.GetOrCreateAsset("does_not_exist", AssetProto::UNKNOWN, true)); + + // Get non-existing file fails, no sub-directories are created. 
+ EXPECT_NOT_OK(builder.GetOrCreateAsset("new_dir1/does_not_exist", + AssetProto::UNKNOWN, false)); + EXPECT_NOT_OK(builder.GetOrCreateAsset("new_dir2/does_not_exist", + AssetProto::UNKNOWN, true)); + + ASSERT_OK(builder.Flush()); + VerifyAssets(assets, builder.ManifestId()); +} + TEST_F(ManifestBuilderTest, Deduplication) { ManifestBuilder builder(cdc_params_, &cache_); AssetMap assets; diff --git a/manifest/manifest_iterator.cc b/manifest/manifest_iterator.cc index f21daca..f9a8fb3 100644 --- a/manifest/manifest_iterator.cc +++ b/manifest/manifest_iterator.cc @@ -79,7 +79,7 @@ absl::Status ManifestIterator::Open(const std::string& manifest_file) { std::string msg = absl::StrFormat("failed to open file '%s' for reading", manifest_file); if (errno) { - status_ = ErrnoToCanonicalStatus(errno, msg); + status_ = ErrnoToCanonicalStatus(errno, "%s", msg); } else { status_ = absl::UnknownError(msg); } diff --git a/manifest/manifest_test_base.cc b/manifest/manifest_test_base.cc index e06177b..16545b2 100644 --- a/manifest/manifest_test_base.cc +++ b/manifest/manifest_test_base.cc @@ -82,12 +82,14 @@ ManifestTestBase::ManifestTestBase(std::string base_dir) std::vector ManifestTestBase::GetAllManifestAssets(ContentIdProto actual_manifest_id) { - ContentIdProto manifest_id; - EXPECT_OK(data_store_.GetProto(manifest_store_id_, &manifest_id)); - EXPECT_EQ(manifest_id, actual_manifest_id); + ContentIdProto expected_manifest_id; + EXPECT_OK(data_store_.GetProto(manifest_store_id_, &expected_manifest_id)); + EXPECT_EQ(ContentId::ToHexString(expected_manifest_id), + ContentId::ToHexString(actual_manifest_id)) + << DumpDataStoreProtos(); ManifestIterator manifest_iter(&data_store_); - EXPECT_OK(manifest_iter.Open(manifest_id)); + EXPECT_OK(manifest_iter.Open(expected_manifest_id)); std::vector assets; const AssetProto* entry; @@ -168,10 +170,10 @@ void ManifestTestBase::ExpectAssetInfosEqual(std::vector a, void ManifestTestBase::ExpectManifestEquals( std::initializer_list 
rel_paths, const ContentIdProto& actual_manifest_id) { - std::vector manifest_ais = + std::vector actual_ais = GetAllManifestAssets(actual_manifest_id); std::vector expected_ais = MakeAssetInfos(rel_paths); - ExpectAssetInfosEqual(manifest_ais, expected_ais); + ExpectAssetInfosEqual(actual_ais, expected_ais); } bool ManifestTestBase::InProgress(const ContentIdProto& manifest_id, diff --git a/manifest/manifest_test_base.h b/manifest/manifest_test_base.h index 835b864..67bf422 100644 --- a/manifest/manifest_test_base.h +++ b/manifest/manifest_test_base.h @@ -90,7 +90,7 @@ class ManifestTestBase : public ::testing::Test { // Compares the contents of the manifest to the real files at |rel_paths|. // The paths are relative to |cfg_.src_dir|. void ExpectManifestEquals(std::initializer_list rel_paths, - const ContentIdProto& actual_manifest_id); + const ContentIdProto& got_manifest_id); // Returns true if the file at Unix |path| contains file chunks in the // manifest referenced by |manifest_id|. diff --git a/manifest/manifest_updater.cc b/manifest/manifest_updater.cc index 0576f90..ee46cbe 100644 --- a/manifest/manifest_updater.cc +++ b/manifest/manifest_updater.cc @@ -35,6 +35,16 @@ namespace cdc_ft { namespace { +// A generic finalizer that invokes a given function at the end of its lifetime. +class Finalizer { + public: + explicit Finalizer(std::function finalize) : finalize_(finalize) {} + ~Finalizer() { finalize_(); } + + private: + std::function finalize_; +}; + // Returns AssetInfos for all files and dirs in |src_dir| + |rel_path|. Does not // recurse into sub-directories. absl::Status GetAllSrcAssets(const std::string& src_dir, @@ -104,49 +114,49 @@ void AssetInfo::AppendMoveChunks(RepeatedChunkRefProto* list, // Common fields for tasks that fill in manifest data. 
class ManifestTask : public Task { public: - ManifestTask(std::string src_dir, std::string relative_unix_path, - std::string filename) - : src_dir_(std::move(src_dir)), - rel_unix_path_(std::move(relative_unix_path)), - filename_(std::move(filename)) {} + ManifestTask(std::string src_dir, PendingAsset asset) + : src_dir_(std::move(src_dir)), asset_(std::move(asset)) {} // Relative unix path of the directory containing the file or directory for // this task. - const std::string& RelativeUnixPath() const { return rel_unix_path_; } + const std::string& RelativeUnixPath() const { return asset_.relative_path; } // Relative unix path of the file or directory for this task. std::string RelativeUnixFilePath() const { - return path::JoinUnix(rel_unix_path_, filename_); + return path::JoinUnix(RelativeUnixPath(), Filename()); } // Name of the file or directory to process with this task. - const std::string& Filename() const { return filename_; } + const std::string& Filename() const { return asset_.filename; } // Full path of the file or directory to process with this task. std::string FilePath() const { - return path::Join(src_dir_, path::ToNative(rel_unix_path_), filename_); + return path::Join(src_dir_, path::ToNative(RelativeUnixPath()), + asset_.filename); } // Returns the final status of the task. // Should not be accessed before the task is finished. const absl::Status& Status() const { return status_; } + // Returns whether or not this asset is explicitly prioritized. + bool Prioritized() const { return asset_.prioritized; } + + // Returns the pending asset's deadline. + absl::Time Deadline() const { return asset_.deadline; } + protected: const std::string src_dir_; - const std::string rel_unix_path_; - const std::string filename_; - + const PendingAsset asset_; absl::Status status_; }; // ThreadPool task that runs the CDC chunker on a given file. 
class FileChunkerTask : public ManifestTask { public: - FileChunkerTask(std::string src_dir, std::string relative_path, - std::string filename, const fastcdc::Config* cfg, - Buffer buffer) - : ManifestTask(std::move(src_dir), std::move(relative_path), - std::move(filename)), + FileChunkerTask(std::string src_dir, PendingAsset asset, + const fastcdc::Config* cfg, Buffer buffer) + : ManifestTask(std::move(src_dir), std::move(asset)), cfg_(cfg), buffer_(std::move(buffer)) { assert(cfg_->max_size > 0); @@ -223,11 +233,9 @@ class FileChunkerTask : public ManifestTask { // ThreadPool task that creates assets for the contents of a directory. class DirScannerTask : public ManifestTask { public: - DirScannerTask(std::string src_dir, std::string relative_path, - std::string filename, AssetBuilder dir, + DirScannerTask(std::string src_dir, PendingAsset asset, AssetBuilder dir, DataStoreReader* data_store) - : ManifestTask(std::move(src_dir), std::move(relative_path), - std::move(filename)), + : ManifestTask(std::move(src_dir), std::move(asset)), dir_(dir), data_store_(data_store) {} @@ -419,15 +427,16 @@ absl::Status ManifestUpdater::IsValidDir(std::string dir) { } ManifestUpdater::ManifestUpdater(DataStoreWriter* data_store, UpdaterConfig cfg) - : data_store_(data_store), cfg_(std::move(cfg)) { + : data_store_(data_store), + cfg_(std::move(cfg)), + queue_(kMinAssetProcessingTime) { path::EnsureEndsWithPathSeparator(&cfg_.src_dir); } ManifestUpdater::~ManifestUpdater() = default; -absl::Status ManifestUpdater::UpdateAll( - FileChunkMap* file_chunks, - PushIntermediateManifest push_intermediate_manifest) { +absl::Status ManifestUpdater::UpdateAll(FileChunkMap* file_chunks, + PushManifestHandler push_handler) { RETURN_IF_ERROR(ManifestUpdater::IsValidDir(cfg_.src_dir)); // Don't use the Windows localized time from path::GetStats. 
@@ -441,9 +450,8 @@ absl::Status ManifestUpdater::UpdateAll( std::vector operations{{Operator::kAdd, std::move(ri)}}; - absl::Status status = - Update(&operations, file_chunks, push_intermediate_manifest, - /*recursive=*/true); + absl::Status status = Update(&operations, file_chunks, push_handler, + /*recursive=*/true); if (status.ok() || !absl::IsUnavailable(status)) return status; @@ -456,7 +464,7 @@ absl::Status ManifestUpdater::UpdateAll( RETURN_IF_ERROR(data_store_->Wipe()); file_chunks->Clear(); - RETURN_IF_ERROR(Update(&operations, file_chunks, push_intermediate_manifest, + RETURN_IF_ERROR(Update(&operations, file_chunks, push_handler, /*recursive=*/true), "Failed to build manifest from scratch"); @@ -495,29 +503,97 @@ ContentIdProto ManifestUpdater::DefaultManifestId() { return manifest_id_; } -size_t ManifestUpdater::QueueTasks(Threadpool* pool, - const fastcdc::Config* cdc_cfg, - ManifestBuilder* manifest_builder) { - const size_t max_tasks_queued = MaxQueuedTasks(*pool); - size_t num_tasks_queued = 0; - while (pool->NumQueuedTasks() < max_tasks_queued && !queue_.empty() && - !buffers_.empty()) { - PendingAsset asset = std::move(queue_.front()); - absl::StatusOr dir; - queue_.pop_front(); +absl::Status ManifestUpdater::FlushAndPushManifest( + FileChunkMap* file_chunks, + std::unordered_set* manifest_content_ids, + PushManifestHandler push_manifest_handler) { + file_chunks->FlushUpdates(); + ASSIGN_OR_RETURN(manifest_id_, manifest_builder_->Flush(), + "Failed to flush intermediate manifest"); + // Add all content IDs that were just written back. 
+ manifest_content_ids->insert(manifest_builder_->FlushedContentIds().begin(), + manifest_builder_->FlushedContentIds().end()); + if (push_manifest_handler) push_manifest_handler(manifest_id_); + last_manifest_flush_ = absl::Now(); + return absl::OkStatus(); +} +bool ManifestUpdater::WantManifestFlushed( + PushManifestHandler push_manifest_handler) const { + return push_manifest_handler && flush_deadline_ < absl::Now() && + last_manifest_flush_ + kMinDelayBetweenFlush < absl::Now(); +} + +absl::Status ManifestUpdater::MaybeFlushAndPushManifest( + size_t dir_scanner_tasks_queued, FileChunkMap* file_chunks, + std::unordered_set* manifest_content_ids, + PushManifestHandler push_manifest) { + // Flush only if there are no DirScannerTask active. + if (dir_scanner_tasks_queued == 0 && WantManifestFlushed(push_manifest)) { + flush_deadline_ = absl::InfiniteFuture(); + return FlushAndPushManifest(file_chunks, manifest_content_ids, + push_manifest); + } + return absl::OkStatus(); +} + +void ManifestUpdater::AddPriorityAssets(std::vector rel_paths) { + absl::MutexLock lock(&priority_mutex_); + absl::Time now = absl::Now(); + for (std::string& rel_path : rel_paths) { + priority_assets_.push_back(PriorityAsset{std::move(rel_path), now}); + } +} + +void ManifestUpdater::PrioritizeQueuedAssets() { + std::vector prio_assets; + { + absl::MutexLock lock(&priority_mutex_); + if (priority_assets_.empty()) return; + std::swap(prio_assets, priority_assets_); + } + + absl::Time deadline = queue_.Prioritize(prio_assets, manifest_builder_.get()); + if (deadline < flush_deadline_) flush_deadline_ = deadline; +} + +ManifestUpdater::QueueTasksResult ManifestUpdater::QueueTasks( + bool drain_dir_scanner_tasks, Threadpool* pool, + const fastcdc::Config* cdc_cfg) { + // Prioritize requested assets before queuing new tasks. 
+ PrioritizeQueuedAssets(); + const size_t max_tasks_queued = MaxQueuedTasks(*pool); + size_t file_chunker_tasks = 0, dir_scanner_tasks = 0; + + // Skip DIRECTORY assets if we should drain DirScannerTasks. + PendingAssetsQueue::AcceptFunc accept = nullptr; + if (drain_dir_scanner_tasks) { + accept = [](const PendingAsset& p) { + return p.type != AssetProto::DIRECTORY; + }; + } + + absl::StatusOr dir; + PendingAsset asset; + + while (pool->NumQueuedTasks() < max_tasks_queued && !buffers_.empty() && + queue_.Dequeue(&asset, accept)) { switch (asset.type) { case AssetProto::FILE: pool->QueueTask(std::make_unique( - cfg_.src_dir, std::move(asset.relative_path), - std::move(asset.filename), cdc_cfg, std::move(buffers_.back()))); + cfg_.src_dir, std::move(asset), cdc_cfg, + std::move(buffers_.back()))); buffers_.pop_back(); + ++file_chunker_tasks; break; case AssetProto::DIRECTORY: - dir = manifest_builder->GetOrCreateAsset( + // Flushing the manifest may invalidate the pointers to the directory + // proto returned from GetOrCreateAsset(), so the manifest cannot be + // flushed as long as DirScannerTask are in the queue. 
+ dir = manifest_builder_->GetOrCreateAsset( path::JoinUnix(asset.relative_path, asset.filename), - AssetProto::DIRECTORY, true); + AssetProto::DIRECTORY, /*force_create=*/true); if (!dir.ok()) { LOG_ERROR( "Failed to locate directory '%s' in the manifest, skipping it: " @@ -526,8 +602,9 @@ size_t ManifestUpdater::QueueTasks(Threadpool* pool, continue; } pool->QueueTask(std::make_unique( - cfg_.src_dir, std::move(asset.relative_path), - std::move(asset.filename), std::move(dir.value()), data_store_)); + cfg_.src_dir, std::move(asset), std::move(dir.value()), + data_store_)); + ++dir_scanner_tasks; break; default: @@ -535,15 +612,13 @@ size_t ManifestUpdater::QueueTasks(Threadpool* pool, AssetProto::Type_Name(asset.type), asset.relative_path); continue; } - ++num_tasks_queued; } - return num_tasks_queued; + return QueueTasksResult{dir_scanner_tasks, file_chunker_tasks}; } absl::Status ManifestUpdater::ApplyOperations( std::vector* operations, FileChunkMap* file_chunks, - ManifestBuilder* manifest_builder, AssetBuilder* parent, bool recursive) { - assert(manifest_builder != nullptr); + AssetBuilder* parent, absl::Time deadline, bool recursive) { if (operations->empty()) return absl::OkStatus(); // First, handle all deletions to make the outcome independent of the order of @@ -561,7 +636,7 @@ absl::Status ManifestUpdater::ApplyOperations( // skipped. 
continue; } - RETURN_IF_ERROR(manifest_builder->DeleteAsset(ai.path), + RETURN_IF_ERROR(manifest_builder_->DeleteAsset(ai.path), "Failed to delete asset '%s' from manifest", ai.path); last_deleted = &ai.path; } @@ -591,8 +666,8 @@ absl::Status ManifestUpdater::ApplyOperations( case Operator::kUpdate: ASSIGN_OR_RETURN(asset_builder, - manifest_builder->GetOrCreateAsset(ai.path, ai.type, - true, &created), + manifest_builder_->GetOrCreateAsset(ai.path, ai.type, + true, &created), "Failed to add '%s' to the manifest", ai.path); break; } @@ -609,29 +684,30 @@ absl::Status ManifestUpdater::ApplyOperations( asset_builder.SetFileSize(ai.size); // Queue chunker tasks for files. asset_builder.SetInProgress(true); - } else if (recursive && ai.type == AssetProto::DIRECTORY) { - // We are recursing into all sub-directories, so we add queue up the - // child directory for scanning. - asset_builder.SetInProgress(true); + } else if (ai.type == AssetProto::DIRECTORY) { + asset_builder.SetPermissions(ManifestBuilder::kDefaultDirPerms); + // We are recursing into all sub-directories, so we queue up the child + // directory for scanning. + if (recursive) asset_builder.SetInProgress(true); } // If the asset is marked as in-progress, we need to queue it up. 
if (asset_builder.InProgress()) { - queue_.emplace_back(ai.type, asset_builder.RelativePath(), - asset_builder.Name()); + PendingAsset pending(ai.type, asset_builder.RelativePath(), + asset_builder.Name(), deadline); + queue_.Add(std::move(pending)); } } return absl::OkStatus(); } absl::Status ManifestUpdater::HandleFileChunkerResult( - FileChunkerTask* task, FileChunkMap* file_chunks, - ManifestBuilder* manifest_builder) { + FileChunkerTask* task, FileChunkMap* file_chunks) { const std::string rel_file_path = task->RelativeUnixFilePath(); buffers_.emplace_back(task->ReleaseBuffer()); AssetBuilder asset_builder; - ASSIGN_OR_RETURN(asset_builder, manifest_builder->GetOrCreateAsset( + ASSIGN_OR_RETURN(asset_builder, manifest_builder_->GetOrCreateAsset( rel_file_path, AssetProto::FILE)); asset_builder.SetInProgress(false); if (!task->Status().ok()) { @@ -663,7 +739,6 @@ absl::Status ManifestUpdater::HandleFileChunkerResult( absl::Status ManifestUpdater::HandleDirScannerResult( DirScannerTask* task, FileChunkMap* file_chunks, - ManifestBuilder* manifest_builder, std::unordered_set* manifest_content_ids) { // Include the error in the stats, but we can still try to process the // (partial) results. @@ -671,21 +746,26 @@ absl::Status ManifestUpdater::HandleDirScannerResult( ++stats_.total_dirs_failed; } + // If there's a chance we can do more work within the parent's deadline, we + // propagate the deadline to the children. + // TODO(chrschn) Use SteadyClock instead of the system clock. + absl::Time deadline = task->Deadline() > absl::Now() ? task->Deadline() + : absl::InfiniteFuture(); + // DirScannerTasks are inherently recursive. - RETURN_IF_ERROR(ApplyOperations(task->Operations(), file_chunks, - manifest_builder, task->Dir(), - /*recursive=*/true)); + RETURN_IF_ERROR(ApplyOperations(task->Operations(), file_chunks, task->Dir(), + deadline, /*recursive=*/true)); task->Dir()->SetInProgress(false); // Union all manifest chunk content IDs. 
- assert(manifest_content_ids != nullptr); manifest_content_ids->insert(task->ManifestContentIds()->begin(), task->ManifestContentIds()->end()); return task->Status(); } -absl::Status ManifestUpdater::Update( - OperationList* operations, FileChunkMap* file_chunks, - PushIntermediateManifest push_intermediate_manifest, bool recursive) { +absl::Status ManifestUpdater::Update(OperationList* operations, + FileChunkMap* file_chunks, + PushManifestHandler push_handler, + bool recursive) { Stopwatch sw; LOG_INFO( "Updating manifest for '%s': applying %u changes, " @@ -694,11 +774,20 @@ absl::Status ManifestUpdater::Update( stats_ = UpdaterStats(); + // Collects the content IDs that make up the manifest when recursing. They are + // used to prune the manifest cache directory at the end of the Update() + // process. + std::unordered_set manifest_content_ids; + CdcParamsProto cdc_params; cdc_params.set_min_chunk_size(cfg_.min_chunk_size); cdc_params.set_avg_chunk_size(cfg_.avg_chunk_size); cdc_params.set_max_chunk_size(cfg_.max_chunk_size); - ManifestBuilder manifest_builder(cdc_params, data_store_); + manifest_builder_ = + std::make_unique(cdc_params, data_store_); + + // Release the ManifestBuilder at the end of this function to free memory. + Finalizer finalizer([b = &manifest_builder_]() { b->reset(); }); // Load the manifest id from the store. ContentIdProto manifest_id; @@ -711,17 +800,17 @@ absl::Status ManifestUpdater::Update( // A non-existing manifest is not an issue, just build it from scratch. LOG_INFO("No cached manifest found. Building from scratch."); } else { - RETURN_IF_ERROR(manifest_builder.LoadManifest(manifest_id), + RETURN_IF_ERROR(manifest_builder_->LoadManifest(manifest_id), "Failed to load manifest with id '%s'", ContentId::ToHexString(manifest_id)); // The CDC params might have changed when loading the manifest. 
- if (ValidateCdcParams(manifest_builder.Manifest()->cdc_params())) { - cdc_params = manifest_builder.Manifest()->cdc_params(); + if (ValidateCdcParams(manifest_builder_->Manifest()->cdc_params())) { + cdc_params = manifest_builder_->Manifest()->cdc_params(); } } - RETURN_IF_ERROR(ApplyOperations(operations, file_chunks, &manifest_builder, - nullptr, recursive)); + RETURN_IF_ERROR(ApplyOperations(operations, file_chunks, nullptr, + absl::InfiniteFuture(), recursive)); Threadpool pool(cfg_.num_threads > 0 ? cfg_.num_threads : std::thread::hardware_concurrency()); @@ -730,36 +819,37 @@ absl::Status ManifestUpdater::Update( buffers_.reserve(max_queued_tasks); while (buffers_.size() < max_queued_tasks) buffers_.emplace_back(cfg_.max_chunk_size << 1); - size_t num_tasks_queued = 0; + size_t total_tasks_queued = 0, scanner_tasks_queued = 0; - // Collect the content IDs that make up the manifest when recursing. They are - // used to prune the manifest cache directory in the end. - std::unordered_set manifest_content_ids; - - // Push intermediate manifest if there are queued chunker tasks. - if (push_intermediate_manifest && !queue_.empty()) { - file_chunks->FlushUpdates(); - ASSIGN_OR_RETURN(manifest_id_, manifest_builder.Flush(), - "Failed to flush intermediate manifest"); - // Add all content IDs that were just written back. - manifest_content_ids.insert(manifest_builder.FlushedContentIds().begin(), - manifest_builder.FlushedContentIds().end()); - push_intermediate_manifest(manifest_id_); + // Push intermediate manifest if there are queued tasks. + if (push_handler && !queue_.Empty()) { + RETURN_IF_ERROR( + FlushAndPushManifest(file_chunks, &manifest_content_ids, push_handler)); } fastcdc::Config cdc_cfg = CdcConfigFromProto(cdc_params); - // Wait for the chunker tasks and update file assets. - while (!queue_.empty() || num_tasks_queued > 0) { - num_tasks_queued += QueueTasks(&pool, &cdc_cfg, &manifest_builder); + // Wait for the chunker and scanner tasks. 
+ while (!queue_.Empty() || total_tasks_queued > 0) { + RETURN_IF_ERROR(MaybeFlushAndPushManifest(scanner_tasks_queued, file_chunks, + &manifest_content_ids, + push_handler)); + // Flushing the manifest may invalidate the AssetProto pointers held by the + // queued DirScannerTask. If the manifest should be flushed, we drain the + // queue from those tasks so that the push is safe. + bool drain_dir_scanners = WantManifestFlushed(push_handler); + + QueueTasksResult queued = QueueTasks(drain_dir_scanners, &pool, &cdc_cfg); + total_tasks_queued += queued.dir_scanners + queued.file_chunkers; + scanner_tasks_queued += queued.dir_scanners; + std::unique_ptr task = pool.GetCompletedTask(); - assert(num_tasks_queued > 0); - --num_tasks_queued; + assert(total_tasks_queued > 0); + --total_tasks_queued; FileChunkerTask* chunker_task = dynamic_cast(task.get()); if (chunker_task) { - status = - HandleFileChunkerResult(chunker_task, file_chunks, &manifest_builder); + status = HandleFileChunkerResult(chunker_task, file_chunks); if (!status.ok()) { LOG_ERROR("Failed to process file '%s': %s", chunker_task->FilePath(), @@ -770,8 +860,10 @@ absl::Status ManifestUpdater::Update( DirScannerTask* scanner_task = dynamic_cast(task.get()); if (scanner_task) { + assert(scanner_tasks_queued > 0); + --scanner_tasks_queued; status = HandleDirScannerResult(scanner_task, file_chunks, - &manifest_builder, &manifest_content_ids); + &manifest_content_ids); if (!status.ok()) { LOG_ERROR("Failed to process directory '%s': %s", scanner_task->FilePath(), status.ToString()); @@ -780,25 +872,23 @@ absl::Status ManifestUpdater::Update( } } - file_chunks->FlushUpdates(); - ASSIGN_OR_RETURN(manifest_id_, manifest_builder.Flush(), - "Failed to flush manifest"); - + // Don't pass in the push_handler here. We first want to write back the new + // manifest ID to the data store before we call the handler. 
+ RETURN_IF_ERROR( + FlushAndPushManifest(file_chunks, &manifest_content_ids, nullptr)); // Save the manifest id to the store. std::string id_str = manifest_id_.SerializeAsString(); RETURN_IF_ERROR( data_store_->Put(GetManifestStoreId(), id_str.data(), id_str.size()), "Failed to store manifest id"); + if (push_handler) push_handler(manifest_id_); // Remove manifest chunks that are no longer referenced when recursing through // all sub-directories. This also makes sure that all referenced manifest // chunks are present. if (status.ok() && recursive) { // Retain the chunk that stores the manifest ID. - manifest_content_ids.insert(ManifestUpdater::GetManifestStoreId()); - // Add all content IDs that were just written back. - manifest_content_ids.insert(manifest_builder.FlushedContentIds().begin(), - manifest_builder.FlushedContentIds().end()); + manifest_content_ids.insert(GetManifestStoreId()); status = data_store_->Prune(std::move(manifest_content_ids)); if (!status.ok()) { // Signal to the caller that the manifest needs to be rebuilt from diff --git a/manifest/manifest_updater.h b/manifest/manifest_updater.h index d63d652..6a1ed48 100644 --- a/manifest/manifest_updater.h +++ b/manifest/manifest_updater.h @@ -26,6 +26,7 @@ #include "manifest/asset_builder.h" #include "manifest/file_chunk_map.h" #include "manifest/manifest_proto_defs.h" +#include "manifest/pending_assets_queue.h" namespace cdc_ft { namespace fastcdc { @@ -140,7 +141,7 @@ class ManifestUpdater { // Returns an error if |dir| does not exist or it is not a directory. static absl::Status IsValidDir(std::string dir); - using PushIntermediateManifest = + using PushManifestHandler = std::function; // |data_store| is used to store manifest chunks. File data chunks are not @@ -156,27 +157,29 @@ class ManifestUpdater { // Reads the full source directory and syncs the manifest to it. Prunes old, // unreferenced manifest chunks. Updates and flushes |file_chunks|. 
// - // If a valid |push_intermediate_manifest| is passed, then a manifest is - // flushed after the root directory has been added, but before all files and + // If a valid |push_handler| is passed, then a manifest is flushed at least + // twice and the handler is called: + // - after the root directory has been added, but before all files and // directories have been processed. That means, the manifest does not yet // contains all assets, all incomplete assets are set to in-progress. + // - after an asset that was prioritized with AddPriorityAssets() has been + // completed. + // - at the end of the update process in case of success absl::Status UpdateAll(FileChunkMap* file_chunks, - PushIntermediateManifest push_intermediate_manifest = - PushIntermediateManifest()); + PushManifestHandler push_handler = nullptr); // Updates the manifest by applying the |operations| list. Deletions are // handled first to make the outcome independent of the order in the list. - // Also updates and flushes |file_chunks| with the changes made. See - // UpdateAll() for a description of |push_intermediate_manifest|. + // Also updates and flushes |file_chunks| with the changes made. The + // |push_handler| is called at least twice during the operation and at the + // end, see UpdateAll() for more details. // // All paths should be Unix paths. If |recursive| is true, then a directory // scanner task is enqueued for each directory that is added to the manifest. // This is only needed during UpdateAll(). When the manifest is updated in // response to file watcher changes, then |recursive| should be set to false. absl::Status Update(OperationList* operations, FileChunkMap* file_chunks, - PushIntermediateManifest push_intermediate_manifest = - PushIntermediateManifest(), - bool recursive = false); + PushManifestHandler push_handler, bool recursive = false); // Content id of the current manifest. 
const ContentIdProto& ManifestId() const { return manifest_id_; } @@ -190,62 +193,84 @@ class ManifestUpdater { // Returns an empty manifest. ContentIdProto DefaultManifestId(); - private: - // Adds enough pending assets from |queue_| as tasks to the |pool| to keep all - // worker threads busy. Returns the number of tasks that were added. - size_t QueueTasks(Threadpool* pool, const fastcdc::Config* cdc_cfg, - ManifestBuilder* manifest_builder); + // Appends the given |rel_paths| to the list of assets to prioritize. All + // paths must be given as Unix paths. + void AddPriorityAssets(std::vector rel_paths) + ABSL_LOCKS_EXCLUDED(priority_mutex_); - // Applies the |operatio ns| list to the manifest owned by the - // |manifest_builder|. First, all deletions are handled and the corresponding - // files are removed from the |file_chunks| map, then all added or updated - // assets are processed. This guarantees that the outcome is independent of - // the order in the list. + private: + // Holds the number of queued tasks returned by QueueTasks(). + struct QueueTasksResult { + size_t dir_scanners = 0, file_chunkers = 0; + }; + + // Adds enough pending assets from |queue_| as tasks to the |pool| to keep all + // worker threads busy. If |drain_dir_scanner_tasks| is true, only + // FileChunkerTasks are queued, others are skipped. Returns the number of + // tasks that were queued as a QueueTasksResult. + QueueTasksResult QueueTasks(bool drain_dir_scanner_tasks, Threadpool* pool, + const fastcdc::Config* cdc_cfg); + + // Modifies the list of queued tasks to prioritize those assets that were + // previously selected using the AddPriorityAssets() method. 
+ void PrioritizeQueuedAssets() ABSL_LOCKS_EXCLUDED(priority_mutex_); + + // Returns true if all of the following conditions are satisfied: + // - |push_manifest_handler| is valid + // - the flush deadline that was set by a prioritized asset is due + // - the manifest was not flushed recently + bool WantManifestFlushed(PushManifestHandler push_manifest_handler) const; + + // Checks if it is safe and desired to flush the manifest, then calls + // FlushAndPushManifest() if that is the case. + // |dir_scanner_tasks_queued| must be the number of currently queued + // DirScannerTasks. + // |file_chunks| is updated by the flush operation, if it is executed. + // |push_manifest_handler| is invoked if the manifest gets flushed and pushed. + absl::Status MaybeFlushAndPushManifest( + size_t dir_scanner_tasks_queued, FileChunkMap* file_chunks, + std::unordered_set* manifest_content_ids, + PushManifestHandler push_manifest_handler); + + // Flushes the in-progress manifest and the updates queued in |file_chunks|. + // If |push_manifest_handler| is not nullptr, it is invoked with the resulting + // manifest ID. + absl::Status FlushAndPushManifest( + FileChunkMap* file_chunks, + std::unordered_set* manifest_content_ids, + PushManifestHandler push_manifest_handler); + + // Applies the |operations| list to the manifest owned by the manifest + // builder. First, all deletions are handled and the corresponding files are + // removed from the |file_chunks| map, then all added or updated assets are + // processed. This guarantees that the outcome is independent of the order in + // the list. // // If |parent| is non-null, then it must be of type DIRECTORY and all added // assets are made direct children of |parent|. The function does *not* verify - // that all children have |parent| as directory path. + // that all children have |parent| as directory path. This is used to + // efficiently handle the result of a DirScannerTask. 
// // Enqueues tasks to chunk the given files for files that were added or // updated. If |recursive| is true, then it will also enqueue directory - // scanner tasks for all given directories. + // scanner tasks for all given directories. All follow-up tasks have the given + // |deadline| set, which determines the deadline after which the manifest + // should be flushed. absl::Status ApplyOperations(std::vector* operations, - FileChunkMap* file_chunks, - ManifestBuilder* manifest_builder, - AssetBuilder* parent, bool recursive); + FileChunkMap* file_chunks, AssetBuilder* parent, + absl::Time deadline, bool recursive); // Handles the results of a completed FileChunkerTask. absl::Status HandleFileChunkerResult(FileChunkerTask* task, - FileChunkMap* file_chunks, - ManifestBuilder* manifest_builder); + FileChunkMap* file_chunks); // Handles the results of a completed DirScannerTask. absl::Status HandleDirScannerResult( DirScannerTask* task, FileChunkMap* file_chunks, - ManifestBuilder* manifest_builder, std::unordered_set* manifest_content_ids); - // Represents an asset that has not been fully processed yet. - struct PendingAsset { - PendingAsset() {} - PendingAsset(AssetProto::Type type, std::string relative_path, - std::string filename) - : type(type), - relative_path(std::move(relative_path)), - filename(std::move(filename)) {} - - // The asset type (either FILE or DIRECTORY). - AssetProto::Type type = AssetProto::UNKNOWN; - - // Relative unix path of the directory containing this asset. - std::string relative_path; - - // File name of the asset that still needs processing. - std::string filename; - }; - // Queue of pending assets waiting for completion. - std::list queue_; + PendingAssetsQueue queue_; // Pool of pre-allocated buffers std::vector buffers_; @@ -261,6 +286,29 @@ class ManifestUpdater { // Stats for the last Update*() operation. UpdaterStats stats_; + + // The builder used for updating the manifest. 
+ std::unique_ptr manifest_builder_; + + // Holds the assets that should be prioritized while updating the manifest. + std::vector priority_assets_ ABSL_GUARDED_BY(priority_mutex_); + absl::Mutex priority_mutex_; + + // Deadline by which the manifest should be flushed again. + absl::Time flush_deadline_ = absl::InfiniteFuture(); + + // The time when the manifest was flushed last. + absl::Time last_manifest_flush_; + + // How much time we allow at least for processing a prioritized asset. The + // manifest won't be flushed for that time, to allow more assets to be + // finalized before the manifest is sent to the client. + static constexpr absl::Duration kMinAssetProcessingTime = + absl::Milliseconds(200); + + // How often we allow an intermediate manifest to be flushed and pushed. + static constexpr absl::Duration kMinDelayBetweenFlush = + absl::Milliseconds(500); }; }; // namespace cdc_ft diff --git a/manifest/manifest_updater_test.cc b/manifest/manifest_updater_test.cc index 067ea2d..ddd3609 100644 --- a/manifest/manifest_updater_test.cc +++ b/manifest/manifest_updater_test.cc @@ -107,7 +107,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_AddFileIncremental) { EXPECT_OK(updater.UpdateAll(&file_chunks_)); EXPECT_OK(updater.Update( MakeDeleteOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}), - &file_chunks_)); + &file_chunks_, nullptr)); ASSERT_NO_FATAL_FAILURE( ExpectManifestEquals({"a.txt", "subdir"}, updater.ManifestId())); @@ -173,13 +173,13 @@ TEST_F(ManifestUpdaterTest, UpdateAll_PrunesUnreferencedChunks) { cfg_.src_dir = path::Join(base_dir_, "non_empty"); ManifestUpdater updater(&data_store_, cfg_); - EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr)); // 1 for manifest id, 1 for manifest, 1 indirect assets. 
EXPECT_EQ(data_store_.Chunks().size(), 3); EXPECT_OK(updater.Update( MakeUpdateOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}), - &file_chunks_)); + &file_chunks_, nullptr)); // 1 for manifest id, 1 for manifest, 5 indirect assets. // 2 additional chunks from the first Update() that are now unreferenced. // -1, because the indirect asset for "a.txt" is deduplicated @@ -207,7 +207,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_RecoversFromMissingChunks) { cfg_.src_dir = path::Join(base_dir_, "non_empty"); ManifestUpdater updater(&data_store_, cfg_); - EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr)); // 1 for manifest id, 1 for manifest, 1 indirect assets. EXPECT_EQ(data_store_.Chunks().size(), 3) << "Manifest: " << ContentId::ToHexString(updater.ManifestId()) @@ -225,7 +225,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_RecoversFromMissingChunks) { EXPECT_OK(updater.UpdateAll(&file_chunks_)); // 1 for manifest id, 1 for manifest, 5 indirect assets. - // There would be 7 chunks without the removal above, see UpdateAll_Prune. + // There would be 8 chunks without the removal above, see UpdateAll_Prune. EXPECT_EQ(data_store_.Chunks().size(), 7) << "Manifest: " << ContentId::ToHexString(updater.ManifestId()) << std::endl @@ -272,15 +272,17 @@ TEST_F(ManifestUpdaterTest, UpdateAll_FileChunkMapAfterUpdate) { // Verifies that the intermediate manifest contains the expected files. TEST_F(ManifestUpdaterTest, UpdateAll_PushIntermediateManifest) { ContentIdProto intermediate_id; - auto push_intermediate_manifest = - [&intermediate_id](const ContentIdProto& manifest_id) { - intermediate_id = manifest_id; - }; + auto push_manifest = [&intermediate_id](const ContentIdProto& manifest_id) { + // Catch the first (= intermediate) manifest. + if (intermediate_id == ContentIdProto()) { + intermediate_id = manifest_id; + } + }; // Contains a.txt and subdir/b.txt. 
cfg_.src_dir = path::Join(base_dir_, "non_empty"); ManifestUpdater updater(&data_store_, cfg_); - EXPECT_OK(updater.UpdateAll(&file_chunks_, push_intermediate_manifest)); + EXPECT_OK(updater.UpdateAll(&file_chunks_, push_manifest)); // Double check that the files in the final manifest are no longer in // progress. @@ -301,7 +303,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_PushIntermediateManifest) { TEST_F(ManifestUpdaterTest, Update_AddFile) { cfg_.src_dir = path::Join(base_dir_, "non_empty"); ManifestUpdater updater(&data_store_, cfg_); - EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr)); const UpdaterStats& stats = updater.Stats(); EXPECT_EQ(stats.total_assets_added_or_updated, 1); @@ -319,7 +321,8 @@ TEST_F(ManifestUpdaterTest, Update_AddFile) { TEST_F(ManifestUpdaterTest, Update_AddFileAutoCreateSubdir) { cfg_.src_dir = path::Join(base_dir_, "non_empty"); ManifestUpdater updater(&data_store_, cfg_); - EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_)); + EXPECT_OK( + updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_, nullptr)); const UpdaterStats& stats = updater.Stats(); EXPECT_EQ(stats.total_assets_added_or_updated, 1); @@ -346,7 +349,7 @@ TEST_F(ManifestUpdaterTest, Update_DeleteFiles) { cfg_.src_dir = path::Join(base_dir_, "non_empty"); ManifestUpdater updater(&data_store_, cfg_); EXPECT_OK(updater.UpdateAll(&file_chunks_)); - EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_, nullptr)); const UpdaterStats& stats = updater.Stats(); EXPECT_EQ(stats.total_assets_added_or_updated, 0); @@ -360,7 +363,8 @@ TEST_F(ManifestUpdaterTest, Update_DeleteFiles) { updater.ManifestId())); // Delete another one in a subdirectory. 
- EXPECT_OK(updater.Update(MakeDeleteOps({"subdir/b.txt"}), &file_chunks_)); + EXPECT_OK( + updater.Update(MakeDeleteOps({"subdir/b.txt"}), &file_chunks_, nullptr)); ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals( {"subdir", "subdir/c.txt", "subdir/d.txt"}, updater.ManifestId())); } @@ -370,7 +374,7 @@ TEST_F(ManifestUpdaterTest, Update_DeleteDir) { cfg_.src_dir = path::Join(base_dir_, "non_empty"); ManifestUpdater updater(&data_store_, cfg_); EXPECT_OK(updater.UpdateAll(&file_chunks_)); - EXPECT_OK(updater.Update(MakeDeleteOps({"subdir"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeDeleteOps({"subdir"}), &file_chunks_, nullptr)); const UpdaterStats& stats = updater.Stats(); EXPECT_EQ(stats.total_assets_added_or_updated, 0); @@ -390,7 +394,7 @@ TEST_F(ManifestUpdaterTest, Update_DeleteNonExistingAsset) { // We need to craft AssetInfos for non-existing assets manually. AssetInfo ai{"non_existing", AssetProto::DIRECTORY}; ManifestUpdater::OperationList ops{{Operator::kDelete, ai}}; - EXPECT_OK(updater.Update(&ops, &file_chunks_)); + EXPECT_OK(updater.Update(&ops, &file_chunks_, nullptr)); const UpdaterStats& stats = updater.Stats(); EXPECT_EQ(stats.total_assets_deleted, 1); @@ -406,7 +410,7 @@ TEST_F(ManifestUpdaterTest, Update_AddNonExistingFile) { ai.path = "non_existing"; ManifestUpdater::OperationList ops{ {Operator::kAdd, ai}, {Operator::kAdd, MakeAssetInfo("a.txt").info}}; - EXPECT_OK(updater.Update(&ops, &file_chunks_)); + EXPECT_OK(updater.Update(&ops, &file_chunks_, nullptr)); const UpdaterStats& stats = updater.Stats(); EXPECT_EQ(stats.total_assets_added_or_updated, 2); @@ -428,17 +432,19 @@ TEST_F(ManifestUpdaterTest, Update_PushIntermediateManifest) { EXPECT_OK(updater.UpdateAll(&file_chunks_)); EXPECT_OK(updater.Update( MakeDeleteOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}), - &file_chunks_)); + &file_chunks_, nullptr)); // Add a.txt back and check intermediate manifest. 
ContentIdProto intermediate_id; - auto push_intermediate_manifest = - [&intermediate_id](const ContentIdProto& manifest_id) { - intermediate_id = manifest_id; - }; + auto push_manifest = [&intermediate_id](const ContentIdProto& manifest_id) { + // Catch the first (= intermediate) manifest. + if (intermediate_id == ContentIdProto()) { + intermediate_id = manifest_id; + } + }; EXPECT_OK(updater.Update( MakeUpdateOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}), - &file_chunks_, push_intermediate_manifest)); + &file_chunks_, push_manifest)); EXPECT_GT(intermediate_id.blake3_sum_160().size(), 0); // Only file a.txt is done in the intermediate manifest, all others are in @@ -460,17 +466,18 @@ TEST_F(ManifestUpdaterTest, Update_FileChunkMap) { ManifestUpdater updater(&data_store_, cfg_); // Add a.txt. - EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr)); ValidateChunkLookup("a.txt", true); ValidateChunkLookup("subdir/b.txt", false); // Add subdir/b.txt. - EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_)); + EXPECT_OK( + updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_, nullptr)); ValidateChunkLookup("a.txt", true); ValidateChunkLookup("subdir/b.txt", true); // Remove a.txt. - EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_, nullptr)); ValidateChunkLookup("a.txt", false); ValidateChunkLookup("subdir/b.txt", true); } @@ -482,18 +489,20 @@ TEST_F(ManifestUpdaterTest, Update_IntermediateFileChunkMap) { ManifestUpdater updater(&data_store_, cfg_); // Add a.txt. - EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_)); + EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr)); // Add subdir/b.txt and check intermediate lookups. 
- auto push_intermediate_manifest = [this](const ContentIdProto&) { + int count = 0; + auto push_manifest = [this, &count](const ContentIdProto&) { + ++count; ValidateChunkLookup("a.txt", true); - ValidateChunkLookup("subdir/b.txt", false); // Not in yet. + // The first (= intermediate) manifest does not have the chunks, the second + // (= final) does. + ValidateChunkLookup("subdir/b.txt", count > 1); }; EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_, - push_intermediate_manifest)); - ValidateChunkLookup("a.txt", true); - ValidateChunkLookup("subdir/b.txt", true); // Now it's in! + push_manifest)); } // A call to ManifestId() returns the manifest id!!! @@ -507,6 +516,70 @@ TEST_F(ManifestUpdaterTest, ManifestId) { EXPECT_EQ(updater.ManifestId(), manifest_id); } +TEST_F(ManifestUpdaterTest, VerifyPermissions) { + cfg_.src_dir = path::Join(base_dir_, "non_empty"); + ManifestUpdater updater(&data_store_, cfg_); + + EXPECT_OK(updater.UpdateAll(&file_chunks_)); + ManifestIterator manifest_iter(&data_store_); + EXPECT_OK(manifest_iter.Open(updater.ManifestId())); + const AssetProto* entry; + while ((entry = manifest_iter.NextEntry()) != nullptr) { + switch (entry->type()) { + case AssetProto::FILE: + EXPECT_EQ(entry->permissions(), ManifestBuilder::kDefaultFilePerms); + break; + case AssetProto::DIRECTORY: + EXPECT_EQ(entry->permissions(), ManifestBuilder::kDefaultDirPerms); + break; + case AssetProto::SYMLINK: + // Symlinks don't have their own permissions. 
+ break; + default: + FAIL() << "Unhandled type: " << AssetProto::Type_Name(entry->type()); + break; + } + } +} + +TEST_F(ManifestUpdaterTest, VerifyIntermediateFilesAreExecutable) { + cfg_.src_dir = path::Join(base_dir_, "non_empty"); + ManifestUpdater updater(&data_store_, cfg_); + + int count = 0; + auto push_intermediate_manifest = [this, &count]( + const ContentIdProto& manifest_id) { + ++count; + ManifestIterator manifest_iter(&data_store_); + EXPECT_OK(manifest_iter.Open(manifest_id)); + const AssetProto* entry; + while ((entry = manifest_iter.NextEntry()) != nullptr) { + switch (entry->type()) { + case AssetProto::FILE: + if (count == 1) { + // While the manifest is in-progress, all files are set to be + // executable. + EXPECT_EQ(entry->permissions(), ManifestUpdater::kExecutablePerms); + } else { + EXPECT_EQ(entry->permissions(), ManifestBuilder::kDefaultFilePerms); + } + break; + case AssetProto::DIRECTORY: + EXPECT_EQ(entry->permissions(), ManifestBuilder::kDefaultDirPerms); + break; + default: + FAIL() << "Unhandled type: " << AssetProto::Type_Name(entry->type()); + break; + } + } + }; + + // Add subdir/b.txt and verify the file permissions. + EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_, + push_intermediate_manifest)); + EXPECT_EQ(updater.Stats().total_files_added_or_updated, 1); +} + // Makes sure that executables are properly detected. TEST_F(ManifestUpdaterTest, DetectExecutables) { cfg_.src_dir = path::Join(base_dir_, "executables"); diff --git a/manifest/pending_assets_queue.cc b/manifest/pending_assets_queue.cc new file mode 100644 index 0000000..b9c5ca6 --- /dev/null +++ b/manifest/pending_assets_queue.cc @@ -0,0 +1,94 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "manifest/pending_assets_queue.h" + +#include "common/log.h" +#include "manifest/manifest_builder.h" + +namespace cdc_ft { + +PendingAssetsQueue::PendingAssetsQueue(absl::Duration min_processing_time) + : min_processing_time_(min_processing_time) {} + +void PendingAssetsQueue::Add(PendingAsset pending) { + if (pending.deadline == absl::InfiniteFuture()) { + queue_.push_back(std::move(pending)); + return; + } + + // Pending assets with a deadline will be added at the end of other + // prioritized assets. + auto it = + std::find_if(queue_.begin(), queue_.end(), [](const PendingAsset& pa) { + return pa.deadline == absl::InfiniteFuture(); + }); + queue_.insert(it, std::move(pending)); +} + +bool PendingAssetsQueue::Dequeue(PendingAsset* pending, AcceptFunc accept) { + auto it = queue_.begin(); + while (it != queue_.end() && accept && !accept(*it)) ++it; + if (it == queue_.end()) return false; + *pending = std::move(*it); + queue_.erase(it); + return true; +} + +absl::Time PendingAssetsQueue::Prioritize( + const std::vector& prio_assets, + ManifestBuilder* manifest_builder) { + absl::Time min_received = absl::InfiniteFuture(); + + for (const PriorityAsset& prio_asset : prio_assets) { + if (prio_asset.received < min_received) min_received = prio_asset.received; + + // Check if this asset is still in progress. 
+ absl::StatusOr asset = manifest_builder->GetOrCreateAsset( + prio_asset.rel_file_path, AssetProto::UNKNOWN); + if (!asset.ok()) { + LOG_ERROR("Failed to prioritize asset '%s': %s", prio_asset.rel_file_path, + asset.status().ToString()); + continue; + } + if (!asset->InProgress()) continue; + + // Find the queued task for this asset. + auto prio_end = queue_.end(); + for (auto it = queue_.begin(); it != queue_.end(); ++it) { + // Remember the first task that is not prioritized so that we can insert + // new prioritized tasks just before. + if (prio_end == queue_.end() && it->deadline == absl::InfiniteFuture()) { + prio_end = it; + } + + if (it->relative_path == asset->RelativePath() && + it->filename == asset->Name()) { + // If this asset is not yet prioritized, |prio_end| will be set + // accordingly and we move |*it| to the end of the prioritized tasks. + if (it->deadline == absl::InfiniteFuture()) { + it->deadline = prio_asset.received + min_processing_time_; + it->prioritized = true; // Explicit prioritization. + queue_.insert(prio_end, std::move(*it)); + queue_.erase(it); + } + break; + } + } + } + + return min_received + min_processing_time_; +} + +} // namespace cdc_ft diff --git a/manifest/pending_assets_queue.h b/manifest/pending_assets_queue.h new file mode 100644 index 0000000..7e6eda0 --- /dev/null +++ b/manifest/pending_assets_queue.h @@ -0,0 +1,102 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#ifndef MANIFEST_PENDING_ASSETS_QUEUE_H +#define MANIFEST_PENDING_ASSETS_QUEUE_H + +#include "absl/time/time.h" +#include "manifest/manifest_proto_defs.h" + +namespace cdc_ft { + +class ManifestBuilder; + +// Holds an asset that was requested to be prioritized at a given point in time. +struct PriorityAsset { + // Relative Unix file path. + std::string rel_file_path; + // Timestamp when this request was received. + absl::Time received; +}; + +// Represents an asset that has not been fully processed yet. +struct PendingAsset { + PendingAsset() {} + PendingAsset(AssetProto::Type type, std::string relative_path, + std::string filename, absl::Time deadline) + : type(type), + relative_path(std::move(relative_path)), + filename(std::move(filename)), + deadline(deadline) {} + + // The asset type (either FILE or DIRECTORY). + AssetProto::Type type = AssetProto::UNKNOWN; + + // Relative unix path of the directory containing this asset. + std::string relative_path; + + // File name of the asset that still needs processing. + std::string filename; + + // If this asset was explicitly prioritized, this field is set to true, + // otherwise false. + bool prioritized = false; + + // If a deadline is set, it means that this asset was prioritized + // (implicitly or explicitly) and should be processed by this deadline. Once + // this asset has been processed, the manifest should be flushed if the + // deadline has expired. Otherwise, additional related assets can be queued + // and processed (implicit prioritization). + absl::Time deadline; +}; + +// Queues assets that still need to be processed before they are completed. +class PendingAssetsQueue { + public: + // Signature for a callback function to accept items to dequeue. + using AcceptFunc = std::function; + + // The |min_processing_time| is used to calculate the deadline by which a + // pending asset should be returned to the requesting instance. 
+ PendingAssetsQueue(absl::Duration min_processing_time); + + // Adds the given asset |pending| to the queue of assets to complete. + // PendingAssets without a deadline will be queued at the end, while those + // with a given deadline will be inserted after other assets having a + // deadline. + void Add(PendingAsset pending); + + // Removes a PendingAsset from the queue and stores it in |pending|. If + // |accept| is given, then only items for which |accept| returns true are + // considered. Returns true if an item was stored in |pending|, otherwise + // false is returned. + bool Dequeue(PendingAsset* pending, AcceptFunc accept = nullptr); + + // Returns true if the queue is empty, otherwise returns false. + bool Empty() const { return queue_.empty(); } + + // Modifies the list of queued assets to prioritize the assets given in + // |prio_assets|. Returns the deadline by which the processed assets should be + // returned to the requesting instance. + absl::Time Prioritize(const std::vector& prio_assets, + ManifestBuilder* manifest_builder); + + private: + const absl::Duration min_processing_time_; + std::list queue_; +}; + +} // namespace cdc_ft + +#endif // MANIFEST_PENDING_ASSETS_QUEUE_H diff --git a/proto/asset_stream_service.proto b/proto/asset_stream_service.proto index 0a9d2f1..4709d75 100644 --- a/proto/asset_stream_service.proto +++ b/proto/asset_stream_service.proto @@ -14,10 +14,6 @@ // This proto defines the service to stream chunks from workstations to // gamelet instances. -// -// References: -// * (internal).0 -// * (internal) syntax = "proto3"; @@ -25,6 +21,8 @@ import "proto/manifest.proto"; package cdc_ft.proto; +// Describes the interface to fetch data from the asset streaming server running +// on the workstation. service AssetStreamService { // Requests the contents of a chunk by its id.
rpc GetContent(GetContentRequest) returns (GetContentResponse) {} @@ -53,12 +51,20 @@ message SendCachedContentIdsRequest { message SendCachedContentIdsResponse {} +// Describes the interface to receive manifest updates and prioritize processing +// of specific assets. service ConfigStreamService { + // Streaming channel to receive continuous manifest updates. rpc GetManifestId(GetManifestIdRequest) returns (stream GetManifestIdResponse) {} + // Acknowledges that a specific manifest ID has been received. rpc AckManifestIdReceived(AckManifestIdReceivedRequest) returns (AckManifestIdReceivedResponse) {} + + // Requests the server to process the given in-progress assets as soon as + // possible. + rpc ProcessAssets(ProcessAssetsRequest) returns (ProcessAssetsResponse) {} } message GetManifestIdRequest {} @@ -73,3 +79,9 @@ message AckManifestIdReceivedRequest { } message AckManifestIdReceivedResponse {} + +message ProcessAssetsRequest { + repeated string relative_paths = 1; +} + +message ProcessAssetsResponse {}