// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "cdc_fuse_fs/cdc_fuse_fs.h" #include #include #include #include #include #include #include "cdc_fuse_fs/asset.h" #include "common/buffer.h" #include "common/log.h" #include "common/path.h" #include "common/platform.h" #include "common/status.h" #include "common/status_macros.h" #include "common/threadpool.h" #include "common/util.h" #include "data_store/data_store_reader.h" #include "json/json.h" #include "manifest/content_id.h" #include "manifest/manifest_proto_defs.h" #ifndef USE_MOCK_LIBFUSE #include "include/fuse.h" #include "include/fuse_lowlevel.h" #else // This code is tested using a fake testing FUSE layer. #include "cdc_fuse_fs/mock_libfuse.h" #endif namespace cdc_ft { namespace { enum class InodeState { kInitialized, kUpdatedProto, // inode's proto was changed, but the content is the same. kUpdated, // inode was updated and its file should be reopened. kInvalid // the corresponding file was removed. }; const char* InodeStateToString(const InodeState& state) { switch (state) { case InodeState::kInitialized: return "INITIALIZED"; case InodeState::kUpdatedProto: return "UPDATED_PROTO"; case InodeState::kUpdated: return "UPDATED"; case InodeState::kInvalid: return "INVALID"; default: return "UNKNOWN"; } } struct Inode; fuse_ino_t GetIno(const Inode& inode); struct Inode { Asset asset; // Inode nlookup: how many times the file was accessed. It is reduced by // forget(). 
The inode is removed if nlookup = 0 and children_nlookup = 0. std::atomic_uint64_t nlookup{0}; // The number of accessed children (used for directories), whose nlookup > 0. std::atomic_uint64_t children_nlookup{0}; // Shows if this inode is a FUSE root inode. bool is_root = false; // The state during manifest swap. std::atomic state{InodeState::kInitialized}; Inode() = default; // Delete copy/move constructor and assignments. We don't need any. Inode(const Inode&) = delete; Inode(Inode&& inode) = delete; Inode& operator=(Inode&& inode) = delete; Inode& operator=(const Inode&) = delete; bool IsInitialized() const { return state == InodeState::kInitialized; } bool IsUpdated() const { return state == InodeState::kUpdated; } bool IsUpdatedProto() const { return state == InodeState::kUpdatedProto; } bool IsValid() const { return state != InodeState::kInvalid; } Json::Value ToJson(bool with_proto) const { Json::Value value; value["ino"] = GetIno(*this); value["parent_ino"] = asset.parent_ino(); value["nlookup"] = nlookup.load(); value["children_nlookup"] = children_nlookup.load(); value["state"] = InodeStateToString(state); value["proto"] = asset.proto(); if (with_proto) { if (asset.proto()) { value["name"] = asset.proto()->name(); value["type"] = asset.proto()->type(); } else { value["message"] = "Proto message is not set"; } } return value; } }; // Asset proto -> inode map. using InodeMap = std::unordered_map>; // Queued request that cannot be processed yet and should be processed once the // manifest is updated. struct QueuedRequest { // The request type that was blocked. enum class Type { kOpen, kOpenDir, kLookup }; Type type; std::string rel_path; union { // Only valid for type == kOpen or type == kOpenDir. struct Open { fuse_req_t req; fuse_ino_t ino; struct fuse_file_info fi; } open; // Only valid for type == kLookup. struct Lookup { fuse_req_t req; fuse_ino_t parent_ino; const char* name; } lookup; } u; }; // Global context. 
Fuse is based on loose callbacks, so this holds the fs state. struct CdcFuseFsContext { #ifndef USE_MOCK_LIBFUSE // Fuse state. fuse_args args = FUSE_ARGS_INIT(0, nullptr); fuse_chan* channel = nullptr; char* mountpoint = nullptr; fuse_session* session = nullptr; bool signal_handlers_set = false; int multithreaded = 1; #endif bool initialized = false; // Interface for loading chunks (assets, data etc.). DataStoreReader* data_store_reader = nullptr; // Mutex to protect manifest update process. absl::Mutex manifest_mutex ABSL_ACQUIRED_BEFORE(inodes_mutex); // Loaded manifest. std::unique_ptr manifest ABSL_GUARDED_BY(manifest_mutex) = std::make_unique(); // Root inode (points to manifest->root_dir()). std::shared_ptr root ABSL_GUARDED_BY(manifest_mutex) = std::make_shared(); // Mutex to protect inodes. absl::Mutex inodes_mutex ABSL_ACQUIRED_AFTER(manifest_mutex); // Maps asset protos to Inodes, which contains the proto + metadata. InodeMap inodes ABSL_GUARDED_BY(inodes_mutex); // One buffer per thread to serve read, readdir etc. requests. static thread_local Buffer buffer; // Configuration client to get configuration updates from the workstation. std::unique_ptr config_stream_client; // Queue for requests to open files or directories that have not been // processed yet. absl::Mutex queued_requests_mutex; std::vector queued_requests ABSL_GUARDED_BY(queued_requests_mutex); // Identifies whether FUSE consistency should be inspected after manifest // update. bool consistency_check = false; // Contains invalid inodes, which should be deleted after they are forgotten. std::unordered_map> invalid_inodes ABSL_GUARDED_BY(inodes_mutex); }; thread_local Buffer CdcFuseFsContext::buffer; // Global context for the (static!) Fuse callbacks. CdcFuseFsContext* ctx; // Inode IDs (fuse_ino_t) are just the Inode pointer addresses. // That allows quick lock-free access to inodes. 
static_assert(sizeof(Inode*) == sizeof(fuse_ino_t), "Size mismatch!"); #ifndef USE_MOCK_LIBFUSE // Sanity check for correct compiler options. // Note: There doesn't seem to be a way to make this 64 bit on Windows in a way // that doesn't cause havoc (but that's for testing only, anyway). static_assert(sizeof(off_t) == 8, "off_t must be 64 bit"); static_assert(sizeof(ino_t) == 8, "ino_t must be 64 bit"); static_assert(sizeof(stat::st_ino) == 8, "st_ino must be 64 bit"); #endif // Converts Inode to fuse_ino_t (cheap typecast). fuse_ino_t GetIno(const Inode& inode) { if (inode.is_root) { return FUSE_ROOT_ID; } return reinterpret_cast(&inode); } // Converts fuse_ino_t to Inode (root inode for FUSE_ROOT_ID, otherwise cheap // typecast). Inode& GetInode(fuse_ino_t ino) ABSL_SHARED_LOCKS_REQUIRED(ctx->manifest_mutex) { if (ino == FUSE_ROOT_ID) { return *ctx->root; } // |ino| is just the inode pointer. return *reinterpret_cast(ino); } // Converts asset.permissions() to a file mode by OR'ing the file type flag. uint32_t GetMode(const AssetProto& asset) { switch (asset.type()) { case AssetProto::FILE: return asset.permissions() | path::MODE_IFREG; case AssetProto::DIRECTORY: return asset.permissions() | path::MODE_IFDIR; default: return asset.permissions(); } } // Fills |stbuf| with data from the asset pointed to by |ino|. void FillStatBuffer(fuse_ino_t ino, struct stat* stbuf) ABSL_SHARED_LOCKS_REQUIRED(ctx->manifest_mutex) { assert(stbuf); const AssetProto& asset = *GetInode(ino).asset.proto(); stbuf->st_ino = ino; stbuf->st_mode = GetMode(asset); // For directories, this is going to be 0 (does that matter?). stbuf->st_size = asset.file_size(); // Number of hard links to the file (number of directories with entries for // this file). Should always be 1 for this read-only filesystem. 
stbuf->st_nlink = internal::kCdcFuseDefaultNLink; #ifndef USE_MOCK_LIBFUSE stbuf->st_mtim.tv_sec = asset.mtime_seconds(); #else stbuf->st_mtime = asset.mtime_seconds(); #endif stbuf->st_uid = internal::kCdcFuseCloudcastUid; stbuf->st_gid = internal::kCdcFuseCloudcastGid; LOG_DEBUG("FillStatBuffer, ino=%u, size=%u, mode=%u, time=%u", ino, stbuf->st_size, stbuf->st_mode, asset.mtime_seconds()); } // Gets or creates an inode for |proto|. Inode* GetOrCreateInode(Inode& parent, const AssetProto* proto) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ctx->inodes_mutex) { std::shared_ptr& inode = ctx->inodes[proto]; if (inode) { assert(inode->asset.proto()); // Found existing inode. ++inode->nlookup; } else { // A new inode was created. // Note: No other thread can access this node right now. inode = std::make_shared(); inode->asset.Initialize(GetIno(parent), ctx->data_store_reader, proto); inode->nlookup = 1; ++parent.children_nlookup; } return inode.get(); } // Adds an entry with given |name| and stat info from the asset at the given // |ino|. Usually, |name| matches the asset name, except for the "." and ".." // directories. Stores the entry in some Fuse-internal format in |buffer|. void AddDirectoryEntry(fuse_req_t req, Buffer* buffer, const char* name, fuse_ino_t ino) ABSL_SHARED_LOCKS_REQUIRED(ctx->manifest_mutex) { struct stat stbuf; memset(&stbuf, 0, sizeof(stbuf)); // Note: fuse_add_direntry() only uses those two entries. stbuf.st_ino = ino; stbuf.st_mode = GetMode(*GetInode(ino).asset.proto()); // Call fuse_add_direntry with null args to get the size of the entry. size_t old_size = buffer->size(); size_t entry_size = fuse_add_direntry(req, NULL, 0, name, NULL, 0); // Append the new entry at the end of the buffer. 
buffer->resize(old_size + entry_size); fuse_add_direntry(req, buffer->data() + old_size, buffer->size() - old_size, name, &stbuf, static_cast(buffer->size())); } void ForgetChild(fuse_ino_t ino) ABSL_SHARED_LOCKS_REQUIRED(ctx->manifest_mutex) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ctx->inodes_mutex) { Inode& inode = GetInode(ino); assert(inode.children_nlookup > 0); --inode.children_nlookup; // Maintain children_nlookup on the root, but never remove it. if (ino == FUSE_ROOT_ID) { return; } if (inode.nlookup == 0 && inode.children_nlookup == 0) { const AssetProto* proto = inode.asset.proto(); ForgetChild(inode.asset.parent_ino()); ctx->inodes.erase(proto); } } void ForgetOne(fuse_ino_t ino, uint64_t nlookup) ABSL_SHARED_LOCKS_REQUIRED(ctx->manifest_mutex) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ctx->inodes_mutex) { // Supports forgetting outdated inodes - do not need to check validity. Inode& inode = GetInode(ino); LOG_DEBUG("Current nlookup %u to reduce by %u", inode.nlookup.load(), nlookup); inode.nlookup = inode.nlookup > nlookup ? inode.nlookup - nlookup : 0; // Maintain nlookup on the root, but never remove it. if (ino == FUSE_ROOT_ID) { return; } if (inode.nlookup == 0 && inode.children_nlookup == 0) { const AssetProto* proto = inode.asset.proto(); ForgetChild(inode.asset.parent_ino()); size_t count = 0; if (!proto) { count = ctx->invalid_inodes.erase(ino); LOG_DEBUG("Erased invalid inode"); } else { count = ctx->inodes.erase(proto); LOG_DEBUG("Erased inode"); } assert(count); (void)count; } } // Returns inos of previously accessed children inodes for |asset|. 
std::vector CollectLoadedChildInos(const Asset& asset) ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { std::vector protos = asset.GetLoadedChildProtos(); std::vector children; absl::ReaderMutexLock inode_lock(&ctx->inodes_mutex); for (const AssetProto* proto : protos) { InodeMap::iterator it = ctx->inodes.find(proto); if (it != ctx->inodes.end()) { children.push_back(GetIno(*it->second.get())); } } return children; } // Returns true if |inode| with |ino| is valid (it was not changed by any // manifest update). bool ValidateInode(fuse_req_t req, Inode& inode, fuse_ino_t ino) { if (!inode.IsValid()) { LOG_WARNING("Ino %u was outdated after the manifest update", ino); fuse_reply_err(req, ENOENT); return false; } return true; } // Returns the full relative file path for the given |inode|. std::string GetRelativePath(const Inode& inode) { if (inode.asset.parent_ino() == FUSE_ROOT_ID) return inode.asset.proto()->name(); std::string rel_path = GetRelativePath(GetInode(inode.asset.parent_ino())); absl::StrAppend(&rel_path, "/", inode.asset.proto()->name()); return rel_path; } // Asks the server to prioritize the asset at |rel_file_path|. void PrioritizeAssetOnServer(const std::string& rel_file_path) { std::vector assets{rel_file_path}; LOG_INFO("Requesing server to prioritize asset '%s'", rel_file_path); absl::Status status = ctx->config_stream_client->ProcessAssets(std::move(assets)); // An error is not critical, but we should log it. if (!status.ok()) { LOG_ERROR( "Failed to request prioritization for asset '%s' from the server: %s", rel_file_path, status.ToString()); } } // Queues a CdcFuseOpen request in the list of pending requests. Thread-safe. 
void QueueOpenRequest(const std::string& rel_path, fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) { QueuedRequest qr{QueuedRequest::Type::kOpen, rel_path}; qr.u.open.req = req; qr.u.open.ino = ino; qr.u.open.fi = *fi; { absl::MutexLock lock(&ctx->queued_requests_mutex); ctx->queued_requests.emplace_back(std::move(qr)); } PrioritizeAssetOnServer(rel_path); } // Queues a CdcFuseOpenDir request in the list of pending requests. Thread-safe. void QueueOpenDirRequest(const std::string& rel_path, fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) { QueuedRequest qr{QueuedRequest::Type::kOpenDir, rel_path}; qr.u.open.req = req; qr.u.open.ino = ino; qr.u.open.fi = *fi; { absl::MutexLock lock(&ctx->queued_requests_mutex); ctx->queued_requests.emplace_back(std::move(qr)); } PrioritizeAssetOnServer(rel_path); } // Queues a CdcFuseLookup reuquest in the list of pending requests. Thread-safe. void QueueLookupRequest(const std::string& rel_path, fuse_req_t req, fuse_ino_t parent_ino, const char* name) ABSL_LOCKS_EXCLUDED(ctx->queued_requests_mutex) { QueuedRequest qr{QueuedRequest::Type::kLookup, rel_path}; qr.u.lookup.req = req; qr.u.lookup.parent_ino = parent_ino; qr.u.lookup.name = name; { absl::MutexLock lock(&ctx->queued_requests_mutex); ctx->queued_requests.emplace_back(std::move(qr)); } PrioritizeAssetOnServer(rel_path); } } // namespace // Implementation of the Fuse lookup() method. // See include/fuse_lowlevel.h. void CdcFuseLookup(fuse_req_t req, fuse_ino_t parent_ino, const char* name) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex, ctx->inodes_mutex) { LOG_DEBUG("CdcFuseLookup, parent_ino=%u, name='%s'", parent_ino, name); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); Inode& parent = GetInode(parent_ino); if (!ValidateInode(req, parent, parent_ino)) { return; } if (parent.asset.proto()->in_progress()) { // This directory has not been processed yet. 
Queue up the request and block // until an updated manifest is available. std::string rel_path = GetRelativePath(parent); LOG_INFO("Request to open ino %u queued (file '%s' not ready)", parent_ino, rel_path); QueueLookupRequest(rel_path, req, parent_ino, name); return; } absl::StatusOr proto = parent.asset.Lookup(name); if (!proto.ok()) { LOG_ERROR("Lookup of '%s' in ino %u failed: '%s'", name, parent_ino, proto.status().ToString().c_str()); fuse_reply_err(req, ENOENT); return; } if (!*proto) { fuse_reply_err(req, ENOENT); return; } Inode* inode; { absl::MutexLock inode_lock(&ctx->inodes_mutex); inode = GetOrCreateInode(parent, *proto); } if (!ValidateInode(req, *inode, GetIno(*inode))) { return; } fuse_entry_param e; memset(&e, 0, sizeof(e)); e.attr_timeout = internal::kCdcFuseInodeTimeoutSec; e.entry_timeout = internal::kCdcFuseInodeTimeoutSec; e.ino = GetIno(*inode); FillStatBuffer(e.ino, &e.attr); fuse_reply_entry(req, &e); } // Implementation of the Fuse getattr() method. // See include/fuse_lowlevel.h. void CdcFuseGetAttr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* /*fi*/) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_DEBUG("CdcFuseGetAttr, ino=%u", ino); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); if (!ValidateInode(req, GetInode(ino), ino)) { return; } struct stat stbuf; memset(&stbuf, 0, sizeof(stbuf)); FillStatBuffer(ino, &stbuf); fuse_reply_attr(req, &stbuf, internal::kCdcFuseInodeTimeoutSec); } void CdcFuseSetAttr(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, struct fuse_file_info* fi) { LOG_DEBUG("CdcFuseSetAttr, ino=%u to_set=%04x mode=%04o", ino, to_set, attr->st_mode); // TODO: Verify that the bits are already set or store the new permissions in // a separate variable. CdcFuseGetAttr(req, ino, fi); } // Implementation of the FUSE open() method. // See include/fuse_lowlevel.h. 
void CdcFuseOpen(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi)
    ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) {
  LOG_DEBUG("CdcFuseOpen, ino=%u, flags=%u", ino, fi->flags);
  absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex);

  Inode& inode = GetInode(ino);
  if (!ValidateInode(req, inode, ino)) {
    return;
  }

  // Only regular files can be opened.
  const AssetProto* proto = inode.asset.proto();
  switch (proto->type()) {
    case AssetProto::FILE:
      break;
    case AssetProto::DIRECTORY:
      fuse_reply_err(req, EISDIR);
      return;
    default:
      // TODO: Handle links.
      fuse_reply_err(req, EINVAL);
      return;
  }

  // The two lowest flag bits hold the access mode; only read-only is allowed.
  if ((fi->flags & 3) != O_RDONLY) {
    fuse_reply_err(req, EACCES);
    return;
  }

  if (proto->file_size() > 0 && proto->in_progress()) {
    // This file has not been processed yet. Queue up the request until an
    // updated manifest is available.
    LOG_DEBUG("Request to open ino %u queued (file not ready)", ino);
    QueueOpenRequest(GetRelativePath(inode), req, ino, fi);
    return;
  }

  // Direct I/O and the kernel cache are mutually exclusive here.
  const bool direct = (fi->flags & O_DIRECT) != 0;
  fi->keep_cache = direct ? 0 : 1;
  fi->direct_io = direct ? 1 : 0;

  // If the manifest was changed, open files "from scratch" to be able to get
  // the updated data.
  if (inode.IsUpdated()) {
    fi->keep_cache = 0;
    inode.state = InodeState::kInitialized;
  }
  fuse_reply_open(req, fi);
}

