[cdc_stream] Automatically start service (#28)

Starts the streaming service if it's not up and running. This required
adding the ability to run a detached process. By default, all child
processes are killed when the parent process exits. Since detached
child processes don't run with a console, they need to create sub-
processes with CREATE_NO_WINDOW since otherwise a new console pops up,
e.g. for every ssh command.

Polls for 20 seconds while the service starts up. For this purpose,
a BackgroundServiceClient is added. This will be reused in a future CL
by a new stop-service command to exit the service.

Also adds --service-port as additional argument to start-service.
This commit is contained in:
Lutz Justen
2022-12-02 14:34:36 +01:00
committed by GitHub
parent 6d63aa72d7
commit 1120dcbee0
21 changed files with 372 additions and 76 deletions

View File

@@ -211,20 +211,12 @@ cdc_rsync C:\path\to\assets\* user@linux.device.com:~/assets -vr
### CDC Stream ### CDC Stream
`cdc_stream` consists of a background service, which has to be started in
advance with
```
cdc_stream start-service
```
The service logs to `%APPDATA%\cdc-file-transfer\logs` by default. Try
`cdc_stream --help` to get a list of available flags.
To stream the Windows directory `C:\path\to\assets` to `~/assets` on the Linux To stream the Windows directory `C:\path\to\assets` to `~/assets` on the Linux
device, run device, run
``` ```
cdc_stream start C:\path\to\assets user@linux.device.com:~/assets cdc_stream start C:\path\to\assets user@linux.device.com:~/assets
``` ```
This makes all files and directories of `C:\path\to\assets` available on This makes all files and directories in `C:\path\to\assets` available on
`~/assets` immediately, as if it were a local copy. However, data is streamed `~/assets` immediately, as if it were a local copy. However, data is streamed
from Windows to Linux as files are accessed. from Windows to Linux as files are accessed.
@@ -235,14 +227,31 @@ cdc_stream stop user@linux.device.com:~/assets
## Troubleshooting ## Troubleshooting
`cdc_rsync` always logs to the console. By default, the `cdc_stream` service On first run, `cdc_stream` starts a background service, which does all the work.
logs to a timestamped file in `%APPDATA%\cdc-file-transfer\logs`. It can be The `cdc_stream start` and `cdc_stream stop` commands are just RPC clients that
switched to log to console by starting it with `--log-to-stdout`: talk to the service.
```
cdc_stream start-service --log_to_stdout
```
Both `cdc_rsync` and `cdc_stream` support command line flags to control log The service logs to `%APPDATA%\cdc-file-transfer\logs` by default. The logs are
verbosity. Passing `-vvv` prints debug logs, `-vvvv` prints verbose logs. The useful to investigate issues with asset streaming. To pass custom arguments, or
debug logs contain all SSH and SCP commands that are attempted to run, which is to debug the service, create a JSON config file at
very useful for troubleshooting. `%APPDATA%\cdc-file-transfer\cdc_stream.json` with command line flags.
For instance,
```
{ "verbosity":3 }
```
instructs the service to log debug messages. Try `cdc_stream start-service -h`
for a list of available flags. Alternatively, run the service manually with
```
cdc_stream start-service
```
and pass the flags as command line arguments. When you run the service manually,
the flag `--log-to-stdout` is particularly useful as it logs to the console
instead of to the file.
`cdc_rsync` always logs to the console. To increase log verbosity, pass `-vvv`
for debug logs or `-vvvv` for verbose logs.
For both sync and stream, the debug logs contain all SSH and SCP commands that
are attempted to run, which is very useful for troubleshooting. If a command
fails unexpectedly, copy it and run it in isolation. Pass `-vv` or `-vvv` for
additional debug output.

View File

@@ -45,6 +45,7 @@ cc_library(
srcs = ["start_command.cc"], srcs = ["start_command.cc"],
hdrs = ["start_command.h"], hdrs = ["start_command.h"],
deps = [ deps = [
":background_service_client",
":base_command", ":base_command",
":local_assets_stream_manager_client", ":local_assets_stream_manager_client",
":session_management_server", ":session_management_server",
@@ -78,6 +79,18 @@ cc_library(
], ],
) )
cc_library(
name = "background_service_client",
srcs = ["background_service_client.cc"],
hdrs = ["background_service_client.h"],
deps = [
"//common:grpc_status",
"//common:status_macros",
"//proto:background_service_grpc_proto",
"@com_google_absl//absl/status",
],
)
cc_library( cc_library(
name = "asset_stream_server", name = "asset_stream_server",
srcs = [ srcs = [
@@ -112,6 +125,8 @@ cc_library(
deps = [ deps = [
":base_command", ":base_command",
":multi_session", ":multi_session",
":session_management_server",
"//absl_helper:jedec_size_flag",
"//common:log", "//common:log",
"//common:path", "//common:path",
"//common:status_macros", "//common:status_macros",

View File

@@ -20,6 +20,7 @@
#include "absl/strings/str_join.h" #include "absl/strings/str_join.h"
#include "absl_helper/jedec_size_flag.h" #include "absl_helper/jedec_size_flag.h"
#include "cdc_stream/base_command.h" #include "cdc_stream/base_command.h"
#include "cdc_stream/session_management_server.h"
#include "common/buffer.h" #include "common/buffer.h"
#include "common/path.h" #include "common/path.h"
#include "common/status_macros.h" #include "common/status_macros.h"
@@ -41,6 +42,13 @@ AssetStreamConfig::~AssetStreamConfig() = default;
void AssetStreamConfig::RegisterCommandLineFlags(lyra::command& cmd, void AssetStreamConfig::RegisterCommandLineFlags(lyra::command& cmd,
BaseCommand& base_command) { BaseCommand& base_command) {
service_port_ = SessionManagementServer::kDefaultServicePort;
cmd.add_argument(lyra::opt(service_port_, "port")
.name("--service-port")
.help("Local port to use while connecting to the local "
"asset stream service, default: " +
std::to_string(service_port_)));
session_cfg_.verbosity = kDefaultVerbosity; session_cfg_.verbosity = kDefaultVerbosity;
cmd.add_argument(lyra::opt(session_cfg_.verbosity, "num") cmd.add_argument(lyra::opt(session_cfg_.verbosity, "num")
.name("--verbosity") .name("--verbosity")
@@ -174,6 +182,7 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) {
} \ } \
} while (0) } while (0)
ASSIGN_VAR(service_port_, "service-port", Int);
ASSIGN_VAR(session_cfg_.verbosity, "verbosity", Int); ASSIGN_VAR(session_cfg_.verbosity, "verbosity", Int);
ASSIGN_VAR(session_cfg_.fuse_debug, "debug", Bool); ASSIGN_VAR(session_cfg_.fuse_debug, "debug", Bool);
ASSIGN_VAR(session_cfg_.fuse_singlethreaded, "singlethreaded", Bool); ASSIGN_VAR(session_cfg_.fuse_singlethreaded, "singlethreaded", Bool);
@@ -212,6 +221,7 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) {
std::string AssetStreamConfig::ToString() { std::string AssetStreamConfig::ToString() {
std::ostringstream ss; std::ostringstream ss;
ss << "service-port = " << service_port_ << std::endl;
ss << "verbosity = " << session_cfg_.verbosity ss << "verbosity = " << session_cfg_.verbosity
<< std::endl; << std::endl;
ss << "debug = " << session_cfg_.fuse_debug ss << "debug = " << session_cfg_.fuse_debug

View File

@@ -76,6 +76,9 @@ class AssetStreamConfig {
// read from the JSON file. // read from the JSON file.
std::string GetFlagReadErrors(); std::string GetFlagReadErrors();
// Gets the port to use for the asset streaming service.
uint16_t service_port() const { return service_port_; }
// Session configuration. // Session configuration.
const SessionConfig& session_cfg() const { return session_cfg_; } const SessionConfig& session_cfg() const { return session_cfg_; }
@@ -91,6 +94,13 @@ class AssetStreamConfig {
bool log_to_stdout() const { return log_to_stdout_; } bool log_to_stdout() const { return log_to_stdout_; }
private: private:
// Jedec parser for Lyra options. Usage:
// lyra::opt(JedecParser("size-flag", &size_bytes), "bytes"))
// Sets jedec_parse_error_ on error, Lyra doesn't support errors from lambdas.
std::function<void(const std::string&)> JedecParser(const char* flag_name,
uint64_t* bytes);
uint16_t service_port_ = 0;
SessionConfig session_cfg_; SessionConfig session_cfg_;
bool log_to_stdout_ = false; bool log_to_stdout_ = false;

View File

@@ -0,0 +1,56 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "cdc_stream/background_service_client.h"
#include "absl/status/status.h"
#include "common/grpc_status.h"
#include "common/status_macros.h"
#include "grpcpp/channel.h"
namespace cdc_ft {
using GetPidResponse = backgroundservice::GetPidResponse;
using EmptyProto = google::protobuf::Empty;
BackgroundServiceClient::BackgroundServiceClient(
std::shared_ptr<grpc::Channel> channel) {
stub_ = BackgroundService::NewStub(std::move(channel));
}
BackgroundServiceClient::~BackgroundServiceClient() = default;
absl::Status BackgroundServiceClient::Exit() {
EmptyProto request;
EmptyProto response;
grpc::ClientContext context;
return ToAbslStatus(stub_->Exit(&context, request, &response));
}
absl::StatusOr<int> BackgroundServiceClient::GetPid() {
EmptyProto request;
GetPidResponse response;
grpc::ClientContext context;
RETURN_IF_ERROR(ToAbslStatus(stub_->GetPid(&context, request, &response)));
return response.pid();
}
absl::Status BackgroundServiceClient::IsHealthy() {
EmptyProto request;
EmptyProto response;
grpc::ClientContext context;
return ToAbslStatus(stub_->HealthCheck(&context, request, &response));
}
} // namespace cdc_ft

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CDC_STREAM_BACKGROUND_SERVICE_CLIENT_H_
#define CDC_STREAM_BACKGROUND_SERVICE_CLIENT_H_
#include <memory>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "proto/background_service.grpc.pb.h"
namespace grpc_impl {
class Channel;
}
namespace cdc_ft {
// gRpc client for managing the asset streaming service.
class BackgroundServiceClient {
public:
// |channel| is a grpc channel to use.
explicit BackgroundServiceClient(std::shared_ptr<grpc::Channel> channel);
~BackgroundServiceClient();
// Initialize service shutdown.
absl::Status Exit();
// Returns the PID of the service process.
absl::StatusOr<int> GetPid();
// Verifies that the service is running and able to take requests.
absl::Status IsHealthy();
private:
using BackgroundService = backgroundservice::BackgroundService;
std::unique_ptr<BackgroundService::Stub> stub_;
};
} // namespace cdc_ft
#endif // CDC_STREAM_BACKGROUND_SERVICE_CLIENT_H_

View File

@@ -30,8 +30,8 @@ void BackgroundServiceImpl::SetExitCallback(ExitCallback exit_callback) {
} }
grpc::Status BackgroundServiceImpl::Exit(grpc::ServerContext* context, grpc::Status BackgroundServiceImpl::Exit(grpc::ServerContext* context,
const ExitRequest* request, const EmptyProto* request,
ExitResponse* response) { EmptyProto* response) {
LOG_INFO("RPC:Exit"); LOG_INFO("RPC:Exit");
if (exit_callback_) { if (exit_callback_) {
return ToGrpcStatus(exit_callback_()); return ToGrpcStatus(exit_callback_());
@@ -40,7 +40,7 @@ grpc::Status BackgroundServiceImpl::Exit(grpc::ServerContext* context,
} }
grpc::Status BackgroundServiceImpl::GetPid(grpc::ServerContext* context, grpc::Status BackgroundServiceImpl::GetPid(grpc::ServerContext* context,
const GetPidRequest* request, const EmptyProto* request,
GetPidResponse* response) { GetPidResponse* response) {
LOG_INFO("RPC:GetPid"); LOG_INFO("RPC:GetPid");
response->set_pid(static_cast<int32_t>(Util::GetPid())); response->set_pid(static_cast<int32_t>(Util::GetPid()));

View File

@@ -30,9 +30,6 @@ namespace cdc_ft {
class BackgroundServiceImpl final class BackgroundServiceImpl final
: public backgroundservice::BackgroundService::Service { : public backgroundservice::BackgroundService::Service {
public: public:
using ExitRequest = backgroundservice::ExitRequest;
using ExitResponse = backgroundservice::ExitResponse;
using GetPidRequest = backgroundservice::GetPidRequest;
using GetPidResponse = backgroundservice::GetPidResponse; using GetPidResponse = backgroundservice::GetPidResponse;
using EmptyProto = google::protobuf::Empty; using EmptyProto = google::protobuf::Empty;
@@ -43,11 +40,10 @@ class BackgroundServiceImpl final
using ExitCallback = std::function<absl::Status()>; using ExitCallback = std::function<absl::Status()>;
void SetExitCallback(ExitCallback exit_callback); void SetExitCallback(ExitCallback exit_callback);
grpc::Status Exit(grpc::ServerContext* context, const ExitRequest* request, grpc::Status Exit(grpc::ServerContext* context, const EmptyProto* request,
ExitResponse* response) override; EmptyProto* response) override;
grpc::Status GetPid(grpc::ServerContext* context, grpc::Status GetPid(grpc::ServerContext* context, const EmptyProto* request,
const GetPidRequest* request,
GetPidResponse* response) override; GetPidResponse* response) override;
grpc::Status HealthCheck(grpc::ServerContext* context, grpc::Status HealthCheck(grpc::ServerContext* context,

View File

@@ -28,15 +28,6 @@ using StartSessionResponse = localassetsstreammanager::StartSessionResponse;
using StopSessionRequest = localassetsstreammanager::StopSessionRequest; using StopSessionRequest = localassetsstreammanager::StopSessionRequest;
using StopSessionResponse = localassetsstreammanager::StopSessionResponse; using StopSessionResponse = localassetsstreammanager::StopSessionResponse;
LocalAssetsStreamManagerClient::LocalAssetsStreamManagerClient(
uint16_t service_port) {
std::string client_address = absl::StrFormat("localhost:%u", service_port);
std::shared_ptr<grpc::Channel> channel = grpc::CreateCustomChannel(
client_address, grpc::InsecureChannelCredentials(),
grpc::ChannelArguments());
stub_ = LocalAssetsStreamManager::NewStub(std::move(channel));
}
LocalAssetsStreamManagerClient::LocalAssetsStreamManagerClient( LocalAssetsStreamManagerClient::LocalAssetsStreamManagerClient(
std::shared_ptr<grpc::Channel> channel) { std::shared_ptr<grpc::Channel> channel) {
stub_ = LocalAssetsStreamManager::NewStub(std::move(channel)); stub_ = LocalAssetsStreamManager::NewStub(std::move(channel));

View File

@@ -20,7 +20,6 @@
#include <memory> #include <memory>
#include "absl/status/status.h" #include "absl/status/status.h"
#include "grpcpp/channel.h"
#include "proto/local_assets_stream_manager.grpc.pb.h" #include "proto/local_assets_stream_manager.grpc.pb.h"
namespace grpc_impl { namespace grpc_impl {
@@ -32,8 +31,6 @@ namespace cdc_ft {
// gRpc client for starting/stopping asset streaming sessions. // gRpc client for starting/stopping asset streaming sessions.
class LocalAssetsStreamManagerClient { class LocalAssetsStreamManagerClient {
public: public:
explicit LocalAssetsStreamManagerClient(uint16_t service_port);
// |channel| is a grpc channel to use. // |channel| is a grpc channel to use.
explicit LocalAssetsStreamManagerClient( explicit LocalAssetsStreamManagerClient(
std::shared_ptr<grpc::Channel> channel); std::shared_ptr<grpc::Channel> channel);

View File

@@ -277,6 +277,7 @@ absl::Status LocalAssetsStreamManagerServiceImpl::InitSsh(
absl::StrFormat(" --organization %s", Quoted(organization_id)); absl::StrFormat(" --organization %s", Quoted(organization_id));
} }
start_info.name = "ggp ssh init"; start_info.name = "ggp ssh init";
start_info.flags = ProcessFlags::kNoWindow;
std::string output; std::string output;
start_info.stdout_handler = [&output, this](const char* data, start_info.stdout_handler = [&output, this](const char* data,

View File

@@ -36,7 +36,7 @@ class ProcessFactory;
// - Background // - Background
class SessionManagementServer { class SessionManagementServer {
public: public:
static constexpr int kDefaultServicePort = 44432; static constexpr uint16_t kDefaultServicePort = 44432;
SessionManagementServer(grpc::Service* session_service, SessionManagementServer(grpc::Service* session_service,
grpc::Service* background_service, grpc::Service* background_service,

View File

@@ -16,12 +16,19 @@
#include <memory> #include <memory>
#include "cdc_stream/background_service_client.h"
#include "cdc_stream/local_assets_stream_manager_client.h" #include "cdc_stream/local_assets_stream_manager_client.h"
#include "cdc_stream/session_management_server.h" #include "cdc_stream/session_management_server.h"
#include "common/log.h" #include "common/log.h"
#include "common/path.h" #include "common/path.h"
#include "common/process.h"
#include "common/remote_util.h" #include "common/remote_util.h"
#include "common/status_macros.h" #include "common/status_macros.h"
#include "common/stopwatch.h"
#include "common/util.h"
#include "grpcpp/channel.h"
#include "grpcpp/create_channel.h"
#include "grpcpp/support/channel_arguments.h"
#include "lyra/lyra.hpp" #include "lyra/lyra.hpp"
namespace cdc_ft { namespace cdc_ft {
@@ -29,6 +36,19 @@ namespace {
constexpr int kDefaultVerbosity = 2; constexpr int kDefaultVerbosity = 2;
} // namespace } // namespace
namespace {
// Time to poll until the streaming service becomes healthy.
constexpr double kServiceStartupTimeoutSec = 20.0;
std::shared_ptr<grpc::Channel> CreateChannel(uint16_t service_port) {
std::string client_address = absl::StrFormat("localhost:%u", service_port);
return grpc::CreateCustomChannel(client_address,
grpc::InsecureChannelCredentials(),
grpc::ChannelArguments());
}
} // namespace
StartCommand::StartCommand(int* exit_code) StartCommand::StartCommand(int* exit_code)
: BaseCommand("start", : BaseCommand("start",
"Start streaming files from a Windows to a Linux device", "Start streaming files from a Windows to a Linux device",
@@ -89,16 +109,33 @@ void StartCommand::RegisterCommandLineFlags(lyra::command& cmd) {
absl::Status StartCommand::Run() { absl::Status StartCommand::Run() {
LogLevel level = Log::VerbosityToLogLevel(verbosity_); LogLevel level = Log::VerbosityToLogLevel(verbosity_);
ScopedLog scoped_log(std::make_unique<ConsoleLog>(level)); ScopedLog scoped_log(std::make_unique<ConsoleLog>(level));
LocalAssetsStreamManagerClient client(service_port_);
std::string full_src_dir = path::GetFullPath(src_dir_); std::string full_src_dir = path::GetFullPath(src_dir_);
std::string user_host, mount_dir; std::string user_host, mount_dir;
RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir( RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir(
user_host_dir_, &user_host, &mount_dir)); user_host_dir_, &user_host, &mount_dir));
LocalAssetsStreamManagerClient client(CreateChannel(service_port_));
absl::Status status = absl::Status status =
client.StartSession(full_src_dir, user_host, ssh_port_, mount_dir, client.StartSession(full_src_dir, user_host, ssh_port_, mount_dir,
ssh_command_, scp_command_); ssh_command_, scp_command_);
if (absl::IsUnavailable(status)) {
LOG_DEBUG("StartSession status: %s", status.ToString());
LOG_INFO("Streaming service is unavailable. Starting it...");
status = StartStreamingService();
if (status.ok()) {
LOG_INFO("Streaming service successfully started");
// Recreate client. The old channel might still be in a transient failure
// state.
LocalAssetsStreamManagerClient new_client(CreateChannel(service_port_));
status = new_client.StartSession(full_src_dir, user_host, ssh_port_,
mount_dir, ssh_command_, scp_command_);
}
}
if (status.ok()) { if (status.ok()) {
LOG_INFO("Started streaming directory '%s' to '%s:%s'", src_dir_, user_host, LOG_INFO("Started streaming directory '%s' to '%s:%s'", src_dir_, user_host,
mount_dir); mount_dir);
@@ -107,4 +144,44 @@ absl::Status StartCommand::Run() {
return status; return status;
} }
absl::Status StartCommand::StartStreamingService() {
std::string exe_dir;
RETURN_IF_ERROR(path::GetExeDir(&exe_dir),
"Failed to get executable directory");
std::string exe_path = path::Join(exe_dir, "cdc_stream");
// Try starting the service first.
WinProcessFactory process_factory;
ProcessStartInfo start_info;
start_info.command =
absl::StrFormat("%s start-service --verbosity=%i --service-port=%i",
exe_path, verbosity_, service_port_);
start_info.flags = ProcessFlags::kDetached;
std::unique_ptr<Process> service_process = process_factory.Create(start_info);
RETURN_IF_ERROR(service_process->Start(),
"Failed to start asset streaming service");
// Poll until the service becomes healthy.
LOG_INFO("Streaming service initializing...");
Stopwatch sw;
while (sw.ElapsedSeconds() < kServiceStartupTimeoutSec) {
// The channel is in some transient failure state, and it's faster to
// reconnect instead of waiting for it to return.
BackgroundServiceClient bg_client(CreateChannel(service_port_));
absl::Status status = bg_client.IsHealthy();
if (status.ok()) {
return absl::OkStatus();
}
LOG_DEBUG("Health check result: %s", status.ToString());
Util::Sleep(100);
}
// Kill the process.
service_process->Terminate();
return absl::DeadlineExceededError(
absl::StrFormat("Timed out after %0.0f seconds waiting for the asset "
"streaming service to become healthy",
kServiceStartupTimeoutSec));
}
} // namespace cdc_ft } // namespace cdc_ft

View File

@@ -20,6 +20,10 @@
#include "absl/status/status.h" #include "absl/status/status.h"
#include "cdc_stream/base_command.h" #include "cdc_stream/base_command.h"
namespace grpc {
class Channel;
}
namespace cdc_ft { namespace cdc_ft {
// Handler for the start command. Sends an RPC call to the service to starts a // Handler for the start command. Sends an RPC call to the service to starts a
@@ -34,6 +38,9 @@ class StartCommand : public BaseCommand {
absl::Status Run() override; absl::Status Run() override;
private: private:
// Starts the asset streaming service.
absl::Status StartStreamingService();
int verbosity_ = 0; int verbosity_ = 0;
uint16_t service_port_ = 0; uint16_t service_port_ = 0;
uint16_t ssh_port_ = 0; uint16_t ssh_port_ = 0;

View File

@@ -43,7 +43,7 @@ StartServiceCommand::StartServiceCommand(int* exit_code)
StartServiceCommand::~StartServiceCommand() = default; StartServiceCommand::~StartServiceCommand() = default;
void StartServiceCommand::RegisterCommandLineFlags(lyra::command& cmd) { void StartServiceCommand::RegisterCommandLineFlags(lyra::command& cmd) {
config_file_ = "%APPDATA%\\cdc-file-transfer\\assets_stream_manager.json"; config_file_ = "%APPDATA%\\cdc-file-transfer\\cdc_stream.json";
cmd.add_argument( cmd.add_argument(
lyra::opt(config_file_, "path") lyra::opt(config_file_, "path")
.name("--config-file") .name("--config-file")
@@ -147,8 +147,7 @@ absl::Status StartServiceCommand::RunService() {
RETURN_ABSL_IF_ERROR( RETURN_ABSL_IF_ERROR(
session_service.StartSession(nullptr, &request, &response)); session_service.StartSession(nullptr, &request, &response));
} }
RETURN_IF_ERROR( RETURN_IF_ERROR(sm_server.Start(cfg_.service_port()));
sm_server.Start(SessionManagementServer::kDefaultServicePort));
sm_server.RunUntilShutdown(); sm_server.RunUntilShutdown();
return absl::OkStatus(); return absl::OkStatus();
} }

View File

@@ -21,6 +21,9 @@
#include "common/log.h" #include "common/log.h"
#include "common/path.h" #include "common/path.h"
#include "common/status_macros.h" #include "common/status_macros.h"
#include "grpcpp/channel.h"
#include "grpcpp/create_channel.h"
#include "grpcpp/support/channel_arguments.h"
#include "lyra/lyra.hpp" #include "lyra/lyra.hpp"
namespace cdc_ft { namespace cdc_ft {
@@ -58,7 +61,13 @@ void StopCommand::RegisterCommandLineFlags(lyra::command& cmd) {
absl::Status StopCommand::Run() { absl::Status StopCommand::Run() {
LogLevel level = Log::VerbosityToLogLevel(verbosity_); LogLevel level = Log::VerbosityToLogLevel(verbosity_);
ScopedLog scoped_log(std::make_unique<ConsoleLog>(level)); ScopedLog scoped_log(std::make_unique<ConsoleLog>(level));
LocalAssetsStreamManagerClient client(service_port_);
std::string client_address = absl::StrFormat("localhost:%u", service_port_);
std::shared_ptr<grpc::Channel> channel = grpc::CreateCustomChannel(
client_address, grpc::InsecureChannelCredentials(),
grpc::ChannelArguments());
LocalAssetsStreamManagerClient client(channel);
std::string user_host, mount_dir; std::string user_host, mount_dir;
RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir( RETURN_IF_ERROR(LocalAssetsStreamManagerClient::ParseUserHostDir(

View File

@@ -213,6 +213,7 @@ absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableLocalPorts(
ProcessStartInfo start_info; ProcessStartInfo start_info;
start_info.command = "netstat -a -n -p tcp"; start_info.command = "netstat -a -n -p tcp";
start_info.name = "netstat"; start_info.name = "netstat";
start_info.flags = ProcessFlags::kNoWindow;
std::string output; std::string output;
start_info.stdout_handler = [&output](const char* data, size_t data_size) { start_info.stdout_handler = [&output](const char* data, size_t data_size) {
@@ -246,6 +247,7 @@ absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableRemotePorts(
ProcessStartInfo start_info = ProcessStartInfo start_info =
remote_util->BuildProcessStartInfoForSsh(remote_command); remote_util->BuildProcessStartInfoForSsh(remote_command);
start_info.name = "netstat"; start_info.name = "netstat";
start_info.flags = ProcessFlags::kNoWindow;
std::string output; std::string output;
start_info.stdout_handler = [&output](const char* data, size_t data_size) { start_info.stdout_handler = [&output](const char* data, size_t data_size) {

View File

@@ -33,6 +33,12 @@ namespace cdc_ft {
absl::Status LogOutput(const char* name, const char* data, size_t data_size, absl::Status LogOutput(const char* name, const char* data, size_t data_size,
absl::optional<LogLevel> log_level = {}); absl::optional<LogLevel> log_level = {});
enum class ProcessFlags {
kNone = 0,
kDetached = 1 << 0,
kNoWindow = 1 << 1,
};
struct ProcessStartInfo { struct ProcessStartInfo {
// Handler for stdout/stderr. |data| is guaranteed to be NULL terminated, so // Handler for stdout/stderr. |data| is guaranteed to be NULL terminated, so
// it may be used like a C-string if it's known to be text, e.g. for printf(). // it may be used like a C-string if it's known to be text, e.g. for printf().
@@ -63,8 +69,14 @@ struct ProcessStartInfo {
OutputHandler stdout_handler; OutputHandler stdout_handler;
OutputHandler stderr_handler; OutputHandler stderr_handler;
// Flags that define additional properties of the process.
ProcessFlags flags = ProcessFlags::kNone;
// Returns |name| if set, otherwise |command|. // Returns |name| if set, otherwise |command|.
const std::string& Name() const; const std::string& Name() const;
// Tests ALL flags (flags & flag) == flag.
bool HasFlag(ProcessFlags flag) const;
}; };
// Runs a background process and pipes stdin/stdout/stderr. // Runs a background process and pipes stdin/stdout/stderr.
@@ -75,6 +87,8 @@ class Process {
static constexpr uint32_t kExitCodeFailedToGetExitCode = 4000000002; static constexpr uint32_t kExitCodeFailedToGetExitCode = 4000000002;
explicit Process(const ProcessStartInfo& start_info); explicit Process(const ProcessStartInfo& start_info);
// Terminates the process unless it's running with ProcessFlags::kDetached.
virtual ~Process(); virtual ~Process();
// Start the background process. // Start the background process.
@@ -140,6 +154,16 @@ class WinProcessFactory : public ProcessFactory {
std::unique_ptr<Process> Create(const ProcessStartInfo& start_info) override; std::unique_ptr<Process> Create(const ProcessStartInfo& start_info) override;
}; };
inline ProcessFlags operator|(ProcessFlags a, ProcessFlags b) {
using T = std::underlying_type_t<ProcessFlags>;
return static_cast<ProcessFlags>(static_cast<T>(a) | static_cast<T>(b));
}
inline ProcessFlags operator&(ProcessFlags a, ProcessFlags b) {
using T = std::underlying_type_t<ProcessFlags>;
return static_cast<ProcessFlags>(static_cast<T>(a) & static_cast<T>(b));
}
} // namespace cdc_ft } // namespace cdc_ft
#endif // COMMON_PROCESS_H_ #endif // COMMON_PROCESS_H_

View File

@@ -49,6 +49,24 @@ void SetThreadName(const std::string& name) {
} }
} }
int ToCreationFlags(ProcessFlags pflags) {
#define HANDLE_FLAG(pflag, cflag) \
if ((pflags & pflag) == pflag) { \
cflags |= cflag; \
pdone = pdone | pflag; \
}
int cflags = 0;
ProcessFlags pdone = ProcessFlags::kNone;
HANDLE_FLAG(ProcessFlags::kDetached, DETACHED_PROCESS);
HANDLE_FLAG(ProcessFlags::kNoWindow, CREATE_NO_WINDOW);
assert(pflags == pdone);
#undef HANDLE_FLAG
return cflags;
}
std::atomic_int g_pipe_serial_number{0}; std::atomic_int g_pipe_serial_number{0};
// Creates a pipe suitable for overlapped IO. Regular anonymous pipes in Windows // Creates a pipe suitable for overlapped IO. Regular anonymous pipes in Windows
@@ -567,6 +585,10 @@ const std::string& ProcessStartInfo::Name() const {
return !name.empty() ? name : command; return !name.empty() ? name : command;
} }
bool ProcessStartInfo::HasFlag(ProcessFlags flag) const {
return (flags & flag) == flag;
}
Process::Process(const ProcessStartInfo& start_info) Process::Process(const ProcessStartInfo& start_info)
: start_info_(start_info) {} : start_info_(start_info) {}
@@ -593,6 +615,8 @@ class WinProcess : public Process {
absl::Status GetStatus() const override; absl::Status GetStatus() const override;
private: private:
void Reset();
std::unique_ptr<ProcessInfo> process_info_; std::unique_ptr<ProcessInfo> process_info_;
std::unique_ptr<MessagePumpThread> message_pump_; std::unique_ptr<MessagePumpThread> message_pump_;
}; };
@@ -600,7 +624,14 @@ class WinProcess : public Process {
WinProcess::WinProcess(const ProcessStartInfo& start_info) WinProcess::WinProcess(const ProcessStartInfo& start_info)
: Process(start_info) {} : Process(start_info) {}
WinProcess::~WinProcess() { Terminate().IgnoreError(); } WinProcess::~WinProcess() {
if (start_info_.HasFlag(ProcessFlags::kDetached)) {
// If the process runs detached, just reset handles, don't terminate it.
Reset();
} else {
Terminate().IgnoreError();
}
}
absl::Status WinProcess::Start() { absl::Status WinProcess::Start() {
LOG_INFO("Starting process %s", start_info_.command.c_str()); LOG_INFO("Starting process %s", start_info_.command.c_str());
@@ -676,10 +707,13 @@ absl::Status WinProcess::Start() {
} }
JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = {0}; JOBOBJECT_EXTENDED_LIMIT_INFORMATION jeli = {0};
if (!start_info_.HasFlag(ProcessFlags::kDetached)) {
jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
}
bool success = SetInformationJobObject(process_info_->job.Get(), bool success = SetInformationJobObject(process_info_->job.Get(),
JobObjectExtendedLimitInformation, JobObjectExtendedLimitInformation,
&jeli, sizeof(jeli)); &jeli, sizeof(jeli));
if (!success) { if (!success) {
return MakeStatus("SetInformationJobObject() failed: %s", return MakeStatus("SetInformationJobObject() failed: %s",
Util::GetLastWin32Error()); Util::GetLastWin32Error());
@@ -691,7 +725,7 @@ absl::Status WinProcess::Start() {
NULL, // Process handle not inheritable NULL, // Process handle not inheritable
NULL, // Thread handle not inheritable NULL, // Thread handle not inheritable
TRUE, // Inherit handles TRUE, // Inherit handles
0, // No creation flags ToCreationFlags(start_info_.flags),
NULL, // Use parent's environment block NULL, // Use parent's environment block
NULL, // Use parent's starting directory NULL, // Use parent's starting directory
&si, &process_info_->pi); &si, &process_info_->pi);
@@ -785,34 +819,41 @@ absl::Status WinProcess::Terminate() {
message_pump_.reset(); message_pump_.reset();
} }
if (process_info_) { std::string error_msg;
bool result = true; if (process_info_ && should_terminate &&
if (should_terminate) { !TerminateProcess(process_info_->pi.hProcess, 0)) {
result = TerminateProcess(process_info_->pi.hProcess, 0); if (GetLastError() == ERROR_ACCESS_DENIED) {
if (!result && GetLastError() == ERROR_ACCESS_DENIED) {
// This means that the process has already exited, but in a way that // This means that the process has already exited, but in a way that
// the exit wasn't properly reported to this code (e.g. the process got // the exit wasn't properly reported to this code (e.g. the process got
// killed somewhere). Just handle this silently. // killed somewhere). Just handle this silently.
LOG_DEBUG("Process '%s' already exited", start_info_.Name()); LOG_DEBUG("Process '%s' already exited", start_info_.Name());
result = true; } else {
error_msg = Util::GetLastWin32Error();
} }
} }
// Reset handles.
Reset();
if (!error_msg.empty()) {
return MakeStatus("TerminateProcess() failed: %s", error_msg);
}
return absl::OkStatus();
}
void WinProcess::Reset() {
// Shut down message pump.
message_pump_.reset();
if (process_info_) {
// Close the handles that are not scoped handles. // Close the handles that are not scoped handles.
ScopedHandle(process_info_->pi.hProcess).Close(); ScopedHandle(process_info_->pi.hProcess).Close();
ScopedHandle(process_info_->pi.hThread).Close(); ScopedHandle(process_info_->pi.hThread).Close();
process_info_.reset(); process_info_.reset();
if (!result) {
return MakeStatus("TerminateProcess() failed: %s",
Util::GetLastWin32Error());
} }
} }
return absl::OkStatus();
}
ProcessFactory::~ProcessFactory() = default; ProcessFactory::~ProcessFactory() = default;
absl::Status ProcessFactory::Run(const ProcessStartInfo& start_info) { absl::Status ProcessFactory::Run(const ProcessStartInfo& start_info) {

View File

@@ -75,6 +75,7 @@ absl::Status RemoteUtil::Scp(std::vector<std::string> source_filepaths,
// -p preserves timestamps. This enables timestamp-based up-to-date checks. // -p preserves timestamps. This enables timestamp-based up-to-date checks.
ProcessStartInfo start_info; ProcessStartInfo start_info;
start_info.flags = ProcessFlags::kNoWindow;
start_info.command = absl::StrFormat( start_info.command = absl::StrFormat(
"%s " "%s "
"%s %s -p -T " "%s %s -p -T "
@@ -147,6 +148,7 @@ ProcessStartInfo RemoteUtil::BuildProcessStartInfoForSshInternal(
QuoteForWindows(ssh_command_), quiet_ || verbosity_ < 2 ? "-q" : "", QuoteForWindows(ssh_command_), quiet_ || verbosity_ < 2 ? "-q" : "",
forward_arg, QuoteForWindows(user_host_), ssh_port_, remote_command_arg); forward_arg, QuoteForWindows(user_host_), ssh_port_, remote_command_arg);
start_info.forward_output_to_log = forward_output_to_log_; start_info.forward_output_to_log = forward_output_to_log_;
start_info.flags = ProcessFlags::kNoWindow;
return start_info; return start_info;
} }

View File

@@ -23,22 +23,16 @@ import "google/protobuf/empty.proto";
service BackgroundService { service BackgroundService {
// Exit is used to ask the service to exit. In the case of the process // Exit is used to ask the service to exit. In the case of the process
// manager, this cascades to all background processes. // manager, this cascades to all background processes.
rpc Exit(ExitRequest) returns (ExitResponse) {} rpc Exit(google.protobuf.Empty) returns (google.protobuf.Empty) {}
// GetPid is used to get the PID of the service process. // GetPid is used to get the PID of the service process.
rpc GetPid(GetPidRequest) returns (GetPidResponse) {} rpc GetPid(google.protobuf.Empty) returns (GetPidResponse) {}
// HealthCheck is used to verify that the service is running. It returns an // HealthCheck is used to verify that the service is running. It returns an
// empty protobuf if the service is ready to serve requests. // empty protobuf if the service is ready to serve requests.
rpc HealthCheck(google.protobuf.Empty) returns (google.protobuf.Empty) {} rpc HealthCheck(google.protobuf.Empty) returns (google.protobuf.Empty) {}
} }
message ExitRequest {}
message ExitResponse {}
message GetPidRequest {}
message GetPidResponse { message GetPidResponse {
int32 pid = 1; int32 pid = 1;
} }