From 60585d64e9acec654fff84fae02bfab2dafaba79 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Gom=C3=A8s?= <rgomes@octobus.net>
Date: Mon, 21 Aug 2023 09:58:19 +0000
Subject: [PATCH] Move most of the discovery module from `swh-loader-core` here

---
 requirements-test.txt             |   1 +
 swh/model/discovery.py            | 229 ++++++++++++++++++++++++++++++
 swh/model/tests/test_discovery.py |  68 +++++++++
 3 files changed, 298 insertions(+)
 create mode 100644 swh/model/discovery.py
 create mode 100644 swh/model/tests/test_discovery.py

diff --git a/requirements-test.txt b/requirements-test.txt
index 1e8eb331..4f27dff5 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,3 +1,4 @@
+aiohttp
 click
 pytest
 pytz
diff --git a/swh/model/discovery.py b/swh/model/discovery.py
new file mode 100644
index 00000000..7cd9e7ac
--- /dev/null
+++ b/swh/model/discovery.py
@@ -0,0 +1,229 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding unknown content efficiently."""
+
+from __future__ import annotations
+
+from collections import namedtuple
+import itertools
+import logging
+import random
+from typing import Any, Iterable, List, Mapping, NamedTuple, Set, Union
+
+from typing_extensions import Protocol, runtime_checkable
+
+from .from_disk import model
+from .model import Sha1Git
+
+logger = logging.getLogger(__name__)
+
+# Maximum amount when sampling from the undecided set of directory entries
+SAMPLE_SIZE = 1000
+
+# Sets of sha1 of contents, skipped contents and directories respectively
+Sample: NamedTuple = namedtuple(
+    "Sample", ["contents", "skipped_contents", "directories"]
+)
+
+
+@runtime_checkable
+class ArchiveDiscoveryInterface(Protocol):
+    """Interface used in discovery code to abstract over ways of connecting to
+    the SWH archive (direct storage, web API, etc.) for all methods needed by
+    discovery algorithms."""
+
+    contents: List[model.Content]
+    skipped_contents: List[model.SkippedContent]
+    directories: List[model.Directory]
+
+    def __init__(
+        self,
+        contents: List[model.Content],
+        skipped_contents: List[model.SkippedContent],
+        directories: List[model.Directory],
+    ) -> None:
+        self.contents = contents
+        self.skipped_contents = skipped_contents
+        self.directories = directories
+
+    async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List content missing from the archive by sha1"""
+
+    async def skipped_content_missing(
+        self, skipped_contents: List[Sha1Git]
+    ) -> Iterable[Sha1Git]:
+        """List skipped content missing from the archive by sha1"""
+
+    async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List directories missing from the archive by sha1"""
+
+
+class BaseDiscoveryGraph:
+    """Creates the base structures and methods needed for discovery algorithms.
+    Subclasses should override ``get_sample`` to affect how the discovery is made."""
+
+    def __init__(self, contents, skipped_contents, directories):
+        self._all_contents: Mapping[
+            Sha1Git, Union[model.Content, model.SkippedContent]
+        ] = {}
+        self._undecided_directories: Set[Sha1Git] = set()
+        self._children: Mapping[Sha1Git, model.DirectoryEntry] = {}
+        self._parents: Mapping[model.DirectoryEntry, Sha1Git] = {}
+        self.undecided: Set[Sha1Git] = set()
+
+        for content in itertools.chain(contents, skipped_contents):
+            self.undecided.add(content.sha1_git)
+            self._all_contents[content.sha1_git] = content
+
+        for directory in directories:
+            self.undecided.add(directory.id)
+            self._undecided_directories.add(directory.id)
+            self._children[directory.id] = {c.target for c in directory.entries}
+            for child in directory.entries:
+                self._parents.setdefault(child.target, set()).add(directory.id)
+
+        self.undecided |= self._undecided_directories
+        self.known: Set[Sha1Git] = set()
+        self.unknown: Set[Sha1Git] = set()
+
+    def mark_known(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as known in the SWH archive"""
+        self._mark_entries(entries, self._children, self.known)
+
+    def mark_unknown(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as unknown in the SWH archive"""
+        self._mark_entries(entries, self._parents, self.unknown)
+
+    def _mark_entries(
+        self,
+        entries: Iterable[Sha1Git],
+        transitive_mapping: Mapping[Any, Any],
+        target_set: Set[Any],
+    ):
+        """Use Merkle graph properties to mark a directory entry as known or unknown.
+
+        If an entry is known, then all of its descendants are known. If it's
+        unknown, then all of its ancestors are unknown.
+
+        - ``entries``: directory entries to mark along with their ancestors/descendants
+          where applicable.
+        - ``transitive_mapping``: mapping from an entry to the next entries to mark
+          in the hierarchy, if any.
+        - ``target_set``: set where marked entries will be added.
+
+        """
+        to_process = set(entries)
+        while to_process:
+            current = to_process.pop()
+            target_set.add(current)
+            self.undecided.discard(current)
+            self._undecided_directories.discard(current)
+            next_entries = transitive_mapping.get(current, set()) & self.undecided
+            to_process.update(next_entries)
+
+    async def get_sample(
+        self,
+    ) -> Sample:
+        """Return a three-tuple of samples from the undecided sets of contents,
+        skipped contents and directories respectively.
+        These samples will be queried against the storage which will tell us
+        which are known."""
+        raise NotImplementedError()
+
+    async def do_query(
+        self, archive: ArchiveDiscoveryInterface, sample: Sample
+    ) -> None:
+        """Given a three-tuple of samples, ask the archive which are known or
+        unknown and mark them as such."""
+
+        methods = (
+            archive.content_missing,
+            archive.skipped_content_missing,
+            archive.directory_missing,
+        )
+
+        for sample_per_type, method in zip(sample, methods):
+            if not sample_per_type:
+                continue
+            known = set(sample_per_type)
+            unknown = set(await method(list(sample_per_type)))
+            known -= unknown
+
+            self.mark_known(known)
+            self.mark_unknown(unknown)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+    """Use a random sampling using only directories.
+
+    This allows us to find a statistically good spread of entries in the graph
+    with a smaller population than using all types of entries. When there are
+    no more directories, only contents or skipped contents are undecided if any
+    are left: we send them directly to the storage since they should be few and
+    their structure flat."""
+
+    async def get_sample(self) -> Sample:
+        if self._undecided_directories:
+            if len(self._undecided_directories) <= SAMPLE_SIZE:
+                return Sample(
+                    contents=set(),
+                    skipped_contents=set(),
+                    directories=set(self._undecided_directories),
+                )
+            sample = random.sample(tuple(self._undecided_directories), SAMPLE_SIZE)
+            directories = {o for o in sample}
+            return Sample(
+                contents=set(), skipped_contents=set(), directories=directories
+            )
+
+        contents = set()
+        skipped_contents = set()
+
+        for sha1 in self.undecided:
+            obj = self._all_contents[sha1]
+            obj_type = obj.object_type
+            if obj_type == model.Content.object_type:
+                contents.add(sha1)
+            elif obj_type == model.SkippedContent.object_type:
+                skipped_contents.add(sha1)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return Sample(
+            contents=contents, skipped_contents=skipped_contents, directories=set()
+        )
+
+
+async def filter_known_objects(archive: ArchiveDiscoveryInterface):
+    """Filter ``archive``'s ``contents``, ``skipped_contents`` and ``directories``
+    to only return those that are unknown to the SWH archive using a discovery
+    algorithm."""
+    contents = archive.contents
+    skipped_contents = archive.skipped_contents
+    directories = archive.directories
+
+    contents_count = len(contents)
+    skipped_contents_count = len(skipped_contents)
+    directories_count = len(directories)
+
+    graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+    while graph.undecided:
+        sample = await graph.get_sample()
+        await graph.do_query(archive, sample)
+
+    contents = [c for c in contents if c.sha1_git in graph.unknown]
+    skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
+    directories = [c for c in directories if c.id in graph.unknown]
+
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        contents_count - len(contents),
+        skipped_contents_count - len(skipped_contents),
+        directories_count - len(directories),
+    )
+
+    return (contents, skipped_contents, directories)
diff --git a/swh/model/tests/test_discovery.py b/swh/model/tests/test_discovery.py
new file mode 100644
index 00000000..09519241
--- /dev/null
+++ b/swh/model/tests/test_discovery.py
@@ -0,0 +1,68 @@
+# Copyright (C) 2023 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import dataclass
+from typing import Iterable, List
+
+from swh.model import discovery, model
+from swh.model.hashutil import hash_to_bytes
+from swh.model.model import Sha1Git
+from swh.model.tests.test_identifiers import directory_example
+
+pytest_plugins = ["aiohttp.pytest_plugin"]
+
+UNKNOWN_HASH = hash_to_bytes("17140cb6109f1e3296dc52e2b2cd29bcb40e86be")
+KNOWN_CONTENT_HASH = hash_to_bytes("e8e4106de42e2d5d5efab6a9422b9a8677c993c8")
+KNOWN_DIRECTORY_HASH = hash_to_bytes("d7ed3d2c31d608823be58b1cbe57605310615231")
+KNOWN_DIRECTORY_HASH_2 = hash_to_bytes("c76724e9a0be4b60f4bf0cb48b261df8eda94b1d")
+
+
+@dataclass
+class FakeArchive:
+    contents: List[model.Content]
+    skipped_contents: List[model.SkippedContent]
+    directories: List[model.Directory]
+
+    async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+        return []
+
+    async def skipped_content_missing(
+        self, skipped_contents: List[Sha1Git]
+    ) -> Iterable[Sha1Git]:
+        """List skipped content missing from the archive by sha1"""
+        return []
+
+    async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List directories missing from the archive by sha1"""
+        return []
+
+
+async def test_filter_known_objects(monkeypatch):
+    # Test with smaller sample sizes to actually trigger the random sampling
+    monkeypatch.setattr(discovery, "SAMPLE_SIZE", 1)
+
+    base_directory = model.Directory.from_dict(directory_example)
+
+    # Hardcoding another hash is enough since it's all that's being checked
+    directory_data = directory_example.copy()
+    directory_data["id"] = KNOWN_DIRECTORY_HASH_2
+    other_directory = model.Directory.from_dict(directory_data)
+    archive = FakeArchive(
+        contents=[model.Content.from_data(b"blabla")],
+        skipped_contents=[model.SkippedContent.from_data(b"blabla2", reason="reason")],
+        directories=[
+            base_directory,
+            other_directory,
+        ],
+    )
+    assert archive.contents[0].sha1_git == KNOWN_CONTENT_HASH
+    assert archive.directories[0].id == KNOWN_DIRECTORY_HASH
+    assert archive.directories[1].id == KNOWN_DIRECTORY_HASH_2
+    (contents, skipped_contents, directories) = await discovery.filter_known_objects(
+        archive
+    )
+    assert len(contents) == 0
+    assert len(skipped_contents) == 0
+    assert len(directories) == 0
-- 
GitLab