Skip to content
Snippets Groups Projects
Commit 2b869aa7 authored by Antoine Cezar
Browse files

swh identify: add --exclude

parent 9224c8ca
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@
import os
import sys
from typing import List, Optional
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
......@@ -57,11 +58,21 @@ def swhid_of_file_content(data):
return swhid(CONTENT, object)
def swhid_of_dir(path: bytes, exclude_patterns: Optional[List[bytes]] = None) -> str:
    """Compute the SWHID of the directory located at ``path``.

    Args:
        path: path of the directory to identify
        exclude_patterns: glob patterns of directories to leave out of the
            computation; ``None`` or an empty list keeps every directory

    Returns:
        the result of ``swhid(DIRECTORY, ...)`` for the loaded directory
    """
    # Imported lazily to keep cli startup time under control.
    from swh.model.from_disk import (
        Directory,
        accept_all_directories,
        ignore_directories_patterns,
    )
    from swh.model.identifiers import DIRECTORY, swhid

    # Only build a pattern filter when there is something to exclude.
    if exclude_patterns:
        dir_filter = ignore_directories_patterns(path, exclude_patterns)
    else:
        dir_filter = accept_all_directories

    directory = Directory.from_disk(path=path, dir_filter=dir_filter).get_data()
    return swhid(DIRECTORY, directory)
