mirror of
https://github.com/nestriness/cdc-file-transfer.git
synced 2026-01-30 14:45:37 +02:00
[cdc_stream] Append errors from netstart to status (#9)
So far, errors from the remote netstat process would only be logged in the asset stream service, for instance when SSH auth failed. However, the errors were not shown to the client, and that's the most important thing. Also adds some feedback to cdc_stream in case of success.
This commit is contained in:
@@ -445,7 +445,7 @@ absl::Status MultiSession::Initialize() {
|
|||||||
ports,
|
ports,
|
||||||
PortManager::FindAvailableLocalPorts(kAssetStreamPortFirst,
|
PortManager::FindAvailableLocalPorts(kAssetStreamPortFirst,
|
||||||
kAssetStreamPortLast, "127.0.0.1",
|
kAssetStreamPortLast, "127.0.0.1",
|
||||||
process_factory_, true),
|
process_factory_),
|
||||||
"Failed to find an available local port in the range [%d, %d]",
|
"Failed to find an available local port in the range [%d, %d]",
|
||||||
kAssetStreamPortFirst, kAssetStreamPortLast);
|
kAssetStreamPortFirst, kAssetStreamPortLast);
|
||||||
assert(!ports.empty());
|
assert(!ports.empty());
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ absl::Status Session::Start(int local_port, int first_remote_port,
|
|||||||
ports,
|
ports,
|
||||||
PortManager::FindAvailableRemotePorts(
|
PortManager::FindAvailableRemotePorts(
|
||||||
first_remote_port, last_remote_port, "127.0.0.1", process_factory_,
|
first_remote_port, last_remote_port, "127.0.0.1", process_factory_,
|
||||||
&remote_util_, kInstanceConnectionTimeoutSec, true),
|
&remote_util_, kInstanceConnectionTimeoutSec),
|
||||||
"Failed to find an available remote port in the range [%d, %d]",
|
"Failed to find an available remote port in the range [%d, %d]",
|
||||||
first_remote_port, last_remote_port);
|
first_remote_port, last_remote_port);
|
||||||
assert(!ports.empty());
|
assert(!ports.empty());
|
||||||
|
|||||||
@@ -135,6 +135,10 @@ int main(int argc, char* argv[]) {
|
|||||||
|
|
||||||
status = client.StartSession(src_dir, user_host, ssh_port, mount_dir,
|
status = client.StartSession(src_dir, user_host, ssh_port, mount_dir,
|
||||||
ssh_command, scp_command);
|
ssh_command, scp_command);
|
||||||
|
if (status.ok()) {
|
||||||
|
LOG_INFO("Started streaming directory '%s' to '%s:%s'", src_dir,
|
||||||
|
user_host, mount_dir);
|
||||||
|
}
|
||||||
} else /* if (command == "stop") */ {
|
} else /* if (command == "stop") */ {
|
||||||
if (args.size() < 3) {
|
if (args.size() < 3) {
|
||||||
LOG_ERROR(
|
LOG_ERROR(
|
||||||
@@ -147,6 +151,9 @@ int main(int argc, char* argv[]) {
|
|||||||
if (!ParseUserHostDir(args[2], &user_host, &mount_dir)) return 1;
|
if (!ParseUserHostDir(args[2], &user_host, &mount_dir)) return 1;
|
||||||
|
|
||||||
status = client.StopSession(user_host, mount_dir);
|
status = client.StopSession(user_host, mount_dir);
|
||||||
|
if (status.ok()) {
|
||||||
|
LOG_INFO("Stopped streaming session to '%s:%s'", user_host, mount_dir);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
|
|||||||
@@ -71,12 +71,10 @@ class PortManager {
|
|||||||
// forwarding on the local workstation.
|
// forwarding on the local workstation.
|
||||||
// |ip| is the IP address to filter by.
|
// |ip| is the IP address to filter by.
|
||||||
// |process_factory| is used to create a netstat process.
|
// |process_factory| is used to create a netstat process.
|
||||||
// |forward_output_to_log| determines whether the stderr of netstat is
|
// Returns ResourceExhaustedError if no port is available.
|
||||||
// forwarded to the logs. Returns ResourceExhaustedError if no port is
|
|
||||||
// available.
|
|
||||||
static absl::StatusOr<std::unordered_set<int>> FindAvailableLocalPorts(
|
static absl::StatusOr<std::unordered_set<int>> FindAvailableLocalPorts(
|
||||||
int first_port, int last_port, const char* ip,
|
int first_port, int last_port, const char* ip,
|
||||||
ProcessFactory* process_factory, bool forward_output_to_log);
|
ProcessFactory* process_factory);
|
||||||
|
|
||||||
// Finds available ports in the range [first_port, last_port] for port
|
// Finds available ports in the range [first_port, last_port] for port
|
||||||
// forwarding on the instance.
|
// forwarding on the instance.
|
||||||
@@ -84,13 +82,11 @@ class PortManager {
|
|||||||
// |process_factory| is used to create a netstat process.
|
// |process_factory| is used to create a netstat process.
|
||||||
// |remote_util| is used to connect to the instance.
|
// |remote_util| is used to connect to the instance.
|
||||||
// |timeout_sec| is the connection timeout in seconds.
|
// |timeout_sec| is the connection timeout in seconds.
|
||||||
// |forward_output_to_log| determines whether the stderr of netstat is
|
// Returns a DeadlineExceeded error if the timeout is exceeded.
|
||||||
// forwarded to the logs. Returns a DeadlineExceeded error if the timeout is
|
// Returns ResourceExhaustedError if no port is available.
|
||||||
// exceeded. Returns ResourceExhaustedError if no port is available.
|
|
||||||
static absl::StatusOr<std::unordered_set<int>> FindAvailableRemotePorts(
|
static absl::StatusOr<std::unordered_set<int>> FindAvailableRemotePorts(
|
||||||
int first_port, int last_port, const char* ip,
|
int first_port, int last_port, const char* ip,
|
||||||
ProcessFactory* process_factory, RemoteUtil* remote_util, int timeout_sec,
|
ProcessFactory* process_factory, RemoteUtil* remote_util, int timeout_sec,
|
||||||
bool forward_output_to_log,
|
|
||||||
SteadyClock* steady_clock = DefaultSteadyClock::GetInstance());
|
SteadyClock* steady_clock = DefaultSteadyClock::GetInstance());
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
@@ -219,7 +219,7 @@ TEST_F(PortManagerTest, FindAvailableLocalPortsSuccess) {
|
|||||||
|
|
||||||
absl::StatusOr<std::unordered_set<int>> ports =
|
absl::StatusOr<std::unordered_set<int>> ports =
|
||||||
PortManager::FindAvailableLocalPorts(kFirstPort, kLastPort, "127.0.0.1",
|
PortManager::FindAvailableLocalPorts(kFirstPort, kLastPort, "127.0.0.1",
|
||||||
&process_factory_, true);
|
&process_factory_);
|
||||||
ASSERT_OK(ports);
|
ASSERT_OK(ports);
|
||||||
EXPECT_EQ(ports->size(), kNumPorts - 1);
|
EXPECT_EQ(ports->size(), kNumPorts - 1);
|
||||||
for (int port = kFirstPort + 1; port <= kLastPort; ++port) {
|
for (int port = kFirstPort + 1; port <= kLastPort; ++port) {
|
||||||
@@ -237,7 +237,7 @@ TEST_F(PortManagerTest, FindAvailableLocalPortsFailsNoPorts) {
|
|||||||
|
|
||||||
absl::StatusOr<std::unordered_set<int>> ports =
|
absl::StatusOr<std::unordered_set<int>> ports =
|
||||||
PortManager::FindAvailableLocalPorts(kFirstPort, kLastPort, "127.0.0.1",
|
PortManager::FindAvailableLocalPorts(kFirstPort, kLastPort, "127.0.0.1",
|
||||||
&process_factory_, true);
|
&process_factory_);
|
||||||
EXPECT_TRUE(absl::IsResourceExhausted(ports.status()));
|
EXPECT_TRUE(absl::IsResourceExhausted(ports.status()));
|
||||||
EXPECT_TRUE(absl::StrContains(ports.status().message(),
|
EXPECT_TRUE(absl::StrContains(ports.status().message(),
|
||||||
"No port available in range"));
|
"No port available in range"));
|
||||||
@@ -252,7 +252,7 @@ TEST_F(PortManagerTest, FindAvailableRemotePortsSuccess) {
|
|||||||
absl::StatusOr<std::unordered_set<int>> ports =
|
absl::StatusOr<std::unordered_set<int>> ports =
|
||||||
PortManager::FindAvailableRemotePorts(kFirstPort, kLastPort, "0.0.0.0",
|
PortManager::FindAvailableRemotePorts(kFirstPort, kLastPort, "0.0.0.0",
|
||||||
&process_factory_, &remote_util_,
|
&process_factory_, &remote_util_,
|
||||||
kTimeoutSec, true);
|
kTimeoutSec);
|
||||||
ASSERT_OK(ports);
|
ASSERT_OK(ports);
|
||||||
EXPECT_EQ(ports->size(), kNumPorts - 1);
|
EXPECT_EQ(ports->size(), kNumPorts - 1);
|
||||||
for (int port = kFirstPort + 1; port <= kLastPort; ++port) {
|
for (int port = kFirstPort + 1; port <= kLastPort; ++port) {
|
||||||
@@ -271,7 +271,7 @@ TEST_F(PortManagerTest, FindAvailableRemotePortsFailsNoPorts) {
|
|||||||
absl::StatusOr<std::unordered_set<int>> ports =
|
absl::StatusOr<std::unordered_set<int>> ports =
|
||||||
PortManager::FindAvailableRemotePorts(kFirstPort, kLastPort, "0.0.0.0",
|
PortManager::FindAvailableRemotePorts(kFirstPort, kLastPort, "0.0.0.0",
|
||||||
&process_factory_, &remote_util_,
|
&process_factory_, &remote_util_,
|
||||||
kTimeoutSec, true);
|
kTimeoutSec);
|
||||||
EXPECT_TRUE(absl::IsResourceExhausted(ports.status()));
|
EXPECT_TRUE(absl::IsResourceExhausted(ports.status()));
|
||||||
EXPECT_TRUE(absl::StrContains(ports.status().message(),
|
EXPECT_TRUE(absl::StrContains(ports.status().message(),
|
||||||
"No port available in range"));
|
"No port available in range"));
|
||||||
|
|||||||
@@ -127,18 +127,17 @@ absl::StatusOr<int> PortManager::ReservePort(bool check_remote,
|
|||||||
std::unordered_set<int> local_ports;
|
std::unordered_set<int> local_ports;
|
||||||
ASSIGN_OR_RETURN(local_ports,
|
ASSIGN_OR_RETURN(local_ports,
|
||||||
FindAvailableLocalPorts(first_port_, last_port_, "127.0.0.1",
|
FindAvailableLocalPorts(first_port_, last_port_, "127.0.0.1",
|
||||||
process_factory_, false),
|
process_factory_),
|
||||||
"Failed to find available ports on workstation");
|
"Failed to find available ports on workstation");
|
||||||
|
|
||||||
// Find available port on remote instance.
|
// Find available port on remote instance.
|
||||||
std::unordered_set<int> remote_ports = local_ports;
|
std::unordered_set<int> remote_ports = local_ports;
|
||||||
if (check_remote) {
|
if (check_remote) {
|
||||||
ASSIGN_OR_RETURN(
|
ASSIGN_OR_RETURN(remote_ports,
|
||||||
remote_ports,
|
FindAvailableRemotePorts(
|
||||||
FindAvailableRemotePorts(first_port_, last_port_, "0.0.0.0",
|
first_port_, last_port_, "0.0.0.0", process_factory_,
|
||||||
process_factory_, remote_util_,
|
remote_util_, remote_timeout_sec, steady_clock_),
|
||||||
remote_timeout_sec, false, steady_clock_),
|
"Failed to find available ports on instance");
|
||||||
"Failed to find available ports on instance");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch shared memory.
|
// Fetch shared memory.
|
||||||
@@ -206,7 +205,7 @@ absl::Status PortManager::ReleasePort(int port) {
|
|||||||
// static
|
// static
|
||||||
absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableLocalPorts(
|
absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableLocalPorts(
|
||||||
int first_port, int last_port, const char* ip,
|
int first_port, int last_port, const char* ip,
|
||||||
ProcessFactory* process_factory, bool forward_output_to_log) {
|
ProcessFactory* process_factory) {
|
||||||
// -a to get the connection and ports the computer is listening on.
|
// -a to get the connection and ports the computer is listening on.
|
||||||
// -n to get numerical addresses to avoid the overhead of determining names.
|
// -n to get numerical addresses to avoid the overhead of determining names.
|
||||||
// -p tcp to limit the output to TCPv4 connections.
|
// -p tcp to limit the output to TCPv4 connections.
|
||||||
@@ -220,10 +219,16 @@ absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableLocalPorts(
|
|||||||
output.append(data, data_size);
|
output.append(data, data_size);
|
||||||
return absl::OkStatus();
|
return absl::OkStatus();
|
||||||
};
|
};
|
||||||
start_info.forward_output_to_log = forward_output_to_log;
|
std::string errors;
|
||||||
|
start_info.stderr_handler = [&errors](const char* data, size_t data_size) {
|
||||||
|
errors.append(data, data_size);
|
||||||
|
return absl::OkStatus();
|
||||||
|
};
|
||||||
|
|
||||||
absl::Status status = process_factory->Run(start_info);
|
absl::Status status = process_factory->Run(start_info);
|
||||||
if (!status.ok()) return WrapStatus(status, "Failed to run netstat");
|
if (!status.ok()) {
|
||||||
|
return WrapStatus(status, "Failed to run netstat:\n%s", errors);
|
||||||
|
}
|
||||||
|
|
||||||
LOG_DEBUG("netstat (workstation) output:\n%s", output);
|
LOG_DEBUG("netstat (workstation) output:\n%s", output);
|
||||||
return FindAvailablePorts(first_port, last_port, output, ip);
|
return FindAvailablePorts(first_port, last_port, output, ip);
|
||||||
@@ -233,7 +238,7 @@ absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableLocalPorts(
|
|||||||
absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableRemotePorts(
|
absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableRemotePorts(
|
||||||
int first_port, int last_port, const char* ip,
|
int first_port, int last_port, const char* ip,
|
||||||
ProcessFactory* process_factory, RemoteUtil* remote_util, int timeout_sec,
|
ProcessFactory* process_factory, RemoteUtil* remote_util, int timeout_sec,
|
||||||
bool forward_output_to_log, SteadyClock* steady_clock) {
|
SteadyClock* steady_clock) {
|
||||||
// --numeric to get numerical addresses.
|
// --numeric to get numerical addresses.
|
||||||
// --listening to get only listening sockets.
|
// --listening to get only listening sockets.
|
||||||
// --tcp to get only TCP connections.
|
// --tcp to get only TCP connections.
|
||||||
@@ -247,12 +252,15 @@ absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableRemotePorts(
|
|||||||
output.append(data, data_size);
|
output.append(data, data_size);
|
||||||
return absl::OkStatus();
|
return absl::OkStatus();
|
||||||
};
|
};
|
||||||
start_info.forward_output_to_log = forward_output_to_log;
|
std::string errors;
|
||||||
|
start_info.stderr_handler = [&errors](const char* data, size_t data_size) {
|
||||||
|
errors.append(data, data_size);
|
||||||
|
return absl::OkStatus();
|
||||||
|
};
|
||||||
|
|
||||||
std::unique_ptr<Process> process = process_factory->Create(start_info);
|
std::unique_ptr<Process> process = process_factory->Create(start_info);
|
||||||
absl::Status status = process->Start();
|
absl::Status status = process->Start();
|
||||||
if (!status.ok())
|
if (!status.ok()) return WrapStatus(status, "Failed to start netstat");
|
||||||
return WrapStatus(status, "Failed to start netstat process");
|
|
||||||
|
|
||||||
Stopwatch timeout_timer(steady_clock);
|
Stopwatch timeout_timer(steady_clock);
|
||||||
bool is_timeout = false;
|
bool is_timeout = false;
|
||||||
@@ -266,8 +274,10 @@ absl::StatusOr<std::unordered_set<int>> PortManager::FindAvailableRemotePorts(
|
|||||||
return absl::DeadlineExceededError("Timeout while running netstat");
|
return absl::DeadlineExceededError("Timeout while running netstat");
|
||||||
|
|
||||||
uint32_t exit_code = process->ExitCode();
|
uint32_t exit_code = process->ExitCode();
|
||||||
if (exit_code != 0)
|
if (exit_code != 0) {
|
||||||
return MakeStatus("netstat process exited with code %u", exit_code);
|
return MakeStatus("netstat process exited with code %u:\n%s", exit_code,
|
||||||
|
errors);
|
||||||
|
}
|
||||||
|
|
||||||
LOG_DEBUG("netstat (instance) output:\n%s", output);
|
LOG_DEBUG("netstat (instance) output:\n%s", output);
|
||||||
return FindAvailablePorts(first_port, last_port, output, ip);
|
return FindAvailablePorts(first_port, last_port, output, ip);
|
||||||
|
|||||||
Reference in New Issue
Block a user