diff --git a/cdc_rsync/BUILD b/cdc_rsync/BUILD index f2f9ece..33e65a2 100644 --- a/cdc_rsync/BUILD +++ b/cdc_rsync/BUILD @@ -71,9 +71,9 @@ cc_library( "//common:path", "//common:path_filter", "//common:platform", - "//common:port_manager", "//common:process", "//common:remote_util", + "//common:server_socket", "//common:socket", "//common:status", "//common:status_macros", diff --git a/cdc_rsync/cdc_rsync_client.cc b/cdc_rsync/cdc_rsync_client.cc index 1ff5fdb..56ef71a 100644 --- a/cdc_rsync/cdc_rsync_client.cc +++ b/cdc_rsync/cdc_rsync_client.cc @@ -30,9 +30,9 @@ #include "common/gamelet_component.h" #include "common/log.h" #include "common/path.h" -#include "common/port_manager.h" #include "common/process.h" #include "common/remote_util.h" +#include "common/server_socket.h" #include "common/status.h" #include "common/status_macros.h" #include "common/stopwatch.h" @@ -113,12 +113,6 @@ CdcRsyncClient::CdcRsyncClient(const Options& options, remote_util_->SetSftpCommand(options_.sftp_command); } } - - // Note that remote_util_.get() may be null. - port_manager_ = std::make_unique( - "cdc_rsync_ports_f77bcdfe-368c-4c45-9f01-230c5e7e2132", - options.forward_port_first, options.forward_port_last, &process_factory_, - nullptr /* never reserve remote ports */); } CdcRsyncClient::~CdcRsyncClient() { @@ -288,22 +282,15 @@ absl::Status CdcRsyncClient::StartServer(const ServerArch& arch) { return SetTag(MakeStatus("Redeploy server"), Tag::kDeployServer); } + // Start up sockets. + RETURN_IF_ERROR(Socket::Initialize(), "Failed to initialize sockets"); + socket_finalizer_ = std::make_unique(); + // Now that we know which port the server is using, set up port forwarding. std::unique_ptr fwd_process; int local_port = server_listen_port_; if (IsRemoteConnection()) { - absl::StatusOr local_port_or = port_manager_->ReservePort( - options_.connection_timeout_sec, arch.GetType()); - - if (absl::IsResourceExhausted(local_port_or.status())) { - // Port in use. - return SetTag(local_port_or.status(), Tag::kAddressInUse); - } - if (!local_port_or.ok()) { - return local_port_or.status(); - } - - local_port = *local_port_or; + ASSIGN_OR_RETURN(local_port, ServerSocket::FindAvailablePort()); ProcessStartInfo start_info = remote_util_->BuildProcessStartInfoForSshPortForward( local_port, server_listen_port_, /*reverse=*/false); @@ -313,14 +300,8 @@ absl::Status CdcRsyncClient::StartServer(const ServerArch& arch) { "Failed to start cdc_rsync_server process"); } - // Connect to the socket with port |local_port|. - status = Socket::Initialize(); - if (!status.ok()) { - return WrapStatus(status, "Failed to initialize sockets"); - } - socket_finalizer_ = std::make_unique(); - - // Poll until the port forwarding connection is set up. + // Poll the connection to the socket with port |local_port| until the port + // forwarding connection is set up. timeout_timer.Reset(); for (;;) { assert(local_port != 0); diff --git a/cdc_rsync/cdc_rsync_client.h b/cdc_rsync/cdc_rsync_client.h index cd80c83..8ac42a0 100644 --- a/cdc_rsync/cdc_rsync_client.h +++ b/cdc_rsync/cdc_rsync_client.h @@ -30,7 +30,6 @@ namespace cdc_ft { -class PortManager; class Process; class RemoteUtil; class ServerArch; @@ -53,8 +52,6 @@ class CdcRsyncClient { std::string copy_dest; int compress_level = 6; int connection_timeout_sec = 10; - int forward_port_first = 44450; - int forward_port_last = 44459; std::string ssh_command; std::string sftp_command; std::string sources_dir; // Base dir for files loaded for --files-from. @@ -132,7 +129,6 @@ class CdcRsyncClient { const std::string destination_; WinProcessFactory process_factory_; std::unique_ptr remote_util_; - std::unique_ptr port_manager_; std::unique_ptr socket_finalizer_; ClientSocket socket_; MessagePump message_pump_{&socket_, MessagePump::PacketReceivedDelegate()}; diff --git a/cdc_rsync/params.cc b/cdc_rsync/params.cc index 8f92d31..7633098 100644 --- a/cdc_rsync/params.cc +++ b/cdc_rsync/params.cc @@ -80,8 +80,6 @@ Options: --sftp-command Path and arguments of sftp command to use, e.g. "C:\path\to\sftp.exe -P 12345 -i id_rsa -oUserKnownHostsFile=known_hosts" Can also be specified by the CDC_SFTP_COMMAND environment variable. - --forward-port TCP port or range used for SSH port forwarding (default: 44450-44459). - If a range is specified, searches for available ports (slower). -h, --help Help for cdc_rsync )"; @@ -305,15 +303,9 @@ OptionResult HandleParameter(const std::string& key, const char* value, } if (key == "forward-port") { - if (!ValidateValue(key, value)) return OptionResult::kError; - uint16_t first, last; - if (!port_range::Parse(value, &first, &last)) { - PrintError("Failed to parse %s=%s, expected or -", - key, value); - return OptionResult::kError; - } - params->options.forward_port_first = first; - params->options.forward_port_last = last; + // This param is no longer needed. Just print a warning for backwards + // compatibility. + std::cout << "--forward-port argument no longer needed" << std::endl; return OptionResult::kConsumedKeyValue; } diff --git a/cdc_rsync/params_test.cc b/cdc_rsync/params_test.cc index 3b90e6b..1436e8a 100644 --- a/cdc_rsync/params_test.cc +++ b/cdc_rsync/params_test.cc @@ -585,40 +585,6 @@ TEST_F(ParamsTest, IncludeExcludeMixed_ProperOrder) { ExpectNoError(); } -TEST_F(ParamsTest, ForwardPort_Single) { - const char* argv[] = {"cdc_rsync.exe", "--forward-port=65535", kSrc, - kUserHostDst, NULL}; - EXPECT_TRUE(Parse(static_cast(std::size(argv)) - 1, argv, ¶meters_)); - EXPECT_EQ(parameters_.options.forward_port_first, 65535); - EXPECT_EQ(parameters_.options.forward_port_last, 65535); - ExpectNoError(); -} - -TEST_F(ParamsTest, ForwardPort_Range) { - const char* argv[] = { - "cdc_rsync.exe", "--forward-port", "1-2", kSrc, kUserHostDst, NULL}; - EXPECT_TRUE(Parse(static_cast(std::size(argv)) - 1, argv, ¶meters_)); - EXPECT_EQ(parameters_.options.forward_port_first, 1); - EXPECT_EQ(parameters_.options.forward_port_last, 2); - ExpectNoError(); -} - -TEST_F(ParamsTest, ForwardPort_NoValue) { - const char* argv[] = {"cdc_rsync.exe", "--forward-port=", kSrc, kUserHostDst, - NULL}; - EXPECT_FALSE( - Parse(static_cast(std::size(argv)) - 1, argv, ¶meters_)); - ExpectError(NeedsValueError("forward-port")); -} - -TEST_F(ParamsTest, ForwardPort_BadValueTooSmall) { - const char* argv[] = {"cdc_rsync.exe", "--forward-port=0", kSrc, kUserHostDst, - NULL}; - EXPECT_FALSE( - Parse(static_cast(std::size(argv)) - 1, argv, ¶meters_)); - ExpectError("Failed to parse"); -} - } // namespace } // namespace params } // namespace cdc_ft diff --git a/cdc_rsync/server_arch.cc b/cdc_rsync/server_arch.cc index 27709a7..926d0e3 100644 --- a/cdc_rsync/server_arch.cc +++ b/cdc_rsync/server_arch.cc @@ -147,8 +147,7 @@ absl::StatusOr ServerArch::DetectFromRemoteDevice( // Note: That space after PROCESSOR_ARCHITECTURE is important or else Windows // command magic interprets quotes as part of the string. std::string arch_out; - std::string windows_cmd = - RemoteUtil::QuoteForSsh("cmd /C set PROCESSOR_ARCHITECTURE "); + std::string windows_cmd = "\"cmd /C set PROCESSOR_ARCHITECTURE \""; status = remote_util->RunWithCapture(windows_cmd, "set PROCESSOR_ARCHITECTURE", &arch_out, nullptr, ArchType::kWindows_x86_64); @@ -223,14 +222,13 @@ std::string ServerArch::GetStartServerCommand(int exit_code_not_found, // a minor issue and means we display "Deploying server..." instead of // "Server not deployed. Deploying..."; return RemoteUtil::QuoteForWindows( - absl::StrFormat("powershell -Command \" " + absl::StrFormat("powershell -Command " "Set-StrictMode -Version 2; " "$ErrorActionPreference = 'Stop'; " "if (-not (Test-Path -Path '%s')) { " " exit %i; " "} " - "%s %s " - "\"", + "%s %s", server_path, exit_code_not_found, server_path, args)); } diff --git a/cdc_rsync_server/cdc_rsync_server.cc b/cdc_rsync_server/cdc_rsync_server.cc index 80e558c..2f7e255 100644 --- a/cdc_rsync_server/cdc_rsync_server.cc +++ b/cdc_rsync_server/cdc_rsync_server.cc @@ -289,10 +289,7 @@ bool CdcRsyncServer::CheckComponents( } absl::Status CdcRsyncServer::Run() { - absl::Status status = Socket::Initialize(); - if (!status.ok()) { - return WrapStatus(status, "Failed to initialize sockets"); - } + RETURN_IF_ERROR(Socket::Initialize(), "Failed to initialize sockets"); socket_finalizer_ = std::make_unique(); socket_ = std::make_unique(); @@ -307,10 +304,8 @@ absl::Status CdcRsyncServer::Run() { printf("Port %i: Server is listening\n", port); fflush(stdout); - status = socket_->WaitForConnection(); - if (!status.ok()) { - return WrapStatus(status, "Failed to establish a connection"); - } + RETURN_IF_ERROR(socket_->WaitForConnection(), + "Failed to establish a connection"); message_pump_ = std::make_unique( socket_.get(), @@ -318,7 +313,7 @@ absl::Status CdcRsyncServer::Run() { message_pump_->StartMessagePump(); LOG_INFO("Client connected. Starting to sync."); - status = Sync(); + absl::Status status = Sync(); if (!status.ok()) { socket_->ShutdownSendingEnd().IgnoreError(); return status; diff --git a/common/server_socket.cc b/common/server_socket.cc index 78c9c9d..940a68d 100644 --- a/common/server_socket.cc +++ b/common/server_socket.cc @@ -125,6 +125,12 @@ ServerSocket::~ServerSocket() { StopListening(); } +// static +absl::StatusOr ServerSocket::FindAvailablePort() { + ServerSocket socket; + return socket.StartListening(0); +} + absl::StatusOr ServerSocket::StartListening(int port) { if (socket_info_->listen_sock != kInvalidSocket) { return MakeStatus("Already listening"); @@ -246,7 +252,7 @@ absl::StatusOr ServerSocket::StartListeningInternal(int port, void ServerSocket::StopListening() { Close(&socket_info_->listen_sock); - LOG_INFO("Stopped listening."); + LOG_DEBUG("Stopped listening."); } absl::Status ServerSocket::WaitForConnection() { @@ -268,7 +274,7 @@ absl::Status ServerSocket::WaitForConnection() { void ServerSocket::Disconnect() { Close(&socket_info_->conn_sock); - LOG_INFO("Disconnected"); + LOG_DEBUG("Disconnected"); } absl::Status ServerSocket::ShutdownSendingEnd() { diff --git a/common/server_socket.h b/common/server_socket.h index 68fe8f1..329d1cf 100644 --- a/common/server_socket.h +++ b/common/server_socket.h @@ -30,6 +30,15 @@ class ServerSocket : public Socket { ServerSocket(); ~ServerSocket(); + // Returns an available ephemeral port that can be used as a listening port. + // Note that calling this function, followed by StartListening() or similar, + // is slightly racy as another process might use the port in the meantime. + // However, the OS usually returns ephemeral ports in a round-robin manner, + // and ports remain in TIME_WAIT state for a while, which may block other apps + // from reusing the port. Hence, the chances of races are small. Nevertheless, + // consider calling StartListening() with zero |port| if possible. + static absl::StatusOr FindAvailablePort(); + // Starts listening for connections on |port|. // Passing 0 as port will bind to any available port. // Returns the port that was bound to. diff --git a/integration_tests/cdc_rsync/connection_test.py b/integration_tests/cdc_rsync/connection_test.py index 4cc44e5..70991c3 100644 --- a/integration_tests/cdc_rsync/connection_test.py +++ b/integration_tests/cdc_rsync/connection_test.py @@ -27,37 +27,19 @@ RETURN_CODE_GENERIC_ERROR = 1 RETURN_CODE_CONNECTION_TIMEOUT = 2 RETURN_CODE_ADDRESS_IN_USE = 4 -FIRST_PORT = 44450 -LAST_PORT = 44459 - class ConnectionTest(test_base.CdcRsyncTest): """cdc_rsync connection test class.""" - def test_valid_instance(self): - """Runs rsync with --instance option for a valid id. - - 1) Uploads a file with --instance option instead of --ip --port. - 2) Checks the file exists on the used instance. - """ - utils.create_test_file(self.local_data_path, 1024) - res = utils.run_rsync(self.local_data_path, self.remote_base_dir) - self._assert_rsync_success(res) - self.assertTrue(utils.does_file_exist_remotely(self.remote_data_path)) - def test_invalid_instance(self): - """Runs rsync with --instance option for an invalid id. - - 1) Uploads a file with --instance option for a non-existing id. - 2) Checks the error message. - """ + """Runs rsync with an invalid host""" bad_host = 'bad_host' utils.create_test_file(self.local_data_path, 1024) res = utils.run_rsync(self.local_data_path, bad_host + ":" + self.remote_base_dir) self.assertEqual(res.returncode, RETURN_CODE_GENERIC_ERROR) - self.assertIn('Failed to find available ports', str(res.stderr)) + self.assertIn('Failed to detect remote architecture', str(res.stderr)) def test_contimeout(self): """Runs rsync with --contimeout option for an invalid ip. @@ -89,7 +71,7 @@ class ConnectionTest(test_base.CdcRsyncTest): def test_multiple_instances(self): """Runs multiple instances of rsync at the same time.""" - num_instances = LAST_PORT - FIRST_PORT + 1 + num_instances = 10 local_data_paths = [] for n in range(num_instances): @@ -106,26 +88,6 @@ class ConnectionTest(test_base.CdcRsyncTest): for r in res: self._assert_rsync_success(r.result()) - def test_address_in_use(self): - """Blocks all ports and checks that rsync fails with the expected error.""" - sockets = [] - try: - # Occupy all ports. - for port in range(FIRST_PORT, LAST_PORT + 1): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sockets.append(s) - s.bind(('127.0.0.1', port)) - s.listen() - - # rsync shouldn't be able to find an available port now. - utils.create_test_file(self.local_data_path, 1024) - res = utils.run_rsync(self.local_data_path, self.remote_base_dir) - self.assertIn('All ports are already in use', str(res.stderr)) - - finally: - for s in sockets: - s.close() - if __name__ == '__main__': test_base.test_base.main()