// Implementation of the FUSE read() method.
// See include/fuse_lowlevel.h.
void CdcFuseRead(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* /*fi*/) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_DEBUG("CdcFuseRead, ino=%u, size=%u, off=%u", ino, size, off); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); Inode& inode = GetInode(ino); if (!ValidateInode(req, inode, ino)) { return; } if (inode.IsUpdated()) { LOG_ERROR("Manifest has been updated, the file '%s' should be reopened", inode.asset.proto()->name()); fuse_reply_err(req, EIO); return; } ctx->buffer.resize(size); absl::StatusOr bytes_read = inode.asset.Read(off, ctx->buffer.data(), size); if (!bytes_read.ok()) { LOG_ERROR("Reading %u bytes from offset %u of asset '%s' failed: '%s'", size, off, inode.asset.proto()->name().c_str(), bytes_read.status().ToString().c_str()); fuse_reply_err(req, EIO); return; } fuse_reply_buf(req, ctx->buffer.data(), *bytes_read); } // Implementation of the FUSE release() method. // See include/fuse_lowlevel.h. void CdcFuseRelease(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_DEBUG("CdcFuseRelease, ino=%u", ino); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); Inode& inode = GetInode(ino); if (!ValidateInode(req, inode, ino)) { return; } const AssetProto* proto = inode.asset.proto(); if (proto->type() == AssetProto::DIRECTORY) { fuse_reply_err(req, EISDIR); return; } if (proto->type() != AssetProto::FILE) { fuse_reply_err(req, EINVAL); return; } fuse_reply_err(req, 0); } // Implementation of the FUSE opendir() method. // See include/fuse_lowlevel.h. 
void CdcFuseOpenDir(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_DEBUG("CdcFuseOpenDir, ino=%u", ino); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); Inode& inode = GetInode(ino); if (!ValidateInode(req, inode, ino)) { return; } const AssetProto* proto = inode.asset.proto(); if (proto->type() != AssetProto::DIRECTORY) { fuse_reply_err(req, ENOTDIR); return; } if (proto->in_progress()) { // This directory has not been processed yet. Queue up the request until an // updated manifest is available. LOG_DEBUG("Request to open directory '%s' ino %u queued (dir not ready)", proto->name(), ino); QueueOpenDirRequest(GetRelativePath(inode), req, ino, fi); return; } fuse_reply_open(req, fi); } // Implementation of the FUSE readdir() method. // See include/fuse_lowlevel.h. void CdcFuseReadDir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, fuse_file_info* /*fi*/) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex, ctx->inodes_mutex) { LOG_DEBUG("CdcFuseReadDir, ino=%u, size=%u, off=%u", ino, size, off); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); Inode& inode = GetInode(ino); if (!ValidateInode(req, inode, ino) || !ValidateInode(req, GetInode(inode.asset.parent_ino()), inode.asset.parent_ino())) { return; } if (inode.asset.proto()->type() != AssetProto::DIRECTORY) { fuse_reply_err(req, ENOTDIR); return; } // TODO: This is called at least twice for each ls call. Cache buffer or // similar. 
Buffer buffer; AddDirectoryEntry(req, &buffer, ".", ino); AddDirectoryEntry(req, &buffer, "..", inode.asset.parent_ino()); { absl::StatusOr> protos = inode.asset.GetAllChildProtos(); if (!protos.ok()) { LOG_ERROR("ReadDir of ino %u failed: '%s'", ino, protos.status().ToString().c_str()); fuse_reply_err(req, EBADF); return; } absl::MutexLock inode_lock(&ctx->inodes_mutex); for (const AssetProto* child_proto : *protos) { const Inode& child_inode = *GetOrCreateInode(inode, child_proto); if (!child_inode.IsValid()) continue; AddDirectoryEntry(req, &buffer, child_proto->name().c_str(), GetIno(child_inode)); } } if (off >= static_cast(buffer.size())) { // Out of bounds read. fuse_reply_buf(req, nullptr, 0); } else { // Return the part that the caller asks for. fuse_reply_buf(req, buffer.data() + off, std::min(buffer.size() - off, size)); } } void CdcFuseReleaseDir(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_DEBUG("CdcFuseReleaseDir, ino=%u", ino); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); Inode& inode = GetInode(ino); if (!ValidateInode(req, inode, ino)) { return; } if (inode.asset.proto()->type() != AssetProto::DIRECTORY) { fuse_reply_err(req, ENOTDIR); return; } fuse_reply_err(req, 0); } // Implementation of the FUSE forget() method. // See include/fuse_lowlevel.h. void CdcFuseForget(fuse_req_t req, fuse_ino_t ino, uint64_t nlookup) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex, ctx->inodes_mutex) { LOG_DEBUG("CdcFuseForget, ino=%u, nlookup=%u", ino, nlookup); assert(ctx && ctx->initialized); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); absl::MutexLock ctx_lock(&ctx->inodes_mutex); ForgetOne(ino, nlookup); fuse_reply_none(req); } // Implementation of the FUSE forget_multi() method. // See include/fuse_lowlevel.h. 
void CdcFuseForgetMulti(fuse_req_t req, size_t count, struct fuse_forget_data* forgets) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex, ctx->inodes_mutex) { LOG_DEBUG("CdcFuseForgetMulti, count=%u", count); assert(forgets); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); absl::MutexLock ctx_lock(&ctx->inodes_mutex); for (size_t i = 0; i < count; ++i) { ForgetOne(forgets[i].ino, forgets[i].nlookup); } fuse_reply_none(req); } // Implementation of the FUSE access() method. // See include/fuse_lowlevel.h. void CdcFuseAccess(fuse_req_t req, fuse_ino_t ino, int mask) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_DEBUG("CdcFuseAccess, ino=%u, mask=%u", ino, mask); absl::ReaderMutexLock manifest_lock(&ctx->manifest_mutex); struct fuse_context* context = fuse_get_context(); // Root always has access rights. if (context->uid == internal::kCdcFuseRootUid || context->gid == internal::kCdcFuseRootGid) { fuse_reply_err(req, 0); return; } if (!ValidateInode(req, GetInode(ino), ino)) { return; } struct stat stbuf; memset(&stbuf, 0, sizeof(stbuf)); FillStatBuffer(ino, &stbuf); int process_permission = stbuf.st_mode & 0x7; // world if (stbuf.st_gid == static_cast(context->gid)) { process_permission |= stbuf.st_mode >> 3 & 0x7; // group } if (stbuf.st_uid == static_cast(context->uid)) { process_permission |= stbuf.st_mode >> 6 & 0x7; // user } if ((process_permission & mask) != mask) { fuse_reply_err(req, EACCES); return; } fuse_reply_err(req, 0); } // Not-implemented functions for read-only FUSE. 
void CdcFuseReadLink(fuse_req_t req, fuse_ino_t ino) { LOG_WARNING("CdcFuseReadLink not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseFlush(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseFlush not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseFSync(fuse_req_t req, fuse_ino_t ino, int datasync, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseFSync not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseFSyncDir(fuse_req_t req, fuse_ino_t ino, int datasync, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseFSyncDir not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseStatFS(fuse_req_t req, fuse_ino_t ino) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseStatFS not implemented, ino=%u", ino); // Mimic the default behavior of the FUSE library. 
struct statvfs buf; buf.f_bsize = 512; buf.f_namemax = 255; fuse_reply_statfs(req, &buf); } void CdcFuseSetXAttr(fuse_req_t req, fuse_ino_t ino, const char* name, const char* value, size_t size, int flags) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseSetXAttr not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseGetXAttr(fuse_req_t req, fuse_ino_t ino, const char* name, size_t size) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseGetXAttr not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseListXAttr(fuse_req_t req, fuse_ino_t ino, size_t size) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseListXAttr not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseGetLk(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi, struct flock* lock) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseGetLk not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseSetLk(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi, struct flock* lock, int sleep) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseSetLk not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseBMap(fuse_req_t req, fuse_ino_t ino, size_t blocksize, uint64_t idx) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseBMap not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseIoctl(fuse_req_t req, fuse_ino_t ino, int cmd, void* arg, struct fuse_file_info* fi, unsigned flags, const void* in_buf, size_t in_bufsz, size_t out_bufsz) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseIoctl not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFusePoll(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi, struct fuse_pollhandle* ph) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFusePoll not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void 
CdcFuseRetrieveReply(fuse_req_t req, void* cookie, fuse_ino_t ino, off_t offset, struct fuse_bufvec* bufv) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseRetrieveReply not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseFLock(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi, int op) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseFLock not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } void CdcFuseFAllocate(fuse_req_t req, fuse_ino_t ino, int mode, off_t offset, off_t length, struct fuse_file_info* fi) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex) { LOG_WARNING("CdcFuseFAllocate not implemented, ino=%u", ino); fuse_reply_err(req, ENOSYS); } size_t CdcFuseGetInodeCountForTesting() ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { assert(ctx); absl::MutexLock lock(&ctx->inodes_mutex); return ctx->inodes.size(); } size_t CdcFuseGetInvalidInodeCountForTesting() ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { assert(ctx); absl::MutexLock lock(&ctx->inodes_mutex); return ctx->invalid_inodes.size(); } namespace cdc_fuse_fs { absl::Status Initialize(int argc, char** argv) { assert(!ctx); ctx = new CdcFuseFsContext(); #ifndef USE_MOCK_LIBFUSE // Parse command line args. ctx->args = FUSE_ARGS_INIT(argc, argv); if (fuse_parse_cmdline(&ctx->args, &ctx->mountpoint, &ctx->multithreaded, /*foreground=*/nullptr) == -1) { Shutdown(); return MakeStatus("fuse_parse_cmdline() failed"); } // Initialize channel. ctx->channel = fuse_mount(ctx->mountpoint, &ctx->args); if (!ctx->channel) { Shutdown(); return MakeStatus("fuse_mount() failed"); } // Initialize session. 
fuse_lowlevel_ops fs_operations = {.lookup = CdcFuseLookup, .forget = CdcFuseForget, .getattr = CdcFuseGetAttr, .setattr = CdcFuseSetAttr, .readlink = CdcFuseReadLink, // .mknod // Read-only file system // .mkdir // Read-only file system // .unlink // Read-only file system // .rmdir // Read-only file system // .symlink // Read-only file system // .rename // Read-only file system // .link // Read-only file system .open = CdcFuseOpen, .read = CdcFuseRead, // .write // Read-only file system .flush = CdcFuseFlush, .release = CdcFuseRelease, .fsync = CdcFuseFSync, .opendir = CdcFuseOpenDir, .readdir = CdcFuseReadDir, .releasedir = CdcFuseReleaseDir, .fsyncdir = CdcFuseFSyncDir, .statfs = CdcFuseStatFS, .setxattr = CdcFuseSetXAttr, .getxattr = CdcFuseGetXAttr, .listxattr = CdcFuseListXAttr, // .removexattr // Read-only file system .access = CdcFuseAccess, // .create // Read-only file system .getlk = CdcFuseGetLk, .setlk = CdcFuseSetLk, .bmap = CdcFuseBMap, .ioctl = CdcFuseIoctl, .poll = CdcFusePoll, // .write_buf // Read-only file system .retrieve_reply = CdcFuseRetrieveReply, .forget_multi = CdcFuseForgetMulti, .flock = CdcFuseFLock, .fallocate = CdcFuseFAllocate}; ctx->session = fuse_lowlevel_new(&ctx->args, &fs_operations, sizeof(fs_operations), nullptr); if (!ctx->session) { Shutdown(); return MakeStatus("fuse_lowlevel_new() failed"); } // Set signal handlers. if (fuse_set_signal_handlers(ctx->session) == -1) { Shutdown(); return MakeStatus("fuse_set_signal_handlers() failed"); } ctx->signal_handlers_set = true; fuse_session_add_chan(ctx->session, ctx->channel); #else // This code is not unit tested. #endif ctx->initialized = true; return absl::OkStatus(); } void Shutdown() { assert(ctx); #ifndef USE_MOCK_LIBFUSE // Exact opposite of Create(). 
if (ctx->signal_handlers_set) { ctx->signal_handlers_set = false; fuse_session_remove_chan(ctx->channel); fuse_remove_signal_handlers(ctx->session); } if (ctx->session) { fuse_session_destroy(ctx->session); ctx->session = nullptr; } if (ctx->channel) { fuse_unmount(ctx->mountpoint, ctx->channel); ctx->channel = nullptr; } if (ctx->mountpoint) { free(ctx->mountpoint); ctx->mountpoint = nullptr; } fuse_opt_free_args(&ctx->args); #else // This code is not unit tested. #endif ctx->initialized = false; delete ctx; ctx = nullptr; } // Adds a warning message to |warnings| if |inode| does not point to // |context_proto|. void CheckProtoMismatch(const std::shared_ptr& inode, const AssetProto* context_proto, Json::Value& warnings) { if (context_proto != inode->asset.proto()) { LOG_WARNING("Proto mismatch %u", GetIno(*inode.get())); Json::Value value; value["ino"] = GetIno(*inode.get()); value["state"] = InodeStateToString(inode->state); value["context_proto"] = context_proto; value["actual_proto"] = inode->asset.proto(); warnings.append(value); } } // Adds a warning message to |warnings| if the proto of |inode| is not nullptr. // This check is relevant for invalidated inodes (corresponding files and // directories were removed from the manifest). void CheckProtoNotNull(const std::shared_ptr& inode, Json::Value& warnings) { if (inode->asset.proto()) { LOG_WARNING("Proto for invalidated inode is not NULL %u", GetIno(*inode.get())); Json::Value value; value["ino"] = GetIno(*inode.get()); warnings.append(value); } } Json::Value CreateWarningMessage(const Inode* inode, std::string&& message) { Json::Value warning; warning["ino"] = GetIno(*inode); warning["name"] = inode->asset.proto()->name(); warning["message"] = message; return warning; } // Adds a set of warning messages to |warnings| if inodes have wrong // properties, for example: a non-directory asset has directory assets. 
void CheckConsistencyIndividualInodes(const std::vector& inodes, Json::Value& warnings) { LOG_DEBUG("Checking consistency of individual inodes"); Json::Value inodes_wrong_properties; for (const Inode* inode : inodes) { std::string asset_check; if (!inode->asset.IsConsistent(&asset_check)) { inodes_wrong_properties.append( CreateWarningMessage(inode, std::move(asset_check))); } // Inode should be referenced. if (inode->nlookup + inode->children_nlookup == 0) { inodes_wrong_properties.append( CreateWarningMessage(inode, "Inode is not referenced")); } if (!inodes_wrong_properties.empty()) { warnings["inodes_wrong_properties"] = inodes_wrong_properties; } } } // Adds a set of warning messages to |warnings| if inodes have invalid parents // and thus cannot be reached from the updated manifest. It checks the // consistency of tree directory structure. void CheckConsistencyInodesHierarchy(const std::vector& inodes, Json::Value& warnings) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ctx->manifest_mutex) { LOG_DEBUG("Checking consistency of inodes hierarchy"); std::deque inodes_queue; inodes_queue.insert(inodes_queue.end(), inodes.begin(), inodes.end()); Json::Value inodes_wrong_parent; std::unordered_set visited; while (!inodes_queue.empty()) { const Inode* inode = inodes_queue.front(); inodes_queue.pop_front(); if (visited.find(inode) != visited.end()) { continue; } visited.insert(inode); Inode& parent = GetInode(inode->asset.parent_ino()); // Only valid inodes can be on the list. if (!parent.IsValid()) { Json::Value message; message["ino"] = GetIno(*inode); message["parent"] = inode->asset.parent_ino(); message["name"] = inode->asset.proto()->name(); message["message"] = "Invalid parent"; inodes_wrong_parent.append(message); continue; } // Add the parent to the deque, as |inodes| includes only kUpdatedProto and // kUpdated. 
if (visited.find(&parent) == visited.end()) { inodes_queue.push_back(&parent); } } if (!visited.empty() && visited.find(ctx->root.get()) == visited.end()) { Json::Value message; message["message"] = "Inode hierarchy is not consistent: the root node was not reached"; inodes_wrong_parent.append(message); } if (!inodes_wrong_parent.empty()) { warnings["inodes_wrong_parent"] = inodes_wrong_parent; } } // Checks if the proto messages are reachable from ctx->manifest. // Returns the set of inodes with unreachable protos. std::set CheckProtoReachability(Json::Value& warnings) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ctx->manifest_mutex) ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { LOG_DEBUG("Checking proto reachability"); Json::Value reachability_warning; std::set unreachable_inodes; if (&ctx->manifest->root_dir() != ctx->root->asset.proto()) { Json::Value message; message["message"] = "Root inode does not point to the manifest proto"; reachability_warning.append(message); unreachable_inodes.emplace(ctx->root.get()); } absl::MutexLock lock(&ctx->inodes_mutex); std::vector root_protos = ctx->root->asset.GetLoadedChildProtos(); std::unordered_set manifest_protos(root_protos.begin(), root_protos.end()); // Start with the root node and its children, add children protos on the // way. std::deque collected_protos; collected_protos.insert(collected_protos.end(), manifest_protos.begin(), manifest_protos.end()); // Collect all protos reachable from the manifest. while (!collected_protos.empty()) { const AssetProto* proto = collected_protos.front(); collected_protos.pop_front(); InodeMap::iterator it = ctx->inodes.find(proto); // Collect child protos of all directories. 
if (it == ctx->inodes.end() || it->second->asset.proto()->type() != AssetProto::DIRECTORY) { continue; } std::vector subprotos = it->second->asset.GetLoadedChildProtos(); collected_protos.insert(collected_protos.end(), subprotos.begin(), subprotos.end()); manifest_protos.insert(subprotos.begin(), subprotos.end()); } for (const auto& [proto, inode] : ctx->inodes) { if (manifest_protos.find(proto) == manifest_protos.end()) { Json::Value message; message["message"] = absl::StrFormat( "Proto for inode %i is not reachable from the manifest", reinterpret_cast(&(*inode))); reachability_warning.append(message); unreachable_inodes.emplace(inode.get()); } } if (!reachability_warning.empty()) { warnings["proto_reachability"] = reachability_warning; } return unreachable_inodes; } // Checks if the FUSE state is consistent after the manifest update. In case of // any inconsistencies it prints out a pretty JSON string. |inodes_size| // describes the number of inodes before the manifest was set. void CheckFUSEConsistency(size_t inodes_size) ABSL_EXCLUSIVE_LOCKS_REQUIRED(ctx->manifest_mutex) ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { LOG_DEBUG("Starting FUSE consistency check"); std::vector inodes_to_check; Json::Value warnings; // Step I. Root consistency. LOG_DEBUG("Checking the root"); if (!ctx->root || ctx->root->asset.parent_ino() != FUSE_ROOT_ID || !ctx->root->IsValid() || ctx->root->IsInitialized()) { Json::Value warning_root = ctx->root->ToJson(true); warning_root["message"] = "The root inode is inconsistent"; warnings.append(warning_root); } // Step II. The total amount of inodes should not change. 
Json::Value initialized_json; Json::Value wrong_protos_json; std::vector invalid_inodes; size_t initialized_total = 0; size_t updated_proto_total = 0; size_t updated_total = 0; { LOG_DEBUG("Checking the number of inodes"); absl::ReaderMutexLock lock(&ctx->inodes_mutex); if (inodes_size != ctx->inodes.size() + ctx->invalid_inodes.size()) { Json::Value warning_size; warning_size["message"] = absl::StrFormat("Inodes' size mismatch: expected: %u, actual: %u", inodes_size, ctx->inodes.size()); warnings.append(warning_size); } // Step III. Consistency of ctx->inodes: inodes should point to the // correct asset proto and asset protos should point to the right inodes. LOG_DEBUG("Checking inode state"); for (const auto& [context_proto, inode] : ctx->inodes) { switch (inode->state) { case InodeState::kInitialized: // There must be no kInitialized inodes, all should be kUpdatedProto, // kUpdated, or kInvalid after manifest update. initialized_json.append(inode->ToJson(true)); ++initialized_total; break; case InodeState::kUpdatedProto: CheckProtoMismatch(inode, context_proto, wrong_protos_json); inodes_to_check.push_back(inode.get()); ++updated_proto_total; break; case InodeState::kUpdated: CheckProtoMismatch(inode, context_proto, wrong_protos_json); inodes_to_check.push_back(inode.get()); ++updated_total; break; case InodeState::kInvalid: CheckProtoNotNull(inode, wrong_protos_json); invalid_inodes.push_back(inode.get()); break; } } } LOG_DEBUG("Initialized=%u, updated_proto=%u, updated=%u, invalid=%u", initialized_total, updated_proto_total, updated_total, invalid_inodes.size()); if (!initialized_json.empty()) { warnings["initialized_inodes"] = initialized_json; } if (!wrong_protos_json.empty()) { warnings["wrong_protos_inodes"] = wrong_protos_json; } // IV. Tree consistency. CheckConsistencyInodesHierarchy(inodes_to_check, warnings); // V. Check reachability of all AssetProtos. 
std::set unreachable_inodes = CheckProtoReachability(warnings); inodes_to_check.push_back(ctx->root.get()); if (!unreachable_inodes.empty()) { LOG_WARNING("Skipping %i inodes from the consistency check", unreachable_inodes.size()); inodes_to_check.erase( std::remove_if(inodes_to_check.begin(), inodes_to_check.end(), [&unreachable_inodes](const Inode* inode) { return unreachable_inodes.find(inode) != unreachable_inodes.end(); }), inodes_to_check.end()); } // VI. Consistency of individual reachable inodes. CheckConsistencyIndividualInodes(inodes_to_check, warnings); Json::Value output; if (!warnings.empty()) { Json::Value updated_proto_json; Json::Value updated_json; for (const Inode* inode : inodes_to_check) { if (inode->IsUpdated()) { updated_json.append(inode->ToJson(true)); } else { assert(inode->IsUpdatedProto()); updated_proto_json.append(inode->ToJson(true)); } } Json::Value invalid_json; for (const Inode* inode : invalid_inodes) { invalid_json.append(inode->ToJson(false)); } output["updated_proto_inodes"] = updated_proto_json; output["updated_inodes"] = updated_json; output["invalid_inodes"] = invalid_json; output["warnings"] = warnings; } if (output.empty()) { LOG_INFO("FUSE consistency check succeeded"); } else { LOG_WARNING("FUSE consistency check: %s", output.toStyledString()); } } // Recursive procedure to invalidate the inode subtree for |ino| including // the root |ino| of the subtree. The elements cannot be directly removed as // they might be still referenced. void InvalidateTree(fuse_ino_t ino) ABSL_SHARED_LOCKS_REQUIRED(ctx->manifest_mutex) ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { std::deque inos; inos.push_back(ino); while (!inos.empty()) { fuse_ino_t tmp_ino = inos.front(); Inode& inode = GetInode(tmp_ino); if (!inode.IsValid()) { LOG_WARNING( "ino should be valid before invalidation. 
ino %u is already invalid", ino); return; } inode.state = InodeState::kInvalid; if (inode.asset.proto()->type() == AssetProto::DIRECTORY) { std::vector child_inos = CollectLoadedChildInos(inode.asset); inos.insert(inos.end(), child_inos.begin(), child_inos.end()); } { absl::MutexLock inode_lock(&ctx->inodes_mutex); const AssetProto* outdated_proto = inode.asset.proto(); ctx->invalid_inodes[tmp_ino] = ctx->inodes[outdated_proto]; size_t count = ctx->inodes.erase(outdated_proto); assert(count); (void)count; } inode.asset.UpdateProto(nullptr); inos.pop_front(); } } struct UpdateInode { std::shared_ptr new_parent; fuse_ino_t old_ino; }; // ThreadPool task that runs the update of inodes. class UpdateInodeTask : public Task { public: UpdateInodeTask(UpdateInode* inode, std::vector* result) : update_inode_(inode), child_inodes_to_update_(result) {} // Task: void ThreadRun(IsCancelledPredicate is_cancelled) override ABSL_SHARED_LOCKS_REQUIRED(ctx->manifest_mutex) ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { LOG_DEBUG("Updating inode %u", update_inode_->old_ino); assert((ctx->manifest_mutex.AssertHeld(), true)); const std::shared_ptr& new_parent = update_inode_->new_parent; Inode& old_inode = GetInode(update_inode_->old_ino); assert(old_inode.IsValid()); const std::string& name = old_inode.asset.proto()->name(); absl::StatusOr new_proto = new_parent->asset.Lookup(name.c_str()); // The asset does not exist anymore. It has to be removed from the parent's // set of children. If the node has its own children, they should be // invalidated as well. The final removal from the inode map can only be // done via forget() and forget_multi() calls. if (!new_proto.ok() || !*new_proto) { InvalidateTree(update_inode_->old_ino); return; } // Asset still exists in a new proto. Its inode id should be preserved. If a // new proto exists for the same name, but the asset has changed, an update // is necessary, the inode id remains stable. 
if (*(*new_proto) != *(old_inode.asset.proto())) { LOG_DEBUG("Inode %u is marked for update", update_inode_->old_ino); old_inode.state = InodeState::kUpdated; } else { old_inode.state = InodeState::kUpdatedProto; } const AssetProto* old_proto = old_inode.asset.proto(); std::shared_ptr new_inode; { absl::MutexLock inode_lock(&ctx->inodes_mutex); new_inode = ctx->inodes[*new_proto] = ctx->inodes[old_proto]; } // As there is an updated valid entry for the same inode in the map, // the old one can be removed. proto_to_remove_ = old_proto; std::vector child_inos = CollectLoadedChildInos(old_inode.asset); for (fuse_ino_t child_ino : child_inos) { UpdateInode child_to_update; child_to_update.new_parent = new_inode; child_to_update.old_ino = child_ino; child_inodes_to_update_->emplace_back(std::move(child_to_update)); } old_inode.asset.UpdateProto(*new_proto); } const AssetProto* ProtoToRemove() const { return proto_to_remove_; } private: const UpdateInode* const update_inode_; std::vector* child_inodes_to_update_; const AssetProto* proto_to_remove_ = nullptr; }; // Recursive procedure to update the inodes contents on a level after a request // to update the manifest id was received. 
void ParallelUpdateProtosOnLevel( Threadpool& pool, std::vector& input_inodes, std::vector>& result, std::vector& outdated_protos) { LOG_DEBUG("Update asset protos in parallel on the same level"); assert(input_inodes.size() == result.size()); for (unsigned int idx = 0; idx < input_inodes.size(); ++idx) { pool.QueueTask( std::make_unique(&input_inodes[idx], &result[idx])); } for (unsigned int idx = 0; idx < input_inodes.size(); ++idx) { std::unique_ptr task = pool.GetCompletedTask(); UpdateInodeTask* update_task = static_cast(task.get()); if (update_task->ProtoToRemove()) { outdated_protos.push_back(update_task->ProtoToRemove()); } } } std::shared_ptr UpdateProtosFromRoot(const AssetProto* new_root_proto) ABSL_LOCKS_EXCLUDED(ctx->inodes_mutex) { LOG_DEBUG("Updating inode hierarchy starting from the root"); assert((ctx->manifest_mutex.AssertHeld(), true)); // Create the new root. Make sure to preserve the lookup counts! std::shared_ptr new_root = std::make_shared(); new_root->asset.Initialize(FUSE_ROOT_ID, ctx->data_store_reader, new_root_proto); new_root->nlookup = ctx->root->nlookup.load(); new_root->children_nlookup = ctx->root->children_nlookup.load(); new_root->state = ctx->root->state.load(); new_root->is_root = true; std::shared_ptr old_root = ctx->root; std::vector children = CollectLoadedChildInos(old_root->asset); std::vector inos_to_update; inos_to_update.reserve(children.size()); for (fuse_ino_t child : children) { UpdateInode to_update; to_update.new_parent = new_root; to_update.old_ino = child; inos_to_update.emplace_back(std::move(to_update)); } // Outdated AssetProto(s) can be removed at the end, as they have a duplicated // updated entry in inodes. Only updated (not removed) inodes are included. 
std::vector outdated_protos; Threadpool pool(std::thread::hardware_concurrency()); while (!inos_to_update.empty()) { std::vector> level_result( inos_to_update.size(), std::vector()); ParallelUpdateProtosOnLevel(pool, inos_to_update, level_result, outdated_protos); inos_to_update.clear(); for (unsigned int idx = 0; idx < level_result.size(); ++idx) { for (unsigned int jdx = 0; jdx < level_result[idx].size(); ++jdx) { inos_to_update.push_back(level_result[idx][jdx]); } } } // Inodes should not be removed, just the map entries with old protos. absl::MutexLock inode_lock(&ctx->inodes_mutex); for (size_t idx = outdated_protos.size(); idx > 0; --idx) { assert(outdated_protos[idx - 1]); size_t count = ctx->inodes.erase(outdated_protos[idx - 1]); assert(count); (void)count; } return new_root; } absl::Status SetManifest(const ContentIdProto& manifest_id) ABSL_LOCKS_EXCLUDED(ctx->manifest_mutex, ctx->inodes_mutex) { LOG_DEBUG("Setting manifest '%s' in FUSE", ContentId::ToHexString(manifest_id)); assert(ctx && ctx->initialized && ctx->data_store_reader); { absl::WriterMutexLock manifest_lock(&ctx->manifest_mutex); size_t old_inodes_size; { absl::MutexLock inodes_lock(&ctx->inodes_mutex); old_inodes_size = ctx->inodes.size() + ctx->invalid_inodes.size(); } std::unique_ptr new_manifest = std::make_unique(); absl::Status status = ctx->data_store_reader->GetProto(manifest_id, new_manifest.get()); if (!status.ok()) { LOG_ERROR("Failed to get manifest '%s'", ContentId::ToHexString(manifest_id)); return WrapStatus(status, "Failed to get manifest '%s'", ContentId::ToHexString(manifest_id)); } ctx->root = UpdateProtosFromRoot(&new_manifest->root_dir()); if (ctx->manifest->root_dir() != new_manifest->root_dir()) { ctx->root->state = InodeState::kUpdated; } else { ctx->root->state = InodeState::kUpdatedProto; } ctx->manifest.swap(new_manifest); if (ctx->consistency_check) { CheckFUSEConsistency(old_inodes_size); } absl::MutexLock inodes_lock(&ctx->inodes_mutex); for (const auto& [proto, 
inode] : ctx->inodes) { // Reset kUpdatedProto to kInitialized. The state was only used for // validation. kUpdated is still needed for clearing kernel caches when // a file is opened. assert(inode->IsValid()); if (inode->IsUpdatedProto() || inode->asset.proto()->type() == AssetProto::DIRECTORY) { inode->state = InodeState::kInitialized; } } ctx->root->state = InodeState::kInitialized; } // Process outstanding open requests. Be sure to move the vector because // processing might requeue requests. std::vector requests; { absl::MutexLock lock(&ctx->queued_requests_mutex); requests.swap(ctx->queued_requests); } for (QueuedRequest& qr : requests) { switch (qr.type) { case QueuedRequest::Type::kLookup: LOG_DEBUG("Resuming request to look up '%s' in '%s' (ino %u)", qr.u.lookup.name, qr.rel_path, qr.u.lookup.parent_ino); CdcFuseLookup(qr.u.lookup.req, qr.u.lookup.parent_ino, qr.u.lookup.name); break; case QueuedRequest::Type::kOpen: LOG_DEBUG("Resuming request to open file '%s' (ino %u)", qr.rel_path, qr.u.open.ino); CdcFuseOpen(qr.u.open.req, qr.u.open.ino, &qr.u.open.fi); break; case QueuedRequest::Type::kOpenDir: LOG_DEBUG("Resuming request to open dir '%s' (ino %u)", qr.rel_path, qr.u.open.ino); CdcFuseOpenDir(qr.u.open.req, qr.u.open.ino, &qr.u.open.fi); break; } } #ifndef USE_MOCK_LIBFUSE // Acknowledge that the manifest id was received and FUSE was updated. 
absl::Status status = ctx->config_stream_client->SendManifestAck(manifest_id); if (!status.ok()) { LOG_ERROR("Failed to send ack for manifest '%s'", ContentId::ToHexString(manifest_id)); return WrapStatus(status, "Failed to send ack for manifest '%s'", ContentId::ToHexString(manifest_id)); } #endif return absl::OkStatus(); } void SetConfigClient( std::unique_ptr config_client) { LOG_DEBUG("Starting configuration client"); assert(ctx && ctx->initialized); if (ctx->config_stream_client) { ctx->config_stream_client.reset(); } ctx->config_stream_client = std::move(config_client); } // Initializes FUSE with a manifest for an empty directory: // The user will be able to check the empty folder before the first update // of the manifest id is received. void InitializeRootManifest() { absl::MutexLock lock(&ctx->manifest_mutex); assert(ctx && ctx->root); ctx->manifest->mutable_root_dir()->set_type(AssetProto::DIRECTORY); ctx->root->asset.Initialize(FUSE_ROOT_ID, ctx->data_store_reader, &ctx->manifest->root_dir()); ctx->root->is_root = true; ctx->root->nlookup = 1; } absl::Status Run(DataStoreReader* data_store_reader, bool consistency_check) { assert(ctx && ctx->initialized && data_store_reader); ctx->consistency_check = consistency_check; ctx->data_store_reader = data_store_reader; InitializeRootManifest(); #ifndef USE_MOCK_LIBFUSE RETURN_IF_ERROR(ctx->config_stream_client->StartListeningToManifestUpdates( [](const ContentIdProto& id) { return SetManifest(id); }), "Failed to listen to manifest updates"); LOG_INFO("Starting session loop (mt = '%s')", ctx->multithreaded ? "true" : "false"); int res = ctx->multithreaded ? fuse_session_loop_mt(ctx->session) : fuse_session_loop(ctx->session); if (res == -1) return MakeStatus("Session loop failed"); LOG_INFO("Session loop finished."); ctx->config_stream_client->Shutdown(); #else // This code is not unit tested. #endif return absl::OkStatus(); } } // namespace cdc_fuse_fs } // namespace cdc_ft