diff --git a/NMakeBazelProject.targets b/NMakeBazelProject.targets
index 8c10aad..ea50ab9 100644
--- a/NMakeBazelProject.targets
+++ b/NMakeBazelProject.targets
@@ -6,8 +6,6 @@
Usage: Define 4 properties in your project file:
BazelTargets: Labels of Bazel targets to build, e.g. //common:status.
BazelOutputFile: Output filename, e.g. cdc_rsync.exe
- BazelSourcePathPrefix: Prefix for source paths, to translate paths relative to (/foo) to project-relative paths (../foo).
- Must be escaped for sed (e.g. / -> \/), e.g. ..\/..\/..\/
Optionally, define:
BazelIncludePaths: Include paths, used for Intellisense.
Import NMakeCMakeProject.targets. -->
@@ -16,7 +14,6 @@
-
@@ -27,15 +24,10 @@
k8
dbg
opt
-
- | sed -r "s/^([^:\(]+[:\(][[:digit:]]+(,[[:digit:]]+)?[:\)])/$(BazelSourcePathPrefix)\\1/"
-
- 2>&1 $(BazelSedCommand)
--config=$(BazelPlatform)
$(BazelArgs) --linkopt=-Wl,--strip-all
- $(BazelArgs) --distinct_host_configuration=false
+
$(BazelArgs) --copt=/GL
@@ -51,9 +43,9 @@
$(SolutionDir)..\..\bazel-out\$(BazelPlatformDir)-$(BazelCompilationMode)\bin;$(BazelIncludePaths)
- $(RmBazelOutDir) bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(BazelSedCommand) $(MakeRW)
+ $(RmBazelOutDir) bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(MakeRW)
$(RmBazelOutDir) bazel clean
- $(RmBazelOutDir) bazel clean && bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(BazelSedCommand) $(MakeRW)
+ $(RmBazelOutDir) bazel clean && bazel build --compilation_mode=$(BazelCompilationMode) $(BazelArgs) $(BazelTargets) $(MakeRW)
$(OutDir)$(BazelOutputFile)
diff --git a/all_files.vcxitems b/all_files.vcxitems
index 4dc5840..6eaceac 100644
--- a/all_files.vcxitems
+++ b/all_files.vcxitems
@@ -28,6 +28,7 @@
+
@@ -39,6 +40,8 @@
+
+
@@ -150,6 +153,7 @@
+
@@ -158,6 +162,7 @@
+
@@ -237,6 +242,7 @@
+
diff --git a/asset_stream_manager/BUILD b/asset_stream_manager/BUILD
index 87c4374..4ca9c0c 100644
--- a/asset_stream_manager/BUILD
+++ b/asset_stream_manager/BUILD
@@ -9,6 +9,7 @@ cc_binary(
deps = [
":asset_stream_config",
":session_management_server",
+ "//cdc_stream",
"//common:log",
"//common:path",
"//common:sdk_util",
diff --git a/asset_stream_manager/asset_stream_config.cc b/asset_stream_manager/asset_stream_config.cc
index d68d518..8f1b353 100644
--- a/asset_stream_manager/asset_stream_config.cc
+++ b/asset_stream_manager/asset_stream_config.cc
@@ -26,9 +26,6 @@
#include "common/status_macros.h"
#include "json/json.h"
-ABSL_DECLARE_FLAG(std::string, src_dir);
-ABSL_DECLARE_FLAG(std::string, instance_ip);
-ABSL_DECLARE_FLAG(uint16_t, instance_port);
ABSL_DECLARE_FLAG(int, verbosity);
ABSL_DECLARE_FLAG(bool, debug);
ABSL_DECLARE_FLAG(bool, singlethreaded);
@@ -42,6 +39,14 @@ ABSL_DECLARE_FLAG(uint32_t, access_idle_timeout);
ABSL_DECLARE_FLAG(int, manifest_updater_threads);
ABSL_DECLARE_FLAG(int, file_change_wait_duration_ms);
+// Development flags.
+ABSL_DECLARE_FLAG(std::string, dev_src_dir);
+ABSL_DECLARE_FLAG(std::string, dev_user_host);
+ABSL_DECLARE_FLAG(uint16_t, dev_ssh_port);
+ABSL_DECLARE_FLAG(std::string, dev_ssh_command);
+ABSL_DECLARE_FLAG(std::string, dev_scp_command);
+ABSL_DECLARE_FLAG(std::string, dev_mount_dir);
+
// Declare AS20 flags, so that AS30 can be used on older SDKs simply by
// replacing the binary. Note that the RETIRED_FLAGS macro can't be used
// because the flags contain dashes. This code mimics the macro.
@@ -62,9 +67,6 @@ const auto RETIRED_FLAGS_REG_allow_edge =
namespace cdc_ft {
AssetStreamConfig::AssetStreamConfig() {
- src_dir_ = absl::GetFlag(FLAGS_src_dir);
- instance_ip_ = absl::GetFlag(FLAGS_instance_ip);
- instance_port_ = absl::GetFlag(FLAGS_instance_port);
session_cfg_.verbosity = absl::GetFlag(FLAGS_verbosity);
session_cfg_.fuse_debug = absl::GetFlag(FLAGS_debug);
session_cfg_.fuse_singlethreaded = absl::GetFlag(FLAGS_singlethreaded);
@@ -80,6 +82,13 @@ AssetStreamConfig::AssetStreamConfig() {
absl::GetFlag(FLAGS_manifest_updater_threads);
session_cfg_.file_change_wait_duration_ms =
absl::GetFlag(FLAGS_file_change_wait_duration_ms);
+
+ dev_src_dir_ = absl::GetFlag(FLAGS_dev_src_dir);
+ dev_target_.user_host = absl::GetFlag(FLAGS_dev_user_host);
+ dev_target_.ssh_port = absl::GetFlag(FLAGS_dev_ssh_port);
+ dev_target_.ssh_command = absl::GetFlag(FLAGS_dev_ssh_command);
+ dev_target_.scp_command = absl::GetFlag(FLAGS_dev_scp_command);
+ dev_target_.mount_dir = absl::GetFlag(FLAGS_dev_mount_dir);
}
AssetStreamConfig::~AssetStreamConfig() = default;
@@ -105,7 +114,6 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) {
} \
} while (0)
- ASSIGN_VAR(src_dir_, src_dir, String);
ASSIGN_VAR(session_cfg_.verbosity, verbosity, Int);
ASSIGN_VAR(session_cfg_.fuse_debug, debug, Bool);
ASSIGN_VAR(session_cfg_.fuse_singlethreaded, singlethreaded, Bool);
@@ -144,7 +152,6 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) {
std::string AssetStreamConfig::ToString() {
std::ostringstream ss;
- ss << "src_dir = " << src_dir_ << std::endl;
ss << "verbosity = " << session_cfg_.verbosity
<< std::endl;
ss << "debug = " << session_cfg_.fuse_debug
@@ -166,6 +173,14 @@ std::string AssetStreamConfig::ToString() {
<< session_cfg_.manifest_updater_threads << std::endl;
ss << "file_change_wait_duration_ms = "
<< session_cfg_.file_change_wait_duration_ms << std::endl;
+ ss << "dev_src_dir = " << dev_src_dir_ << std::endl;
+ ss << "dev_user_host = " << dev_target_.user_host << std::endl;
+ ss << "dev_ssh_port = " << dev_target_.ssh_port << std::endl;
+ ss << "dev_ssh_command = " << dev_target_.ssh_command
+ << std::endl;
+ ss << "dev_scp_command = " << dev_target_.scp_command
+ << std::endl;
+ ss << "dev_mount_dir = " << dev_target_.mount_dir << std::endl;
return ss.str();
}
diff --git a/asset_stream_manager/asset_stream_config.h b/asset_stream_manager/asset_stream_config.h
index 3dd6517..8cdbcca 100644
--- a/asset_stream_manager/asset_stream_config.h
+++ b/asset_stream_manager/asset_stream_config.h
@@ -23,6 +23,7 @@
#include "absl/status/status.h"
#include "asset_stream_manager/session_config.h"
+#include "session.h"
namespace cdc_ft {
@@ -38,7 +39,6 @@ class AssetStreamConfig {
// Loads a configuration from the JSON file at |path| and overrides any config
// values that are set in this file. Sample json file:
// {
- // "src_dir":"C:\\path\\to\\assets",
// "verbosity":3,
// "debug":0,
// "singlethreaded":0,
@@ -67,34 +67,29 @@ class AssetStreamConfig {
// read from the JSON file.
std::string GetFlagReadErrors();
- // Workstation directory to stream. Should usually be empty since mounts are
- // triggered by the CLI or the partner portal via a gRPC call, but useful
- // during development.
- const std::string& src_dir() const { return src_dir_; }
-
- // IP address of the instance to stream to. Should usually be empty since
- // mounts are triggered by the CLI or the partner portal via a gRPC call, but
- // useful during development.
- const std::string& instance_ip() const { return instance_ip_; }
-
- // IP address of the instance to stream to. Should usually be unset (0) since
- // mounts are triggered by the CLI or the partner portal via a gRPC call, but
- // useful during development.
- const uint16_t instance_port() const { return instance_port_; }
-
// Session configuration.
- const SessionConfig session_cfg() const { return session_cfg_; }
+ const SessionConfig& session_cfg() const { return session_cfg_; }
+
+ // Workstation directory to be streamed. Used for development purposes only
+ // to start a session right away when the service starts up. See dev CLI args.
+ const std::string& dev_src_dir() const { return dev_src_dir_; }
+
+ // Session target. Used for development purposes only to start a session right
+ // away when the service starts up. See dev CLI args.
+ const SessionTarget& dev_target() const { return dev_target_; }
// Whether to log to a file or to stdout.
bool log_to_stdout() const { return log_to_stdout_; }
private:
- std::string src_dir_;
- std::string instance_ip_;
- uint16_t instance_port_ = 0;
SessionConfig session_cfg_;
bool log_to_stdout_ = false;
+ // Configuration used for development. Allows users to specify a session
+ // via the service's command line.
+ std::string dev_src_dir_;
+ SessionTarget dev_target_;
+
// Use a set, so the flags are sorted alphabetically.
std::set flags_read_from_file_;
diff --git a/asset_stream_manager/asset_stream_manager.vcxproj b/asset_stream_manager/asset_stream_manager.vcxproj
index e659952..1f1a099 100644
--- a/asset_stream_manager/asset_stream_manager.vcxproj
+++ b/asset_stream_manager/asset_stream_manager.vcxproj
@@ -66,10 +66,9 @@
- //asset_stream_manager
+ //asset_stream_manager //cdc_stream
asset_stream_manager.exe
..\;..\third_party\absl;..\third_party\jsoncpp\include;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath)
- ..\/
diff --git a/asset_stream_manager/cdc_fuse_manager.cc b/asset_stream_manager/cdc_fuse_manager.cc
index e4cd4cc..29c9ba0 100644
--- a/asset_stream_manager/cdc_fuse_manager.cc
+++ b/asset_stream_manager/cdc_fuse_manager.cc
@@ -29,13 +29,10 @@ namespace {
constexpr char kFuseFilename[] = "cdc_fuse_fs";
constexpr char kLibFuseFilename[] = "libfuse.so";
constexpr char kFuseStdoutPrefix[] = "cdc_fuse_fs_stdout";
-constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc_file_transfer/";
-
-// Mount point for FUSE on the gamelet.
-constexpr char kMountDir[] = "/mnt/workstation";
+constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc-file-transfer/bin/";
// Cache directory on the gamelet to store data chunks.
-constexpr char kCacheDir[] = "/var/cache/asset_streaming";
+constexpr char kCacheDir[] = "~/.cache/cdc-file-transfer/chunks";
} // namespace
@@ -56,23 +53,14 @@ absl::Status CdcFuseManager::Deploy() {
std::string exe_dir;
RETURN_IF_ERROR(path::GetExeDir(&exe_dir), "Failed to get exe directory");
- std::string local_exe_path = path::Join(exe_dir, kFuseFilename);
- std::string local_lib_path = path::Join(exe_dir, kLibFuseFilename);
+ // Set the cwd to the exe dir and pass the filenames to scp. Otherwise, some
+ // scp implementations can get confused and create the wrong remote filenames.
+ path::SetCwd(exe_dir);
-#ifdef _DEBUG
- // Sync FUSE to the gamelet in debug. Debug builds are rather large, so
- // there's a gain from using sync.
- LOG_DEBUG("Syncing FUSE");
- RETURN_IF_ERROR(
- remote_util_->Sync({local_exe_path, local_lib_path}, kRemoteToolsBinDir),
- "Failed to sync FUSE to gamelet");
- LOG_DEBUG("Syncing FUSE succeeded");
-#else
- // Copy FUSE to the gamelet. This is usually faster in production since it
- // doesn't have to deploy ggp__server first.
+ // Copy FUSE to the gamelet.
LOG_DEBUG("Copying FUSE");
- RETURN_IF_ERROR(remote_util_->Scp({local_exe_path, local_lib_path},
- kRemoteToolsBinDir, true),
+ RETURN_IF_ERROR(remote_util_->Scp({kFuseFilename, kLibFuseFilename},
+ kRemoteToolsBinDir, /*compress=*/false),
"Failed to copy FUSE to gamelet");
LOG_DEBUG("Copying FUSE succeeded");
@@ -82,12 +70,12 @@ absl::Status CdcFuseManager::Deploy() {
RETURN_IF_ERROR(remote_util_->Chmod("a+x", remotePath),
"Failed to set executable flag on FUSE");
LOG_DEBUG("Making FUSE succeeded");
-#endif
return absl::OkStatus();
}
-absl::Status CdcFuseManager::Start(uint16_t local_port, uint16_t remote_port,
+absl::Status CdcFuseManager::Start(const std::string& mount_dir,
+ uint16_t local_port, uint16_t remote_port,
int verbosity, bool debug,
bool singlethreaded, bool enable_stats,
bool check, uint64_t cache_capacity,
@@ -115,15 +103,18 @@ absl::Status CdcFuseManager::Start(uint16_t local_port, uint16_t remote_port,
// Build the remote command.
std::string remotePath = path::JoinUnix(kRemoteToolsBinDir, kFuseFilename);
std::string remote_command = absl::StrFormat(
- "LD_LIBRARY_PATH=%s %s --instance='%s' "
- "--components='%s' --port=%i --cache_dir=%s "
+ "mkdir -p %s; LD_LIBRARY_PATH=%s %s "
+ "--instance=%s "
+ "--components=%s --port=%i --cache_dir=%s "
"--verbosity=%i --cleanup_timeout=%i --access_idle_timeout=%i --stats=%i "
"--check=%i --cache_capacity=%u -- -o allow_root -o ro -o nonempty -o "
"auto_unmount %s%s%s",
- kRemoteToolsBinDir, remotePath, instance_, component_args, remote_port,
- kCacheDir, verbosity, cleanup_timeout_sec, access_idle_timeout_sec,
- enable_stats, check, cache_capacity, kMountDir, debug ? " -d" : "",
- singlethreaded ? " -s" : "");
+ kRemoteToolsBinDir, kRemoteToolsBinDir, remotePath,
+ RemoteUtil::QuoteForSsh(instance_),
+ RemoteUtil::QuoteForSsh(component_args), remote_port, kCacheDir,
+ verbosity, cleanup_timeout_sec, access_idle_timeout_sec, enable_stats,
+ check, cache_capacity, RemoteUtil::QuoteForSsh(mount_dir),
+ debug ? " -d" : "", singlethreaded ? " -s" : "");
bool needs_deploy = false;
RETURN_IF_ERROR(
diff --git a/asset_stream_manager/cdc_fuse_manager.h b/asset_stream_manager/cdc_fuse_manager.h
index f629da9..26be602 100644
--- a/asset_stream_manager/cdc_fuse_manager.h
+++ b/asset_stream_manager/cdc_fuse_manager.h
@@ -40,6 +40,7 @@ class CdcFuseManager {
// |remote_port| to the workstation's |local_port|. Deploys the binary if
// necessary.
//
+ // |mount_dir| is the remote directory where to mount the FUSE.
// |verbosity| is the log verbosity used by the filesystem.
// |debug| puts the filesystem into debug mode if set to true. This also
// causes the process to run in the foreground, so that logs are piped through
@@ -51,10 +52,10 @@ class CdcFuseManager {
// |cleanup_timeout_sec| defines the data provider cleanup timeout in seconds.
// |access_idle_timeout_sec| defines the number of seconds after which data
// provider is considered to be access-idling.
- absl::Status Start(uint16_t local_port, uint16_t remote_port, int verbosity,
- bool debug, bool singlethreaded, bool enable_stats,
- bool check, uint64_t cache_capacity,
- uint32_t cleanup_timeout_sec,
+ absl::Status Start(const std::string& mount_dir, uint16_t local_port,
+ uint16_t remote_port, int verbosity, bool debug,
+ bool singlethreaded, bool enable_stats, bool check,
+ uint64_t cache_capacity, uint32_t cleanup_timeout_sec,
uint32_t access_idle_timeout_sec);
// Stops the CDC FUSE.
diff --git a/asset_stream_manager/local_assets_stream_manager_service_impl.cc b/asset_stream_manager/local_assets_stream_manager_service_impl.cc
index 86ab715..7f62507 100644
--- a/asset_stream_manager/local_assets_stream_manager_service_impl.cc
+++ b/asset_stream_manager/local_assets_stream_manager_service_impl.cc
@@ -17,6 +17,7 @@
#include
#include "absl/strings/str_format.h"
+#include "absl/strings/str_replace.h"
#include "absl/strings/str_split.h"
#include "asset_stream_manager/multi_session.h"
#include "asset_stream_manager/session_manager.h"
@@ -26,11 +27,21 @@
#include "common/process.h"
#include "common/sdk_util.h"
#include "common/status.h"
+#include "google/protobuf/text_format.h"
#include "manifest/manifest_updater.h"
+using TextFormat = google::protobuf::TextFormat;
+
namespace cdc_ft {
namespace {
+std::string RequestToString(const google::protobuf::Message& request) {
+ std::string str;
+ google::protobuf::TextFormat::PrintToString(request, &str);
+ if (!str.empty() && str.back() == '\n') str.pop_back();
+ return absl::StrReplaceAll(str, {{"\n", ", "}});
+}
+
// Parses |instance_name| of the form
// "organizations/{org-id}/projects/{proj-id}/pools/{pool-id}/gamelets/{gamelet-id}"
// into parts. The pool id is not returned.
@@ -102,48 +113,18 @@ LocalAssetsStreamManagerServiceImpl::~LocalAssetsStreamManagerServiceImpl() =
grpc::Status LocalAssetsStreamManagerServiceImpl::StartSession(
grpc::ServerContext* /*context*/, const StartSessionRequest* request,
StartSessionResponse* /*response*/) {
- LOG_INFO("RPC:StartSession(gamelet_name='%s', workstation_directory='%s'",
- request->gamelet_name(), request->workstation_directory());
+ LOG_INFO("RPC:StartSession(%s)", RequestToString(*request));
- metrics::DeveloperLogEvent evt;
- evt.as_manager_data = std::make_unique();
- evt.as_manager_data->session_start_data =
- std::make_unique();
- evt.as_manager_data->session_start_data->absl_status = absl::StatusCode::kOk;
- evt.as_manager_data->session_start_data->status =
- metrics::SessionStartStatus::kOk;
- evt.as_manager_data->session_start_data->origin =
- ConvertOrigin(request->origin());
-
- // Parse instance/project/org id.
- absl::Status status;
MultiSession* ms = nullptr;
- std::string instance_id, project_id, organization_id, instance_ip;
- uint16_t instance_port = 0;
- if (!ParseInstanceName(request->gamelet_name(), &instance_id, &project_id,
- &organization_id)) {
- status = absl::InvalidArgumentError(absl::StrFormat(
- "Failed to parse instance name '%s'", request->gamelet_name()));
- } else {
- evt.project_id = project_id;
- evt.organization_id = organization_id;
-
- status = InitSsh(instance_id, project_id, organization_id, &instance_ip,
- &instance_port);
-
- if (status.ok()) {
- status = session_manager_->StartSession(
- instance_id, project_id, organization_id, instance_ip, instance_port,
- request->workstation_directory(), &ms,
- &evt.as_manager_data->session_start_data->status);
- }
- }
+ metrics::DeveloperLogEvent evt;
+ std::string instance_id;
+ absl::Status status = StartSessionInternal(request, &instance_id, &ms, &evt);
evt.as_manager_data->session_start_data->absl_status = status.code();
if (ms) {
evt.as_manager_data->session_start_data->concurrent_session_count =
ms->GetSessionCount();
- if (!instance_id.empty() && ms->HasSessionForInstance(instance_id)) {
+ if (!instance_id.empty() && ms->HasSession(instance_id)) {
ms->RecordSessionEvent(std::move(evt), metrics::EventType::kSessionStart,
instance_id);
} else {
@@ -166,9 +147,14 @@ grpc::Status LocalAssetsStreamManagerServiceImpl::StartSession(
grpc::Status LocalAssetsStreamManagerServiceImpl::StopSession(
grpc::ServerContext* /*context*/, const StopSessionRequest* request,
StopSessionResponse* /*response*/) {
- LOG_INFO("RPC:StopSession(gamelet_id='%s')", request->gamelet_id());
+ LOG_INFO("RPC:StopSession(%s)", RequestToString(*request));
- absl::Status status = session_manager_->StopSession(request->gamelet_id());
+ std::string instance_id =
+ !request->gamelet_id().empty() // Stadia use case
+ ? request->gamelet_id()
+ : absl::StrCat(request->user_host(), ":", request->mount_dir());
+
+ absl::Status status = session_manager_->StopSession(instance_id);
if (status.ok()) {
LOG_INFO("StopSession() succeeded");
} else {
@@ -177,6 +163,86 @@ grpc::Status LocalAssetsStreamManagerServiceImpl::StopSession(
return ToGrpcStatus(status);
}
+absl::Status LocalAssetsStreamManagerServiceImpl::StartSessionInternal(
+ const StartSessionRequest* request, std::string* instance_id,
+ MultiSession** ms, metrics::DeveloperLogEvent* evt) {
+ instance_id->clear();
+ *ms = nullptr;
+ evt->as_manager_data = std::make_unique();
+ evt->as_manager_data->session_start_data =
+ std::make_unique();
+ evt->as_manager_data->session_start_data->absl_status = absl::StatusCode::kOk;
+ evt->as_manager_data->session_start_data->status =
+ metrics::SessionStartStatus::kOk;
+ evt->as_manager_data->session_start_data->origin =
+ ConvertOrigin(request->origin());
+
+ if (!(request->gamelet_name().empty() ^ request->user_host().empty())) {
+ return absl::InvalidArgumentError(
+ "Must set either gamelet_name or user_host.");
+ }
+
+ if (request->mount_dir().empty()) {
+ return absl::InvalidArgumentError("mount_dir cannot be empty.");
+ }
+
+ SessionTarget target;
+ if (!request->gamelet_name().empty()) {
+ ASSIGN_OR_RETURN(target,
+ GetTargetForStadia(*request, instance_id, &evt->project_id,
+ &evt->organization_id));
+ } else {
+ target = GetTarget(*request, instance_id);
+ }
+
+ return session_manager_->StartSession(
+ *instance_id, request->workstation_directory(), target, evt->project_id,
+ evt->organization_id, ms,
+ &evt->as_manager_data->session_start_data->status);
+}
+
+absl::StatusOr
+LocalAssetsStreamManagerServiceImpl::GetTargetForStadia(
+ const StartSessionRequest& request, std::string* instance_id,
+ std::string* project_id, std::string* organization_id) {
+ SessionTarget target;
+ target.mount_dir = request.mount_dir();
+ target.ssh_command = request.ssh_command();
+ target.scp_command = request.scp_command();
+
+ // Parse instance/project/org id.
+ if (!ParseInstanceName(request.gamelet_name(), instance_id, project_id,
+ organization_id)) {
+ return absl::InvalidArgumentError(absl::StrFormat(
+ "Failed to parse instance name '%s'", request.gamelet_name()));
+ }
+
+ // Run 'ggp ssh init' to determine IP (host) and port.
+ std::string instance_ip;
+ uint16_t instance_port = 0;
+ RETURN_IF_ERROR(InitSsh(*instance_id, *project_id, *organization_id,
+ &instance_ip, &instance_port));
+
+ target.user_host = "cloudcast@" + instance_ip;
+ target.ssh_port = instance_port;
+ return target;
+}
+
+SessionTarget LocalAssetsStreamManagerServiceImpl::GetTarget(
+ const StartSessionRequest& request, std::string* instance_id) {
+ SessionTarget target;
+ target.user_host = request.user_host();
+ target.mount_dir = request.mount_dir();
+ target.ssh_command = request.ssh_command();
+ target.scp_command = request.scp_command();
+ target.ssh_port = request.port() > 0 && request.port() <= UINT16_MAX
+ ? static_cast(request.port())
+ : RemoteUtil::kDefaultSshPort;
+
+ *instance_id = absl::StrCat(target.user_host, ":", target.mount_dir);
+ return target;
+}
+
metrics::RequestOrigin LocalAssetsStreamManagerServiceImpl::ConvertOrigin(
StartSessionRequestOrigin origin) const {
switch (origin) {
diff --git a/asset_stream_manager/local_assets_stream_manager_service_impl.h b/asset_stream_manager/local_assets_stream_manager_service_impl.h
index 3f5c6d5..e22726b 100644
--- a/asset_stream_manager/local_assets_stream_manager_service_impl.h
+++ b/asset_stream_manager/local_assets_stream_manager_service_impl.h
@@ -19,6 +19,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
+#include "asset_stream_manager/session.h"
#include "asset_stream_manager/session_config.h"
#include "metrics/metrics.h"
#include "proto/local_assets_stream_manager.grpc.pb.h"
@@ -66,6 +67,27 @@ class LocalAssetsStreamManagerServiceImpl final
StopSessionResponse* response) override;
private:
+ // Internal implementation of StartSession(). Returns the unique session
+ // identifier |instance_id|, the created or retrieved MultiSession |ms| as
+ // well as the filled metrics event |evt|.
+ absl::Status LocalAssetsStreamManagerServiceImpl::StartSessionInternal(
+ const StartSessionRequest* request, std::string* instance_id,
+ MultiSession** ms, metrics::DeveloperLogEvent* evt);
+
+ // Stadia-specific: Returns a SessionTarget from a gamelet name and fills in
+ // the gamelet's |instance_id|, |project_id| and |organization_id|.
+ // Used if request.gamelet_name() is set.
+ // Fails if the gamelet name fails to parse or if ggp ssh init fails.
+ absl::StatusOr GetTargetForStadia(
+ const StartSessionRequest& request, std::string* instance_id,
+ std::string* project_id, std::string* organization_id);
+
+ // Returns a SessionTarget from the corresponding fields in |request|.
+ // |instance_id| is set to [user@]host:mount_dir.
+ // Used if request.gamelet_name() is not set.
+ SessionTarget GetTarget(const StartSessionRequest& request,
+ std::string* instance_id);
+
// Convert StartSessionRequest enum to metrics enum.
metrics::RequestOrigin ConvertOrigin(StartSessionRequestOrigin origin) const;
diff --git a/asset_stream_manager/main.cc b/asset_stream_manager/main.cc
index 8fc4f3c..7e4d74e 100644
--- a/asset_stream_manager/main.cc
+++ b/asset_stream_manager/main.cc
@@ -20,6 +20,7 @@
#include "asset_stream_manager/local_assets_stream_manager_service_impl.h"
#include "asset_stream_manager/session_management_server.h"
#include "asset_stream_manager/session_manager.h"
+#include "common/grpc_status.h"
#include "common/log.h"
#include "common/path.h"
#include "common/process.h"
@@ -49,15 +50,19 @@ absl::Status Run(const AssetStreamConfig& cfg) {
background_service.SetExitCallback(
[&sm_server]() { return sm_server.Shutdown(); });
- RETURN_IF_ERROR(sm_server.Start(kSessionManagementPort));
- if (!cfg.src_dir().empty()) {
- MultiSession* ms_unused;
- metrics::SessionStartStatus status_unused;
- RETURN_IF_ERROR(session_manager.StartSession(
- /*instance_id=*/cfg.instance_ip(), /*project_id=*/std::string(),
- /*organization_id=*/std::string(), cfg.instance_ip(),
- cfg.instance_port(), cfg.src_dir(), &ms_unused, &status_unused));
+ if (!cfg.dev_src_dir().empty()) {
+ localassetsstreammanager::StartSessionRequest request;
+ request.set_workstation_directory(cfg.dev_src_dir());
+ request.set_user_host(cfg.dev_target().user_host);
+ request.set_mount_dir(cfg.dev_target().mount_dir);
+ request.set_port(cfg.dev_target().ssh_port);
+ request.set_ssh_command(cfg.dev_target().ssh_command);
+ request.set_scp_command(cfg.dev_target().scp_command);
+ localassetsstreammanager::StartSessionResponse response;
+ RETURN_ABSL_IF_ERROR(
+ session_service.StartSession(nullptr, &request, &response));
}
+ RETURN_IF_ERROR(sm_server.Start(kSessionManagementPort));
sm_server.RunUntilShutdown();
return absl::OkStatus();
}
@@ -96,18 +101,6 @@ const auto RETIRED_FLAGS_REG_allow_edge =
} // namespace
} // namespace cdc_ft
-ABSL_FLAG(std::string, src_dir, "",
- "Start a streaming session immediately from the given Windows path. "
- "Used during development. Must have exactly one gamelet reserved or "
- "specify the target gamelet with --instance.");
-ABSL_FLAG(std::string, instance_ip, "",
- "Connect to the instance with the given IP address for this session. "
- "This flag is ignored unless --src_dir is set as well. Used "
- "during development. ");
-ABSL_FLAG(uint16_t, instance_port, 0,
- "Connect to the instance through the given SSH port. "
- "This flag is ignored unless --src_dir is set as well. Used "
- "during development. ");
ABSL_FLAG(int, verbosity, 2, "Verbosity of the log output");
ABSL_FLAG(bool, debug, false, "Run FUSE filesystem in debug mode");
ABSL_FLAG(bool, singlethreaded, false,
@@ -132,6 +125,28 @@ ABSL_FLAG(uint32_t, access_idle_timeout, cdc_ft::DataProvider::kAccessIdleSec,
"Do not run instance cache cleanups for this many seconds after the "
"last file access");
+// Development args.
+ABSL_FLAG(std::string, dev_src_dir, "",
+ "Start a streaming session immediately from the given Windows path. "
+ "Used during development. Must also specify --dev_user_host and "
+ "--dev_mount_dir and possibly other --dev flags, depending on the "
+ "SSH setup");
+ABSL_FLAG(std::string, dev_user_host, "",
+ "Username and host to stream to, of the form [user@]host. Used "
+ "during development. See --dev_src_dir for more info.");
+ABSL_FLAG(uint16_t, dev_ssh_port, cdc_ft::RemoteUtil::kDefaultSshPort,
+ "SSH port to use for the connection to the host. Used during "
+ "development. See --dev_src_dir for more info.");
+ABSL_FLAG(std::string, dev_ssh_command, "",
+ "Ssh command and extra flags to use for the connection to the host. "
+ "Used during development. See --dev_src_dir for more info.");
+ABSL_FLAG(std::string, dev_scp_command, "",
+ "Scp command and extra flags to use for the connection to the host. "
+ "Used during development. See --dev_src_dir for more info.");
+ABSL_FLAG(std::string, dev_mount_dir, "",
+ "Directory on the host to stream to. Used during development. See "
+ "--dev_src_dir for more info.");
+
int main(int argc, char* argv[]) {
absl::ParseCommandLine(argc, argv);
diff --git a/asset_stream_manager/multi_session.cc b/asset_stream_manager/multi_session.cc
index c46acee..9493cab 100644
--- a/asset_stream_manager/multi_session.cc
+++ b/asset_stream_manager/multi_session.cc
@@ -478,7 +478,7 @@ absl::Status MultiSession::Shutdown() {
while (!sessions_.empty()) {
std::string instance_id = sessions_.begin()->first;
RETURN_IF_ERROR(StopSession(instance_id),
- "Failed to stop session for instance id %s", instance_id);
+ "Failed to stop session for instance id '%s'", instance_id);
sessions_.erase(instance_id);
}
@@ -499,10 +499,9 @@ absl::Status MultiSession::Status() {
}
absl::Status MultiSession::StartSession(const std::string& instance_id,
+ const SessionTarget& target,
const std::string& project_id,
- const std::string& organization_id,
- const std::string& instance_ip,
- uint16_t instance_port) {
+ const std::string& organization_id) {
absl::MutexLock lock(&sessions_mutex_);
if (sessions_.find(instance_id) != sessions_.end()) {
@@ -523,9 +522,8 @@ absl::Status MultiSession::StartSession(const std::string& instance_id,
metrics_recorder_->GetMetricsService(),
metrics_recorder_->MultiSessionId(), project_id, organization_id);
- auto session =
- std::make_unique(instance_id, instance_ip, instance_port, cfg_,
- process_factory_, std::move(metrics_recorder));
+ auto session = std::make_unique(
+ instance_id, target, cfg_, process_factory_, std::move(metrics_recorder));
RETURN_IF_ERROR(session->Start(local_asset_stream_port_,
kAssetStreamPortFirst, kAssetStreamPortLast));
@@ -552,7 +550,7 @@ absl::Status MultiSession::StopSession(const std::string& instance_id) {
return absl::OkStatus();
}
-bool MultiSession::HasSessionForInstance(const std::string& instance_id) {
+bool MultiSession::HasSession(const std::string& instance_id) {
absl::ReaderMutexLock lock(&sessions_mutex_);
return sessions_.find(instance_id) != sessions_.end();
}
@@ -663,19 +661,19 @@ void MultiSession::OnContentSent(size_t byte_count, size_t chunck_count,
std::string instance_id) {
if (instance_id.empty()) {
// |instance_id| is empty only in case when manifest wasn't acknowledged by
- // the gamelet yet (ConfigStreamServiceImpl::AckManifestIdReceived was not
+ // the instance yet (ConfigStreamServiceImpl::AckManifestIdReceived was not
// invoked). This means MultiSession::StartSession is still waiting for
// manifest acknowledge and |sessions_mutex_| is currently locked. In this
// case invoking MultiSession::FindSession and waiting for |sessions_mutex_|
// to get unlocked will block the current thread, which is also responsible
// for receiving a call at ConfigStreamServiceImpl::AckManifestIdReceived.
// This causes a deadlock and leads to a DeadlineExceeded error.
- LOG_WARNING("Can not record session content for an empty instance_id.");
+ LOG_WARNING("Cannot record session content for an empty instance_id.");
return;
}
Session* session = FindSession(instance_id);
if (session == nullptr) {
- LOG_WARNING("Failed to find active session by instrance id: %s",
+ LOG_WARNING("Failed to find active session by instance id '%s'",
instance_id);
return;
}
diff --git a/asset_stream_manager/multi_session.h b/asset_stream_manager/multi_session.h
index 673f0e7..52a8f76 100644
--- a/asset_stream_manager/multi_session.h
+++ b/asset_stream_manager/multi_session.h
@@ -36,6 +36,7 @@ namespace cdc_ft {
class ProcessFactory;
class Session;
+struct SessionTarget;
using ManifestUpdatedCb = std::function;
// Updates the manifest and runs a file watcher in a background thread.
@@ -65,7 +66,7 @@ class MultiSessionRunner {
// Stops updating the manifest and |server_|.
absl::Status Shutdown() ABSL_LOCKS_EXCLUDED(mutex_);
- // Waits until a manifest is ready and the gamelet |instance_id| has
+ // Waits until a manifest is ready and the session for |instance_id| has
// acknowledged the reception of the currently set manifest id. |fuse_timeout|
// is the timeout for waiting for the FUSE manifest ack. The time required to
// generate the manifest is not part of this timeout as this could take a
@@ -172,30 +173,29 @@ class MultiSession {
// Not thread-safe.
absl::Status Status();
- // Starts a new streaming session to the instance with given |instance_id| and
+ // Starts a new streaming session to the instance described by |target| and
// waits until the FUSE has received the initial manifest id.
// Returns an error if a session for that instance already exists.
- // |instance_id| is the instance id of the target remote instance.
- // |project_id| is id of the project that contains the instance.
- // |organization_id| is id of the organization that contains the instance.
- // |instance_ip| is the IP address of the instance.
- // |instance_port| is the SSH port for connecting to the remote instance.
+ // |instance_id| is a unique id for the remote instance and mount directory,
+ // e.g. user@host:mount_dir.
+ // |target| identifies the remote target and how to connect to it.
+ // |project_id| is the project that owns the instance. Stadia only.
+ // |organization_id| is organization that contains the instance. Stadia only.
// Thread-safe.
absl::Status StartSession(const std::string& instance_id,
+ const SessionTarget& target,
const std::string& project_id,
- const std::string& organization_id,
- const std::string& instance_ip,
- uint16_t instance_port)
+ const std::string& organization_id)
ABSL_LOCKS_EXCLUDED(sessions_mutex_);
- // Starts a new streaming session to the gamelet with given |instance_id|.
+ // Stops the session for the given |instance_id|.
// Returns a NotFound error if a session for that instance does not exists.
// Thread-safe.
absl::Status StopSession(const std::string& instance_id)
ABSL_LOCKS_EXCLUDED(sessions_mutex_);
// Returns true if there is an existing session for |instance_id|.
- bool HasSessionForInstance(const std::string& instance_id)
+ bool HasSession(const std::string& instance_id)
ABSL_LOCKS_EXCLUDED(sessions_mutex_);
// Returns true if the FUSE process is up and running for an existing session
@@ -226,7 +226,7 @@ class MultiSession {
void RecordMultiSessionEvent(metrics::DeveloperLogEvent event,
metrics::EventType code);
- // Record an event for a session associated with the |instance|.
+ // Record an event for a session associated with the |instance_id|.
void RecordSessionEvent(metrics::DeveloperLogEvent event,
metrics::EventType code,
const std::string& instance_id);
diff --git a/asset_stream_manager/session.cc b/asset_stream_manager/session.cc
index 77dcab8..ae0ca73 100644
--- a/asset_stream_manager/session.cc
+++ b/asset_stream_manager/session.cc
@@ -26,7 +26,7 @@ namespace cdc_ft {
namespace {
// Timeout for initial gamelet connection.
-constexpr double kInstanceConnectionTimeoutSec = 10.0f;
+constexpr double kInstanceConnectionTimeoutSec = 60.0f;
metrics::DeveloperLogEvent GetEventWithHeartBeatData(size_t bytes,
size_t chunks) {
@@ -40,18 +40,24 @@ metrics::DeveloperLogEvent GetEventWithHeartBeatData(size_t bytes,
} // namespace
-Session::Session(std::string instance_id, std::string instance_ip,
- uint16_t instance_port, SessionConfig cfg,
- ProcessFactory* process_factory,
+Session::Session(std::string instance_id, const SessionTarget& target,
+ SessionConfig cfg, ProcessFactory* process_factory,
std::unique_ptr metrics_recorder)
: instance_id_(std::move(instance_id)),
+ mount_dir_(target.mount_dir),
cfg_(std::move(cfg)),
process_factory_(process_factory),
remote_util_(cfg_.verbosity, cfg_.quiet, process_factory,
/*forward_output_to_logging=*/true),
metrics_recorder_(std::move(metrics_recorder)) {
assert(metrics_recorder_);
- remote_util_.SetUserHostAndPort(instance_ip, instance_port);
+ remote_util_.SetUserHostAndPort(target.user_host, target.ssh_port);
+ if (!target.ssh_command.empty()) {
+ remote_util_.SetSshCommand(target.ssh_command);
+ }
+ if (!target.scp_command.empty()) {
+ remote_util_.SetScpCommand(target.scp_command);
+ }
}
Session::~Session() {
@@ -80,9 +86,10 @@ absl::Status Session::Start(int local_port, int first_remote_port,
fuse_ = std::make_unique(instance_id_, process_factory_,
&remote_util_);
RETURN_IF_ERROR(
- fuse_->Start(local_port, remote_port, cfg_.verbosity, cfg_.fuse_debug,
- cfg_.fuse_singlethreaded, cfg_.stats, cfg_.fuse_check,
- cfg_.fuse_cache_capacity, cfg_.fuse_cleanup_timeout_sec,
+ fuse_->Start(mount_dir_, local_port, remote_port, cfg_.verbosity,
+ cfg_.fuse_debug, cfg_.fuse_singlethreaded, cfg_.stats,
+ cfg_.fuse_check, cfg_.fuse_cache_capacity,
+ cfg_.fuse_cleanup_timeout_sec,
cfg_.fuse_access_idle_timeout_sec),
"Failed to start instance component");
return absl::OkStatus();
diff --git a/asset_stream_manager/session.h b/asset_stream_manager/session.h
index f6d57dc..424c795 100644
--- a/asset_stream_manager/session.h
+++ b/asset_stream_manager/session.h
@@ -32,17 +32,29 @@ class CdcFuseManager;
class ProcessFactory;
class Process;
-// Manages the connection of a workstation to a single gamelet.
+// Defines a remote target and how to connect to it.
+struct SessionTarget {
+ // SSH username and hostname of the remote target, formed as [user@]host.
+ std::string user_host;
+ // Port to use for SSH connections to the remote target.
+ uint16_t ssh_port;
+ // Ssh command to use to connect to the remote target.
+ std::string ssh_command;
+ // Scp command to use to copy files to the remote target.
+ std::string scp_command;
+ // Directory on the remote target where to mount the streamed directory.
+ std::string mount_dir;
+};
+
+// Manages the connection of a workstation to a single remote instance.
class Session {
public:
// |instance_id| is a unique id for the remote instance.
- // |instance_ip| is the IP address of the remote instance.
- // |instance_port| is the SSH tunnel port for connecting to the instance.
+ // |target| identifies the remote target and how to connect to it.
// |cfg| contains generic configuration parameters for the session.
// |process_factory| abstracts process creation.
- Session(std::string instance_id, std::string instance_ip,
- uint16_t instance_port, SessionConfig cfg,
- ProcessFactory* process_factory,
+ Session(std::string instance_id, const SessionTarget& target,
+ SessionConfig cfg, ProcessFactory* process_factory,
std::unique_ptr metrics_recorder);
~Session();
@@ -71,6 +83,7 @@ class Session {
private:
const std::string instance_id_;
+ const std::string mount_dir_;
const SessionConfig cfg_;
ProcessFactory* const process_factory_;
diff --git a/asset_stream_manager/session_manager.cc b/asset_stream_manager/session_manager.cc
index 66829b4..6e10001 100644
--- a/asset_stream_manager/session_manager.cc
+++ b/asset_stream_manager/session_manager.cc
@@ -59,10 +59,10 @@ absl::Status SessionManager::Shutdown() {
}
absl::Status SessionManager::StartSession(
- const std::string& instance_id, const std::string& project_id,
- const std::string& organization_id, const std::string& instance_ip,
- uint16_t instance_port, const std::string& src_dir,
- MultiSession** multi_session, metrics::SessionStartStatus* metrics_status) {
+ const std::string& instance_id, const std::string& src_dir,
+ const SessionTarget& target, const std::string& project_id,
+ const std::string& organization_id, MultiSession** multi_session,
+ metrics::SessionStartStatus* metrics_status) {
*multi_session = nullptr;
*metrics_status = metrics::SessionStartStatus::kOk;
@@ -83,7 +83,7 @@ absl::Status SessionManager::StartSession(
// Early out if we are streaming the workstation dir to the given gamelet.
MultiSession* ms = GetMultiSession(src_dir);
*multi_session = ms;
- if (ms && ms->HasSessionForInstance(instance_id)) {
+ if (ms && ms->HasSession(instance_id)) {
if (ms->IsSessionHealthy(instance_id)) {
LOG_INFO("Reusing existing session");
return absl::OkStatus();
@@ -95,8 +95,8 @@ absl::Status SessionManager::StartSession(
// We could also fall through, but this might restart the MultiSession.
status = ms->StopSession(instance_id);
if (status.ok()) {
- status = ms->StartSession(instance_id, project_id, organization_id,
- instance_ip, instance_port);
+ status =
+ ms->StartSession(instance_id, target, project_id, organization_id);
}
if (!status.ok()) {
*metrics_status = metrics::SessionStartStatus::kRestartSessionError;
@@ -129,8 +129,7 @@ absl::Status SessionManager::StartSession(
// Start the session.
LOG_INFO("Starting streaming session from path '%s' to instance '%s'",
src_dir, instance_id);
- status = ms->StartSession(instance_id, project_id, organization_id,
- instance_ip, instance_port);
+ status = ms->StartSession(instance_id, target, project_id, organization_id);
if (!status.ok()) {
*metrics_status = metrics::SessionStartStatus::kStartSessionError;
}
@@ -164,15 +163,16 @@ absl::StatusOr SessionManager::GetOrCreateMultiSession(
return iter->second.get();
}
-absl::Status SessionManager::StopSessionInternal(const std::string& instance) {
+absl::Status SessionManager::StopSessionInternal(
+ const std::string& instance_id) {
absl::Status status;
for (const auto& [key, ms] : sessions_) {
- if (!ms->HasSessionForInstance(instance)) continue;
+ if (!ms->HasSession(instance_id)) continue;
LOG_INFO("Stopping session streaming from '%s' to instance '%s'",
- ms->src_dir(), instance);
- RETURN_IF_ERROR(ms->StopSession(instance),
- "Failed to stop session for instance '%s'", instance);
+ ms->src_dir(), instance_id);
+ RETURN_IF_ERROR(ms->StopSession(instance_id),
+ "Failed to stop session for instance '%s'", instance_id);
// Session was stopped. If the MultiSession is empty now, delete it.
if (ms->Empty()) {
@@ -187,7 +187,7 @@ absl::Status SessionManager::StopSessionInternal(const std::string& instance) {
}
return absl::NotFoundError(
- absl::StrFormat("No session for instance id '%s' found", instance));
+ absl::StrFormat("No session for instance '%s' found", instance_id));
}
} // namespace cdc_ft
diff --git a/asset_stream_manager/session_manager.h b/asset_stream_manager/session_manager.h
index dccd428..acc6e16 100644
--- a/asset_stream_manager/session_manager.h
+++ b/asset_stream_manager/session_manager.h
@@ -30,43 +30,46 @@ namespace cdc_ft {
class MultiSession;
class ProcessFactory;
+struct SessionTarget;
-// Implements a service to start and stop streaming sessions as a server.
-// The corresponding clients are implemented by the ggp CLI and SDK Proxy.
-// The CLI triggers StartSession() from `ggp instance mount --local-dir` and
-// StopSession() from `ggp instance unmount`. SDK Proxy invokes StartSession()
-// when a user starts a new game from the partner portal and sets an `Asset
-// streaming directory` in the `Advanced settings` in the `Play settings`
-// dialog.
-// This service is owned by SessionManagementServer.
+// Adds logic around MultiSession to start and stop streaming sessions. Makes
+// sure that some invariants are maintained, like no two streaming sessions
+// exist to the same target user@host:dir.
class SessionManager {
public:
SessionManager(SessionConfig cfg, ProcessFactory* process_factory,
metrics::MetricsService* metrics_service);
~SessionManager();
- // Starts a session and populates |multi_session| and |metrics_status|.
+ // Starts a new session or reuses an existing one.
+ // |instance_id| is a unique id for the remote instance and mount directory,
+ // e.g. user@host:mount_dir.
+ // |src_dir| is the local directory to stream.
+ // |target| identifies the remote target and how to connect to it.
+ // |project_id| is the project that owns the instance. Stadia only.
+ // |organization_id| is organization that contains the instance. Stadia only.
+ // Populates |multi_session| and |metrics_status| on success.
absl::Status StartSession(const std::string& instance_id,
+ const std::string& src_dir,
+ const SessionTarget& target,
const std::string& project_id,
const std::string& organization_id,
- const std::string& instance_ip,
- uint16_t instance_port, const std::string& src_dir,
MultiSession** multi_session,
metrics::SessionStartStatus* metrics_status)
ABSL_LOCKS_EXCLUDED(sessions_mutex_);
- // Stops the session for the given |instance|. Returns a NotFound error if no
- // session exists.
- absl::Status StopSession(const std::string& instance)
+ // Stops the session for the given |instance_id|.
+ // Returns a NotFound error if no session exists.
+ absl::Status StopSession(const std::string& instance_id)
ABSL_LOCKS_EXCLUDED(sessions_mutex_);
// Shuts down all existing MultiSessions.
absl::Status Shutdown() ABSL_LOCKS_EXCLUDED(sessions_mutex_);
private:
- // Stops the session for the given |instance|. Returns a NotFound error if no
- // session exists.
- absl::Status StopSessionInternal(const std::string& instance)
+ // Stops the session for the given |instance_id|. Returns a NotFound error if
+ // no session exists.
+ absl::Status StopSessionInternal(const std::string& instance_id)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(sessions_mutex_);
// Returns the MultiSession for the given workstation directory |src_dir| or
diff --git a/cdc_fuse_fs/cdc_fuse_fs.vcxproj b/cdc_fuse_fs/cdc_fuse_fs.vcxproj
index b60e370..25c080d 100644
--- a/cdc_fuse_fs/cdc_fuse_fs.vcxproj
+++ b/cdc_fuse_fs/cdc_fuse_fs.vcxproj
@@ -52,7 +52,6 @@
//cdc_fuse_fs
cdc_fuse_fs
..\;..\third_party\absl;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;$(AdditionalIncludeDirectories)
- ..\/
diff --git a/cdc_rsync/cdc_rsync.vcxproj b/cdc_rsync/cdc_rsync.vcxproj
index c83069a..0a255b6 100644
--- a/cdc_rsync/cdc_rsync.vcxproj
+++ b/cdc_rsync/cdc_rsync.vcxproj
@@ -69,16 +69,15 @@
//cdc_rsync
cdc_rsync.exe
..\;..\third_party\absl;..\third_party\blake3\c;..\bazel-stadia-file-transfer\external\com_github_zstd\lib;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;$(VC_IncludePath);$(WindowsSDK_IncludePath)
- ..\/
diff --git a/cdc_rsync/cdc_rsync_client.cc b/cdc_rsync/cdc_rsync_client.cc
index ea04cab..c595bcb 100644
--- a/cdc_rsync/cdc_rsync_client.cc
+++ b/cdc_rsync/cdc_rsync_client.cc
@@ -46,8 +46,8 @@ constexpr int kExitCodeNotFound = 127;
constexpr int kForwardPortFirst = 44450;
constexpr int kForwardPortLast = 44459;
-constexpr char kGgpServerFilename[] = "cdc_rsync_server";
-constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc_file_transfer/";
+constexpr char kCdcServerFilename[] = "cdc_rsync_server";
+constexpr char kRemoteToolsBinDir[] = "~/.cache/cdc-file-transfer/bin/";
SetOptionsRequest::FilterRule::Type ToProtoType(PathFilter::Rule::Type type) {
switch (type) {
@@ -178,7 +178,7 @@ absl::Status CdcRsyncClient::StartServer() {
std::vector components;
status = GameletComponent::Get(
- {path::Join(component_dir, kGgpServerFilename)}, &components);
+ {path::Join(component_dir, kCdcServerFilename)}, &components);
if (!status.ok()) {
return MakeStatus(
"Required instance component not found. Make sure the file "
@@ -202,7 +202,7 @@ absl::Status CdcRsyncClient::StartServer() {
int port = *port_res;
std::string remote_server_path =
- std::string(kRemoteToolsBinDir) + kGgpServerFilename;
+ std::string(kRemoteToolsBinDir) + kCdcServerFilename;
// Test existence manually to prevent misleading bash output message
// "bash: .../cdc_rsync_server: No such file or directory".
// Also create the bin dir because otherwise scp below might fail.
@@ -411,9 +411,9 @@ absl::Status CdcRsyncClient::DeployServer() {
// scp cdc_rsync_server to a temp location on the gamelet.
std::string remoteServerTmpPath =
- absl::StrFormat("%s%s.%s", kRemoteToolsBinDir, kGgpServerFilename,
+ absl::StrFormat("%s%s.%s", kRemoteToolsBinDir, kCdcServerFilename,
Util::GenerateUniqueId());
- std::string localServerPath = path::Join(exe_dir, kGgpServerFilename);
+ std::string localServerPath = path::Join(exe_dir, kCdcServerFilename);
status = remote_util_.Scp({localServerPath}, remoteServerTmpPath,
/*compress=*/true);
if (!status.ok()) {
@@ -424,9 +424,9 @@ absl::Status CdcRsyncClient::DeployServer() {
// - Make the old cdc_rsync_server writable (if it exists).
// - Make the new cdc_rsync_server executable.
// - Replace the old cdc_rsync_server by the new one.
- std::string old_path = RemoteUtil::EscapeForWindows(
- std::string(kRemoteToolsBinDir) + kGgpServerFilename);
- std::string new_path = RemoteUtil::EscapeForWindows(remoteServerTmpPath);
+ std::string old_path = RemoteUtil::QuoteForWindows(
+ std::string(kRemoteToolsBinDir) + kCdcServerFilename);
+ std::string new_path = RemoteUtil::QuoteForWindows(remoteServerTmpPath);
std::string replace_cmd = absl::StrFormat(
" ([ ! -f %s ] || chmod u+w %s) && chmod a+x %s && mv %s %s", old_path,
old_path, new_path, new_path, old_path);
diff --git a/cdc_rsync/params.cc b/cdc_rsync/params.cc
index d98597a..aa6d91c 100644
--- a/cdc_rsync/params.cc
+++ b/cdc_rsync/params.cc
@@ -73,10 +73,10 @@ Options:
-R, --relative Use relative path names
--existing Skip creating new files on instance
--copy-dest dir Use files from dir as sync base if files are missing
- --ssh-command Path and arguments of SSH command to use, e.g.
+ --ssh-command Path and arguments of ssh command to use, e.g.
C:\path\to\ssh.exe -F config -i id_rsa -oStrictHostKeyChecking=yes -oUserKnownHostsFile="""known_hosts"""
Can also be specified by the CDC_SSH_COMMAND environment variable.
- --scp-command Path and arguments of SSH command to use, e.g.
+ --scp-command Path and arguments of scp command to use, e.g.
C:\path\to\scp.exe -F config -i id_rsa -oStrictHostKeyChecking=yes -oUserKnownHostsFile="""known_hosts"""
Can also be specified by the CDC_SCP_COMMAND environment variable.
-h --help Help for cdc_rsync
diff --git a/cdc_rsync_server/cdc_rsync_server.vcxproj b/cdc_rsync_server/cdc_rsync_server.vcxproj
index 4aa0d73..81027c5 100644
--- a/cdc_rsync_server/cdc_rsync_server.vcxproj
+++ b/cdc_rsync_server/cdc_rsync_server.vcxproj
@@ -50,7 +50,6 @@
//cdc_rsync_server:cdc_rsync_server
cdc_rsync_server
..\;..\third_party\absl;..\third_party\blake3\c;..\bazel-stadia-file-transfer\external\com_github_zstd\lib;..\third_party\googletest\googletest\include;..\third_party\protobuf\src
- ..\/
diff --git a/cdc_stream/.gitignore b/cdc_stream/.gitignore
new file mode 100644
index 0000000..7dc8dde
--- /dev/null
+++ b/cdc_stream/.gitignore
@@ -0,0 +1,3 @@
+x64/*
+*.log
+*.user
\ No newline at end of file
diff --git a/cdc_stream/BUILD b/cdc_stream/BUILD
new file mode 100644
index 0000000..6c1ae02
--- /dev/null
+++ b/cdc_stream/BUILD
@@ -0,0 +1,26 @@
+package(default_visibility = [
+ "//:__subpackages__",
+])
+
+cc_binary(
+ name = "cdc_stream",
+ srcs = ["main.cc"],
+ deps = [
+ ":local_assets_stream_manager_client",
+ "//common:log",
+ "//common:path",
+ "@com_google_absl//absl/flags:parse",
+ "@com_google_absl//absl/status",
+ ],
+)
+
+cc_library(
+ name = "local_assets_stream_manager_client",
+ srcs = ["local_assets_stream_manager_client.cc"],
+ hdrs = ["local_assets_stream_manager_client.h"],
+ deps = [
+ "//common:grpc_status",
+ "//proto:local_assets_stream_manager_grpc_proto",
+ "@com_google_absl//absl/status",
+ ],
+)
diff --git a/cdc_stream/local_assets_stream_manager_client.cc b/cdc_stream/local_assets_stream_manager_client.cc
new file mode 100644
index 0000000..434e6d4
--- /dev/null
+++ b/cdc_stream/local_assets_stream_manager_client.cc
@@ -0,0 +1,62 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "cdc_stream/local_assets_stream_manager_client.h"
+
+#include "absl/status/status.h"
+#include "common/grpc_status.h"
+
+namespace cdc_ft {
+
+using StartSessionRequest = localassetsstreammanager::StartSessionRequest;
+using StartSessionResponse = localassetsstreammanager::StartSessionResponse;
+using StopSessionRequest = localassetsstreammanager::StopSessionRequest;
+using StopSessionResponse = localassetsstreammanager::StopSessionResponse;
+
+LocalAssetsStreamManagerClient::LocalAssetsStreamManagerClient(
+ std::shared_ptr channel) {
+ stub_ = LocalAssetsStreamManager::NewStub(std::move(channel));
+}
+
+LocalAssetsStreamManagerClient::~LocalAssetsStreamManagerClient() = default;
+
+absl::Status LocalAssetsStreamManagerClient::StartSession(
+ const std::string& src_dir, const std::string& user_host, uint16_t ssh_port,
+ const std::string& mount_dir, const std::string& ssh_command,
+ const std::string& scp_command) {
+ StartSessionRequest request;
+ request.set_workstation_directory(src_dir);
+ request.set_user_host(user_host);
+ request.set_port(ssh_port);
+ request.set_mount_dir(mount_dir);
+ request.set_ssh_command(ssh_command);
+ request.set_scp_command(scp_command);
+
+ grpc::ClientContext context;
+ StartSessionResponse response;
+ return ToAbslStatus(stub_->StartSession(&context, request, &response));
+}
+
+absl::Status LocalAssetsStreamManagerClient::StopSession(
+ const std::string& user_host, const std::string& mount_dir) {
+ StopSessionRequest request;
+ request.set_user_host(user_host);
+ request.set_mount_dir(mount_dir);
+
+ grpc::ClientContext context;
+ StopSessionResponse response;
+ return ToAbslStatus(stub_->StopSession(&context, request, &response));
+}
+
+} // namespace cdc_ft
diff --git a/cdc_stream/local_assets_stream_manager_client.h b/cdc_stream/local_assets_stream_manager_client.h
new file mode 100644
index 0000000..3aaf67e
--- /dev/null
+++ b/cdc_stream/local_assets_stream_manager_client.h
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ASSET_STREAM_MANAGER_LOCAL_ASSETS_STREAM_MANAGER_CLIENT_H_
+#define ASSET_STREAM_MANAGER_LOCAL_ASSETS_STREAM_MANAGER_CLIENT_H_
+
+#include
+
+#include "absl/status/status.h"
+#include "grpcpp/channel.h"
+#include "proto/local_assets_stream_manager.grpc.pb.h"
+
+namespace grpc_impl {
+class Channel;
+}
+
+namespace cdc_ft {
+
+// gRpc client for starting/stopping asset streaming sessions.
+class LocalAssetsStreamManagerClient {
+ public:
+ // |channel| is a grpc channel to use.
+ explicit LocalAssetsStreamManagerClient(
+ std::shared_ptr channel);
+ ~LocalAssetsStreamManagerClient();
+
+ // Starts streaming the Windows directory |src_dir| to the Linux target
+ // |user_host_dir|, which must be formatted as [user@]host:dir, e.g.
+ // jdoe@jdoe.corp.foo.com:~/assets
+ // Starting a second session to the same target will stop the first one.
+ absl::Status StartSession(const std::string& src_dir,
+ const std::string& user_host, uint16_t ssh_port,
+ const std::string& mount_dir,
+ const std::string& ssh_command,
+ const std::string& scp_command);
+
+ // Stops the streaming session to the Linux target |user_host_dir|, which must
+ // be formatted as [user@]host:dir, e.g. jdoe@jdoe.corp.foo.com:~/assets
+ absl::Status StopSession(const std::string& user_host_dir,
+ const std::string& mount_dir);
+
+ private:
+ using LocalAssetsStreamManager =
+ localassetsstreammanager::LocalAssetsStreamManager;
+ std::unique_ptr stub_;
+};
+
+} // namespace cdc_ft
+
+#endif // ASSET_STREAM_MANAGER_LOCAL_ASSETS_STREAM_MANAGER_CLIENT_H_
diff --git a/cdc_stream/main.cc b/cdc_stream/main.cc
new file mode 100644
index 0000000..697e754
--- /dev/null
+++ b/cdc_stream/main.cc
@@ -0,0 +1,158 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+
+#include "absl/flags/flag.h"
+#include "absl/flags/parse.h"
+#include "absl/flags/usage.h"
+#include "absl/status/status.h"
+#include "absl/strings/str_split.h"
+#include "cdc_stream/local_assets_stream_manager_client.h"
+#include "common/log.h"
+#include "common/path.h"
+#include "grpcpp/channel.h"
+#include "grpcpp/security/credentials.h"
+
+ABSL_FLAG(uint16_t, service_port, 44432,
+ "Port to use while connecting to the local asset stream service.");
+ABSL_FLAG(
+ uint16_t, ssh_port, 22,
+ "Port to use while connecting to the remote instance being streamed to.");
+ABSL_FLAG(std::string, ssh_command, "",
+ "Path and arguments of ssh command to use, e.g. "
+ "\"C:\\path\\to\\ssh.exe -F config_file\". Can also be specified by "
+ "the CDC_SSH_COMMAND environment variable.");
+ABSL_FLAG(std::string, scp_command, "",
+ "Path and arguments of scp command to use, e.g. "
+ "\"C:\\path\\to\\scp.exe -F config_file\". Can also be specified by "
+ "the CDC_SCP_COMMAND environment variable.");
+ABSL_FLAG(int, verbosity, 2,
+ "Verbosity of the log output. Increase to make logs more verbose.");
+
+namespace {
+
+constexpr char kHelpText[] = R"!(Stream files from a Windows to a Linux device
+
+Usage: cdc_stream [flags] start windows_dir [user@]host:linux_dir
+ Streams the Windows directory windows_dir to directory linux_dir on the
+ Linux host using SSH username user.
+
+ cdc_stream [flags] stop [user@]host:linux_dir
+ Stops the streaming session to the given Linux target.
+
+Type cdc_stream --helpfull for available flags.)!";
+
+// Splits |user_host_dir| = [user@]host:dir up into [user@]host and dir.
+// Does not touch Windows drives, e.g. C:\foo.
+bool ParseUserHostDir(const std::string& user_host_dir, std::string* user_host,
+ std::string* dir) {
+ std::vector parts =
+ absl::StrSplit(user_host_dir, absl::MaxSplits(':', 1));
+ if (parts.size() < 2 ||
+ (parts[0].size() == 1 && toupper(parts[0][0]) >= 'A' &&
+ toupper(parts[0][0]) <= 'Z')) {
+ LOG_ERROR(
+ "Failed to parse '%s'. Make sure it is of the form "
+ "[user@]host:linux_dir.",
+ user_host_dir);
+ return false;
+ }
+
+ *user_host = parts[0];
+ *dir = parts[1];
+ return true;
+}
+
+std::string GetFlagFromEnvOrArgs(const char* env,
+ const absl::Flag& flag) {
+ std::string value = absl::GetFlag(flag);
+ if (value.empty()) {
+ cdc_ft::path::GetEnv(env, &value).IgnoreError();
+ }
+ return value;
+}
+
+} // namespace
+
+int main(int argc, char* argv[]) {
+ absl::SetProgramUsageMessage(kHelpText);
+ std::vector args = absl::ParseCommandLine(argc, argv);
+
+ uint16_t service_port = absl::GetFlag(FLAGS_service_port);
+ int verbosity = absl::GetFlag(FLAGS_verbosity);
+ std::string ssh_command =
+ GetFlagFromEnvOrArgs("CDC_SSH_COMMAND", FLAGS_ssh_command);
+ std::string scp_command =
+ GetFlagFromEnvOrArgs("CDC_SCP_COMMAND", FLAGS_scp_command);
+ uint16_t ssh_port = absl::GetFlag(FLAGS_ssh_port);
+
+ cdc_ft::LogLevel level = cdc_ft::Log::VerbosityToLogLevel(verbosity);
+ cdc_ft::Log::Initialize(std::make_unique(level));
+
+ if (args.size() < 2) {
+ LOG_INFO(kHelpText);
+ return 0;
+ }
+ std::string command = args[1];
+ if (command != "start" && command != "stop") {
+ LOG_ERROR("Unknown command '%s'. Must be 'start' or 'stop'.", command);
+ return 1;
+ }
+
+ // Start a gRpc client.
+ std::string client_address = absl::StrFormat("localhost:%u", service_port);
+ std::shared_ptr grpc_channel = grpc::CreateCustomChannel(
+ client_address, grpc::InsecureChannelCredentials(),
+ grpc::ChannelArguments());
+
+ cdc_ft::LocalAssetsStreamManagerClient client(grpc_channel);
+
+ absl::Status status;
+ if (command == "start") {
+ if (args.size() < 4) {
+ LOG_ERROR(
+ "Command 'start' needs 2 arguments: the Windows directory to stream"
+ " and the Linux target [user@]host:dir.");
+ return 1;
+ }
+
+ std::string src_dir = args[2];
+ std::string user_host, mount_dir;
+ if (!ParseUserHostDir(args[3], &user_host, &mount_dir)) return 1;
+
+ status = client.StartSession(src_dir, user_host, ssh_port, mount_dir,
+ ssh_command, scp_command);
+ } else /* if (command == "stop") */ {
+ if (args.size() < 3) {
+ LOG_ERROR(
+ "Command 'stop' needs 1 argument: the Linux target "
+ "[user@]host:linux_dir.");
+ return 1;
+ }
+
+ std::string user_host, mount_dir;
+ if (!ParseUserHostDir(args[2], &user_host, &mount_dir)) return 1;
+
+ status = client.StopSession(user_host, mount_dir);
+ }
+
+ if (!status.ok()) {
+ LOG_ERROR("Error: %s", status.ToString());
+ }
+
+ cdc_ft::Log::Shutdown();
+ return static_cast(status.code());
+}
diff --git a/common/path.cc b/common/path.cc
index b5d7e87..57d1e85 100644
--- a/common/path.cc
+++ b/common/path.cc
@@ -326,6 +326,10 @@ std::string GetDrivePrefix(const std::string& path) {
std::string GetCwd() { return std::filesystem::current_path().u8string(); }
+void SetCwd(const std::string& path) {
+ std::filesystem::current_path(std::filesystem::u8path(path));
+}
+
std::string GetFullPath(const std::string& path) {
if (path.empty()) {
return std::string();
diff --git a/common/path.h b/common/path.h
index 8901277..204580f 100644
--- a/common/path.h
+++ b/common/path.h
@@ -130,6 +130,9 @@ std::string GetDrivePrefix(const std::string& path);
// Gets the current working directory.
std::string GetCwd();
+// Sets the current working directory.
+void SetCwd(const std::string& path);
+
// Expands a relative path to an absolute path (relative to the current working
// directory). Also canonicalizes the path, removing any . and .. elements.
// Note that if the path does not exist or contains invalid characters, it may
diff --git a/common/remote_util.cc b/common/remote_util.cc
index cf5b557..edaaece 100644
--- a/common/remote_util.cc
+++ b/common/remote_util.cc
@@ -16,6 +16,7 @@
#include
+#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "common/path.h"
@@ -46,6 +47,7 @@ void RemoteUtil::SetUserHostAndPort(std::string user_host, int port) {
user_host_ = std::move(user_host);
ssh_port_ = port;
}
+
void RemoteUtil::SetScpCommand(std::string scp_command) {
scp_command_ = std::move(scp_command);
}
@@ -56,7 +58,7 @@ void RemoteUtil::SetSshCommand(std::string ssh_command) {
absl::Status RemoteUtil::Scp(std::vector source_filepaths,
const std::string& dest, bool compress) {
- absl::Status status = CheckHostPort();
+ absl::Status status = CheckUserHostPort();
if (!status.ok()) {
return status;
}
@@ -64,7 +66,11 @@ absl::Status RemoteUtil::Scp(std::vector source_filepaths,
std::string source_args;
for (const std::string& sourceFilePath : source_filepaths) {
// Workaround for scp thinking that C is a host in C:\path\to\foo.
- source_args += QuoteArgument("//./" + sourceFilePath) + " ";
+ if (absl::StrContains(path::GetDrivePrefix(sourceFilePath), ":")) {
+ source_args += QuoteForWindows("//./" + sourceFilePath) + " ";
+ } else {
+ source_args += QuoteForWindows(sourceFilePath) + " ";
+ }
}
// -p preserves timestamps. This enables timestamp-based up-to-date checks.
@@ -73,66 +79,27 @@ absl::Status RemoteUtil::Scp(std::vector source_filepaths,
"%s "
"%s %s -p -T "
"-P %i %s "
- "%s",
- scp_command_, quiet_ || verbosity_ < 2 ? "-q" : "", compress ? "-C" : "",
- ssh_port_, source_args, QuoteArgument(user_host_ + ":" + dest));
+ "%s:%s",
+ QuoteForWindows(scp_command_), quiet_ || verbosity_ < 2 ? "-q" : "",
+ compress ? "-C" : "", ssh_port_, source_args, QuoteForWindows(user_host_),
+ QuoteForWindows(dest));
start_info.name = "scp";
start_info.forward_output_to_log = forward_output_to_log_;
return process_factory_->Run(start_info);
}
-absl::Status RemoteUtil::Sync(std::vector source_filepaths,
- const std::string& dest) {
- absl::Status status = CheckHostPort();
- if (!status.ok()) {
- return status;
- }
-
- std::string source_args;
- for (const std::string& sourceFilePath : source_filepaths) {
- source_args += QuoteArgument(sourceFilePath) + " ";
- }
-
- ProcessStartInfo start_info;
- start_info.command = absl::StrFormat(
- "cdc_rsync --ip=%s --port=%i -z "
- "%s %s%s",
- QuoteArgument(user_host_), ssh_port_,
- quiet_ || verbosity_ < 2 ? "-q " : " ", source_args, QuoteArgument(dest));
- start_info.name = "cdc_rsync";
- start_info.forward_output_to_log = forward_output_to_log_;
-
- return process_factory_->Run(start_info);
-}
-
absl::Status RemoteUtil::Chmod(const std::string& mode,
const std::string& remote_path, bool quiet) {
std::string remote_command =
- absl::StrFormat("chmod %s %s %s", QuoteArgument(mode),
- EscapeForWindows(remote_path), quiet ? "-f" : "");
+ absl::StrFormat("chmod %s %s %s", QuoteForSsh(mode),
+ QuoteForSsh(remote_path), quiet ? "-f" : "");
return Run(remote_command, "chmod");
}
-absl::Status RemoteUtil::Rm(const std::string& remote_path, bool force) {
- std::string remote_command = absl::StrFormat("rm %s %s", force ? "-f" : "",
- EscapeForWindows(remote_path));
-
- return Run(remote_command, "rm");
-}
-
-absl::Status RemoteUtil::Mv(const std::string& old_remote_path,
- const std::string& new_remote_path) {
- std::string remote_command =
- absl::StrFormat("mv %s %s", EscapeForWindows(old_remote_path),
- EscapeForWindows(new_remote_path));
-
- return Run(remote_command, "mv");
-}
-
absl::Status RemoteUtil::Run(std::string remote_command, std::string name) {
- absl::Status status = CheckHostPort();
+ absl::Status status = CheckUserHostPort();
if (!status.ok()) {
return status;
}
@@ -177,33 +144,61 @@ ProcessStartInfo RemoteUtil::BuildProcessStartInfoForSshInternal(
"-oServerAliveCountMax=6 " // Number of lost msgs before ssh terminates
"-oServerAliveInterval=5 " // Time interval between alive msgs
"%s %s -p %i %s",
- ssh_command_, quiet_ || verbosity_ < 2 ? "-q" : "", forward_arg,
- QuoteArgument(user_host_), ssh_port_, remote_command_arg);
+ QuoteForWindows(ssh_command_), quiet_ || verbosity_ < 2 ? "-q" : "",
+ forward_arg, QuoteForWindows(user_host_), ssh_port_, remote_command_arg);
start_info.forward_output_to_log = forward_output_to_log_;
return start_info;
}
-std::string RemoteUtil::EscapeForWindows(const std::string& argument) {
- std::string str =
+std::string RemoteUtil::QuoteForWindows(const std::string& argument) {
+ // Escape certain backslashes (see doc of this function).
+ std::string escaped =
std::regex_replace(argument, std::regex(R"(\\*(?="|$))"), "$&$&");
- return std::regex_replace(str, std::regex(R"(")"), R"(\")");
+ // Escape " -> \".
+ escaped = std::regex_replace(escaped, std::regex(R"(")"), R"(\")");
+ // Quote.
+ return absl::StrCat("\"", escaped, "\"");
}
-std::string RemoteUtil::QuoteArgument(const std::string& argument) {
- return absl::StrCat("\"", EscapeForWindows(argument), "\"");
+std::string RemoteUtil::QuoteForSsh(const std::string& argument) {
+ // Escape \ ->: \\.
+ std::string escaped =
+ std::regex_replace(argument, std::regex(R"(\\)"), R"(\\)");
+ // Escape " -> \".
+ escaped = std::regex_replace(escaped, std::regex(R"(")"), R"(\")");
+
+ // Quote, but handle special case for ~.
+ if (escaped.empty() || escaped[0] != '~') {
+ return QuoteForWindows(absl::StrCat("\"", escaped, "\""));
+ }
+
+ // Simple special cases. Quote() isn't required, but called for consistency.
+ if (escaped == "~" || escaped == "~/") {
+ return QuoteForWindows(escaped);
+ }
+
+ // Check whether the username contains only valid characters.
+ // E.g. ~user name/foo -> Quote(~user name/foo)
+ size_t slash_pos = escaped.find('/');
+ size_t username_end_pos =
+ slash_pos == std::string::npos ? escaped.size() : slash_pos;
+ if (username_end_pos > 1 &&
+ !std::regex_match(escaped.substr(1, username_end_pos - 1),
+ std::regex("^[a-z][-a-z0-9]*"))) {
+ return QuoteForWindows(absl::StrCat("\"", escaped, "\""));
+ }
+
+ if (slash_pos == std::string::npos) {
+ // E.g. ~username -> Quote(~username)
+ return QuoteForWindows(escaped);
+ }
+
+ // E.g. or ~username/foo -> Quote(~username/"foo")
+ return QuoteForWindows(absl::StrCat(escaped.substr(0, slash_pos + 1), "\"",
+ escaped.substr(slash_pos + 1), "\""));
}
-std::string RemoteUtil::QuoteArgumentForSsh(const std::string& argument) {
- return absl::StrFormat(
- "'%s'", std::regex_replace(argument, std::regex("'"), "'\\''"));
-}
-
-std::string RemoteUtil::QuoteAndEscapeArgumentForSsh(
- const std::string& argument) {
- return EscapeForWindows(QuoteArgumentForSsh(argument));
-}
-
-absl::Status RemoteUtil::CheckHostPort() {
+absl::Status RemoteUtil::CheckUserHostPort() {
if (user_host_.empty() || ssh_port_ == 0) {
return MakeStatus("IP or port not set");
}
diff --git a/common/remote_util.h b/common/remote_util.h
index 31e59b3..2916486 100644
--- a/common/remote_util.h
+++ b/common/remote_util.h
@@ -61,25 +61,11 @@ class RemoteUtil {
absl::Status Scp(std::vector source_filepaths,
const std::string& dest, bool compress);
- // Syncs |source_filepaths| to the remote folder |dest| on the gamelet using
- // cdc_rsync. Must call SetUserHostAndPort before calling this method.
- absl::Status Sync(std::vector source_filepaths,
- const std::string& dest);
-
// Calls 'chmod |mode| |remote_path|' on the gamelet.
// Must call SetUserHostAndPort before calling this method.
absl::Status Chmod(const std::string& mode, const std::string& remote_path,
bool quiet = false);
- // Calls 'rm [-f] |remote_path|' on the gamelet.
- // Must call SetUserHostAndPort before calling this method.
- absl::Status Rm(const std::string& remote_path, bool force);
-
- // Calls `mv |old_remote_path| |new_remote_path| on the gamelet.
- // Must call SetUserHostAndPort before calling this method.
- absl::Status Mv(const std::string& old_remote_path,
- const std::string& new_remote_path);
-
// Runs |remote_command| on the gamelet. The command must be properly escaped.
// |name| is the name of the command displayed in the logs.
// Must call SetUserHostAndPort before calling this method.
@@ -107,28 +93,32 @@ class RemoteUtil {
// Returns whether output is suppressed.
bool Quiet() const { return quiet_; }
- // Escapes command line argument for the Microsoft command line parser in
- // preparation for quoting. Double quotes are backslash-escaped. One or more
- // backslashes are backslash-escaped if they are followed by a double quote,
- // or if they occur at the end of the string, e.g.
- // foo\bar -> foo\bar, foo\ -> foo\\, foo\\"bar -> foo\\\\\"bar.
- static std::string EscapeForWindows(const std::string& argument);
-
// Quotes and escapes a command line argument following the convention
// understood by the Microsoft command line parser.
- static std::string QuoteArgument(const std::string& argument);
-
- // Quotes and escapes a command line argument for usage in SSH.
- static std::string QuoteArgumentForSsh(const std::string& argument);
+ // Double quotes are backslash-escaped. One or more backslashes are backslash-
+ // escaped if they are followed by a double quote, or if they occur at the end
+ // of the string, e.g.
+ // foo -> "foo"
+ // foo\bar -> "foo\bar"
+ // foo\ -> "foo\\"
+ // foo\\"bar -> "foo\\\\\"bar".
+ static std::string QuoteForWindows(const std::string& argument);
// Quotes and escapes a command line arguments for use in SSH command. The
- // argument is first escaped and quoted for Linux using single quotes and then
+ // argument is first escaped and quoted for Linux using double quotes and then
// it is escaped to be used by the Microsoft command line parser.
- static std::string QuoteAndEscapeArgumentForSsh(const std::string& argument);
+ // Properly supports path starting with ~ and ~username.
+ // foo -> "\"foo\""
+ // foo\bar -> "\"foo\bar\""
+ // foo\ -> "\"foo\\\\\""
+ // foo\"bar -> "\"foo\\\\\\\"bar\"".
+ // ~/foo -> "~/\"foo\""
+ // ~user/foo -> "~user/\"foo\""
+ static std::string QuoteForSsh(const std::string& argument);
private:
- // Verifies that both || and |ssh_port_| are set.
- absl::Status CheckHostPort();
+ // Verifies that both |user_host_| and |ssh_port_| are set.
+ absl::Status CheckUserHostPort();
// Common code for BuildProcessStartInfoForSsh*.
ProcessStartInfo BuildProcessStartInfoForSshInternal(
diff --git a/common/remote_util_test.cc b/common/remote_util_test.cc
index 7ac730a..11293b8 100644
--- a/common/remote_util_test.cc
+++ b/common/remote_util_test.cc
@@ -97,35 +97,40 @@ TEST_F(RemoteUtilTest, BuildProcessStartInfoForSshWithCustomCommand) {
ExpectContains(si.command, {kCustomSshCmd});
}
-TEST_F(RemoteUtilTest, EscapeForWindows) {
- EXPECT_EQ("foo", RemoteUtil::EscapeForWindows("foo"));
- EXPECT_EQ("foo bar", RemoteUtil::EscapeForWindows("foo bar"));
- EXPECT_EQ("foo\\bar", RemoteUtil::EscapeForWindows("foo\\bar"));
- EXPECT_EQ("\\\\foo", RemoteUtil::EscapeForWindows("\\\\foo"));
- EXPECT_EQ("foo\\\\", RemoteUtil::EscapeForWindows("foo\\"));
- EXPECT_EQ("foo\\\\\\\\", RemoteUtil::EscapeForWindows("foo\\\\"));
- EXPECT_EQ("foo\\\"", RemoteUtil::EscapeForWindows("foo\""));
- EXPECT_EQ("foo\\\"bar", RemoteUtil::EscapeForWindows("foo\"bar"));
- EXPECT_EQ("foo\\\\\\\"bar", RemoteUtil::EscapeForWindows("foo\\\"bar"));
- EXPECT_EQ("foo\\\\\\\\\\\"bar", RemoteUtil::EscapeForWindows("foo\\\\\"bar"));
- EXPECT_EQ("\\\"foo\\\"", RemoteUtil::EscapeForWindows("\"foo\""));
- EXPECT_EQ("\\\" \\file.txt", RemoteUtil::EscapeForWindows("\" \\file.txt"));
+TEST_F(RemoteUtilTest, QuoteForWindows) {
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo"), "\"foo\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo bar"), "\"foo bar\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\bar"), "\"foo\\bar\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("\\\\foo"), "\"\\\\foo\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\"), "\"foo\\\\\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\\\"), "\"foo\\\\\\\\\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\""), "\"foo\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\"bar"), "\"foo\\\"bar\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\\"bar"), "\"foo\\\\\\\"bar\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("foo\\\\\"bar"),
+ "\"foo\\\\\\\\\\\"bar\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("\"foo\""), "\"\\\"foo\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForWindows("\" \\file.txt"),
+ "\"\\\" \\file.txt\"");
}
-TEST_F(RemoteUtilTest, QuoteArgument) {
- EXPECT_EQ("\"foo\"", RemoteUtil::QuoteArgument("foo"));
- EXPECT_EQ("\"foo bar\"", RemoteUtil::QuoteArgument("foo bar"));
- EXPECT_EQ("\"foo\\bar\"", RemoteUtil::QuoteArgument("foo\\bar"));
- EXPECT_EQ("\"\\\\foo\"", RemoteUtil::QuoteArgument("\\\\foo"));
- EXPECT_EQ("\"foo\\\\\"", RemoteUtil::QuoteArgument("foo\\"));
- EXPECT_EQ("\"foo\\\\\\\\\"", RemoteUtil::QuoteArgument("foo\\\\"));
- EXPECT_EQ("\"foo\\\"\"", RemoteUtil::QuoteArgument("foo\""));
- EXPECT_EQ("\"foo\\\"bar\"", RemoteUtil::QuoteArgument("foo\"bar"));
- EXPECT_EQ("\"foo\\\\\\\"bar\"", RemoteUtil::QuoteArgument("foo\\\"bar"));
- EXPECT_EQ("\"foo\\\\\\\\\\\"bar\"",
- RemoteUtil::QuoteArgument("foo\\\\\"bar"));
- EXPECT_EQ("\"\\\"foo\\\"\"", RemoteUtil::QuoteArgument("\"foo\""));
- EXPECT_EQ("\"\\\" \\file.txt\"", RemoteUtil::QuoteArgument("\" \\file.txt"));
+TEST_F(RemoteUtilTest, QuoteForSsh) {
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("foo"), "\"\\\"foo\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("foo\\bar"), "\"\\\"foo\\\\bar\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("foo\\"), "\"\\\"foo\\\\\\\\\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("foo\\\"bar"),
+ "\"\\\"foo\\\\\\\\\\\\\\\"bar\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("~"), "\"~\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("~username"), "\"~username\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("~/foo"), "\"~/\\\"foo\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("~username/foo"),
+ "\"~username/\\\"foo\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("~invalid user name"),
+ "\"\\\"~invalid user name\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("~invalid user name/foo"),
+ "\"\\\"~invalid user name/foo\\\"\"");
+ EXPECT_EQ(RemoteUtil::QuoteForSsh("~user-name69/foo"),
+ "\"~user-name69/\\\"foo\\\"\""); // Nice!
}
} // namespace
diff --git a/data_store/data_provider_test.cc b/data_store/data_provider_test.cc
index b21df42..fdb56b4 100644
--- a/data_store/data_provider_test.cc
+++ b/data_store/data_provider_test.cc
@@ -14,14 +14,12 @@
#include "data_store/data_provider.h"
-#include
#include
#include
#include "common/path.h"
#include "common/status_test_macros.h"
#include "common/testing_clock.h"
-#include "common/util.h"
#include "data_store/disk_data_store.h"
#include "data_store/mem_data_store.h"
#include "gtest/gtest.h"
diff --git a/proto/local_assets_stream_manager.proto b/proto/local_assets_stream_manager.proto
index a658629..74dc868 100644
--- a/proto/local_assets_stream_manager.proto
+++ b/proto/local_assets_stream_manager.proto
@@ -25,35 +25,50 @@ service LocalAssetsStreamManager {
rpc StopSession(StopSessionRequest) returns (StopSessionResponse) {}
}
-// NextID: 7
+// NextID: 12
message StartSessionRequest {
- // ID of assets streaming target gamelet. gamelet_id will continue to be set
- // alongside gamelet_name for backwards compatibility, but new code should
- // not read from the gamelet_id field.
- string gamelet_id = 1;
// The resource name of the assets streaming target gamelet, in the form
// "organizations/{org-id}/projects/{proj-id}/pools/{pool-id}/gamelets/{gamelet-id}".
- // If gamelet_name is specified, it will take precedence over gamelet_id.
+ // Only used by Stadia. Should set either this or user_host.
string gamelet_name = 5;
- // Path in the local workstation to stream assets from.
+ // Directory in the local workstation to stream assets from.
string workstation_directory = 2;
- // The user's email.
- string account = 3;
- // The OnePlatForm Url of the publishing API.
- string url = 4;
- // Caller of the SartSession request.
+ // Caller of the StartSession request.
+ // Only used by Stadia. May be left unspecified.
enum Origin {
ORIGIN_UNKNOWN = 0;
ORIGIN_CLI = 1;
ORIGIN_PARTNER_PORTAL = 2;
}
Origin origin = 6;
+ // Username and host, in the form [user@]host.
+ string user_host = 7;
+ // Remote directory where to mount the streamed directory.
+ string mount_dir = 8;
+ // SSH port to use while connecting to the remote instance.
+ // Optional, falls back to port 22 (default SSH port).
+ int32 port = 9;
+ // SSH command to connect to the remote instance.
+ // Optional, falls back to searching ssh.
+ string ssh_command = 10;
+ // SCP command to copy files to the remote instance.
+ // Optional, falls back to searching scp.
+ string scp_command = 11;
+
+ reserved 1, 3, 4;
}
message StartSessionResponse {}
+// NextID: 4
message StopSessionRequest {
+ // ID of assets streaming target gamelet.
+ // Only used by Stadia. Should set either this or user_host_dir.
string gamelet_id = 1;
+ // Username and host, in the form [user@]host.
+ string user_host = 2;
+ // Remote directory where the streamed directory is mounted.
+ string mount_dir = 3;
}
message StopSessionResponse {}
diff --git a/tests_asset_streaming_30/BUILD b/tests_asset_streaming_30/BUILD
index b79fbc3..b9452db 100644
--- a/tests_asset_streaming_30/BUILD
+++ b/tests_asset_streaming_30/BUILD
@@ -28,6 +28,7 @@ cc_binary(
"//asset_stream_manager:multi_session",
"//cdc_fuse_fs:asset",
"//cdc_fuse_fs:cdc_fuse_fs_lib_mocked",
+ "//cdc_fuse_fs:mock_config_stream_client",
"//common:test_main",
"//common:testing_clock",
"//data_store:data_provider",
diff --git a/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj b/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj
index 03c725c..729c1bf 100644
--- a/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj
+++ b/tests_asset_streaming_30/tests_asset_streaming_30.vcxproj
@@ -62,7 +62,6 @@
//tests_asset_streaming_30
tests_asset_streaming_30.exe
..\;..\third_party\absl;..\third_party\jsoncpp\include;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath)
- ..\/
diff --git a/tests_cdc_rsync/tests_cdc_rsync.vcxproj b/tests_cdc_rsync/tests_cdc_rsync.vcxproj
index e7d478a..2335070 100644
--- a/tests_cdc_rsync/tests_cdc_rsync.vcxproj
+++ b/tests_cdc_rsync/tests_cdc_rsync.vcxproj
@@ -63,7 +63,6 @@
//tests_cdc_rsync
tests_cdc_rsync.exe
..\;..\third_party\absl;..\third_party\jsoncpp\include;..\bazel-stadia-file-transfer\external\com_github_zstd\lib;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath)
- ..\/
diff --git a/tests_common/tests_common.vcxproj b/tests_common/tests_common.vcxproj
index 3ea056c..121d334 100644
--- a/tests_common/tests_common.vcxproj
+++ b/tests_common/tests_common.vcxproj
@@ -62,7 +62,6 @@
//tests_common
tests_common.exe
..\;..\third_party\absl;..\third_party\jsoncpp\include;..\third_party\blake3\c;..\third_party\googletest\googletest\include;..\third_party\protobuf\src;..\third_party\grpc\include;..\bazel-out\x64_windows-dbg\bin;$(VC_IncludePath);$(WindowsSDK_IncludePath)
- ..\/