netris-cdc-file-transfer/data_store/data_provider.cc
Lutz Justen c21503d21b [cdc_stream] Fix issues found in tests (#40)
* [cdc_stream] Fix issues found in tests

Fixes a couple of issues found by integration testing:
- Unicode command-line args in cdc_stream show up as question marks.
- The log is still named assets_stream_manager instead of cdc_stream.
- An error message contains stadia_assets_stream_manager_v3.exe.
- mount_dir was not the last arg, as required by FUSE.
- Promoted cache cleanup logs to INFO level since they're important
  for the proper working of the system.
- The asset streaming cache dir is still %APPDATA%\GGP\asset_streaming.

* Address comments
2022-12-07 11:25:43 +01:00


// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "data_store/data_provider.h"
#include <algorithm>
#include <thread>
#include "absl/strings/str_format.h"
#include "common/log.h"
#include "common/status.h"
#include "common/stopwatch.h"
#include "manifest/content_id.h"
namespace cdc_ft {
namespace {

// FUSE limits the maximum read request size to 128 KiB. Larger requests are
// split into smaller requests of at most this size. This constant can be used
// to identify maximum-size requests.
constexpr uint64_t kMaxFuseRequestSize = 1 << 17;

}  // namespace

DataProvider::DataProvider(
    std::unique_ptr<DataStoreWriter> writer,
    std::vector<std::unique_ptr<DataStoreReader>> readers, size_t prefetch_size,
    uint32_t cleanup_timeout_sec, uint32_t access_idle_timeout_sec)
    : prefetch_size_(prefetch_size),
      writer_(std::move(writer)),
      readers_(std::move(readers)),
      chunks_updated_{true},
      cleanup_timeout_sec_(cleanup_timeout_sec),
      access_idle_timeout_sec_(access_idle_timeout_sec) {
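  // The async cleaner maintains the local cache (the writer), so it is only
  // started when a writer is present.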
  if (writer_) {
    assert(!async_cleaner_);
    async_cleaner_ =
        std::make_unique<std::thread>([this]() { CleanupThreadMain(); });
  }
}

DataProvider::~DataProvider() { Shutdown(); }

void DataProvider::Shutdown() {
  {
    absl::MutexLock lock(&shutdown_mutex_);
    shutdown_ = true;
  }
  if (async_cleaner_) {
    if (async_cleaner_->joinable()) async_cleaner_->join();
    async_cleaner_.reset();
  }
}

size_t DataProvider::PrefetchSize(size_t read_size) const {
  // If the read size matches the maximum FUSE request size, it is very likely
  // that the next chunk is needed as well, so we enlarge the read size by the
  // prefetch size.
  if (read_size == kMaxFuseRequestSize) read_size += prefetch_size_;
  return read_size;
}

absl::StatusOr<size_t> DataProvider::Get(const ContentIdProto& content_id,
                                         void* data, size_t offset,
                                         size_t size) {
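  // Record the access time for the idle-based cache cleanup. Then try the
  // fast path: read the chunk from the local cache under a shared lock,
  // falling back to the readers below under an exclusive lock on a miss.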
  last_access_sec_ = GetSteadyNowSec();
  absl::Mutex* content_mutex = GetContentMutex(content_id);
  absl::StatusOr<size_t> read_bytes;
  if (writer_) {
    {
      absl::ReaderMutexLock read_lock(content_mutex);
      read_bytes = writer_->Get(content_id, data, offset, size);
    }
    if (read_bytes.ok()) {
      return read_bytes;
    }
    LogWriterWarning(read_bytes.status(), content_id);
  }

  // To prevent reading the same chunk from multiple threads, make read/write
  // atomic.
  absl::WriterMutexLock write_lock(content_mutex);

  // Read from the |writer_| again, in case the cache has been populated by
  // another thread.
  if (writer_ && absl::IsNotFound(read_bytes.status())) {
    read_bytes = writer_->Get(content_id, data, offset, size);
    if (read_bytes.ok()) {
      return read_bytes;
    }
    LogWriterWarning(read_bytes.status(), content_id);
  }

  for (auto& reader : readers_) {
    Buffer buffer;
    absl::Status status = reader->Get(content_id, &buffer);
    if (!status.ok()) {
      // Try the next reader if this one doesn't contain the chunk.
      if (absl::IsNotFound(status)) continue;
      // TODO: Add reader identification for debugging.
      return WrapStatus(status, "Failed to get '%s'.",
                        ContentId::ToHexString(content_id));
    }
    if (writer_) {
      status = writer_->Put(content_id, buffer.data(), buffer.size());
      chunks_updated_ = true;
      if (!status.ok()) {
        LOG_ERROR("Failed to write chunk '%s': %s.",
                  ContentId::ToHexString(content_id), status.ToString());
      }
    }
    // Reads starting at or beyond the end of the chunk return zero bytes.
    if (buffer.size() <= offset) return 0;
    size_t return_bytes = std::min(buffer.size() - offset, size);
    memcpy(data, buffer.data() + offset, return_bytes);
    return return_bytes;
  }

  return absl::NotFoundError(absl::StrFormat(
      "Failed to find '%s'.", ContentId::ToHexString(content_id)));
}

absl::Status DataProvider::Get(ChunkTransferList* chunks) {
  last_access_sec_ = GetSteadyNowSec();

  // Try to fetch chunks from the cache first.
  RETURN_IF_ERROR(GetFromWriter(chunks, /*lock_required=*/true));
  if (chunks->ReadDone()) return absl::OkStatus();

  // Get list of all missing chunk IDs.
  std::vector<const ContentIdProto*> chunk_ids;
  for (const ChunkTransferTask& chunk : *chunks) {
    if (!chunk.done) chunk_ids.push_back(&chunk.id);
  }

  // Acquire writer locks for all missing chunks.
  WriterMutexLockList locks;
  WriteLockAll(std::move(chunk_ids), &locks);

  // Read from the |writer_| again, in case the cache has been populated by
  // another thread. We hold all chunk locks already.
  RETURN_IF_ERROR(GetFromWriter(chunks, /*lock_required=*/false));
  if (chunks->ReadDone()) return absl::OkStatus();

  // Try to read from all readers.
  for (auto& reader : readers_) {
    absl::Status status = reader->Get(chunks);
    if (!status.ok()) {
      // TODO: Add reader identification for debugging.
      return WrapStatus(status, "Failed to get chunks [%s] from list [%s]",
                        chunks->UndoneToHexString(), chunks->ToHexString());
    }
    if (chunks->PrefetchDone()) break;
  }

  // Cache complete chunks in the writer.
  if (writer_) {
    for (ChunkTransferTask& chunk : *chunks) {
      if (!chunk.done || chunk.chunk_data.empty()) continue;
      absl::Status status = writer_->Put(chunk.id, chunk.chunk_data.data(),
                                         chunk.chunk_data.size());
      chunks_updated_ = true;
      if (!status.ok()) {
        LOG_WARNING("Failed to put '%s' to writer: %s.",
                    ContentId::ToHexString(chunk.id), status.message());
      }
    }
  }
  return absl::OkStatus();
}

absl::Status DataProvider::Get(const ContentIdProto& content_id,
                               Buffer* data) {
  last_access_sec_ = GetSteadyNowSec();
  absl::Mutex* content_mutex = GetContentMutex(content_id);
  absl::Status status = absl::OkStatus();
  if (writer_) {
    {
      absl::ReaderMutexLock read_lock(content_mutex);
      status = writer_->Get(content_id, data);
    }
    if (status.ok()) {
      return absl::OkStatus();
    }
    LogWriterWarning(status, content_id);
  }

  // To prevent reading the same chunk from multiple threads, make read/write
  // atomic.
  absl::WriterMutexLock write_lock(content_mutex);

  // Read from the |writer_| again, in case the cache has been populated by
  // another thread.
  if (writer_ && absl::IsNotFound(status)) {
    status = writer_->Get(content_id, data);
    if (status.ok()) {
      return absl::OkStatus();
    }
    LogWriterWarning(status, content_id);
  }

  for (auto& reader : readers_) {
    status = reader->Get(content_id, data);
    if (!status.ok()) {
      // Try next reader if this one doesn't contain the chunk.
      if (absl::IsNotFound(status)) continue;
      // TODO: Add reader identification for debugging.
      return WrapStatus(status, "Failed to get '%s'.",
                        ContentId::ToHexString(content_id));
    }
    if (writer_) {
      writer_->Put(content_id, data->data(), data->size()).IgnoreError();
      chunks_updated_ = true;
    }
    return absl::OkStatus();
  }

  return absl::NotFoundError(absl::StrFormat(
      "Failed to find '%s'.", ContentId::ToHexString(content_id)));
}

void DataProvider::LogWriterWarning(const absl::Status& status,
                                    const ContentIdProto& content_id) {
  if (!absl::IsNotFound(status)) {
    LOG_WARNING("Failed to get '%s' from writer: %s.",
                ContentId::ToHexString(content_id), status.message());
  }
}

absl::Mutex* DataProvider::GetContentMutex(const ContentIdProto& content_id) {
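  // Side effect: accessing content interrupts any cache cleanup that is
  // currently running; the cleanup thread re-arms |interrupt_| on its next
  // iteration. Mutexes are bucketed by the first byte of the content ID, so
  // distinct chunks may share a mutex.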
  interrupt_ = true;
  uint8_t id = ContentId::GetByte(content_id, 0);
  return &content_mutexes_[id];
}

void DataProvider::WriteLockAll(std::vector<const ContentIdProto*> chunk_ids,
                                WriterMutexLockList* locks) {
  // Sorting the list avoids cycles when locking from multiple threads
  // concurrently, thus avoiding deadlocks when holding some mutexes while
  // trying to lock others.
  std::sort(
      chunk_ids.begin(), chunk_ids.end(),
      [](const ContentIdProto* a, const ContentIdProto* b) { return *a < *b; });
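  // Distinct content IDs can fall into the same mutex bucket (see
  // GetContentMutex), so deduplicate and lock each mutex only once; locking
  // the same non-reentrant mutex twice would deadlock.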
  std::unordered_set<absl::Mutex*> locked;
  for (const ContentIdProto* id : chunk_ids) {
    absl::Mutex* mu = GetContentMutex(*id);
    auto [_, inserted] = locked.insert(mu);
    if (!inserted) continue;
    locks->push_back(std::make_unique<absl::WriterMutexLock>(mu));
  }
}

absl::Status DataProvider::GetFromWriter(ChunkTransferList* chunks,
                                         bool lock_required) {
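  // |lock_required| is true when this method must take reader locks itself;
  // it is false when the caller already holds exclusive locks for all undone
  // chunks, which also makes it safe to remove corrupted chunks below.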
  if (!writer_ || chunks->ReadDone()) return absl::OkStatus();

  // Try to read all remaining chunks from the cache.
  absl::StatusOr<size_t> read_bytes;
  for (ChunkTransferTask& chunk : *chunks) {
    if (chunk.done) continue;
    {
      std::unique_ptr<absl::ReaderMutexLock> lock;
      if (lock_required) {
        lock =
            std::make_unique<absl::ReaderMutexLock>(GetContentMutex(chunk.id));
      }
      if (!chunk.size) {
        // Prefetch chunk: just check whether it is already present; no
        // further processing is needed.
        chunk.done = writer_->Contains(chunk.id);
        continue;
      }
      // Read the requested data.
      read_bytes = writer_->Get(chunk.id, chunk.data, chunk.offset, chunk.size);
    }
    if (!read_bytes.ok()) {
      LogWriterWarning(read_bytes.status(), chunk.id);
    } else if (*read_bytes == chunk.size) {
      chunk.done = true;
      if (chunks->ReadDone()) return absl::OkStatus();
    } else {
      LogWriterWarning(
          MakeStatus("Expected %u bytes, got %u", chunk.size, *read_bytes),
          chunk.id);
      // Remove the corrupted chunk from the cache, but only if the chunk was
      // write-locked by the caller.
      if (!lock_required) {
        absl::Status status = writer_->Remove(chunk.id);
        if (!status.ok()) {
          LOG_WARNING("Failed to remove chunk '%s' from the cache: %s",
                      ContentId::ToHexString(chunk.id), status.ToString());
        }
      }
    }
  }
  return absl::OkStatus();
}

void DataProvider::LockAllMutexes(WriterMutexLockList* locks) {
  for (absl::Mutex& mu : content_mutexes_) {
    locks->push_back(std::make_unique<absl::WriterMutexLock>(&mu));
  }
}

void DataProvider::CleanupThreadMain() {
  assert(writer_);
  writer_->RegisterInterrupt(&interrupt_);
  absl::MutexLock lock(&shutdown_mutex_);
  SteadyClock::Timestamp next_cleanup_time =
      steady_clock_->Now() + std::chrono::seconds(cleanup_timeout_sec_);
  while (!shutdown_) {
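    // Wait until shutdown is requested or until the timeout expires. The
    // timeout is the time until the next scheduled cleanup, but at least the
    // access idle timeout. Await releases |shutdown_mutex_| while waiting.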
    auto cond = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(shutdown_mutex_) {
      return shutdown_;
    };
    shutdown_mutex_.AwaitWithTimeout(
        absl::Condition(&cond),
        std::max(absl::Seconds(access_idle_timeout_sec_),
                 absl::Seconds(std::chrono::duration_cast<std::chrono::seconds>(
                                   next_cleanup_time - steady_clock_->Now())
                                   .count())));
    int64_t time_sec_since_last_access =
        GetSteadyNowSec() - last_access_sec_.load();
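    // Only clean up if chunks were written since the last cleanup and the
    // provider has been idle for longer than the access idle timeout, so that
    // cleanup does not compete with active streaming.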
    if (chunks_updated_ &&
        time_sec_since_last_access > access_idle_timeout_sec_) {
      WriterMutexLockList locks;
      LockAllMutexes(&locks);
      chunks_updated_ = false;
      LOG_INFO("Starting cache cleanup");
      Stopwatch sw;
      absl::Status status = writer_->Cleanup();
      LOG_INFO("Finished cache cleanup in %0.3f seconds", sw.ElapsedSeconds());
      next_cleanup_time =
          steady_clock_->Now() + std::chrono::seconds(cleanup_timeout_sec_);
      absl::MutexLock cleaned_lock(&cleaned_mutex_);
      if (!status.ok()) {
        LOG_WARNING("Failed to cleanup the cache: %s", status.message());
        chunks_updated_ = true;
        is_cleaned_ = false;
      } else {
        is_cleaned_ = true;
      }
    }
    interrupt_ = false;
  }
}

bool DataProvider::WaitForCleanupAndResetForTesting(absl::Duration timeout) {
  absl::MutexLock lock(&cleaned_mutex_);
  auto cond = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(cleaned_mutex_) {
    return is_cleaned_;
  };
  cleaned_mutex_.AwaitWithTimeout(absl::Condition(&cond), timeout);
  bool is_cleaned = is_cleaned_;
  is_cleaned_ = false;
  return is_cleaned;
}
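
// Returns whole seconds elapsed since |first_now_|, which is presumably
// captured at construction time (its initialization is not visible in this
// file). Steady-clock based, so unaffected by wall-clock changes.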
int64_t DataProvider::GetSteadyNowSec() {
  return std::chrono::duration_cast<std::chrono::seconds>(steady_clock_->Now() -
                                                          first_now_)
      .count();
}

}  // namespace cdc_ft