diff --git a/README.md b/README.md index 6e892a4..d54c7d5 100644 --- a/README.md +++ b/README.md @@ -211,20 +211,12 @@ cdc_rsync C:\path\to\assets\* user@linux.device.com:~/assets -vr ### CDC Stream -`cdc_stream` consists of a background service, which has to be started in -advance with -``` -cdc_stream start-service -``` -The service logs to `%APPDATA%\cdc-file-transfer\logs` by default. Try -`cdc_stream --help` to get a list of available flags. - To stream the Windows directory `C:\path\to\assets` to `~/assets` on the Linux device, run ``` cdc_stream start C:\path\to\assets user@linux.device.com:~/assets ``` -This makes all files and directories of `C:\path\to\assets` available on +This makes all files and directories in `C:\path\to\assets` available on `~/assets` immediately, as if it were a local copy. However, data is streamed from Windows to Linux as files are accessed. @@ -235,14 +227,31 @@ cdc_stream stop user@linux.device.com:~/assets ## Troubleshooting -`cdc_rsync` always logs to the console. By default, the `cdc_stream` service -logs to a timestamped file in `%APPDATA%\cdc-file-transfer\logs`. It can be -switched to log to console by starting it with `--log-to-stdout`: -``` -cdc_stream start-service --log_to_stdout -``` +On first run, `cdc_stream` starts a background service, which does all the work. +The `cdc_stream start` and `cdc_stream stop` commands are just RPC clients that +talk to the service. -Both `cdc_rsync` and `cdc_stream` support command line flags to control log -verbosity. Passing `-vvv` prints debug logs, `-vvvv` prints verbose logs. The -debug logs contain all SSH and SCP commands that are attempted to run, which is -very useful for troubleshooting. +The service logs to `%APPDATA%\cdc-file-transfer\logs` by default. The logs are +useful to investigate issues with asset streaming. To pass custom arguments, or +to debug the service, create a JSON config file at +`%APPDATA%\cdc-file-transfer\cdc_stream.json` with command line flags. +For instance, +``` +{ "verbosity":3 } +``` +instructs the service to log debug messages. Try `cdc_stream start-service -h` +for a list of available flags. Alternatively, run the service manually with +``` +cdc_stream start-service +``` +and pass the flags as command line arguments. When you run the service manually, +the flag `--log-to-stdout` is particularly useful as it logs to the console +instead of to the file. + +`cdc_rsync` always logs to the console. To increase log verbosity, pass `-vvv` +for debug logs or `-vvvv` for verbose logs. + +For both sync and stream, the debug logs contain all SSH and SCP commands that +are attempted to run, which is very useful for troubleshooting. If a command +fails unexpectedly, copy it and run it in isolation. Pass `-vv` or `-vvv` for +additional debug output. diff --git a/cdc_stream/BUILD b/cdc_stream/BUILD index 92fe9ba..a26b91e 100644 --- a/cdc_stream/BUILD +++ b/cdc_stream/BUILD @@ -45,6 +45,7 @@ cc_library( srcs = ["start_command.cc"], hdrs = ["start_command.h"], deps = [ + ":background_service_client", ":base_command", ":local_assets_stream_manager_client", ":session_management_server", @@ -78,6 +79,18 @@ cc_library( ], ) +cc_library( + name = "background_service_client", + srcs = ["background_service_client.cc"], + hdrs = ["background_service_client.h"], + deps = [ + "//common:grpc_status", + "//common:status_macros", + "//proto:background_service_grpc_proto", + "@com_google_absl//absl/status", + ], +) + cc_library( name = "asset_stream_server", srcs = [ @@ -112,6 +125,8 @@ cc_library( deps = [ ":base_command", ":multi_session", + ":session_management_server", + "//absl_helper:jedec_size_flag", "//common:log", "//common:path", "//common:status_macros", diff --git a/cdc_stream/asset_stream_config.cc b/cdc_stream/asset_stream_config.cc index 6f66174..1ca2ab9 100644 --- a/cdc_stream/asset_stream_config.cc +++ b/cdc_stream/asset_stream_config.cc @@ -20,6 +20,7 @@ #include "absl/strings/str_join.h" #include "absl_helper/jedec_size_flag.h" #include "cdc_stream/base_command.h" +#include "cdc_stream/session_management_server.h" #include "common/buffer.h" #include "common/path.h" #include "common/status_macros.h" @@ -41,6 +42,13 @@ AssetStreamConfig::~AssetStreamConfig() = default; void AssetStreamConfig::RegisterCommandLineFlags(lyra::command& cmd, BaseCommand& base_command) { + service_port_ = SessionManagementServer::kDefaultServicePort; + cmd.add_argument(lyra::opt(service_port_, "port") + .name("--service-port") + .help("Local port to use while connecting to the local " + "asset stream service, default: " + + std::to_string(service_port_))); + session_cfg_.verbosity = kDefaultVerbosity; cmd.add_argument(lyra::opt(session_cfg_.verbosity, "num") .name("--verbosity") @@ -174,6 +182,7 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { } \ } while (0) + ASSIGN_VAR(service_port_, "service-port", Int); ASSIGN_VAR(session_cfg_.verbosity, "verbosity", Int); ASSIGN_VAR(session_cfg_.fuse_debug, "debug", Bool); ASSIGN_VAR(session_cfg_.fuse_singlethreaded, "singlethreaded", Bool); @@ -212,6 +221,7 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { std::string AssetStreamConfig::ToString() { std::ostringstream ss; + ss << "service-port = " << service_port_ << std::endl; ss << "verbosity = " << session_cfg_.verbosity << std::endl; ss << "debug = " << session_cfg_.fuse_debug diff --git a/cdc_stream/asset_stream_config.h b/cdc_stream/asset_stream_config.h index 83cda5d..3e24e07 100644 --- a/cdc_stream/asset_stream_config.h +++ b/cdc_stream/asset_stream_config.h @@ -76,6 +76,9 @@ class AssetStreamConfig { // read from the JSON file. std::string GetFlagReadErrors(); + // Gets the port to use for the asset streaming service. + uint16_t service_port() const { return service_port_; } + // Session configuration. const SessionConfig& session_cfg() const { return session_cfg_; } @@ -91,6 +94,13 @@ class AssetStreamConfig { bool log_to_stdout() const { return log_to_stdout_; } private: + // Jedec parser for Lyra options. Usage: + // lyra::opt(JedecParser("size-flag", &size_bytes), "bytes")) + // Sets jedec_parse_error_ on error, Lyra doesn't support errors from lambdas. + std::function JedecParser(const char* flag_name, + uint64_t* bytes); + + uint16_t service_port_ = 0; SessionConfig session_cfg_; bool log_to_stdout_ = false; diff --git a/cdc_stream/background_service_client.cc b/cdc_stream/background_service_client.cc new file mode 100644 index 0000000..649a236 --- /dev/null +++ b/cdc_stream/background_service_client.cc @@ -0,0 +1,56 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cdc_stream/background_service_client.h" + +#include "absl/status/status.h" +#include "common/grpc_status.h" +#include "common/status_macros.h" +#include "grpcpp/channel.h" + +namespace cdc_ft { + +using GetPidResponse = backgroundservice::GetPidResponse; +using EmptyProto = google::protobuf::Empty; + +BackgroundServiceClient::BackgroundServiceClient( + std::shared_ptr channel) { + stub_ = BackgroundService::NewStub(std::move(channel)); +} + +BackgroundServiceClient::~BackgroundServiceClient() = default; + +absl::Status BackgroundServiceClient::Exit() { + EmptyProto request; + EmptyProto response; + grpc::ClientContext context; + return ToAbslStatus(stub_->Exit(&context, request, &response)); +} + +absl::StatusOr BackgroundServiceClient::GetPid() { + EmptyProto request; + GetPidResponse response; + grpc::ClientContext context; + RETURN_IF_ERROR(ToAbslStatus(stub_->GetPid(&context, request, &response))); + return response.pid(); +} + +absl::Status BackgroundServiceClient::IsHealthy() { + EmptyProto request; + EmptyProto response; + grpc::ClientContext context; + return ToAbslStatus(stub_->HealthCheck(&context, request, &response)); +} + +} // namespace cdc_ft diff --git a/cdc_stream/background_service_client.h b/cdc_stream/background_service_client.h new file mode 100644 index 0000000..4bd0b01 --- /dev/null +++ b/cdc_stream/background_service_client.h @@ -0,0 +1,56 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CDC_STREAM_BACKGROUND_SERVICE_CLIENT_H_ +#define CDC_STREAM_BACKGROUND_SERVICE_CLIENT_H_ + +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "proto/background_service.grpc.pb.h" + +namespace grpc_impl { +class Channel; +} + +namespace cdc_ft { + +// gRpc client for managing the asset streaming service. +class BackgroundServiceClient { + public: + // |channel| is a grpc channel to use. + explicit BackgroundServiceClient(std::shared_ptr channel); + + ~BackgroundServiceClient(); + + // Initialize service shutdown. + absl::Status Exit(); + + // Returns the PID of the service process. + absl::StatusOr GetPid(); + + // Verifies that the service is running and able to take requests. + absl::Status IsHealthy(); + + private: + using BackgroundService = backgroundservice::BackgroundService; + std::unique_ptr stub_; +}; + +} // namespace cdc_ft + +#endif // CDC_STREAM_BACKGROUND_SERVICE_CLIENT_H_ diff --git a/cdc_stream/background_service_impl.cc b/cdc_stream/background_service_impl.cc index b7af911..1cc63c4 100644 --- a/cdc_stream/background_service_impl.cc +++ b/cdc_stream/background_service_impl.cc @@ -30,8 +30,8 @@ void BackgroundServiceImpl::SetExitCallback(ExitCallback exit_callback) { } grpc::Status BackgroundServiceImpl::Exit(grpc::ServerContext* context, - const ExitRequest* request, - ExitResponse* response) { + const EmptyProto* request, + EmptyProto* response) { LOG_INFO("RPC:Exit"); if (exit_callback_) { return ToGrpcStatus(exit_callback_()); @@ -40,7 +40,7 @@ grpc::Status BackgroundServiceImpl::Exit(grpc::ServerContext* context, } grpc::Status BackgroundServiceImpl::GetPid(grpc::ServerContext* context, - const GetPidRequest* request, + const EmptyProto* request, GetPidResponse* response) { LOG_INFO("RPC:GetPid"); response->set_pid(static_cast(Util::GetPid())); diff --git a/cdc_stream/background_service_impl.h b/cdc_stream/background_service_impl.h index 7b13866..326208a 100644 --- a/cdc_stream/background_service_impl.h +++ b/cdc_stream/background_service_impl.h @@ -30,9 +30,6 @@ namespace cdc_ft { class BackgroundServiceImpl final : public backgroundservice::BackgroundService::Service { public: - using ExitRequest = backgroundservice::ExitRequest; - using ExitResponse = backgroundservice::ExitResponse; - using GetPidRequest = backgroundservice::GetPidRequest; using GetPidResponse = backgroundservice::GetPidResponse; using EmptyProto = google::protobuf::Empty; @@ -43,11 +40,10 @@ class BackgroundServiceImpl final using ExitCallback = std::function; void SetExitCallback(ExitCallback exit_callback); - grpc::Status Exit(grpc::ServerContext* context, const ExitRequest* request, - ExitResponse* response) override; + grpc::Status Exit(grpc::ServerContext* context, const EmptyProto* request, + EmptyProto* response) override; - grpc::Status GetPid(grpc::ServerContext* context, - const GetPidRequest* request, + grpc::Status GetPid(grpc::ServerContext* context, const EmptyProto* request, GetPidResponse* response) override; grpc::Status HealthCheck(grpc::ServerContext* context, diff --git a/cdc_stream/local_assets_stream_manager_client.cc b/cdc_stream/local_assets_stream_manager_client.cc index 3543d16..9edd208 100644 --- a/cdc_stream/local_assets_stream_manager_client.cc +++ b/cdc_stream/local_assets_stream_manager_client.cc @@ -28,15 +28,6 @@ using StartSessionResponse = localassetsstreammanager::StartSessionResponse; using StopSessionRequest = localassetsstreammanager::StopSessionRequest; using StopSessionResponse = localassetsstreammanager::StopSessionResponse; -LocalAssetsStreamManagerClient::LocalAssetsStreamManagerClient( - uint16_t service_port) { - std::string client_address = absl::StrFormat("localhost:%u", service_port); - std::shared_ptr channel = grpc::CreateCustomChannel( - client_address, grpc::InsecureChannelCredentials(), - grpc::ChannelArguments()); - stub_ = LocalAssetsStreamManager::NewStub(std::move(channel)); -} - LocalAssetsStreamManagerClient::LocalAssetsStreamManagerClient( std::shared_ptr channel) { stub_ = LocalAssetsStreamManager::NewStub(std::move(channel)); diff --git a/cdc_stream/local_assets_stream_manager_client.h b/cdc_stream/local_assets_stream_manager_client.h index a031bb9..afda9c4 100644 --- a/cdc_stream/local_assets_stream_manager_client.h +++ b/cdc_stream/local_assets_stream_manager_client.h @@ -20,7 +20,6 @@ #include #include "absl/status/status.h" -#include "grpcpp/channel.h" #include "proto/local_assets_stream_manager.grpc.pb.h" namespace grpc_impl { @@ -32,8 +31,6 @@ namespace cdc_ft { // gRpc client for starting/stopping asset streaming sessions. class LocalAssetsStreamManagerClient { public: - explicit LocalAssetsStreamManagerClient(uint16_t service_port); - // |channel| is a grpc channel to use. explicit LocalAssetsStreamManagerClient( std::shared_ptr channel); diff --git a/cdc_stream/local_assets_stream_manager_service_impl.cc b/cdc_stream/local_assets_stream_manager_service_impl.cc index 0085ab9..98dcdf9 100644 --- a/cdc_stream/local_assets_stream_manager_service_impl.cc +++ b/cdc_stream/local_assets_stream_manager_service_impl.cc @@ -277,6 +277,7 @@ absl::Status LocalAssetsStreamManagerServiceImpl::InitSsh( absl::StrFormat(" --organization %s", Quoted(organization_id)); } start_info.name = "ggp ssh init"; + start_info.flags = ProcessFlags::kNoWindow; std::string output; start_info.stdout_handler = [&output, this](const char* data, diff --git a/cdc_stream/session_management_server.h b/cdc_stream/session_management_server.h index 0b790a7..a763518 100644 --- a/cdc_stream/session_management_server.h +++ b/cdc_stream/session_management_server.h @@ -36,7 +36,7 @@ class ProcessFactory; // - Background class SessionManagementServer { public: - static constexpr int kDefaultServicePort = 44432; + static constexpr uint16_t kDefaultServicePort = 44432; SessionManagementServer(grpc::Service* session_service, grpc::Service* background_service, diff --git a/cdc_stream/start_command.cc b/cdc_stream/start_command.cc index c346c4e..811ae03 100644 --- a/cdc_stream/start_command.cc +++ b/cdc_stream/start_command.cc @@ -16,12 +16,19 @@ #include +#include "cdc_stream/background_service_client.h" #include "cdc_stream/local_assets_stream_manager_client.h" #include "cdc_stream/session_management_server.h" #include "common/log.h" #include "common/path.h" +#include "common/process.h" #include "common/remote_util.h" #include "common/status_macros.h" +#include "common/stopwatch.h" +#include "common/util.h" +#include "grpcpp/channel.h" +#include "grpcpp/create_channel.h" +#include "grpcpp/support/channel_arguments.h" #include "lyra/lyra.hpp" namespace cdc_ft { @@ -29,6 +36,19 @@ namespace { constexpr int kDefaultVerbosity = 2; } // namespace +namespace { +// Time to poll until the streaming service becomes healthy. +constexpr double kServiceStartupTimeoutSec = 20.0; + +std::shared_ptr CreateChannel(uint16_t service_port) { + std::string client_address = absl::StrFormat("localhost:%u", service_port); + return grpc::CreateCustomChannel(client_address, + grpc::InsecureChannelCredentials(), + grpc::ChannelArguments()); +} + +} // namespace + StartCommand::StartCommand(int* exit_code) : BaseCommand("start", "Start streaming files from a Windows to a Linux device", @@ -89,16 +109,33 @@ void StartCommand::RegisterCommandLineFlags(lyra::command& cmd) { absl::Status StartCommand::Run() { LogLevel level = Log::VerbosityToLogLevel(verbosity_); ScopedLog scoped_log(std::make_unique(level)); - LocalAssetsStreamManagerClient client(service_port_); std::string full_src_dir = path::GetFullPath(src_dir_); std::string user_host, mount_dir; RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir( user_host_dir_, &user_host, &mount_dir)); + LocalAssetsStreamManagerClient client(CreateChannel(service_port_)); absl::Status status = client.StartSession(full_src_dir, user_host, ssh_port_, mount_dir, ssh_command_, scp_command_); + + if (absl::IsUnavailable(status)) { + LOG_DEBUG("StartSession status: %s", status.ToString()); + LOG_INFO("Streaming service is unavailable. Starting it..."); + status = StartStreamingService(); + + if (status.ok()) { + LOG_INFO("Streaming service successfully started"); + + // Recreate client. The old channel might still be in a transient failure + // state. + LocalAssetsStreamManagerClient new_client(CreateChannel(service_port_)); + status = new_client.StartSession(full_src_dir, user_host, ssh_port_, + mount_dir, ssh_command_, scp_command_); + } + } + if (status.ok()) { LOG_INFO("Started streaming directory '%s' to '%s:%s'", src_dir_, user_host, mount_dir); @@ -107,4 +144,44 @@ absl::Status StartCommand::Run() { return status; } +absl::Status StartCommand::StartStreamingService() { + std::string exe_dir; + RETURN_IF_ERROR(path::GetExeDir(&exe_dir), + "Failed to get executable directory"); + std::string exe_path = path::Join(exe_dir, "cdc_stream"); + + // Try starting the service first. + WinProcessFactory process_factory; + ProcessStartInfo start_info; + start_info.command = + absl::StrFormat("%s start-service --verbosity=%i --service-port=%i", + exe_path, verbosity_, service_port_); + start_info.flags = ProcessFlags::kDetached; + std::unique_ptr service_process = process_factory.Create(start_info); + RETURN_IF_ERROR(service_process->Start(), + "Failed to start asset streaming service"); + + // Poll until the service becomes healthy. + LOG_INFO("Streaming service initializing..."); + Stopwatch sw; + while (sw.ElapsedSeconds() < kServiceStartupTimeoutSec) { + // The channel is in some transient failure state, and it's faster to + // reconnect instead of waiting for it to return. + BackgroundServiceClient bg_client(CreateChannel(service_port_)); + absl::Status status = bg_client.IsHealthy(); + if (status.ok()) { + return absl::OkStatus(); + } + LOG_DEBUG("Health check result: %s", status.ToString()); + Util::Sleep(100); + } + + // Kill the process. + service_process->Terminate(); + return absl::DeadlineExceededError( + absl::StrFormat("Timed out after %0.0f seconds waiting for the asset " + "streaming service to become healthy", + kServiceStartupTimeoutSec)); +} + } // namespace cdc_ft diff --git a/cdc_stream/start_command.h b/cdc_stream/start_command.h index b38f0a4..bf14dc2 100644 --- a/cdc_stream/start_command.h +++ b/cdc_stream/start_command.h @@ -20,6 +20,10 @@ #include "absl/status/status.h" #include "cdc_stream/base_command.h" +namespace grpc { +class Channel; +} + namespace cdc_ft { // Handler for the start command. Sends an RPC call to the service to starts a @@ -34,6 +38,9 @@ class StartCommand : public BaseCommand { absl::Status Run() override; private: + // Starts the asset streaming service. + absl::Status StartStreamingService(); + int verbosity_ = 0; uint16_t service_port_ = 0; uint16_t ssh_port_ = 0; diff --git a/cdc_stream/start_service_command.cc b/cdc_stream/start_service_command.cc index 88e2dbe..6452f23 100644 --- a/cdc_stream/start_service_command.cc +++ b/cdc_stream/start_service_command.cc @@ -43,7 +43,7 @@ StartServiceCommand::StartServiceCommand(int* exit_code) StartServiceCommand::~StartServiceCommand() = default; void StartServiceCommand::RegisterCommandLineFlags(lyra::command& cmd) { - config_file_ = "%APPDATA%\\cdc-file-transfer\\assets_stream_manager.json"; + config_file_ = "%APPDATA%\\cdc-file-transfer\\cdc_stream.json"; cmd.add_argument( lyra::opt(config_file_, "path") .name("--config-file") @@ -147,8 +147,7 @@ absl::Status StartServiceCommand::RunService() { RETURN_ABSL_IF_ERROR( session_service.StartSession(nullptr, &request, &response)); } - RETURN_IF_ERROR( - sm_server.Start(SessionManagementServer::kDefaultServicePort)); + RETURN_IF_ERROR(sm_server.Start(cfg_.service_port())); sm_server.RunUntilShutdown(); return absl::OkStatus(); } diff --git a/cdc_stream/stop_command.cc b/cdc_stream/stop_command.cc index 753cf58..dc24441 100644 --- a/cdc_stream/stop_command.cc +++ b/cdc_stream/stop_command.cc @@ -21,6 +21,9 @@ #include "common/log.h" #include "common/path.h" #include "common/status_macros.h" +#include "grpcpp/channel.h" +#include "grpcpp/create_channel.h" +#include "grpcpp/support/channel_arguments.h" #include "lyra/lyra.hpp" namespace cdc_ft { @@ -58,7 +61,13 @@ void StopCommand::RegisterCommandLineFlags(lyra::command& cmd) { absl::Status StopCommand::Run() { LogLevel level = Log::VerbosityToLogLevel(verbosity_); ScopedLog scoped_log(std::make_unique(level)); - LocalAssetsStreamManagerClient client(service_port_); + + std::string client_address = absl::StrFormat("localhost:%u", service_port_); + std::shared_ptr channel = grpc::CreateCustomChannel( + client_address, grpc::InsecureChannelCredentials(), + grpc::ChannelArguments()); + + LocalAssetsStreamManagerClient client(channel); std::string user_host, mount_dir; RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir( diff --git a/common/port_manager_win.cc b/common/port_manager_win.cc index 2a06128..2c6c586 100644 --- a/common/port_manager_win.cc +++ b/common/port_manager_win.cc @@ -213,6 +213,7 @@ absl::StatusOr> PortManager::FindAvailableLocalPorts( ProcessStartInfo start_info; start_info.command = "netstat -a -n -p tcp"; start_info.name = "netstat"; + start_info.flags = ProcessFlags::kNoWindow; std::string output; start_info.stdout_handler = [&output](const char* data, size_t data_size) { @@ -246,6 +247,7 @@ absl::StatusOr> PortManager::FindAvailableRemotePorts( ProcessStartInfo start_info = remote_util->BuildProcessStartInfoForSsh(remote_command); start_info.name = "netstat"; + start_info.flags = ProcessFlags::kNoWindow; std::string output; start_info.stdout_handler = [&output](const char* data, size_t data_size) { diff --git a/common/process.h b/common/process.h index 1db47e1..48b14a7 100644 --- a/common/process.h +++ b/common/process.h @@ -33,6 +33,12 @@ namespace cdc_ft { absl::Status LogOutput(const char* name, const char* data, size_t data_size, absl::optional log_level = {}); +enum class ProcessFlags { + kNone = 0, + kDetached = 1 << 0, + kNoWindow = 1 << 1, +}; + struct ProcessStartInfo { // Handler for stdout/stderr. |data| is guaranteed to be NULL terminated, so // it may be used like a C-string if it's known to be text, e.g. for printf(). @@ -63,8 +69,14 @@ struct ProcessStartInfo { OutputHandler stdout_handler; OutputHandler stderr_handler; + // Flags that define additional properties of the process. + ProcessFlags flags = ProcessFlags::kNone; + // Returns |name| if set, otherwise |command|. const std::string& Name() const; + + // Tests ALL flags (flags & flag) == flag. + bool HasFlag(ProcessFlags flag) const; }; // Runs a background process and pipes stdin/stdout/stderr. @@ -75,6 +87,8 @@ class Process { static constexpr uint32_t kExitCodeFailedToGetExitCode = 4000000002; explicit Process(const ProcessStartInfo& start_info); + + // Terminates the process unless it's running with ProcessFlags::kDetached. virtual ~Process(); // Start the background process. @@ -140,6 +154,16 @@ class WinProcessFactory : public ProcessFactory { std::unique_ptr Create(const ProcessStartInfo& start_info) override; }; +inline ProcessFlags operator|(ProcessFlags a, ProcessFlags b) { + using T = std::underlying_type_t; + return static_cast(static_cast(a) | static_cast(b)); +} + +inline ProcessFlags operator&(ProcessFlags a, ProcessFlags b) { + using T = std::underlying_type_t; + return static_cast(static_cast(a) & static_cast(b)); +} + } // namespace cdc_ft #endif // COMMON_PROCESS_H_ diff --git a/common/process_win.cc b/common/process_win.cc index bacc4f2..20447d6 100644 --- a/common/process_win.cc +++ b/common/process_win.cc @@ -49,6 +49,24 @@ void SetThreadName(const std::string& name) { } } +int ToCreationFlags(ProcessFlags pflags) { +#define HANDLE_FLAG(pflag, cflag) \ + if ((pflags & pflag) == pflag) { \ + cflags |= cflag; \ + pdone = pdone | pflag; \ + } + + int cflags = 0; + ProcessFlags pdone = ProcessFlags::kNone; + HANDLE_FLAG(ProcessFlags::kDetached, DETACHED_PROCESS); + HANDLE_FLAG(ProcessFlags::kNoWindow, CREATE_NO_WINDOW); + assert(pflags == pdone); + +#undef HANDLE_FLAG + + return cflags; +} + std::atomic_int g_pipe_serial_number{0}; // Creates a pipe suitable for overlapped IO. Regular anonymous pipes in Windows @@ -567,6 +585,10 @@ const std::string& ProcessStartInfo::Name() const { return !name.empty() ? name : command; } +bool ProcessStartInfo::HasFlag(ProcessFlags flag) const { + return (flags & flag) == flag; +} + Process::Process(const ProcessStartInfo& start_info) : start_info_(start_info) {} @@ -593,6 +615,8 @@ class WinProcess : public Process { absl::Status GetStatus() const override; private: + void Reset(); + std::unique_ptr process_info_; std::unique_ptr message_pump_; }; @@ -600,7 +624,14 @@ class WinProcess : public Process { WinProcess::WinProcess(const ProcessStartInfo& start_info) : Process(start_info) {} -WinProcess::~WinProcess() { Terminate().IgnoreError(); } +WinProcess::~WinProcess() { + if (start_info_.HasFlag(ProcessFlags::kDetached)) { + // If the process runs detached, just reset handles, don't terminate it. + Reset(); + } else { + Terminate().IgnoreError(); + } +} absl::Status WinProcess::Start() { LOG_INFO("Starting process %s", start_info_.command.c_str()); @@ -676,10 +707,13 @@ absl::Status WinProcess::Start() { } JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = {0}; - jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + if (!start_info_.HasFlag(ProcessFlags::kDetached)) { + jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + } bool success = SetInformationJobObject(process_info_->job.Get(), JobObjectExtendedLimitInformation, &jeli, sizeof(jeli)); + if (!success) { return MakeStatus("SetInformationJobObject() failed: %s", Util::GetLastWin32Error()); @@ -691,7 +725,7 @@ absl::Status WinProcess::Start() { NULL, // Process handle not inheritable NULL, // Thread handle not inheritable TRUE, // Inherit handles - 0, // No creation flags + ToCreationFlags(start_info_.flags), NULL, // Use parent's environment block NULL, // Use parent's starting directory &si, &process_info_->pi); @@ -785,32 +819,39 @@ absl::Status WinProcess::Terminate() { message_pump_.reset(); } - if (process_info_) { - bool result = true; - if (should_terminate) { - result = TerminateProcess(process_info_->pi.hProcess, 0); - if (!result && GetLastError() == ERROR_ACCESS_DENIED) { - // This means that the process has already exited, but in a way that - // the exit wasn't properly reported to this code (e.g. the process got - // killed somewhere). Just handle this silently. - LOG_DEBUG("Process '%s' already exited", start_info_.Name()); - result = true; - } + std::string error_msg; + if (process_info_ && should_terminate && + !TerminateProcess(process_info_->pi.hProcess, 0)) { + if (GetLastError() == ERROR_ACCESS_DENIED) { + // This means that the process has already exited, but in a way that + // the exit wasn't properly reported to this code (e.g. the process got + // killed somewhere). Just handle this silently. + LOG_DEBUG("Process '%s' already exited", start_info_.Name()); + } else { + error_msg = Util::GetLastWin32Error(); } + } + // Reset handles. + Reset(); + + if (!error_msg.empty()) { + return MakeStatus("TerminateProcess() failed: %s", error_msg); + } + return absl::OkStatus(); +} + +void WinProcess::Reset() { + // Shut down message pump. + message_pump_.reset(); + + if (process_info_) { // Close the handles that are not scoped handles. ScopedHandle(process_info_->pi.hProcess).Close(); ScopedHandle(process_info_->pi.hThread).Close(); process_info_.reset(); - - if (!result) { - return MakeStatus("TerminateProcess() failed: %s", - Util::GetLastWin32Error()); - } } - - return absl::OkStatus(); } ProcessFactory::~ProcessFactory() = default; diff --git a/common/remote_util.cc b/common/remote_util.cc index 0af9448..715a895 100644 --- a/common/remote_util.cc +++ b/common/remote_util.cc @@ -75,6 +75,7 @@ absl::Status RemoteUtil::Scp(std::vector source_filepaths, // -p preserves timestamps. This enables timestamp-based up-to-date checks. ProcessStartInfo start_info; + start_info.flags = ProcessFlags::kNoWindow; start_info.command = absl::StrFormat( "%s " "%s %s -p -T " @@ -147,6 +148,7 @@ ProcessStartInfo RemoteUtil::BuildProcessStartInfoForSshInternal( QuoteForWindows(ssh_command_), quiet_ || verbosity_ < 2 ? "-q" : "", forward_arg, QuoteForWindows(user_host_), ssh_port_, remote_command_arg); start_info.forward_output_to_log = forward_output_to_log_; + start_info.flags = ProcessFlags::kNoWindow; return start_info; } diff --git a/proto/background_service.proto b/proto/background_service.proto index 88413d5..72c1dd3 100644 --- a/proto/background_service.proto +++ b/proto/background_service.proto @@ -23,22 +23,16 @@ import "google/protobuf/empty.proto"; service BackgroundService { // Exit is used to ask the service to exit. In the case of the process // manager, this cascades to all background processes. - rpc Exit(ExitRequest) returns (ExitResponse) {} + rpc Exit(google.protobuf.Empty) returns (google.protobuf.Empty) {} // GetPid is used to get the PID of the service process. - rpc GetPid(GetPidRequest) returns (GetPidResponse) {} + rpc GetPid(google.protobuf.Empty) returns (GetPidResponse) {} // HealthCheck is used to verify that the service is running. It returns an // empty protobuf if the service is ready to serve requests. rpc HealthCheck(google.protobuf.Empty) returns (google.protobuf.Empty) {} } -message ExitRequest {} - -message ExitResponse {} - -message GetPidRequest {} - message GetPidResponse { int32 pid = 1; } \ No newline at end of file