diff --git a/README.md b/README.md index d54c7d5..78adace 100644 --- a/README.md +++ b/README.md @@ -224,6 +224,11 @@ To stop the streaming session, enter ``` cdc_stream stop user@linux.device.com:~/assets ``` +The command also accepts wildcards. For instance, +``` +cdc_stream stop user@*:* +``` +stops all existing streaming sessions for the given user. ## Troubleshooting diff --git a/cdc_stream/BUILD b/cdc_stream/BUILD index 739fb93..a12c381 100644 --- a/cdc_stream/BUILD +++ b/cdc_stream/BUILD @@ -208,6 +208,7 @@ cc_library( "//common:file_watcher", "//common:log", "//common:path", + "//common:path_filter", "//common:port_manager", "//common:process", "//common:remote_util", diff --git a/cdc_stream/multi_session.cc b/cdc_stream/multi_session.cc index d79171c..966cf63 100644 --- a/cdc_stream/multi_session.cc +++ b/cdc_stream/multi_session.cc @@ -18,6 +18,7 @@ #include "common/file_watcher_win.h" #include "common/log.h" #include "common/path.h" +#include "common/path_filter.h" #include "common/platform.h" #include "common/port_manager.h" #include "common/process.h" @@ -555,6 +556,21 @@ bool MultiSession::HasSession(const std::string& instance_id) { return sessions_.find(instance_id) != sessions_.end(); } +std::vector MultiSession::MatchSessions( + const std::string& instance_id_filter) { + PathFilter filter; + filter.AddRule(PathFilter::Rule::Type::kInclude, instance_id_filter); + filter.AddRule(PathFilter::Rule::Type::kExclude, "*"); + + std::vector matches; + for (const auto& [instance_id, session] : sessions_) { + if (filter.IsMatch(instance_id)) { + matches.push_back(instance_id); + } + } + return matches; +} + bool MultiSession::IsSessionHealthy(const std::string& instance_id) { absl::ReaderMutexLock lock(&sessions_mutex_); auto iter = sessions_.find(instance_id); diff --git a/cdc_stream/multi_session.h b/cdc_stream/multi_session.h index 532c699..7c83739 100644 --- a/cdc_stream/multi_session.h +++ b/cdc_stream/multi_session.h @@ -194,6 +194,11 @@ class MultiSession { absl::Status StopSession(const std::string& instance_id) ABSL_LOCKS_EXCLUDED(sessions_mutex_); + // Returns all instance ids that match the given filter. The filter may + // contain Windows-style wildcards, e.g. *, foo* or f?o. + // Matches are case sensitive. + std::vector MatchSessions(const std::string& instance_id_filter); + // Returns true if there is an existing session for |instance_id|. bool HasSession(const std::string& instance_id) ABSL_LOCKS_EXCLUDED(sessions_mutex_); diff --git a/cdc_stream/session_manager.cc b/cdc_stream/session_manager.cc index a34483e..1ad89b1 100644 --- a/cdc_stream/session_manager.cc +++ b/cdc_stream/session_manager.cc @@ -136,9 +136,24 @@ absl::Status SessionManager::StartSession( return status; } -absl::Status SessionManager::StopSession(const std::string& instance_id) { +absl::Status SessionManager::StopSession( + const std::string& instance_id_filter) { absl::MutexLock lock(&sessions_mutex_); - return StopSessionInternal(instance_id); + + std::vector instance_ids; + for (const auto& [key, ms] : sessions_) { + auto ids = ms->MatchSessions(instance_id_filter); + instance_ids.insert(instance_ids.end(), ids.begin(), ids.end()); + } + if (instance_ids.empty()) { + return absl::NotFoundError( + absl::StrFormat("No session found matching '%s'", instance_id_filter)); + } + + for (const std::string& instance_id : instance_ids) { + RETURN_IF_ERROR(StopSessionInternal(instance_id)); + } + return absl::OkStatus(); } MultiSession* SessionManager::GetMultiSession(const std::string& src_dir) { diff --git a/cdc_stream/session_manager.h b/cdc_stream/session_manager.h index 079663c..1f6337d 100644 --- a/cdc_stream/session_manager.h +++ b/cdc_stream/session_manager.h @@ -58,9 +58,11 @@ class SessionManager { metrics::SessionStartStatus* metrics_status) ABSL_LOCKS_EXCLUDED(sessions_mutex_); - // Stops the session for the given |instance_id|. + // Stops all sessions that match the given |instance_id_filter|. + // The filter may contain Windows-style wildcards like * and ?. + // Matching is case-sensitive. // Returns a NotFound error if no session exists. - absl::Status StopSession(const std::string& instance_id) + absl::Status StopSession(const std::string& instance_id_filter) ABSL_LOCKS_EXCLUDED(sessions_mutex_); // Shuts down all existing MultiSessions. diff --git a/cdc_stream/start_command.cc b/cdc_stream/start_command.cc index 811ae03..2f4bc06 100644 --- a/cdc_stream/start_command.cc +++ b/cdc_stream/start_command.cc @@ -101,7 +101,7 @@ void StartCommand::RegisterCommandLineFlags(lyra::command& cmd) { .help("Windows directory to stream")); cmd.add_argument( - lyra::arg(PosArgValidator(&user_host_dir_), "[user@]host:src-dir") + lyra::arg(PosArgValidator(&user_host_dir_), "[user@]host:dir") .required() .help("Linux host and directory to stream to")); } diff --git a/cdc_stream/stop_command.cc b/cdc_stream/stop_command.cc index dc24441..7eadc46 100644 --- a/cdc_stream/stop_command.cc +++ b/cdc_stream/stop_command.cc @@ -53,7 +53,7 @@ void StopCommand::RegisterCommandLineFlags(lyra::command& cmd) { std::to_string(SessionManagementServer::kDefaultServicePort))); cmd.add_argument( - lyra::arg(PosArgValidator(&user_host_dir_), "[user@]host:src-dir") + lyra::arg(PosArgValidator(&user_host_dir_), "[user@]host:dir") .required() .help("Linux host and directory to stream to")); } @@ -70,8 +70,14 @@ absl::Status StopCommand::Run() { LocalAssetsStreamManagerClient client(channel); std::string user_host, mount_dir; - RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir( - user_host_dir_, &user_host, &mount_dir)); + if (user_host_dir_ == "*") { + // Convenience shortcut "*" for "*:*". + user_host = "*"; + mount_dir = "*"; + } else { + RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir( + user_host_dir_, &user_host, &mount_dir)); + } absl::Status status = client.StopSession(user_host, mount_dir); if (status.ok()) { diff --git a/proto/local_assets_stream_manager.proto b/proto/local_assets_stream_manager.proto index 74dc868..f43eae6 100644 --- a/proto/local_assets_stream_manager.proto +++ b/proto/local_assets_stream_manager.proto @@ -65,9 +65,10 @@ message StopSessionRequest { // ID of assets streaming target gamelet. // Only used by Stadia. Should set either this or user_host_dir. string gamelet_id = 1; - // Username and host, in the form [user@]host. + // Username and host, in the form [user@]host. Accepts wildcards * and ?. string user_host = 2; // Remote directory where the streamed directory is mounted. + // Accepts wildcards * and ?. string mount_dir = 3; }