diff --git a/swh/model/cli.py b/swh/model/cli.py index abcabfe17ae2215ace4e8604fed3d19f8cab5785..6508602110672049403ed507b5274549527d14dc 100644 --- a/swh/model/cli.py +++ b/swh/model/cli.py @@ -70,17 +70,18 @@ def swhid_of_file_content(data) -> CoreSWHID: def model_of_dir( - path: bytes, exclude_patterns: Optional[Iterable[bytes]] = None + path: bytes, + exclude_patterns: Optional[Iterable[bytes]] = None, ) -> Directory: - from swh.model.from_disk import accept_all_directories, ignore_directories_patterns + from swh.model.from_disk import accept_all_paths, ignore_directories_patterns - dir_filter = ( + path_filter = ( ignore_directories_patterns(path, exclude_patterns) if exclude_patterns - else accept_all_directories + else accept_all_paths ) - return Directory.from_disk(path=path, dir_filter=dir_filter) + return Directory.from_disk(path=path, path_filter=path_filter) def swhid_of_dir( diff --git a/swh/model/from_disk.py b/swh/model/from_disk.py index 058e77fa2b39e1293b404c91f1747049add5bfb6..798851a23514c85a44cca0dd6c816153c6ae7485 100644 --- a/swh/model/from_disk.py +++ b/swh/model/from_disk.py @@ -17,7 +17,18 @@ import glob import os import re import stat -from typing import Any, Iterable, Iterator, List, Optional, Pattern, Tuple +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Pattern, + Tuple, +) +import warnings import attr from attrs_strict import type_validator @@ -239,7 +250,9 @@ class Content(MerkleLeaf): return DiskBackedContent.from_dict(data) -def accept_all_directories(dirpath: str, dirname: str, entries: Iterable[Any]) -> bool: +def accept_all_directories( + dirpath: bytes, dirname: bytes, entries: Optional[Iterable[Any]] +) -> bool: """Default filter for :func:`Directory.from_disk` accepting all directories @@ -247,11 +260,22 @@ def accept_all_directories(dirpath: str, dirname: str, entries: Iterable[Any]) - dirname (bytes): directory name entries (list): directory entries """ + warnings.warn( + "`accept_all_directories` is deprecated, use `accept_all_paths`", + DeprecationWarning, + ) + return True + + +def accept_all_paths( + path: bytes, name: bytes, entries: Optional[Iterable[Any]] +) -> bool: + """Default filter for :func:`Directory.from_disk` accepting all paths""" return True def ignore_empty_directories( - dirpath: str, dirname: str, entries: Iterable[Any] + dirpath: bytes, dirname: bytes, entries: Optional[Iterable[Any]] ) -> bool: """Filter for :func:`directory_to_objects` ignoring empty directories @@ -261,6 +285,9 @@ def ignore_empty_directories( Returns: True if the directory is not empty, false if the directory is empty """ + if entries is None: + # Files are not ignored + return True return bool(entries) @@ -285,6 +312,9 @@ def ignore_named_directories(names, *, case_sensitive=True): names: Iterable[Any] = names, case_sensitive: bool = case_sensitive, ): + if entries is None: + # Files are not ignored + return True if case_sensitive: return dirname not in names else: @@ -413,23 +443,43 @@ class Directory(MerkleNode): @classmethod def from_disk( - cls, *, path, dir_filter=accept_all_directories, max_content_length=None - ): + cls, + *, + path: bytes, + path_filter: Callable[ + [bytes, bytes, Optional[List[bytes]]], bool + ] = accept_all_paths, + dir_filter: Optional[ + Callable[[bytes, bytes, Optional[List[bytes]]], bool] + ] = None, + max_content_length: Optional[int] = None, + ) -> "Directory": """Compute the Software Heritage objects for a given directory tree Args: path (bytes): the directory to traverse data (bool): whether to add the data to the content objects save_path (bool): whether to add the path to the content objects - dir_filter (function): a filter to ignore some directories by - name or contents. Takes two arguments: dirname and entries, and + path_filter (function): a filter to ignore some paths. + Takes three arguments: `path`, `name` and `entries`. + `entries` is `None` for files, and a (possibly empty) list of names + for directories. + Returns True if the path should be added, False if the + path should be ignored. + dir_filter (DEPRECATED, function): a filter to ignore some directories + by name or contents. Takes two arguments: dirname and entries, and returns True if the directory should be added, False if the directory should be ignored. max_content_length (Optional[int]): if given, all contents larger than this will be skipped. """ top_path = path - dirs = {} + dirs: Dict[bytes, Directory] = {} + if dir_filter is not None: + warnings.warn( + "`dir_filter` is deprecated. Use `path_filter` instead", + DeprecationWarning, + ) for root, dentries, fentries in os.walk(top_path, topdown=False): entries = {} @@ -438,12 +488,17 @@ class Directory(MerkleNode): for name in fentries + dentries: path = os.path.join(root, name) if not os.path.isdir(path) or os.path.islink(path): + if not path_filter(path, name, None): + continue content = Content.from_file( path=path, max_content_length=max_content_length ) entries[name] = content else: - if dir_filter(path, name, dirs[path].entries): + if dir_filter is not None: + if dir_filter(path, name, dirs[path].entries): + entries[name] = dirs[path] + elif path_filter(path, name, dirs[path].entries): entries[name] = dirs[path] dirs[root] = cls({"name": os.path.basename(root), "path": root}) diff --git a/swh/model/tests/test_from_disk.py b/swh/model/tests/test_from_disk.py index 1ebcbbe981f8e53b73a2e3913bfd9b6b537c15ec..3ab7ba242b836c16934175d3bbd58555721b6755 100644 --- a/swh/model/tests/test_from_disk.py +++ b/swh/model/tests/test_from_disk.py @@ -768,7 +768,7 @@ class DirectoryToObjects(DataMixin, unittest.TestCase): def test_directory_to_objects_ignore_empty(self): directory = Directory.from_disk( - path=self.tmpdir_name, dir_filter=from_disk.ignore_empty_directories + path=self.tmpdir_name, path_filter=from_disk.ignore_empty_directories ) for name, value in self.contents.items(): @@ -798,7 +798,7 @@ class DirectoryToObjects(DataMixin, unittest.TestCase): def test_directory_to_objects_ignore_name(self): directory = Directory.from_disk( path=self.tmpdir_name, - dir_filter=from_disk.ignore_named_directories([b"symlinks"]), + path_filter=from_disk.ignore_named_directories([b"symlinks"]), ) for name, value in self.contents.items(): self.assertContentEqual(directory[b"contents/" + name], value) @@ -826,7 +826,7 @@ class DirectoryToObjects(DataMixin, unittest.TestCase): def test_directory_to_objects_ignore_name_case(self): directory = Directory.from_disk( path=self.tmpdir_name, - dir_filter=from_disk.ignore_named_directories( + path_filter=from_disk.ignore_named_directories( [b"symLiNks"], case_sensitive=False ), ) @@ -868,6 +868,52 @@ class DirectoryToObjects(DataMixin, unittest.TestCase): b"foo0", ] + def test_directory_path_filter(self): + def filter_func(path, name, entries): + return name.startswith(b"foo") + + with tempfile.TemporaryDirectory() as dirname: + dirname = os.fsencode(dirname) + open(os.path.join(dirname, b"foofile"), "a") + open(os.path.join(dirname, b"file"), "a") + os.mkdir(os.path.join(dirname, b"foo")) + os.mkdir(os.path.join(dirname, b"baz")) + + # No filters + directory = Directory.from_disk(path=dirname) + assert [entry["name"] for entry in directory.entries] == [ + b"baz", + b"file", + b"foo", + b"foofile", + ] + + # Filter paths + directory = Directory.from_disk(path=dirname, path_filter=filter_func) + assert [entry["name"] for entry in directory.entries] == [ + b"foo", + b"foofile", + ] + + # Filter directories and paths (`path_filter` should take precedence) + with pytest.deprecated_call(): + directory = Directory.from_disk( + path=dirname, path_filter=filter_func, dir_filter=filter_func + ) + assert [entry["name"] for entry in directory.entries] == [ + b"foo", + b"foofile", + ] + + # Test deprecated way + with pytest.deprecated_call(): + directory = Directory.from_disk(path=dirname, dir_filter=filter_func) + assert [entry["name"] for entry in directory.entries] == [ + b"file", + b"foo", + b"foofile", + ] + @pytest.mark.fs class TarballTest(DataMixin, unittest.TestCase):