diff --git a/swh/model/tests/test_from_disk.py b/swh/model/tests/test_from_disk.py index 4c074ef3e35cdbfe35baa2926609f7b988f48d9d..71a8ed498c59f37b3a4260195be3dbb0323fa6da 100644 --- a/swh/model/tests/test_from_disk.py +++ b/swh/model/tests/test_from_disk.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information from collections import defaultdict +from functools import partial import os import tarfile import tempfile @@ -25,6 +26,121 @@ from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex TEST_DATA = os.path.join(os.path.dirname(__file__), "data") +def mk_tree(root: bytes, tree_desc: bytes): + """Create a directory tree under `root` with content generated from `tree_desc` + + tree_desc is a simple textual representation of the tree structure; each + line is an element of the directory tree structure, a trailing '/' defines + a directory, otherwise it's an (empty) file; a symlink is specified with a + ' -> path' in the description. If the destination path starts with a slash ('/') + it is considered as absolute, ie. relative to the 'root' directory; e.g. + + foo/bar/baz.txt + foo/baz/ + foo/bar/toto -> baz.txt + foo/abstoto -> /foo/bar/baz.txt + + will generate a directory structure like: + + . + └── foo + ├── abstoto -> bar/baz.txt + ├── bar + │  ├── baz.txt + │  └── toto -> baz.txt + └── baz + + The root directory must already exist. + + """ + if not os.path.isdir(root): + raise EnvironmentError("The root directory must exists and be writable") + + symlinks = [] + for entry in tree_desc.splitlines(): + entry = entry.strip() + if not entry or entry.startswith(b"#"): + continue + entry = entry.strip().lstrip(b"/") + if b".." in entry: + raise ValueError(".. in path descr is forbidden...") + if b"->" in entry: + dst, src = entry.split(b"->") + symlinks.append((src.strip(), dst.strip())) + continue + path = os.path.join(root, entry) + if entry.endswith(b"/"): + os.makedirs(path, exist_ok=True) + else: + dirname = os.path.dirname(path) + os.makedirs(dirname, exist_ok=True) + open(path, "a") + + # now create symlinks + while symlinks: + src, dst = symlinks.pop(0) + fp_dst = os.path.join(root, dst) + if src.startswith(b"/"): + rp_src = src.lstrip(b"/") + else: + rp_src = os.path.join(os.path.dirname(dst), src) + fp_src = os.path.join(root, rp_src) + if not os.path.exists(fp_src): + symlinks.append((src, dst)) + continue + # create the parent directory of the dst, if need be + dirname = os.path.dirname(fp_dst) + os.makedirs(dirname, exist_ok=True) + + rp_src = os.path.relpath(fp_src, os.path.dirname(fp_dst)) + os.symlink(rp_src, fp_dst) + + +def test_mk_tree(tmpdir): + desc = b""" + foo/bar/baz.txt + foo/baz/ + foo/bar/toto -> baz.txt + foo/abstoto -> /foo/bar/baz.txt + baz/baz/baz/ + # prefix / is ignored + /bar/a_file.txt + # symlink to a not yet defined target is ok + bar/baz/lnk -> /foo/bar/later.txt + foo/bar/later.txt + # symlink to another symlink is ok + bar/baz/lnk2 -> /foo/bar/toto + # even if the src of the symlink is defined after the dst + bar/baz/lnk3 -> /foo/bar/toto2 + foo/bar/toto2 -> later.txt + + """ + from os.path import isdir, isfile, islink, realpath + + join = partial(os.path.join, tmpdir) + + mk_tree(os.fsencode(tmpdir), desc) + + assert isfile(join("foo/bar/baz.txt")) + assert isfile(join("foo/bar/later.txt")) + assert isfile(join("bar/a_file.txt")) + + assert isdir(join("baz/baz/baz")) + + assert islink(join("foo/bar/toto")) + assert realpath(join("foo/bar/toto")) == join("foo/bar/baz.txt") + assert islink(join("foo/bar/toto2")) + assert realpath(join("foo/bar/toto2")) == join("foo/bar/later.txt") + assert islink(join("foo/abstoto")) + assert realpath(join("foo/abstoto")) == join("foo/bar/baz.txt") + assert islink(join("bar/baz/lnk")) + assert realpath(join("bar/baz/lnk")) == join("foo/bar/later.txt") + assert islink(join("bar/baz/lnk2")) + assert realpath(join("bar/baz/lnk2")) == join("foo/bar/baz.txt") + assert islink(join("bar/baz/lnk3")) + assert realpath(join("bar/baz/lnk3")) == join("foo/bar/later.txt") + + class ModeToPerms(unittest.TestCase): def setUp(self): super().setUp() @@ -867,10 +983,14 @@ class DirectoryToObjects(DataMixin, unittest.TestCase): def test_directory_entry_order(self): with tempfile.TemporaryDirectory() as dirname: dirname = os.fsencode(dirname) - open(os.path.join(dirname, b"foo."), "a") - open(os.path.join(dirname, b"foo0"), "a") - os.mkdir(os.path.join(dirname, b"foo")) - + mk_tree( + dirname, + b""" + /foo. + /foo0 + /foo/ + """, + ) directory = Directory.from_disk(path=dirname) assert [entry["name"] for entry in directory.entries] == [ @@ -885,11 +1005,15 @@ class DirectoryToObjects(DataMixin, unittest.TestCase): with tempfile.TemporaryDirectory() as dirname: dirname = os.fsencode(dirname) - open(os.path.join(dirname, b"foofile"), "a") - open(os.path.join(dirname, b"file"), "a") - os.mkdir(os.path.join(dirname, b"foo")) - os.mkdir(os.path.join(dirname, b"baz")) - os.mkdir(os.path.join(dirname, b"foo", b"foo")) + mk_tree( + dirname, + b""" + /foofile + /file + /foo/foo/ + /baz/ + """, + ) # No filters directory = Directory.from_disk(path=dirname) @@ -922,19 +1046,21 @@ class DirectoryToObjects(DataMixin, unittest.TestCase): """exclude patterns""" with tempfile.TemporaryDirectory() as dirname: dirname = os.fsencode(dirname) - open(os.path.join(dirname, b"foofile"), "a") - open(os.path.join(dirname, b"file"), "a") - os.mkdir(os.path.join(dirname, b"foo")) - os.mkdir(os.path.join(dirname, b"baz")) - os.mkdir(os.path.join(dirname, b"foo", b"foo")) - os.mkdir(os.path.join(dirname, b"excluded_dir")) - os.mkdir(os.path.join(dirname, b"excluded_dir\x96")) - open(os.path.join(dirname, b"excluded_dir", b"file"), "a") - open(os.path.join(dirname, b"excluded_dir\x96", b"file"), "a") - os.mkdir(os.path.join(dirname, b"excluded_dir2")) - os.mkdir(os.path.join(dirname, b"excluded_dir2\x96")) - os.mkdir(os.path.join(dirname, b"foo", b"excluded_dir")) - os.mkdir(os.path.join(dirname, b"foo", b"excluded_dir2\x96")) + mk_tree( + dirname, + b""" + /foofile + /file + /foo/foo/ + /baz/ + /excluded_dir/file + /excluded_dir\x96/file + /excluded_dir2/ + /excluded_dir2\x96/ + /foo/excluded_dir/ + /foo/excluded_dir2\x96/ + """, + ) # no filter directory = Directory.from_disk(path=dirname)