......@@ -101,7 +112,7 @@ def swhid_of_git_repo(path):
return str(SWHID(object_type="snapshot", object_id=snapshot_identifier(snapshot)))
def identify_object(obj_type, follow_symlinks, obj):
def identify_object(obj_type, follow_symlinks, exclude_patterns, obj):
from urllib.parse import urlparse
if obj_type == "auto":
......@@ -130,7 +141,9 @@ def identify_object(obj_type, follow_symlinks, obj):
if obj_type == "content":
swhid = swhid_of_file(path)
elif obj_type == "directory":
swhid = swhid_of_dir(path)
swhid = swhid_of_dir(
path, [pattern.encode() for pattern in exclude_patterns]
)
elif obj_type == "origin":
swhid = swhid_of_origin(obj)
elif obj_type == "snapshot":
......@@ -165,6 +178,15 @@ def identify_object(obj_type, follow_symlinks, obj):
type=click.Choice(["auto", "content", "directory", "origin", "snapshot"]),
help="type of object to identify (default: auto)",
)
@click.option(
"--exclude",
"-x",
"exclude_patterns",
metavar="PATTERN",
multiple=True,
help="Exclude directories using glob patterns \
(e.g., '*.git' to exclude all .git directories)",
)
@click.option(
"--verify",
"-v",
......@@ -173,7 +195,9 @@ def identify_object(obj_type, follow_symlinks, obj):
help="reference identifier to be compared with computed one",
)
@click.argument("objects", nargs=-1, required=True)
def identify(obj_type, verify, show_filename, follow_symlinks, objects):
def identify(
obj_type, verify, show_filename, follow_symlinks, objects, exclude_patterns,
):
"""Compute the Software Heritage persistent identifier (SWHID) for the given
source code object(s).
......@@ -208,7 +232,9 @@ def identify(obj_type, verify, show_filename, follow_symlinks, objects):
if verify and len(objects) != 1:
raise click.BadParameter("verification requires a single object")
results = map(partial(identify_object, obj_type, follow_symlinks), objects)
results = map(
partial(identify_object, obj_type, follow_symlinks, exclude_patterns), objects,
)
if verify:
swhid = next(results)[1]
......
......@@ -129,3 +129,7 @@ class ValidationError(Exception):
def __repr__(self):
return "ValidationError(%s)" % self
class InvalidDirectoryPath(Exception):
    """Raised when a path is not located inside the expected root directory."""
......@@ -5,15 +5,19 @@
import datetime
import enum
import fnmatch
import glob
import os
import re
import stat
from typing import Any, Iterable, List, Optional, Tuple
from typing import Any, Iterable, Iterator, List, Optional, Pattern, Tuple
import attr
from attrs_strict import type_validator
from typing_extensions import Final
from . import model
from .exceptions import InvalidDirectoryPath
from .hashutil import MultiHash
from .identifiers import directory_entry_sort_key, directory_identifier
from .identifiers import identifier_to_bytes as id_to_bytes
......@@ -276,6 +280,63 @@ def ignore_named_directories(names, *, case_sensitive=True):
return named_filter
# TODO: `extract_regex_objs` has been copied and adapted from `swh.scanner`.
# In the future `swh.scanner` should use the `swh.model` version and remove its own.
def extract_regex_objs(
    root_path: bytes, patterns: Iterable[bytes]
) -> Iterator[Pattern[bytes]]:
    """Generate a compiled regex for each glob pattern, after checking that
    every existing path the pattern expands to lies inside the root directory.

    Args:
        root_path (bytes): path to the root directory
        patterns (list of bytes): glob patterns to convert

    Yields:
        a compiled bytes regex equivalent to each pattern

    Raises:
        InvalidDirectoryPath: if a pattern expands to an existing path that is
            located outside of ``root_path``
    """
    absolute_root_path = os.path.abspath(root_path)
    for pattern in patterns:
        # NOTE: glob expands relative patterns against the current working
        # directory, not against root_path.
        for path in glob.glob(pattern):
            absolute_path = os.path.abspath(path)
            # Compare whole path components: a plain prefix test would accept
            # a sibling such as "/root/foo-bar" for the root "/root/foo".
            try:
                inside_root = (
                    os.path.commonpath([absolute_root_path, absolute_path])
                    == absolute_root_path
                )
            except ValueError:
                # No common ancestor at all (e.g. different drives).
                inside_root = False
            if not inside_root:
                error_msg = (
                    b'The path "' + path + b'" is not a subdirectory or relative '
                    b'to the root directory path: "' + root_path + b'"'
                )
                raise InvalidDirectoryPath(error_msg)

        # fnmatch.translate only accepts str. Round-trip through ISO-8859-1
        # (one character per byte, as fnmatch does for bytes patterns) so that
        # patterns which are not valid UTF-8 are supported as well.
        regex = fnmatch.translate(pattern.decode("ISO-8859-1"))
        yield re.compile(regex.encode("ISO-8859-1"))
def ignore_directories_patterns(root_path: bytes, patterns: Iterable[bytes]):
    """Build a filter for :func:`directory_to_objects` which rejects the
    directories whose path, relative to the root, matches one of the patterns.

    Args:
        root_path (bytes): path of the root directory
        patterns (list of bytes): patterns to ignore

    Returns:
        a directory filter for :func:`directory_to_objects`
    """
    compiled = set(extract_regex_objs(root_path, patterns))
    absolute_root = os.path.abspath(root_path)

    def pattern_filter(
        dirpath: bytes,
        dirname: bytes,
        entries: Iterable[Any],
        patterns: Iterable[Any] = compiled,
        root_path: bytes = absolute_root,
    ):
        # Patterns are matched against the path relative to the root.
        candidate = os.path.relpath(os.path.abspath(dirpath), root_path)
        for regex in patterns:
            if regex.match(candidate):
                return False
        return True

    return pattern_filter
def iter_directory(
directory,
) -> Tuple[List[model.Content], List[model.SkippedContent], List[model.Directory]]:
......
......@@ -146,3 +146,19 @@ class TestIdentify(DataMixin, unittest.TestCase):
f.write("trailing garbage to make verification fail")
result = self.runner.invoke(cli.identify, ["--verify", expected_id, path])
self.assertEqual(result.exit_code, 1)
def test_exclude(self):
    """identify --exclude: directories matching the pattern are ignored"""
    self.make_from_tarball(self.tmpdir_name)
    sample_dir = os.path.join(self.tmpdir_name, b"sample-folder")

    # Extra directory, with a non UTF-8 name, targeted by the exclude pattern.
    excluded_dir = os.path.join(sample_dir, b"excluded_dir\x96")
    os.mkdir(excluded_dir)
    with open(os.path.join(excluded_dir, b"some_file"), "w") as f:
        f.write("content")

    args = ["--type", "directory", "--exclude", "excluded_*", sample_dir]
    result = self.runner.invoke(cli.identify, args)

    self.assertSWHID(result, "swh:1:dir:e8b0f1466af8608c8a3fb9879db172b887e80759")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment