From f0ef34db2f14a8ee10ced90c219f251883862275 Mon Sep 17 00:00:00 2001 From: Lutz Justen Date: Thu, 8 Dec 2022 15:12:14 +0100 Subject: [PATCH] [cdc_stream] Add integration tests (#44) This CL adds Python integration tests for cdc_stream. To run the tests, you need to supply a Linux host and proper configuration for cdc_stream to work: set CDC_SSH_COMMAND=C:\path\to\ssh.exe set CDC_SCP_COMMAND=C:\path\to\scp.exe C:\python38\python.exe -m integration_tests.cdc_stream.all_tests --binary_path=C:\full\path\to\cdc_stream.exe --user_host=user@host Ran the tests and made sure they worked. --- integration_tests/cdc_rsync/test_base.py | 3 +- integration_tests/cdc_stream/__init__.py | 1 + integration_tests/cdc_stream/all_tests.py | 42 ++ integration_tests/cdc_stream/cache_test.py | 116 ++++ .../cdc_stream/consistency_test.py | 496 ++++++++++++++++++ .../cdc_stream/directory_test.py | 124 +++++ integration_tests/cdc_stream/general_test.py | 275 ++++++++++ integration_tests/cdc_stream/test_base.py | 266 ++++++++++ integration_tests/framework/test_base.py | 10 +- integration_tests/framework/utils.py | 45 +- 10 files changed, 1357 insertions(+), 21 deletions(-) create mode 100644 integration_tests/cdc_stream/__init__.py create mode 100644 integration_tests/cdc_stream/all_tests.py create mode 100644 integration_tests/cdc_stream/cache_test.py create mode 100644 integration_tests/cdc_stream/consistency_test.py create mode 100644 integration_tests/cdc_stream/directory_test.py create mode 100644 integration_tests/cdc_stream/general_test.py create mode 100644 integration_tests/cdc_stream/test_base.py diff --git a/integration_tests/cdc_rsync/test_base.py b/integration_tests/cdc_rsync/test_base.py index 02fe397..4a6086e 100644 --- a/integration_tests/cdc_rsync/test_base.py +++ b/integration_tests/cdc_rsync/test_base.py @@ -39,7 +39,8 @@ class CdcRsyncTest(unittest.TestCase): super(CdcRsyncTest, self).setUp() logging.debug('CdcRsyncTest -> setUp') - 
utils.initialize(test_base.Flags.binary_path, test_base.Flags.user_host) + utils.initialize(test_base.Flags.binary_path, None, + test_base.Flags.user_host) now_str = datetime.datetime.now().strftime('%Y%m%d-%H%M%S') self.tmp_dir = tempfile.TemporaryDirectory( diff --git a/integration_tests/cdc_stream/__init__.py b/integration_tests/cdc_stream/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/integration_tests/cdc_stream/__init__.py @@ -0,0 +1 @@ + diff --git a/integration_tests/cdc_stream/all_tests.py b/integration_tests/cdc_stream/all_tests.py new file mode 100644 index 0000000..6ce8722 --- /dev/null +++ b/integration_tests/cdc_stream/all_tests.py @@ -0,0 +1,42 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +import unittest + +from integration_tests.cdc_stream import cache_test +from integration_tests.cdc_stream import consistency_test +from integration_tests.cdc_stream import directory_test +from integration_tests.cdc_stream import general_test +from integration_tests.framework import test_base + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def load_tests(loader, unused_tests, unused_pattern): + """Customizes the list of test cases to run. 
+ + See the Python documentation for details: + https://docs.python.org/3/library/unittest.html#load-tests-protocol + """ + suite = unittest.TestSuite() + suite.addTests(loader.loadTestsFromModule(cache_test)) + suite.addTests(loader.loadTestsFromModule(consistency_test)) + suite.addTests(loader.loadTestsFromModule(directory_test)) + suite.addTests(loader.loadTestsFromModule(general_test)) + + return suite + + +if __name__ == '__main__': + test_base.main() diff --git a/integration_tests/cdc_stream/cache_test.py b/integration_tests/cdc_stream/cache_test.py new file mode 100644 index 0000000..e6285a6 --- /dev/null +++ b/integration_tests/cdc_stream/cache_test.py @@ -0,0 +1,116 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""cdc_stream cache test.""" + +import logging +import os +import posixpath +import time + +from integration_tests.framework import utils +from integration_tests.cdc_stream import test_base + + +class CacheTest(test_base.CdcStreamTest): + """cdc_stream test class for cache.""" + + cache_capacity = 10 * 1024 * 1024 # 10MB + cleanup_timeout_sec = 2 + access_idle_timeout_sec = 2 + cleanup_time = 5 # estimated cleanup time + # Returns a list of files and directories with mtimes in the cache. 
+ # 2021-10-19 01:09:30.070055513 -0700 /var/cache/asset_streaming + cache_cmd = ('find %s -exec stat --format \"%%y %%n\" ' + '\"{}\" \\;') % ( + test_base.CdcStreamTest.cache_dir) + + @classmethod + def setUpClass(cls): + super().setUpClass() + logging.debug('CacheTest -> setUpClass') + + config_json = ('{\"cache-capacity\":\"%s\",\"cleanup-timeout\":%i,' + '\"access-idle-timeout\":%i}') % ( + cls.cache_capacity, cls.cleanup_timeout_sec, + cls.access_idle_timeout_sec) + cls._start_service(config_json) + + def test_cache_reused(self): + """Cache survives remount and is reused.""" + filename = '1.txt' + utils.create_test_file( + os.path.join(self.local_base_dir, filename), 7 * 1024 * 1024) + self._start() + self._test_dir_content(files=[filename], dirs=[]) + # Read the file => fill the cache.file_transfer + utils.get_ssh_command_output('cat %s > /dev/null' % + posixpath.join(self.remote_base_dir, filename)) + cache_size = self._get_cache_size_in_bytes() + cache_files = utils.get_ssh_command_output(self.cache_cmd) + + self._stop() + self._assert_cdc_fuse_mounted(success=False) + self._assert_cache() + + self._start() + self._assert_cdc_fuse_mounted() + self._test_dir_content(files=[filename], dirs=[]) + utils.get_ssh_command_output('cat %s > /dev/null' % + posixpath.join(self.remote_base_dir, filename)) + # The same manifest should be re-used. No change in the cache is expected. + self.assertEqual(self._get_cache_size_in_bytes(), cache_size) + # The mtimes of the files should have changed after each Get() operation. + self.assertNotEqual( + utils.get_ssh_command_output(self.cache_cmd), cache_files) + + def test_set_cache_capacity_old_chunks_removed(self): + # Command to return the oldest mtime in the cache directory. + ts_cmd = ('find %s -type f -printf \"%%T@\\n\" ' + '| sort -n | head -n 1') % ( + self.cache_dir) + # Stream a file. 
+ filename = '1.txt' + utils.create_test_file( + os.path.join(self.local_base_dir, filename), 11 * 1024 * 1024) + self._start() + self._test_dir_content(files=[filename], dirs=[]) + utils.get_ssh_command_output('cat %s > /dev/null' % + posixpath.join(self.remote_base_dir, filename)) + # Extract the oldest file. + oldest_ts = utils.get_ssh_command_output(ts_cmd) + original = utils.get_ssh_command_output(self.ls_cmd) + + # Add and read one more file. + filename2 = '2.txt' + utils.create_test_file( + os.path.join(self.local_base_dir, filename2), 11 * 1024 * 1024) + self.assertTrue(self._wait_until_remote_dir_changed(original)) + utils.get_ssh_command_output( + 'cat %s > /dev/null' % posixpath.join(self.remote_base_dir, filename2)) + + # Wait some time till the cache is cleaned up. + wait_sec = self.cleanup_timeout_sec + self.access_idle_timeout_sec + self.cleanup_time + logging.info(f'Waiting {wait_sec} seconds until the cache is cleaned up') + time.sleep(wait_sec) + self.assertLessEqual(self._get_cache_size_in_bytes(), self.cache_capacity) + new_oldest_ts = utils.get_ssh_command_output(ts_cmd) + + self.assertGreater(new_oldest_ts, oldest_ts) + self._test_dir_content(files=[filename, filename2], dirs=[]) + + +if __name__ == '__main__': + test_base.test_base.main() diff --git a/integration_tests/cdc_stream/consistency_test.py b/integration_tests/cdc_stream/consistency_test.py new file mode 100644 index 0000000..842c02a --- /dev/null +++ b/integration_tests/cdc_stream/consistency_test.py @@ -0,0 +1,496 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""cdc_stream consistency test.""" + +import glob +import logging +import os +import queue +import re +import string +import time + +from integration_tests.framework import utils +from integration_tests.cdc_stream import test_base + + +class ConsistencyTest(test_base.CdcStreamTest): + """cdc_stream test class for CDC FUSE consistency.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + logging.debug('ConsistencyTest -> setUpClass') + + config_json = '{\"debug\":1, \"check\":1, \"verbosity\":3}' + cls._start_service(config_json) + + def _wait_until_remote_dir_matches(self, files, dirs, counter=20): + """Wait until the directory content has changed. + + Args: + files (list of strings): List of relative file paths. + dirs (list of strings): List of relative directory paths. + counter (int): The number of retries. + Returns: + bool: Whether the content of the remote directory matches the local one. 
+ """ + dirs = [directory.replace('\\', '/').rstrip('/') for directory in dirs] + files = [file.replace('\\', '/') for file in files] + sha1_local = self.sha1sum_local_batch(files) + for _ in range(counter): + utils.get_ssh_command_output('ls -al %s' % self.remote_base_dir) + found = utils.get_sorted_files(self.remote_base_dir, '"*"') + expected = sorted(['./' + f for f in files + dirs]) + if found == expected: + if not files: + return True + sha1_remote = self.sha1sum_remote_batch() + if sha1_local == sha1_remote: + return True + time.sleep(1) + return False + + def _generate_random_name(self, depth): + """Generate a random name for a file/directory name. + + Args: + depth (int): Depth of the directory structure. + Returns: + string: Random string. + """ + max_path_len = 260 # Windows limitation for a path length. + # 4 symbols are reserved for file extension .txt. + max_path_len_no_root = max_path_len - len(self.local_base_dir) - 4 + # As a Windows path is limited to 260 symbols it is necesary to consider the + # depth of the full path. + # +1 is for the last file, -2: for a path separator + down rounding. + max_file_name_len = int(max_path_len_no_root / (depth + 1) - 2) + length = utils.RANDOM.randint(1, max_file_name_len) + + # Consider only upper case and digits, as 1.txt and 1.TXT result in 1 file + # on Windows. + name = ''.join( + utils.RANDOM.choice(string.ascii_uppercase + string.digits) + for i in range(length)) + return name + + def _generate_dir_list(self, depth, num_leaf_dirs): + """Generate a list of directories. + + Args: + depth (int): Depth of the directory structure. + num_leaf_dirs (int): How many leaf directories should be generated. + Returns: + queue of list of strings: Relative paths of directories to be created. 
+ """ + dirs = queue.Queue(maxsize=0) + if depth == 0: + return dirs + top_num = utils.RANDOM.randint(1, 1 + num_leaf_dirs) + for _ in range(top_num): + directory = self._generate_random_name(depth) + dirs.put([directory]) + new_dirs = queue.Queue(maxsize=0) + for _ in range(depth - 1): + while not dirs.empty(): + curr_set = dirs.get() + missing_dirs = num_leaf_dirs - new_dirs.qsize() - dirs.qsize() + if missing_dirs > 0: + num_dir = utils.RANDOM.randint(1, missing_dirs) + for _ in range(num_dir): + name = self._generate_random_name(depth) + path = curr_set.copy() + path.append(name) + new_dirs.put(path) + else: + new_dirs.put(curr_set) + new_dirs, dirs = dirs, new_dirs + for _ in range(num_leaf_dirs - dirs.qsize()): + dirs.put([self._generate_random_name(depth)]) + return dirs + + def _generate_files(self, dirs, size, depth, min_file_num, max_file_num): + """Create files in given directories. + + Args: + dirs (set of strings): Relative paths for directories. + size (int): Total size of files to be created. + depth (int): Depth of the directory hierarchy. + min_file_num (int): Minimal number of files, which can be created in a + directory. + max_file_num (int): Maximal number of files, which can be created in a + directory. + Returns: + list of strings: Set of relative paths of created files. + """ + files = set() + for directory in dirs: + number_of_files = utils.RANDOM.randint(min_file_num, max_file_num) + for _ in range(number_of_files): + # Add a file extension not to compare if a similar directory exists. + file_name = self._generate_random_name(depth=depth) + '.txt' + if file_name not in files: + file_path = os.path.join(directory, file_name) + files.add(file_path) + # Do not create files larger than 1 GB. + file_size = utils.RANDOM.randint(0, min(1024 * 1024 * 1024, size)) + size -= file_size + utils.create_test_file( + os.path.join(self.local_base_dir, file_path), file_size) + if size <= 0: + return files + # Create files for the remaining size. 
+ if size > 0: + number_of_files = utils.RANDOM.randint(min_file_num, max_file_num) + for _ in range(number_of_files): + file_name = self._generate_random_name(depth) + files.add(file_name) + utils.create_test_file( + os.path.join(self.local_base_dir, file_name), + int(size / number_of_files)) + return files + + def _generate_dir_paths(self, dirs): + """Create directories. + + Args: + dirs (queue of lists of strings): Relative paths for directories. + Returns: + set of strings: Relative paths for created directories. + """ + paths = set() + for dir_set in dirs.queue: + curr_path = '' + for name in dir_set: + # It is necessary to add the last separator. + # Otherwise, the leaf directory will not be created. + curr_path = os.path.join(curr_path, name) + '\\' + paths.add(curr_path) + utils.create_test_directory(os.path.join(self.local_base_dir, curr_path)) + return paths + + def _generate_streamed_dir(self, size, depth, min_file_num=1, max_file_num=1): + """Generate a streamed directory. + + Args: + size (int): Total size of files to create in the directory. + depth (int): Depth of the directory hierarchy. + min_file_num (int): Minimal number of files, which can be created in a + single directory. + max_file_num (int): Maximal number of files, which can be created in a + single directory. + Returns: + two sets of strings: Relative paths for created files and directories. 
+ """ + num_leaf_dirs = 0 + if depth > 0: + num_leaf_dirs = utils.RANDOM.randint(0, 100) + logging.debug(('CdcStreamConsistencyTest -> _generate_streamed_dir' + ' of depth %i and number of leaf directories %i'), depth, + num_leaf_dirs) + dirs = self._generate_dir_paths( + self._generate_dir_list(depth, num_leaf_dirs)) + files = self._generate_files( + dirs=dirs, + size=size, + depth=depth, + min_file_num=min_file_num, + max_file_num=max_file_num) + logging.debug( + ('CdcStreamConsistencyTest -> _generate_streamed_dir: generated' + ' %i files, %i directories, depth %i'), len(files), len(dirs), depth) + return files, dirs + + def _recreate_data(self, files, dirs): + """Recreate test data and check that it can be read on a gamelet. + + Args: + files (list of strings): List of relative file paths. + dirs (list of strings): List of relative directory paths. + """ + logging.debug('CdcStreamConsistencyTest -> _recreate_data') + self._create_test_data(files=files, dirs=dirs) + self.assertTrue(self._wait_until_remote_dir_matches(files=files, dirs=dirs)) + self._assert_cdc_fuse_mounted() + + def _assert_inode_consistency_line(self, line, updated_proto=0, updated=0): + """Assert if the numbers of inodes specific states are correct. + + Args: + line (string): Statement like Initialized=X, updated_proto=X, updated=X, + invalid=X. + updated_proto(int): Expected number of inodes whose protos were updated. + updated(int): Expected number of inodes whose contents were updated. + """ + self.assertIn(('Initialized=0, updated_proto=%i,' + ' updated=%i, invalid=0') % (updated_proto, updated), line) + + def _assert_consistency_line(self, line): + """Assert if there are no invalid and initialized nodes. + + Args: + line (string): Statement like Initialized=X, updated_proto=X, updated=X, + invalid=X. 
+ """ + self.assertIn(('Initialized=0,'), line) + self.assertIn(('invalid=0'), line) + + def _assert_inode_consistency(self, update_map, log_file): + """Assert that the amount of updated inodes is correct. + + Args: + update_map (dict): Mapping of inodes' types to their amount. + log_file (string): Absolute path to the log file. + """ + with open(log_file) as file: + success_count = 0 + for line in file: + if 'Initialized=' in line: + self._assert_inode_consistency_line( + line, + updated_proto=update_map[success_count][0], + updated=update_map[success_count][1]) + if 'FUSE consistency check succeeded' in line: + success_count += 1 + self.assertNotIn('FUSE consistency check:', line) + + def _assert_consistency(self, log_file): + """Assert that there is no error consistency messages in the log. + + Args: + log_file (string): Absolute path to the log file. + """ + + def assert_initialized_line(line): + self.assertNotIn('FUSE consistency check:', line) + if 'Initialized=' in line: + self._assert_consistency_line(line) + + joined_line = '' + with open(log_file) as file: + for line in file: + # Matches log lines with a log level + # 2022-01-23 05:18:12.401 DEBUG process_win.cc(546): LogOutput(): + # cdc_fuse_fs_stdout: DEBUG cdc_fuse_fs.cc(1165): + # CheckFUSEConsistency(): Initialized= + # Matches log lines without log level + # 2022-01-23 05:18:12.401 INFO process_win.cc(536): LogOutput(): + # cdc_fuse_fs_stdout: 0, updated_proto=437, updated=563, + # invalid + match = re.match( + r'[0-9]{4}-[0-9]{2}-[0-9]{2}\s+' + r'[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]+\s+' + r'[A-Z]+\s+' + r'(?:[._a-zA-Z0-9()]+:\s+){2}' + r'cdc_fuse_fs_stdout:\s+' + r'((?:DEBUG|INFO|WARNING|ERROR)\s+)?(.*)', line) + if match is None: + continue + log_level = match.group(1) + log_msg = match.group(2) + # A client side log level marks the beginning of a new log line + if log_level: + assert_initialized_line(joined_line) + joined_line = log_msg.rstrip('\r\n') + else: + joined_line += 
log_msg.rstrip('\r\n') + assert_initialized_line(joined_line) + + def _get_log_file(self): + """Find the newest log file for asset streaming 3.0. + + Returns: + string: Absolute file path for the log file. + """ + log_dir = os.path.join(os.environ['APPDATA'], 'cdc-file-transfer', 'logs') + log_files = glob.glob(os.path.join(log_dir, 'cdc_stream*.log')) + latest_file = max(log_files, key=os.path.getctime) + logging.debug(('CdcStreamConsistencyTest -> _get_log_file:' + ' the current log file is %s'), latest_file) + return latest_file + + def _mount_with_data(self, files, dirs): + """Mount a directory, check the content. + + Args: + files (list of strings): List of relative file paths. + dirs (list of strings): List of relative directory paths. + """ + self._start() + self._test_random_dir_content(files=files, dirs=dirs) + self._assert_cache() + self._assert_cdc_fuse_mounted() + + def test_consistency_fixed_data(self): + """Execute consistency check on a small directory. + + Streamed directory layout: + |-- rootdir + | |-- dir1 + | |-- emptydir2 + | |-- file1_1.txt + | |-- file1_2.txt + | |-- dir2 + | |-- file2_1.txt + | |-- emptydir1 + | |-- file0.txt + """ + files = [ + 'dir1\\file1_1.txt', 'dir1\\file1_2.txt', 'dir2\\file2_1.txt', + 'file0.txt' + ] + dirs = ['dir1\\emptydir2\\', 'emptydir1\\', 'dir1\\', 'dir2\\'] + self._create_test_data(files=files, dirs=dirs) + self._mount_with_data(files, dirs) + + # Recreate test data. + log_file = self._get_log_file() + self._recreate_data(files=files, dirs=dirs) + + # In total there should be 2 checks: + # - For initial manifest when no data was read, + # - Two additional caused by the directory change. + self._assert_inode_consistency([[0, 0], [2, 6], [2, 6]], log_file) + + def _test_consistency_random(self, files, dirs): + """Mount and check consistency, recreate the data and re-check consistency. + + Args: + files (list of strings): List of relative file paths. + dirs (list of strings): List of relative directory paths. 
+ """ + self._mount_with_data(files=files, dirs=dirs) + + # Recreate test data. + log_file = self._get_log_file() + self._recreate_data(files=files, dirs=dirs) + self._assert_consistency(log_file) + + def sha1sum_local_batch(self, files): + """Calculate sha1sum of files in the streamed directory on the workstation. + + Args: + files (list of strings): List of relative file paths to check. + Returns: + string: Concatenated sha1 hashes with relative posix file names. + """ + files.sort() + sha1sum_local = '' + for file in files: + full_path = os.path.join(self.local_base_dir, file.replace('/', '\\')) + sha1sum_local += utils.sha1sum_local(full_path) + file + return sha1sum_local + + def sha1sum_remote_batch(self): + """Calculate sha1sum of files in the streamed directory on the gamelet. + + Returns: + string: Concatenated sha1 hashes with relative posix file names. + """ + sha1sum_remote = utils.get_ssh_command_output( + 'find %s -type f -exec sha1sum \'{}\' + | sort -k 2' % + self.remote_base_dir) + # Example: + # original: d664613df491478095fa201fac435112 + # /tmp/_cdc_stream_test/E8KPXXS1MYKLIQGAI4I6/M0 + # final: d664613df491478095fa201fac435112E8KPXXS1MYKLIQGAI4I6/M0 + sha1sum_remote = sha1sum_remote.replace(self.remote_base_dir, '').replace( + ' ', '').replace('\r', '').replace('\n', '').replace('\t', '') + return sha1sum_remote + + def _test_random_dir_content(self, files, dirs): + """Check the streamed randomly generated directory's content on gamelet. + + Args: + files (list of strings): List of relative file paths to check. + dirs (list of strings): List of relative dir paths to check. 
+ """ + dirs = [directory.replace('\\', '/').rstrip('/') for directory in dirs] + files = [file.replace('\\', '/') for file in files] + + utils.get_ssh_command_output('ls -al %s' % self.remote_base_dir) + self._assert_remote_dir_matches(files + dirs) + if not files: + return + + sha1_local = self.sha1sum_local_batch(files) + sha1_remote = self.sha1sum_remote_batch() + self.assertEqual(sha1_local, sha1_remote) + + def test_consistency_random_100MB_10files_per_dir(self): + """Consistency check: modification, 100MB, 10 files/directory.""" + files, dirs = self._generate_streamed_dir( + size=100 * 1024 * 1024, + depth=utils.RANDOM.randint(0, 10), + max_file_num=10) + self._test_consistency_random(files=files, dirs=dirs) + + def test_consistency_random_100MB_exact_1000files_no_dir(self): + """Consistency check: modification, 100MB, 1000 files/root.""" + files, dirs = self._generate_streamed_dir( + size=100 * 1024 * 1024, depth=0, min_file_num=1000, max_file_num=1000) + self._test_consistency_random(files=files, dirs=dirs) + + def test_consistency_random_100MB_1000files_per_dir_one_level(self): + """Consistency check: modification, 100MB, max. 1000 files/dir, depth 1.""" + files, dirs = self._generate_streamed_dir( + size=100 * 1024 * 1024, depth=1, max_file_num=1000) + self._test_consistency_random(files=files, dirs=dirs) + + def _test_consistency_random_delete(self, files, dirs): + """Remove and recreate a streamed directory. + + Args: + files (list of strings): List of relative file paths. + dirs (list of strings): List of relative directory paths. + """ + self._mount_with_data(files, dirs) + + # Remove directory on workstation => empty directory on gamelet. 
+ utils.get_ssh_command_output(self.ls_cmd) + utils.remove_test_directory(self.local_base_dir) + + self.assertTrue(self._wait_until_remote_dir_matches(files=[], dirs=[])) + self._assert_cdc_fuse_mounted() + + log_file = self._get_log_file() + self._recreate_data(files=files, dirs=dirs) + self._assert_consistency(log_file) + + def test_consistency_random_delete_100MB_10files_per_dir(self): + """Consistency check: removal, 100MB, 10 files/directory.""" + files, dirs = self._generate_streamed_dir( + size=100 * 1024 * 1024, + depth=utils.RANDOM.randint(0, 10), + max_file_num=10) + self._test_consistency_random_delete(files=files, dirs=dirs) + + def test_consistency_random_delete_100MB_exact_1000files_no_dir(self): + """Consistency check: removal, 100MB, 1000 files/root.""" + files, dirs = self._generate_streamed_dir( + size=100 * 1024 * 1024, depth=0, min_file_num=1000, max_file_num=1000) + self._test_consistency_random_delete(files=files, dirs=dirs) + + def test_consistency_random_delete_100MB_1000files_per_dir_one_level(self): + """Consistency check: removal, 100MB, max. 1000 files/directory, depth 1.""" + files, dirs = self._generate_streamed_dir( + size=100 * 1024 * 1024, depth=1, max_file_num=1000) + self._test_consistency_random_delete(files=files, dirs=dirs) + + +if __name__ == '__main__': + test_base.test_base.main() diff --git a/integration_tests/cdc_stream/directory_test.py b/integration_tests/cdc_stream/directory_test.py new file mode 100644 index 0000000..3105e2f --- /dev/null +++ b/integration_tests/cdc_stream/directory_test.py @@ -0,0 +1,124 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""cdc_stream directory Test.""" + +import os + +from integration_tests.framework import utils +from integration_tests.cdc_stream import test_base + + +class DirectoryTest(test_base.CdcStreamTest): + """cdc_stream test class for modifications of streamed directory.""" + + def _assert_mount_fails(self, directory): + """Check that mounting a directory fails. + + Args: + directory (string): name of a file/directory to be streamed. + """ + with self.assertRaises(Exception): + self._start(directory) + + def test_recreate_streamed_dir(self): + """Survive recreation of a streamed directory. + + Streamed directory layout: + |-- rootdir + | |-- dir1 + | |-- emptydir2 + | |-- file1_1.txt + | |-- file1_2.txt + | |-- dir2 + | |-- file2_1.txt + | |-- emptydir1 + | |-- file0.txt + """ + files = [ + 'dir1\\file1_1.txt', 'dir1\\file1_2.txt', 'dir2\\file2_1.txt', + 'file0.txt' + ] + dirs = ['dir1\\emptydir2\\', 'emptydir1\\', 'dir1\\', 'dir2\\'] + self._create_test_data(files, dirs) + self._start() + self._test_dir_content(files=files, dirs=dirs) + self._assert_cache() + self._assert_cdc_fuse_mounted() + original = utils.get_ssh_command_output(self.ls_cmd) + + # Remove directory on workstation => empty directory on gamelet. + utils.remove_test_directory(self.local_base_dir) + self.assertTrue(self._wait_until_remote_dir_changed(original)) + self._test_dir_content(files=[], dirs=[]) + self._assert_cdc_fuse_mounted() + + original = utils.get_ssh_command_output(self.ls_cmd) + # Recreate directory, add files => the content becomes visible again. 
+ self._create_test_data(files, dirs) + self.assertTrue(self._wait_until_remote_dir_changed(original)) + self._test_dir_content(files=files, dirs=dirs) + self._assert_cdc_fuse_mounted() + + def test_non_existing_streamed_dir_fail(self): + """Fail if the streamed directory does not exist.""" + streamed_dir = os.path.join(self.local_base_dir, 'non_existing') + self._assert_mount_fails(streamed_dir) + self._test_dir_content(files=[], dirs=[]) + self._assert_cdc_fuse_mounted(success=False) + + def test_streamed_dir_as_file_fail(self): + """Fail if the streamed path is a file.""" + streamed_file = os.path.join(self.local_base_dir, 'file') + utils.create_test_file(streamed_file, 1024) + self._assert_mount_fails(streamed_file) + self._test_dir_content(files=[], dirs=[]) + self._assert_cdc_fuse_mounted(success=False) + + def test_remount_recreated_streamed_dir(self): + """Remounting a directory, which is currently removed, stops streaming session.""" + files = [ + 'dir1\\file1_1.txt', 'dir1\\file1_2.txt', 'dir2\\file2_1.txt', + 'file0.txt' + ] + dirs = ['dir1\\emptydir2\\', 'emptydir1\\', 'dir1\\', 'dir2\\'] + self._create_test_data(files, dirs) + self._start() + self._test_dir_content(files=files, dirs=dirs) + self._assert_cache() + self._assert_cdc_fuse_mounted() + original = utils.get_ssh_command_output(self.ls_cmd) + + # Remove directory on workstation => empty directory on gamelet. + utils.remove_test_directory(self.local_base_dir) + self.assertTrue(self._wait_until_remote_dir_changed(original)) + self._test_dir_content(files=[], dirs=[]) + self._assert_cdc_fuse_mounted() + + # Remount for the same directory fails and stops an existing session. + self._assert_mount_fails(self.local_base_dir) + self._test_dir_content(files=[], dirs=[]) + + # Create a new folder and mount -> should succeed. 
+ test_dir = 'Temp' + file_name = 'test_file.txt' + utils.create_test_file( + os.path.join(self.local_base_dir, test_dir, file_name), 100) + self._start(os.path.join(self.local_base_dir, test_dir)) + self._assert_remote_dir_matches([file_name]) + + +if __name__ == '__main__': + test_base.test_base.main() diff --git a/integration_tests/cdc_stream/general_test.py b/integration_tests/cdc_stream/general_test.py new file mode 100644 index 0000000..9d77d54 --- /dev/null +++ b/integration_tests/cdc_stream/general_test.py @@ -0,0 +1,275 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Lint as: python3
"""cdc_stream general test."""

import os
import posixpath
import shutil

from integration_tests.framework import utils
from integration_tests.cdc_stream import test_base


class GeneralTest(test_base.CdcStreamTest):
  """cdc_stream general test class.

  Each test streams a local directory to the gamelet and verifies that the
  remote view (content, permissions, cache state) tracks local changes.
  """

  def test_stream(self):
    """Stream an existing directory."""
    files = [
        'dir1\\file1_1.txt', 'dir1\\file1_2.txt', 'dir2\\file2_1.txt',
        'file0.txt'
    ]
    dirs = ['dir1\\emptydir2\\', 'emptydir1\\', 'dir1\\', 'dir2\\']
    self._create_test_data(files, dirs)
    self._start()
    self._test_dir_content(files=files, dirs=dirs)
    self._assert_cache()
    self._assert_cdc_fuse_mounted()

  def test_update_file(self):
    """File updates are visible on gamelet."""
    filename = 'file1.txt'
    utils.create_test_file(os.path.join(self.local_base_dir, filename), 1024)
    self._start()
    self._test_dir_content(files=[filename], dirs=[])
    cache_size = self._get_cache_size_in_bytes()
    original = utils.get_ssh_command_output(self.ls_cmd)

    # Modify the file, cache should become larger.
    utils.create_test_file(os.path.join(self.local_base_dir, filename), 2048)

    self.assertTrue(self._wait_until_remote_dir_changed(original))
    self._test_dir_content(files=[filename], dirs=[])
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_add_file(self):
    """New file is visible on gamelet."""
    self._start()
    self._test_dir_content(files=[], dirs=[])
    cache_size = self._get_cache_size_in_bytes()
    # Create a file, cache should become larger.
    filename = 'file1.txt'
    original = utils.get_ssh_command_output(self.ls_cmd)

    utils.create_test_file(os.path.join(self.local_base_dir, filename), 1024)

    self.assertTrue(self._wait_until_remote_dir_changed(original))
    self._test_dir_content(files=[filename], dirs=[])
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_change_mtime(self):
    """Change of mtime is visible on gamelet."""
    filename = 'file1.txt'
    file_local_path = os.path.join(self.local_base_dir, filename)
    utils.create_test_file(file_local_path, 1024)
    self._start()
    mtime = os.path.getmtime(file_local_path)
    self._test_dir_content(files=[filename], dirs=[])
    cache_size = self._get_cache_size_in_bytes()
    original = utils.get_ssh_command_output(self.ls_cmd)

    # Change mtime of the file, a new manifest should be created.
    utils.change_modified_time(file_local_path)

    self.assertTrue(self._wait_until_remote_dir_changed(original))

    # Cache should become larger.
    self._test_dir_content(files=[filename], dirs=[])
    self.assertNotEqual(os.path.getmtime(file_local_path), mtime)
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_remove_file(self):
    """File removal is visible on gamelet."""
    filename = 'file1.txt'
    file_local_path = os.path.join(self.local_base_dir, filename)
    utils.create_test_file(file_local_path, 1024)
    self._start()
    self._test_dir_content(files=[filename], dirs=[])
    cache_size = self._get_cache_size_in_bytes()
    original = utils.get_ssh_command_output(self.ls_cmd)

    # After removing a file, the manifest is updated.
    utils.remove_test_file(file_local_path)

    self.assertTrue(self._wait_until_remote_dir_changed(original))

    self._test_dir_content(files=[], dirs=[])
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_rename_file(self):
    """File rename is visible on gamelet."""
    # NOTE(review): this block was fused into test_remove_file in the original
    # (missing def line) and reused a stale |original| snapshot; split out and
    # re-captured the snapshot right before the rename.
    filename = 'file1.txt'
    file_local_path = os.path.join(self.local_base_dir, filename)
    utils.create_test_file(file_local_path, 1024)
    self._start()
    self._test_dir_content(files=[filename], dirs=[])
    cache_size = self._get_cache_size_in_bytes()
    original = utils.get_ssh_command_output(self.ls_cmd)

    # After a file is renamed, the manifest is updated.
    renamed_filename = 'file2.txt'
    os.rename(file_local_path,
              os.path.join(self.local_base_dir, renamed_filename))

    self.assertTrue(self._wait_until_remote_dir_changed(original))

    self._test_dir_content(files=[renamed_filename], dirs=[])
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_add_directory(self):
    """A new directory is visible on gamelet."""
    self._start()
    self._test_dir_content(files=[], dirs=[])
    cache_size = self._get_cache_size_in_bytes()
    original = utils.get_ssh_command_output(self.ls_cmd)

    # Create a directory, cache becomes larger as a new manifest arrived.
    directory = 'dir1\\'
    dir_local_path = os.path.join(self.local_base_dir, directory)
    utils.create_test_directory(dir_local_path)

    self.assertTrue(self._wait_until_remote_dir_changed(original))
    self._test_dir_content(files=[], dirs=[directory])
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_remove_directory(self):
    """A directory removal is visible on gamelet."""
    directory = 'dir1\\'
    dir_local_path = os.path.join(self.local_base_dir, directory)

    utils.create_test_directory(dir_local_path)
    self._start()
    self._test_dir_content(files=[], dirs=[directory])
    cache_size = self._get_cache_size_in_bytes()
    original = utils.get_ssh_command_output(self.ls_cmd)

    # After removing a directory, the manifest is updated.
    utils.remove_test_directory(dir_local_path)

    self.assertTrue(self._wait_until_remote_dir_changed(original))

    self._test_dir_content(files=[], dirs=[])
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_rename_directory(self):
    """A renamed directory is visible on gamelet."""
    directory = 'dir1\\'
    dir_local_path = os.path.join(self.local_base_dir, directory)

    utils.create_test_directory(dir_local_path)
    self._start()
    self._test_dir_content(files=[], dirs=[directory])
    cache_size = self._get_cache_size_in_bytes()
    original = utils.get_ssh_command_output(self.ls_cmd)

    # After renaming a directory, the manifest is updated.
    renamed_directory = 'dir2\\'
    os.rename(dir_local_path,
              os.path.join(self.local_base_dir, renamed_directory))

    self.assertTrue(self._wait_until_remote_dir_changed(original))

    self._test_dir_content(files=[], dirs=[renamed_directory])
    self.assertGreater(self._get_cache_size_in_bytes(), cache_size)

  def test_detect_executables(self):
    """Executable bits are propagated to gamelet."""
    # Add an .exe, an ELF file and a .sh file to the streamed directory.
    cdc_stream_dir = os.path.dirname(utils.CDC_STREAM_PATH)
    exe_filename = os.path.basename(utils.CDC_STREAM_PATH)
    elf_filename = 'cdc_fuse_fs'
    sh_filename = 'script.sh'

    shutil.copyfile(
        os.path.join(cdc_stream_dir, exe_filename),
        os.path.join(self.local_base_dir, exe_filename))
    shutil.copyfile(
        os.path.join(cdc_stream_dir, elf_filename),
        os.path.join(self.local_base_dir, elf_filename))
    with open(os.path.join(self.local_base_dir, sh_filename), 'w') as f:
      f.write('#!/path/to/bash\n\nls -al')

    files = [exe_filename, elf_filename, sh_filename]
    self._start()
    self._test_dir_content(files=files, dirs=[], is_exe=True)
    self._assert_cache()

  def test_resend_corrupted_chunks(self):
    """Corrupted chunks are recovered."""
    filename = 'file1.txt'
    remote_file_path = posixpath.join(self.remote_base_dir, filename)
    utils.create_test_file(os.path.join(self.local_base_dir, filename), 1024)
    self._start()

    # Right after start, the only cached chunk is the manifest.
    manifest_chunk = utils.get_ssh_command_output('find %s -type f' %
                                                  self.cache_dir).rstrip('\r\n')

    # Read the file without caching.
    utils.get_ssh_command_output('dd if=%s bs=1K of=/dev/null iflag=direct' %
                                 remote_file_path)

    # Find any data chunk (i.e. any cached file that is not the manifest).
    data_chunks = utils.get_ssh_command_output('find %s -type f' %
                                               self.cache_dir)
    chunk_path = manifest_chunk
    for chunk in data_chunks.splitlines():
      if manifest_chunk not in chunk:
        chunk_path = chunk.rstrip('\r\n')
        break
    chunk_data = utils.get_ssh_command_output('cat %s' % chunk_path)

    # Modify the chosen data chunk.
    utils.get_ssh_command_output('dd if=/dev/zero of=%s bs=1 count=3' %
                                 chunk_path)
    self.assertNotEqual(chunk_data,
                        utils.get_ssh_command_output('cat %s' % chunk_path))

    # Read the file again, the chunk should be recovered.
    self._test_dir_content(files=[filename], dirs=[])
    self.assertEqual(chunk_data,
                     utils.get_ssh_command_output('cat %s' % chunk_path),
                     'The corrupted chunk was not recreated')

  def test_unicode(self):
    """Stream a directory with non-ASCII Unicode paths."""
    streamed_dir = '⛽⛽⛽'
    filename = '⛽⛽⛽⛽⛽⛽⛽⛽.dat'
    nonascii_local_data_path = os.path.join(self.local_base_dir, streamed_dir,
                                            filename)
    nonascii_remote_data_path = posixpath.join(self.remote_base_dir, filename)
    utils.create_test_file(nonascii_local_data_path, 1024)
    self._start(os.path.join(self.local_base_dir, streamed_dir))
    self._assert_cache()
    self.assertTrue(
        utils.sha1_matches(nonascii_local_data_path, nonascii_remote_data_path))

  def test_recovery(self):
    """Remount succeeds also if FUSE was killed at the previous execution."""
    files = [
        'dir1\\file1_1.txt', 'dir1\\file1_2.txt', 'dir2\\file2_1.txt',
        'file0.txt'
    ]
    dirs = ['dir1\\emptydir2\\', 'emptydir1\\', 'dir1\\', 'dir2\\']
    self._create_test_data(files, dirs)
    self._start()
    self._test_dir_content(files=files, dirs=dirs)
    self._assert_cache()
    self._assert_cdc_fuse_mounted()
    # Kill FUSE; the remote dir should appear empty until streaming restarts.
    utils.get_ssh_command_output('killall cdc_fuse_fs')
    self._test_dir_content(files=[], dirs=[])
    self._start()
    self._test_dir_content(files=files, dirs=dirs)


if __name__ == '__main__':
  test_base.test_base.main()
# Lint as: python3
"""cdc_stream test."""

import datetime
import logging
import os
import posixpath
import subprocess
import tempfile
import time
import unittest

from integration_tests.framework import utils
from integration_tests.framework import test_base


class CdcStreamTest(unittest.TestCase):
  """Base class for cdc_stream integration tests.

  Manages the asset streaming service lifecycle and provides helpers to
  start/stop streaming sessions and inspect the remote (gamelet) state.
  """

  # Grpc status codes.
  NOT_FOUND = 5
  SERVICE_UNAVAILABLE = 14

  tmp_dir = None               # TemporaryDirectory holding local test data.
  local_base_dir = None        # Local directory that gets streamed.
  remote_base_dir = '/tmp/_cdc_stream_test/'
  cache_dir = '~/.cache/cdc-file-transfer/chunks/'
  service_port_arg = None      # '--service-port=<port>' argument string.
  service_running = False

  # Returns a list of files and directories with mtimes in the remote directory.
  # For example, 2021-10-13 07:49:25.512766391 -0700 /tmp/_cdc_stream_test/2.txt
  ls_cmd = ('find %s -exec stat --format \"%%y %%n\" \"{}\" \\;') % (
      remote_base_dir)

  @classmethod
  def setUpClass(cls) -> None:
    """Initializes utils, stops a stale service and reserves a config path."""
    super().setUpClass()
    logging.debug('CdcStreamTest -> setUpClass')

    utils.initialize(None, test_base.Flags.binary_path,
                     test_base.Flags.user_host)
    cls.service_port_arg = f'--service-port={test_base.Flags.service_port}'
    cls._stop_service()
    # Reserve a unique config file path. mkstemp keeps the file around, so
    # reusing the path later is race-free (NamedTemporaryFile deletes the file
    # on close, after which another process could grab the same name).
    fd, cls.config_path = tempfile.mkstemp()
    os.close(fd)

  @classmethod
  def tearDownClass(cls):
    """Stops the service and removes the config file."""
    logging.debug('CdcStreamTest -> tearDownClass')
    cls._stop_service()
    if os.path.exists(cls.config_path):
      os.remove(cls.config_path)

  def setUp(self):
    """Stops the service, cleans up cache and streamed directory and initializes random."""
    super(CdcStreamTest, self).setUp()
    logging.debug('CdcStreamTest -> setUp')

    now_str = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    self.tmp_dir = tempfile.TemporaryDirectory(
        prefix=f'_cdc_stream_test_{now_str}')
    self.local_base_dir = self.tmp_dir.name + '\\base\\'
    utils.create_test_directory(self.local_base_dir)

    logging.info('Local base dir: "%s"', self.local_base_dir)
    logging.info('Remote base dir: "%s"', self.remote_base_dir)
    utils.initialize_random()
    self._stop(ignore_not_found=True)
    self._clean_cache()

  def tearDown(self):
    """Removes the local test directory."""
    super(CdcStreamTest, self).tearDown()
    logging.debug('CdcStreamTest -> tearDown')
    self.tmp_dir.cleanup()

  @classmethod
  def _start_service(cls, config_json=None):
    """Starts the asset streaming service.

    Args:
      config_json (string, optional): Config JSON string. Defaults to None.
    """
    config_arg = None
    if config_json:
      with open(cls.config_path, 'wt') as file:
        file.write(config_json)
      config_arg = f'--config-file={cls.config_path}'

    # Note: Service must be spawned in a background process.
    args = ['start-service', config_arg, cls.service_port_arg]
    command = [utils.CDC_STREAM_PATH, *filter(None, args)]

    # Workaround issue with unicode logging.
    logging.debug(
        'Executing %s ',
        ' '.join(command).encode('utf-8').decode('ascii', 'backslashreplace'))
    subprocess.Popen(command)
    cls.service_running = True

  @classmethod
  def _stop_service(cls):
    """Stops the asset streaming service; logs a warning on failure."""
    res = utils.run_stream('stop-service', cls.service_port_arg)
    if res.returncode != 0:
      # logging.warn is deprecated; use logging.warning.
      logging.warning('Stopping service failed: %s', res)
    cls.service_running = False

  def _start(self, local_dir=None):
    """Starts streaming the given directory.

    Args:
      local_dir (string): Directory to stream. Defaults to local_base_dir.
    """
    res = utils.run_stream('start', local_dir or self.local_base_dir,
                           utils.target(self.remote_base_dir),
                           self.service_port_arg)
    self._assert_stream_success(res)

  def _stop(self, ignore_not_found=False):
    """Stops streaming to the target.

    Args:
      ignore_not_found (bool): If True, do not fail when no streaming session
        exists for the target (grpc NOT_FOUND). Defaults to False.
    """
    if not self.service_running:
      return
    res = utils.run_stream('stop', utils.target(self.remote_base_dir),
                           self.service_port_arg)
    if ignore_not_found and res.returncode == self.NOT_FOUND:
      return
    self._assert_stream_success(res)

  def _assert_stream_success(self, res):
    """Asserts if the return code is 0 and outputs return message with args."""
    self.assertEqual(res.returncode, 0, 'Return value is ' + str(res))

  def _assert_remote_dir_matches(self, file_list):
    """Asserts that the remote directory matches the list of files and directories.

    Args:
      file_list (list of strings): List of relative paths to check.
    """
    found = utils.get_sorted_files(self.remote_base_dir, '"*"')
    expected = sorted(['./' + f for f in file_list])
    self.assertListEqual(found, expected)

  def _get_cache_size_in_bytes(self):
    """Returns the asset streaming cache size in bytes.

    Returns:
      int: Cache size in bytes.
    """
    result = utils.get_ssh_command_output('du -sb %s | awk \'{print $1}\'' %
                                          self.cache_dir)
    size = int(result)
    logging.info('Cache capacity is %d', size)
    return size

  def _assert_cache(self):
    """Asserts that the asset streaming cache contains some data."""
    cache_size = self._get_cache_size_in_bytes()
    # On Linux, an empty directory occupies 4KB.
    self.assertGreaterEqual(cache_size, 4096)
    self.assertGreater(
        int(utils.get_ssh_command_output('ls %s | wc -l' % self.cache_dir)), 0)

  def _assert_cdc_fuse_mounted(self, success=True):
    """Asserts that CDC FUSE is appropriately mounted.

    Args:
      success (bool): If True, expect a mount; otherwise expect none.
    """
    logging.info(f'Asserting that FUSE is {"" if success else "not "}mounted')
    result = utils.get_ssh_command_output('cat /etc/mtab | grep fuse')
    if success:
      self.assertIn(f'{self.remote_base_dir[:-1]} fuse.', result)
    else:
      self.assertNotIn(f'{self.remote_base_dir[:-1]} fuse.', result)

  def _clean_cache(self):
    """Removes all data from the asset streaming caches (remote and local)."""
    logging.info('Clearing cache')
    utils.get_ssh_command_output('rm -rf %s' %
                                 posixpath.join(self.cache_dir, '*'))
    cache_dir = os.path.join(os.environ['APPDATA'], 'cdc-file-transfer',
                             'chunks')
    utils.remove_test_directory(cache_dir)

  def _create_test_data(self, files, dirs):
    """Create test data locally.

    Args:
      files (list of strings): List of relative file paths to create.
      dirs (list of strings): List of relative dir paths to create.
    """
    logging.info(
        f'Creating test data with {len(files)} files and {len(dirs)} dirs')
    for directory in dirs:
      utils.create_test_directory(os.path.join(self.local_base_dir, directory))
    for file in files:
      utils.create_test_file(os.path.join(self.local_base_dir, file), 1024)

  def _wait_until_remote_dir_changed(self, original, counter=20):
    """Wait until the directory content has changed.

    Args:
      original (string): The original file list of the remote directory.
      counter (int): The number of retries.

    Returns:
      bool: Whether the content of the remote directory has changed.
    """
    logging.info('Waiting until remote dir changes')
    for _ in range(counter):
      if utils.get_ssh_command_output(self.ls_cmd) != original:
        return True
      time.sleep(0.1)
      logging.info('Still waiting...')
    return False

  def _test_dir_content(self, files, dirs, is_exe=False):
    """Check the streamed directory's content on gamelet.

    Args:
      files (list of strings): List of relative file paths to check.
      dirs (list of strings): List of relative dir paths to check.
      is_exe (bool): Flag which identifies whether files are executables.
    """
    logging.info(
        f'Testing dir content with {len(files)} files and {len(dirs)} dirs')
    dirs = [directory.replace('\\', '/').rstrip('/') for directory in dirs]
    files = [file.replace('\\', '/') for file in files]

    # Read the content of the directory once to load some data.
    utils.get_ssh_command_output('ls -al %s' % self.remote_base_dir)
    self._assert_remote_dir_matches(files + dirs)
    if not dirs and not files:
      return
    file_list = []
    mapping = {}
    for file in files:
      full_name = posixpath.join(self.remote_base_dir, file)
      self.assertTrue(
          utils.sha1_matches(
              os.path.join(self.local_base_dir, file), full_name))
      file_list.append(full_name)
      # Streamed files are read-only; executables additionally carry +x bits.
      mapping[full_name] = '-rwxr-xr-x' if is_exe else '-rw-r--r--'
    for directory in dirs:
      full_name = posixpath.join(self.remote_base_dir, directory)
      file_list.append(full_name)
      mapping[full_name] = 'drwxr-xr-x'

    ls_res = utils.get_ssh_command_output('ls -ld %s' % ' '.join(file_list))
    for line in ls_res.splitlines():
      # In 'ls -ld' output, field 8 is the path and field 0 the permissions.
      self.assertIn(mapping[list(filter(None, line.split(' ')))[8]], line)
# Paths to the binaries under test and the remote target; populated once via
# initialize() before any test helper is used.
CDC_RSYNC_PATH = None
CDC_STREAM_PATH = None
USER_HOST = None


def initialize(cdc_rsync_path, cdc_stream_path, user_host):
  """Records the binary paths and remote target in module globals.

  Args:
    cdc_rsync_path (string): Path to cdc_rsync.exe, or None if unused.
    cdc_stream_path (string): Path to cdc_stream.exe, or None if unused.
    user_host (string): Remote target in [user@]host form.
  """
  global CDC_RSYNC_PATH, CDC_STREAM_PATH, USER_HOST
  CDC_RSYNC_PATH = cdc_rsync_path
  CDC_STREAM_PATH = cdc_stream_path
  USER_HOST = user_host


def target(dir):
  """Prepends the configured [user@]host and a colon to dir."""
  return '%s:%s' % (USER_HOST, dir)
def run_stream(*args):
  """Runs cdc_stream with given args.

  Falsy args (e.g. None placeholders for optional flags) are filtered out.
  Output is not captured; it goes to the console, so the returned object's
  stdout/stderr fields are None.

  Args:
    *args (string): cdc_stream arguments; falsy values are skipped.

  Returns:
    CompletedProcess: cdc_stream process info with exit code.
  """

  command = [CDC_STREAM_PATH, *filter(None, args)]

  # Workaround issue with unicode logging.
  logging.debug(
      'Executing %s ',
      ' '.join(command).encode('utf-8').decode('ascii', 'backslashreplace'))
  return subprocess.run(command)