Merge dynamic manifest updates to GitHub (#7)

This change introduces dynamic manifest updates to asset streaming.

Asset streaming describes the directory to be streamed in a manifest, which is a proto definition of all content metadata. This information is sufficient to answer `stat` and `readdir` calls in the FUSE layer without additional round-trips to the workstation.

When a directory is streamed for the first time, the corresponding manifest is created in two steps:
1. The directory is traversed recursively and the inode information of all contained files and directories is written to the manifest.
2. The content of all identified files is processed to generate each file's chunk list. This list is part of the definition of a file in the manifest.
  * The chunk boundaries are identified using our implementation of the FastCDC algorithm.
  * The hash of each chunk is calculated using the BLAKE3 hash function.
  * The length and hash of each chunk are appended to the file's chunk list.

Prior to this change, when the user mounted a workstation directory on a client, the asset streaming server pushed an intermediate manifest to the gamelet as soon as step 1 was completed. At this point, the FUSE client started serving the virtual file system and was ready to answer `stat` and `readdir` calls. In case the FUSE client received any call that required file contents, such as `read`, it would block the caller until the server completed step 2 above and pushed the final manifest to the client. This works well for large directories (> 100GB) with a reasonable number of files (< 100k). But when dealing with millions of tiny files, creating the full manifest can take several minutes.

With this change, we introduce dynamic manifest updates. When the FUSE layer receives an `open` or `readdir` request for a file or directory that is incomplete, it sends an RPC to the workstation about what information is missing from the manifest. The workstation identifies the corresponding file chunker or directory scanner tasks and moves them to the front of the queue. As soon as the task is completed, the workstation pushes an updated intermediate manifest to the client which now includes the information to serve the FUSE request. The queued FUSE request is resumed and returns the result to the caller.

While this does not reduce the required time to build the final manifest, it splits up the work into smaller tasks. This allows us to interrupt the current work and prioritize those tasks which are required to handle an incoming request from the client. While this still takes a round-trip to the workstation plus the processing time for the task, an updated manifest is received within a few seconds, which is much better than blocking for several minutes. 

This latency is only visible when serving data while the manifest is still being created. The situation improves as the manifest creation on the workstation progresses. As soon as the final manifest is pushed, all metadata can be served directly without having to wait for pending tasks.
This commit is contained in:
chrschng
2022-11-16 11:20:32 +01:00
committed by GitHub
parent 23fcd5ef1d
commit 76bbdb01bb
32 changed files with 1286 additions and 411 deletions

View File

@@ -26,11 +26,11 @@ AssetStreamServer::AssetStreamServer(std::string src_dir,
std::unique_ptr<AssetStreamServer> AssetStreamServer::Create(
AssetStreamServerType type, std::string src_dir,
DataStoreReader* data_store_reader, FileChunkMap* file_chunks,
ContentSentHandler content_sent) {
ContentSentHandler content_sent, PrioritizeAssetsHandler prio_assets) {
switch (type) {
case AssetStreamServerType::kGrpc:
return std::make_unique<GrpcAssetStreamServer>(src_dir, data_store_reader,
file_chunks, content_sent);
return std::make_unique<GrpcAssetStreamServer>(
src_dir, data_store_reader, file_chunks, content_sent, prio_assets);
case AssetStreamServerType::kTest:
return std::make_unique<TestingAssetStreamServer>(
src_dir, data_store_reader, file_chunks);
@@ -38,4 +38,5 @@ std::unique_ptr<AssetStreamServer> AssetStreamServer::Create(
assert(false);
return nullptr;
}
} // namespace cdc_ft

View File

@@ -33,6 +33,11 @@ namespace cdc_ft {
using ContentSentHandler = std::function<void(
size_t byte_count, size_t chunk_count, std::string instance_id)>;
// Handles requests to prioritize the given list of assets while updating the
// manifest. |rel_paths| holds the relative Unix paths of the assets to
// prioritize.
using PrioritizeAssetsHandler =
std::function<void(std::vector<std::string> rel_paths)>;
class DataStoreReader;
class FileChunkMap;
@@ -49,7 +54,7 @@ class AssetStreamServer {
static std::unique_ptr<AssetStreamServer> Create(
AssetStreamServerType type, std::string src_dir,
DataStoreReader* data_store_reader, FileChunkMap* file_chunks,
ContentSentHandler content_sent);
ContentSentHandler content_sent, PrioritizeAssetsHandler prio_assets);
AssetStreamServer(const AssetStreamServer& other) = delete;
AssetStreamServer& operator=(const AssetStreamServer& other) = delete;

View File

@@ -40,6 +40,8 @@ using GetManifestIdResponse = proto::GetManifestIdResponse;
using AckManifestIdReceivedRequest = proto::AckManifestIdReceivedRequest;
using AckManifestIdReceivedResponse = proto::AckManifestIdReceivedResponse;
using ConfigStreamService = proto::ConfigStreamService;
using ProcessAssetsRequest = proto::ProcessAssetsRequest;
using ProcessAssetsResponse = proto::ProcessAssetsResponse;
} // namespace
@@ -64,6 +66,8 @@ class AssetStreamServiceImpl final : public AssetStreamService::Service {
std::string rel_path;
uint64_t offset;
size_t size;
std::string instance_id = instance_ids_->Get(context->peer());
for (const ContentIdProto& id : request->id()) {
uint32_t uint32_size;
if (file_chunks_->Lookup(id, &rel_path, &offset, &uint32_size)) {
@@ -77,7 +81,6 @@ class AssetStreamServiceImpl final : public AssetStreamService::Service {
RETURN_GRPC_IF_ERROR(
ReadFromDataStore(id, response->add_data(), &size));
}
std::string instance_id = instance_ids_->Get(context->peer());
if (content_sent_ != nullptr) {
content_sent_(size, 1, instance_id);
}
@@ -144,8 +147,9 @@ class AssetStreamServiceImpl final : public AssetStreamService::Service {
class ConfigStreamServiceImpl final : public ConfigStreamService::Service {
public:
explicit ConfigStreamServiceImpl(InstanceIdMap* instance_ids)
: instance_ids_(instance_ids) {}
ConfigStreamServiceImpl(InstanceIdMap* instance_ids,
PrioritizeAssetsHandler prio_handler)
: instance_ids_(instance_ids), prio_handler_(std::move(prio_handler)) {}
~ConfigStreamServiceImpl() { Shutdown(); }
grpc::Status GetManifestId(
@@ -183,6 +187,20 @@ class ConfigStreamServiceImpl final : public ConfigStreamService::Service {
return grpc::Status::OK;
}
// Handles a client request to prioritize the assets listed in
// |request->relative_paths()|. The paths are copied into a vector that is
// handed to the prioritization handler. Returns OK without doing anything if
// no handler is set.
grpc::Status ProcessAssets(grpc::ServerContext* context,
                           const ProcessAssetsRequest* request,
                           ProcessAssetsResponse* response) override {
  if (prio_handler_) {
    const auto& requested = request->relative_paths();
    prio_handler_(
        std::vector<std::string>(requested.begin(), requested.end()));
  }
  return grpc::Status::OK;
}
void SetManifestId(const ContentIdProto& id) ABSL_LOCKS_EXCLUDED(mutex_) {
LOG_INFO("Updating manifest id '%s' in configuration service",
ContentId::ToHexString(id));
@@ -219,6 +237,10 @@ class ConfigStreamServiceImpl final : public ConfigStreamService::Service {
return id_;
}
// Replaces the handler that ProcessAssets() invokes. |handler| is taken by
// value and moved into place, so the std::function is not copied a second
// time.
// NOTE(review): |prio_handler_| carries no mutex annotation in the visible
// code; presumably this is only called before RPCs are being served -- confirm
// against callers.
void SetPrioritizeAssetsHandler(PrioritizeAssetsHandler handler) {
  prio_handler_ = std::move(handler);
}
private:
// Returns false if the update process was cancelled.
bool WaitForUpdate(ContentIdProto& local_id) ABSL_LOCKS_EXCLUDED(mutex_) {
@@ -234,23 +256,24 @@ class ConfigStreamServiceImpl final : public ConfigStreamService::Service {
mutable absl::Mutex mutex_;
ContentIdProto id_ ABSL_GUARDED_BY(mutex_);
bool running_ ABSL_GUARDED_BY(mutex_) = true;
InstanceIdMap* instance_ids_;
InstanceIdMap* instance_ids_ = nullptr;
PrioritizeAssetsHandler prio_handler_;
// Maps instance ids to the last acknowledged manifest id.
using AckedManifestIdsMap = std::unordered_map<std::string, ContentIdProto>;
AckedManifestIdsMap acked_manifest_ids_ ABSL_GUARDED_BY(mutex_);
};
GrpcAssetStreamServer::GrpcAssetStreamServer(std::string src_dir,
DataStoreReader* data_store_reader,
FileChunkMap* file_chunks,
ContentSentHandler content_sent)
GrpcAssetStreamServer::GrpcAssetStreamServer(
std::string src_dir, DataStoreReader* data_store_reader,
FileChunkMap* file_chunks, ContentSentHandler content_sent,
PrioritizeAssetsHandler prio_assets)
: AssetStreamServer(src_dir, data_store_reader, file_chunks),
asset_stream_service_(std::make_unique<AssetStreamServiceImpl>(
std::move(src_dir), data_store_reader, file_chunks, &instance_ids_,
content_sent)),
config_stream_service_(
std::make_unique<ConfigStreamServiceImpl>(&instance_ids_)) {}
config_stream_service_(std::make_unique<ConfigStreamServiceImpl>(
&instance_ids_, std::move(prio_assets))) {}
GrpcAssetStreamServer::~GrpcAssetStreamServer() = default;

View File

@@ -40,7 +40,8 @@ class GrpcAssetStreamServer : public AssetStreamServer {
// Creates a new asset streaming gRpc server.
GrpcAssetStreamServer(std::string src_dir, DataStoreReader* data_store_reader,
FileChunkMap* file_chunks,
ContentSentHandler content_sent);
ContentSentHandler content_sent,
PrioritizeAssetsHandler prio_assets);
~GrpcAssetStreamServer();

View File

@@ -57,15 +57,13 @@ class LocalAssetsStreamManagerServiceImpl final
// if it exists.
grpc::Status StartSession(grpc::ServerContext* context,
const StartSessionRequest* request,
StartSessionResponse* response) override
ABSL_LOCKS_EXCLUDED(sessions_mutex_);
StartSessionResponse* response) override;
// Stops the streaming session to the instance with id
// |request->gamelet_id()|. Returns a NotFound error if no session exists.
grpc::Status StopSession(grpc::ServerContext* context,
const StopSessionRequest* request,
StopSessionResponse* response) override
ABSL_LOCKS_EXCLUDED(sessions_mutex_);
StopSessionResponse* response) override;
private:
// Convert StartSessionRequest enum to metrics enum.

View File

@@ -96,10 +96,25 @@ MultiSessionRunner::MultiSessionRunner(
absl::Status MultiSessionRunner::Initialize(int port,
AssetStreamServerType type,
ContentSentHandler content_sent) {
// Create the manifest updater.
UpdaterConfig cfg;
cfg.num_threads = num_updater_threads_;
cfg.src_dir = src_dir_;
assert(!manifest_updater_);
manifest_updater_ =
std::make_unique<ManifestUpdater>(data_store_, std::move(cfg));
// Let the manifest updater handle requests to prioritize certain assets.
PrioritizeAssetsHandler prio_assets =
std::bind(&ManifestUpdater::AddPriorityAssets, manifest_updater_.get(),
std::placeholders::_1);
// Start the server.
assert(!server_);
server_ = AssetStreamServer::Create(type, src_dir_, data_store_,
&file_chunks_, content_sent);
&file_chunks_, std::move(content_sent),
std::move(prio_assets));
assert(server_);
RETURN_IF_ERROR(server_->Start(port),
"Failed to start asset stream server for '%s'", src_dir_);
@@ -162,12 +177,6 @@ ContentIdProto MultiSessionRunner::ManifestId() const {
}
void MultiSessionRunner::Run() {
// Create the manifest updater.
UpdaterConfig cfg;
cfg.num_threads = num_updater_threads_;
cfg.src_dir = src_dir_;
ManifestUpdater manifest_updater(data_store_, std::move(cfg));
// Set up file watcher.
// The streamed path should be a directory and exist at the beginning.
FileWatcherWin watcher(src_dir_);
@@ -179,28 +188,23 @@ void MultiSessionRunner::Run() {
return;
}
// Push an intermediate manifest containing the full directory structure, but
// potentially missing chunks. The purpose is that the FUSE can immediately
// show the structure and inode stats. FUSE will block on file reads that
// cannot be served due to missing chunks until the manifest is ready.
auto push_intermediate_manifest = [this](const ContentIdProto& manifest_id) {
// Push the intermediate manifest(s) and the final version with this handler.
auto push_handler = [this](const ContentIdProto& manifest_id) {
SetManifest(manifest_id);
};
// Bring the manifest up to date.
LOG_INFO("Updating manifest for '%s'...", src_dir_);
Stopwatch sw;
status =
manifest_updater.UpdateAll(&file_chunks_, push_intermediate_manifest);
RecordManifestUpdate(manifest_updater, sw.Elapsed(),
status = manifest_updater_->UpdateAll(&file_chunks_, push_handler);
RecordManifestUpdate(*manifest_updater_, sw.Elapsed(),
metrics::UpdateTrigger::kInitUpdateAll, status);
if (!status.ok()) {
SetStatus(
WrapStatus(status, "Failed to update manifest for '%s'", src_dir_));
return;
}
RecordMultiSessionStart(manifest_updater);
SetManifest(manifest_updater.ManifestId());
RecordMultiSessionStart(*manifest_updater_);
LOG_INFO("Manifest for '%s' updated in %0.3f seconds", src_dir_,
sw.ElapsedSeconds());
@@ -256,30 +260,26 @@ void MultiSessionRunner::Run() {
src_dir_);
modified_files.clear();
sw.Reset();
status = manifest_updater.UpdateAll(&file_chunks_);
RecordManifestUpdate(manifest_updater, sw.Elapsed(),
status = manifest_updater_->UpdateAll(&file_chunks_, push_handler);
RecordManifestUpdate(*manifest_updater_, sw.Elapsed(),
metrics::UpdateTrigger::kRunningUpdateAll, status);
if (!status.ok()) {
LOG_WARNING(
"Updating manifest for '%s' after re-creating directory failed: "
"'%s'",
src_dir_, status.ToString());
SetManifest(manifest_updater.DefaultManifestId());
} else {
SetManifest(manifest_updater.ManifestId());
SetManifest(manifest_updater_->DefaultManifestId());
}
} else if (!modified_files.empty()) {
ManifestUpdater::OperationList ops = GetFileOperations(modified_files);
sw.Reset();
status = manifest_updater.Update(&ops, &file_chunks_);
RecordManifestUpdate(manifest_updater, sw.Elapsed(),
status = manifest_updater_->Update(&ops, &file_chunks_, push_handler);
RecordManifestUpdate(*manifest_updater_, sw.Elapsed(),
metrics::UpdateTrigger::kRegularUpdate, status);
if (!status.ok()) {
LOG_WARNING("Updating manifest for '%s' failed: %s", src_dir_,
status.ToString());
SetManifest(manifest_updater.DefaultManifestId());
} else {
SetManifest(manifest_updater.ManifestId());
SetManifest(manifest_updater_->DefaultManifestId());
}
}
@@ -482,15 +482,16 @@ absl::Status MultiSession::Shutdown() {
sessions_.erase(instance_id);
}
absl::Status status;
if (runner_) {
RETURN_IF_ERROR(runner_->Shutdown());
status = runner_->Shutdown();
}
if (heartbeat_watcher_.joinable()) {
heartbeat_watcher_.join();
}
return absl::OkStatus();
return status;
}
absl::Status MultiSession::Status() {
@@ -528,7 +529,7 @@ absl::Status MultiSession::StartSession(const std::string& instance_id,
RETURN_IF_ERROR(session->Start(local_asset_stream_port_,
kAssetStreamPortFirst, kAssetStreamPortLast));
// Wait for the FUSE to receive the intermediate manifest.
// Wait for the FUSE to receive the first intermediate manifest.
RETURN_IF_ERROR(runner_->WaitForManifestAck(instance_id, absl::Seconds(5)));
sessions_[instance_id] = std::move(session);

View File

@@ -112,6 +112,7 @@ class MultiSessionRunner {
const uint32_t num_updater_threads_;
const ManifestUpdatedCb manifest_updated_cb_;
std::unique_ptr<AssetStreamServer> server_;
std::unique_ptr<ManifestUpdater> manifest_updater_;
// Modifications (shutdown, file changes).
absl::Mutex mutex_;

View File

@@ -65,6 +65,21 @@ class MetricsServiceForTest : public MultiSessionMetricsRecorder {
metrics_records_.push_back(MetricsRecord(std::move(event), code));
}
// Blocks until at least |num_events| records whose code equals |type| have
// been recorded, or until |timeout| expires. Returns true if the condition was
// met and false if the wait timed out.
bool WaitForEvents(metrics::EventType type, int num_events = 1,
                   absl::Duration timeout = absl::Seconds(1)) {
  absl::MutexLock lock(&mutex_);
  // Evaluated by AwaitWithTimeout(); counts the matching records seen so far.
  auto enough_recorded = [this, type, num_events]() {
    int matches = 0;
    for (const MetricsRecord& record : metrics_records_) {
      if (record.code == type) ++matches;
    }
    return matches >= num_events;
  };
  return mutex_.AwaitWithTimeout(absl::Condition(&enough_recorded), timeout);
}
std::vector<MetricsRecord> GetEventsAndClear(metrics::EventType type)
ABSL_LOCKS_EXCLUDED(mutex_) {
std::vector<MetricsRecord> events;
@@ -172,8 +187,16 @@ class MultiSessionTest : public ManifestTestBase {
metrics::ManifestUpdateData* data =
events[i].evt.as_manager_data->manifest_update_data.get();
EXPECT_LT(data->local_duration_ms, 60000ull);
manifests[i].local_duration_ms = data->local_duration_ms;
EXPECT_EQ(*data, manifests[i]);
EXPECT_EQ(data->status, manifests[i].status);
EXPECT_EQ(data->total_assets_added_or_updated,
manifests[i].total_assets_added_or_updated);
EXPECT_EQ(data->total_assets_deleted, manifests[i].total_assets_deleted);
EXPECT_EQ(data->total_chunks, manifests[i].total_chunks);
EXPECT_EQ(data->total_files_added_or_updated,
manifests[i].total_files_added_or_updated);
EXPECT_EQ(data->total_processed_bytes,
manifests[i].total_processed_bytes);
EXPECT_EQ(data->trigger, manifests[i].trigger);
}
}
@@ -268,13 +291,13 @@ TEST_F(MultiSessionTest, GetCachePath_DoesNotSplitUtfCodePoints) {
TEST_F(MultiSessionTest, MultiSessionRunnerOnEmpty) {
cfg_.src_dir = test_dir_path_;
MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_,
false /*enable_stats*/, kTimeout, kNumThreads,
/*enable_stats=*/false, kTimeout, kNumThreads,
metrics_service_,
[this]() { OnManifestUpdated(); });
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout));
// The first update is always the empty manifest, wait for the second one.
ASSERT_TRUE(WaitForManifestUpdated(2));
EXPECT_TRUE(WaitForManifestUpdated(2));
ASSERT_TRUE(
metrics_service_->WaitForEvents(metrics::EventType::kMultiSessionStart));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
CheckMultiSessionStartRecorded(0, 0, 0);
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
@@ -290,13 +313,13 @@ TEST_F(MultiSessionTest, MultiSessionRunnerNonEmptySucceeds) {
// Contains a.txt, subdir/b.txt, subdir/c.txt, subdir/d.txt.
cfg_.src_dir = path::Join(base_dir_, "non_empty");
MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_,
false /*enable_stats*/, kTimeout, kNumThreads,
/*enable_stats=*/false, kTimeout, kNumThreads,
metrics_service_,
[this]() { OnManifestUpdated(); });
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout));
// The first update is always the empty manifest, wait for the second one.
ASSERT_TRUE(WaitForManifestUpdated(2));
EXPECT_TRUE(WaitForManifestUpdated(2));
ASSERT_TRUE(
metrics_service_->WaitForEvents(metrics::EventType::kMultiSessionStart));
CheckMultiSessionStartRecorded(46, 4, 4);
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals(
{"a.txt", "subdir", "subdir/b.txt", "subdir/c.txt", "subdir/d.txt"},
@@ -309,31 +332,50 @@ TEST_F(MultiSessionTest, MultiSessionRunnerNonEmptySucceeds) {
TEST_F(MultiSessionTest, MultiSessionRunnerAddFileSucceeds) {
cfg_.src_dir = test_dir_path_;
MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_,
false /*enable_stats*/, kTimeout, kNumThreads,
/*enable_stats=*/false, kTimeout, kNumThreads,
metrics_service_,
[this]() { OnManifestUpdated(); });
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout));
// The first update is always the empty manifest, wait for the second one.
ASSERT_TRUE(WaitForManifestUpdated(2));
ASSERT_OK(runner.Status());
CheckMultiSessionStartRecorded(0, 0, 0);
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll,
absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)});
const std::string file_path = path::Join(test_dir_path_, "file.txt");
EXPECT_OK(path::WriteFile(file_path, kData, kDataSize));
// 1 file was added = incremented exp_num_manifest_updates.
ASSERT_TRUE(WaitForManifestUpdated(3));
ASSERT_NO_FATAL_FAILURE(
ExpectManifestEquals({"file.txt"}, runner.ManifestId()));
CheckMultiSessionStartNotRecorded();
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kRegularUpdate,
absl::StatusCode::kOk, 1, 0, 1, 1, 0, kDataSize)});
{
SCOPED_TRACE("Initialize.");
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
// 1 file was added, 1 intermediate + 1 final manifest is pushed.
EXPECT_TRUE(WaitForManifestUpdated(2));
EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout));
EXPECT_TRUE(metrics_service_->WaitForEvents(
metrics::EventType::kMultiSessionStart));
ASSERT_OK(runner.Status());
}
{
SCOPED_TRACE("Created base manifest for the test directory.");
CheckMultiSessionStartRecorded(0, 0, 0);
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll,
absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)});
}
{
SCOPED_TRACE("Added file.txt.");
uint32_t prev_updates = num_manifest_updates_;
const std::string file_path = path::Join(test_dir_path_, "file.txt");
EXPECT_OK(path::WriteFile(file_path, kData, kDataSize));
// 1 file was added, 1 intermediate + 1 final manifest is pushed.
EXPECT_TRUE(WaitForManifestUpdated(prev_updates + 2));
EXPECT_TRUE(
metrics_service_->WaitForEvents(metrics::EventType::kManifestUpdated));
ASSERT_NO_FATAL_FAILURE(
ExpectManifestEquals({"file.txt"}, runner.ManifestId()));
CheckMultiSessionStartNotRecorded();
CheckManifestUpdateRecorded(
std::vector<metrics::ManifestUpdateData>{GetManifestUpdateData(
metrics::UpdateTrigger::kRegularUpdate, absl::StatusCode::kOk, 1, 0,
1, 1, 0, kDataSize)});
}
EXPECT_OK(runner.Status());
EXPECT_OK(runner.Shutdown());
}
@@ -343,7 +385,7 @@ TEST_F(MultiSessionTest, MultiSessionRunnerAddFileSucceeds) {
TEST_F(MultiSessionTest, MultiSessionRunnerNoDirFails) {
cfg_.src_dir = path::Join(base_dir_, "non_existing");
MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_,
false /*enable_stats*/, kTimeout, kNumThreads,
/*enable_stats=*/false, kTimeout, kNumThreads,
metrics_service_,
[this]() { OnManifestUpdated(); });
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
@@ -365,16 +407,16 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) {
kDataSize));
MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_,
false /*enable_stats*/, kTimeout, kNumThreads,
/*enable_stats=*/false, kTimeout, kNumThreads,
metrics_service_,
[this]() { OnManifestUpdated(); });
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
{
SCOPED_TRACE("Originally, only the streamed directory contains file.txt.");
EXPECT_OK(runner.WaitForManifestAck(kInstance, kTimeout));
// The first update is always the empty manifest, wait for the second one.
ASSERT_TRUE(WaitForManifestUpdated(2));
EXPECT_TRUE(WaitForManifestUpdated(2));
ASSERT_TRUE(metrics_service_->WaitForEvents(
metrics::EventType::kMultiSessionStart));
CheckMultiSessionStartRecorded((uint64_t)kDataSize, 1, 1);
ASSERT_NO_FATAL_FAILURE(
ExpectManifestEquals({"file.txt"}, runner.ManifestId()));
@@ -387,8 +429,9 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) {
{
SCOPED_TRACE(
"Remove the streamed directory, the manifest should become empty.");
uint32_t prev_updates = num_manifest_updates_;
EXPECT_OK(path::RemoveDirRec(test_dir_path_));
ASSERT_TRUE(WaitForManifestUpdated(3));
ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 1));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
CheckManifestUpdateRecorded(
std::vector<metrics::ManifestUpdateData>{GetManifestUpdateData(
@@ -400,9 +443,13 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) {
SCOPED_TRACE(
"Create the watched directory -> an empty manifest should be "
"streamed.");
uint32_t prev_updates = num_manifest_updates_;
EXPECT_OK(path::CreateDirRec(test_dir_path_));
EXPECT_TRUE(WaitForManifestUpdated(4));
// The first update is always the empty manifest, wait for the second one.
EXPECT_TRUE(WaitForManifestUpdated(prev_updates + 2));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
EXPECT_TRUE(
metrics_service_->WaitForEvents(metrics::EventType::kManifestUpdated));
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kRunningUpdateAll,
absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)});
@@ -410,11 +457,16 @@ TEST_F(MultiSessionTest, MultiSessionRunnerDirRecreatedSucceeds) {
{
SCOPED_TRACE("Create 'new_file.txt' -> new manifest should be created.");
uint32_t prev_updates = num_manifest_updates_;
EXPECT_OK(path::WriteFile(path::Join(test_dir_path_, "new_file.txt"), kData,
kDataSize));
ASSERT_TRUE(WaitForManifestUpdated(5));
// The first update doesn't have the chunks for new_file.txt, wait for the
// second one.
ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 2));
ASSERT_NO_FATAL_FAILURE(
ExpectManifestEquals({"new_file.txt"}, runner.ManifestId()));
EXPECT_TRUE(
metrics_service_->WaitForEvents(metrics::EventType::kManifestUpdated));
CheckManifestUpdateRecorded(
std::vector<metrics::ManifestUpdateData>{GetManifestUpdateData(
metrics::UpdateTrigger::kRegularUpdate, absl::StatusCode::kOk, 1, 0,
@@ -432,11 +484,11 @@ TEST_F(MultiSessionTest, MultiSessionRunnerFileAsStreamedDirFails) {
EXPECT_OK(path::WriteFile(cfg_.src_dir, kData, kDataSize));
MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_,
false /*enable_stats*/, kTimeout, kNumThreads,
/*enable_stats=*/false, kTimeout, kNumThreads,
metrics_service_,
[this]() { OnManifestUpdated(); });
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
ASSERT_FALSE(WaitForManifestUpdated(1, absl::Milliseconds(10)));
ASSERT_FALSE(WaitForManifestUpdated(1, absl::Milliseconds(100)));
CheckMultiSessionStartNotRecorded();
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{});
EXPECT_NOT_OK(runner.Shutdown());
@@ -452,33 +504,49 @@ TEST_F(MultiSessionTest,
EXPECT_OK(path::CreateDirRec(cfg_.src_dir));
MultiSessionRunner runner(cfg_.src_dir, &data_store_, &process_factory_,
false /*enable_stats*/, kTimeout, kNumThreads,
/*enable_stats=*/false, kTimeout, kNumThreads,
metrics_service_,
[this]() { OnManifestUpdated(); });
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
ASSERT_TRUE(WaitForManifestUpdated(2));
CheckMultiSessionStartRecorded(0, 0, 0);
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll,
absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)});
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
{
SCOPED_TRACE("Initialize manifest in test directory.");
// Remove the streamed directory, the manifest should become empty.
EXPECT_OK(path::RemoveDirRec(cfg_.src_dir));
ASSERT_TRUE(WaitForManifestUpdated(3));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kRunningUpdateAll,
absl::StatusCode::kNotFound, 0, 0, 0, 0, 0, 0)});
EXPECT_OK(runner.Initialize(kPort, AssetStreamServerType::kTest));
ASSERT_TRUE(WaitForManifestUpdated(2));
ASSERT_TRUE(metrics_service_->WaitForEvents(
metrics::EventType::kMultiSessionStart));
CheckMultiSessionStartRecorded(0, 0, 0);
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kInitUpdateAll,
absl::StatusCode::kOk, 0, 0, 0, 0, 0, 0)});
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
}
EXPECT_OK(path::WriteFile(cfg_.src_dir, kData, kDataSize));
EXPECT_TRUE(WaitForManifestUpdated(4));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
CheckManifestUpdateRecorded(
std::vector<metrics::ManifestUpdateData>{GetManifestUpdateData(
metrics::UpdateTrigger::kRunningUpdateAll,
absl::StatusCode::kFailedPrecondition, 0, 0, 0, 0, 0, 0)});
CheckMultiSessionStartNotRecorded();
{
SCOPED_TRACE("Remove the streamed directory, the manifest becomes empty.");
uint32_t prev_updates = num_manifest_updates_;
EXPECT_OK(path::RemoveDirRec(cfg_.src_dir));
ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 1));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
CheckManifestUpdateRecorded(std::vector<metrics::ManifestUpdateData>{
GetManifestUpdateData(metrics::UpdateTrigger::kRunningUpdateAll,
absl::StatusCode::kNotFound, 0, 0, 0, 0, 0, 0)});
}
{
SCOPED_TRACE("Create a file in place of the directory");
uint32_t prev_updates = num_manifest_updates_;
EXPECT_OK(path::WriteFile(cfg_.src_dir, kData, kDataSize));
ASSERT_TRUE(WaitForManifestUpdated(prev_updates + 2));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals({}, runner.ManifestId()));
metrics::ManifestUpdateData update_data = GetManifestUpdateData(
metrics::UpdateTrigger::kRunningUpdateAll,
absl::StatusCode::kFailedPrecondition, 0, 0, 0, 0, 0, 0);
CheckManifestUpdateRecorded(
std::vector<metrics::ManifestUpdateData>{update_data, update_data});
CheckMultiSessionStartNotRecorded();
}
EXPECT_OK(runner.Status());
EXPECT_OK(runner.Shutdown());

