diff --git a/WORKSPACE b/WORKSPACE index aa3519f..51d1aa5 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -47,10 +47,10 @@ http_archive( http_archive( name = "com_github_fuse", build_file = "@//third_party/fuse:BUILD", - sha256 = "832432d1ad4f833c20e13b57cf40ce5277a9d33e483205fc63c78111b3358874", - strip_prefix = "fuse-2.9.7", patch_args = ["-p1"], patches = ["@//third_party/fuse:disable_symbol_versioning.patch"], + sha256 = "832432d1ad4f833c20e13b57cf40ce5277a9d33e483205fc63c78111b3358874", + strip_prefix = "fuse-2.9.7", url = "https://github.com/libfuse/libfuse/releases/download/fuse-2.9.7/fuse-2.9.7.tar.gz", ) @@ -70,6 +70,14 @@ http_archive( url = "https://github.com/tronkko/dirent/archive/refs/tags/1.23.2.tar.gz", ) +http_archive( + name = "com_github_lyra", + build_file = "@//third_party/lyra:BUILD.bazel", + sha256 = "a93f247ed89eba11ca36eb24c4f8ba7be636bf24e74aaaa8e1066e0954bec7e3", + strip_prefix = "Lyra-1.6.1", + url = "https://github.com/bfgroup/Lyra/archive/refs/tags/1.6.1.tar.gz", +) + local_repository( name = "com_google_absl", path = "third_party/absl", diff --git a/all_files.vcxitems b/all_files.vcxitems index 6eaceac..e28a753 100644 --- a/all_files.vcxitems +++ b/all_files.vcxitems @@ -18,6 +18,7 @@ + @@ -29,6 +30,7 @@ + @@ -145,6 +147,7 @@ + @@ -154,6 +157,7 @@ + diff --git a/asset_stream_manager/BUILD b/asset_stream_manager/BUILD index 30f3719..84f6e69 100644 --- a/asset_stream_manager/BUILD +++ b/asset_stream_manager/BUILD @@ -7,12 +7,28 @@ cc_binary( srcs = ["main.cc"], data = [":roots_pem"], deps = [ - ":asset_stream_config", - ":session_management_server", + ":commands", "//cdc_stream", "//common:log", "//common:path", - "//data_store:data_provider", + ], +) + +cc_library( + name = "commands", + srcs = [ + "base_command.cc", + "start_service_command.cc", + ], + hdrs = [ + "base_command.h", + "start_service_command.h", + ], + deps = [ + ":asset_stream_config", + ":session_management_server", + "@com_github_lyra//:lyra", + "@com_google_absl//absl/status", ], ) @@ -53,8 +69,10 @@ cc_library( "//common:log", "//common:path", "//common:status_macros", + "//data_store:data_provider", + "//data_store:disk_data_store", "@com_github_jsoncpp//:jsoncpp", - "@com_google_absl//absl/flags:parse", + "@com_github_lyra//:lyra", ], ) diff --git a/asset_stream_manager/asset_stream_config.cc b/asset_stream_manager/asset_stream_config.cc index 8f1b353..db00123 100644 --- a/asset_stream_manager/asset_stream_config.cc +++ b/asset_stream_manager/asset_stream_config.cc @@ -16,83 +16,139 @@ #include -#include "absl/flags/flag.h" -#include "absl/flags/parse.h" #include "absl/strings/str_format.h" #include "absl/strings/str_join.h" #include "absl_helper/jedec_size_flag.h" #include "common/buffer.h" #include "common/path.h" #include "common/status_macros.h" +#include "data_store/data_provider.h" +#include "data_store/disk_data_store.h" #include "json/json.h" - -ABSL_DECLARE_FLAG(int, verbosity); -ABSL_DECLARE_FLAG(bool, debug); -ABSL_DECLARE_FLAG(bool, singlethreaded); -ABSL_DECLARE_FLAG(bool, stats); -ABSL_DECLARE_FLAG(bool, quiet); -ABSL_DECLARE_FLAG(bool, check); -ABSL_DECLARE_FLAG(bool, log_to_stdout); -ABSL_DECLARE_FLAG(cdc_ft::JedecSize, cache_capacity); -ABSL_DECLARE_FLAG(uint32_t, cleanup_timeout); -ABSL_DECLARE_FLAG(uint32_t, access_idle_timeout); -ABSL_DECLARE_FLAG(int, manifest_updater_threads); -ABSL_DECLARE_FLAG(int, file_change_wait_duration_ms); - -// Development flags. -ABSL_DECLARE_FLAG(std::string, dev_src_dir); -ABSL_DECLARE_FLAG(std::string, dev_user_host); -ABSL_DECLARE_FLAG(uint16_t, dev_ssh_port); -ABSL_DECLARE_FLAG(std::string, dev_ssh_command); -ABSL_DECLARE_FLAG(std::string, dev_scp_command); -ABSL_DECLARE_FLAG(std::string, dev_mount_dir); - -// Declare AS20 flags, so that AS30 can be used on older SDKs simply by -// replacing the binary. Note that the RETIRED_FLAGS macro can't be used -// because the flags contain dashes. This code mimics the macro. -absl::flags_internal::RetiredFlag RETIRED_FLAGS_session_ports; -absl::flags_internal::RetiredFlag RETIRED_FLAGS_gm_mount_point; -absl::flags_internal::RetiredFlag RETIRED_FLAGS_allow_edge; - -const auto RETIRED_FLAGS_REG_session_ports = - (RETIRED_FLAGS_session_ports.Retire("session-ports"), - ::absl::flags_internal::FlagRegistrarEmpty{}); -const auto RETIRED_FLAGS_REG_gm_mount_point = - (RETIRED_FLAGS_gm_mount_point.Retire("gamelet-mount-point"), - ::absl::flags_internal::FlagRegistrarEmpty{}); -const auto RETIRED_FLAGS_REG_allow_edge = - (RETIRED_FLAGS_allow_edge.Retire("allow-edge"), - ::absl::flags_internal::FlagRegistrarEmpty{}); +#include "lyra/lyra.hpp" namespace cdc_ft { +namespace { +constexpr int kDefaultVerbosity = 2; +constexpr uint32_t kDefaultManifestUpdaterThreads = 4; +constexpr uint32_t kDefaultFileChangeWaitDurationMs = 500; +} // namespace -AssetStreamConfig::AssetStreamConfig() { - session_cfg_.verbosity = absl::GetFlag(FLAGS_verbosity); - session_cfg_.fuse_debug = absl::GetFlag(FLAGS_debug); - session_cfg_.fuse_singlethreaded = absl::GetFlag(FLAGS_singlethreaded); - session_cfg_.stats = absl::GetFlag(FLAGS_stats); - session_cfg_.quiet = absl::GetFlag(FLAGS_quiet); - session_cfg_.fuse_check = absl::GetFlag(FLAGS_check); - log_to_stdout_ = absl::GetFlag(FLAGS_log_to_stdout); - session_cfg_.fuse_cache_capacity = absl::GetFlag(FLAGS_cache_capacity).Size(); - session_cfg_.fuse_cleanup_timeout_sec = absl::GetFlag(FLAGS_cleanup_timeout); - session_cfg_.fuse_access_idle_timeout_sec = - absl::GetFlag(FLAGS_access_idle_timeout); - session_cfg_.manifest_updater_threads = - absl::GetFlag(FLAGS_manifest_updater_threads); - session_cfg_.file_change_wait_duration_ms = - absl::GetFlag(FLAGS_file_change_wait_duration_ms); - - dev_src_dir_ = absl::GetFlag(FLAGS_dev_src_dir); - dev_target_.user_host = absl::GetFlag(FLAGS_dev_user_host); - dev_target_.ssh_port = absl::GetFlag(FLAGS_dev_ssh_port); - dev_target_.ssh_command = absl::GetFlag(FLAGS_dev_ssh_command); - dev_target_.scp_command = absl::GetFlag(FLAGS_dev_scp_command); - dev_target_.mount_dir = absl::GetFlag(FLAGS_dev_mount_dir); -} +AssetStreamConfig::AssetStreamConfig() = default; AssetStreamConfig::~AssetStreamConfig() = default; +void AssetStreamConfig::RegisterCommandLineFlags(lyra::command& cmd) { + session_cfg_.verbosity = kDefaultVerbosity; + cmd.add_argument(lyra::opt(session_cfg_.verbosity, "num") + .name("--verbosity") + .help("Verbosity of the log output, default: " + + std::to_string(kDefaultVerbosity))); + + cmd.add_argument( + lyra::opt(session_cfg_.stats) + .name("--stats") + .help("Collect and print detailed streaming statistics")); + + cmd.add_argument( + lyra::opt(session_cfg_.quiet) + .name("--quiet") + .help("Do not print any output except errors and stats")); + + session_cfg_.manifest_updater_threads = kDefaultManifestUpdaterThreads; + cmd.add_argument(lyra::opt(session_cfg_.manifest_updater_threads, "count") + .name("--manifest-updater-threads") + .help("Number of threads used to compute file hashes on " + "the workstation, default: " + + std::to_string(kDefaultManifestUpdaterThreads))); + + session_cfg_.file_change_wait_duration_ms = kDefaultFileChangeWaitDurationMs; + cmd.add_argument( + lyra::opt(session_cfg_.file_change_wait_duration_ms, "ms") + .name("--file-change-wait-duration-ms") + .help("Time in milliseconds to wait until pushing a file change " + "to the instance after detecting it, default: " + + std::to_string(kDefaultFileChangeWaitDurationMs))); + + cmd.add_argument(lyra::opt(session_cfg_.fuse_debug) + .name("--debug") + .help("Run FUSE filesystem in debug mode")); + + cmd.add_argument(lyra::opt(session_cfg_.fuse_singlethreaded) + .name("--singlethreaded") + .optional() + .help("Run FUSE filesystem in single-threaded mode")); + + cmd.add_argument(lyra::opt(session_cfg_.fuse_check) + .name("--check") + .help("Check FUSE consistency and log check results")); + + session_cfg_.fuse_cache_capacity = DiskDataStore::kDefaultCapacity; + cmd.add_argument(lyra::opt(JedecParser("--cache-capacity", + &session_cfg_.fuse_cache_capacity), + "bytes") + .name("--cache-capacity") + .help("FUSE cache capacity, default: " + + std::to_string(DiskDataStore::kDefaultCapacity) + + ". Supports common unit suffixes K, M, G.")); + + session_cfg_.fuse_cleanup_timeout_sec = DataProvider::kCleanupTimeoutSec; + cmd.add_argument( + lyra::opt(session_cfg_.fuse_cleanup_timeout_sec, "sec") + .name("--cleanup-timeout") + .help("Period in seconds at which instance cache cleanups are run, " + "default: " + + std::to_string(DataProvider::kCleanupTimeoutSec))); + + session_cfg_.fuse_access_idle_timeout_sec = DataProvider::kAccessIdleSec; + cmd.add_argument( + lyra::opt(session_cfg_.fuse_access_idle_timeout_sec, "sec") + .name("--access-idle-timeout") + .help("Do not run instance cache cleanups for this long after the " + "last file access, default: " + + std::to_string(DataProvider::kAccessIdleSec))); + + cmd.add_argument(lyra::opt(log_to_stdout_) + .name("--log-to-stdout") + .help("Log to stdout instead of to a file")); + cmd.add_argument( + lyra::opt(dev_src_dir_, "dir") + .name("--dev-src-dir") + .help("Start a streaming session immediately from the given Windows " + "path. Used during development. Must also specify other --dev " + "flags.")); + + cmd.add_argument( + lyra::opt(dev_target_.user_host, "[user@]host") + .name("--dev-user-host") + .help("Username and host to stream to. See also --dev-src-dir.")); + + dev_target_.ssh_port = RemoteUtil::kDefaultSshPort; + cmd.add_argument( + lyra::opt(dev_target_.ssh_port, "port") + .name("--dev-ssh-port") + .help("SSH port to use for the connection to the host, default: " + + std::to_string(RemoteUtil::kDefaultSshPort) + + ". See also --dev-src-dir.")); + + cmd.add_argument( + lyra::opt(dev_target_.ssh_command, "cmd") + .name("--dev-ssh-command") + .help("Ssh command and extra flags to use for the " + "connection to the host. See also --dev-src-dir.")); + + cmd.add_argument( + lyra::opt(dev_target_.scp_command, "cmd") + .name("--dev-scp-command") + .help("Scp command and extra flags to use for the " + "connection to the host. See also --dev-src-dir.")); + + cmd.add_argument( + lyra::opt(dev_target_.mount_dir, "dir") + .name("--dev-mount-dir") + .help("Directory on the host to stream to. See also --dev-src-dir.")); +} + absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { Buffer buffer; RETURN_IF_ERROR(path::ReadFile(path, &buffer)); @@ -106,31 +162,31 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { reader.getFormattedErrorMessages())); } -#define ASSIGN_VAR(var, flag, type) \ - do { \ - if (config.isMember(#flag)) { \ - var = config[#flag].as##type(); \ - flags_read_from_file_.insert(#flag); \ - } \ +#define ASSIGN_VAR(var, flag, type) \ + do { \ + if (config.isMember(flag)) { \ + var = config[flag].as##type(); \ + flags_read_from_file_.insert(flag); \ + } \ } while (0) - ASSIGN_VAR(session_cfg_.verbosity, verbosity, Int); - ASSIGN_VAR(session_cfg_.fuse_debug, debug, Bool); - ASSIGN_VAR(session_cfg_.fuse_singlethreaded, singlethreaded, Bool); - ASSIGN_VAR(session_cfg_.stats, stats, Bool); - ASSIGN_VAR(session_cfg_.quiet, quiet, Bool); - ASSIGN_VAR(session_cfg_.fuse_check, check, Bool); - ASSIGN_VAR(log_to_stdout_, log_to_stdout, Bool); - ASSIGN_VAR(session_cfg_.fuse_cleanup_timeout_sec, cleanup_timeout, Int); - ASSIGN_VAR(session_cfg_.fuse_access_idle_timeout_sec, access_idle_timeout, + ASSIGN_VAR(session_cfg_.verbosity, "verbosity", Int); + ASSIGN_VAR(session_cfg_.fuse_debug, "debug", Bool); + ASSIGN_VAR(session_cfg_.fuse_singlethreaded, "singlethreaded", Bool); + ASSIGN_VAR(session_cfg_.stats, "stats", Bool); + ASSIGN_VAR(session_cfg_.quiet, "quiet", Bool); + ASSIGN_VAR(session_cfg_.fuse_check, "check", Bool); + ASSIGN_VAR(log_to_stdout_, "log-to-stdout", Bool); + ASSIGN_VAR(session_cfg_.fuse_cleanup_timeout_sec, "cleanup-timeout", Int); + ASSIGN_VAR(session_cfg_.fuse_access_idle_timeout_sec, "access-idle-timeout", Int); - ASSIGN_VAR(session_cfg_.manifest_updater_threads, manifest_updater_threads, + ASSIGN_VAR(session_cfg_.manifest_updater_threads, "manifest-updater-threads", Int); ASSIGN_VAR(session_cfg_.file_change_wait_duration_ms, - file_change_wait_duration_ms, Int); + "file-change-wait-duration-ms", Int); // cache_capacity requires Jedec size conversion. - constexpr char kCacheCapacity[] = "cache_capacity"; + constexpr char kCacheCapacity[] = "cache-capacity"; if (config.isMember(kCacheCapacity)) { JedecSize cache_capacity; std::string error; @@ -162,25 +218,25 @@ std::string AssetStreamConfig::ToString() { ss << "quiet = " << session_cfg_.quiet << std::endl; ss << "check = " << session_cfg_.fuse_check << std::endl; - ss << "log_to_stdout = " << log_to_stdout_ << std::endl; - ss << "cache_capacity = " << session_cfg_.fuse_cache_capacity + ss << "log-to-stdout = " << log_to_stdout_ << std::endl; + ss << "cache-capacity = " << session_cfg_.fuse_cache_capacity << std::endl; - ss << "cleanup_timeout = " + ss << "cleanup-timeout = " << session_cfg_.fuse_cleanup_timeout_sec << std::endl; - ss << "access_idle_timeout = " + ss << "access-idle-timeout = " << session_cfg_.fuse_access_idle_timeout_sec << std::endl; - ss << "manifest_updater_threads = " + ss << "manifest-updater-threads = " << session_cfg_.manifest_updater_threads << std::endl; - ss << "file_change_wait_duration_ms = " + ss << "file-change-wait-duration-ms = " << session_cfg_.file_change_wait_duration_ms << std::endl; - ss << "dev_src_dir = " << dev_src_dir_ << std::endl; - ss << "dev_user_host = " << dev_target_.user_host << std::endl; - ss << "dev_ssh_port = " << dev_target_.ssh_port << std::endl; - ss << "dev_ssh_command = " << dev_target_.ssh_command + ss << "dev-src-dir = " << dev_src_dir_ << std::endl; + ss << "dev-user-host = " << dev_target_.user_host << std::endl; + ss << "dev-ssh-port = " << dev_target_.ssh_port << std::endl; + ss << "dev-ssh-command = " << dev_target_.ssh_command << std::endl; - ss << "dev_scp_command = " << dev_target_.scp_command + ss << "dev-scp-command = " << dev_target_.scp_command << std::endl; - ss << "dev_mount_dir = " << dev_target_.mount_dir << std::endl; + ss << "dev-mount-dir = " << dev_target_.mount_dir << std::endl; return ss.str(); } @@ -195,5 +251,18 @@ std::string AssetStreamConfig::GetFlagReadErrors() { error_str.empty() ? "" : "\n", flag, error); return error_str; } +std::function AssetStreamConfig::JedecParser( + const char* flag_name, uint64_t* bytes) { + return [flag_name, bytes, + error = &jedec_parse_error_](const std::string& value) { + JedecSize size; + if (AbslParseFlag(value, &size, error)) { + *bytes = size.Size(); + } else { + *error = absl::StrFormat("Failed to parse --%s=%s: %s", flag_name, value, + *error); + } + }; +} } // namespace cdc_ft diff --git a/asset_stream_manager/asset_stream_config.h b/asset_stream_manager/asset_stream_config.h index 8cdbcca..08f6256 100644 --- a/asset_stream_manager/asset_stream_config.h +++ b/asset_stream_manager/asset_stream_config.h @@ -25,6 +25,10 @@ #include "asset_stream_manager/session_config.h" #include "session.h" +namespace lyra { +class command; +} + namespace cdc_ft { // Class containing all configuration settings for asset streaming. @@ -36,6 +40,9 @@ class AssetStreamConfig { AssetStreamConfig(); ~AssetStreamConfig(); + // Registers arguments with Lyra. + void RegisterCommandLineFlags(lyra::command& cmd); + // Loads a configuration from the JSON file at |path| and overrides any config // values that are set in this file. Sample json file: // { @@ -81,7 +88,16 @@ class AssetStreamConfig { // Whether to log to a file or to stdout. bool log_to_stdout() const { return log_to_stdout_; } + // Workaround for Lyra not accepting errors from parsers. + const std::string& jedec_parse_error() const { return jedec_parse_error_; } + private: + // Jedec parser for Lyra options. Usage: + // lyra::opt(JedecParser("size-flag", &size_bytes), "bytes")) + // Sets jedec_parse_error_ on error, Lyra doesn't support errors from lambdas. + std::function JedecParser(const char* flag_name, + uint64_t* bytes); + SessionConfig session_cfg_; bool log_to_stdout_ = false; @@ -95,6 +111,9 @@ class AssetStreamConfig { // Maps flags to errors occurred while reading this flag. std::map flag_read_errors_; + + // Errors from parsing JEDEC sizes. + std::string jedec_parse_error_; }; }; // namespace cdc_ft diff --git a/asset_stream_manager/base_command.cc b/asset_stream_manager/base_command.cc new file mode 100644 index 0000000..f211cf2 --- /dev/null +++ b/asset_stream_manager/base_command.cc @@ -0,0 +1,71 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "asset_stream_manager/base_command.h" + +#include "lyra/lyra.hpp" + +namespace cdc_ft { + +BaseCommand::BaseCommand(std::string name, std::string help, int* exit_code) + : name_(name), help_(help), exit_code_(exit_code) { + assert(exit_code_); +} + +BaseCommand::~BaseCommand() = default; + +void BaseCommand::Register(lyra::cli& cli) { + lyra::command cmd(name_, + [this](const lyra::group& g) { this->CommandHandler(g); }); + cmd.help(help_); + cmd.add_argument(lyra::help(show_help_)); + + RegisterCommandLineFlags(cmd); + + // Workaround for Lyra treating --unknown_flags as positional argument. + // If this argument is not empty, it's an unknown arg. + cmd.add_argument(lyra::arg(invalid_arg_, "")); + + // Register command with CLI. + cli.add_argument(std::move(cmd)); +} + +void BaseCommand::CommandHandler(const lyra::group& g) { + // Handle -h, --help. + if (show_help_) { + std::cout << g; + *exit_code_ = 0; + return; + } + + // Handle invalid arguments. + if (!invalid_arg_.empty()) { + std::cerr << "Error: Unknown parameter '" << invalid_arg_ + << "'. Try -h for help." << std::endl; + *exit_code_ = 1; + return; + } + + // Run and print error. + absl::Status status = Run(); + if (!status.ok()) { + std::cerr << "Error: " << status.message() << std::endl; + } + + // Write status code to |exit_code_|. + static_assert(static_cast(absl::StatusCode::kOk) == 0, "kOk not 0"); + *exit_code_ = static_cast(status.code()); +} + +} // namespace cdc_ft diff --git a/asset_stream_manager/base_command.h b/asset_stream_manager/base_command.h new file mode 100644 index 0000000..6c85bff --- /dev/null +++ b/asset_stream_manager/base_command.h @@ -0,0 +1,73 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ASSET_STREAM_MANAGER_BASE_COMMAND_H_ +#define ASSET_STREAM_MANAGER_BASE_COMMAND_H_ + +#include + +#include "absl/status/status.h" + +namespace lyra { +class cli; +class command; +class group; +} // namespace lyra + +namespace cdc_ft { + +// Base class for commands that wraps Lyra commands to reduce common +// boilerplate like help text display, invalid args and return values from +// command execution. +class BaseCommand { + public: + // Creates a new command with given |name| and |help| text. After the command + // ran, the status code as returned by Run() is written to |exit_code|. + BaseCommand(std::string name, std::string help, int* exit_code); + ~BaseCommand(); + + // Registers the command with Lyra. Must be called before parsing args. + void Register(lyra::cli& cli); + + protected: + // Adds all optional and required arguments used by the command. + // Called by Register(). + virtual void RegisterCommandLineFlags(lyra::command& cmd) = 0; + + // Runs the command. Called by lyra::cli::parse() when this command is + // actually triggered and all flags have been parsed successfully. + virtual absl::Status Run() = 0; + + private: + // Called by lyra::cli::parse() after successfully parsing arguments. Catches + // unknown arguments (Lyra interprets those as positional args, not as an + // error!), and displays the help text if appropriate, otherwise calls Run(). + void CommandHandler(const lyra::group& g); + + std::string name_; + std::string help_; + int* exit_code_ = nullptr; + + bool show_help_ = false; + + // Workaround for invalid args. Lyra doesn't interpret --invalid as invalid + // argument, but as positional argument "--invalid". + std::string invalid_arg_; +}; + +} // namespace cdc_ft + +#endif // ASSET_STREAM_MANAGER_BASE_COMMAND_H_ diff --git a/asset_stream_manager/main.cc b/asset_stream_manager/main.cc index ad38f56..5cb2fbb 100644 --- a/asset_stream_manager/main.cc +++ b/asset_stream_manager/main.cc @@ -12,202 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "absl/flags/flag.h" -#include "absl/flags/parse.h" -#include "absl_helper/jedec_size_flag.h" -#include "asset_stream_manager/asset_stream_config.h" -#include "asset_stream_manager/background_service_impl.h" -#include "asset_stream_manager/local_assets_stream_manager_service_impl.h" -#include "asset_stream_manager/session_management_server.h" -#include "asset_stream_manager/session_manager.h" -#include "common/grpc_status.h" -#include "common/log.h" -#include "common/path.h" -#include "common/process.h" -#include "common/status_macros.h" -#include "data_store/data_provider.h" -#include "data_store/disk_data_store.h" -#include "metrics/metrics.h" - -namespace cdc_ft { -namespace { - -constexpr int kSessionManagementPort = 44432; - -absl::Status Run(const AssetStreamConfig& cfg) { - WinProcessFactory process_factory; - metrics::MetricsService metrics_service; - - SessionManager session_manager(cfg.session_cfg(), &process_factory, - &metrics_service); - BackgroundServiceImpl background_service; - LocalAssetsStreamManagerServiceImpl session_service( - &session_manager, &process_factory, &metrics_service); - - SessionManagementServer sm_server(&session_service, &background_service, - &session_manager); - background_service.SetExitCallback( - [&sm_server]() { return sm_server.Shutdown(); }); - - if (!cfg.dev_src_dir().empty()) { - localassetsstreammanager::StartSessionRequest request; - request.set_workstation_directory(cfg.dev_src_dir()); - request.set_user_host(cfg.dev_target().user_host); - request.set_mount_dir(cfg.dev_target().mount_dir); - request.set_port(cfg.dev_target().ssh_port); - request.set_ssh_command(cfg.dev_target().ssh_command); - request.set_scp_command(cfg.dev_target().scp_command); - localassetsstreammanager::StartSessionResponse response; - RETURN_ABSL_IF_ERROR( - session_service.StartSession(nullptr, &request, &response)); - } - RETURN_IF_ERROR(sm_server.Start(kSessionManagementPort)); - sm_server.RunUntilShutdown(); - return absl::OkStatus(); -} - -std::string GetLogPath(const char* log_dir, const char* log_base_name) { - DefaultSystemClock* clock = DefaultSystemClock::GetInstance(); - std::string timestamp_ext = clock->FormatNow(".%Y%m%d-%H%M%S.log", false); - return path::Join(log_dir, log_base_name + timestamp_ext); -} - -void InitLogging(std::string& log_dir, bool log_to_stdout, int verbosity) { - LogLevel level = cdc_ft::Log::VerbosityToLogLevel(verbosity); - if (log_to_stdout) { - cdc_ft::Log::Initialize(std::make_unique(level)); - } else { - if (path::ExpandPathVariables(&log_dir).ok() && - path::CreateDirRec(log_dir).ok()) { - cdc_ft::Log::Initialize(std::make_unique( - level, GetLogPath(log_dir.c_str(), "assets_stream_manager").c_str())); - } else { - LOG_ERROR("Failed to create log directory '%s'", log_dir); - exit(1); - } - } -} - -// Declare AS20 flags, so that AS30 can be used on older SDKs simply by -// replacing the binary. Note that the RETIRED_FLAGS macro can't be used -// because the flags contain dashes. This code mimics the macro. -absl::flags_internal::RetiredFlag RETIRED_FLAGS_port; -absl::flags_internal::RetiredFlag RETIRED_FLAGS_session_ports; -absl::flags_internal::RetiredFlag RETIRED_FLAGS_gm_mount_point; -absl::flags_internal::RetiredFlag RETIRED_FLAGS_allow_edge; -const auto RETIRED_FLAGS_REG_port = - (RETIRED_FLAGS_port.Retire("port"), - ::absl::flags_internal::FlagRegistrarEmpty{}); -const auto RETIRED_FLAGS_REG_session_ports = - (RETIRED_FLAGS_session_ports.Retire("session-ports"), - ::absl::flags_internal::FlagRegistrarEmpty{}); -const auto RETIRED_FLAGS_REG_gm_mount_point = - (RETIRED_FLAGS_gm_mount_point.Retire("gamelet-mount-point"), - ::absl::flags_internal::FlagRegistrarEmpty{}); -const auto RETIRED_FLAGS_REG_allow_edge = - (RETIRED_FLAGS_allow_edge.Retire("allow-edge"), - ::absl::flags_internal::FlagRegistrarEmpty{}); - -} // namespace -} // namespace cdc_ft - -ABSL_FLAG(int, verbosity, 2, "Verbosity of the log output"); -ABSL_FLAG(bool, debug, false, "Run FUSE filesystem in debug mode"); -ABSL_FLAG(bool, singlethreaded, false, - "Run FUSE filesystem in singlethreaded mode"); -ABSL_FLAG(bool, stats, false, - "Collect and print detailed streaming statistics"); -ABSL_FLAG(bool, quiet, false, - "Do not print any output except errors and stats"); -ABSL_FLAG(int, manifest_updater_threads, 4, - "Number of threads used to compute file hashes on the workstation."); -ABSL_FLAG(int, file_change_wait_duration_ms, 500, - "Time in milliseconds to wait until pushing a file change to the " - "instance after detecting it"); -ABSL_FLAG(bool, check, false, "Check FUSE consistency and log check results"); -ABSL_FLAG(bool, log_to_stdout, false, "Log to stdout instead of to a file"); -ABSL_FLAG(cdc_ft::JedecSize, cache_capacity, - cdc_ft::JedecSize(cdc_ft::DiskDataStore::kDefaultCapacity), - "Cache capacity. Supports common unit suffixes K, M, G"); -ABSL_FLAG(uint32_t, cleanup_timeout, cdc_ft::DataProvider::kCleanupTimeoutSec, - "Period in seconds at which instance cache cleanups are run"); -ABSL_FLAG(uint32_t, access_idle_timeout, cdc_ft::DataProvider::kAccessIdleSec, - "Do not run instance cache cleanups for this many seconds after the " - "last file access"); -ABSL_FLAG(std::string, config_file, - "%APPDATA%\\cdc-file-transfer\\assets_stream_manager.json", - "Json configuration file for asset stream manager"); -ABSL_FLAG(std::string, log_dir, "%APPDATA%\\cdc-file-transfer\\logs", - "Directory to store log files for asset stream manager"); - -// Development args. -ABSL_FLAG(std::string, dev_src_dir, "", - "Start a streaming session immediately from the given Windows path. " - "Used during development. Must also specify --dev_user_host and " - "--dev_mount_dir and possibly other --dev flags, depending on the " - "SSH setup"); -ABSL_FLAG(std::string, dev_user_host, "", - "Username and host to stream to, of the form [user@]host. Used " - "during development. See --dev_src_dir for more info."); -ABSL_FLAG(uint16_t, dev_ssh_port, cdc_ft::RemoteUtil::kDefaultSshPort, - "SSH port to use for the connection to the host. Used during " - "development. See --dev_src_dir for more info."); -ABSL_FLAG(std::string, dev_ssh_command, "", - "Ssh command and extra flags to use for the connection to the host. " - "Used during development. See --dev_src_dir for more info."); -ABSL_FLAG(std::string, dev_scp_command, "", - "Scp command and extra flags to use for the connection to the host. " - "Used during development. See --dev_src_dir for more info."); -ABSL_FLAG(std::string, dev_mount_dir, "", - "Directory on the host to stream to. Used during development. See " - "--dev_src_dir for more info."); +#include "asset_stream_manager/start_service_command.h" +#include "lyra/lyra.hpp" int main(int argc, char* argv[]) { - absl::ParseCommandLine(argc, argv); + // Set up commands. + auto cli = lyra::cli(); + bool show_help = false; + int exit_code = -1; + cli.add_argument(lyra::help(show_help)); - // Set up config. Allow overriding this config |config_file|. - cdc_ft::AssetStreamConfig cfg; - std::string config_file = absl::GetFlag(FLAGS_config_file); - absl::Status cfg_load_status = - cdc_ft::path::ExpandPathVariables(&config_file); - cfg_load_status.Update(cfg.LoadFromFile(config_file)); + cdc_ft::StartServiceCommand start_service(&exit_code); + start_service.Register(cli); - std::string log_dir = absl::GetFlag(FLAGS_log_dir); - cdc_ft::InitLogging(log_dir, cfg.log_to_stdout(), - cfg.session_cfg().verbosity); - - // Log status of loaded configuration. Errors are not critical. - if (cfg_load_status.ok()) { - LOG_INFO("Successfully loaded configuration file at '%s'", config_file); - } else if (absl::IsNotFound(cfg_load_status)) { - LOG_INFO("No configuration file found at '%s'", config_file); - } else { - LOG_ERROR("%s", cfg_load_status.message()); + // Parse args and run. Note that parse actually runs the commands. + // exit_code is -1 if no command was run. + auto result = cli.parse({argc, argv}); + if (show_help || exit_code == -1) { + std::cout << cli; + return 0; } - - std::string flags_read = cfg.GetFlagsReadFromFile(); - if (!flags_read.empty()) { - LOG_INFO( - "The following settings were read from the configuration file and " - "override the corresponding command line flags if set: %s", - flags_read); + if (!result) { + // Parse error. + std::cerr << "Error: " << result.message() << std::endl; + return 1; } + // If cli.parse() succeeds, it also runs the commands and writes |exit_code|. - std::string flag_errors = cfg.GetFlagReadErrors(); - if (!flag_errors.empty()) { - LOG_WARNING("%s", flag_errors); - } - - LOG_DEBUG("Configuration:\n%s", cfg.ToString()); - - absl::Status status = cdc_ft::Run(cfg); - if (!status.ok()) { - LOG_ERROR("%s", status.ToString()); - } else { - LOG_INFO("Asset stream manager shut down successfully."); - } - - cdc_ft::Log::Shutdown(); - static_assert(static_cast(absl::StatusCode::kOk) == 0, "kOk not 0"); - return static_cast(status.code()); + return exit_code; } diff --git a/asset_stream_manager/start_service_command.cc b/asset_stream_manager/start_service_command.cc new file mode 100644 index 0000000..14ce97b --- /dev/null +++ b/asset_stream_manager/start_service_command.cc @@ -0,0 +1,162 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "asset_stream_manager/start_service_command.h" + +#include "asset_stream_manager/background_service_impl.h" +#include "asset_stream_manager/local_assets_stream_manager_service_impl.h" +#include "asset_stream_manager/session_management_server.h" +#include "asset_stream_manager/session_manager.h" +#include "common/clock.h" +#include "common/grpc_status.h" +#include "common/log.h" +#include "common/path.h" +#include "common/process.h" +#include "common/status_macros.h" +#include "lyra/lyra.hpp" +#include "metrics/metrics.h" + +namespace cdc_ft { +namespace { + +constexpr int kSessionManagementPort = 44432; + +std::string GetLogPath(const char* log_dir, const char* log_base_name) { + DefaultSystemClock* clock = DefaultSystemClock::GetInstance(); + std::string timestamp_ext = clock->FormatNow(".%Y%m%d-%H%M%S.log", false); + return path::Join(log_dir, log_base_name + timestamp_ext); +} + +} // namespace + +StartServiceCommand::StartServiceCommand(int* exit_code) + : BaseCommand("start-service", "Start streaming service", exit_code) {} +StartServiceCommand::~StartServiceCommand() = default; + +void StartServiceCommand::RegisterCommandLineFlags(lyra::command& cmd) { + config_file_ = "%APPDATA%\\cdc-file-transfer\\assets_stream_manager.json"; + cmd.add_argument( + lyra::opt(config_file_, "path") + .name("--config-file") + .help("Json configuration file, default: " + config_file_)); + + log_dir_ = "%APPDATA%\\cdc-file-transfer\\logs"; + cmd.add_argument( + lyra::opt(log_dir_, "dir") + .name("--log-dir") + .help("Directory to store log files, default: " + log_dir_)); + + cfg_.RegisterCommandLineFlags(cmd); +} + +absl::Status StartServiceCommand::Run() { + if (!cfg_.jedec_parse_error().empty()) { + return absl::InvalidArgumentError(cfg_.jedec_parse_error()); + } + + // Set up config. Allow overriding this config with |config_file|. + absl::Status cfg_load_status = path::ExpandPathVariables(&config_file_); + cfg_load_status.Update(cfg_.LoadFromFile(config_file_)); + + RETURN_IF_ERROR(InitLogging()); + + // Log status of loaded configuration. Errors are not critical. + if (cfg_load_status.ok()) { + LOG_INFO("Successfully loaded configuration file at '%s'", config_file_); + } else if (absl::IsNotFound(cfg_load_status)) { + LOG_INFO("No configuration file found at '%s'", config_file_); + } else { + LOG_ERROR("%s", cfg_load_status.message()); + } + + std::string flags_read = cfg_.GetFlagsReadFromFile(); + if (!flags_read.empty()) { + LOG_INFO( + "The following settings were read from the configuration file and " + "override the corresponding command line flags if set: %s", + flags_read); + } + + std::string flag_errors = cfg_.GetFlagReadErrors(); + if (!flag_errors.empty()) { + LOG_WARNING("%s", flag_errors); + } + + LOG_DEBUG("Configuration:\n%s", cfg_.ToString()); + + absl::Status status = RunService(); + if (!status.ok()) { + LOG_ERROR("%s", status.ToString()); + } else { + LOG_INFO("Asset stream manager shut down successfully."); + } + + Log::Shutdown(); + return status; +} + +absl::Status StartServiceCommand::InitLogging() { + LogLevel level = Log::VerbosityToLogLevel(cfg_.session_cfg().verbosity); + if (cfg_.log_to_stdout()) { + // Log to stdout. + Log::Initialize(std::make_unique(level)); + return absl::OkStatus(); + } + + // Log to file. + if (!path::ExpandPathVariables(&log_dir_).ok() || + !path::CreateDirRec(log_dir_).ok()) { + return absl::InvalidArgumentError( + absl::StrFormat("Failed to create log directory '%s'", log_dir_)); + } + + Log::Initialize(std::make_unique( + level, GetLogPath(log_dir_.c_str(), "assets_stream_manager").c_str())); + return absl::OkStatus(); +} + +// Runs the session management service and returns when it finishes. +absl::Status StartServiceCommand::RunService() { + WinProcessFactory process_factory; + metrics::MetricsService metrics_service; + + SessionManager session_manager(cfg_.session_cfg(), &process_factory, + &metrics_service); + BackgroundServiceImpl background_service; + LocalAssetsStreamManagerServiceImpl session_service( + &session_manager, &process_factory, &metrics_service); + + SessionManagementServer sm_server(&session_service, &background_service, + &session_manager); + background_service.SetExitCallback( + [&sm_server]() { return sm_server.Shutdown(); }); + + if (!cfg_.dev_src_dir().empty()) { + localassetsstreammanager::StartSessionRequest request; + request.set_workstation_directory(cfg_.dev_src_dir()); + request.set_user_host(cfg_.dev_target().user_host); + request.set_mount_dir(cfg_.dev_target().mount_dir); + request.set_port(cfg_.dev_target().ssh_port); + request.set_ssh_command(cfg_.dev_target().ssh_command); + request.set_scp_command(cfg_.dev_target().scp_command); + localassetsstreammanager::StartSessionResponse response; + RETURN_ABSL_IF_ERROR( + session_service.StartSession(nullptr, &request, &response)); + } + RETURN_IF_ERROR(sm_server.Start(kSessionManagementPort)); + sm_server.RunUntilShutdown(); + return absl::OkStatus(); +} + +} // namespace cdc_ft diff --git a/asset_stream_manager/start_service_command.h b/asset_stream_manager/start_service_command.h new file mode 100644 index 0000000..533dcb4 --- /dev/null +++ b/asset_stream_manager/start_service_command.h @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ASSET_STREAM_MANAGER_START_SERVICE_COMMAND_H_ +#define ASSET_STREAM_MANAGER_START_SERVICE_COMMAND_H_ + +#include "asset_stream_manager/asset_stream_config.h" +#include "asset_stream_manager/base_command.h" + +namespace cdc_ft { + +// Handler for the start-service command. Starts the asset streaming service +// and returns when the service is shut down. +class StartServiceCommand : public BaseCommand { + public: + explicit StartServiceCommand(int* exit_code); + ~StartServiceCommand(); + + // BaseCommand: + void RegisterCommandLineFlags(lyra::command& cmd) override; + absl::Status Run() override; + + private: + // Initializes LOG* logging. + // Depending on the flags, might log to console or to a file. + absl::Status InitLogging(); + + // Runs the asset streaming service. + absl::Status RunService(); + + AssetStreamConfig cfg_; + std::string config_file_; + std::string log_dir_; +}; + +} // namespace cdc_ft + +#endif // ASSET_STREAM_MANAGER_START_SERVICE_COMMAND_H_ diff --git a/third_party/lyra/BUILD.bazel b/third_party/lyra/BUILD.bazel new file mode 100644 index 0000000..4329b4e --- /dev/null +++ b/third_party/lyra/BUILD.bazel @@ -0,0 +1,35 @@ +# Description: +# This project provides Linux compatible Dirent interface for Microsoft Windows. +# + +licenses(["notice"]) # BSL-1.0 license + +exports_files(["LICENSE"]) + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "lyra", + hdrs = [ + "include/lyra/arg.hpp", + "include/lyra/args.hpp", + "include/lyra/arguments.hpp", + "include/lyra/cli.hpp", + "include/lyra/cli_parser.hpp", + "include/lyra/command.hpp", + "include/lyra/detail", + "include/lyra/exe_name.hpp", + "include/lyra/group.hpp", + "include/lyra/help.hpp", + "include/lyra/literal.hpp", + "include/lyra/lyra.hpp", + "include/lyra/main.hpp", + "include/lyra/opt.hpp", + "include/lyra/option_style.hpp", + "include/lyra/parser.hpp", + "include/lyra/parser_result.hpp", + "include/lyra/val.hpp", + "include/lyra/version.hpp", + ], + includes = ["include"], +)