diff --git a/all_files.vcxitems b/all_files.vcxitems index 1fac1d7..f7f0f20 100644 --- a/all_files.vcxitems +++ b/all_files.vcxitems @@ -33,6 +33,7 @@ + @@ -142,6 +143,7 @@ + diff --git a/cdc_stream/BUILD b/cdc_stream/BUILD index a26b91e..739fb93 100644 --- a/cdc_stream/BUILD +++ b/cdc_stream/BUILD @@ -12,6 +12,7 @@ cc_binary( ":start_command", ":start_service_command", ":stop_command", + ":stop_service_command", "//common:log", "//common:path", ], @@ -40,6 +41,18 @@ cc_library( ], ) +cc_library( + name = "stop_service_command", + srcs = ["stop_service_command.cc"], + hdrs = ["stop_service_command.h"], + deps = [ + ":asset_stream_config", + ":background_service_client", + ":base_command", + ":session_management_server", + ], +) + cc_library( name = "start_command", srcs = ["start_command.cc"], diff --git a/cdc_stream/background_service_impl.cc b/cdc_stream/background_service_impl.cc index 1cc63c4..a3ed29e 100644 --- a/cdc_stream/background_service_impl.cc +++ b/cdc_stream/background_service_impl.cc @@ -23,7 +23,12 @@ namespace cdc_ft { BackgroundServiceImpl::BackgroundServiceImpl() {} -BackgroundServiceImpl::~BackgroundServiceImpl() = default; +BackgroundServiceImpl::~BackgroundServiceImpl() { + if (exit_thread_) { + exit_thread_->join(); + exit_thread_.reset(); + } +} void BackgroundServiceImpl::SetExitCallback(ExitCallback exit_callback) { exit_callback_ = std::move(exit_callback); @@ -33,8 +38,11 @@ grpc::Status BackgroundServiceImpl::Exit(grpc::ServerContext* context, const EmptyProto* request, EmptyProto* response) { LOG_INFO("RPC:Exit"); - if (exit_callback_) { - return ToGrpcStatus(exit_callback_()); + if (exit_callback_ && !exit_thread_) { + // Fire up a thread so call the callback, since shutting down a server + // won't finish until all RPCs are done. + exit_thread_ = + std::make_unique([cb = &exit_callback_]() { (*cb)(); }); } return grpc::Status::OK; } diff --git a/cdc_stream/background_service_impl.h b/cdc_stream/background_service_impl.h index 326208a..adda982 100644 --- a/cdc_stream/background_service_impl.h +++ b/cdc_stream/background_service_impl.h @@ -17,6 +17,9 @@ #ifndef CDC_STREAM_BACKGROUND_SERVICE_IMPL_H_ #define CDC_STREAM_BACKGROUND_SERVICE_IMPL_H_ +#include +#include + #include "absl/status/status.h" #include "cdc_stream/background_service_impl.h" #include "cdc_stream/session_management_server.h" @@ -52,6 +55,7 @@ class BackgroundServiceImpl final private: ExitCallback exit_callback_; + std::unique_ptr exit_thread_; }; } // namespace cdc_ft diff --git a/cdc_stream/main.cc b/cdc_stream/main.cc index 0c349f2..a167021 100644 --- a/cdc_stream/main.cc +++ b/cdc_stream/main.cc @@ -15,6 +15,7 @@ #include "cdc_stream/start_command.h" #include "cdc_stream/start_service_command.h" #include "cdc_stream/stop_command.h" +#include "cdc_stream/stop_service_command.h" #include "lyra/lyra.hpp" int main(int argc, char* argv[]) { @@ -33,6 +34,9 @@ int main(int argc, char* argv[]) { cdc_ft::StartServiceCommand start_service_cmd(&exit_code); start_service_cmd.Register(cli); + cdc_ft::StopServiceCommand stop_service_cmd(&exit_code); + stop_service_cmd.Register(cli); + // Parse args and run. Note that parse actually runs the commands. // exit_code is -1 if no command was run. auto result = cli.parse({argc, argv}); diff --git a/cdc_stream/start_service_command.cc b/cdc_stream/start_service_command.cc index 6452f23..8611e78 100644 --- a/cdc_stream/start_service_command.cc +++ b/cdc_stream/start_service_command.cc @@ -39,7 +39,7 @@ std::string GetLogPath(const char* log_dir, const char* log_base_name) { } // namespace StartServiceCommand::StartServiceCommand(int* exit_code) - : BaseCommand("start-service", "Start streaming service", exit_code) {} + : BaseCommand("start-service", "Start the streaming service", exit_code) {} StartServiceCommand::~StartServiceCommand() = default; void StartServiceCommand::RegisterCommandLineFlags(lyra::command& cmd) { diff --git a/cdc_stream/stop_service_command.cc b/cdc_stream/stop_service_command.cc new file mode 100644 index 0000000..e764edc --- /dev/null +++ b/cdc_stream/stop_service_command.cc @@ -0,0 +1,70 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cdc_stream/stop_service_command.h" + +#include "absl/strings/str_format.h" +#include "cdc_stream/background_service_client.h" +#include "cdc_stream/session_management_server.h" +#include "common/log.h" +#include "grpcpp/channel.h" +#include "grpcpp/create_channel.h" +#include "grpcpp/support/channel_arguments.h" +#include "lyra/lyra.hpp" + +namespace cdc_ft { + +StopServiceCommand::StopServiceCommand(int* exit_code) + : BaseCommand("stop-service", "Stops the streaming service", exit_code) {} +StopServiceCommand::~StopServiceCommand() = default; + +void StopServiceCommand::RegisterCommandLineFlags(lyra::command& cmd) { + verbosity_ = 2; + cmd.add_argument(lyra::opt(verbosity_, "num") + .name("--verbosity") + .help("Verbosity of the log output, default: " + + std::to_string(verbosity_) + + ".Increase to make logs more verbose.")); + + service_port_ = SessionManagementServer::kDefaultServicePort; + cmd.add_argument(lyra::opt(service_port_, "port") + .name("--service-port") + .help("Local port to use while connecting to the local " + "asset stream service, default: " + + std::to_string(service_port_))); +} + +absl::Status StopServiceCommand::Run() { + LogLevel level = Log::VerbosityToLogLevel(verbosity_); + ScopedLog scoped_log(std::make_unique(level)); + + std::string client_address = absl::StrFormat("localhost:%u", service_port_); + std::shared_ptr channel = grpc::CreateCustomChannel( + client_address, grpc::InsecureChannelCredentials(), + grpc::ChannelArguments()); + + BackgroundServiceClient bg_client(channel); + absl::Status status = bg_client.Exit(); + if (status.ok()) { + LOG_INFO("Stopped streaming service"); + } else if (absl::IsUnavailable(status)) { + // Server wasn't running. This doesn't count as an error. + LOG_INFO("Streaming service already stopped"); + return absl::OkStatus(); + } + + return status; +} + +} // namespace cdc_ft diff --git a/cdc_stream/stop_service_command.h b/cdc_stream/stop_service_command.h new file mode 100644 index 0000000..1bbb131 --- /dev/null +++ b/cdc_stream/stop_service_command.h @@ -0,0 +1,44 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CDC_STREAM_STOP_SERVICE_COMMAND_H_ +#define CDC_STREAM_STOP_SERVICE_COMMAND_H_ + +#include + +#include "absl/status/status.h" +#include "cdc_stream/base_command.h" + +namespace cdc_ft { + +// Handler for the stop-service command. Stops the asset streaming service. +class StopServiceCommand : public BaseCommand { + public: + explicit StopServiceCommand(int* exit_code); + ~StopServiceCommand(); + + // BaseCommand: + void RegisterCommandLineFlags(lyra::command& cmd) override; + absl::Status Run() override; + + private: + int verbosity_ = 0; + uint16_t service_port_ = 0; +}; + +} // namespace cdc_ft + +#endif // CDC_STREAM_STOP_SERVICE_COMMAND_H_