# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""cdc_stream consistency test."""
import glob
import logging
import os
import queue
import re
import string
import time

from integration_tests.framework import utils
from integration_tests.cdc_stream import test_base


class ConsistencyTest(test_base.CdcStreamTest):
  """cdc_stream test class for CDC FUSE consistency."""

  @classmethod
def setUpClass(cls):
super().setUpClass()
logging.debug('ConsistencyTest -> setUpClass')
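    # 'debug', 'check' and 'verbosity' presumably enable FUSE debug output,
    # consistency checking and verbose logs; the assertions in this test parse
    # the resulting log output.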
config_json = '{\"debug\":1, \"check\":1, \"verbosity\":3}'
cls._start_service(config_json)

  def _wait_until_remote_dir_matches(self, files, dirs, counter=20):
    """Wait until the remote directory content matches the local one.

    Args:
      files (list of strings): List of relative file paths.
      dirs (list of strings): List of relative directory paths.
      counter (int): The number of retries.

    Returns:
      bool: Whether the content of the remote directory matches the local one.
    """
dirs = [directory.replace('\\', '/').rstrip('/') for directory in dirs]
files = [file.replace('\\', '/') for file in files]
sha1_local = self.sha1sum_local_batch(files)
for _ in range(counter):
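      # List the remote directory for debugging; the output is not checked.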
utils.get_ssh_command_output('ls -al %s' % self.remote_base_dir)
found = utils.get_sorted_files(self.remote_base_dir, '"*"')
expected = sorted(['./' + f for f in files + dirs])
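      # get_sorted_files is expected to return './'-prefixed relative paths,
      # hence the './' prefix on the expected entries.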
if found == expected:
if not files:
return True
sha1_remote = self.sha1sum_remote_batch()
if sha1_local == sha1_remote:
return True
time.sleep(1)
return False

  def _generate_random_name(self, depth):
    """Generate a random name for a file or directory.

    Args:
      depth (int): Depth of the directory structure.

    Returns:
      string: Random string.
    """
    max_path_len = 260  # Windows limitation for a path length.
    # 4 symbols are reserved for the file extension .txt.
    max_path_len_no_root = max_path_len - len(self.local_base_dir) - 4
    # As a Windows path is limited to 260 symbols, it is necessary to take the
    # depth of the full path into account.
    # +1 is for the last file, -2 for a path separator + rounding down.
    max_file_name_len = int(max_path_len_no_root / (depth + 1) - 2)
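    # Example (illustrative values): with a 40-character local_base_dir and
    # depth 4: int((260 - 40 - 4) / (4 + 1) - 2) = int(41.2) = 41 characters.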
length = utils.RANDOM.randint(1, max_file_name_len)
# Consider only upper case and digits, as 1.txt and 1.TXT result in 1 file
# on Windows.
name = ''.join(
utils.RANDOM.choice(string.ascii_uppercase + string.digits)
for i in range(length))
return name

  def _generate_dir_list(self, depth, num_leaf_dirs):
    """Generate a list of directories.

    Args:
      depth (int): Depth of the directory structure.
      num_leaf_dirs (int): How many leaf directories should be generated.

    Returns:
      queue of lists of strings: Relative paths of directories to be created.
    """
dirs = queue.Queue(maxsize=0)
if depth == 0:
return dirs
top_num = utils.RANDOM.randint(1, 1 + num_leaf_dirs)
for _ in range(top_num):
directory = self._generate_random_name(depth)
dirs.put([directory])
new_dirs = queue.Queue(maxsize=0)
for _ in range(depth - 1):
while not dirs.empty():
curr_set = dirs.get()
missing_dirs = num_leaf_dirs - new_dirs.qsize() - dirs.qsize()
if missing_dirs > 0:
num_dir = utils.RANDOM.randint(1, missing_dirs)
for _ in range(num_dir):
name = self._generate_random_name(depth)
path = curr_set.copy()
path.append(name)
new_dirs.put(path)
else:
new_dirs.put(curr_set)
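      # Swap the queues: the paths produced at this level become the working
      # set for the next level, and the drained queue is reused for its output.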
new_dirs, dirs = dirs, new_dirs
for _ in range(num_leaf_dirs - dirs.qsize()):
dirs.put([self._generate_random_name(depth)])
return dirs

  def _generate_files(self, dirs, size, depth, min_file_num, max_file_num):
    """Create files in the given directories.

    Args:
      dirs (set of strings): Relative paths for directories.
      size (int): Total size of files to be created.
      depth (int): Depth of the directory hierarchy.
      min_file_num (int): Minimum number of files that can be created in a
        directory.
      max_file_num (int): Maximum number of files that can be created in a
        directory.

    Returns:
      set of strings: Relative paths of the created files.
    """
files = set()
for directory in dirs:
number_of_files = utils.RANDOM.randint(min_file_num, max_file_num)
for _ in range(number_of_files):
        # Add a file extension so a file cannot collide with a similarly named
        # directory.
file_name = self._generate_random_name(depth=depth) + '.txt'
if file_name not in files:
file_path = os.path.join(directory, file_name)
files.add(file_path)
          # Do not create files larger than 1 GiB.
file_size = utils.RANDOM.randint(0, min(1024 * 1024 * 1024, size))
size -= file_size
utils.create_test_file(
os.path.join(self.local_base_dir, file_path), file_size)
if size <= 0:
return files
# Create files for the remaining size.
if size > 0:
number_of_files = utils.RANDOM.randint(min_file_num, max_file_num)
for _ in range(number_of_files):
file_name = self._generate_random_name(depth)
files.add(file_name)
utils.create_test_file(
os.path.join(self.local_base_dir, file_name),
int(size / number_of_files))
return files

  def _generate_dir_paths(self, dirs):
    """Create directories.

    Args:
      dirs (queue of lists of strings): Relative paths for directories.

    Returns:
      set of strings: Relative paths for created directories.
    """
paths = set()
for dir_set in dirs.queue:
curr_path = ''
for name in dir_set:
# It is necessary to add the last separator.
# Otherwise, the leaf directory will not be created.
curr_path = os.path.join(curr_path, name) + '\\'
paths.add(curr_path)
utils.create_test_directory(os.path.join(self.local_base_dir, curr_path))
return paths

  def _generate_streamed_dir(self, size, depth, min_file_num=1, max_file_num=1):
    """Generate a streamed directory.

    Args:
      size (int): Total size of files to create in the directory.
      depth (int): Depth of the directory hierarchy.
      min_file_num (int): Minimum number of files that can be created in a
        single directory.
      max_file_num (int): Maximum number of files that can be created in a
        single directory.

    Returns:
      two sets of strings: Relative paths for created files and directories.
    """
num_leaf_dirs = 0
if depth > 0:
num_leaf_dirs = utils.RANDOM.randint(0, 100)
logging.debug(('CdcStreamConsistencyTest -> _generate_streamed_dir'
' of depth %i and number of leaf directories %i'), depth,
num_leaf_dirs)
dirs = self._generate_dir_paths(
self._generate_dir_list(depth, num_leaf_dirs))
files = self._generate_files(
dirs=dirs,
size=size,
depth=depth,
min_file_num=min_file_num,
max_file_num=max_file_num)
logging.debug(
('CdcStreamConsistencyTest -> _generate_streamed_dir: generated'
' %i files, %i directories, depth %i'), len(files), len(dirs), depth)
return files, dirs

  def _recreate_data(self, files, dirs):
    """Recreate test data and check that it can be read on the remote instance.

    Args:
      files (list of strings): List of relative file paths.
      dirs (list of strings): List of relative directory paths.
    """
    logging.debug('CdcStreamConsistencyTest -> _recreate_data')
    self._create_test_data(files=files, dirs=dirs)
    self.assertTrue(self._wait_until_remote_dir_matches(files=files, dirs=dirs))
    self._assert_cdc_fuse_mounted()

  def _assert_inode_consistency_line(self, line, updated_proto=0, updated=0):
    """Assert that the counts of inodes in specific states are correct.

    Args:
      line (string): Statement like Initialized=X, updated_proto=X, updated=X,
        invalid=X.
      updated_proto (int): Expected number of inodes whose protos were updated.
      updated (int): Expected number of inodes whose contents were updated.
    """
    self.assertIn(('Initialized=0, updated_proto=%i,'
                   ' updated=%i, invalid=0') % (updated_proto, updated), line)

  def _assert_consistency_line(self, line):
    """Assert that no inodes are reported as initialized or invalid.

    Args:
      line (string): Statement like Initialized=X, updated_proto=X, updated=X,
        invalid=X.
    """
    self.assertIn('Initialized=0,', line)
    self.assertIn('invalid=0', line)

  def _assert_inode_consistency(self, update_map, log_file):
    """Assert that the number of updated inodes is correct.

    Args:
      update_map (list of [int, int]): Expected [updated_proto, updated] counts
        for each successive consistency check.
      log_file (string): Absolute path to the log file.
    """
with open(log_file) as file:
success_count = 0
for line in file:
if 'Initialized=' in line:
self._assert_inode_consistency_line(
line,
updated_proto=update_map[success_count][0],
updated=update_map[success_count][1])
if 'FUSE consistency check succeeded' in line:
success_count += 1
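        # No other 'FUSE consistency check:' message may appear; such a line
        # would indicate a failed check.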
self.assertNotIn('FUSE consistency check:', line)

  def _assert_consistency(self, log_file):
    """Assert that there are no consistency error messages in the log.

    Args:
      log_file (string): Absolute path to the log file.
    """
def assert_initialized_line(line):
self.assertNotIn('FUSE consistency check:', line)
if 'Initialized=' in line:
self._assert_consistency_line(line)
joined_line = ''
with open(log_file) as file:
for line in file:
# Matches log lines with a log level
# 2022-01-23 05:18:12.401 DEBUG process_win.cc(546): LogOutput():
# cdc_fuse_fs_stdout: DEBUG cdc_fuse_fs.cc(1165):
# CheckFUSEConsistency(): Initialized=
# Matches log lines without log level
# 2022-01-23 05:18:12.401 INFO process_win.cc(536): LogOutput():
# cdc_fuse_fs_stdout: 0, updated_proto=437, updated=563,
# invalid
match = re.match(
r'[0-9]{4}-[0-9]{2}-[0-9]{2}\s+'
r'[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]+\s+'
r'[A-Z]+\s+'
r'(?:[._a-zA-Z0-9()]+:\s+){2}'
r'cdc_fuse_fs_stdout:\s+'
r'((?:DEBUG|INFO|WARNING|ERROR)\s+)?(.*)', line)
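        # group(1) captures the optional server-side log level, group(2) the
        # message after the 'cdc_fuse_fs_stdout:' prefix.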
if match is None:
continue
log_level = match.group(1)
log_msg = match.group(2)
        # A client-side log level marks the beginning of a new log line.
if log_level:
assert_initialized_line(joined_line)
joined_line = log_msg.rstrip('\r\n')
else:
joined_line += log_msg.rstrip('\r\n')
assert_initialized_line(joined_line)

  def _get_log_file(self):
    """Find the newest log file for asset streaming 3.0.

    Returns:
      string: Absolute file path for the log file.
    """
log_dir = os.path.join(os.environ['APPDATA'], 'cdc-file-transfer', 'logs')
log_files = glob.glob(os.path.join(log_dir, 'cdc_stream*.log'))
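    # The newest log file (by creation time) presumably belongs to the
    # currently running service instance.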
latest_file = max(log_files, key=os.path.getctime)
logging.debug(('CdcStreamConsistencyTest -> _get_log_file:'
' the current log file is %s'), latest_file)
return latest_file

  def _mount_with_data(self, files, dirs):
    """Mount a directory and check the content.

    Args:
      files (list of strings): List of relative file paths.
      dirs (list of strings): List of relative directory paths.
    """
self._start()
self._test_random_dir_content(files=files, dirs=dirs)
self._assert_cache()
self._assert_cdc_fuse_mounted()

  def test_consistency_fixed_data(self):
    """Execute consistency check on a small directory.

    Streamed directory layout:
    |-- rootdir
    |   |-- dir1
    |   |   |-- emptydir2
    |   |   |-- file1_1.txt
    |   |   |-- file1_2.txt
    |   |-- dir2
    |   |   |-- file2_1.txt
    |   |-- emptydir1
    |   |-- file0.txt
    """
files = [
'dir1\\file1_1.txt', 'dir1\\file1_2.txt', 'dir2\\file2_1.txt',
'file0.txt'
]
dirs = ['dir1\\emptydir2\\', 'emptydir1\\', 'dir1\\', 'dir2\\']
self._create_test_data(files=files, dirs=dirs)
self._mount_with_data(files, dirs)
# Recreate test data.
log_file = self._get_log_file()
self._recreate_data(files=files, dirs=dirs)
    # In total there should be 3 checks:
    # - one for the initial manifest when no data was read,
    # - two additional ones caused by the directory change.
    self._assert_inode_consistency([[0, 0], [2, 6], [2, 6]], log_file)

  def _test_consistency_random(self, files, dirs):
    """Mount and check consistency, recreate the data and re-check consistency.

    Args:
      files (list of strings): List of relative file paths.
      dirs (list of strings): List of relative directory paths.
    """
self._mount_with_data(files=files, dirs=dirs)
# Recreate test data.
log_file = self._get_log_file()
self._recreate_data(files=files, dirs=dirs)
self._assert_consistency(log_file)

  def sha1sum_local_batch(self, files):
    """Calculate sha1sum of files in the streamed directory on the workstation.

    Args:
      files (list of strings): List of relative file paths to check.

    Returns:
      string: Concatenated sha1 hashes with relative posix file names.
    """
files.sort()
sha1sum_local = ''
for file in files:
full_path = os.path.join(self.local_base_dir, file.replace('/', '\\'))
sha1sum_local += utils.sha1sum_local(full_path) + file
return sha1sum_local

  def sha1sum_remote_batch(self):
    """Calculate sha1sum of files in the remote streamed directory.

    Returns:
      string: Concatenated sha1 hashes with relative posix file names.
    """
sha1sum_remote = utils.get_ssh_command_output(
'find %s -type f -exec sha1sum \'{}\' + | sort -k 2' %
self.remote_base_dir)
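    # 'sort -k 2' orders by file path, so the remote order matches the sorted
    # local file list used in sha1sum_local_batch.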
# Example:
# original: d664613df491478095fa201fac435112
# /tmp/_cdc_stream_test/E8KPXXS1MYKLIQGAI4I6/M0
# final: d664613df491478095fa201fac435112E8KPXXS1MYKLIQGAI4I6/M0
sha1sum_remote = sha1sum_remote.replace(self.remote_base_dir, '').replace(
' ', '').replace('\r', '').replace('\n', '').replace('\t', '')
return sha1sum_remote

  def _test_random_dir_content(self, files, dirs):
    """Check the content of the streamed, randomly generated remote directory.

    Args:
      files (list of strings): List of relative file paths to check.
      dirs (list of strings): List of relative dir paths to check.
    """
dirs = [directory.replace('\\', '/').rstrip('/') for directory in dirs]
files = [file.replace('\\', '/') for file in files]
utils.get_ssh_command_output('ls -al %s' % self.remote_base_dir)
self._assert_remote_dir_matches(files + dirs)
if not files:
return
sha1_local = self.sha1sum_local_batch(files)
sha1_remote = self.sha1sum_remote_batch()
self.assertEqual(sha1_local, sha1_remote)

  def test_consistency_random_100MB_10files_per_dir(self):
    """Consistency check: modification, 100MB, 10 files/directory."""
files, dirs = self._generate_streamed_dir(
size=100 * 1024 * 1024,
depth=utils.RANDOM.randint(0, 10),
max_file_num=10)
self._test_consistency_random(files=files, dirs=dirs)

  def test_consistency_random_100MB_exact_1000files_no_dir(self):
    """Consistency check: modification, 100MB, 1000 files/root."""
files, dirs = self._generate_streamed_dir(
size=100 * 1024 * 1024, depth=0, min_file_num=1000, max_file_num=1000)
self._test_consistency_random(files=files, dirs=dirs)

  def test_consistency_random_100MB_1000files_per_dir_one_level(self):
    """Consistency check: modification, 100MB, max. 1000 files/dir, depth 1."""
files, dirs = self._generate_streamed_dir(
size=100 * 1024 * 1024, depth=1, max_file_num=1000)
self._test_consistency_random(files=files, dirs=dirs)

  def _test_consistency_random_delete(self, files, dirs):
    """Remove and recreate a streamed directory.

    Args:
      files (list of strings): List of relative file paths.
      dirs (list of strings): List of relative directory paths.
    """
self._mount_with_data(files, dirs)
# Remove directory on workstation => empty remote directory.
utils.get_ssh_command_output(self.ls_cmd)
utils.remove_test_directory(self.local_base_dir)
self.assertTrue(self._wait_until_remote_dir_matches(files=[], dirs=[]))
self._assert_cdc_fuse_mounted()
log_file = self._get_log_file()
self._recreate_data(files=files, dirs=dirs)
self._assert_consistency(log_file)

  def test_consistency_random_delete_100MB_10files_per_dir(self):
    """Consistency check: removal, 100MB, 10 files/directory."""
files, dirs = self._generate_streamed_dir(
size=100 * 1024 * 1024,
depth=utils.RANDOM.randint(0, 10),
max_file_num=10)
self._test_consistency_random_delete(files=files, dirs=dirs)

  def test_consistency_random_delete_100MB_exact_1000files_no_dir(self):
    """Consistency check: removal, 100MB, 1000 files/root."""
files, dirs = self._generate_streamed_dir(
size=100 * 1024 * 1024, depth=0, min_file_num=1000, max_file_num=1000)
self._test_consistency_random_delete(files=files, dirs=dirs)

  def test_consistency_random_delete_100MB_1000files_per_dir_one_level(self):
    """Consistency check: removal, 100MB, max. 1000 files/directory, depth 1."""
files, dirs = self._generate_streamed_dir(
size=100 * 1024 * 1024, depth=1, max_file_num=1000)
self._test_consistency_random_delete(files=files, dirs=dirs)


if __name__ == '__main__':
  test_base.test_base.main()