diff --git a/swh/model/discovery.py b/swh/model/discovery.py index 95c8e14d87b23e4d790bad03001fc5232c3b16f5..eba4cd4d6924ccf61ab81413ea8022978bb7b642 100644 --- a/swh/model/discovery.py +++ b/swh/model/discovery.py @@ -11,7 +11,17 @@ from collections import namedtuple import itertools import logging import random -from typing import Any, Iterable, List, Mapping, NamedTuple, Set, Union +from typing import ( + Any, + Callable, + Iterable, + List, + Mapping, + NamedTuple, + Optional, + Set, + Union, +) from typing_extensions import Protocol, runtime_checkable @@ -63,15 +73,28 @@ class ArchiveDiscoveryInterface(Protocol): class BaseDiscoveryGraph: """Creates the base structures and methods needed for discovery algorithms. - Subclasses should override ``get_sample`` to affect how the discovery is made.""" + Subclasses should override ``get_sample`` to affect how the discovery is made. + + The `update_info_callback` is an optional argument that will get called for + each new piece of information we get. The callback arguments are `(content, + known)`. + - content: the relevant model object (content, skipped content or directory), + - known: a boolean, True if the object is known to the archive, False otherwise. 
+ """ - def __init__(self, contents, skipped_contents, directories): + def __init__( + self, + contents, + skipped_contents, + directories, + update_info_callback: Optional[Callable[[Any, bool], None]] = None, + ): self._all_contents: Mapping[ Sha1Git, Union[model.Content, model.SkippedContent] ] = {} self._undecided_directories: Set[Sha1Git] = set() - self._children: Mapping[Sha1Git, model.DirectoryEntry] = {} - self._parents: Mapping[model.DirectoryEntry, Sha1Git] = {} + self._children: Mapping[Sha1Git, Set[Sha1Git]] = {} + self._parents: Mapping[model.DirectoryEntry, Set[Any]] = {} self.undecided: Set[Sha1Git] = set() for content in itertools.chain(contents, skipped_contents): @@ -88,6 +111,12 @@ class BaseDiscoveryGraph: self.undecided |= self._undecided_directories self.known: Set[Sha1Git] = set() self.unknown: Set[Sha1Git] = set() + self._update_info_callback = update_info_callback + self._sha1_to_obj = {} + for content in itertools.chain(contents, skipped_contents): + self._sha1_to_obj[content.sha1_git] = content + for directory in directories: + self._sha1_to_obj[directory.id] = directory def mark_known(self, entries: Iterable[Sha1Git]): """Mark ``entries`` and those they imply as known in the SWH archive""" @@ -115,14 +144,19 @@ class BaseDiscoveryGraph: - ``target_set``: set where marked entries will be added. 
""" + callback = self._update_info_callback to_process = set(entries) while to_process: current = to_process.pop() target_set.add(current) + new = current in self.undecided self.undecided.discard(current) self._undecided_directories.discard(current) next_entries = transitive_mapping.get(current, set()) & self.undecided to_process.update(next_entries) + if new and callback is not None: + obj = self._sha1_to_obj[current] + callback(obj, current in self.known) def get_sample( self, @@ -195,10 +229,20 @@ class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph): ) -def filter_known_objects(archive: ArchiveDiscoveryInterface): +def filter_known_objects( + archive: ArchiveDiscoveryInterface, + update_info_callback: Optional[Callable[[Any, bool], None]] = None, +): """Filter ``archive``'s ``contents``, ``skipped_contents`` and ``directories`` to only return those that are unknown to the SWH archive using a discovery - algorithm.""" + algorithm. + + The `update_info_callback` is an optional argument that will get called for + each new piece of information we get. The callback arguments are `(content, + known)`. + - content: the relevant model object (content, skipped content or directory), + - known: a boolean, True if the object is known to the archive, False otherwise. + """ contents = archive.contents skipped_contents = archive.skipped_contents directories = archive.directories @@ -207,7 +251,12 @@ def filter_known_objects(archive: ArchiveDiscoveryInterface): skipped_contents_count = len(skipped_contents) directories_count = len(directories) - graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories) + graph = RandomDirSamplingDiscoveryGraph( + contents, + skipped_contents, + directories, + update_info_callback=update_info_callback, + ) while graph.undecided: sample = graph.get_sample()