View File

@@ -50,6 +50,7 @@ cc_test(
srcs = ["cdc_fuse_fs_test.cc"],
deps = [
":cdc_fuse_fs_lib_mocked",
":mock_config_stream_client",
"//common:status_test_macros",
"//data_store",
"//data_store:mem_data_store",
@@ -130,6 +131,13 @@ cc_library(
],
)
# Library built from mock_config_stream_client.{h,cc}. It depends on
# :config_stream_client and is listed in the deps of the cc_test that builds
# cdc_fuse_fs_test.cc.
cc_library(
name = "mock_config_stream_client",
srcs = ["mock_config_stream_client.cc"],
hdrs = ["mock_config_stream_client.h"],
deps = [":config_stream_client"],
)
filegroup(
name = "all_test_sources",
srcs = glob(["*_test.cc"]),

View File

@@ -125,12 +125,28 @@ struct Inode {
// Asset proto -> inode map.
using InodeMap = std::unordered_map<const AssetProto*, std::shared_ptr<Inode>>;
// Queued request to open a file that has not been processed yet and should be
// processed once the manifest is updated.
struct OpenRequest {
fuse_req_t req;
fuse_ino_t ino;
struct fuse_file_info* fi;
// Queued request that cannot be processed yet and should be processed once the
// manifest is updated. The |type| field selects which member of the union |u|
// holds the request arguments.
struct QueuedRequest {
// The request type that was blocked.
enum class Type { kOpen, kOpenDir, kLookup };
Type type;
// Relative path of the asset the request is waiting for. The same path is
// sent to the server when the request is queued.
std::string rel_path;
union {
// Only valid for type == kOpen or type == kOpenDir.
struct Open {
fuse_req_t req;
fuse_ino_t ino;
// Stored by value: the queueing functions copy the caller's fuse_file_info.
struct fuse_file_info fi;
} open;
// Only valid for type == kLookup.
struct Lookup {
fuse_req_t req;
fuse_ino_t parent_ino;
// NOTE(review): raw pointer, stored as passed by the caller and not copied.
// Confirm that the pointed-to string stays valid until the request is
// resumed.
const char* name;
} lookup;
} u;
};
// Global context. Fuse is based on loose callbacks, so this holds the fs state.
@@ -170,12 +186,13 @@ struct CdcFuseFsContext {
static thread_local Buffer buffer;
// Configuration client to get configuration updates from the workstation.
std::unique_ptr<ConfigStreamClient> config_stream_client_;
std::unique_ptr<ConfigStreamClient> config_stream_client;
// Queue for requests to open files that have not been processed yet.
absl::Mutex queued_open_requests_mutex_;
std::vector<OpenRequest> queued_open_requests_
ABSL_GUARDED_BY(queued_open_requests_mutex_);
// Queue for requests to open files or directories that have not been
// processed yet.
absl::Mutex queued_requests_mutex;
std::vector<QueuedRequest> queued_requests
ABSL_GUARDED_BY(queued_requests_mutex);
// Identifies whether FUSE consistency should be inspected after manifest
// update.
@@ -372,6 +389,75 @@ bool ValidateInode(fuse_req_t req, Inode& inode, fuse_ino_t ino) {
}
return true;
}
// Builds the relative path of |inode| by recursively resolving the path of its
// parent inode and appending "/" plus the asset's own name. An inode whose
// parent is the root (FUSE_ROOT_ID) yields just its own name.
std::string GetRelativePath(const Inode& inode) {
  const auto parent_ino = inode.asset.parent_ino();
  if (parent_ino != FUSE_ROOT_ID) {
    // Resolve the parent's path first, then append this asset's name.
    std::string path = GetRelativePath(GetInode(parent_ino));
    absl::StrAppend(&path, "/", inode.asset.proto()->name());
    return path;
  }
  return inode.asset.proto()->name();
}
// Asks the server to prioritize the asset at |rel_file_path| by sending a
// ProcessAssets request through the config stream client. A failure is only
// logged; it is not reported to the caller.
void PrioritizeAssetOnServer(const std::string& rel_file_path) {
  std::vector<std::string> assets{rel_file_path};
  LOG_INFO("Requesting server to prioritize asset '%s'", rel_file_path);
  absl::Status status =
      ctx->config_stream_client->ProcessAssets(std::move(assets));
  // An error is not critical, but we should log it.
  if (!status.ok()) {
    LOG_ERROR(
        "Failed to request prioritization for asset '%s' from the server: %s",
        rel_file_path, status.ToString());
  }
}
// Records a blocked CdcFuseOpen() call for the file at |rel_path| in the list
// of pending requests and then asks the server to prioritize that file.
// |fi| is copied into the queued entry. Thread-safe.
void QueueOpenRequest(const std::string& rel_path, fuse_req_t req,
                      fuse_ino_t ino, struct fuse_file_info* fi)
    ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) {
  QueuedRequest pending{QueuedRequest::Type::kOpen, rel_path};
  auto& open_args = pending.u.open;
  open_args.req = req;
  open_args.ino = ino;
  open_args.fi = *fi;

  {
    // Hold the lock only while touching the queue, not during the server call.
    absl::MutexLock queue_lock(&ctx->queued_requests_mutex);
    ctx->queued_requests.push_back(std::move(pending));
  }

  PrioritizeAssetOnServer(rel_path);
}
// Records a blocked CdcFuseOpenDir() call for the directory at |rel_path| in
// the list of pending requests and then asks the server to prioritize that
// directory. |fi| is copied into the queued entry. Thread-safe.
void QueueOpenDirRequest(const std::string& rel_path, fuse_req_t req,
                         fuse_ino_t ino, struct fuse_file_info* fi)
    ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) {
  QueuedRequest pending{QueuedRequest::Type::kOpenDir, rel_path};
  // Directory opens share the argument layout of file opens.
  auto& open_args = pending.u.open;
  open_args.req = req;
  open_args.ino = ino;
  open_args.fi = *fi;

  {
    // Hold the lock only while touching the queue, not during the server call.
    absl::MutexLock queue_lock(&ctx->queued_requests_mutex);
    ctx->queued_requests.push_back(std::move(pending));
  }

  PrioritizeAssetOnServer(rel_path);
}
// Records a blocked CdcFuseLookup() call in the list of pending requests and
// then asks the server to prioritize the asset at |rel_path|. Thread-safe.
//
// NOTE(review): |name| is stored as a raw pointer and not copied. Confirm that
// the buffer it points to stays valid until the request is resumed.
void QueueLookupRequest(const std::string& rel_path, fuse_req_t req,
                        fuse_ino_t parent_ino, const char* name)
    ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) {
  QueuedRequest pending{QueuedRequest::Type::kLookup, rel_path};
  auto& lookup_args = pending.u.lookup;
  lookup_args.req = req;
  lookup_args.parent_ino = parent_ino;
  lookup_args.name = name;

  {
    // Hold the lock only while touching the queue, not during the server call.
    absl::MutexLock queue_lock(&ctx->queued_requests_mutex);
    ctx->queued_requests.push_back(std::move(pending));
  }

  PrioritizeAssetOnServer(rel_path);
}
} // namespace
// Implementation of the Fuse lookup() method.
@@ -384,6 +470,17 @@ void CdcFuseLookup(fuse_req_t req, fuse_ino_t parent_ino, const char* name)
if (!ValidateInode(req, parent, parent_ino)) {
return;
}
if (parent.asset.proto()->in_progress()) {
// This directory has not been processed yet. Queue up the request and block
// until an updated manifest is available.
std::string rel_path = GetRelativePath(parent);
LOG_INFO("Request to open ino %u queued (file '%s' not ready)", parent_ino,
rel_path);
QueueLookupRequest(rel_path, req, parent_ino, name);
return;
}
absl::StatusOr<const AssetProto*> proto = parent.asset.Lookup(name);
if (!proto.ok()) {
LOG_ERROR("Lookup of '%s' in ino %u failed: '%s'", name, parent_ino,
@@ -468,13 +565,11 @@ void CdcFuseOpen(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi)
return;
}
if (proto->file_size() > 0 && proto->file_chunks_size() == 0 &&
proto->file_indirect_chunks_size() == 0) {
if (proto->file_size() > 0 && proto->in_progress()) {
// This file has not been processed yet. Queue up the request and block until
// an updated manifest is available.
LOG_DEBUG("Request to open ino %u queued (file not ready)", ino);
absl::MutexLock lock(&ctx->queued_open_requests_mutex_);
ctx->queued_open_requests_.push_back({req, ino, fi});
QueueOpenRequest(GetRelativePath(inode), req, ino, fi);
return;
}
@@ -566,6 +661,15 @@ void CdcFuseOpenDir(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi)
fuse_reply_err(req, ENOTDIR);
return;
}
if (proto->in_progress()) {
// This directory has not been processed yet. Queue up the request until an
// updated manifest is available.
LOG_DEBUG("Request to open directory '%s' ino %u queued (dir not ready)",
proto->name(), ino);
QueueOpenDirRequest(GetRelativePath(inode), req, ino, fi);
return;
}
fuse_reply_open(req, fi);
}
@@ -1462,8 +1566,8 @@ absl::Status SetManifest(const ContentIdProto& manifest_id)
absl::MutexLock inodes_lock(&ctx->inodes_mutex);
for (const auto& [proto, inode] : ctx->inodes) {
// Reset kUpdatedProto to kInitialized. The state was only used for
// validation. kUpdated is still needed for clearing kernel caches when a
// file is opened.
// validation. kUpdated is still needed for clearing kernel caches when
// a file is opened.
assert(inode->IsValid());
if (inode->IsUpdatedProto() ||
inode->asset.proto()->type() == AssetProto::DIRECTORY) {
@@ -1474,21 +1578,36 @@ absl::Status SetManifest(const ContentIdProto& manifest_id)
}
// Process outstanding open requests. Be sure to move the vector because
// CdcFuseOpen() might requeue requests.
std::vector<OpenRequest> requests;
// processing might requeue requests.
std::vector<QueuedRequest> requests;
{
absl::MutexLock lock(&ctx->queued_open_requests_mutex_);
requests.swap(ctx->queued_open_requests_);
absl::MutexLock lock(&ctx->queued_requests_mutex);
requests.swap(ctx->queued_requests);
}
for (const OpenRequest request : requests) {
LOG_DEBUG("Resuming request to open ino %u", request.ino);
CdcFuseOpen(request.req, request.ino, request.fi);
for (QueuedRequest& qr : requests) {
switch (qr.type) {
case QueuedRequest::Type::kLookup:
LOG_DEBUG("Resuming request to look up '%s' in '%s' (ino %u)",
qr.u.lookup.name, qr.rel_path, qr.u.lookup.parent_ino);
CdcFuseLookup(qr.u.lookup.req, qr.u.lookup.parent_ino,
qr.u.lookup.name);
break;
case QueuedRequest::Type::kOpen:
LOG_DEBUG("Resuming request to open file '%s' (ino %u)", qr.rel_path,
qr.u.open.ino);
CdcFuseOpen(qr.u.open.req, qr.u.open.ino, &qr.u.open.fi);
break;
case QueuedRequest::Type::kOpenDir:
LOG_DEBUG("Resuming request to open dir '%s' (ino %u)", qr.rel_path,
qr.u.open.ino);
CdcFuseOpenDir(qr.u.open.req, qr.u.open.ino, &qr.u.open.fi);
break;
}
}
#ifndef USE_MOCK_LIBFUSE
// Acknowledge that the manifest id was received and FUSE was updated.
absl::Status status =
ctx->config_stream_client_->SendManifestAck(manifest_id);
absl::Status status = ctx->config_stream_client->SendManifestAck(manifest_id);
if (!status.ok()) {
LOG_ERROR("Failed to send ack for manifest '%s'",
ContentId::ToHexString(manifest_id));
@@ -1500,16 +1619,14 @@ absl::Status SetManifest(const ContentIdProto& manifest_id)
return absl::OkStatus();
}
absl::Status StartConfigClient(std::string instance,
std::shared_ptr<grpc::Channel> channel) {
void SetConfigClient(
std::unique_ptr<cdc_ft::ConfigStreamClient> config_client) {
LOG_DEBUG("Starting configuration client");
assert(ctx && ctx->initialized);
if (ctx->config_stream_client_) {
ctx->config_stream_client_.reset();
if (ctx->config_stream_client) {
ctx->config_stream_client.reset();
}
ctx->config_stream_client_ = std::make_unique<ConfigStreamClient>(
std::move(instance), std::move(channel));
return absl::OkStatus();
ctx->config_stream_client = std::move(config_client);
}
// Initializes FUSE with a manifest for an empty directory:
@@ -1542,7 +1659,7 @@ absl::Status Run(DataStoreReader* data_store_reader, bool consistency_check) {
if (res == -1) return MakeStatus("Session loop failed");
LOG_INFO("Session loop finished.");
ctx->config_stream_client_->Shutdown();
ctx->config_stream_client->Shutdown();
#else
// This code is not unit tested.
#endif

View File

@@ -62,10 +62,8 @@ namespace cdc_fuse_fs {
// see fuse_common.h.
absl::Status Initialize(int argc, char** argv);
// Starts a client to read configuration updates over gRPC |channel|.
// |instance| is the gamelet instance id.
absl::Status StartConfigClient(std::string instance,
std::shared_ptr<grpc::Channel> channel);
// Sets the client to read configuration updates to |config_client|.
void SetConfigClient(std::unique_ptr<ConfigStreamClient> config_client);
// Sets the |data_store_reader| to load data from, initializes FUSE with a
// manifest for an empty directory, and starts the filesystem. The call does

View File

@@ -12,11 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Bazel does not honor copts for test targets, so we need to define
// USE_MOCK_LIBFUSE in the test code itself *before* including cdc_fuse_fs.h.
// The macro expands to the plain value 1 (there is no '=' in a #define), so it
// works in #ifdef/#ifndef checks as well as in #if expressions.
#ifndef USE_MOCK_LIBFUSE
#define USE_MOCK_LIBFUSE 1
#endif
#include "cdc_fuse_fs/cdc_fuse_fs.h"
#include <memory>
#include <vector>
#include "cdc_fuse_fs/mock_config_stream_client.h"
#include "cdc_fuse_fs/mock_libfuse.h"
#include "common/log.h"
#include "common/path.h"
@@ -91,6 +98,7 @@ class CdcFuseFsTest : public ::testing::Test {
CdcFuseFsTest() : builder_(&cache_) {
cdc_fuse_fs::Initialize(0, nullptr).IgnoreError();
Log::Initialize(std::make_unique<FuseLog>(LogLevel::kInfo));
cdc_fuse_fs::SetConfigClient(std::make_unique<MockConfigStreamClient>());
}
~CdcFuseFsTest() {
Log::Shutdown();
@@ -157,17 +165,26 @@ class CdcFuseFsTest : public ::testing::Test {
ExpectAccessError(R_OK | W_OK | X_OK, 0 /*error*/);
}
// Wipes chunks for |kFile1Name| to simulate an intermediate manifest, i.e.
// the manifest that contains all assets, but misses file chunks.
// Wipes chunks for |kFile1Name| and assets for |kSubdirName| to simulate an
// intermediate manifest, i.e. the manifest for which some files and
// directories are not processed yet. Both assets are marked as in-progress.
// Returns the intermediate manifest id, or a default-constructed id if the
// root directory does not contain the two expected assets.
ContentIdProto CreateIntermediateManifestId() {
  ManifestProto manifest;
  EXPECT_OK(cache_.GetProto(manifest_id_, &manifest));

  // Entries 0 (file1) and 1 (subdir) are both accessed below, so require at
  // least two root assets before indexing.
  EXPECT_GE(manifest.root_dir().dir_assets_size(), 2);
  if (manifest.root_dir().dir_assets_size() < 2) return ContentIdProto();

  AssetProto* file1 = manifest.mutable_root_dir()->mutable_dir_assets(0);
  EXPECT_EQ(file1->name(), kFile1Name);
  file1->clear_file_chunks();
  file1->set_in_progress(true);

  AssetProto* subdir = manifest.mutable_root_dir()->mutable_dir_assets(1);
  EXPECT_EQ(subdir->name(), kSubdirName);
  subdir->clear_dir_assets();
  subdir->set_in_progress(true);

  return cache_.AddProto(manifest);
}
@@ -270,40 +287,59 @@ TEST_F(CdcFuseFsTest, OpenFailsWriteAccess) {
EXPECT_EQ(fuse_.errors[0], EACCES);
}
TEST_F(CdcFuseFsTest, OpenQueuedForIntermediateManifest) {
TEST_F(CdcFuseFsTest, RequestsQueuedForIntermediateManifest) {
ContentIdProto intermediate_manifest_id = CreateIntermediateManifestId();
EXPECT_OK(cdc_fuse_fs::SetManifest(intermediate_manifest_id));
fuse_file_info fi;
auto cfg_client_ptr = std::make_unique<MockConfigStreamClient>();
MockConfigStreamClient* cfg_client = cfg_client_ptr.get();
cdc_fuse_fs::SetConfigClient(std::move(cfg_client_ptr));
// Opening file1 should be queued as it contains no chunks.
// Opening file1 should be queued as it is marked as in-progress.
CdcFuseLookup(req_, FUSE_ROOT_ID, kFile1Name);
ASSERT_EQ(fuse_.entries.size(), 1);
fuse_file_info fi;
CdcFuseOpen(req_, fuse_.entries[0].ino, &fi);
ASSERT_EQ(fuse_.open_files.size(), 0);
EXPECT_EQ(fuse_.open_files.size(), 0);
EXPECT_EQ(cfg_client->ReleasePrioritizedAssets(),
std::vector<std::string>({kFile1Name}));
// Opening subdir should be queued as it is marked as in-progress.
CdcFuseLookup(req_, FUSE_ROOT_ID, kSubdirName);
ASSERT_EQ(fuse_.entries.size(), 2);
CdcFuseOpenDir(req_, fuse_.entries[1].ino, &fi);
EXPECT_EQ(fuse_.open_files.size(), 0);
EXPECT_EQ(cfg_client->ReleasePrioritizedAssets(),
std::vector<std::string>({kSubdirName}));
// Setting the final manifest should fulfill queued open requests.
EXPECT_OK(cdc_fuse_fs::SetManifest(manifest_id_));
ASSERT_EQ(fuse_.open_files.size(), 1);
EXPECT_EQ(fuse_.open_files.size(), 2);
}
TEST_F(CdcFuseFsTest, OpenQueuedRequestsRequeue) {
TEST_F(CdcFuseFsTest, QueuedRequestsRequeue) {
ContentIdProto intermediate_manifest_id = CreateIntermediateManifestId();
EXPECT_OK(cdc_fuse_fs::SetManifest(intermediate_manifest_id));
fuse_file_info fi;
// Opening file1 should be queued as it contains no chunks.
// Opening file1 should be queued as it is marked as in-progress
CdcFuseLookup(req_, FUSE_ROOT_ID, kFile1Name);
ASSERT_EQ(fuse_.entries.size(), 1);
fuse_file_info fi;
CdcFuseOpen(req_, fuse_.entries[0].ino, &fi);
ASSERT_EQ(fuse_.open_files.size(), 0);
// Opening subdir should be queued as it is marked as in-progress.
CdcFuseLookup(req_, FUSE_ROOT_ID, kSubdirName);
ASSERT_EQ(fuse_.entries.size(), 2);
CdcFuseOpenDir(req_, fuse_.entries[1].ino, &fi);
EXPECT_EQ(fuse_.open_files.size(), 0);
// Setting the same incomplete manifest again should requeue the request.
EXPECT_OK(cdc_fuse_fs::SetManifest(intermediate_manifest_id));
ASSERT_EQ(fuse_.open_files.size(), 0);
// Setting the final manifest should fulfill queued open requests.
EXPECT_OK(cdc_fuse_fs::SetManifest(manifest_id_));
ASSERT_EQ(fuse_.open_files.size(), 1);
ASSERT_EQ(fuse_.open_files.size(), 2);
}
TEST_F(CdcFuseFsTest, ReadSucceeds) {

View File

@@ -33,7 +33,7 @@ using ConfigStreamService = proto::ConfigStreamService;
// from the workstation.
class ManifestIdReader {
public:
ManifestIdReader(ConfigStreamService::Stub* stub) : stub_(stub) {}
explicit ManifestIdReader(ConfigStreamService::Stub* stub) : stub_(stub) {}
// Starts a GetManifestId() request and listens to the stream of manifest ids
// sent from the workstation. Calls |callback| on every manifest id received.
@@ -88,21 +88,22 @@ class ManifestIdReader {
std::unique_ptr<std::thread> reader_thread_;
};
ConfigStreamClient::ConfigStreamClient(std::string instance,
std::shared_ptr<grpc::Channel> channel)
ConfigStreamGrpcClient::ConfigStreamGrpcClient(
std::string instance, std::shared_ptr<grpc::Channel> channel)
: instance_(std::move(instance)),
stub_(ConfigStreamService::NewStub(std::move(channel))),
read_client_(std::make_unique<ManifestIdReader>(stub_.get())) {}
ConfigStreamClient::~ConfigStreamClient() = default;
ConfigStreamGrpcClient::~ConfigStreamGrpcClient() = default;
absl::Status ConfigStreamClient::StartListeningToManifestUpdates(
absl::Status ConfigStreamGrpcClient::StartListeningToManifestUpdates(
std::function<absl::Status(const ContentIdProto&)> callback) {
LOG_INFO("Starting to listen to manifest updates");
return read_client_->StartListeningToManifestUpdates(callback);
}
absl::Status ConfigStreamClient::SendManifestAck(ContentIdProto manifest_id) {
absl::Status ConfigStreamGrpcClient::SendManifestAck(
ContentIdProto manifest_id) {
AckManifestIdReceivedRequest request;
request.set_gamelet_id(instance_);
*request.mutable_manifest_id() = std::move(manifest_id);
@@ -114,7 +115,21 @@ absl::Status ConfigStreamClient::SendManifestAck(ContentIdProto manifest_id) {
return absl::OkStatus();
}
void ConfigStreamClient::Shutdown() {
// Sends a ProcessAssets request containing all paths in |assets| to the
// server and waits for the response. Returns an error if the call fails.
absl::Status ConfigStreamGrpcClient::ProcessAssets(
    std::vector<std::string> assets) {
  // Hand each path over to the request message.
  proto::ProcessAssetsRequest req;
  for (auto it = assets.begin(); it != assets.end(); ++it) {
    req.add_relative_paths(std::move(*it));
  }

  grpc::ClientContext client_context;
  proto::ProcessAssetsResponse resp;

  // The caller is waiting for the updated manifest anyway, so we can just wait
  // for the response.
  RETURN_ABSL_IF_ERROR(stub_->ProcessAssets(&client_context, req, &resp));
  return absl::OkStatus();
}
// Stops listening for manifest updates by shutting down the manifest id
// reader.
void ConfigStreamGrpcClient::Shutdown() {
LOG_INFO("Stopping to listen to manifest updates");
read_client_->Shutdown();
}

View File

@@ -32,26 +32,45 @@ namespace cdc_ft {
class ManifestIdReader;
// Interface class for the config stream client.
class ConfigStreamClient {
public:
// |instance| is the id of the gamelet.
// |channel| is a gRPC channel to use.
ConfigStreamClient(std::string instance,
std::shared_ptr<grpc::Channel> channel);
~ConfigStreamClient();
ConfigStreamClient() = default;
virtual ~ConfigStreamClient() = default;
// Sends a request to get a stream of manifest id updates. |callback| is
// called from a background thread for every manifest id received.
// Returns immediately without waiting for the first manifest id.
absl::Status StartListeningToManifestUpdates(
std::function<absl::Status(const ContentIdProto&)> callback);
virtual absl::Status StartListeningToManifestUpdates(
std::function<absl::Status(const ContentIdProto&)> callback) = 0;
// Sends a message to indicate that the |manifest_id| was received and FUSE
// has been updated to use the new manifest.
absl::Status SendManifestAck(ContentIdProto manifest_id);
virtual absl::Status SendManifestAck(ContentIdProto manifest_id) = 0;
// Sends a message to prioritize processing of the pending assets in |assets|.
// All assets are given as full relative Unix paths to the file or directory.
virtual absl::Status ProcessAssets(std::vector<std::string> assets) = 0;
// Stops listening for manifest updates.
void Shutdown();
virtual void Shutdown() = 0;
};
class ConfigStreamGrpcClient : public ConfigStreamClient {
public:
// |instance| is the id of the gamelet.
// |channel| is a gRPC channel to use.
ConfigStreamGrpcClient(std::string instance,
std::shared_ptr<grpc::Channel> channel);
~ConfigStreamGrpcClient();
// ConfigStreamClient
absl::Status StartListeningToManifestUpdates(
std::function<absl::Status(const ContentIdProto&)> callback) override;
absl::Status SendManifestAck(ContentIdProto manifest_id) override;
absl::Status ProcessAssets(std::vector<std::string> assets) override;
void Shutdown() override;
private:
using ConfigStreamService = proto::ConfigStreamService;

View File

@@ -19,6 +19,7 @@
#include "absl/flags/parse.h"
#include "absl_helper/jedec_size_flag.h"
#include "cdc_fuse_fs/cdc_fuse_fs.h"
#include "cdc_fuse_fs/config_stream_client.h"
#include "cdc_fuse_fs/constants.h"
#include "common/gamelet_component.h"
#include "common/log.h"
@@ -191,10 +192,9 @@ int main(int argc, char* argv[]) {
prefetch_size, dp_cleanup_timeout,
dp_access_idle_timeout);
if (!cdc_ft::cdc_fuse_fs::StartConfigClient(instance, grpc_channel).ok()) {
LOG_ERROR("Could not start reading configuration updates'");
return 1;
}
cdc_ft::cdc_fuse_fs::SetConfigClient(
std::make_unique<cdc_ft::ConfigStreamGrpcClient>(
std::move(instance), std::move(grpc_channel)));
// Run FUSE.
LOG_INFO("Running filesystem");

View File

@@ -0,0 +1,44 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cdc_fuse_fs/mock_config_stream_client.h"
namespace cdc_ft {
// Hands the recorded asset paths to the caller and leaves the internal list
// empty. Swapping into a fresh vector guarantees the member is empty
// afterwards; a moved-from vector is only "valid but unspecified".
std::vector<std::string> MockConfigStreamClient::ReleasePrioritizedAssets() {
  std::vector<std::string> released;
  released.swap(prioritized_assets_);
  return released;
}
// Mock implementation: the callback is ignored and success is reported.
absl::Status MockConfigStreamClient::StartListeningToManifestUpdates(
    std::function<absl::Status(const ContentIdProto&)> /*callback*/) {
  return absl::OkStatus();
}
// Mock implementation: the manifest id is ignored and success is reported.
absl::Status MockConfigStreamClient::SendManifestAck(
    ContentIdProto /*manifest_id*/) {
  return absl::OkStatus();
}
// Mock implementation: appends all paths in |assets| to the list of
// prioritized assets, preserving their order, and reports success.
// |assets| is taken by value, so its strings are moved rather than copied.
absl::Status MockConfigStreamClient::ProcessAssets(
    std::vector<std::string> assets) {
  for (std::string& asset : assets) {
    prioritized_assets_.push_back(std::move(asset));
  }
  return absl::OkStatus();
}
// Mock implementation: there is nothing to shut down.
void MockConfigStreamClient::Shutdown() {
// Do nothing.
}
} // namespace cdc_ft

View File

@@ -0,0 +1,43 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef CDC_FUSE_FS_MOCK_CONFIG_STREAM_CLIENT_H_
#define CDC_FUSE_FS_MOCK_CONFIG_STREAM_CLIENT_H_
#include "cdc_fuse_fs/config_stream_client.h"
namespace cdc_ft {
// Mock ConfigStreamClient implementation, used for testing only.
// It keeps a list of asset paths that tests can retrieve through
// ReleasePrioritizedAssets().
class MockConfigStreamClient : public ConfigStreamClient {
public:
// Returns the list of relative file paths to assets that have been
// prioritized via ProcessAssets() and clears the list.
std::vector<std::string> ReleasePrioritizedAssets();
// ConfigStreamClient
absl::Status StartListeningToManifestUpdates(
std::function<absl::Status(const ContentIdProto&)> callback) override;
absl::Status SendManifestAck(ContentIdProto manifest_id) override;
absl::Status ProcessAssets(std::vector<std::string> assets) override;
void Shutdown() override;
private:
// Asset paths collected since the last ReleasePrioritizedAssets() call.
std::vector<std::string> prioritized_assets_;
};
} // namespace cdc_ft
#endif // CDC_FUSE_FS_MOCK_CONFIG_STREAM_CLIENT_H_

View File

@@ -14,7 +14,9 @@
#include "common/file_watcher_win.h"
#define WIN32_LEAN_AND_MEAN
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN 1
#endif
#include <windows.h>
#include <atomic>
@@ -98,7 +100,7 @@ class AsyncFileWatcher {
~AsyncFileWatcher() { Shutdown(); }
absl::Status GetStatus() ABSL_LOCKS_EXCLUDED(status_mutex_) const {
absl::Status GetStatus() const ABSL_LOCKS_EXCLUDED(status_mutex_) {
absl::MutexLock mutex(&status_mutex_);
return status_;
}
@@ -145,24 +147,24 @@ class AsyncFileWatcher {
modified_files_.clear();
}
uint32_t GetEventCount() ABSL_LOCKS_EXCLUDED(modified_files_mutex_) const {
uint32_t GetEventCount() const ABSL_LOCKS_EXCLUDED(modified_files_mutex_) {
absl::MutexLock mutex(&modified_files_mutex_);
return event_count_;
}
uint32_t GetDirRecreateEventCount()
ABSL_LOCKS_EXCLUDED(modified_files_mutex_) const {
uint32_t GetDirRecreateEventCount() const
ABSL_LOCKS_EXCLUDED(modified_files_mutex_) {
absl::MutexLock mutex(&modified_files_mutex_);
return dir_recreate_count_;
}
bool IsWatching() ABSL_LOCKS_EXCLUDED(state_mutex) const {
bool IsWatching() const ABSL_LOCKS_EXCLUDED(state_mutex_) {
absl::MutexLock mutex(&state_mutex_);
return state_ != FileWatcherState::kDefault &&
state_ != FileWatcherState::kShuttingDown;
}
bool IsShuttingDown() ABSL_LOCKS_EXCLUDED(state_mutex) const {
bool IsShuttingDown() const ABSL_LOCKS_EXCLUDED(state_mutex_) {
absl::MutexLock mutex(&state_mutex_);
return state_ == FileWatcherState::kShuttingDown;
}
@@ -186,7 +188,7 @@ class AsyncFileWatcher {
void WatchDirChanges() {
// TODO: Adjust also if there was no directory at the beginning. Currently,
// the directory exists; otherwise, ManifestUpdater would fail.
bool prev_dir_exists = true;
bool first_run = true, prev_run_was_success = false;
while (true) {
ScopedHandle read_event(CreateEvent(nullptr, /* no security attributes */
TRUE, /* manual-reset event */
@@ -200,27 +202,30 @@ class AsyncFileWatcher {
FILE_BASIC_INFO dir_info;
absl::StatusOr<ScopedHandle> status = GetValidDirHandle(&dir_info);
if (!status.ok()) {
SetStatus(status.status());
} else {
SetStatus(status.status());
if (status.ok()) {
// The watched directory exists and its handle is valid.
if (!prev_dir_exists) {
if (!first_run) {
++dir_recreate_count_;
prev_dir_exists = true;
SetStatus(absl::OkStatus());
if (dir_recreated_cb_) dir_recreated_cb_();
}
first_run = false;
prev_run_was_success = true;
// Keep reading directory changes. This function only returns once it
// gets the shutdown signal, the watched directory is removed, or an
// error occurs while reading file changes.
ReadDirChanges(*status, dir_info, read_event);
if (IsShuttingDown()) {
LOG_DEBUG("Shutting down watching '%s'.", dir_path_);
return;
}
LOG_WARNING("Watched directory '%s' was possibly removed.", dir_path_);
++dir_recreate_count_;
ClearModifiedFiles();
} else if (prev_run_was_success) {
prev_run_was_success = false;
++dir_recreate_count_;
if (dir_recreated_cb_) dir_recreated_cb_();
}
prev_dir_exists = false;
// The shutdown event should be caught on both levels: when the
// watched directory was not removed and when it was recreated. Here
// the shutdown event is considered when the watched directory itself was
@@ -530,7 +535,7 @@ class AsyncFileWatcher {
std::thread dir_reader_; // watching thread.
mutable absl::Mutex status_mutex_;
absl::Status status_ = absl::OkStatus() ABSL_GUARDED_BY(status_mutex_);
absl::Status status_ ABSL_GUARDED_BY(status_mutex_);
mutable absl::Mutex modified_files_mutex_;
FileMap modified_files_ ABSL_GUARDED_BY(modified_files_mutex_);
@@ -538,8 +543,8 @@ class AsyncFileWatcher {
uint32_t dir_recreate_count_ ABSL_GUARDED_BY(modified_files_mutex_) = 0;
mutable absl::Mutex state_mutex_;
FileWatcherState state_ = FileWatcherState::kDefault ABSL_GUARDED_BY(
state_mutex_); // the current watcher state.
FileWatcherState state_ ABSL_GUARDED_BY(state_mutex_) =
FileWatcherState::kDefault; // the current watcher state.
// Pointer to ReadDirectoryChangesExW function if available.
decltype(ReadDirectoryChangesExW)* read_directory_changes_ex_ = nullptr;

View File

@@ -115,9 +115,8 @@ class FileWatcherParameterizedTest : public ::testing::TestWithParam<bool> {
bool changed = false;
do {
auto cond = [this]() { return files_changed_; };
files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond),
kWaitTimeout);
changed = files_changed_;
changed = files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond),
kWaitTimeout);
files_changed_ = false;
} while (changed && watcher_.GetEventCountForTesting() < min_event_count);
return changed;
@@ -128,9 +127,8 @@ class FileWatcherParameterizedTest : public ::testing::TestWithParam<bool> {
bool changed = false;
do {
auto cond = [this]() { return dir_recreated_; };
files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond),
kWaitTimeout);
changed = dir_recreated_;
changed = files_changed_mutex_.AwaitWithTimeout(absl::Condition(&cond),
kWaitTimeout);
dir_recreated_ = false;
} while (changed &&
watcher_.GetDirRecreateEventCountForTesting() < min_event_count);
@@ -511,7 +509,10 @@ TEST_P(FileWatcherParameterizedTest, ModifiedTime) {
TEST_P(FileWatcherParameterizedTest, DeleteWatchedDir) {
EXPECT_OK(watcher_.StartWatching([this]() { OnFilesChanged(); },
[this]() { OnDirRecreated(); }));
[this]() { OnDirRecreated(); }, kFWTimeout));
EXPECT_OK(path::WriteFile(first_file_path_, kFirstData, kFirstDataSize));
EXPECT_TRUE(WaitForChange(2u)); // 2x modify
EXPECT_OK(path::RemoveDirRec(watcher_dir_path_));
EXPECT_TRUE(WaitForDirRecreated(1u));
@@ -586,6 +587,8 @@ TEST_P(FileWatcherParameterizedTest, RecreateWatchedDirNoOldChanges) {
EXPECT_OK(path::WriteFile(first_file_path_, kFirstData, kFirstDataSize));
EXPECT_OK(path::RemoveDirRec(watcher_dir_path_));
EXPECT_TRUE(WaitForDirRecreated(1u));
EXPECT_OK(path::CreateDirRec(watcher_dir_path_));
EXPECT_TRUE(WaitForDirRecreated(2u));

View File

@@ -111,8 +111,14 @@ cc_library(
cc_library(
name = "manifest_updater",
srcs = ["manifest_updater.cc"],
hdrs = ["manifest_updater.h"],
srcs = [
"manifest_updater.cc",
"pending_assets_queue.cc",
],
hdrs = [
"manifest_updater.h",
"pending_assets_queue.h",
],
deps = [
":file_chunk_map",
":manifest_builder",

View File

@@ -91,15 +91,19 @@ absl::StatusOr<AssetBuilder> ManifestBuilder::GetOrCreateAsset(
name = parts.back();
parts.pop_back();
}
DirCreateMode create_mode =
force_create ? DirCreateMode::kForceCreate : DirCreateMode::kCreate;
DirCreateMode create_mode = type == AssetProto::UNKNOWN
? DirCreateMode::kNoCreate
: force_create ? DirCreateMode::kForceCreate
: DirCreateMode::kCreate;
AssetProto* dir;
ASSIGN_OR_RETURN(dir, FindOrCreateDirPath(parts, create_mode),
"Failed to create directory '%s'", JoinUnixPath(parts));
if (name.empty()) {
// Special case: return the root directory for a DIRECTORY with empty name.
if (type == AssetProto::DIRECTORY) return AssetBuilder(dir, std::string());
if (type == AssetProto::DIRECTORY || type == AssetProto::UNKNOWN) {
return AssetBuilder(dir, std::string());
}
return absl::InvalidArgumentError("Empty path given");
}
@@ -108,6 +112,10 @@ absl::StatusOr<AssetBuilder> ManifestBuilder::GetOrCreateAsset(
AssetProto* asset = nullptr;
if (result.ok()) {
asset = result.value();
// If the asset type is unknown, we return any type.
if (type == AssetProto::UNKNOWN) {
return AssetBuilder(asset, path::DirName(unix_path));
}
// Verify that both assets are of the same type.
if (asset->type() != type) {
if (force_create) {
@@ -125,11 +133,15 @@ absl::StatusOr<AssetBuilder> ManifestBuilder::GetOrCreateAsset(
}
// Create the asset if it was not found or it was deleted.
if (!asset) {
if (type == AssetProto::UNKNOWN) {
return absl::NotFoundError(
absl::StrFormat("Asset '%s' does not exist.", path));
}
asset = dir->add_dir_assets();
InitNewAsset(name, type, asset);
if (created) *created = true;
}
return AssetBuilder(asset, path::ToUnix(path::DirName(path)));
return AssetBuilder(asset, path::DirName(unix_path));
}
absl::Status ManifestBuilder::DeleteAsset(const std::string& path) {

View File

@@ -76,6 +76,10 @@ class ManifestBuilder {
// asset is removed (recursively for directories) and a new asset with the
// same name is created instead.
//
// When |type| is UNKNOWN, an existing asset of any type is returned. If the
// asset does not exist, no new asset is created, nor are any of the
// directories that lead up to that asset.
//
// When |created| is given, then it will be set to true if that asset was
// actually added, otherwise it will be set to false.
absl::StatusOr<AssetBuilder> GetOrCreateAsset(const std::string& path,

View File

@@ -344,6 +344,52 @@ TEST_F(ManifestBuilderTest, FilesDirsCreatedOnlyOnce) {
VerifyAssets(assets, builder.ManifestId());
}
// Verifies GetOrCreateAsset() for type UNKNOWN: existing assets of any type
// are returned without being created, the root directory can be fetched, and
// missing paths fail without creating anything, regardless of force_create.
TEST_F(ManifestBuilderTest, GetAssetsOfUnkonwnType) {
  ManifestBuilder builder(cdc_params_, &cache_);
  AssetMap assets;
  assets["file1.txt"] = {"a"};
  assets["dir1"] = {};
  ASSERT_OK(AddAssets(assets, &builder));

  // Existing assets are returned as-is, first with force_create == false and
  // then with force_create == true.
  bool created = false;
  for (bool force_create : {false, true}) {
    for (const char* existing_path : {"file1.txt", "dir1"}) {
      EXPECT_OK(builder.GetOrCreateAsset(existing_path, AssetProto::UNKNOWN,
                                         force_create, &created));
      EXPECT_FALSE(created);
    }
  }

  // The empty path refers to the root directory.
  EXPECT_OK(builder.GetOrCreateAsset("", AssetProto::UNKNOWN));

  // Looking up a missing file fails with and without force_create.
  for (bool force_create : {false, true}) {
    EXPECT_NOT_OK(builder.GetOrCreateAsset("does_not_exist",
                                           AssetProto::UNKNOWN, force_create));
  }

  // Looking up a missing file in a missing directory fails as well, and no
  // sub-directories are created on the way.
  EXPECT_NOT_OK(builder.GetOrCreateAsset("new_dir1/does_not_exist",
                                         AssetProto::UNKNOWN, false));
  EXPECT_NOT_OK(builder.GetOrCreateAsset("new_dir2/does_not_exist",
                                         AssetProto::UNKNOWN, true));

  // The manifest must still contain exactly the assets added initially.
  ASSERT_OK(builder.Flush());
  VerifyAssets(assets, builder.ManifestId());
}
TEST_F(ManifestBuilderTest, Deduplication) {
ManifestBuilder builder(cdc_params_, &cache_);
AssetMap assets;

View File

@@ -79,7 +79,7 @@ absl::Status ManifestIterator::Open(const std::string& manifest_file) {
std::string msg =
absl::StrFormat("failed to open file '%s' for reading", manifest_file);
if (errno) {
status_ = ErrnoToCanonicalStatus(errno, msg);
status_ = ErrnoToCanonicalStatus(errno, "%s", msg);
} else {
status_ = absl::UnknownError(msg);
}

View File

@@ -82,12 +82,14 @@ ManifestTestBase::ManifestTestBase(std::string base_dir)
std::vector<ManifestTestBase::AssetInfoForTest>
ManifestTestBase::GetAllManifestAssets(ContentIdProto actual_manifest_id) {
ContentIdProto manifest_id;
EXPECT_OK(data_store_.GetProto(manifest_store_id_, &manifest_id));
EXPECT_EQ(manifest_id, actual_manifest_id);
ContentIdProto expected_manifest_id;
EXPECT_OK(data_store_.GetProto(manifest_store_id_, &expected_manifest_id));
EXPECT_EQ(ContentId::ToHexString(expected_manifest_id),
ContentId::ToHexString(actual_manifest_id))
<< DumpDataStoreProtos();
ManifestIterator manifest_iter(&data_store_);
EXPECT_OK(manifest_iter.Open(manifest_id));
EXPECT_OK(manifest_iter.Open(expected_manifest_id));
std::vector<AssetInfoForTest> assets;
const AssetProto* entry;
@@ -168,10 +170,10 @@ void ManifestTestBase::ExpectAssetInfosEqual(std::vector<AssetInfoForTest> a,
void ManifestTestBase::ExpectManifestEquals(
std::initializer_list<std::string> rel_paths,
const ContentIdProto& actual_manifest_id) {
std::vector<AssetInfoForTest> manifest_ais =
std::vector<AssetInfoForTest> actual_ais =
GetAllManifestAssets(actual_manifest_id);
std::vector<AssetInfoForTest> expected_ais = MakeAssetInfos(rel_paths);
ExpectAssetInfosEqual(manifest_ais, expected_ais);
ExpectAssetInfosEqual(actual_ais, expected_ais);
}
bool ManifestTestBase::InProgress(const ContentIdProto& manifest_id,

View File

@@ -90,7 +90,7 @@ class ManifestTestBase : public ::testing::Test {
// Compares the contents of the manifest to the real files at |rel_paths|.
// The paths are relative to |cfg_.src_dir|.
void ExpectManifestEquals(std::initializer_list<std::string> rel_paths,
const ContentIdProto& actual_manifest_id);
const ContentIdProto& got_manifest_id);
// Returns true if the file at Unix |path| contains file chunks in the
// manifest referenced by |manifest_id|.

View File

@@ -35,6 +35,16 @@
namespace cdc_ft {
namespace {
// A generic finalizer (scope guard) that invokes a given function exactly once
// at the end of its lifetime.
//
// The finalizer is neither copyable nor movable, so the function cannot run
// more than once by accident. An empty |finalize| function is accepted and
// results in a no-op on destruction.
class Finalizer {
 public:
  // Takes ownership of |finalize|, which is called from the destructor.
  explicit Finalizer(std::function<void()> finalize)
      : finalize_(std::move(finalize)) {}

  // Invokes the stored function, if any. Calling an empty std::function would
  // throw std::bad_function_call, which must never escape a destructor.
  ~Finalizer() {
    if (finalize_) finalize_();
  }

  Finalizer(const Finalizer&) = delete;
  Finalizer& operator=(const Finalizer&) = delete;

 private:
  std::function<void()> finalize_;
};
// Returns AssetInfos for all files and dirs in |src_dir| + |rel_path|. Does not
// recurse into sub-directories.
absl::Status GetAllSrcAssets(const std::string& src_dir,
@@ -104,49 +114,49 @@ void AssetInfo::AppendMoveChunks(RepeatedChunkRefProto* list,
// Common fields for tasks that fill in manifest data.
class ManifestTask : public Task {
public:
ManifestTask(std::string src_dir, std::string relative_unix_path,
std::string filename)
: src_dir_(std::move(src_dir)),
rel_unix_path_(std::move(relative_unix_path)),
filename_(std::move(filename)) {}
ManifestTask(std::string src_dir, PendingAsset asset)
: src_dir_(std::move(src_dir)), asset_(std::move(asset)) {}
// Relative unix path of the directory containing the file or directory for
// this task.
const std::string& RelativeUnixPath() const { return rel_unix_path_; }
const std::string& RelativeUnixPath() const { return asset_.relative_path; }
// Relative unix path of the file or directory for this task.
std::string RelativeUnixFilePath() const {
return path::JoinUnix(rel_unix_path_, filename_);
return path::JoinUnix(RelativeUnixPath(), Filename());
}
// Name of the file or directory to process with this task.
const std::string& Filename() const { return filename_; }
const std::string& Filename() const { return asset_.filename; }
// Full path of the file or directory to process with this task.
std::string FilePath() const {
return path::Join(src_dir_, path::ToNative(rel_unix_path_), filename_);
return path::Join(src_dir_, path::ToNative(RelativeUnixPath()),
asset_.filename);
}
// Returns the final status of the task.
// Should not be accessed before the task is finished.
const absl::Status& Status() const { return status_; }
// Returns whether or not this asset is explicitly prioritized.
bool Prioritized() const { return asset_.prioritized; }
// Returns the pending asset's deadline.
absl::Time Deadline() const { return asset_.deadline; }
protected:
const std::string src_dir_;
const std::string rel_unix_path_;
const std::string filename_;
const PendingAsset asset_;
absl::Status status_;
};
// ThreadPool task that runs the CDC chunker on a given file.
class FileChunkerTask : public ManifestTask {
public:
FileChunkerTask(std::string src_dir, std::string relative_path,
std::string filename, const fastcdc::Config* cfg,
Buffer buffer)
: ManifestTask(std::move(src_dir), std::move(relative_path),
std::move(filename)),
FileChunkerTask(std::string src_dir, PendingAsset asset,
const fastcdc::Config* cfg, Buffer buffer)
: ManifestTask(std::move(src_dir), std::move(asset)),
cfg_(cfg),
buffer_(std::move(buffer)) {
assert(cfg_->max_size > 0);
@@ -223,11 +233,9 @@ class FileChunkerTask : public ManifestTask {
// ThreadPool task that creates assets for the contents of a directory.
class DirScannerTask : public ManifestTask {
public:
DirScannerTask(std::string src_dir, std::string relative_path,
std::string filename, AssetBuilder dir,
DirScannerTask(std::string src_dir, PendingAsset asset, AssetBuilder dir,
DataStoreReader* data_store)
: ManifestTask(std::move(src_dir), std::move(relative_path),
std::move(filename)),
: ManifestTask(std::move(src_dir), std::move(asset)),
dir_(dir),
data_store_(data_store) {}
@@ -419,15 +427,16 @@ absl::Status ManifestUpdater::IsValidDir(std::string dir) {
}
ManifestUpdater::ManifestUpdater(DataStoreWriter* data_store, UpdaterConfig cfg)
: data_store_(data_store), cfg_(std::move(cfg)) {
: data_store_(data_store),
cfg_(std::move(cfg)),
queue_(kMinAssetProcessingTime) {
path::EnsureEndsWithPathSeparator(&cfg_.src_dir);
}
ManifestUpdater::~ManifestUpdater() = default;
absl::Status ManifestUpdater::UpdateAll(
FileChunkMap* file_chunks,
PushIntermediateManifest push_intermediate_manifest) {
absl::Status ManifestUpdater::UpdateAll(FileChunkMap* file_chunks,
PushManifestHandler push_handler) {
RETURN_IF_ERROR(ManifestUpdater::IsValidDir(cfg_.src_dir));
// Don't use the Windows localized time from path::GetStats.
@@ -441,9 +450,8 @@ absl::Status ManifestUpdater::UpdateAll(
std::vector<Operation> operations{{Operator::kAdd, std::move(ri)}};
absl::Status status =
Update(&operations, file_chunks, push_intermediate_manifest,
/*recursive=*/true);
absl::Status status = Update(&operations, file_chunks, push_handler,
/*recursive=*/true);
if (status.ok() || !absl::IsUnavailable(status)) return status;
@@ -456,7 +464,7 @@ absl::Status ManifestUpdater::UpdateAll(
RETURN_IF_ERROR(data_store_->Wipe());
file_chunks->Clear();
RETURN_IF_ERROR(Update(&operations, file_chunks, push_intermediate_manifest,
RETURN_IF_ERROR(Update(&operations, file_chunks, push_handler,
/*recursive=*/true),
"Failed to build manifest from scratch");
@@ -495,29 +503,97 @@ ContentIdProto ManifestUpdater::DefaultManifestId() {
return manifest_id_;
}
size_t ManifestUpdater::QueueTasks(Threadpool* pool,
const fastcdc::Config* cdc_cfg,
ManifestBuilder* manifest_builder) {
const size_t max_tasks_queued = MaxQueuedTasks(*pool);
size_t num_tasks_queued = 0;
while (pool->NumQueuedTasks() < max_tasks_queued && !queue_.empty() &&
!buffers_.empty()) {
PendingAsset asset = std::move(queue_.front());
absl::StatusOr<AssetBuilder> dir;
queue_.pop_front();
// Flushes the queued |file_chunks| updates and the in-progress manifest, then
// stores the resulting manifest ID in |manifest_id_|.
//
// All content IDs written by the flush are added to |manifest_content_ids|.
// If |push_manifest_handler| is set, it is invoked with the new manifest ID.
// The time of the flush is recorded in |last_manifest_flush_|.
//
// Returns an error if the manifest builder fails to flush.
absl::Status ManifestUpdater::FlushAndPushManifest(
    FileChunkMap* file_chunks,
    std::unordered_set<ContentIdProto>* manifest_content_ids,
    PushManifestHandler push_manifest_handler) {
  file_chunks->FlushUpdates();
  // NOTE(review): Update() also uses this function for the final flush, so
  // the "intermediate" wording below is reported for that flush as well.
  ASSIGN_OR_RETURN(manifest_id_, manifest_builder_->Flush(),
                   "Failed to flush intermediate manifest");

  // Record every content ID that was just written back.
  const auto& flushed_ids = manifest_builder_->FlushedContentIds();
  manifest_content_ids->insert(flushed_ids.begin(), flushed_ids.end());

  if (push_manifest_handler) {
    push_manifest_handler(manifest_id_);
  }
  last_manifest_flush_ = absl::Now();
  return absl::OkStatus();
}
// Returns true if an intermediate manifest should be flushed now, i.e. if all
// of the following hold:
// - |push_manifest_handler| is set,
// - |flush_deadline_| has passed,
// - at least |kMinDelayBetweenFlush| has elapsed since the last flush.
bool ManifestUpdater::WantManifestFlushed(
    PushManifestHandler push_manifest_handler) const {
  // Without a handler there is nobody to push the manifest to.
  if (!push_manifest_handler) return false;
  // The deadline has not passed yet.
  if (!(flush_deadline_ < absl::Now())) return false;
  // Rate-limit flushes.
  return last_manifest_flush_ + kMinDelayBetweenFlush < absl::Now();
}
// Flushes and pushes the manifest if no DirScannerTasks are queued
// (|dir_scanner_tasks_queued| is zero) and WantManifestFlushed() reports that
// a flush is due. Otherwise does nothing and returns OK.
//
// When a flush is triggered, |flush_deadline_| is reset to the infinite future
// before the flush is executed. Returns the status of the flush.
absl::Status ManifestUpdater::MaybeFlushAndPushManifest(
    size_t dir_scanner_tasks_queued, FileChunkMap* file_chunks,
    std::unordered_set<ContentIdProto>* manifest_content_ids,
    PushManifestHandler push_manifest) {
  // Flush only if there are no DirScannerTasks active and a flush is wanted.
  const bool skip_flush =
      dir_scanner_tasks_queued != 0 || !WantManifestFlushed(push_manifest);
  if (skip_flush) return absl::OkStatus();

  flush_deadline_ = absl::InfiniteFuture();
  return FlushAndPushManifest(file_chunks, manifest_content_ids,
                              push_manifest);
}
void ManifestUpdater::AddPriorityAssets(std::vector<std::string> rel_paths) {
absl::MutexLock lock(&priority_mutex_);
absl::Time now = absl::Now();
for (std::string& rel_path : rel_paths) {
priority_assets_.push_back(PriorityAsset{std::move(rel_path), now});
}
}
void ManifestUpdater::PrioritizeQueuedAssets() {
std::vector<PriorityAsset> prio_assets;
{
absl::MutexLock lock(&priority_mutex_);
if (priority_assets_.empty()) return;
std::swap(prio_assets, priority_assets_);
}
absl::Time deadline = queue_.Prioritize(prio_assets, manifest_builder_.get());
if (deadline < flush_deadline_) flush_deadline_ = deadline;
}
ManifestUpdater::QueueTasksResult ManifestUpdater::QueueTasks(
bool drain_dir_scanner_tasks, Threadpool* pool,
const fastcdc::Config* cdc_cfg) {
// Prioritize requested assets before queuing new tasks.
PrioritizeQueuedAssets();
const size_t max_tasks_queued = MaxQueuedTasks(*pool);
size_t file_chunker_tasks = 0, dir_scanner_tasks = 0;
// Skip DIRECTORY assets if we should drain DirScannerTasks.
PendingAssetsQueue::AcceptFunc accept = nullptr;
if (drain_dir_scanner_tasks) {
accept = [](const PendingAsset& p) {
return p.type != AssetProto::DIRECTORY;
};
}
absl::StatusOr<AssetBuilder> dir;
PendingAsset asset;
while (pool->NumQueuedTasks() < max_tasks_queued && !buffers_.empty() &&
queue_.Dequeue(&asset, accept)) {
switch (asset.type) {
case AssetProto::FILE:
pool->QueueTask(std::make_unique<FileChunkerTask>(
cfg_.src_dir, std::move(asset.relative_path),
std::move(asset.filename), cdc_cfg, std::move(buffers_.back())));
cfg_.src_dir, std::move(asset), cdc_cfg,
std::move(buffers_.back())));
buffers_.pop_back();
++file_chunker_tasks;
break;
case AssetProto::DIRECTORY:
dir = manifest_builder->GetOrCreateAsset(
// Flushing the manifest may invalidate the pointers to the directory
// proto returned from GetOrCreateAsset(), so the manifest cannot be
// flushed as long as DirScannerTasks are in the queue.
dir = manifest_builder_->GetOrCreateAsset(
path::JoinUnix(asset.relative_path, asset.filename),
AssetProto::DIRECTORY, true);
AssetProto::DIRECTORY, /*force_create=*/true);
if (!dir.ok()) {
LOG_ERROR(
"Failed to locate directory '%s' in the manifest, skipping it: "
@@ -526,8 +602,9 @@ size_t ManifestUpdater::QueueTasks(Threadpool* pool,
continue;
}
pool->QueueTask(std::make_unique<DirScannerTask>(
cfg_.src_dir, std::move(asset.relative_path),
std::move(asset.filename), std::move(dir.value()), data_store_));
cfg_.src_dir, std::move(asset), std::move(dir.value()),
data_store_));
++dir_scanner_tasks;
break;
default:
@@ -535,15 +612,13 @@ size_t ManifestUpdater::QueueTasks(Threadpool* pool,
AssetProto::Type_Name(asset.type), asset.relative_path);
continue;
}
++num_tasks_queued;
}
return num_tasks_queued;
return QueueTasksResult{dir_scanner_tasks, file_chunker_tasks};
}
absl::Status ManifestUpdater::ApplyOperations(
std::vector<Operation>* operations, FileChunkMap* file_chunks,
ManifestBuilder* manifest_builder, AssetBuilder* parent, bool recursive) {
assert(manifest_builder != nullptr);
AssetBuilder* parent, absl::Time deadline, bool recursive) {
if (operations->empty()) return absl::OkStatus();
// First, handle all deletions to make the outcome independent of the order of
@@ -561,7 +636,7 @@ absl::Status ManifestUpdater::ApplyOperations(
// skipped.
continue;
}
RETURN_IF_ERROR(manifest_builder->DeleteAsset(ai.path),
RETURN_IF_ERROR(manifest_builder_->DeleteAsset(ai.path),
"Failed to delete asset '%s' from manifest", ai.path);
last_deleted = &ai.path;
}
@@ -591,8 +666,8 @@ absl::Status ManifestUpdater::ApplyOperations(
case Operator::kUpdate:
ASSIGN_OR_RETURN(asset_builder,
manifest_builder->GetOrCreateAsset(ai.path, ai.type,
true, &created),
manifest_builder_->GetOrCreateAsset(ai.path, ai.type,
true, &created),
"Failed to add '%s' to the manifest", ai.path);
break;
}
@@ -609,29 +684,30 @@ absl::Status ManifestUpdater::ApplyOperations(
asset_builder.SetFileSize(ai.size);
// Queue chunker tasks for files.
asset_builder.SetInProgress(true);
} else if (recursive && ai.type == AssetProto::DIRECTORY) {
// We are recursing into all sub-directories, so we add queue up the
// child directory for scanning.
asset_builder.SetInProgress(true);
} else if (ai.type == AssetProto::DIRECTORY) {
asset_builder.SetPermissions(ManifestBuilder::kDefaultDirPerms);
// We are recursing into all sub-directories, so we queue up the child
// directory for scanning.
if (recursive) asset_builder.SetInProgress(true);
}
// If the asset is marked as in-progress, we need to queue it up.
if (asset_builder.InProgress()) {
queue_.emplace_back(ai.type, asset_builder.RelativePath(),
asset_builder.Name());
PendingAsset pending(ai.type, asset_builder.RelativePath(),
asset_builder.Name(), deadline);
queue_.Add(std::move(pending));
}
}
return absl::OkStatus();
}
absl::Status ManifestUpdater::HandleFileChunkerResult(
FileChunkerTask* task, FileChunkMap* file_chunks,
ManifestBuilder* manifest_builder) {
FileChunkerTask* task, FileChunkMap* file_chunks) {
const std::string rel_file_path = task->RelativeUnixFilePath();
buffers_.emplace_back(task->ReleaseBuffer());
AssetBuilder asset_builder;
ASSIGN_OR_RETURN(asset_builder, manifest_builder->GetOrCreateAsset(
ASSIGN_OR_RETURN(asset_builder, manifest_builder_->GetOrCreateAsset(
rel_file_path, AssetProto::FILE));
asset_builder.SetInProgress(false);
if (!task->Status().ok()) {
@@ -663,7 +739,6 @@ absl::Status ManifestUpdater::HandleFileChunkerResult(
absl::Status ManifestUpdater::HandleDirScannerResult(
DirScannerTask* task, FileChunkMap* file_chunks,
ManifestBuilder* manifest_builder,
std::unordered_set<ContentIdProto>* manifest_content_ids) {
// Include the error in the stats, but we can still try to process the
// (partial) results.
@@ -671,21 +746,26 @@ absl::Status ManifestUpdater::HandleDirScannerResult(
++stats_.total_dirs_failed;
}
// If there's a chance we can do more work within the parent's deadline, we
// propagate the deadline to the children.
// TODO(chrschn) Use SteadyClock instead of the system clock.
absl::Time deadline = task->Deadline() > absl::Now() ? task->Deadline()
: absl::InfiniteFuture();
// DirScannerTasks are inherently recursive.
RETURN_IF_ERROR(ApplyOperations(task->Operations(), file_chunks,
manifest_builder, task->Dir(),
/*recursive=*/true));
RETURN_IF_ERROR(ApplyOperations(task->Operations(), file_chunks, task->Dir(),
deadline, /*recursive=*/true));
task->Dir()->SetInProgress(false);
// Union all manifest chunk content IDs.
assert(manifest_content_ids != nullptr);
manifest_content_ids->insert(task->ManifestContentIds()->begin(),
task->ManifestContentIds()->end());
return task->Status();
}
absl::Status ManifestUpdater::Update(
OperationList* operations, FileChunkMap* file_chunks,
PushIntermediateManifest push_intermediate_manifest, bool recursive) {
absl::Status ManifestUpdater::Update(OperationList* operations,
FileChunkMap* file_chunks,
PushManifestHandler push_handler,
bool recursive) {
Stopwatch sw;
LOG_INFO(
"Updating manifest for '%s': applying %u changes, "
@@ -694,11 +774,20 @@ absl::Status ManifestUpdater::Update(
stats_ = UpdaterStats();
// Collects the content IDs that make up the manifest when recursing. They are
// used to prune the manifest cache directory at the end of the Update()
// process.
std::unordered_set<ContentIdProto> manifest_content_ids;
CdcParamsProto cdc_params;
cdc_params.set_min_chunk_size(cfg_.min_chunk_size);
cdc_params.set_avg_chunk_size(cfg_.avg_chunk_size);
cdc_params.set_max_chunk_size(cfg_.max_chunk_size);
ManifestBuilder manifest_builder(cdc_params, data_store_);
manifest_builder_ =
std::make_unique<ManifestBuilder>(cdc_params, data_store_);
// Release the ManifestBuilder at the end of this function to free memory.
Finalizer finalizer([b = &manifest_builder_]() { b->reset(); });
// Load the manifest id from the store.
ContentIdProto manifest_id;
@@ -711,17 +800,17 @@ absl::Status ManifestUpdater::Update(
// A non-existing manifest is not an issue, just build it from scratch.
LOG_INFO("No cached manifest found. Building from scratch.");
} else {
RETURN_IF_ERROR(manifest_builder.LoadManifest(manifest_id),
RETURN_IF_ERROR(manifest_builder_->LoadManifest(manifest_id),
"Failed to load manifest with id '%s'",
ContentId::ToHexString(manifest_id));
// The CDC params might have changed when loading the manifest.
if (ValidateCdcParams(manifest_builder.Manifest()->cdc_params())) {
cdc_params = manifest_builder.Manifest()->cdc_params();
if (ValidateCdcParams(manifest_builder_->Manifest()->cdc_params())) {
cdc_params = manifest_builder_->Manifest()->cdc_params();
}
}
RETURN_IF_ERROR(ApplyOperations(operations, file_chunks, &manifest_builder,
nullptr, recursive));
RETURN_IF_ERROR(ApplyOperations(operations, file_chunks, nullptr,
absl::InfiniteFuture(), recursive));
Threadpool pool(cfg_.num_threads > 0 ? cfg_.num_threads
: std::thread::hardware_concurrency());
@@ -730,36 +819,37 @@ absl::Status ManifestUpdater::Update(
buffers_.reserve(max_queued_tasks);
while (buffers_.size() < max_queued_tasks)
buffers_.emplace_back(cfg_.max_chunk_size << 1);
size_t num_tasks_queued = 0;
size_t total_tasks_queued = 0, scanner_tasks_queued = 0;
// Collect the content IDs that make up the manifest when recursing. They are
// used to prune the manifest cache directory in the end.
std::unordered_set<ContentIdProto> manifest_content_ids;
// Push intermediate manifest if there are queued chunker tasks.
if (push_intermediate_manifest && !queue_.empty()) {
file_chunks->FlushUpdates();
ASSIGN_OR_RETURN(manifest_id_, manifest_builder.Flush(),
"Failed to flush intermediate manifest");
// Add all content IDs that were just written back.
manifest_content_ids.insert(manifest_builder.FlushedContentIds().begin(),
manifest_builder.FlushedContentIds().end());
push_intermediate_manifest(manifest_id_);
// Push intermediate manifest if there are queued tasks.
if (push_handler && !queue_.Empty()) {
RETURN_IF_ERROR(
FlushAndPushManifest(file_chunks, &manifest_content_ids, push_handler));
}
fastcdc::Config cdc_cfg = CdcConfigFromProto(cdc_params);
// Wait for the chunker tasks and update file assets.
while (!queue_.empty() || num_tasks_queued > 0) {
num_tasks_queued += QueueTasks(&pool, &cdc_cfg, &manifest_builder);
// Wait for the chunker and scanner tasks.
while (!queue_.Empty() || total_tasks_queued > 0) {
RETURN_IF_ERROR(MaybeFlushAndPushManifest(scanner_tasks_queued, file_chunks,
&manifest_content_ids,
push_handler));
// Flushing the manifest may invalidate the AssetProto pointers held by the
// queued DirScannerTasks. If the manifest should be flushed, we drain the
// queue from those tasks so that the push is safe.
bool drain_dir_scanners = WantManifestFlushed(push_handler);
QueueTasksResult queued = QueueTasks(drain_dir_scanners, &pool, &cdc_cfg);
total_tasks_queued += queued.dir_scanners + queued.file_chunkers;
scanner_tasks_queued += queued.dir_scanners;
std::unique_ptr<Task> task = pool.GetCompletedTask();
assert(num_tasks_queued > 0);
--num_tasks_queued;
assert(total_tasks_queued > 0);
--total_tasks_queued;
FileChunkerTask* chunker_task = dynamic_cast<FileChunkerTask*>(task.get());
if (chunker_task) {
status =
HandleFileChunkerResult(chunker_task, file_chunks, &manifest_builder);
status = HandleFileChunkerResult(chunker_task, file_chunks);
if (!status.ok()) {
LOG_ERROR("Failed to process file '%s': %s", chunker_task->FilePath(),
@@ -770,8 +860,10 @@ absl::Status ManifestUpdater::Update(
DirScannerTask* scanner_task = dynamic_cast<DirScannerTask*>(task.get());
if (scanner_task) {
assert(scanner_tasks_queued > 0);
--scanner_tasks_queued;
status = HandleDirScannerResult(scanner_task, file_chunks,
&manifest_builder, &manifest_content_ids);
&manifest_content_ids);
if (!status.ok()) {
LOG_ERROR("Failed to process directory '%s': %s",
scanner_task->FilePath(), status.ToString());
@@ -780,25 +872,23 @@ absl::Status ManifestUpdater::Update(
}
}
file_chunks->FlushUpdates();
ASSIGN_OR_RETURN(manifest_id_, manifest_builder.Flush(),
"Failed to flush manifest");
// Don't pass in the push_handler here. We first want to write back the new
// manifest ID to the data store before we call the handler.
RETURN_IF_ERROR(
FlushAndPushManifest(file_chunks, &manifest_content_ids, nullptr));
// Save the manifest id to the store.
std::string id_str = manifest_id_.SerializeAsString();
RETURN_IF_ERROR(
data_store_->Put(GetManifestStoreId(), id_str.data(), id_str.size()),
"Failed to store manifest id");
if (push_handler) push_handler(manifest_id_);
// Remove manifest chunks that are no longer referenced when recursing through
// all sub-directories. This also makes sure that all referenced manifest
// chunks are present.
if (status.ok() && recursive) {
// Retain the chunk that stores the manifest ID.
manifest_content_ids.insert(ManifestUpdater::GetManifestStoreId());
// Add all content IDs that were just written back.
manifest_content_ids.insert(manifest_builder.FlushedContentIds().begin(),
manifest_builder.FlushedContentIds().end());
manifest_content_ids.insert(GetManifestStoreId());
status = data_store_->Prune(std::move(manifest_content_ids));
if (!status.ok()) {
// Signal to the caller that the manifest needs to be rebuilt from

View File

@@ -26,6 +26,7 @@
#include "manifest/asset_builder.h"
#include "manifest/file_chunk_map.h"
#include "manifest/manifest_proto_defs.h"
#include "manifest/pending_assets_queue.h"
namespace cdc_ft {
namespace fastcdc {
@@ -140,7 +141,7 @@ class ManifestUpdater {
// Returns an error if |dir| does not exist or it is not a directory.
static absl::Status IsValidDir(std::string dir);
using PushIntermediateManifest =
using PushManifestHandler =
std::function<void(const ContentIdProto& manifest_id)>;
// |data_store| is used to store manifest chunks. File data chunks are not
@@ -156,27 +157,29 @@ class ManifestUpdater {
// Reads the full source directory and syncs the manifest to it. Prunes old,
// unreferenced manifest chunks. Updates and flushes |file_chunks|.
//
// If a valid |push_intermediate_manifest| is passed, then a manifest is
// flushed after the root directory has been added, but before all files and
// If a valid |push_handler| is passed, then a manifest is flushed at least
// twice and the handler is called:
// - after the root directory has been added, but before all files and
// directories have been processed. That means, the manifest does not yet
// contain all assets, and all incomplete assets are set to in-progress.
// - after an asset that was prioritized with AddPriorityAssets() has been
// completed.
// - at the end of the update process in case of success
absl::Status UpdateAll(FileChunkMap* file_chunks,
PushIntermediateManifest push_intermediate_manifest =
PushIntermediateManifest());
PushManifestHandler push_handler = nullptr);
// Updates the manifest by applying the |operations| list. Deletions are
// handled first to make the outcome independent of the order in the list.
// Also updates and flushes |file_chunks| with the changes made. See
// UpdateAll() for a description of |push_intermediate_manifest|.
// Also updates and flushes |file_chunks| with the changes made. The
// |push_handler| is called at least twice during the operation and at the
// end, see UpdateAll() for more details.
//
// All paths should be Unix paths. If |recursive| is true, then a directory
// scanner task is enqueued for each directory that is added to the manifest.
// This is only needed during UpdateAll(). When the manifest is updated in
// response to file watcher changes, then |recursive| should be set to false.
absl::Status Update(OperationList* operations, FileChunkMap* file_chunks,
PushIntermediateManifest push_intermediate_manifest =
PushIntermediateManifest(),
bool recursive = false);
PushManifestHandler push_handler, bool recursive = false);
// Content id of the current manifest.
const ContentIdProto& ManifestId() const { return manifest_id_; }
@@ -190,62 +193,84 @@ class ManifestUpdater {
// Returns an empty manifest.
ContentIdProto DefaultManifestId();
private:
// Adds enough pending assets from |queue_| as tasks to the |pool| to keep all
// worker threads busy. Returns the number of tasks that were added.
size_t QueueTasks(Threadpool* pool, const fastcdc::Config* cdc_cfg,
ManifestBuilder* manifest_builder);
// Appends the given |rel_paths| to the list of assets to prioritize. All
// paths must be given as Unix paths.
void AddPriorityAssets(std::vector<std::string> rel_paths)
ABSL_LOCKS_EXCLUDED(priority_mutex_);
// Applies the |operations| list to the manifest owned by the
// |manifest_builder|. First, all deletions are handled and the corresponding
// files are removed from the |file_chunks| map, then all added or updated
// assets are processed. This guarantees that the outcome is independent of
// the order in the list.
private:
// Holds the number of queued tasks returned by QueueTasks().
struct QueueTasksResult {
size_t dir_scanners = 0, file_chunkers = 0;
};
// Adds enough pending assets from |queue_| as tasks to the |pool| to keep all
// worker threads busy. If |drain_dir_scanner_tasks| is true, only
// FileChunkerTasks are queued, others are skipped. Returns the number of
// tasks that were queued as a QueueTasksResult.
QueueTasksResult QueueTasks(bool drain_dir_scanner_tasks, Threadpool* pool,
const fastcdc::Config* cdc_cfg);
// Modifies the list of queued tasks to prioritize those assets that were
// previously selected using the AddPriorityAssets() method.
void PrioritizeQueuedAssets() ABSL_LOCKS_EXCLUDED(priority_mutex_);
// Returns true if all of the following conditions are satisfied:
// - |push_manifest_handler| is valid
// - the flush deadline that was set by a prioritized asset is due
// - the manifest was not flushed recently
bool WantManifestFlushed(PushManifestHandler push_manifest_handler) const;
// Checks if it is safe and desired to flush the manifest, then calls
// FlushAndPushManifest() if that is the case.
// |dir_scanner_tasks_queued| must be the number of currently queued
// DirScannerTasks.
// |file_chunks| is updated by the flush operation, if it is executed.
// |push_manifest_handler| is invoked if the manifest gets flushed and pushed.
absl::Status MaybeFlushAndPushManifest(
size_t dir_scanner_tasks_queued, FileChunkMap* file_chunks,
std::unordered_set<ContentIdProto>* manifest_content_ids,
PushManifestHandler push_manifest_handler);
// Flushes the in-progress manifest and the updates queued in |file_chunks|.
// If |push_manifest_handler| is not nullptr, it is invoked with the resulting
// manifest ID.
absl::Status FlushAndPushManifest(
FileChunkMap* file_chunks,
std::unordered_set<ContentIdProto>* manifest_content_ids,
PushManifestHandler push_manifest_handler);
// Applies the |operations| list to the manifest owned by the manifest
// builder. First, all deletions are handled and the corresponding files are
// removed from the |file_chunks| map, then all added or updated assets are
// processed. This guarantees that the outcome is independent of the order in
// the list.
//
// If |parent| is non-null, then it must be of type DIRECTORY and all added
// assets are made direct children of |parent|. The function does *not* verify
// that all children have |parent| as directory path.
// that all children have |parent| as directory path. This is used to
// efficiently handle the result of a DirScannerTask.
//
// Enqueues tasks to chunk the given files for files that were added or
// updated. If |recursive| is true, then it will also enqueue directory
// scanner tasks for all given directories.
// scanner tasks for all given directories. All follow-up tasks have the given
// |deadline| set, which determines the deadline after which the manifest
// should be flushed.
absl::Status ApplyOperations(std::vector<Operation>* operations,
FileChunkMap* file_chunks,
ManifestBuilder* manifest_builder,
AssetBuilder* parent, bool recursive);
FileChunkMap* file_chunks, AssetBuilder* parent,
absl::Time deadline, bool recursive);
// Handles the results of a completed FileChunkerTask.
absl::Status HandleFileChunkerResult(FileChunkerTask* task,
FileChunkMap* file_chunks,
ManifestBuilder* manifest_builder);
FileChunkMap* file_chunks);
// Handles the results of a completed DirScannerTask.
absl::Status HandleDirScannerResult(
DirScannerTask* task, FileChunkMap* file_chunks,
ManifestBuilder* manifest_builder,
std::unordered_set<ContentIdProto>* manifest_content_ids);
// Represents an asset that has not been fully processed yet.
struct PendingAsset {
PendingAsset() {}
PendingAsset(AssetProto::Type type, std::string relative_path,
std::string filename)
: type(type),
relative_path(std::move(relative_path)),
filename(std::move(filename)) {}
// The asset type (either FILE or DIRECTORY).
AssetProto::Type type = AssetProto::UNKNOWN;
// Relative unix path of the directory containing this asset.
std::string relative_path;
// File name of the asset that still needs processing.
std::string filename;
};
// Queue of pending assets waiting for completion.
std::list<PendingAsset> queue_;
PendingAssetsQueue queue_;
// Pool of pre-allocated buffers
std::vector<Buffer> buffers_;
@@ -261,6 +286,29 @@ class ManifestUpdater {
// Stats for the last Update*() operation.
UpdaterStats stats_;
// The builder used for updating the manifest.
std::unique_ptr<ManifestBuilder> manifest_builder_;
// Holds the assets that should be prioritized while updating the manifest.
std::vector<PriorityAsset> priority_assets_ ABSL_GUARDED_BY(priority_mutex_);
absl::Mutex priority_mutex_;
// Deadline by which the manifest should be flushed again.
absl::Time flush_deadline_ = absl::InfiniteFuture();
// The time when the manifest was flushed last.
absl::Time last_manifest_flush_;
// How much time we allow at least for processing a prioritized asset. The
// manifest won't be flushed for that time, to allow more assets to be
// finalized before the manifest is sent to the client.
static constexpr absl::Duration kMinAssetProcessingTime =
absl::Milliseconds(200);
// How often we allow an intermediate manifest to be flushed and pushed.
static constexpr absl::Duration kMinDelayBetweenFlush =
absl::Milliseconds(500);
};
}; // namespace cdc_ft

View File

@@ -107,7 +107,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_AddFileIncremental) {
EXPECT_OK(updater.UpdateAll(&file_chunks_));
EXPECT_OK(updater.Update(
MakeDeleteOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}),
&file_chunks_));
&file_chunks_, nullptr));
ASSERT_NO_FATAL_FAILURE(
ExpectManifestEquals({"a.txt", "subdir"}, updater.ManifestId()));
@@ -173,13 +173,13 @@ TEST_F(ManifestUpdaterTest, UpdateAll_PrunesUnreferencedChunks) {
cfg_.src_dir = path::Join(base_dir_, "non_empty");
ManifestUpdater updater(&data_store_, cfg_);
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr));
// 1 for manifest id, 1 for manifest, 1 indirect assets.
EXPECT_EQ(data_store_.Chunks().size(), 3);
EXPECT_OK(updater.Update(
MakeUpdateOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}),
&file_chunks_));
&file_chunks_, nullptr));
// 1 for manifest id, 1 for manifest, 5 indirect assets.
// 2 additional chunks from the first Update() that are now unreferenced.
// -1, because the indirect asset for "a.txt" is deduplicated
@@ -207,7 +207,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_RecoversFromMissingChunks) {
cfg_.src_dir = path::Join(base_dir_, "non_empty");
ManifestUpdater updater(&data_store_, cfg_);
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr));
// 1 for manifest id, 1 for manifest, 1 indirect assets.
EXPECT_EQ(data_store_.Chunks().size(), 3)
<< "Manifest: " << ContentId::ToHexString(updater.ManifestId())
@@ -225,7 +225,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_RecoversFromMissingChunks) {
EXPECT_OK(updater.UpdateAll(&file_chunks_));
// 1 for manifest id, 1 for manifest, 5 indirect assets.
// There would be 7 chunks without the removal above, see UpdateAll_Prune.
// There would be 8 chunks without the removal above, see UpdateAll_Prune.
EXPECT_EQ(data_store_.Chunks().size(), 7)
<< "Manifest: " << ContentId::ToHexString(updater.ManifestId())
<< std::endl
@@ -272,15 +272,17 @@ TEST_F(ManifestUpdaterTest, UpdateAll_FileChunkMapAfterUpdate) {
// Verifies that the intermediate manifest contains the expected files.
TEST_F(ManifestUpdaterTest, UpdateAll_PushIntermediateManifest) {
ContentIdProto intermediate_id;
auto push_intermediate_manifest =
[&intermediate_id](const ContentIdProto& manifest_id) {
intermediate_id = manifest_id;
};
auto push_manifest = [&intermediate_id](const ContentIdProto& manifest_id) {
// Catch the first (= intermediate) manifest.
if (intermediate_id == ContentIdProto()) {
intermediate_id = manifest_id;
}
};
// Contains a.txt and subdir/b.txt.
cfg_.src_dir = path::Join(base_dir_, "non_empty");
ManifestUpdater updater(&data_store_, cfg_);
EXPECT_OK(updater.UpdateAll(&file_chunks_, push_intermediate_manifest));
EXPECT_OK(updater.UpdateAll(&file_chunks_, push_manifest));
// Double check that the files in the final manifest are no longer in
// progress.
@@ -301,7 +303,7 @@ TEST_F(ManifestUpdaterTest, UpdateAll_PushIntermediateManifest) {
TEST_F(ManifestUpdaterTest, Update_AddFile) {
cfg_.src_dir = path::Join(base_dir_, "non_empty");
ManifestUpdater updater(&data_store_, cfg_);
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr));
const UpdaterStats& stats = updater.Stats();
EXPECT_EQ(stats.total_assets_added_or_updated, 1);
@@ -319,7 +321,8 @@ TEST_F(ManifestUpdaterTest, Update_AddFile) {
TEST_F(ManifestUpdaterTest, Update_AddFileAutoCreateSubdir) {
cfg_.src_dir = path::Join(base_dir_, "non_empty");
ManifestUpdater updater(&data_store_, cfg_);
EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_));
EXPECT_OK(
updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_, nullptr));
const UpdaterStats& stats = updater.Stats();
EXPECT_EQ(stats.total_assets_added_or_updated, 1);
@@ -346,7 +349,7 @@ TEST_F(ManifestUpdaterTest, Update_DeleteFiles) {
cfg_.src_dir = path::Join(base_dir_, "non_empty");
ManifestUpdater updater(&data_store_, cfg_);
EXPECT_OK(updater.UpdateAll(&file_chunks_));
EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_, nullptr));
const UpdaterStats& stats = updater.Stats();
EXPECT_EQ(stats.total_assets_added_or_updated, 0);
@@ -360,7 +363,8 @@ TEST_F(ManifestUpdaterTest, Update_DeleteFiles) {
updater.ManifestId()));
// Delete another one in a subdirectory.
EXPECT_OK(updater.Update(MakeDeleteOps({"subdir/b.txt"}), &file_chunks_));
EXPECT_OK(
updater.Update(MakeDeleteOps({"subdir/b.txt"}), &file_chunks_, nullptr));
ASSERT_NO_FATAL_FAILURE(ExpectManifestEquals(
{"subdir", "subdir/c.txt", "subdir/d.txt"}, updater.ManifestId()));
}
@@ -370,7 +374,7 @@ TEST_F(ManifestUpdaterTest, Update_DeleteDir) {
cfg_.src_dir = path::Join(base_dir_, "non_empty");
ManifestUpdater updater(&data_store_, cfg_);
EXPECT_OK(updater.UpdateAll(&file_chunks_));
EXPECT_OK(updater.Update(MakeDeleteOps({"subdir"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeDeleteOps({"subdir"}), &file_chunks_, nullptr));
const UpdaterStats& stats = updater.Stats();
EXPECT_EQ(stats.total_assets_added_or_updated, 0);
@@ -390,7 +394,7 @@ TEST_F(ManifestUpdaterTest, Update_DeleteNonExistingAsset) {
// We need to craft AssetInfos for non-existing assets manually.
AssetInfo ai{"non_existing", AssetProto::DIRECTORY};
ManifestUpdater::OperationList ops{{Operator::kDelete, ai}};
EXPECT_OK(updater.Update(&ops, &file_chunks_));
EXPECT_OK(updater.Update(&ops, &file_chunks_, nullptr));
const UpdaterStats& stats = updater.Stats();
EXPECT_EQ(stats.total_assets_deleted, 1);
@@ -406,7 +410,7 @@ TEST_F(ManifestUpdaterTest, Update_AddNonExistingFile) {
ai.path = "non_existing";
ManifestUpdater::OperationList ops{
{Operator::kAdd, ai}, {Operator::kAdd, MakeAssetInfo("a.txt").info}};
EXPECT_OK(updater.Update(&ops, &file_chunks_));
EXPECT_OK(updater.Update(&ops, &file_chunks_, nullptr));
const UpdaterStats& stats = updater.Stats();
EXPECT_EQ(stats.total_assets_added_or_updated, 2);
@@ -428,17 +432,19 @@ TEST_F(ManifestUpdaterTest, Update_PushIntermediateManifest) {
EXPECT_OK(updater.UpdateAll(&file_chunks_));
EXPECT_OK(updater.Update(
MakeDeleteOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}),
&file_chunks_));
&file_chunks_, nullptr));
// Add a.txt back and check intermediate manifest.
ContentIdProto intermediate_id;
auto push_intermediate_manifest =
[&intermediate_id](const ContentIdProto& manifest_id) {
intermediate_id = manifest_id;
};
auto push_manifest = [&intermediate_id](const ContentIdProto& manifest_id) {
// Catch the first (= intermediate) manifest.
if (intermediate_id == ContentIdProto()) {
intermediate_id = manifest_id;
}
};
EXPECT_OK(updater.Update(
MakeUpdateOps({"subdir/b.txt", "subdir/c.txt", "subdir/d.txt"}),
&file_chunks_, push_intermediate_manifest));
&file_chunks_, push_manifest));
EXPECT_GT(intermediate_id.blake3_sum_160().size(), 0);
// Only file a.txt is done in the intermediate manifest, all others are in
@@ -460,17 +466,18 @@ TEST_F(ManifestUpdaterTest, Update_FileChunkMap) {
ManifestUpdater updater(&data_store_, cfg_);
// Add a.txt.
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr));
ValidateChunkLookup("a.txt", true);
ValidateChunkLookup("subdir/b.txt", false);
// Add subdir/b.txt.
EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_));
EXPECT_OK(
updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_, nullptr));
ValidateChunkLookup("a.txt", true);
ValidateChunkLookup("subdir/b.txt", true);
// Remove a.txt.
EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeDeleteOps({"a.txt"}), &file_chunks_, nullptr));
ValidateChunkLookup("a.txt", false);
ValidateChunkLookup("subdir/b.txt", true);
}
@@ -482,18 +489,20 @@ TEST_F(ManifestUpdaterTest, Update_IntermediateFileChunkMap) {
ManifestUpdater updater(&data_store_, cfg_);
// Add a.txt.
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_));
EXPECT_OK(updater.Update(MakeUpdateOps({"a.txt"}), &file_chunks_, nullptr));
// Add subdir/b.txt and check intermediate lookups.
auto push_intermediate_manifest = [this](const ContentIdProto&) {
int count = 0;
auto push_manifest = [this, &count](const ContentIdProto&) {
++count;
ValidateChunkLookup("a.txt", true);
ValidateChunkLookup("subdir/b.txt", false); // Not in yet.
// The first (= intermediate) manifest does not have the chunks, the second
// (= final) does.
ValidateChunkLookup("subdir/b.txt", count > 1);
};
EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_,
push_intermediate_manifest));
ValidateChunkLookup("a.txt", true);
ValidateChunkLookup("subdir/b.txt", true); // Now it's in!
push_manifest));
}
// A call to ManifestId() returns the manifest id!!!
@@ -507,6 +516,70 @@ TEST_F(ManifestUpdaterTest, ManifestId) {
EXPECT_EQ(updater.ManifestId(), manifest_id);
}
// Checks that every file and directory in the final manifest carries the
// default permissions defined by the ManifestBuilder.
TEST_F(ManifestUpdaterTest, VerifyPermissions) {
  cfg_.src_dir = path::Join(base_dir_, "non_empty");
  ManifestUpdater updater(&data_store_, cfg_);
  EXPECT_OK(updater.UpdateAll(&file_chunks_));

  // Walk all assets of the resulting manifest.
  ManifestIterator iter(&data_store_);
  EXPECT_OK(iter.Open(updater.ManifestId()));
  for (const AssetProto* asset = iter.NextEntry(); asset != nullptr;
       asset = iter.NextEntry()) {
    const AssetProto::Type asset_type = asset->type();
    if (asset_type == AssetProto::FILE) {
      EXPECT_EQ(asset->permissions(), ManifestBuilder::kDefaultFilePerms);
    } else if (asset_type == AssetProto::DIRECTORY) {
      EXPECT_EQ(asset->permissions(), ManifestBuilder::kDefaultDirPerms);
    } else if (asset_type != AssetProto::SYMLINK) {
      // Symlinks don't have their own permissions, anything else is
      // unexpected.
      FAIL() << "Unhandled type: " << AssetProto::Type_Name(asset_type);
    }
  }
}
// Checks the file permissions in every manifest that is pushed during an
// update: files in the first (= intermediate) manifest are expected to be
// executable, files in any later manifest carry the default file permissions.
TEST_F(ManifestUpdaterTest, VerifyIntermediateFilesAreExecutable) {
  cfg_.src_dir = path::Join(base_dir_, "non_empty");
  ManifestUpdater updater(&data_store_, cfg_);

  int num_pushed = 0;
  auto push_manifest = [this, &num_pushed](const ContentIdProto& manifest_id) {
    ++num_pushed;
    ManifestIterator iter(&data_store_);
    EXPECT_OK(iter.Open(manifest_id));
    for (const AssetProto* asset = iter.NextEntry(); asset != nullptr;
         asset = iter.NextEntry()) {
      const AssetProto::Type asset_type = asset->type();
      if (asset_type == AssetProto::DIRECTORY) {
        EXPECT_EQ(asset->permissions(), ManifestBuilder::kDefaultDirPerms);
      } else if (asset_type == AssetProto::FILE) {
        if (num_pushed == 1) {
          // While the manifest is in-progress, all files are set to be
          // executable.
          EXPECT_EQ(asset->permissions(), ManifestUpdater::kExecutablePerms);
        } else {
          EXPECT_EQ(asset->permissions(), ManifestBuilder::kDefaultFilePerms);
        }
      } else {
        FAIL() << "Unhandled type: " << AssetProto::Type_Name(asset_type);
      }
    }
  };

  // Add subdir/b.txt and verify the file permissions.
  EXPECT_OK(updater.Update(MakeUpdateOps({"subdir/b.txt"}), &file_chunks_,
                           push_manifest));
  EXPECT_EQ(updater.Stats().total_files_added_or_updated, 1);
}
// Makes sure that executables are properly detected.
TEST_F(ManifestUpdaterTest, DetectExecutables) {
cfg_.src_dir = path::Join(base_dir_, "executables");

View File

@@ -0,0 +1,94 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "manifest/pending_assets_queue.h"
#include "common/log.h"
#include "manifest/manifest_builder.h"
namespace cdc_ft {
PendingAssetsQueue::PendingAssetsQueue(absl::Duration min_processing_time)
: min_processing_time_(min_processing_time) {}
// Enqueues |pending|. An asset without a deadline (absl::InfiniteFuture()) is
// appended to the end of the queue. An asset with a deadline is inserted right
// in front of the first queued asset that has no deadline, or at the end if
// there is none.
void PendingAssetsQueue::Add(PendingAsset pending) {
  const bool has_deadline = pending.deadline != absl::InfiniteFuture();

  // Default insertion point: the very end of the queue.
  auto insert_pos = queue_.end();
  if (has_deadline) {
    // Prioritized assets go in front of the first non-prioritized one.
    insert_pos = std::find_if(queue_.begin(), queue_.end(),
                              [](const PendingAsset& queued) {
                                return queued.deadline ==
                                       absl::InfiniteFuture();
                              });
  }
  queue_.insert(insert_pos, std::move(pending));
}
// Removes the first queued asset that is accepted by |accept| and moves it
// into |pending|. If |accept| is empty, the first queued asset is taken.
// Returns false and leaves |pending| untouched if no asset qualifies.
bool PendingAssetsQueue::Dequeue(PendingAsset* pending, AcceptFunc accept) {
  for (auto candidate = queue_.begin(); candidate != queue_.end();
       ++candidate) {
    // Skip everything the caller rejects.
    if (accept && !accept(*candidate)) continue;

    *pending = std::move(*candidate);
    queue_.erase(candidate);
    return true;
  }
  return false;
}
// Prioritizes the queued tasks that belong to the assets in |prio_assets|.
//
// For each requested asset that is still in progress according to
// |manifest_builder|, the matching queued task (same relative path and file
// name) that has no deadline yet gets the deadline
// |received| + |min_processing_time_|, is flagged as explicitly prioritized
// and is moved in front of the first task without a deadline. Tasks that
// already have a deadline are left untouched. Assets that cannot be looked up
// are logged and skipped.
//
// Returns the earliest |received| timestamp of all given assets plus
// |min_processing_time_|.
absl::Time PendingAssetsQueue::Prioritize(
    const std::vector<PriorityAsset>& prio_assets,
    ManifestBuilder* manifest_builder) {
  absl::Time min_received = absl::InfiniteFuture();
  for (const PriorityAsset& prio_asset : prio_assets) {
    if (prio_asset.received < min_received) min_received = prio_asset.received;

    // Check if this asset is still in progress.
    absl::StatusOr<AssetBuilder> asset = manifest_builder->GetOrCreateAsset(
        prio_asset.rel_file_path, AssetProto::UNKNOWN);
    if (!asset.ok()) {
      LOG_ERROR("Failed to prioritize asset '%s': %s", prio_asset.rel_file_path,
                asset.status().ToString());
      continue;
    }
    if (!asset->InProgress()) continue;

    // Find the queued task for this asset.
    auto prio_end = queue_.end();
    for (auto it = queue_.begin(); it != queue_.end(); ++it) {
      // Remember the first task that is not prioritized so that we can insert
      // new prioritized tasks just before.
      if (prio_end == queue_.end() && it->deadline == absl::InfiniteFuture()) {
        prio_end = it;
      }
      if (it->relative_path == asset->RelativePath() &&
          it->filename == asset->Name()) {
        // If this asset is not yet prioritized, |prio_end| will be set
        // accordingly and we move |*it| to the end of the prioritized tasks.
        if (it->deadline == absl::InfiniteFuture()) {
          it->deadline = prio_asset.received + min_processing_time_;
          it->prioritized = true;  // Explicit prioritization.
          // Relink the node in place instead of inserting a moved copy and
          // erasing the original. This is a no-op if |it| == |prio_end|.
          queue_.splice(prio_end, queue_, it);
        }
        break;
      }
    }
  }
  return min_received + min_processing_time_;
}
} // namespace cdc_ft

View File

@@ -0,0 +1,102 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef MANIFEST_PENDING_ASSETS_QUEUE_H
#define MANIFEST_PENDING_ASSETS_QUEUE_H
#include <functional>
#include <list>
#include <string>
#include <utility>
#include <vector>

#include "absl/time/time.h"
#include "manifest/manifest_proto_defs.h"
namespace cdc_ft {
class ManifestBuilder;
// Holds an asset that was requested to be prioritized at a given point in time.
// Instances are passed to PendingAssetsQueue::Prioritize().
struct PriorityAsset {
// Relative Unix file path. Prioritize() uses it to look up the asset in the
// manifest.
std::string rel_file_path;
// Timestamp when this request was received. Prioritize() adds the queue's
// minimum processing time to it to compute the deadline of the asset.
absl::Time received;
};
// Represents an asset that has not been fully processed yet.
struct PendingAsset {
  // Creates an empty asset of UNKNOWN type that is not prioritized and has no
  // deadline.
  PendingAsset() = default;

  // Creates a pending asset of the given |type| named |filename| that is
  // located in the directory |relative_path|. Pass absl::InfiniteFuture() as
  // |deadline| for assets that are not prioritized.
  PendingAsset(AssetProto::Type type, std::string relative_path,
               std::string filename, absl::Time deadline)
      : type(type),
        relative_path(std::move(relative_path)),
        filename(std::move(filename)),
        deadline(deadline) {}

  // The asset type (either FILE or DIRECTORY).
  AssetProto::Type type = AssetProto::UNKNOWN;

  // Relative unix path of the directory containing this asset.
  std::string relative_path;

  // File name of the asset that still needs processing.
  std::string filename;

  // If this asset was explicitly prioritized, this field is set to true,
  // otherwise false.
  bool prioritized = false;

  // If a deadline is set, it means that this asset was prioritized
  // (implicitly or explicitly) and should be processed by this deadline. Once
  // this asset has been processed, the manifest should be flushed if the
  // deadline has expired. Otherwise, additional related assets can be queued
  // and processed (implicit prioritization).
  //
  // absl::InfiniteFuture() means "no deadline". This is the default because a
  // default-constructed absl::Time is the Unix epoch, which the queue would
  // mistake for an already expired deadline.
  absl::Time deadline = absl::InfiniteFuture();
};
// Queues assets that still need to be processed before they are completed.
// Add() and Prioritize() place assets that have a deadline in front of the
// first asset whose deadline is absl::InfiniteFuture() (= no deadline).
class PendingAssetsQueue {
public:
// Signature for a callback function to accept items to dequeue. The callback
// returns true for an item that may be dequeued.
using AcceptFunc = std::function<bool(const PendingAsset&)>;
// The |min_processing_time| is used to calculate the deadline by which a
// pending asset should be returned to the requesting instance. It is added to
// the time at which a prioritization request was received.
PendingAssetsQueue(absl::Duration min_processing_time);
// Adds the given asset |pending| to the queue of assets to complete.
// PendingAssets without a deadline (absl::InfiniteFuture()) will be queued at
// the end, while those with a given deadline will be inserted after other
// assets having a deadline, i.e. in front of the first asset without one.
void Add(PendingAsset pending);
// Removes a PendingAsset from the queue and stores it in |pending|. Items are
// considered in queue order. If |accept| is given, then only items for which
// |accept| returns true are considered. Returns true if an item was stored in
// |pending|, otherwise false is returned and |pending| is not modified.
bool Dequeue(PendingAsset* pending, AcceptFunc accept = nullptr);
// Returns true if the queue is empty, otherwise returns false.
bool Empty() const { return queue_.empty(); }
// Modifies the list of queued assets to prioritize the assets given in
// |prio_assets|. Only assets that |manifest_builder| reports as still being in
// progress are considered. Returns the deadline by which the processed assets
// should be returned to the requesting instance, which is the earliest
// |received| time of all |prio_assets| plus the minimum processing time.
absl::Time Prioritize(const std::vector<PriorityAsset>& prio_assets,
ManifestBuilder* manifest_builder);
private:
// Time span added to the receive time of a request to compute its deadline.
const absl::Duration min_processing_time_;
// Assets that still need processing, in processing order.
std::list<PendingAsset> queue_;
};
} // namespace cdc_ft
#endif // MANIFEST_PENDING_ASSETS_QUEUE_H

View File

@@ -14,10 +14,6 @@
// This proto defines the service to stream chunks from workstations to
// gamelet instances.
//
// References:
// * (internal).0
// * (internal)
syntax = "proto3";
@@ -25,6 +21,8 @@ import "proto/manifest.proto";
package cdc_ft.proto;
// Describes the interface to fetch data from the asset streaming server running
// on the workstation.
service AssetStreamService {
// Requests the contents of a chunk by its id.
rpc GetContent(GetContentRequest) returns (GetContentResponse) {}
@@ -53,12 +51,20 @@ message SendCachedContentIdsRequest {
message SendCachedContentIdsResponse {}
// Describes the interface to receive manifest updates and prioritize processing
// of specific assets.
service ConfigStreamService {
// Streaming channel to receive continuous manifest updates. The server
// answers a single request with a stream of responses.
rpc GetManifestId(GetManifestIdRequest)
returns (stream GetManifestIdResponse) {}
// Acknowledges that a specific manifest ID has been received.
rpc AckManifestIdReceived(AckManifestIdReceivedRequest)
returns (AckManifestIdReceivedResponse) {}
// Requests the server to process the given in-progress assets as soon as
// possible. The request lists the assets by their relative paths; the
// response carries no data.
rpc ProcessAssets(ProcessAssetsRequest) returns (ProcessAssetsResponse) {}
}
message GetManifestIdRequest {}
@@ -73,3 +79,9 @@ message AckManifestIdReceivedRequest {
}
message AckManifestIdReceivedResponse {}
// Request of ConfigStreamService.ProcessAssets.
message ProcessAssetsRequest {
// Paths of the in-progress assets that should be processed as soon as
// possible. NOTE(review): presumably relative Unix paths with respect to the
// streamed directory - confirm against the server implementation.
repeated string relative_paths = 1;
}
// Response of ConfigStreamService.ProcessAssets. Intentionally empty.
message ProcessAssetsResponse {}