diff --git a/cdc_fuse_fs/BUILD b/cdc_fuse_fs/BUILD index 1b2d5cf..906e8ef 100644 --- a/cdc_fuse_fs/BUILD +++ b/cdc_fuse_fs/BUILD @@ -9,8 +9,10 @@ cc_binary( ":cdc_fuse_fs_lib", ":constants", "//absl_helper:jedec_size_flag", + "//common:client_socket", "//common:gamelet_component", "//common:log", + "//common:server_socket", "//data_store:data_provider", "//data_store:disk_data_store", "//data_store:grpc_reader", diff --git a/cdc_fuse_fs/constants.h b/cdc_fuse_fs/constants.h index b52d873..7bf4999 100644 --- a/cdc_fuse_fs/constants.h +++ b/cdc_fuse_fs/constants.h @@ -19,15 +19,22 @@ namespace cdc_ft { -// FUSE prints this to stdout when the binary timestamp and file size match the -// file on the workstation. -static constexpr char kFuseUpToDate[] = "cdc_fuse_fs is up-to-date"; +// FUSE prints +// Port 12345 cdc_fuse_fs is up-to-date +// to stdout when its version matches the version (=build version or +// size/timestamp for DEV builds) on the local device. The port is the gRPC port +// that FUSE will try to connect to. +static constexpr char kFusePortPrefix[] = "Port "; +static constexpr char kFuseUpToDate[] = " cdc_fuse_fs is up-to-date"; -// FUSE prints this to stdout when the binary timestamp or file size does not -// match the file on the workstation. It indicates that the binary has to be -// redeployed. +// FUSE prints this to stdout when its version does not match the version on the +// local device. It indicates that the binary has to be redeployed. static constexpr char kFuseNotUpToDate[] = "cdc_fuse_fs is not up-to-date"; +// FUSE prints this to stdout when it can connect to its port. This means that +// port forwarding has finished setting up, and startup is finished. +static constexpr char kFuseConnected[] = "cdc_fuse_fs is connected"; + } // namespace cdc_ft #endif // CDC_FUSE_FS_CONSTANTS_H_ diff --git a/cdc_fuse_fs/main.cc b/cdc_fuse_fs/main.cc index ef23e67..a69d4be 100644 --- a/cdc_fuse_fs/main.cc +++ b/cdc_fuse_fs/main.cc @@ -21,9 +21,11 @@ #include "cdc_fuse_fs/cdc_fuse_fs.h" #include "cdc_fuse_fs/config_stream_client.h" #include "cdc_fuse_fs/constants.h" +#include "common/client_socket.h" #include "common/gamelet_component.h" #include "common/log.h" #include "common/path.h" +#include "common/server_socket.h" #include "data_store/data_provider.h" #include "data_store/disk_data_store.h" #include "data_store/grpc_reader.h" @@ -37,6 +39,8 @@ namespace { constexpr char kFuseFilename[] = "cdc_fuse_fs"; constexpr char kLibFuseFilename[] = "libfuse.so"; +constexpr absl::Duration kConnectionTimeout = absl::Seconds(60); + bool IsUpToDate(const std::string& components_arg) { // Components are expected to reside in the same dir as the executable. std::string component_dir; @@ -107,7 +111,6 @@ ABSL_FLAG( "Whitespace-separated triples filename, size and timestamp of the " "workstation version of this binary and dependencies. Used for a fast " "up-to-date check."); -ABSL_FLAG(uint16_t, port, 0, "Port to connect to on localhost"); ABSL_FLAG(cdc_ft::JedecSize, prefetch_size, cdc_ft::JedecSize(512 << 10), "Additional data to request from the server when a FUSE read of " "maximum size is detected. This amount is added to the original " @@ -138,7 +141,6 @@ int main(int argc, char* argv[]) { std::vector mount_args = absl::ParseCommandLine(argc, argv); std::string instance = absl::GetFlag(FLAGS_instance); std::string components = absl::GetFlag(FLAGS_components); - uint16_t port = absl::GetFlag(FLAGS_port); std::string cache_dir = absl::GetFlag(FLAGS_cache_dir); int cache_dir_levels = absl::GetFlag(FLAGS_cache_dir_levels); int verbosity = absl::GetFlag(FLAGS_verbosity); @@ -159,7 +161,18 @@ int main(int argc, char* argv[]) { printf("%s\n", cdc_ft::kFuseNotUpToDate); return 0; } - printf("%s\n", cdc_ft::kFuseUpToDate); + + // Find an available port. + absl::StatusOr port_or = cdc_ft::ServerSocket::FindAvailablePort(); + if (!port_or.ok()) { + LOG_ERROR("Failed to find available port: %s\n", + port_or.status().ToString()); + return 1; + } + int port = *port_or; + + // Write marker for the server. + printf("%s%i%s\n", cdc_ft::kFusePortPrefix, port, cdc_ft::kFuseUpToDate); fflush(stdout); // Create mount dir if it doesn't exist yet. @@ -189,6 +202,18 @@ int main(int argc, char* argv[]) { store.value()->SetCapacity(cache_capacity); LOG_INFO("Caching chunks in '%s'", store.value()->RootDir()); + // Wait for port forwarding to listen to |port|. + status = + cdc_ft::ClientSocket::WaitForConnection(port, cdc_ft::kConnectionTimeout); + if (!status.ok()) { + LOG_ERROR("Failed to connect to port %i: %s", port, status.ToString()); + return static_cast(status.code()); + } + + // Write another marker for the server. + printf("%s\n", cdc_ft::kFuseConnected); + fflush(stdout); + // Start a gRpc client. std::string client_address = absl::StrFormat("localhost:%u", port); grpc::ChannelArguments channel_args; diff --git a/cdc_stream/BUILD b/cdc_stream/BUILD index c1e7fb6..e5e4fcf 100644 --- a/cdc_stream/BUILD +++ b/cdc_stream/BUILD @@ -214,6 +214,7 @@ cc_library( "//common:process", "//common:remote_util", "//common:sdk_util", + "//common:server_socket", "//common:status_macros", "//common:stopwatch", "//data_store:disk_data_store", diff --git a/cdc_stream/asset_stream_config.cc b/cdc_stream/asset_stream_config.cc index bb62ea8..7cd33f2 100644 --- a/cdc_stream/asset_stream_config.cc +++ b/cdc_stream/asset_stream_config.cc @@ -50,19 +50,14 @@ void AssetStreamConfig::RegisterCommandLineFlags(lyra::command& cmd, "asset stream service, default: " + std::to_string(service_port_))); - session_cfg_.forward_port_first = MultiSession::kDefaultForwardPortFirst; - session_cfg_.forward_port_last = MultiSession::kDefaultForwardPortLast; - cmd.add_argument( - lyra::opt(base_command.PortRangeParser("--forward-port", - &session_cfg_.forward_port_first, - &session_cfg_.forward_port_last), - "port") - .name("--forward-port") - .help("TCP port or range used for SSH port forwarding, default: " + - std::to_string(MultiSession::kDefaultForwardPortFirst) + "-" + - std::to_string(MultiSession::kDefaultForwardPortLast) + - ". If a range is specified, searches for available ports " - "(slower).")); + cmd.add_argument(lyra::opt(base_command.PortRangeParser( + "--forward-port", + &session_cfg_.deprecated_forward_port_first, + &session_cfg_.deprecated_forward_port_last), + "port") + .name("--forward-port") + .help("[Deprecated, ignored] TCP port or range used for " + "SSH port forwarding")); session_cfg_.verbosity = kDefaultVerbosity; cmd.add_argument(lyra::opt(session_cfg_.verbosity, "num") @@ -190,8 +185,6 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { } while (0) ASSIGN_VAR(service_port_, "service-port", Int); - ASSIGN_VAR(session_cfg_.forward_port_first, "forward-port-first", Int); - ASSIGN_VAR(session_cfg_.forward_port_last, "forward-port-last", Int); ASSIGN_VAR(session_cfg_.verbosity, "verbosity", Int); ASSIGN_VAR(session_cfg_.fuse_debug, "debug", Bool); ASSIGN_VAR(session_cfg_.fuse_singlethreaded, "singlethreaded", Bool); @@ -231,8 +224,6 @@ absl::Status AssetStreamConfig::LoadFromFile(const std::string& path) { std::string AssetStreamConfig::ToString() { std::ostringstream ss; ss << "service-port = " << service_port_ << std::endl; - ss << "forward-port = " << session_cfg_.forward_port_first - << "-" << session_cfg_.forward_port_last << std::endl; ss << "verbosity = " << session_cfg_.verbosity << std::endl; ss << "debug = " << session_cfg_.fuse_debug diff --git a/cdc_stream/cdc_fuse_manager.cc b/cdc_stream/cdc_fuse_manager.cc index 8b5fefb..df031e0 100644 --- a/cdc_stream/cdc_fuse_manager.cc +++ b/cdc_stream/cdc_fuse_manager.cc @@ -36,6 +36,24 @@ constexpr char kRemoteToolsBinDir[] = ".cache/cdc-file-transfer/bin/"; // Cache directory on the gamelet to store data chunks. constexpr char kCacheDir[] = "~/.cache/cdc-file-transfer/chunks"; +// Parses the port from the FUSE stdout when FUSE is up-to-date. In that case, +// the expected stdout is similar to "Port 12345 cdc_fuse_fs is up-to-date". +absl::StatusOr ParsePort(const std::string& fuse_stdout) { + // Search backwards until we find "Port ". + size_t port_pos = fuse_stdout.find(kFusePortPrefix); + if (port_pos == std::string::npos) { + return MakeStatus("Failed to find '%s' marker in server output '%s'", + kFusePortPrefix, fuse_stdout); + } + int port = + atoi(fuse_stdout.substr(port_pos + strlen(kFusePortPrefix)).c_str()); + if (port == 0) { + return MakeStatus("Failed to parse port from server output '%s'", + fuse_stdout); + } + return port; +} + } // namespace CdcFuseManager::CdcFuseManager(std::string instance, @@ -79,10 +97,10 @@ absl::Status CdcFuseManager::Deploy() { } absl::Status CdcFuseManager::Start(const std::string& mount_dir, - uint16_t local_port, uint16_t remote_port, - int verbosity, bool debug, - bool singlethreaded, bool enable_stats, - bool check, uint64_t cache_capacity, + uint16_t local_port, int verbosity, + bool debug, bool singlethreaded, + bool enable_stats, bool check, + uint64_t cache_capacity, uint32_t cleanup_timeout_sec, uint32_t access_idle_timeout_sec) { assert(!fuse_process_); @@ -109,70 +127,109 @@ absl::Status CdcFuseManager::Start(const std::string& mount_dir, std::string remote_command = absl::StrFormat( "LD_LIBRARY_PATH=%s %s " "--instance=%s " - "--components=%s --port=%i --cache_dir=%s " + "--components=%s --cache_dir=%s " "--verbosity=%i --cleanup_timeout=%i --access_idle_timeout=%i --stats=%i " "--check=%i --cache_capacity=%u -- -o allow_root -o ro -o nonempty -o " "auto_unmount %s%s%s", kRemoteToolsBinDir, remotePath, RemoteUtil::QuoteForSsh(instance_), - RemoteUtil::QuoteForSsh(component_args), remote_port, kCacheDir, - verbosity, cleanup_timeout_sec, access_idle_timeout_sec, enable_stats, - check, cache_capacity, debug ? "-d " : "", singlethreaded ? "-s " : "", + RemoteUtil::QuoteForSsh(component_args), kCacheDir, verbosity, + cleanup_timeout_sec, access_idle_timeout_sec, enable_stats, check, + cache_capacity, debug ? "-d " : "", singlethreaded ? "-s " : "", RemoteUtil::QuoteForSsh(mount_dir)); bool needs_deploy = false; - RETURN_IF_ERROR( - RunFuseProcess(local_port, remote_port, remote_command, &needs_deploy)); + int remote_port; + ASSIGN_OR_RETURN(remote_port, RunFuseProcess(remote_command, &needs_deploy)); if (needs_deploy) { // Deploy and try again. RETURN_IF_ERROR(Deploy()); - RETURN_IF_ERROR( - RunFuseProcess(local_port, remote_port, remote_command, &needs_deploy)); + ASSIGN_OR_RETURN(remote_port, + RunFuseProcess(remote_command, &needs_deploy)); } + // Start port forwarding. + RETURN_IF_ERROR(RunPortForwardingProcess(local_port, remote_port)); + + // Wait until port forwarding is up and FUSE can connect to |remote_port|. + RETURN_IF_ERROR(WaitForFuseConnected()); + return absl::OkStatus(); } -absl::Status CdcFuseManager::RunFuseProcess(uint16_t local_port, - uint16_t remote_port, - const std::string& remote_command, - bool* needs_deploy) { +absl::StatusOr CdcFuseManager::RunFuseProcess( + const std::string& remote_command, bool* needs_deploy) { assert(!fuse_process_); assert(needs_deploy); *needs_deploy = false; LOG_DEBUG("Running FUSE process"); - ProcessStartInfo start_info = - remote_util_->BuildProcessStartInfoForSshPortForwardAndCommand( - local_port, remote_port, true, remote_command, - ArchType::kLinux_x86_64); + ProcessStartInfo start_info = remote_util_->BuildProcessStartInfoForSsh( + remote_command, ArchType::kLinux_x86_64); start_info.name = kFuseFilename; // Capture stdout to determine whether a deploy is required. fuse_stdout_.clear(); - fuse_startup_finished_ = false; - start_info.stdout_handler = [this, needs_deploy](const char* data, - size_t size) { - return HandleFuseStdout(data, size, needs_deploy); + fuse_port_ = 0; + fuse_not_up_to_date_ = false; + fuse_update_check_finished_ = false; + fuse_connected_ = false; + start_info.stdout_handler = [this](const char* data, size_t size) { + return HandleFuseStdout(data, size); }; fuse_process_ = process_factory_->Create(start_info); RETURN_IF_ERROR(fuse_process_->Start(), "Failed to start FUSE process"); LOG_DEBUG("FUSE process started. Waiting for startup to finish."); // Run until process exits or startup finishes. - auto startup_finished = [this]() { return fuse_startup_finished_.load(); }; - RETURN_IF_ERROR(fuse_process_->RunUntil(startup_finished), + RETURN_IF_ERROR(fuse_process_->RunUntil( + [this]() { return fuse_update_check_finished_.load(); }), "Failed to run FUSE process"); - LOG_DEBUG("FUSE process startup complete."); + LOG_DEBUG("FUSE process update check complete."); // If the FUSE process exited before it could perform its up-to-date check, it // most likely happens because the binary does not exist and needs to be // deployed. - *needs_deploy |= !fuse_startup_finished_ && fuse_process_->HasExited() && - fuse_process_->ExitCode() != 0; + *needs_deploy = fuse_not_up_to_date_ || + (!fuse_update_check_finished_ && fuse_process_->HasExited() && + fuse_process_->ExitCode() != 0); if (*needs_deploy) { LOG_DEBUG("FUSE needs to be (re-)deployed."); fuse_process_.reset(); - return absl::OkStatus(); + } + + return fuse_port_; +} + +absl::Status CdcFuseManager::RunPortForwardingProcess(int local_port, + int remote_port) { + assert(fuse_process_); + assert(!forwarding_process_); + + LOG_DEBUG( + "Running reverse port forwarding process, local port %i, remote port %i", + local_port, remote_port); + ProcessStartInfo start_info = + remote_util_->BuildProcessStartInfoForSshPortForward( + local_port, remote_port, /*reverse=*/true); + forwarding_process_ = process_factory_->Create(start_info); + RETURN_IF_ERROR(forwarding_process_->Start(), + "Failed to start port forwarding process"); + + return absl::OkStatus(); +} + +absl::Status CdcFuseManager::WaitForFuseConnected() { + assert(fuse_process_); + assert(forwarding_process_); + + RETURN_IF_ERROR( + fuse_process_->RunUntil([this]() { return fuse_connected_.load(); }), + "Failed to run FUSE process"); + LOG_DEBUG("FUSE process connected."); + + if (!fuse_connected_ && fuse_process_->HasExited()) { + return MakeStatus("FUSE exited during startup with code %u", + fuse_process_->ExitCode()); } return absl::OkStatus(); @@ -183,30 +240,37 @@ absl::Status CdcFuseManager::Stop() { return absl::OkStatus(); } - LOG_DEBUG("Terminating FUSE process"); + LOG_DEBUG("Terminating FUSE and port forwarding processes"); absl::Status status = fuse_process_->Terminate(); + status.Update(forwarding_process_->Terminate()); fuse_process_.reset(); + forwarding_process_.reset(); return status; } bool CdcFuseManager::IsHealthy() const { - return fuse_process_ && !fuse_process_->HasExited(); + return fuse_process_ && !fuse_process_->HasExited() && forwarding_process_ && + !forwarding_process_->HasExited(); } -absl::Status CdcFuseManager::HandleFuseStdout(const char* data, size_t size, - bool* needs_deploy) { - assert(needs_deploy); - +absl::Status CdcFuseManager::HandleFuseStdout(const char* data, size_t size) { // Don't capture stdout beyond startup. - if (!fuse_startup_finished_) { + if (!fuse_connected_) { fuse_stdout_.append(data, size); - // The gamelet component prints some magic strings to stdout to indicate + + // The remote component prints some magic strings to stdout to indicate // whether it's up-to-date. if (absl::StrContains(fuse_stdout_, kFuseUpToDate)) { - fuse_startup_finished_ = true; + ASSIGN_OR_RETURN(fuse_port_, ParsePort(fuse_stdout_)); + fuse_update_check_finished_ = true; } else if (absl::StrContains(fuse_stdout_, kFuseNotUpToDate)) { - fuse_startup_finished_ = true; - *needs_deploy = true; + fuse_not_up_to_date_ = true; + fuse_update_check_finished_ = true; + } + + // It also prints stuff when it can connect to its port. + if (absl::StrContains(fuse_stdout_, kFuseConnected)) { + fuse_connected_ = true; } } diff --git a/cdc_stream/cdc_fuse_manager.h b/cdc_stream/cdc_fuse_manager.h index 2b172f6..2e46eb7 100644 --- a/cdc_stream/cdc_fuse_manager.h +++ b/cdc_stream/cdc_fuse_manager.h @@ -18,6 +18,7 @@ #define CDC_STREAM_CDC_FUSE_MANAGER_H_ #include "absl/status/status.h" +#include "absl/status/statusor.h" #include "common/remote_util.h" namespace cdc_ft { @@ -26,7 +27,7 @@ class Process; class ProcessFactory; class RemoteUtil; -// Manages the gamelet-side CDC FUSE filesystem process. +// Manages the remote CDC FUSE filesystem process. class CdcFuseManager { public: CdcFuseManager(std::string instance, ProcessFactory* process_factory, @@ -36,11 +37,10 @@ class CdcFuseManager { CdcFuseManager(CdcFuseManager&) = delete; CdcFuseManager& operator=(CdcFuseManager&) = delete; - // Starts the CDC FUSE and establishes a reverse SSH tunnel from the gamelet's - // |remote_port| to the workstation's |local_port|. Deploys the binary if - // necessary. + // Starts the remote CDC FUSE process. Deploys the binary if necessary. // // |mount_dir| is the remote directory where to mount the FUSE. + // |local_port| is the local port used for gRPC connections to the FUSE. // |verbosity| is the log verbosity used by the filesystem. // |debug| puts the filesystem into debug mode if set to true. This also // causes the process to run in the foreground, so that logs are piped through @@ -53,9 +53,9 @@ class CdcFuseManager { // |access_idle_timeout_sec| defines the number of seconds after which data // provider is considered to be access-idling. absl::Status Start(const std::string& mount_dir, uint16_t local_port, - uint16_t remote_port, int verbosity, bool debug, - bool singlethreaded, bool enable_stats, bool check, - uint64_t cache_capacity, uint32_t cleanup_timeout_sec, + int verbosity, bool debug, bool singlethreaded, + bool enable_stats, bool check, uint64_t cache_capacity, + uint32_t cleanup_timeout_sec, uint32_t access_idle_timeout_sec); // Stops the CDC FUSE. @@ -65,33 +65,49 @@ class CdcFuseManager { bool IsHealthy() const; private: - // Runs the FUSE process on the gamelet from the given |remote_command| and - // establishes a reverse SSH tunnel from the gamelet's |remote_port| to the - // workstation's |local_port|. + // Runs the remote FUSE process from the given |remote_command|. Returns the + // remote port that the FUSE will connect to, once the port forwarding process + // is up. // // If the FUSE is not up-to-date or does not exist, sets |needs_deploy| to // true and returns OK. In that case, Deploy() needs to be called and the FUSE // process should be run again. - absl::Status RunFuseProcess(uint16_t local_port, uint16_t remote_port, - const std::string& remote_command, - bool* needs_deploy); + absl::StatusOr RunFuseProcess(const std::string& remote_command, + bool* needs_deploy); - // Deploys the gamelet components. + // Establishes a reverse SSH tunnel from |remote_port| to |local_port|. + absl::Status RunPortForwardingProcess(int local_port, int remote_port); + + // Waits until FUSE can connect to its port. This essentially means that + // port forwarding is up and running. + absl::Status WaitForFuseConnected(); + + // Deploys the remote components. absl::Status Deploy(); - // Output handler for FUSE's stdout. Sets |needs_deploy| to true if the output - // contains a magic marker to indicate that the binary has to be redeployed. - // Called in a background thread. - absl::Status HandleFuseStdout(const char* data, size_t size, - bool* needs_deploy); + // Output handler for FUSE's stdout. + // Sets |fuse_not_up_to_date_| to true if the output contains a magic marker + // to indicate that the binary has to be redeployed. + // Sets |fuse_port_| to the remote gRPC port if the output contains a magic + // marker that has the port and indicates that the binary is up-to-date. + // Sets |fuse_update_check_finished_| to true if any of the above two markers + // was set. Called in a background thread. + // Sets |fuse_startup_finished_| to true if FUSE is connected to its port. + absl::Status HandleFuseStdout(const char* data, size_t size); std::string instance_; ProcessFactory* const process_factory_; RemoteUtil* const remote_util_; std::unique_ptr fuse_process_; + std::unique_ptr forwarding_process_; std::string fuse_stdout_; - std::atomic fuse_startup_finished_{false}; + + // Set by HandleFuseStdout + int fuse_port_ = 0; + bool fuse_not_up_to_date_ = false; + std::atomic_bool fuse_update_check_finished_{false}; + std::atomic_bool fuse_connected_{false}; }; } // namespace cdc_ft diff --git a/cdc_stream/multi_session.cc b/cdc_stream/multi_session.cc index 5a0b036..dc02e55 100644 --- a/cdc_stream/multi_session.cc +++ b/cdc_stream/multi_session.cc @@ -22,6 +22,7 @@ #include "common/platform.h" #include "common/port_manager.h" #include "common/process.h" +#include "common/server_socket.h" #include "common/util.h" #include "data_store/disk_data_store.h" #include "manifest/content_id.h" @@ -436,19 +437,8 @@ absl::Status MultiSession::Initialize() { } // Find an available local port. - local_asset_stream_port_ = cfg_.forward_port_first; - if (cfg_.forward_port_first < cfg_.forward_port_last) { - std::unordered_set ports; - ASSIGN_OR_RETURN( - ports, - PortManager::FindAvailableLocalPorts( - cfg_.forward_port_first, cfg_.forward_port_last, - ArchType::kWindows_x86_64, process_factory_), - "Failed to find an available local port in the range [%d, %d]", - cfg_.forward_port_first, cfg_.forward_port_last); - assert(!ports.empty()); - local_asset_stream_port_ = *ports.begin(); - } + ASSIGN_OR_RETURN(local_asset_stream_port_, ServerSocket::FindAvailablePort(), + "Failed to find an available local port"); assert(!runner_); runner_ = std::make_unique( @@ -523,9 +513,7 @@ absl::Status MultiSession::StartSession(const std::string& instance_id, auto session = std::make_unique( instance_id, target, cfg_, process_factory_, std::move(metrics_recorder)); - RETURN_IF_ERROR(session->Start(local_asset_stream_port_, - cfg_.forward_port_first, - cfg_.forward_port_last)); + RETURN_IF_ERROR(session->Start(local_asset_stream_port_)); // Wait for the FUSE to receive the first intermediate manifest. RETURN_IF_ERROR(runner_->WaitForManifestAck(instance_id, absl::Seconds(5))); diff --git a/cdc_stream/session.cc b/cdc_stream/session.cc index 81567bf..8a9c667 100644 --- a/cdc_stream/session.cc +++ b/cdc_stream/session.cc @@ -68,33 +68,18 @@ Session::~Session() { } } -absl::Status Session::Start(int local_port, int first_remote_port, - int last_remote_port) { - // Find an available remote port. - int remote_port = first_remote_port; - if (first_remote_port < last_remote_port) { - std::unordered_set ports; - ASSIGN_OR_RETURN( - ports, - PortManager::FindAvailableRemotePorts( - first_remote_port, last_remote_port, ArchType::kLinux_x86_64, - process_factory_, &remote_util_, kInstanceConnectionTimeoutSec), - "Failed to find an available remote port in the range [%d, %d]", - first_remote_port, last_remote_port); - assert(!ports.empty()); - remote_port = *ports.begin(); - } - +absl::Status Session::Start(int local_port) { assert(!fuse_); fuse_ = std::make_unique(instance_id_, process_factory_, &remote_util_); + RETURN_IF_ERROR( - fuse_->Start(mount_dir_, local_port, remote_port, cfg_.verbosity, - cfg_.fuse_debug, cfg_.fuse_singlethreaded, cfg_.stats, - cfg_.fuse_check, cfg_.fuse_cache_capacity, - cfg_.fuse_cleanup_timeout_sec, + fuse_->Start(mount_dir_, local_port, cfg_.verbosity, cfg_.fuse_debug, + cfg_.fuse_singlethreaded, cfg_.stats, cfg_.fuse_check, + cfg_.fuse_cache_capacity, cfg_.fuse_cleanup_timeout_sec, cfg_.fuse_access_idle_timeout_sec), "Failed to start instance component"); + return absl::OkStatus(); } diff --git a/cdc_stream/session.h b/cdc_stream/session.h index 5db3733..7f4fc82 100644 --- a/cdc_stream/session.h +++ b/cdc_stream/session.h @@ -58,9 +58,7 @@ class Session { // Starts the CDC FUSE on the instance with established port forwarding. // |local_port| is the local reverse forwarding port to use. - // [|first_remote_port|, |last_remote_port|] are the allowed remote ports. - absl::Status Start(int local_port, int first_remote_port, - int last_remote_port); + absl::Status Start(int local_port); // Shuts down the connection to the instance. absl::Status Stop() ABSL_LOCKS_EXCLUDED(transferred_data_mu_); diff --git a/cdc_stream/session_config.h b/cdc_stream/session_config.h index 36252ed..3222a16 100644 --- a/cdc_stream/session_config.h +++ b/cdc_stream/session_config.h @@ -57,9 +57,10 @@ struct SessionConfig { // Time to wait until running a manifest update after detecting a file change. uint32_t file_change_wait_duration_ms = 0; - // Ports used for local port forwarding. - uint16_t forward_port_first = 0; - uint16_t forward_port_last = 0; + // Ports used for local port forwarding. Deprecated as forward ports are + // determined automatically now using ephemeral ports. + uint16_t deprecated_forward_port_first = 0; + uint16_t deprecated_forward_port_last = 0; }; } // namespace cdc_ft diff --git a/integration_tests/cdc_stream/general_test.py b/integration_tests/cdc_stream/general_test.py index 9ff084c..6602bbf 100644 --- a/integration_tests/cdc_stream/general_test.py +++ b/integration_tests/cdc_stream/general_test.py @@ -109,12 +109,14 @@ class GeneralTest(test_base.CdcStreamTest): self._test_dir_content(files=[], dirs=[]) self.assertGreater(self._get_cache_size_in_bytes(), cache_size) + def test_rename_file(self): filename = 'file1.txt' file_local_path = os.path.join(self.local_base_dir, filename) utils.create_test_file(file_local_path, 1024) self._start() self._test_dir_content(files=[filename], dirs=[]) cache_size = self._get_cache_size_in_bytes() + original = utils.get_ssh_command_output(self.ls_cmd) # After a file is renamed, the manifest is updated. renamed_filename = 'file2.txt' diff --git a/integration_tests/cdc_stream/test_base.py b/integration_tests/cdc_stream/test_base.py index bdf70f4..170dac9 100644 --- a/integration_tests/cdc_stream/test_base.py +++ b/integration_tests/cdc_stream/test_base.py @@ -129,17 +129,17 @@ class CdcStreamTest(unittest.TestCase): utils.target(self.remote_base_dir), self.service_port_arg) self._assert_stream_success(res) + CdcStreamTest.service_running = True def _stop(self, ignore_not_found=False): """Stops streaming to the target Args: - local_dir (string): Directory to stream. Defaults to local_base_dir. + ignore_not_found (bool): True to ignore if there's nothing to stop. """ - if not self.service_running: + if not CdcStreamTest.service_running: return - res = utils.run_stream('stop', utils.target(self.remote_base_dir), - self.service_port_arg) + res = utils.run_stream('stop', '*', self.service_port_arg) if ignore_not_found and res.returncode == self.NOT_FOUND: return self._assert_stream_success(res)