diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml index 129a422..65d1a48 100644 --- a/.github/workflows/create_release.yml +++ b/.github/workflows/create_release.yml @@ -63,7 +63,6 @@ jobs: --test_output=errors --local_test_jobs=1 \ -- //... -//third_party/... -//cdc_rsync_server:file_finder_test - # The artifact collector doesn't like the fact that bazel-bin is a symlink. - name: Copy artifacts run: | mkdir artifacts @@ -152,7 +151,6 @@ jobs: //manifest/... ` //metrics/... - # The artifact collector doesn't like the fact that bazel-bin is a symlink. - name: Copy artifacts run: | mkdir artifacts diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index aafe4df..e1668d6 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -3,7 +3,7 @@ name: Lint on: push: branches: - - master + - main pull_request: jobs: diff --git a/cdc_rsync/cdc_rsync_client.cc b/cdc_rsync/cdc_rsync_client.cc index b45cbf7..276f9a8 100644 --- a/cdc_rsync/cdc_rsync_client.cc +++ b/cdc_rsync/cdc_rsync_client.cc @@ -118,7 +118,7 @@ CdcRsyncClient::CdcRsyncClient(const Options& options, port_manager_ = std::make_unique( "cdc_rsync_ports_f77bcdfe-368c-4c45-9f01-230c5e7e2132", options.forward_port_first, options.forward_port_last, &process_factory_, - remote_util_.get()); + nullptr /* never reserve remote ports */); } CdcRsyncClient::~CdcRsyncClient() { @@ -127,19 +127,15 @@ CdcRsyncClient::~CdcRsyncClient() { } absl::Status CdcRsyncClient::Run() { - // If |remote_util_| is not set, it's a local sync. Otherwise, guess the - // architecture of the device that runs cdc_rsync_server from the destination - // path, e.g. "C:\path\to\dest" strongly indicates Windows. - ServerArch server_arch = !remote_util_ - ? ServerArch::DetectFromLocalDevice() - : ServerArch::GuessFromDestination(destination_); - - int port; - ASSIGN_OR_RETURN(port, FindAvailablePort(&server_arch), - "Failed to find available port"); + // For local syncs, cdc_rsync_server runs on this machine. For remote syncs, + // guess the architecture of the device that runs cdc_rsync_server from the + // destination path, e.g. "C:\path\to\dest" strongly indicates Windows. + ServerArch server_arch = IsRemoteConnection() + ? ServerArch::GuessFromDestination(destination_) + : ServerArch::DetectFromLocalDevice(); // Start the server process. - absl::Status status = StartServer(port, server_arch); + absl::Status status = StartServer(server_arch); if (HasTag(status, Tag::kDeployServer) && server_arch.IsGuess() && server_exit_code_ != kServerExitCodeOutOfDate) { // Server couldn't be run, e.g. not found or failed to start. @@ -155,7 +151,7 @@ absl::Status CdcRsyncClient::Run() { if (server_arch.GetType() != old_type) { LOG_DEBUG("Guessed server arch type wrong, guessed %s, actual %s.", GetArchTypeStr(old_type), server_arch.GetTypeStr()); - status = StartServer(port, server_arch); + status = StartServer(server_arch); } } @@ -166,7 +162,7 @@ absl::Status CdcRsyncClient::Run() { return WrapStatus(status, "Failed to deploy server"); } - status = StartServer(port, server_arch); + status = StartServer(server_arch); } if (!status.ok()) { return WrapStatus(status, "Failed to start server"); @@ -198,47 +194,7 @@ absl::Status CdcRsyncClient::Run() { return status; } -absl::StatusOr CdcRsyncClient::FindAvailablePort(ServerArch* server_arch) { - // Find available local and remote ports for port forwarding. - // If only one port is in the given range, try that without checking. - if (options_.forward_port_first >= options_.forward_port_last) { - return options_.forward_port_first; - } - - assert(server_arch); - absl::StatusOr port = port_manager_->ReservePort( - options_.connection_timeout_sec, server_arch->GetType()); - - if (absl::IsDeadlineExceeded(port.status())) { - // Server didn't respond in time. - return SetTag(port.status(), Tag::kConnectionTimeout); - } - if (absl::IsResourceExhausted(port.status())) { - // Port in use. - return SetTag(port.status(), Tag::kAddressInUse); - } - - // If |server_arch| was guessed, calling netstat might have failed because - // the arch was wrong. Properly detect it and try again if it changed. - if (!port.ok() && server_arch->IsGuess()) { - const ArchType old_type = server_arch->GetType(); - LOG_DEBUG( - "Failed to reserve port, retrying after detecting remote arch: %s", - port.status().ToString()); - ASSIGN_OR_RETURN(*server_arch, - ServerArch::DetectFromRemoteDevice(remote_util_.get())); - assert(!server_arch->IsGuess()); - if (server_arch->GetType() != old_type) { - LOG_DEBUG("Guessed server arch type wrong, guessed %s, actual %s.", - GetArchTypeStr(old_type), server_arch->GetTypeStr()); - return FindAvailablePort(server_arch); - } - } - - return port; -} - -absl::Status CdcRsyncClient::StartServer(int port, const ServerArch& arch) { +absl::Status CdcRsyncClient::StartServer(const ServerArch& arch) { assert(!server_process_); // Components are expected to reside in the same dir as the executable. @@ -262,20 +218,20 @@ absl::Status CdcRsyncClient::StartServer(int port, const ServerArch& arch) { ProcessStartInfo start_info; start_info.name = "cdc_rsync_server"; - if (remote_util_) { + if (IsRemoteConnection()) { // Run cdc_rsync_server on the remote instance. - std::string remote_command = arch.GetStartServerCommand( - kExitCodeNotFound, absl::StrFormat("%i %s", port, component_args)); - start_info = remote_util_->BuildProcessStartInfoForSshPortForwardAndCommand( - port, port, /*reverse=*/false, remote_command, arch.GetType()); + std::string remote_command = + arch.GetStartServerCommand(kExitCodeNotFound, component_args); + assert(remote_util_); + start_info = remote_util_->BuildProcessStartInfoForSsh(remote_command, + arch.GetType()); } else { // Run cdc_rsync_server locally. std::string exe_dir; RETURN_IF_ERROR(path::GetExeDir(&exe_dir), "Failed to get exe directory"); std::string server_path = path::Join(exe_dir, arch.CdcServerFilename()); - start_info.command = - absl::StrFormat("%s %i %s", server_path, port, component_args); + start_info.command = absl::StrFormat("%s %s", server_path, component_args); } // Capture stdout, but forward to stdout for debugging purposes. @@ -283,8 +239,8 @@ absl::Status CdcRsyncClient::StartServer(int port, const ServerArch& arch) { return HandleServerOutput(data); }; - std::unique_ptr process = process_factory_.Create(start_info); - status = process->Start(); + std::unique_ptr srv_process = process_factory_.Create(start_info); + status = srv_process->Start(); if (!status.ok()) { return WrapStatus(status, "Failed to start cdc_rsync_server process"); } @@ -292,13 +248,13 @@ absl::Status CdcRsyncClient::StartServer(int port, const ServerArch& arch) { // Wait until the server process is listening. Stopwatch timeout_timer; bool is_timeout = false; - auto detect_listening_or_timeout = [is_listening = &is_server_listening_, + auto detect_listening_or_timeout = [port = &server_listen_port_, timeout = options_.connection_timeout_sec, &timeout_timer, &is_timeout]() -> bool { is_timeout = timeout_timer.ElapsedSeconds() > timeout; - return *is_listening || is_timeout; + return *port != 0 || is_timeout; }; - status = process->RunUntil(detect_listening_or_timeout); + status = srv_process->RunUntil(detect_listening_or_timeout); if (!status.ok()) { // Some internal process error. Note that this does NOT mean that // cdc_rsync_server does not exist. In that case, the ssh process exits with @@ -310,10 +266,10 @@ absl::Status CdcRsyncClient::StartServer(int port, const ServerArch& arch) { Tag::kConnectionTimeout); } - if (process->HasExited()) { + if (srv_process->HasExited()) { // Don't re-deploy for code > kServerExitCodeOutOfDate, which means that the // out-of-date check already passed on the server. - server_exit_code_ = process->ExitCode(); + server_exit_code_ = srv_process->ExitCode(); if (server_exit_code_ > kServerExitCodeOutOfDate && server_exit_code_ <= kServerExitCodeMax) { return GetServerExitStatus(server_exit_code_, server_error_); @@ -321,7 +277,7 @@ absl::Status CdcRsyncClient::StartServer(int port, const ServerArch& arch) { // Don't re-deploy if we're not copying to a remote device. We can start // cdc_rsync_server from the original location directly. - if (!remote_util_) { + if (!IsRemoteConnection()) { return GetServerExitStatus(server_exit_code_, server_error_); } @@ -332,19 +288,58 @@ absl::Status CdcRsyncClient::StartServer(int port, const ServerArch& arch) { return SetTag(MakeStatus("Redeploy server"), Tag::kDeployServer); } + // Now that we know which port the server is using, set up port forwarding. + std::unique_ptr fwd_process; + int local_port = server_listen_port_; + if (IsRemoteConnection()) { + absl::StatusOr local_port_or = port_manager_->ReservePort( + options_.connection_timeout_sec, arch.GetType()); + + if (absl::IsResourceExhausted(local_port_or.status())) { + // Port in use. + return SetTag(local_port_or.status(), Tag::kAddressInUse); + } + if (!local_port_or.ok()) { + return local_port_or.status(); + } + + local_port = *local_port_or; + ProcessStartInfo start_info = + remote_util_->BuildProcessStartInfoForSshPortForward( + local_port, server_listen_port_, /*reverse=*/false); + start_info.forward_output_to_log = true; + fwd_process = process_factory_.Create(start_info); + RETURN_IF_ERROR(fwd_process->Start(), + "Failed to start cdc_rsync_server process"); + } + + // Connect to the socket with port |local_port|. status = Socket::Initialize(); if (!status.ok()) { return WrapStatus(status, "Failed to initialize sockets"); } socket_finalizer_ = std::make_unique(); - assert(is_server_listening_); - status = socket_.Connect(port); - if (!status.ok()) { - return WrapStatus(status, "Failed to initialize connection"); + // Poll until the port forwarding connection is set up. + timeout_timer.Reset(); + for (;;) { + assert(local_port != 0); + status = socket_.Connect(local_port); + if (status.ok()) { + break; + } + + if (timeout_timer.ElapsedSeconds() > options_.connection_timeout_sec) { + return SetTag( + absl::DeadlineExceededError("Timeout while connecting to server"), + Tag::kConnectionTimeout); + } + + Util::Sleep(10); } - server_process_ = std::move(process); + server_process_ = std::move(srv_process); + port_forwarding_process_ = std::move(fwd_process); message_pump_.StartMessagePump(); return absl::OkStatus(); } @@ -365,6 +360,7 @@ absl::Status CdcRsyncClient::StopServer() { server_exit_code_ = server_process_->ExitCode(); server_process_.reset(); + port_forwarding_process_.reset(); return absl::OkStatus(); } @@ -396,10 +392,29 @@ absl::Status CdcRsyncClient::HandleServerOutput(const char* data) { } printer_.Print(stdout_data, false, Util::GetConsoleWidth()); - if (!is_server_listening_) { + if (server_listen_port_ == 0) { server_output_.append(stdout_data); - is_server_listening_ = - server_output_.find("Server is listening") != std::string::npos; + + // Parse port from "Port : Server is listening". + size_t listening_pos = server_output_.find("Server is listening"); + if (listening_pos != std::string::npos) { + // Search backwards until we find "Port ". + constexpr char port_key[] = "Port "; + size_t port_pos = server_output_.rfind(port_key, listening_pos); + if (port_pos == std::string::npos) { + return MakeStatus("Failed to find 'Port' marker in server output '%s'", + server_output_); + } + assert(listening_pos > port_pos); + server_listen_port_ = atoi( + server_output_ + .substr(port_pos + strlen(port_key), listening_pos - port_pos) + .c_str()); + if (server_listen_port_ == 0) { + return MakeStatus("Failed to parse port from server output '%s'", + server_output_); + } + } } return absl::OkStatus(); @@ -466,6 +481,7 @@ absl::Status CdcRsyncClient::Sync() { absl::Status CdcRsyncClient::DeployServer(const ServerArch& arch) { assert(!server_process_); assert(remote_util_); + assert(IsRemoteConnection()); std::string exe_dir; absl::Status status = path::GetExeDir(&exe_dir); diff --git a/cdc_rsync/cdc_rsync_client.h b/cdc_rsync/cdc_rsync_client.h index 0f1c415..911a87c 100644 --- a/cdc_rsync/cdc_rsync_client.h +++ b/cdc_rsync/cdc_rsync_client.h @@ -78,14 +78,9 @@ class CdcRsyncClient { absl::Status Run(); private: - // Finds available local and remote ports for port forwarding. - // May update |server_arch| by properly detecting the architecture and retry - // if the architecture was guessed, i.e. if |server_arch|->IsGuess() is true. - absl::StatusOr FindAvailablePort(ServerArch* server_arch); - // Starts the server process. If the method returns a status with tag // |kTagDeployServer|, Run() calls DeployServer() and tries again. - absl::Status StartServer(int port, const ServerArch& arch); + absl::Status StartServer(const ServerArch& arch); // Stops the server process. absl::Status StopServer(); @@ -129,6 +124,9 @@ class CdcRsyncClient { // Stops the zstd compression stream. absl::Status StopCompressionStream(); + // Returns true if the target is a remote target. + bool IsRemoteConnection() const { return remote_util_ != nullptr; } + Options options_; std::vector sources_; const std::string destination_; @@ -143,10 +141,11 @@ class CdcRsyncClient { std::unique_ptr compression_stream_; std::unique_ptr server_process_; + std::unique_ptr port_forwarding_process_; std::string server_output_; // Written in a background thread. Do not access std::string server_error_; // while the server process is active. int server_exit_code_ = 0; - std::atomic_bool is_server_listening_{false}; + std::atomic_int server_listen_port_{0}; bool is_server_error_ = false; // All source files found on the client. diff --git a/cdc_rsync/client_socket.cc b/cdc_rsync/client_socket.cc index 873d3f4..ebe0ebc 100644 --- a/cdc_rsync/client_socket.cc +++ b/cdc_rsync/client_socket.cc @@ -68,11 +68,10 @@ absl::Status ClientSocket::Connect(int port) { int count = 0; for (addrinfo* curr = addr_infos; curr; curr = curr->ai_next, count++) { socket_info_->socket = - socket(addr_infos->ai_family, addr_infos->ai_socktype, - addr_infos->ai_protocol); + socket(curr->ai_family, curr->ai_socktype, curr->ai_protocol); if (socket_info_->socket == INVALID_SOCKET) { LOG_DEBUG("socket() failed for addr_info %i: %s", count, - Util::GetWin32Error(WSAGetLastError()).c_str()); + Util::GetWin32Error(WSAGetLastError())); continue; } @@ -80,7 +79,8 @@ absl::Status ClientSocket::Connect(int port) { result = connect(socket_info_->socket, curr->ai_addr, static_cast(curr->ai_addrlen)); if (result == SOCKET_ERROR) { - LOG_DEBUG("connect() failed for addr_info %i: %i", count, result); + LOG_DEBUG("connect() failed for addr_info %i: %s", count, + Util::GetWin32Error(WSAGetLastError())); closesocket(socket_info_->socket); socket_info_->socket = INVALID_SOCKET; continue; diff --git a/cdc_rsync_server/cdc_rsync_server.cc b/cdc_rsync_server/cdc_rsync_server.cc index 922b12a..a0212df 100644 --- a/cdc_rsync_server/cdc_rsync_server.cc +++ b/cdc_rsync_server/cdc_rsync_server.cc @@ -288,7 +288,7 @@ bool CdcRsyncServer::CheckComponents( return true; } -absl::Status CdcRsyncServer::Run(int port) { +absl::Status CdcRsyncServer::Run() { absl::Status status = Socket::Initialize(); if (!status.ok()) { return WrapStatus(status, "Failed to initialize sockets"); @@ -296,15 +296,15 @@ absl::Status CdcRsyncServer::Run(int port) { socket_finalizer_ = std::make_unique(); socket_ = std::make_unique(); - int new_port; - ASSIGN_OR_RETURN(new_port, socket_->StartListening(port), - "Failed to start listening on port %i", port); - assert(port != 0); - assert(port == new_port); + int port; + ASSIGN_OR_RETURN(port, socket_->StartListening(0), + "Failed to start listening for connections"); LOG_INFO("cdc_rsync_server listening on port %i", port); // This is the marker for the client, so it knows it can connect. - printf("Server is listening\n"); + // Print port first so the client can easily parse it when it sees "Server is + // listening" without dealing with half-transmitted data. + printf("Port %i: Server is listening\n", port); fflush(stdout); status = socket_->WaitForConnection(); @@ -607,7 +607,7 @@ absl::Status CdcRsyncServer::CreateMissingDirs() { template absl::Status CdcRsyncServer::SendFileIndices(const char* file_type, const std::vector& files) { - LOG_INFO("Sending indices of missing files to client"); + LOG_INFO("Sending indices of %s files to client", file_type); constexpr char error_fmt[] = "Failed to send indices of %s files."; AddFileIndicesResponse response; diff --git a/cdc_rsync_server/cdc_rsync_server.h b/cdc_rsync_server/cdc_rsync_server.h index 59c66fd..103eb7c 100644 --- a/cdc_rsync_server/cdc_rsync_server.h +++ b/cdc_rsync_server/cdc_rsync_server.h @@ -43,9 +43,10 @@ class CdcRsyncServer { // up-to-date by checking their sizes and timestamps. bool CheckComponents(const std::vector& components); - // Listens to |port|, accepts a connection from the client and runs the rsync - // procedure. - absl::Status Run(int port); + // Listens to any available port, accepts a connection from the client and + // runs the rsync procedure. Prints "Port : Server is listening" to stdout, + // so the client can retrieve the selected port. + absl::Status Run(); // Returns the verbosity sent from the client. 0 by default. int GetVerbosity() const { return verbosity_; } diff --git a/cdc_rsync_server/main.cc b/cdc_rsync_server/main.cc index 2ef0eef..204a9bf 100644 --- a/cdc_rsync_server/main.cc +++ b/cdc_rsync_server/main.cc @@ -62,29 +62,22 @@ ServerExitCode GetExitCode(const absl::Status& status) { int main(int argc, const char** argv) { if (argc < 5) { - printf("cdc_rsync_server - Remote component of cdc_rsync. Version: %s\n\n", - BUILD_VERSION); - printf( - "Usage: cdc_rsync_server cdc_rsync_server " - "