From 26ff93489e86384097a3064edfedb4b4c32570e8 Mon Sep 17 00:00:00 2001 From: Lutz Justen Date: Sat, 8 Apr 2023 20:19:01 +0200 Subject: [PATCH] [cdc_rsync] Use ephemeral port on client (#96) Instead of calling netstat locally to find available ports in a tight range, call bind() with port zero to find an available ephemeral port. This is faster and much simpler, and will eventually help get rid of PortManager. Also fixes issues with running SSH commands on Windows when the remote shell is PowerShell (aka Backslash Bingo). --- cdc_rsync/BUILD | 2 +- cdc_rsync/cdc_rsync_client.cc | 35 ++++----------- cdc_rsync/cdc_rsync_client.h | 4 -- cdc_rsync/params.cc | 14 ++---- cdc_rsync/params_test.cc | 34 -------------- cdc_rsync/server_arch.cc | 8 ++-- cdc_rsync_server/cdc_rsync_server.cc | 13 ++---- common/server_socket.cc | 10 ++++- common/server_socket.h | 9 ++++ .../cdc_rsync/connection_test.py | 44 ++----------------- 10 files changed, 39 insertions(+), 134 deletions(-) diff --git a/cdc_rsync/BUILD b/cdc_rsync/BUILD index f2f9ece..33e65a2 100644 --- a/cdc_rsync/BUILD +++ b/cdc_rsync/BUILD @@ -71,9 +71,9 @@ cc_library( "//common:path", "//common:path_filter", "//common:platform", - "//common:port_manager", "//common:process", "//common:remote_util", + "//common:server_socket", "//common:socket", "//common:status", "//common:status_macros", diff --git a/cdc_rsync/cdc_rsync_client.cc b/cdc_rsync/cdc_rsync_client.cc index 1ff5fdb..56ef71a 100644 --- a/cdc_rsync/cdc_rsync_client.cc +++ b/cdc_rsync/cdc_rsync_client.cc @@ -30,9 +30,9 @@ #include "common/gamelet_component.h" #include "common/log.h" #include "common/path.h" -#include "common/port_manager.h" #include "common/process.h" #include "common/remote_util.h" +#include "common/server_socket.h" #include "common/status.h" #include "common/status_macros.h" #include "common/stopwatch.h" @@ -113,12 +113,6 @@ CdcRsyncClient::CdcRsyncClient(const Options& options, remote_util_->SetSftpCommand(options_.sftp_command); } } - 
- // Note that remote_util_.get() may be null. - port_manager_ = std::make_unique( - "cdc_rsync_ports_f77bcdfe-368c-4c45-9f01-230c5e7e2132", - options.forward_port_first, options.forward_port_last, &process_factory_, - nullptr /* never reserve remote ports */); } CdcRsyncClient::~CdcRsyncClient() { @@ -288,22 +282,15 @@ absl::Status CdcRsyncClient::StartServer(const ServerArch& arch) { return SetTag(MakeStatus("Redeploy server"), Tag::kDeployServer); } + // Start up sockets. + RETURN_IF_ERROR(Socket::Initialize(), "Failed to initialize sockets"); + socket_finalizer_ = std::make_unique(); + // Now that we know which port the server is using, set up port forwarding. std::unique_ptr fwd_process; int local_port = server_listen_port_; if (IsRemoteConnection()) { - absl::StatusOr local_port_or = port_manager_->ReservePort( - options_.connection_timeout_sec, arch.GetType()); - - if (absl::IsResourceExhausted(local_port_or.status())) { - // Port in use. - return SetTag(local_port_or.status(), Tag::kAddressInUse); - } - if (!local_port_or.ok()) { - return local_port_or.status(); - } - - local_port = *local_port_or; + ASSIGN_OR_RETURN(local_port, ServerSocket::FindAvailablePort()); ProcessStartInfo start_info = remote_util_->BuildProcessStartInfoForSshPortForward( local_port, server_listen_port_, /*reverse=*/false); @@ -313,14 +300,8 @@ absl::Status CdcRsyncClient::StartServer(const ServerArch& arch) { "Failed to start cdc_rsync_server process"); } - // Connect to the socket with port |local_port|. - status = Socket::Initialize(); - if (!status.ok()) { - return WrapStatus(status, "Failed to initialize sockets"); - } - socket_finalizer_ = std::make_unique(); - - // Poll until the port forwarding connection is set up. + // Poll the connection to the socket with port |local_port| until the port + // forwarding connection is set up. 
timeout_timer.Reset(); for (;;) { assert(local_port != 0); diff --git a/cdc_rsync/cdc_rsync_client.h b/cdc_rsync/cdc_rsync_client.h index cd80c83..8ac42a0 100644 --- a/cdc_rsync/cdc_rsync_client.h +++ b/cdc_rsync/cdc_rsync_client.h @@ -30,7 +30,6 @@ namespace cdc_ft { -class PortManager; class Process; class RemoteUtil; class ServerArch; @@ -53,8 +52,6 @@ class CdcRsyncClient { std::string copy_dest; int compress_level = 6; int connection_timeout_sec = 10; - int forward_port_first = 44450; - int forward_port_last = 44459; std::string ssh_command; std::string sftp_command; std::string sources_dir; // Base dir for files loaded for --files-from. @@ -132,7 +129,6 @@ class CdcRsyncClient { const std::string destination_; WinProcessFactory process_factory_; std::unique_ptr remote_util_; - std::unique_ptr port_manager_; std::unique_ptr socket_finalizer_; ClientSocket socket_; MessagePump message_pump_{&socket_, MessagePump::PacketReceivedDelegate()}; diff --git a/cdc_rsync/params.cc b/cdc_rsync/params.cc index 8f92d31..7633098 100644 --- a/cdc_rsync/params.cc +++ b/cdc_rsync/params.cc @@ -80,8 +80,6 @@ Options: --sftp-command Path and arguments of sftp command to use, e.g. "C:\path\to\sftp.exe -P 12345 -i id_rsa -oUserKnownHostsFile=known_hosts" Can also be specified by the CDC_SFTP_COMMAND environment variable. - --forward-port TCP port or range used for SSH port forwarding (default: 44450-44459). - If a range is specified, searches for available ports (slower). 
-h, --help Help for cdc_rsync )"; @@ -305,15 +303,9 @@ OptionResult HandleParameter(const std::string& key, const char* value, } if (key == "forward-port") { - if (!ValidateValue(key, value)) return OptionResult::kError; - uint16_t first, last; - if (!port_range::Parse(value, &first, &last)) { - PrintError("Failed to parse %s=%s, expected or -", - key, value); - return OptionResult::kError; - } - params->options.forward_port_first = first; - params->options.forward_port_last = last; + // This param is no longer needed. Just print a warning for backwards + // compatibility. + std::cout << "--forward-port argument no longer needed" << std::endl; return OptionResult::kConsumedKeyValue; } diff --git a/cdc_rsync/params_test.cc b/cdc_rsync/params_test.cc index 3b90e6b..1436e8a 100644 --- a/cdc_rsync/params_test.cc +++ b/cdc_rsync/params_test.cc @@ -585,40 +585,6 @@ TEST_F(ParamsTest, IncludeExcludeMixed_ProperOrder) { ExpectNoError(); } -TEST_F(ParamsTest, ForwardPort_Single) { - const char* argv[] = {"cdc_rsync.exe", "--forward-port=65535", kSrc, - kUserHostDst, NULL}; - EXPECT_TRUE(Parse(static_cast(std::size(argv)) - 1, argv, &parameters_)); - EXPECT_EQ(parameters_.options.forward_port_first, 65535); - EXPECT_EQ(parameters_.options.forward_port_last, 65535); - ExpectNoError(); -} - -TEST_F(ParamsTest, ForwardPort_Range) { - const char* argv[] = { - "cdc_rsync.exe", "--forward-port", "1-2", kSrc, kUserHostDst, NULL}; - EXPECT_TRUE(Parse(static_cast(std::size(argv)) - 1, argv, &parameters_)); - EXPECT_EQ(parameters_.options.forward_port_first, 1); - EXPECT_EQ(parameters_.options.forward_port_last, 2); - ExpectNoError(); -} - -TEST_F(ParamsTest, ForwardPort_NoValue) { - const char* argv[] = {"cdc_rsync.exe", "--forward-port=", kSrc, kUserHostDst, - NULL}; - EXPECT_FALSE( - Parse(static_cast(std::size(argv)) - 1, argv, &parameters_)); - ExpectError(NeedsValueError("forward-port")); -} - -TEST_F(ParamsTest, ForwardPort_BadValueTooSmall) { - const char* argv[] = {"cdc_rsync.exe", 
"--forward-port=0", kSrc, kUserHostDst, - NULL}; - EXPECT_FALSE( - Parse(static_cast(std::size(argv)) - 1, argv, &parameters_)); - ExpectError("Failed to parse"); -} - } // namespace } // namespace params } // namespace cdc_ft diff --git a/cdc_rsync/server_arch.cc b/cdc_rsync/server_arch.cc index 27709a7..926d0e3 100644 --- a/cdc_rsync/server_arch.cc +++ b/cdc_rsync/server_arch.cc @@ -147,8 +147,7 @@ absl::StatusOr ServerArch::DetectFromRemoteDevice( // Note: That space after PROCESSOR_ARCHITECTURE is important or else Windows // command magic interprets quotes as part of the string. std::string arch_out; - std::string windows_cmd = - RemoteUtil::QuoteForSsh("cmd /C set PROCESSOR_ARCHITECTURE "); + std::string windows_cmd = "\"cmd /C set PROCESSOR_ARCHITECTURE \""; status = remote_util->RunWithCapture(windows_cmd, "set PROCESSOR_ARCHITECTURE", &arch_out, nullptr, ArchType::kWindows_x86_64); @@ -223,14 +222,13 @@ std::string ServerArch::GetStartServerCommand(int exit_code_not_found, // a minor issue and means we display "Deploying server..." instead of // "Server not deployed. 
Deploying..."; return RemoteUtil::QuoteForWindows( - absl::StrFormat("powershell -Command \" " + absl::StrFormat("powershell -Command " "Set-StrictMode -Version 2; " "$ErrorActionPreference = 'Stop'; " "if (-not (Test-Path -Path '%s')) { " " exit %i; " "} " - "%s %s " - "\"", + "%s %s", server_path, exit_code_not_found, server_path, args)); } diff --git a/cdc_rsync_server/cdc_rsync_server.cc b/cdc_rsync_server/cdc_rsync_server.cc index 80e558c..2f7e255 100644 --- a/cdc_rsync_server/cdc_rsync_server.cc +++ b/cdc_rsync_server/cdc_rsync_server.cc @@ -289,10 +289,7 @@ bool CdcRsyncServer::CheckComponents( } absl::Status CdcRsyncServer::Run() { - absl::Status status = Socket::Initialize(); - if (!status.ok()) { - return WrapStatus(status, "Failed to initialize sockets"); - } + RETURN_IF_ERROR(Socket::Initialize(), "Failed to initialize sockets"); socket_finalizer_ = std::make_unique(); socket_ = std::make_unique(); @@ -307,10 +304,8 @@ absl::Status CdcRsyncServer::Run() { printf("Port %i: Server is listening\n", port); fflush(stdout); - status = socket_->WaitForConnection(); - if (!status.ok()) { - return WrapStatus(status, "Failed to establish a connection"); - } + RETURN_IF_ERROR(socket_->WaitForConnection(), + "Failed to establish a connection"); message_pump_ = std::make_unique( socket_.get(), @@ -318,7 +313,7 @@ absl::Status CdcRsyncServer::Run() { message_pump_->StartMessagePump(); LOG_INFO("Client connected. 
Starting to sync."); - status = Sync(); + absl::Status status = Sync(); if (!status.ok()) { socket_->ShutdownSendingEnd().IgnoreError(); return status; diff --git a/common/server_socket.cc b/common/server_socket.cc index 78c9c9d..940a68d 100644 --- a/common/server_socket.cc +++ b/common/server_socket.cc @@ -125,6 +125,12 @@ ServerSocket::~ServerSocket() { StopListening(); } +// static +absl::StatusOr ServerSocket::FindAvailablePort() { + ServerSocket socket; + return socket.StartListening(0); +} + absl::StatusOr ServerSocket::StartListening(int port) { if (socket_info_->listen_sock != kInvalidSocket) { return MakeStatus("Already listening"); @@ -246,7 +252,7 @@ absl::StatusOr ServerSocket::StartListeningInternal(int port, void ServerSocket::StopListening() { Close(&socket_info_->listen_sock); - LOG_INFO("Stopped listening."); + LOG_DEBUG("Stopped listening."); } absl::Status ServerSocket::WaitForConnection() { @@ -268,7 +274,7 @@ absl::Status ServerSocket::WaitForConnection() { void ServerSocket::Disconnect() { Close(&socket_info_->conn_sock); - LOG_INFO("Disconnected"); + LOG_DEBUG("Disconnected"); } absl::Status ServerSocket::ShutdownSendingEnd() { diff --git a/common/server_socket.h b/common/server_socket.h index 68fe8f1..329d1cf 100644 --- a/common/server_socket.h +++ b/common/server_socket.h @@ -30,6 +30,15 @@ class ServerSocket : public Socket { ServerSocket(); ~ServerSocket(); + // Returns an available ephemeral port that can be used as a listening port. + // Note that calling this function, followed by StartListening() or similar, + // is slightly racy as another process might use the port in the meantime. + // However, the OS usually returns ephemeral ports in a round-robin manner, + // and ports remain in TIME_WAIT state for a while, which may block other apps + // from reusing the port. Hence, the chances of races are small. Nevertheless, + // consider calling StartListening() with zero |port| if possible. 
+ static absl::StatusOr FindAvailablePort(); + // Starts listening for connections on |port|. // Passing 0 as port will bind to any available port. // Returns the port that was bound to. diff --git a/integration_tests/cdc_rsync/connection_test.py b/integration_tests/cdc_rsync/connection_test.py index 4cc44e5..70991c3 100644 --- a/integration_tests/cdc_rsync/connection_test.py +++ b/integration_tests/cdc_rsync/connection_test.py @@ -27,37 +27,19 @@ RETURN_CODE_GENERIC_ERROR = 1 RETURN_CODE_CONNECTION_TIMEOUT = 2 RETURN_CODE_ADDRESS_IN_USE = 4 -FIRST_PORT = 44450 -LAST_PORT = 44459 - class ConnectionTest(test_base.CdcRsyncTest): """cdc_rsync connection test class.""" - def test_valid_instance(self): - """Runs rsync with --instance option for a valid id. - - 1) Uploads a file with --instance option instead of --ip --port. - 2) Checks the file exists on the used instance. - """ - utils.create_test_file(self.local_data_path, 1024) - res = utils.run_rsync(self.local_data_path, self.remote_base_dir) - self._assert_rsync_success(res) - self.assertTrue(utils.does_file_exist_remotely(self.remote_data_path)) - def test_invalid_instance(self): - """Runs rsync with --instance option for an invalid id. - - 1) Uploads a file with --instance option for a non-existing id. - 2) Checks the error message. - """ + """Runs rsync with an invalid host""" bad_host = 'bad_host' utils.create_test_file(self.local_data_path, 1024) res = utils.run_rsync(self.local_data_path, bad_host + ":" + self.remote_base_dir) self.assertEqual(res.returncode, RETURN_CODE_GENERIC_ERROR) - self.assertIn('Failed to find available ports', str(res.stderr)) + self.assertIn('Failed to detect remote architecture', str(res.stderr)) def test_contimeout(self): """Runs rsync with --contimeout option for an invalid ip. 
@@ -89,7 +71,7 @@ class ConnectionTest(test_base.CdcRsyncTest): def test_multiple_instances(self): """Runs multiple instances of rsync at the same time.""" - num_instances = LAST_PORT - FIRST_PORT + 1 + num_instances = 10 local_data_paths = [] for n in range(num_instances): @@ -106,26 +88,6 @@ class ConnectionTest(test_base.CdcRsyncTest): for r in res: self._assert_rsync_success(r.result()) - def test_address_in_use(self): - """Blocks all ports and checks that rsync fails with the expected error.""" - sockets = [] - try: - # Occupy all ports. - for port in range(FIRST_PORT, LAST_PORT + 1): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sockets.append(s) - s.bind(('127.0.0.1', port)) - s.listen() - - # rsync shouldn't be able to find an available port now. - utils.create_test_file(self.local_data_path, 1024) - res = utils.run_rsync(self.local_data_path, self.remote_base_dir) - self.assertIn('All ports are already in use', str(res.stderr)) - - finally: - for s in sockets: - s.close() - if __name__ == '__main__': test_base.test_base.main()