From 269fb2be45f2849250f737595852c1a30239ccec Mon Sep 17 00:00:00 2001 From: Lutz Justen Date: Fri, 18 Nov 2022 10:59:42 +0100 Subject: [PATCH] [cdc_stream] Add a CLI client to start/stop asset streaming sessions (#4) Implements the cdc_stream client and adjusts asset streaming in various places to work better outside of a GGP environment. This CL tries to get quoting for SSH commands right. It also brings back the ability to start a streaming session from asset_stream_manager. Also cleans up Bazel targets setup. Since the sln file is now in root, it is no longer necessary to prepend ../ to relative filenames to make clicking on errors work. --- NMakeBazelProject.targets | 14 +- all_files.vcxitems | 6 + asset_stream_manager/BUILD | 1 + asset_stream_manager/asset_stream_config.cc | 31 +++- asset_stream_manager/asset_stream_config.h | 35 ++-- .../asset_stream_manager.vcxproj | 3 +- asset_stream_manager/cdc_fuse_manager.cc | 47 +++--- asset_stream_manager/cdc_fuse_manager.h | 9 +- ...ocal_assets_stream_manager_service_impl.cc | 140 ++++++++++++---- ...local_assets_stream_manager_service_impl.h | 22 +++ asset_stream_manager/main.cc | 55 +++--- asset_stream_manager/multi_session.cc | 20 +-- asset_stream_manager/multi_session.h | 26 +-- asset_stream_manager/session.cc | 23 ++- asset_stream_manager/session.h | 25 ++- asset_stream_manager/session_manager.cc | 30 ++-- asset_stream_manager/session_manager.h | 37 ++-- cdc_fuse_fs/cdc_fuse_fs.vcxproj | 1 - cdc_rsync/cdc_rsync.vcxproj | 7 +- cdc_rsync/cdc_rsync_client.cc | 18 +- cdc_rsync/params.cc | 4 +- cdc_rsync_server/cdc_rsync_server.vcxproj | 1 - cdc_stream/.gitignore | 3 + cdc_stream/BUILD | 26 +++ .../local_assets_stream_manager_client.cc | 62 +++++++ .../local_assets_stream_manager_client.h | 63 +++++++ cdc_stream/main.cc | 158 ++++++++++++++++++ common/path.cc | 4 + common/path.h | 3 + common/remote_util.cc | 127 +++++++------- common/remote_util.h | 48 +++--- common/remote_util_test.cc | 59 ++++--- data_store/data_provider_test.cc | 2 - proto/local_assets_stream_manager.proto | 39 +++-- tests_asset_streaming_30/BUILD | 1 + .../tests_asset_streaming_30.vcxproj | 1 - tests_cdc_rsync/tests_cdc_rsync.vcxproj | 1 - tests_common/tests_common.vcxproj | 1 - 38 files changed, 797 insertions(+), 356 deletions(-) create mode 100644 cdc_stream/.gitignore create mode 100644 cdc_stream/BUILD create mode 100644 cdc_stream/local_assets_stream_manager_client.cc create mode 100644 cdc_stream/local_assets_stream_manager_client.h create mode 100644 cdc_stream/main.cc diff --git a/NMakeBazelProject.targets b/NMakeBazelProject.targets index 8c10aad..ea50ab9 100644 --- a/NMakeBazelProject.targets +++ b/NMakeBazelProject.targets @@ -6,8 +6,6 @@ Usage: Define 4 properties in your project file: BazelTargets: Labels of Bazel targets to build, e.g. //common:status. BazelOutputFile: Output filename, e.g. cdc_rsync.exe - BazelSourcePathPrefix: Prefix for source paths, to translate paths relative to (/foo) to project-relative paths (../foo). - Must be escaped for sed (e.g. / -> \/), e.g. ..\/..\/..\/ Optionally, define: BazelIncludePaths: Include paths, used for Intellisense. Import NMakeCMakeProject.targets. --> @@ -16,7 +14,6 @@ - @@ -27,15 +24,10 @@ k8 dbg opt - - | sed -r "s/^([^:\(]+[:\(][[:digit:]]+(,[[:digit:]]+)?[:\)])/$(BazelSourcePathPrefix)\\1/" - - 2>&1 $(BazelSedCommand) --config=$(BazelPlatform) $(BazelArgs) --linkopt=-Wl,--strip-all - $(BazelArgs) --distinct_host_configuration=false + $(BazelArgs) --copt=/GL @@ -51,9 +43,9 @@ $(SolutionDir)..\..\bazel-out\$(BazelPlatformDir)-$(BazelCompilationMode)\bin;$(BazelIncludePaths) - $(RmBazelOutDir) bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(BazelSedCommand) $(MakeRW) + $(RmBazelOutDir) bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(MakeRW) $(RmBazelOutDir) bazel clean - $(RmBazelOutDir) bazel clean && bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(BazelSedCommand) $(MakeRW) + $(RmBazelOutDir) bazel clean && bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(MakeRW) $(OutDir)$(BazelOutputFile) diff --git a/all_files.vcxitems b/all_files.vcxitems index 4dc5840..6eaceac 100644 --- a/all_files.vcxitems +++ b/all_files.vcxitems @@ -28,6 +28,7 @@ + @@ -39,6 +40,8 @@ + + @@ -150,6 +153,7 @@ + @@ -158,6 +162,7 @@ + @@ -237,6 +242,7 @@ + diff --git a/asset_stream_manager/BUILD b/asset_stream_manager/BUILD index 87c4374..4ca9c0c 100644 --- a/asset_stream_manager/BUILD +++ b/asset_stream_manager/BUILD @@ -9,6 +9,7 @@ cc_binary( deps = [ ":asset_stream_config", ":session_management_server", + "//cdc_stream", "//common:log", "//common:path", "//common:sdk_util", diff --git a/asset_stream_manager/asset_stream_config.cc b/asset_stream_manager/asset_stream_config.cc index d68d518..8f1b353 100644 --- a/asset_stream_manager/asset_stream_config.cc +++ b/asset_stream_manager/asset_stream_config.cc @@ -26,9 +26,6 @@ #include "common/status_macros.h" #include "json/json.h" -ABSL_DECLARE_FLAG(std::string, src_dir); -ABSL_DECLARE_FLAG(std::string, instance_ip); -ABSL_DECLARE_FLAG(uint16_t, instance_port); ABSL_DECLARE_FLAG(int, verbosity); ABSL_DECLARE_FLAG(bool, debug); ABSL_DECLARE_FLAG(bool, singlethreaded); @@ -42,6 +39,14 @@ ABSL_DECLARE_FLAG(uint32_t, access_idle_timeout); ABSL_DECLARE_FLAG(int, manifest_updater_threads); ABSL_DECLARE_FLAG(int, file_change_wait_duration_ms); +// Development flags. +ABSL_DECLARE_FLAG(std::string, dev_src_dir); +ABSL_DECLARE_FLAG(std::string, dev_user_host); +ABSL_DECLARE_FLAG(uint16_t, dev_ssh_port); +ABSL_DECLARE_FLAG(std::string, dev_ssh_command); +ABSL_DECLARE_FLAG(std::string, dev_scp_command); +ABSL_DECLARE_FLAG(std::string, dev_mount_dir); + // Declare AS20 flags, so that AS30 can be used on older SDKs simply by // replacing the binary. Note that the RETIRED_FLAGS macro can't be used // because the flags contain dashes. This code mimics the macro. @@ -62,9 +67,6 @@ const auto RETIRED_FLAGS_REG_allow_edge = namespace cdc_ft { AssetStreamConfig::AssetStreamConfig() { - src_dir_ = absl::GetFlag(FLAGS_src_dir); - instance_ip_ = absl::GetFlag(FLAGS_instance_ip); - instance_port_ = absl::GetFlag(FLAGS_instance_port); session_cfg_.verbosity = absl::GetFlag(FLAGS_verbosity); session_cfg_.fuse_debug = absl::GetFlag(FLAGS_debug); session_cfg_.fuse_singlethreaded = absl::GetFlag(FLAGS_singlethreaded); @@ -80,6 +82,13 @@ AssetStreamConfig::AssetStreamConfig() { absl::GetFlag(FLAGS_manifest_updater_threads); session_cfg_.file_change_wait_duration_ms = absl::GetFlag(FLAGS_file_change_wait_duration_ms); + + dev_src_dir_ = absl::GetFlag(FLAGS_dev_src_dir); + dev_target_.user_host = absl::GetFlag(FLAGS_dev_user_host); + dev_target_.ssh_port = absl::GetFlag(FLAGS_dev_ssh_port); + dev_target_.ssh_command = absl::GetFlag(FLAGS_dev_ssh_command); + dev_target_.scp_command = absl::GetFlag(FLAGS_dev_scp_command); + dev_target_.mount_dir = absl::GetFlag(FLAGS_dev_mount_dir); } AssetStreamConfig::~AssetStreamConfig() = default; @@ -105,7 +114,6 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { } \ } while (0) - ASSIGN_VAR(src_dir_, src_dir, String); ASSIGN_VAR(session_cfg_.verbosity, verbosity, Int); ASSIGN_VAR(session_cfg_.fuse_debug, debug, Bool); ASSIGN_VAR(session_cfg_.fuse_singlethreaded, singlethreaded, Bool); @@ -144,7 +152,6 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { std::string AssetStreamConfig::ToString() { std::ostringstream ss; - ss << "src_dir = " << src_dir_ << std::endl; ss << "verbosity = " << session_cfg_.verbosity << std::endl; ss << "debug = " << session_cfg_.fuse_debug @@ -166,6 +173,14 @@ std::string AssetStreamConfig::ToString() { << session_cfg_.manifest_updater_threads << std::endl; ss << "file_change_wait_duration_ms = " << session_cfg_.file_change_wait_duration_ms << std::endl; + ss << "dev_src_dir = " << dev_src_dir_ << std::endl; + ss << "dev_user_host = " << dev_target_.user_host << std::endl; + ss << "dev_ssh_port = " << dev_target_.ssh_port << std::endl; + ss << "dev_ssh_command = " << dev_target_.ssh_command + << std::endl; + ss << "dev_scp_command = " << dev_target_.scp_command + << std::endl; + ss << "dev_mount_dir = " << dev_target_.mount_dir << std::endl; return ss.str(); } diff --git a/asset_stream_manager/asset_stream_config.h b/asset_stream_manager/asset_stream_config.h index 3dd6517..8cdbcca 100644 --- a/asset_stream_manager/asset_stream_config.h +++ b/asset_stream_manager/asset_stream_config.h @@ -23,6 +23,7 @@ #include "absl/status/status.h" #include "asset_stream_manager/session_config.h" +#include "session.h" namespace cdc_ft { @@ -38,7 +39,6 @@ class AssetStreamConfig { // Loads a configuration from the JSON file at |path| and overrides any config // values that are set in this file. Sample json file: // { - // "src_dir":"C:\\path\\to\\assets", // "verbosity":3, // "debug":0, // "singlethreaded":0, @@ -67,34 +67,29 @@ class AssetStreamConfig { // read from the JSON file. std::string GetFlagReadErrors(); - // Workstation directory to stream. Should usually be empty since mounts are - // triggered by the CLI or the partner portal via a gRPC call, but useful - // during development. - const std::string& src_dir() const { return src_dir_; } - - // IP address of the instance to stream to. Should usually be empty since - // mounts are triggered by the CLI or the partner portal via a gRPC call, but - // useful during development. - const std::string& instance_ip() const { return instance_ip_; } - - // IP address of the instance to stream to. Should usually be unset (0) since - // mounts are triggered by the CLI or the partner portal via a gRPC call, but - // useful during development. - const uint16_t instance_port() const { return instance_port_; } - // Session configuration. - const SessionConfig session_cfg() const { return session_cfg_; } + const SessionConfig& session_cfg() const { return session_cfg_; } + + // Workstation directory to be streamed. Used for development purposes only + // to start a session right away when the service starts up. See dev CLI args. + const std::string& dev_src_dir() const { return dev_src_dir_; } + + // Session target. Used for development purposes only to start a session right + // away when the service starts up. See dev CLI args. + const SessionTarget& dev_target() const { return dev_target_; } // Whether to log to a file or to stdout. bool log_to_stdout() const { return log_to_stdout_; } private: - std::string src_dir_; - std::string instance_ip_; - uint16_t instance_port_ = 0; SessionConfig session_cfg_; bool log_to_stdout_ = false; + // Configuration used for development. Allows users to specify a session + // via the service's command line. + std::string dev_src_dir_; + SessionTarget dev_target_; + // Use a set, so the flags are sorted alphabetically. std::set flags_read_from_file_; diff --git a/asset_stream_manager/asset_stream_manager.vcxproj b/asset_stream_manager/asset_stream_manager.vcxproj index e659952..1f1a099 100644 --- a/asset_stream_manager/asset_stream_manager.vcxproj +++ b/asset_stream_manager/asset_stream_manager.vcxproj @@ -66,10 +66,9 @@ - //asset_stream_manager + //asset_stream_manager //cdc_stream asset_stream_manager.exe ..\;..\third_party\absl;..\third_party\jsoncpp\include;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath) - ..\/ diff --git a/asset_stream_manager/cdc_fuse_manager.cc b/asset_stream_manager/cdc_fuse_manager.cc index e4cd4cc..29c9ba0 100644 --- a/asset_stream_manager/cdc_fuse_manager.cc +++ b/asset_stream_manager/cdc_fuse_manager.cc @@ -29,13 +29,10 @@ namespace { constexpr char kFuseFilename[] = "cdc_fuse_fs"; constexpr char kLibFuseFilename[] = "libfuse.so"; constexpr char kFuseStdoutPrefix[] = "cdc_fuse_fs_stdout"; -constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc_file_transfer/"; - -// Mount point for FUSE on the gamelet. -constexpr char kMountDir[] = "/mnt/workstation"; +constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc-file-transfer/bin/"; // Cache directory on the gamelet to store data chunks. -constexpr char kCacheDir[] = "/var/cache/asset_streaming"; +constexpr char kCacheDir[] = "~/.cache/cdc-file-transfer/chunks"; } // namespace @@ -56,23 +53,14 @@ absl::Status CdcFuseManager::Deploy() { std::string exe_dir; RETURN_IF_ERROR(path::GetExeDir(&exe_dir), "Failed to get exe directory"); - std::string local_exe_path = path::Join(exe_dir, kFuseFilename); - std::string local_lib_path = path::Join(exe_dir, kLibFuseFilename); + // Set the cwd to the exe dir and pass the filenames to scp. Otherwise, some + // scp implementations can get confused and create the wrong remote filenames. + path::SetCwd(exe_dir); -#ifdef _DEBUG - // Sync FUSE to the gamelet in debug. Debug builds are rather large, so - // there's a gain from using sync. - LOG_DEBUG("Syncing FUSE"); - RETURN_IF_ERROR( - remote_util_->Sync({local_exe_path, local_lib_path}, kRemoteToolsBinDir), - "Failed to sync FUSE to gamelet"); - LOG_DEBUG("Syncing FUSE succeeded"); -#else - // Copy FUSE to the gamelet. This is usually faster in production since it - // doesn't have to deploy ggp__server first. + // Copy FUSE to the gamelet. LOG_DEBUG("Copying FUSE"); - RETURN_IF_ERROR(remote_util_->Scp({local_exe_path, local_lib_path}, - kRemoteToolsBinDir, true), + RETURN_IF_ERROR(remote_util_->Scp({kFuseFilename, kLibFuseFilename}, + kRemoteToolsBinDir, /*compress=*/false), "Failed to copy FUSE to gamelet"); LOG_DEBUG("Copying FUSE succeeded"); @@ -82,12 +70,12 @@ absl::Status CdcFuseManager::Deploy() { RETURN_IF_ERROR(remote_util_->Chmod("a+x", remotePath), "Failed to set executable flag on FUSE"); LOG_DEBUG("Making FUSE succeeded"); -#endif return absl::OkStatus(); } -absl::Status CdcFuseManager::Start(uint16_t local_port, uint16_t remote_port, +absl::Status CdcFuseManager::Start(const std::string& mount_dir, + uint16_t local_port, uint16_t remote_port, int verbosity, bool debug, bool singlethreaded, bool enable_stats, bool check, uint64_t cache_capacity, @@ -115,15 +103,18 @@ absl::Status CdcFuseManager::Start(uint16_t local_port, uint16_t remote_port, // Build the remote command. std::string remotePath = path::JoinUnix(kRemoteToolsBinDir, kFuseFilename); std::string remote_command = absl::StrFormat( - "LD_LIBRARY_PATH=%s %s --instance='%s' " - "--components='%s' --port=%i --cache_dir=%s " + "mkdir -p %s; LD_LIBRARY_PATH=%s %s " + "--instance=%s " + "--components=%s --port=%i --cache_dir=%s " "--verbosity=%i --cleanup_timeout=%i --access_idle_timeout=%i --stats=%i " "--check=%i --cache_capacity=%u -- -o allow_root -o ro -o nonempty -o " "auto_unmount %s%s%s", - kRemoteToolsBinDir, remotePath, instance_, component_args, remote_port, - kCacheDir, verbosity, cleanup_timeout_sec, access_idle_timeout_sec, - enable_stats, check, cache_capacity, kMountDir, debug ? " -d" : "", - singlethreaded ? " -s" : ""); + kRemoteToolsBinDir, kRemoteToolsBinDir, remotePath, + RemoteUtil::QuoteForSsh(instance_), + RemoteUtil::QuoteForSsh(component_args), remote_port, kCacheDir, + verbosity, cleanup_timeout_sec, access_idle_timeout_sec, enable_stats, + check, cache_capacity, RemoteUtil::QuoteForSsh(mount_dir), + debug ? " -d" : "", singlethreaded ? " -s" : ""); bool needs_deploy = false; RETURN_IF_ERROR( diff --git a/asset_stream_manager/cdc_fuse_manager.h b/asset_stream_manager/cdc_fuse_manager.h index f629da9..26be602 100644 --- a/asset_stream_manager/cdc_fuse_manager.h +++ b/asset_stream_manager/cdc_fuse_manager.h @@ -40,6 +40,7 @@ class CdcFuseManager { // |remote_port| to the workstation's |local_port|. Deploys the binary if // necessary. // + // |mount_dir| is the remote directory where to mount the FUSE. // |verbosity| is the log verbosity used by the filesystem. // |debug| puts the filesystem into debug mode if set to true. This also // causes the process to run in the foreground, so that logs are piped through @@ -51,10 +52,10 @@ class CdcFuseManager { // |cleanup_timeout_sec| defines the data provider cleanup timeout in seconds. // |access_idle_timeout_sec| defines the number of seconds after which data // provider is considered to be access-idling. - absl::Status Start(uint16_t local_port, uint16_t remote_port, int verbosity, - bool debug, bool singlethreaded, bool enable_stats, - bool check, uint64_t cache_capacity, - uint32_t cleanup_timeout_sec, + absl::Status Start(const std::string& mount_dir, uint16_t local_port, + uint16_t remote_port, int verbosity, bool debug, + bool singlethreaded, bool enable_stats, bool check, + uint64_t cache_capacity, uint32_t cleanup_timeout_sec, uint32_t access_idle_timeout_sec); // Stops the CDC FUSE. diff --git a/asset_stream_manager/local_assets_stream_manager_service_impl.cc b/asset_stream_manager/local_assets_stream_manager_service_impl.cc index 86ab715..7f62507 100644 --- a/asset_stream_manager/local_assets_stream_manager_service_impl.cc +++ b/asset_stream_manager/local_assets_stream_manager_service_impl.cc @@ -17,6 +17,7 @@ #include #include "absl/strings/str_format.h" +#include "absl/strings/str_replace.h" #include "absl/strings/str_split.h" #include "asset_stream_manager/multi_session.h" #include "asset_stream_manager/session_manager.h" @@ -26,11 +27,21 @@ #include "common/process.h" #include "common/sdk_util.h" #include "common/status.h" +#include "google/protobuf/text_format.h" #include "manifest/manifest_updater.h" +using TextFormat = google::protobuf::TextFormat; + namespace cdc_ft { namespace { +std::string RequestToString(const google::protobuf::Message& request) { + std::string str; + google::protobuf::TextFormat::PrintToString(request, &str); + if (!str.empty() && str.back() == '\n') str.pop_back(); + return absl::StrReplaceAll(str, {{"\n", ", "}}); +} + // Parses |instance_name| of the form // "organizations/{org-id}/projects/{proj-id}/pools/{pool-id}/gamelets/{gamelet-id}" // into parts. The pool id is not returned. @@ -102,48 +113,18 @@ LocalAssetsStreamManagerServiceImpl::~LocalAssetsStreamManagerServiceImpl() = grpc::Status LocalAssetsStreamManagerServiceImpl::StartSession( grpc::ServerContext* /*context*/, const StartSessionRequest* request, StartSessionResponse* /*response*/) { - LOG_INFO("RPC:StartSession(gamelet_name='%s', workstation_directory='%s'", - request->gamelet_name(), request->workstation_directory()); + LOG_INFO("RPC:StartSession(%s)", RequestToString(*request)); - metrics::DeveloperLogEvent evt; - evt.as_manager_data = std::make_unique(); - evt.as_manager_data->session_start_data = - std::make_unique(); - evt.as_manager_data->session_start_data->absl_status = absl::StatusCode::kOk; - evt.as_manager_data->session_start_data->status = - metrics::SessionStartStatus::kOk; - evt.as_manager_data->session_start_data->origin = - ConvertOrigin(request->origin()); - - // Parse instance/project/org id. - absl::Status status; MultiSession* ms = nullptr; - std::string instance_id, project_id, organization_id, instance_ip; - uint16_t instance_port = 0; - if (!ParseInstanceName(request->gamelet_name(), &instance_id, &project_id, - &organization_id)) { - status = absl::InvalidArgumentError(absl::StrFormat( - "Failed to parse instance name '%s'", request->gamelet_name())); - } else { - evt.project_id = project_id; - evt.organization_id = organization_id; - - status = InitSsh(instance_id, project_id, organization_id, &instance_ip, - &instance_port); - - if (status.ok()) { - status = session_manager_->StartSession( - instance_id, project_id, organization_id, instance_ip, instance_port, - request->workstation_directory(), &ms, - &evt.as_manager_data->session_start_data->status); - } - } + metrics::DeveloperLogEvent evt; + std::string instance_id; + absl::Status status = StartSessionInternal(request, &instance_id, &ms, &evt); evt.as_manager_data->session_start_data->absl_status = status.code(); if (ms) { evt.as_manager_data->session_start_data->concurrent_session_count = ms->GetSessionCount(); - if (!instance_id.empty() && ms->HasSessionForInstance(instance_id)) { + if (!instance_id.empty() && ms->HasSession(instance_id)) { ms->RecordSessionEvent(std::move(evt), metrics::EventType::kSessionStart, instance_id); } else { @@ -166,9 +147,14 @@ grpc::Status LocalAssetsStreamManagerServiceImpl::StartSession( grpc::Status LocalAssetsStreamManagerServiceImpl::StopSession( grpc::ServerContext* /*context*/, const StopSessionRequest* request, StopSessionResponse* /*response*/) { - LOG_INFO("RPC:StopSession(gamelet_id='%s')", request->gamelet_id()); + LOG_INFO("RPC:StopSession(%s)", RequestToString(*request)); - absl::Status status = session_manager_->StopSession(request->gamelet_id()); + std::string instance_id = + !request->gamelet_id().empty() // Stadia use case + ? request->gamelet_id() + : absl::StrCat(request->user_host(), ":", request->mount_dir()); + + absl::Status status = session_manager_->StopSession(instance_id); if (status.ok()) { LOG_INFO("StopSession() succeeded"); } else { @@ -177,6 +163,86 @@ grpc::Status LocalAssetsStreamManagerServiceImpl::StopSession( return ToGrpcStatus(status); } +absl::Status LocalAssetsStreamManagerServiceImpl::StartSessionInternal( + const StartSessionRequest* request, std::string* instance_id, + MultiSession** ms, metrics::DeveloperLogEvent* evt) { + instance_id->clear(); + *ms = nullptr; + evt->as_manager_data = std::make_unique(); + evt->as_manager_data->session_start_data = + std::make_unique(); + evt->as_manager_data->session_start_data->absl_status = absl::StatusCode::kOk; + evt->as_manager_data->session_start_data->status = + metrics::SessionStartStatus::kOk; + evt->as_manager_data->session_start_data->origin = + ConvertOrigin(request->origin()); + + if (!(request->gamelet_name().empty() ^ request->user_host().empty())) { + return absl::InvalidArgumentError( + "Must set either gamelet_name or user_host."); + } + + if (request->mount_dir().empty()) { + return absl::InvalidArgumentError("mount_dir cannot be empty."); + } + + SessionTarget target; + if (!request->gamelet_name().empty()) { + ASSIGN_OR_RETURN(target, + GetTargetForStadia(*request, instance_id, &evt->project_id, + &evt->organization_id)); + } else { + target = GetTarget(*request, instance_id); + } + + return session_manager_->StartSession( + *instance_id, request->workstation_directory(), target, evt->project_id, + evt->organization_id, ms, + &evt->as_manager_data->session_start_data->status); +} + +absl::StatusOr +LocalAssetsStreamManagerServiceImpl::GetTargetForStadia( + const StartSessionRequest& request, std::string* instance_id, + std::string* project_id, std::string* organization_id) { + SessionTarget target; + target.mount_dir = request.mount_dir(); + target.ssh_command = request.ssh_command(); + target.scp_command = request.scp_command(); + + // Parse instance/project/org id. + if (!ParseInstanceName(request.gamelet_name(), instance_id, project_id, + organization_id)) { + return absl::InvalidArgumentError(absl::StrFormat( + "Failed to parse instance name '%s'", request.gamelet_name())); + } + + // Run 'ggp ssh init' to determine IP (host) and port. + std::string instance_ip; + uint16_t instance_port = 0; + RETURN_IF_ERROR(InitSsh(*instance_id, *project_id, *organization_id, + &instance_ip, &instance_port)); + + target.user_host = "cloudcast@" + instance_ip; + target.ssh_port = instance_port; + return target; +} + +SessionTarget LocalAssetsStreamManagerServiceImpl::GetTarget( + const StartSessionRequest& request, std::string* instance_id) { + SessionTarget target; + target.user_host = request.user_host(); + target.mount_dir = request.mount_dir(); + target.ssh_command = request.ssh_command(); + target.scp_command = request.scp_command(); + target.ssh_port = request.port() > 0 && request.port() <= UINT16_MAX + ? static_cast(request.port()) + : RemoteUtil::kDefaultSshPort; + + *instance_id = absl::StrCat(target.user_host, ":", target.mount_dir); + return target; +} + metrics::RequestOrigin LocalAssetsStreamManagerServiceImpl::ConvertOrigin( StartSessionRequestOrigin origin) const { switch (origin) { diff --git a/asset_stream_manager/local_assets_stream_manager_service_impl.h b/asset_stream_manager/local_assets_stream_manager_service_impl.h index 3f5c6d5..e22726b 100644 --- a/asset_stream_manager/local_assets_stream_manager_service_impl.h +++ b/asset_stream_manager/local_assets_stream_manager_service_impl.h @@ -19,6 +19,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "asset_stream_manager/session.h" #include "asset_stream_manager/session_config.h" #include "metrics/metrics.h" #include "proto/local_assets_stream_manager.grpc.pb.h" @@ -66,6 +67,27 @@ class LocalAssetsStreamManagerServiceImpl final StopSessionResponse* response) override; private: + // Internal implementation of StartSession(). Returns the unique session + // identifier |instance_id|, the created or retrieved MultiSession |ms| as + // well as the filled metrics event |evt|. + absl::Status LocalAssetsStreamManagerServiceImpl::StartSessionInternal( + const StartSessionRequest* request, std::string* instance_id, + MultiSession** ms, metrics::DeveloperLogEvent* evt); + + // Stadia-specific: Returns a SessionTarget from a gamelet name and fills in + // the gamelet's |instance_id|, |project_id| and |organization_id|. + // Used if request.gamelet_name() is set. + // Fails if the gamelet name fails to parse or if ggp ssh init fails. + absl::StatusOr GetTargetForStadia( + const StartSessionRequest& request, std::string* instance_id, + std::string* project_id, std::string* organization_id); + + // Returns a SessionTarget from the corresponding fields in |request|. + // |instance_id| is set to [user@]host:mount_dir. + // Used if request.gamelet_name() is not set. + SessionTarget GetTarget(const StartSessionRequest& request, + std::string* instance_id); + // Convert StartSessionRequest enum to metrics enum. metrics::RequestOrigin ConvertOrigin(StartSessionRequestOrigin origin) const; diff --git a/asset_stream_manager/main.cc b/asset_stream_manager/main.cc index 8fc4f3c..7e4d74e 100644 --- a/asset_stream_manager/main.cc +++ b/asset_stream_manager/main.cc @@ -20,6 +20,7 @@ #include "asset_stream_manager/local_assets_stream_manager_service_impl.h" #include "asset_stream_manager/session_management_server.h" #include "asset_stream_manager/session_manager.h" +#include "common/grpc_status.h" #include "common/log.h" #include "common/path.h" #include "common/process.h" @@ -49,15 +50,19 @@ absl::Status Run(const AssetStreamConfig& cfg) { background_service.SetExitCallback( [&sm_server]() { return sm_server.Shutdown(); }); - RETURN_IF_ERROR(sm_server.Start(kSessionManagementPort)); - if (!cfg.src_dir().empty()) { - MultiSession* ms_unused; - metrics::SessionStartStatus status_unused; - RETURN_IF_ERROR(session_manager.StartSession( - /*instance_id=*/cfg.instance_ip(), /*project_id=*/std::string(), - /*organization_id=*/std::string(), cfg.instance_ip(), - cfg.instance_port(), cfg.src_dir(), &ms_unused, &status_unused)); + if (!cfg.dev_src_dir().empty()) { + localassetsstreammanager::StartSessionRequest request; + request.set_workstation_directory(cfg.dev_src_dir()); + request.set_user_host(cfg.dev_target().user_host); + request.set_mount_dir(cfg.dev_target().mount_dir); + request.set_port(cfg.dev_target().ssh_port); + request.set_ssh_command(cfg.dev_target().ssh_command); + request.set_scp_command(cfg.dev_target().scp_command); + localassetsstreammanager::StartSessionResponse response; + RETURN_ABSL_IF_ERROR( + session_service.StartSession(nullptr, &request, &response)); } + RETURN_IF_ERROR(sm_server.Start(kSessionManagementPort)); sm_server.RunUntilShutdown(); return absl::OkStatus(); } @@ -96,18 +101,6 @@ const auto RETIRED_FLAGS_REG_allow_edge = } // namespace } // namespace cdc_ft -ABSL_FLAG(std::string, src_dir, "", - "Start a streaming session immediately from the given Windows path. " - "Used during development. Must have exactly one gamelet reserved or " - "specify the target gamelet with --instance."); -ABSL_FLAG(std::string, instance_ip, "", - "Connect to the instance with the given IP address for this session. " - "This flag is ignored unless --src_dir is set as well. Used " - "during development. "); -ABSL_FLAG(uint16_t, instance_port, 0, - "Connect to the instance through the given SSH port. " - "This flag is ignored unless --src_dir is set as well. Used " - "during development. "); ABSL_FLAG(int, verbosity, 2, "Verbosity of the log output"); ABSL_FLAG(bool, debug, false, "Run FUSE filesystem in debug mode"); ABSL_FLAG(bool, singlethreaded, false, @@ -132,6 +125,28 @@ ABSL_FLAG(uint32_t, access_idle_timeout, cdc_ft::DataProvider::kAccessIdleSec, "Do not run instance cache cleanups for this many seconds after the " "last file access"); +// Development args. +ABSL_FLAG(std::string, dev_src_dir, "", + "Start a streaming session immediately from the given Windows path. " + "Used during development. Must also specify --dev_user_host and " + "--dev_mount_dir and possibly other --dev flags, depending on the " + "SSH setup"); +ABSL_FLAG(std::string, dev_user_host, "", + "Username and host to stream to, of the form [user@]host. Used " + "during development. See --dev_src_dir for more info."); +ABSL_FLAG(uint16_t, dev_ssh_port, cdc_ft::RemoteUtil::kDefaultSshPort, + "SSH port to use for the connection to the host. Used during " + "development. See --dev_src_dir for more info."); +ABSL_FLAG(std::string, dev_ssh_command, "", + "Ssh command and extra flags to use for the connection to the host. " + "Used during development. See --dev_src_dir for more info."); +ABSL_FLAG(std::string, dev_scp_command, "", + "Scp command and extra flags to use for the connection to the host. " + "Used during development. See --dev_src_dir for more info."); +ABSL_FLAG(std::string, dev_mount_dir, "", + "Directory on the host to stream to. Used during development. See " + "--dev_src_dir for more info."); + int main(int argc, char* argv[]) { absl::ParseCommandLine(argc, argv); diff --git a/asset_stream_manager/multi_session.cc b/asset_stream_manager/multi_session.cc index c46acee..9493cab 100644 --- a/asset_stream_manager/multi_session.cc +++ b/asset_stream_manager/multi_session.cc @@ -478,7 +478,7 @@ absl::Status MultiSession::Shutdown() { while (!sessions_.empty()) { std::string instance_id = sessions_.begin()->first; RETURN_IF_ERROR(StopSession(instance_id), - "Failed to stop session for instance id %s", instance_id); + "Failed to stop session for instance id '%s'", instance_id); sessions_.erase(instance_id); } @@ -499,10 +499,9 @@ absl::Status MultiSession::Status() { } absl::Status MultiSession::StartSession(const std::string& instance_id, + const SessionTarget& target, const std::string& project_id, - const std::string& organization_id, - const std::string& instance_ip, - uint16_t instance_port) { + const std::string& organization_id) { absl::MutexLock lock(&sessions_mutex_); if (sessions_.find(instance_id) != sessions_.end()) { @@ -523,9 +522,8 @@ absl::Status MultiSession::StartSession(const std::string& instance_id, metrics_recorder_->GetMetricsService(), metrics_recorder_->MultiSessionId(), project_id, organization_id); - auto session = - std::make_unique(instance_id, instance_ip, instance_port, cfg_, - process_factory_, std::move(metrics_recorder)); + auto session = std::make_unique( + instance_id, target, cfg_, process_factory_, std::move(metrics_recorder)); RETURN_IF_ERROR(session->Start(local_asset_stream_port_, kAssetStreamPortFirst, kAssetStreamPortLast)); @@ -552,7 +550,7 @@ absl::Status MultiSession::StopSession(const std::string& instance_id) { return absl::OkStatus(); } -bool MultiSession::HasSessionForInstance(const std::string& instance_id) { +bool MultiSession::HasSession(const std::string& instance_id) { absl::ReaderMutexLock lock(&sessions_mutex_); return sessions_.find(instance_id) != sessions_.end(); } @@ -663,19 +661,19 @@ void MultiSession::OnContentSent(size_t byte_count, size_t chunck_count, std::string instance_id) { if (instance_id.empty()) { // |instance_id| is empty only in case when manifest wasn't acknowledged by - // the gamelet yet (ConfigStreamServiceImpl::AckManifestIdReceived was not + // the instance yet (ConfigStreamServiceImpl::AckManifestIdReceived was not // invoked). This means MultiSession::StartSession is still waiting for // manifest acknowledge and |sessions_mutex_| is currently locked. In this // case invoking MultiSession::FindSession and waiting for |sessions_mutex_| // to get unlocked will block the current thread, which is also responsible // for receiving a call at ConfigStreamServiceImpl::AckManifestIdReceived. // This causes a deadlock and leads to a DeadlineExceeded error. - LOG_WARNING("Can not record session content for an empty instance_id."); + LOG_WARNING("Cannot record session content for an empty instance_id."); return; } Session* session = FindSession(instance_id); if (session == nullptr) { - LOG_WARNING("Failed to find active session by instrance id: %s", + LOG_WARNING("Failed to find active session by instance id '%s'", instance_id); return; } diff --git a/asset_stream_manager/multi_session.h b/asset_stream_manager/multi_session.h index 673f0e7..52a8f76 100644 --- a/asset_stream_manager/multi_session.h +++ b/asset_stream_manager/multi_session.h @@ -36,6 +36,7 @@ namespace cdc_ft { class ProcessFactory; class Session; +struct SessionTarget; using ManifestUpdatedCb = std::function; // Updates the manifest and runs a file watcher in a background thread. @@ -65,7 +66,7 @@ class MultiSessionRunner { // Stops updating the manifest and |server_|. absl::Status Shutdown() ABSL_LOCKS_EXCLUDED(mutex_); - // Waits until a manifest is ready and the gamelet |instance_id| has + // Waits until a manifest is ready and the session for |instance_id| has // acknowledged the reception of the currently set manifest id. |fuse_timeout| // is the timeout for waiting for the FUSE manifest ack. The time required to // generate the manifest is not part of this timeout as this could take a @@ -172,30 +173,29 @@ class MultiSession { // Not thread-safe. absl::Status Status(); - // Starts a new streaming session to the instance with given |instance_id| and + // Starts a new streaming session to the instance described by |target| and // waits until the FUSE has received the initial manifest id. // Returns an error if a session for that instance already exists. - // |instance_id| is the instance id of the target remote instance. - // |project_id| is id of the project that contains the instance. - // |organization_id| is id of the organization that contains the instance. - // |instance_ip| is the IP address of the instance. - // |instance_port| is the SSH port for connecting to the remote instance. + // |instance_id| is a unique id for the remote instance and mount directory, + // e.g. user@host:mount_dir. + // |target| identifies the remote target and how to connect to it. + // |project_id| is the project that owns the instance. Stadia only. + // |organization_id| is organization that contains the instance. Stadia only. // Thread-safe. absl::Status StartSession(const std::string& instance_id, + const SessionTarget& target, const std::string& project_id, - const std::string& organization_id, - const std::string& instance_ip, - uint16_t instance_port) + const std::string& organization_id) ABSL_LOCKS_EXCLUDED(sessions_mutex_); - // Starts a new streaming session to the gamelet with given |instance_id|. + // Stops the session for the given |instance_id|. // Returns a NotFound error if a session for that instance does not exists. // Thread-safe. absl::Status StopSession(const std::string& instance_id) ABSL_LOCKS_EXCLUDED(sessions_mutex_); // Returns true if there is an existing session for |instance_id|. - bool HasSessionForInstance(const std::string& instance_id) + bool HasSession(const std::string& instance_id) ABSL_LOCKS_EXCLUDED(sessions_mutex_); // Returns true if the FUSE process is up and running for an existing session @@ -226,7 +226,7 @@ class MultiSession { void RecordMultiSessionEvent(metrics::DeveloperLogEvent event, metrics::EventType code); - // Record an event for a session associated with the |instance|. + // Record an event for a session associated with the |instance_id|. void RecordSessionEvent(metrics::DeveloperLogEvent event, metrics::EventType code, const std::string& instance_id); diff --git a/asset_stream_manager/session.cc b/asset_stream_manager/session.cc index 77dcab8..ae0ca73 100644 --- a/asset_stream_manager/session.cc +++ b/asset_stream_manager/session.cc @@ -26,7 +26,7 @@ namespace cdc_ft { namespace { // Timeout for initial gamelet connection. -constexpr double kInstanceConnectionTimeoutSec = 10.0f; +constexpr double kInstanceConnectionTimeoutSec = 60.0f; metrics::DeveloperLogEvent GetEventWithHeartBeatData(size_t bytes, size_t chunks) { @@ -40,18 +40,24 @@ metrics::DeveloperLogEvent GetEventWithHeartBeatData(size_t bytes, } // namespace -Session::Session(std::string instance_id, std::string instance_ip, - uint16_t instance_port, SessionConfig cfg, - ProcessFactory* process_factory, +Session::Session(std::string instance_id, const SessionTarget& target, + SessionConfig cfg, ProcessFactory* process_factory, std::unique_ptr metrics_recorder) : instance_id_(std::move(instance_id)), + mount_dir_(target.mount_dir), cfg_(std::move(cfg)), process_factory_(process_factory), remote_util_(cfg_.verbosity, cfg_.quiet, process_factory, /*forward_output_to_logging=*/true), metrics_recorder_(std::move(metrics_recorder)) { assert(metrics_recorder_); - remote_util_.SetUserHostAndPort(instance_ip, instance_port); + remote_util_.SetUserHostAndPort(target.user_host, target.ssh_port); + if (!target.ssh_command.empty()) { + remote_util_.SetSshCommand(target.ssh_command); + } + if (!target.scp_command.empty()) { + remote_util_.SetScpCommand(target.scp_command); + } } Session::~Session() { @@ -80,9 +86,10 @@ absl::Status Session::Start(int local_port, int first_remote_port, fuse_ = std::make_unique(instance_id_, process_factory_, &remote_util_); RETURN_IF_ERROR( - fuse_->Start(local_port, remote_port, cfg_.verbosity, cfg_.fuse_debug, - cfg_.fuse_singlethreaded, cfg_.stats, cfg_.fuse_check, - cfg_.fuse_cache_capacity, cfg_.fuse_cleanup_timeout_sec, + fuse_->Start(mount_dir_, local_port, remote_port, cfg_.verbosity, + cfg_.fuse_debug, cfg_.fuse_singlethreaded, cfg_.stats, + cfg_.fuse_check, cfg_.fuse_cache_capacity, + cfg_.fuse_cleanup_timeout_sec, cfg_.fuse_access_idle_timeout_sec), "Failed to start instance component"); return absl::OkStatus(); diff --git a/asset_stream_manager/session.h b/asset_stream_manager/session.h index f6d57dc..424c795 100644 --- a/asset_stream_manager/session.h +++ b/asset_stream_manager/session.h @@ -32,17 +32,29 @@ class CdcFuseManager; class ProcessFactory; class Process; -// Manages the connection of a workstation to a single gamelet. +// Defines a remote target and how to connect to it. +struct SessionTarget { + // SSH username and hostname of the remote target, formed as [user@]host. + std::string user_host; + // Port to use for SSH connections to the remote target. + uint16_t ssh_port; + // Ssh command to use to connect to the remote target. + std::string ssh_command; + // Scp command to use to copy files to the remote target. + std::string scp_command; + // Directory on the remote target where to mount the streamed directory. + std::string mount_dir; +}; + +// Manages the connection of a workstation to a single remote instance. class Session { public: // |instance_id| is a unique id for the remote instance. - // |instance_ip| is the IP address of the remote instance. - // |instance_port| is the SSH tunnel port for connecting to the instance. + // |target| identifies the remote target and how to connect to it. // |cfg| contains generic configuration parameters for the session. // |process_factory| abstracts process creation. - Session(std::string instance_id, std::string instance_ip, - uint16_t instance_port, SessionConfig cfg, - ProcessFactory* process_factory, + Session(std::string instance_id, const SessionTarget& target, + SessionConfig cfg, ProcessFactory* process_factory, std::unique_ptr metrics_recorder); ~Session(); @@ -71,6 +83,7 @@ class Session { private: const std::string instance_id_; + const std::string mount_dir_; const SessionConfig cfg_; ProcessFactory* const process_factory_; diff --git a/asset_stream_manager/session_manager.cc b/asset_stream_manager/session_manager.cc index 66829b4..6e10001 100644 --- a/asset_stream_manager/session_manager.cc +++ b/asset_stream_manager/session_manager.cc @@ -59,10 +59,10 @@ absl::Status SessionManager::Shutdown() { } absl::Status SessionManager::StartSession( - const std::string& instance_id, const std::string& project_id, - const std::string& organization_id, const std::string& instance_ip, - uint16_t instance_port, const std::string& src_dir, - MultiSession** multi_session, metrics::SessionStartStatus* metrics_status) { + const std::string& instance_id, const std::string& src_dir, + const SessionTarget& target, const std::string& project_id, + const std::string& organization_id, MultiSession** multi_session, + metrics::SessionStartStatus* metrics_status) { *multi_session = nullptr; *metrics_status = metrics::SessionStartStatus::kOk; @@ -83,7 +83,7 @@ absl::Status SessionManager::StartSession( // Early out if we are streaming the workstation dir to the given gamelet. MultiSession* ms = GetMultiSession(src_dir); *multi_session = ms; - if (ms && ms->HasSessionForInstance(instance_id)) { + if (ms && ms->HasSession(instance_id)) { if (ms->IsSessionHealthy(instance_id)) { LOG_INFO("Reusing existing session"); return absl::OkStatus(); @@ -95,8 +95,8 @@ absl::Status SessionManager::StartSession( // We could also fall through, but this might restart the MultiSession. status = ms->StopSession(instance_id); if (status.ok()) { - status = ms->StartSession(instance_id, project_id, organization_id, - instance_ip, instance_port); + status = + ms->StartSession(instance_id, target, project_id, organization_id); } if (!status.ok()) { *metrics_status = metrics::SessionStartStatus::kRestartSessionError; @@ -129,8 +129,7 @@ absl::Status SessionManager::StartSession( // Start the session. LOG_INFO("Starting streaming session from path '%s' to instance '%s'", src_dir, instance_id); - status = ms->StartSession(instance_id, project_id, organization_id, - instance_ip, instance_port); + status = ms->StartSession(instance_id, target, project_id, organization_id); if (!status.ok()) { *metrics_status = metrics::SessionStartStatus::kStartSessionError; } @@ -164,15 +163,16 @@ absl::StatusOr SessionManager::GetOrCreateMultiSession( return iter->second.get(); } -absl::Status SessionManager::StopSessionInternal(const std::string& instance) { +absl::Status SessionManager::StopSessionInternal( + const std::string& instance_id) { absl::Status status; for (const auto& [key, ms] : sessions_) { - if (!ms->HasSessionForInstance(instance)) continue; + if (!ms->HasSession(instance_id)) continue; LOG_INFO("Stopping session streaming from '%s' to instance '%s'", - ms->src_dir(), instance); - RETURN_IF_ERROR(ms->StopSession(instance), - "Failed to stop session for instance '%s'", instance); + ms->src_dir(), instance_id); + RETURN_IF_ERROR(ms->StopSession(instance_id), + "Failed to stop session for instance '%s'", instance_id); // Session was stopped. If the MultiSession is empty now, delete it. if (ms->Empty()) { @@ -187,7 +187,7 @@ absl::Status SessionManager::StopSessionInternal(const std::string& instance) { } return absl::NotFoundError( - absl::StrFormat("No session for instance id '%s' found", instance)); + absl::StrFormat("No session for instance '%s' found", instance_id)); } } // namespace cdc_ft diff --git a/asset_stream_manager/session_manager.h b/asset_stream_manager/session_manager.h index dccd428..acc6e16 100644 --- a/asset_stream_manager/session_manager.h +++ b/asset_stream_manager/session_manager.h @@ -30,43 +30,46 @@ namespace cdc_ft { class MultiSession; class ProcessFactory; +struct SessionTarget; -// Implements a service to start and stop streaming sessions as a server. -// The corresponding clients are implemented by the ggp CLI and SDK Proxy. -// The CLI triggers StartSession() from `ggp instance mount --local-dir` and -// StopSession() from `ggp instance unmount`. SDK Proxy invokes StartSession() -// when a user starts a new game from the partner portal and sets an `Asset -// streaming directory` in the `Advanced settings` in the `Play settings` -// dialog. -// This service is owned by SessionManagementServer. +// Adds logic around MultiSession to start and stop streaming sessions. Makes +// sure that some invariants are maintained, like no two streaming sessions +// exist to the same target user@host:dir. class SessionManager { public: SessionManager(SessionConfig cfg, ProcessFactory* process_factory, metrics::MetricsService* metrics_service); ~SessionManager(); - // Starts a session and populates |multi_session| and |metrics_status|. + // Starts a new session or reuses an existing one. + // |instance_id| is a unique id for the remote instance and mount directory, + // e.g. user@host:mount_dir. + // |src_dir| is the local directory to stream. + // |target| identifies the remote target and how to connect to it. + // |project_id| is the project that owns the instance. Stadia only. + // |organization_id| is organization that contains the instance. Stadia only. + // Populates |multi_session| and |metrics_status| on success. absl::Status StartSession(const std::string& instance_id, + const std::string& src_dir, + const SessionTarget& target, const std::string& project_id, const std::string& organization_id, - const std::string& instance_ip, - uint16_t instance_port, const std::string& src_dir, MultiSession** multi_session, metrics::SessionStartStatus* metrics_status) ABSL_LOCKS_EXCLUDED(sessions_mutex_); - // Stops the session for the given |instance|. Returns a NotFound error if no - // session exists. - absl::Status StopSession(const std::string& instance) + // Stops the session for the given |instance_id|. + // Returns a NotFound error if no session exists. + absl::Status StopSession(const std::string& instance_id) ABSL_LOCKS_EXCLUDED(sessions_mutex_); // Shuts down all existing MultiSessions. absl::Status Shutdown() ABSL_LOCKS_EXCLUDED(sessions_mutex_); private: - // Stops the session for the given |instance|. Returns a NotFound error if no - // session exists. - absl::Status StopSessionInternal(const std::string& instance) + // Stops the session for the given |instance_id|. Returns a NotFound error if + // no session exists. + absl::Status StopSessionInternal(const std::string& instance_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(sessions_mutex_); // Returns the MultiSession for the given workstation directory |src_dir| or diff --git a/cdc_fuse_fs/cdc_fuse_fs.vcxproj b/cdc_fuse_fs/cdc_fuse_fs.vcxproj index b60e370..25c080d 100644 --- a/cdc_fuse_fs/cdc_fuse_fs.vcxproj +++ b/cdc_fuse_fs/cdc_fuse_fs.vcxproj @@ -52,7 +52,6 @@ //cdc_fuse_fs cdc_fuse_fs ..\;..\third_party\absl;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;$(AdditionalIncludeDirectories) - ..\/ diff --git a/cdc_rsync/cdc_rsync.vcxproj b/cdc_rsync/cdc_rsync.vcxproj index c83069a..0a255b6 100644 --- a/cdc_rsync/cdc_rsync.vcxproj +++ b/cdc_rsync/cdc_rsync.vcxproj @@ -69,16 +69,15 @@ //cdc_rsync cdc_rsync.exe ..\;..\third_party\absl;..\third_party\blake3\c;..\bazel-stadia-file-transfer\external\com_github_zstd\lib;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;$(VC_IncludePath);$(WindowsSDK_IncludePath) - ..\/ diff --git a/cdc_rsync/cdc_rsync_client.cc b/cdc_rsync/cdc_rsync_client.cc index ea04cab..c595bcb 100644 --- a/cdc_rsync/cdc_rsync_client.cc +++ b/cdc_rsync/cdc_rsync_client.cc @@ -46,8 +46,8 @@ constexpr int kExitCodeNotFound = 127; constexpr int kForwardPortFirst = 44450; constexpr int kForwardPortLast = 44459; -constexpr char kGgpServerFilename[] = "cdc_rsync_server"; -constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc_file_transfer/"; +constexpr char kCdcServerFilename[] = "cdc_rsync_server"; +constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc-file-transfer/bin/"; SetOptionsRequest::FilterRule::Type ToProtoType(PathFilter::Rule::Type type) { switch (type) { @@ -178,7 +178,7 @@ absl::Status CdcRsyncClient::StartServer() { std::vector components; status = GameletComponent::Get( - {path::Join(component_dir, kGgpServerFilename)}, &components); + {path::Join(component_dir, kCdcServerFilename)}, &components); if (!status.ok()) { return MakeStatus( "Required instance component not found. Make sure the file " @@ -202,7 +202,7 @@ absl::Status CdcRsyncClient::StartServer() { int port = *port_res; std::string remote_server_path = - std::string(kRemoteToolsBinDir) + kGgpServerFilename; + std::string(kRemoteToolsBinDir) + kCdcServerFilename; // Test existence manually to prevent misleading bash output message // "bash: .../cdc_rsync_server: No such file or directory". // Also create the bin dir because otherwise scp below might fail. @@ -411,9 +411,9 @@ absl::Status CdcRsyncClient::DeployServer() { // scp cdc_rsync_server to a temp location on the gamelet. std::string remoteServerTmpPath = - absl::StrFormat("%s%s.%s", kRemoteToolsBinDir, kGgpServerFilename, + absl::StrFormat("%s%s.%s", kRemoteToolsBinDir, kCdcServerFilename, Util::GenerateUniqueId()); - std::string localServerPath = path::Join(exe_dir, kGgpServerFilename); + std::string localServerPath = path::Join(exe_dir, kCdcServerFilename); status = remote_util_.Scp({localServerPath}, remoteServerTmpPath, /*compress=*/true); if (!status.ok()) { @@ -424,9 +424,9 @@ absl::Status CdcRsyncClient::DeployServer() { // - Make the old cdc_rsync_server writable (if it exists). // - Make the new cdc_rsync_server executable. // - Replace the old cdc_rsync_server by the new one. - std::string old_path = RemoteUtil::EscapeForWindows( - std::string(kRemoteToolsBinDir) + kGgpServerFilename); - std::string new_path = RemoteUtil::EscapeForWindows(remoteServerTmpPath); + std::string old_path = RemoteUtil::QuoteForWindows( + std::string(kRemoteToolsBinDir) + kCdcServerFilename); + std::string new_path = RemoteUtil::QuoteForWindows(remoteServerTmpPath); std::string replace_cmd = absl::StrFormat( " ([ ! -f %s ] || chmod u+w %s) && chmod a+x %s && mv %s %s", old_path, old_path, new_path, new_path, old_path); diff --git a/cdc_rsync/params.cc b/cdc_rsync/params.cc index d98597a..aa6d91c 100644 --- a/cdc_rsync/params.cc +++ b/cdc_rsync/params.cc @@ -73,10 +73,10 @@ Options: -R, --relative Use relative path names --existing Skip creating new files on instance --copy-dest dir Use files from dir as sync base if files are missing - --ssh-command Path and arguments of SSH command to use, e.g. + --ssh-command Path and arguments of ssh command to use, e.g. C:\path\to\ssh.exe -F config -i id_rsa -oStrictHostKeyChecking=yes -oUserKnownHostsFile="""known_hosts""" Can also be specified by the CDC_SSH_COMMAND environment variable. - --scp-command Path and arguments of SSH command to use, e.g. + --scp-command Path and arguments of scp command to use, e.g. C:\path\to\scp.exe -F config -i id_rsa -oStrictHostKeyChecking=yes -oUserKnownHostsFile="""known_hosts""" Can also be specified by the CDC_SCP_COMMAND environment variable. -h --help Help for cdc_rsync diff --git a/cdc_rsync_server/cdc_rsync_server.vcxproj b/cdc_rsync_server/cdc_rsync_server.vcxproj index 4aa0d73..81027c5 100644 --- a/cdc_rsync_server/cdc_rsync_server.vcxproj +++ b/cdc_rsync_server/cdc_rsync_server.vcxproj @@ -50,7 +50,6 @@ //cdc_rsync_server:cdc_rsync_server cdc_rsync_server ..\;..\third_party\absl;..\third_party\blake3\c;..\bazel-stadia-file-transfer\external\com_github_zstd\lib;..\third_party\googletest\googletest\include;..\third_party\protobuf\src - ..\/ diff --git a/cdc_stream/.gitignore b/cdc_stream/.gitignore new file mode 100644 index 0000000..7dc8dde --- /dev/null +++ b/cdc_stream/.gitignore @@ -0,0 +1,3 @@ +x64/* +*.log +*.user \ No newline at end of file diff --git a/cdc_stream/BUILD b/cdc_stream/BUILD new file mode 100644 index 0000000..6c1ae02 --- /dev/null +++ b/cdc_stream/BUILD @@ -0,0 +1,26 @@ +package(default_visibility = [ + "//:__subpackages__", +]) + +cc_binary( + name = "cdc_stream", + srcs = ["main.cc"], + deps = [ + ":local_assets_stream_manager_client", + "//common:log", + "//common:path", + "@com_google_absl//absl/flags:parse", + "@com_google_absl//absl/status", + ], +) + +cc_library( + name = "local_assets_stream_manager_client", + srcs = ["local_assets_stream_manager_client.cc"], + hdrs = ["local_assets_stream_manager_client.h"], + deps = [ + "//common:grpc_status", + "//proto:local_assets_stream_manager_grpc_proto", + "@com_google_absl//absl/status", + ], +) diff --git a/cdc_stream/local_assets_stream_manager_client.cc b/cdc_stream/local_assets_stream_manager_client.cc new file mode 100644 index 0000000..434e6d4 --- /dev/null +++ b/cdc_stream/local_assets_stream_manager_client.cc @@ -0,0 +1,62 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cdc_stream/local_assets_stream_manager_client.h" + +#include "absl/status/status.h" +#include "common/grpc_status.h" + +namespace cdc_ft { + +using StartSessionRequest = localassetsstreammanager::StartSessionRequest; +using StartSessionResponse = localassetsstreammanager::StartSessionResponse; +using StopSessionRequest = localassetsstreammanager::StopSessionRequest; +using StopSessionResponse = localassetsstreammanager::StopSessionResponse; + +LocalAssetsStreamManagerClient::LocalAssetsStreamManagerClient( + std::shared_ptr channel) { + stub_ = LocalAssetsStreamManager::NewStub(std::move(channel)); +} + +LocalAssetsStreamManagerClient::~LocalAssetsStreamManagerClient() = default; + +absl::Status LocalAssetsStreamManagerClient::StartSession( + const std::string& src_dir, const std::string& user_host, uint16_t ssh_port, + const std::string& mount_dir, const std::string& ssh_command, + const std::string& scp_command) { + StartSessionRequest request; + request.set_workstation_directory(src_dir); + request.set_user_host(user_host); + request.set_port(ssh_port); + request.set_mount_dir(mount_dir); + request.set_ssh_command(ssh_command); + request.set_scp_command(scp_command); + + grpc::ClientContext context; + StartSessionResponse response; + return ToAbslStatus(stub_->StartSession(&context, request, &response)); +} + +absl::Status LocalAssetsStreamManagerClient::StopSession( + const std::string& user_host, const std::string& mount_dir) { + StopSessionRequest request; + request.set_user_host(user_host); + request.set_mount_dir(mount_dir); + + grpc::ClientContext context; + StopSessionResponse response; + return ToAbslStatus(stub_->StopSession(&context, request, &response)); +} + +} // namespace cdc_ft diff --git a/cdc_stream/local_assets_stream_manager_client.h b/cdc_stream/local_assets_stream_manager_client.h new file mode 100644 index 0000000..3aaf67e --- /dev/null +++ b/cdc_stream/local_assets_stream_manager_client.h @@ -0,0 +1,63 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ASSET_STREAM_MANAGER_LOCAL_ASSETS_STREAM_MANAGER_CLIENT_H_ +#define ASSET_STREAM_MANAGER_LOCAL_ASSETS_STREAM_MANAGER_CLIENT_H_ + +#include + +#include "absl/status/status.h" +#include "grpcpp/channel.h" +#include "proto/local_assets_stream_manager.grpc.pb.h" + +namespace grpc_impl { +class Channel; +} + +namespace cdc_ft { + +// gRpc client for starting/stopping asset streaming sessions. +class LocalAssetsStreamManagerClient { + public: + // |channel| is a grpc channel to use. + explicit LocalAssetsStreamManagerClient( + std::shared_ptr channel); + ~LocalAssetsStreamManagerClient(); + + // Starts streaming the Windows directory |src_dir| to the Linux target + // |user_host_dir|, which must be formatted as [user@]host:dir, e.g. + // jdoe@jdoe.corp.foo.com:~/assets + // Starting a second session to the same target will stop the first one. + absl::Status StartSession(const std::string& src_dir, + const std::string& user_host, uint16_t ssh_port, + const std::string& mount_dir, + const std::string& ssh_command, + const std::string& scp_command); + + // Stops the streaming session to the Linux target |user_host_dir|, which must + // be formatted as [user@]host:dir, e.g. jdoe@jdoe.corp.foo.com:~/assets + absl::Status StopSession(const std::string& user_host_dir, + const std::string& mount_dir); + + private: + using LocalAssetsStreamManager = + localassetsstreammanager::LocalAssetsStreamManager; + std::unique_ptr stub_; +}; + +} // namespace cdc_ft + +#endif // ASSET_STREAM_MANAGER_LOCAL_ASSETS_STREAM_MANAGER_CLIENT_H_ diff --git a/cdc_stream/main.cc b/cdc_stream/main.cc new file mode 100644 index 0000000..697e754 --- /dev/null +++ b/cdc_stream/main.cc @@ -0,0 +1,158 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/flags/usage.h" +#include "absl/status/status.h" +#include "absl/strings/str_split.h" +#include "cdc_stream/local_assets_stream_manager_client.h" +#include "common/log.h" +#include "common/path.h" +#include "grpcpp/channel.h" +#include "grpcpp/security/credentials.h" + +ABSL_FLAG(uint16_t, service_port, 44432, + "Port to use while connecting to the local asset stream service."); +ABSL_FLAG( + uint16_t, ssh_port, 22, + "Port to use while connecting to the remote instance being streamed to."); +ABSL_FLAG(std::string, ssh_command, "", + "Path and arguments of ssh command to use, e.g. " + "\"C:\\path\\to\\ssh.exe -F config_file\". Can also be specified by " + "the CDC_SSH_COMMAND environment variable."); +ABSL_FLAG(std::string, scp_command, "", + "Path and arguments of scp command to use, e.g. " + "\"C:\\path\\to\\scp.exe -F config_file\". Can also be specified by " + "the CDC_SCP_COMMAND environment variable."); +ABSL_FLAG(int, verbosity, 2, + "Verbosity of the log output. Increase to make logs more verbose."); + +namespace { + +constexpr char kHelpText[] = R"!(Stream files from a Windows to a Linux device + +Usage: cdc_stream [flags] start windows_dir [user@]host:linux_dir + Streams the Windows directory windows_dir to directory linux_dir on the + Linux host using SSH username user. + + cdc_stream [flags] stop [user@]host:linux_dir + Stops the streaming session to the given Linux target. + +Type cdc_stream --helpfull for available flags.)!"; + +// Splits |user_host_dir| = [user@]host:dir up into [user@]host and dir. +// Does not touch Windows drives, e.g. C:\foo. +bool ParseUserHostDir(const std::string& user_host_dir, std::string* user_host, + std::string* dir) { + std::vector parts = + absl::StrSplit(user_host_dir, absl::MaxSplits(':', 1)); + if (parts.size() < 2 || + (parts[0].size() == 1 && toupper(parts[0][0]) >= 'A' && + toupper(parts[0][0]) <= 'Z')) { + LOG_ERROR( + "Failed to parse '%s'. Make sure it is of the form " + "[user@]host:linux_dir.", + user_host_dir); + return false; + } + + *user_host = parts[0]; + *dir = parts[1]; + return true; +} + +std::string GetFlagFromEnvOrArgs(const char* env, + const absl::Flag& flag) { + std::string value = absl::GetFlag(flag); + if (value.empty()) { + cdc_ft::path::GetEnv(env, &value).IgnoreError(); + } + return value; +} + +} // namespace + +int main(int argc, char* argv[]) { + absl::SetProgramUsageMessage(kHelpText); + std::vector args = absl::ParseCommandLine(argc, argv); + + uint16_t service_port = absl::GetFlag(FLAGS_service_port); + int verbosity = absl::GetFlag(FLAGS_verbosity); + std::string ssh_command = + GetFlagFromEnvOrArgs("CDC_SSH_COMMAND", FLAGS_ssh_command); + std::string scp_command = + GetFlagFromEnvOrArgs("CDC_SCP_COMMAND", FLAGS_scp_command); + uint16_t ssh_port = absl::GetFlag(FLAGS_ssh_port); + + cdc_ft::LogLevel level = cdc_ft::Log::VerbosityToLogLevel(verbosity); + cdc_ft::Log::Initialize(std::make_unique(level)); + + if (args.size() < 2) { + LOG_INFO(kHelpText); + return 0; + } + std::string command = args[1]; + if (command != "start" && command != "stop") { + LOG_ERROR("Unknown command '%s'. Must be 'start' or 'stop'.", command); + return 1; + } + + // Start a gRpc client. + std::string client_address = absl::StrFormat("localhost:%u", service_port); + std::shared_ptr grpc_channel = grpc::CreateCustomChannel( + client_address, grpc::InsecureChannelCredentials(), + grpc::ChannelArguments()); + + cdc_ft::LocalAssetsStreamManagerClient client(grpc_channel); + + absl::Status status; + if (command == "start") { + if (args.size() < 4) { + LOG_ERROR( + "Command 'start' needs 2 arguments: the Windows directory to stream" + " and the Linux target [user@]host:dir."); + return 1; + } + + std::string src_dir = args[2]; + std::string user_host, mount_dir; + if (!ParseUserHostDir(args[3], &user_host, &mount_dir)) return 1; + + status = client.StartSession(src_dir, user_host, ssh_port, mount_dir, + ssh_command, scp_command); + } else /* if (command == "stop") */ { + if (args.size() < 3) { + LOG_ERROR( + "Command 'stop' needs 1 argument: the Linux target " + "[user@]host:linux_dir."); + return 1; + } + + std::string user_host, mount_dir; + if (!ParseUserHostDir(args[2], &user_host, &mount_dir)) return 1; + + status = client.StopSession(user_host, mount_dir); + } + + if (!status.ok()) { + LOG_ERROR("Error: %s", status.ToString()); + } + + cdc_ft::Log::Shutdown(); + return static_cast(status.code()); +} diff --git a/common/path.cc b/common/path.cc index b5d7e87..57d1e85 100644 --- a/common/path.cc +++ b/common/path.cc @@ -326,6 +326,10 @@ std::string GetDrivePrefix(const std::string& path) { std::string GetCwd() { return std::filesystem::current_path().u8string(); } +void SetCwd(const std::string& path) { + std::filesystem::current_path(std::filesystem::u8path(path)); +} + std::string GetFullPath(const std::string& path) { if (path.empty()) { return std::string(); diff --git a/common/path.h b/common/path.h index 8901277..204580f 100644 --- a/common/path.h +++ b/common/path.h @@ -130,6 +130,9 @@ std::string GetDrivePrefix(const std::string& path); // Gets the current working directory. std::string GetCwd(); +// Sets the current working directory. +void SetCwd(const std::string& path); + // Expands a relative path to an absolute path (relative to the current working // directory). Also canonicalizes the path, removing any . and .. elements. // Note that if the path does not exist or contains invalid characters, it may diff --git a/common/remote_util.cc b/common/remote_util.cc index cf5b557..edaaece 100644 --- a/common/remote_util.cc +++ b/common/remote_util.cc @@ -16,6 +16,7 @@ #include +#include "absl/strings/match.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" #include "common/path.h" @@ -46,6 +47,7 @@ void RemoteUtil::SetUserHostAndPort(std::string user_host, int port) { user_host_ = std::move(user_host); ssh_port_ = port; } + void RemoteUtil::SetScpCommand(std::string scp_command) { scp_command_ = std::move(scp_command); } @@ -56,7 +58,7 @@ void RemoteUtil::SetSshCommand(std::string ssh_command) { absl::Status RemoteUtil::Scp(std::vector source_filepaths, const std::string& dest, bool compress) { - absl::Status status = CheckHostPort(); + absl::Status status = CheckUserHostPort(); if (!status.ok()) { return status; } @@ -64,7 +66,11 @@ absl::Status RemoteUtil::Scp(std::vector source_filepaths, std::string source_args; for (const std::string& sourceFilePath : source_filepaths) { // Workaround for scp thinking that C is a host in C:\path\to\foo. - source_args += QuoteArgument("//./" + sourceFilePath) + " "; + if (absl::StrContains(path::GetDrivePrefix(sourceFilePath), ":")) { + source_args += QuoteForWindows("//./" + sourceFilePath) + " "; + } else { + source_args += QuoteForWindows(sourceFilePath) + " "; + } } // -p preserves timestamps. This enables timestamp-based up-to-date checks. @@ -73,66 +79,27 @@ absl::Status RemoteUtil::Scp(std::vector source_filepaths, "%s " "%s %s -p -T " "-P %i %s " - "%s", - scp_command_, quiet_ || verbosity_ < 2 ? "-q" : "", compress ? "-C" : "", - ssh_port_, source_args, QuoteArgument(user_host_ + ":" + dest)); + "%s:%s", + QuoteForWindows(scp_command_), quiet_ || verbosity_ < 2 ? "-q" : "", + compress ? "-C" : "", ssh_port_, source_args, QuoteForWindows(user_host_), + QuoteForWindows(dest)); start_info.name = "scp"; start_info.forward_output_to_log = forward_output_to_log_; return process_factory_->Run(start_info); } -absl::Status RemoteUtil::Sync(std::vector source_filepaths, - const std::string& dest) { - absl::Status status = CheckHostPort(); - if (!status.ok()) { - return status; - } - - std::string source_args; - for (const std::string& sourceFilePath : source_filepaths) { - source_args += QuoteArgument(sourceFilePath) + " "; - } - - ProcessStartInfo start_info; - start_info.command = absl::StrFormat( - "cdc_rsync --ip=%s --port=%i -z " - "%s %s%s", - QuoteArgument(user_host_), ssh_port_, - quiet_ || verbosity_ < 2 ? "-q " : " ", source_args, QuoteArgument(dest)); - start_info.name = "cdc_rsync"; - start_info.forward_output_to_log = forward_output_to_log_; - - return process_factory_->Run(start_info); -} - absl::Status RemoteUtil::Chmod(const std::string& mode, const std::string& remote_path, bool quiet) { std::string remote_command = - absl::StrFormat("chmod %s %s %s", QuoteArgument(mode), - EscapeForWindows(remote_path), quiet ? "-f" : ""); + absl::StrFormat("chmod %s %s %s", QuoteForSsh(mode), + QuoteForSsh(remote_path), quiet ? "-f" : ""); return Run(remote_command, "chmod"); } -absl::Status RemoteUtil::Rm(const std::string& remote_path, bool force) { - std::string remote_command = absl::StrFormat("rm %s %s", force ? "-f" : "", - EscapeForWindows(remote_path)); - - return Run(remote_command, "rm"); -} - -absl::Status RemoteUtil::Mv(const std::string& old_remote_path, - const std::string& new_remote_path) { - std::string remote_command = - absl::StrFormat("mv %s %s", EscapeForWindows(old_remote_path), - EscapeForWindows(new_remote_path)); - - return Run(remote_command, "mv"); -} - absl::Status RemoteUtil::Run(std::string remote_command, std::string name) { - absl::Status status = CheckHostPort(); + absl::Status status = CheckUserHostPort(); if (!status.ok()) { return status; } @@ -177,33 +144,61 @@ ProcessStartInfo RemoteUtil::BuildProcessStartInfoForSshInternal( "-oServerAliveCountMax=6 " // Number of lost msgs before ssh terminates "-oServerAliveInterval=5 " // Time interval between alive msgs "%s %s -p %i %s", - ssh_command_, quiet_ || verbosity_ < 2 ? "-q" : "", forward_arg, - QuoteArgument(user_host_), ssh_port_, remote_command_arg); + QuoteForWindows(ssh_command_), quiet_ || verbosity_ < 2 ? "-q" : "", + forward_arg, QuoteForWindows(user_host_), ssh_port_, remote_command_arg); start_info.forward_output_to_log = forward_output_to_log_; return start_info; } -std::string RemoteUtil::EscapeForWindows(const std::string& argument) { - std::string str = +std::string RemoteUtil::QuoteForWindows(const std::string& argument) { + // Escape certain backslashes (see doc of this function). + std::string escaped = std::regex_replace(argument, std::regex(R"(\\*(?="|$))"), "$&$&"); - return std::regex_replace(str, std::regex(R"(")"), R"(\")"); + // Escape " -> \". + escaped = std::regex_replace(escaped, std::regex(R"(")"), R"(\")"); + // Quote. + return absl::StrCat("\"", escaped, "\""); } -std::string RemoteUtil::QuoteArgument(const std::string& argument) { - return absl::StrCat("\"", EscapeForWindows(argument), "\""); +std::string RemoteUtil::QuoteForSsh(const std::string& argument) { + // Escape \ ->: \\. + std::string escaped = + std::regex_replace(argument, std::regex(R"(\\)"), R"(\\)"); + // Escape " -> \". + escaped = std::regex_replace(escaped, std::regex(R"(")"), R"(\")"); + + // Quote, but handle special case for ~. + if (escaped.empty() || escaped[0] != '~') { + return QuoteForWindows(absl::StrCat("\"", escaped, "\"")); + } + + // Simple special cases. Quote() isn't required, but called for consistency. + if (escaped == "~" || escaped == "~/") { + return QuoteForWindows(escaped); + } + + // Check whether the username contains only valid characters. + // E.g. ~user name/foo -> Quote(~user name/foo) + size_t slash_pos = escaped.find('/'); + size_t username_end_pos = + slash_pos == std::string::npos ? escaped.size() : slash_pos; + if (username_end_pos > 1 && + !std::regex_match(escaped.substr(1, username_end_pos - 1), + std::regex("^[a-z][-a-z0-9]*"))) { + return QuoteForWindows(absl::StrCat("\"", escaped, "\"")); + } + + if (slash_pos == std::string::npos) { + // E.g. ~username -> Quote(~username) + return QuoteForWindows(escaped); + } + + // E.g. or ~username/foo -> Quote(~username/"foo") + return QuoteForWindows(absl::StrCat(escaped.substr(0, slash_pos + 1), "\"", + escaped.substr(slash_pos + 1), "\"")); } -std::string RemoteUtil::QuoteArgumentForSsh(const std::string& argument) { - return absl::StrFormat( - "'%s'", std::regex_replace(argument, std::regex("'"), "'\\''")); -} - -std::string RemoteUtil::QuoteAndEscapeArgumentForSsh( - const std::string& argument) { - return EscapeForWindows(QuoteArgumentForSsh(argument)); -} - -absl::Status RemoteUtil::CheckHostPort() { +absl::Status RemoteUtil::CheckUserHostPort() { if (user_host_.empty() || ssh_port_ == 0) { return MakeStatus("IP or port not set"); } diff --git a/common/remote_util.h b/common/remote_util.h index 31e59b3..2916486 100644 --- a/common/remote_util.h +++ b/common/remote_util.h @@ -61,25 +61,11 @@ class RemoteUtil { absl::Status Scp(std::vector source_filepaths, const std::string& dest, bool compress); - // Syncs |source_filepaths| to the remote folder |dest| on the gamelet using - // cdc_rsync. Must call SetUserHostAndPort before calling this method. - absl::Status Sync(std::vector source_filepaths, - const std::string& dest); - // Calls 'chmod |mode| |remote_path|' on the gamelet. // Must call SetUserHostAndPort before calling this method. absl::Status Chmod(const std::string& mode, const std::string& remote_path, bool quiet = false); - // Calls 'rm [-f] |remote_path|' on the gamelet. - // Must call SetUserHostAndPort before calling this method. - absl::Status Rm(const std::string& remote_path, bool force); - - // Calls `mv |old_remote_path| |new_remote_path| on the gamelet. - // Must call SetUserHostAndPort before calling this method. - absl::Status Mv(const std::string& old_remote_path, - const std::string& new_remote_path); - // Runs |remote_command| on the gamelet. The command must be properly escaped. // |name| is the name of the command displayed in the logs. // Must call SetUserHostAndPort before calling this method. @@ -107,28 +93,32 @@ class RemoteUtil { // Returns whether output is suppressed. bool Quiet() const { return quiet_; } - // Escapes command line argument for the Microsoft command line parser in - // preparation for quoting. Double quotes are backslash-escaped. One or more - // backslashes are backslash-escaped if they are followed by a double quote, - // or if they occur at the end of the string, e.g. - // foo\bar -> foo\bar, foo\ -> foo\\, foo\\"bar -> foo\\\\\"bar. - static std::string EscapeForWindows(const std::string& argument); - // Quotes and escapes a command line argument following the convention // understood by the Microsoft command line parser. - static std::string QuoteArgument(const std::string& argument); - - // Quotes and escapes a command line argument for usage in SSH. - static std::string QuoteArgumentForSsh(const std::string& argument); + // Double quotes are backslash-escaped. One or more backslashes are backslash- + // escaped if they are followed by a double quote, or if they occur at the end + // of the string, e.g. + // foo -> "foo" + // foo\bar -> "foo\bar" + // foo\ -> "foo\\" + // foo\\"bar -> "foo\\\\\"bar". + static std::string QuoteForWindows(const std::string& argument); // Quotes and escapes a command line arguments for use in SSH command. The - // argument is first escaped and quoted for Linux using single quotes and then + // argument is first escaped and quoted for Linux using double quotes and then // it is escaped to be used by the Microsoft command line parser. - static std::string QuoteAndEscapeArgumentForSsh(const std::string& argument); + // Properly supports path starting with ~ and ~username. + // foo -> "\"foo\"" + // foo\bar -> "\"foo\bar\"" + // foo\ -> "\"foo\\\\\"" + // foo\"bar -> "\"foo\\\\\\\"bar\"". + // ~/foo -> "~/\"foo\"" + // ~user/foo -> "~user/\"foo\"" + static std::string QuoteForSsh(const std::string& argument); private: - // Verifies that both || and |ssh_port_| are set. - absl::Status CheckHostPort(); + // Verifies that both |user_host_| and |ssh_port_| are set. + absl::Status CheckUserHostPort(); // Common code for BuildProcessStartInfoForSsh*. ProcessStartInfo BuildProcessStartInfoForSshInternal( diff --git a/common/remote_util_test.cc b/common/remote_util_test.cc index 7ac730a..11293b8 100644 --- a/common/remote_util_test.cc +++ b/common/remote_util_test.cc @@ -97,35 +97,40 @@ TEST_F(RemoteUtilTest, BuildProcessStartInfoForSshWithCustomCommand) { ExpectContains(si.command, {kCustomSshCmd}); } -TEST_F(RemoteUtilTest, EscapeForWindows) { - EXPECT_EQ("foo", RemoteUtil::EscapeForWindows("foo")); - EXPECT_EQ("foo bar", RemoteUtil::EscapeForWindows("foo bar")); - EXPECT_EQ("foo\\bar", RemoteUtil::EscapeForWindows("foo\\bar")); - EXPECT_EQ("\\\\foo", RemoteUtil::EscapeForWindows("\\\\foo")); - EXPECT_EQ("foo\\\\", RemoteUtil::EscapeForWindows("foo\\")); - EXPECT_EQ("foo\\\\\\\\", RemoteUtil::EscapeForWindows("foo\\\\")); - EXPECT_EQ("foo\\\"", RemoteUtil::EscapeForWindows("foo\"")); - EXPECT_EQ("foo\\\"bar", RemoteUtil::EscapeForWindows("foo\"bar")); - EXPECT_EQ("foo\\\\\\\"bar", RemoteUtil::EscapeForWindows("foo\\\"bar")); - EXPECT_EQ("foo\\\\\\\\\\\"bar", RemoteUtil::EscapeForWindows("foo\\\\\"bar")); - EXPECT_EQ("\\\"foo\\\"", RemoteUtil::EscapeForWindows("\"foo\"")); - EXPECT_EQ("\\\" \\file.txt", RemoteUtil::EscapeForWindows("\" \\file.txt")); +TEST_F(RemoteUtilTest, QuoteForWindows) { + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo"), "\"foo\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo bar"), "\"foo bar\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\bar"), "\"foo\\bar\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("\\\\foo"), "\"\\\\foo\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\"), "\"foo\\\\\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\\\"), "\"foo\\\\\\\\\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\""), "\"foo\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\"bar"), "\"foo\\\"bar\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\\"bar"), "\"foo\\\\\\\"bar\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\\\\"bar"), + "\"foo\\\\\\\\\\\"bar\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("\"foo\""), "\"\\\"foo\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForWindows("\" \\file.txt"), + "\"\\\" \\file.txt\""); } -TEST_F(RemoteUtilTest, QuoteArgument) { - EXPECT_EQ("\"foo\"", RemoteUtil::QuoteArgument("foo")); - EXPECT_EQ("\"foo bar\"", RemoteUtil::QuoteArgument("foo bar")); - EXPECT_EQ("\"foo\\bar\"", RemoteUtil::QuoteArgument("foo\\bar")); - EXPECT_EQ("\"\\\\foo\"", RemoteUtil::QuoteArgument("\\\\foo")); - EXPECT_EQ("\"foo\\\\\"", RemoteUtil::QuoteArgument("foo\\")); - EXPECT_EQ("\"foo\\\\\\\\\"", RemoteUtil::QuoteArgument("foo\\\\")); - EXPECT_EQ("\"foo\\\"\"", RemoteUtil::QuoteArgument("foo\"")); - EXPECT_EQ("\"foo\\\"bar\"", RemoteUtil::QuoteArgument("foo\"bar")); - EXPECT_EQ("\"foo\\\\\\\"bar\"", RemoteUtil::QuoteArgument("foo\\\"bar")); - EXPECT_EQ("\"foo\\\\\\\\\\\"bar\"", - RemoteUtil::QuoteArgument("foo\\\\\"bar")); - EXPECT_EQ("\"\\\"foo\\\"\"", RemoteUtil::QuoteArgument("\"foo\"")); - EXPECT_EQ("\"\\\" \\file.txt\"", RemoteUtil::QuoteArgument("\" \\file.txt")); +TEST_F(RemoteUtilTest, QuoteForSsh) { + EXPECT_EQ(RemoteUtil::QuoteForSsh("foo"), "\"\\\"foo\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("foo\\bar"), "\"\\\"foo\\\\bar\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("foo\\"), "\"\\\"foo\\\\\\\\\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("foo\\\"bar"), + "\"\\\"foo\\\\\\\\\\\\\\\"bar\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("~"), "\"~\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("~username"), "\"~username\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("~/foo"), "\"~/\\\"foo\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("~username/foo"), + "\"~username/\\\"foo\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("~invalid user name"), + "\"\\\"~invalid user name\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("~invalid user name/foo"), + "\"\\\"~invalid user name/foo\\\"\""); + EXPECT_EQ(RemoteUtil::QuoteForSsh("~user-name69/foo"), + "\"~user-name69/\\\"foo\\\"\""); // Nice! } } // namespace diff --git a/data_store/data_provider_test.cc b/data_store/data_provider_test.cc index b21df42..fdb56b4 100644 --- a/data_store/data_provider_test.cc +++ b/data_store/data_provider_test.cc @@ -14,14 +14,12 @@ #include "data_store/data_provider.h" -#include #include #include #include "common/path.h" #include "common/status_test_macros.h" #include "common/testing_clock.h" -#include "common/util.h" #include "data_store/disk_data_store.h" #include "data_store/mem_data_store.h" #include "gtest/gtest.h" diff --git a/proto/local_assets_stream_manager.proto b/proto/local_assets_stream_manager.proto index a658629..74dc868 100644 --- a/proto/local_assets_stream_manager.proto +++ b/proto/local_assets_stream_manager.proto @@ -25,35 +25,50 @@ service LocalAssetsStreamManager { rpc StopSession(StopSessionRequest) returns (StopSessionResponse) {} } -// NextID: 7 +// NextID: 12 message StartSessionRequest { - // ID of assets streaming target gamelet. gamelet_id will continue to be set - // alongside gamelet_name for backwards compatibility, but new code should - // not read from the gamelet_id field. - string gamelet_id = 1; // The resource name of the assets streaming target gamelet, in the form // "organizations/{org-id}/projects/{proj-id}/pools/{pool-id}/gamelets/{gamelet-id}". - // If gamelet_name is specified, it will take precedence over gamelet_id. + // Only used by Stadia. Should set either this or user_host. string gamelet_name = 5; - // Path in the local workstation to stream assets from. + // Directory in the local workstation to stream assets from. string workstation_directory = 2; - // The user's email. - string account = 3; - // The OnePlatForm Url of the publishing API. - string url = 4; - // Caller of the SartSession request. + // Caller of the StartSession request. + // Only used by Stadia. May be left unspecified. enum Origin { ORIGIN_UNKNOWN = 0; ORIGIN_CLI = 1; ORIGIN_PARTNER_PORTAL = 2; } Origin origin = 6; + // Username and host, in the form [user@]host. + string user_host = 7; + // Remote directory where to mount the streamed directory. + string mount_dir = 8; + // SSH port to use while connecting to the remote instance. + // Optional, falls back to port 22 (default SSH port). + int32 port = 9; + // SSH command to connect to the remote instance. + // Optional, falls back to searching ssh. + string ssh_command = 10; + // SCP command to copy files to the remote instance. + // Optional, falls back to searching scp. + string scp_command = 11; + + reserved 1, 3, 4; } message StartSessionResponse {} +// NextID: 4 message StopSessionRequest { + // ID of assets streaming target gamelet. + // Only used by Stadia. Should set either this or user_host_dir. string gamelet_id = 1; + // Username and host, in the form [user@]host. + string user_host = 2; + // Remote directory where the streamed directory is mounted. + string mount_dir = 3; } message StopSessionResponse {} diff --git a/tests_asset_streaming_30/BUILD b/tests_asset_streaming_30/BUILD index b79fbc3..b9452db 100644 --- a/tests_asset_streaming_30/BUILD +++ b/tests_asset_streaming_30/BUILD @@ -28,6 +28,7 @@ cc_binary( "//asset_stream_manager:multi_session", "//cdc_fuse_fs:asset", "//cdc_fuse_fs:cdc_fuse_fs_lib_mocked", + "//cdc_fuse_fs:mock_config_stream_client", "//common:test_main", "//common:testing_clock", "//data_store:data_provider", diff --git a/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj b/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj index 03c725c..729c1bf 100644 --- a/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj +++ b/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj @@ -62,7 +62,6 @@ //tests_asset_streaming_30 tests_asset_streaming_30.exe ..\;..\third_party\absl;..\third_party\jsoncpp\include;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath) - ..\/ diff --git a/tests_cdc_rsync/tests_cdc_rsync.vcxproj b/tests_cdc_rsync/tests_cdc_rsync.vcxproj index e7d478a..2335070 100644 --- a/tests_cdc_rsync/tests_cdc_rsync.vcxproj +++ b/tests_cdc_rsync/tests_cdc_rsync.vcxproj @@ -63,7 +63,6 @@ //tests_cdc_rsync tests_cdc_rsync.exe ..\;..\third_party\absl;..\third_party\jsoncpp\include;..\bazel-stadia-file-transfer\external\com_github_zstd\lib;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath) - ..\/ diff --git a/tests_common/tests_common.vcxproj b/tests_common/tests_common.vcxproj index 3ea056c..121d334 100644 --- a/tests_common/tests_common.vcxproj +++ b/tests_common/tests_common.vcxproj @@ -62,7 +62,6 @@ //tests_common tests_common.exe ..\;..\third_party\absl;..\third_party\jsoncpp\include;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath) - ..\/