diff --git a/swh/provenance/archive/interface.py b/swh/provenance/archive/interface.py index a86ef69f2c48b093ee338c0fef21b233d53ad3fe..e69e524b60593f935b0efcb7ed9daa6f916cd472 100644 --- a/swh/provenance/archive/interface.py +++ b/swh/provenance/archive/interface.py @@ -3,7 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, Tuple +from typing import Any, Dict, Iterable, Iterator, Tuple from typing_extensions import Protocol, runtime_checkable @@ -43,6 +43,20 @@ class ArchiveInterface(Protocol): """ ... + def revisions_get( + self, ids: Iterable[Sha1Git] + ) -> Iterator[Tuple[Sha1Git, Sha1Git, Dict[str, Any]]]: + """Get revision date and root directory. + + Args: + ids: iterable of sha1 id of the revision to list parents from. + + Returns: + list of (revision_id, directory_id, commit_timestamp) + + """ + ... + def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: """List all revisions targeted by one snapshot. diff --git a/swh/provenance/archive/multiplexer.py b/swh/provenance/archive/multiplexer.py index 2f630761f04926ae3502af124ca1f9dea9e567f2..8f9c5b17adcff4182bf27f1061d3a0ba390a5e09 100644 --- a/swh/provenance/archive/multiplexer.py +++ b/swh/provenance/archive/multiplexer.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information import logging -from typing import Any, Dict, Iterable, List, Tuple +from typing import Any, Dict, Iterable, Iterator, List, Tuple from swh.core.statsd import statsd from swh.model.model import Directory, Sha1Git @@ -93,6 +93,44 @@ class ArchiveMultiplexed: return [] + @statsd.timed( + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revisions_get"}, + ) + def revisions_get( + self, revision_ids: Iterable[Sha1Git] + ) -> Iterator[Tuple[Sha1Git, Sha1Git, Dict[str, Any]]]: + revision_ids = list(revision_ids) # this will be iterated several times + for backend, archive in self.archives: + try: + revs = list(archive.revisions_get(revision_ids)) + if revs: + statsd.increment( + ARCHIVE_OPS_METRIC, + tags={ + "method": "revisions_get", + "backend": backend, + }, + ) + yield from revs + LOGGER.debug( + "No revs found via %s", + archive.__class__, + ) + except Exception as e: + LOGGER.warn( + "Error retrieving revisions via %s: %s", + archive.__class__, + e, + ) + statsd.increment( + ARCHIVE_OPS_METRIC, + tags={ + "method": "revisions_get", + "backend": "not_found", + }, + ) + @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: for backend, archive in self.archives: diff --git a/swh/provenance/archive/postgresql.py b/swh/provenance/archive/postgresql.py index 0e158eae9e9b1c3fb53581a74eb76ef8e67c2684..28cb872d55bfc34e4f8284550cf3e3e3d6c68234 100644 --- a/swh/provenance/archive/postgresql.py +++ b/swh/provenance/archive/postgresql.py @@ -3,7 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, List, Tuple, cast +from typing import Any, Dict, Iterable, Iterator, List, Tuple, cast import psycopg2.extensions @@ -114,6 +114,19 @@ class ArchivePostgreSQL: ) return cast(List[Tuple[Sha1Git, Sha1Git]], cursor.fetchall()) + @statsd.timed( + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revisions_get"}, + ) + def revisions_get( + self, revision_ids: Iterable[Sha1Git] + ) -> Iterator[Tuple[Sha1Git, Sha1Git, Dict[str, Any]]]: + # XXX write the actual SQL version of this? + revs = self.storage.revision_get(list(revision_ids)) + for rev in revs: + if rev is not None and rev.date is not None: + yield (rev.id, rev.directory, rev.date.to_dict()) + @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: diff --git a/swh/provenance/archive/storage.py b/swh/provenance/archive/storage.py index 97b112174eeadb615a60adfb947578dfdd33ed6d..540f360d695885f40a46f2264364c84e5e66efa1 100644 --- a/swh/provenance/archive/storage.py +++ b/swh/provenance/archive/storage.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information from datetime import datetime -from typing import Any, Dict, Iterable, Set, Tuple +from typing import Any, Dict, Iterable, Iterator, Set, Tuple from swh.core.statsd import statsd from swh.model.model import ObjectType, Sha1Git, TargetType @@ -41,6 +41,18 @@ class ArchiveStorage: for parent_id in rev.parents: yield (revision_id, parent_id) + @statsd.timed( + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revisions_get"}, + ) + def revisions_get( + self, revision_ids: Iterable[Sha1Git] + ) -> Iterator[Tuple[Sha1Git, Sha1Git, Dict[str, Any]]]: + revs = self.storage.revision_get(list(revision_ids)) + for rev in revs: + if rev is not None and rev.date is not None: + yield (rev.id, rev.directory, rev.date.to_dict()) + @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: from swh.core.utils import grouper diff --git a/swh/provenance/archive/swhgraph.py b/swh/provenance/archive/swhgraph.py index 131f676b39af9d4aba43350d0a9a7118d00aefdc..e5d8a4dad067c705a9002292254502dde68b8c13 100644 --- a/swh/provenance/archive/swhgraph.py +++ b/swh/provenance/archive/swhgraph.py @@ -3,7 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, Tuple +from typing import Any, Dict, Iterable, Iterator, Tuple from google.protobuf.field_mask_pb2 import FieldMask import grpc @@ -58,6 +58,19 @@ class ArchiveGraph: pass raise + @statsd.timed( + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revisions_get"}, + ) + def revisions_get( + self, revision_ids: Iterable[Sha1Git] + ) -> Iterator[Tuple[Sha1Git, Sha1Git, Dict[str, Any]]]: + # XXX: write an actual swh-graph version of this? + revs = self.storage.revision_get(list(revision_ids)) + for rev in revs: + if rev is not None and rev.date is not None: + yield (rev.id, rev.directory, rev.date.to_dict()) + @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: src = str(CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=id)) diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index db886962b6e31bcc0f917133dc1f8c0ed67af43e..94a61562ca1c3a457ab97f0c4df16b2fbf6151b0 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -555,6 +555,13 @@ def revision_from_csv( type=int, help="""Set the maximum recursive directory size of revisions to be indexed.""", ) +@click.option( + "-t", + "--object-type", + default="revision", + type=click.Choice(["revision", "release"]), + help="""Topic (type of objects) to get revisions from.""", +) @click.pass_context def revision_from_journal( ctx: click.core.Context, @@ -565,10 +572,14 @@ def revision_from_journal( reuse: bool, min_size: int, max_directory_size: int, + object_type: str, ) -> None: from swh.journal.client import get_journal_client - from .journal_client import process_journal_revisions + if object_type == "release": + from .journal_client import process_journal_releases as process_journal + else: + from .journal_client import process_journal_revisions as process_journal provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] @@ -576,7 +587,7 @@ def revision_from_journal( journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( - process_journal_revisions, + process_journal, archive=archive, provenance=provenance, minsize=min_size, @@ -588,7 +599,7 @@ def revision_from_journal( cls, **{ **journal_cfg, - "object_types": ["revision"], + "object_types": [object_type], }, ) diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py index cfb9135b2923cb5f89b2c3796b2e833a9014e7e5..4c144ba3ffb93ea9d099dcae66aebfbb697bdc6d 100644 --- a/swh/provenance/journal_client.py +++ b/swh/provenance/journal_client.py @@ -67,3 +67,37 @@ def process_journal_revisions( revision_add(provenance, archive, revisions, **cfg) if notify: notify("WATCHDOG=1") + + +def process_journal_releases( + messages, *, provenance: ProvenanceInterface, archive: ArchiveInterface, **cfg +) -> None: + """Worker function for `JournalClient.process(worker_fn)`.""" + assert set(messages) == {"release"}, set(messages) + rev_ids = [] + for rel in messages["release"]: + if rel["target"] is None: + continue + + if rel["target_type"] == "revision": + rev_ids.append(rel["target"]) + + revisions = [] + for (rev, directory, date_d) in archive.revisions_get(rev_ids): + if not date_d: + continue + date = TimestampWithTimezone.from_dict(date_d).to_datetime() + if date <= EPOCH: + continue + revisions.append( + RevisionEntry( + id=rev, + root=directory, + date=date, + ) + ) + + if revisions: + revision_add(provenance, archive, revisions, **cfg) + if notify: + notify("WATCHDOG=1") diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py index 860364a3597b1dd8fc7b6426f56f65f1681d4884..fd2bf346ba9e3623bbbde50e8f1925dc15a99540 100644 --- a/swh/provenance/tests/test_archive_interface.py +++ b/swh/provenance/tests/test_archive_interface.py @@ -7,7 +7,7 @@ from collections import Counter from operator import itemgetter from typing import Any from typing import Counter as TCounter -from typing import Dict, Iterable, List, Set, Tuple, Type, Union +from typing import Dict, Iterable, Iterator, List, Set, Tuple, Type, Union import pytest @@ -54,6 +54,11 @@ class ArchiveNoop: def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: return [] + def revisions_get( + self, ids: Iterable[Sha1Git] + ) -> Iterator[Tuple[Sha1Git, Sha1Git, Dict[str, Any]]]: + yield from [] + def check_directory_ls( reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]] diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py index 411594e16b4142f57ea1f796bc27d1edd1480f26..25627eefabb46c2879142b0b1b5884d8b64cc98a 100644 --- a/swh/provenance/tests/test_journal_client.py +++ b/swh/provenance/tests/test_journal_client.py @@ -132,3 +132,62 @@ def test_cli_revision_from_journal_client( result = provenance.storage.revision_get(revisions) assert set(result.keys()) == set(revisions) + + +@pytest.mark.kafka +def test_cli_release_from_journal_client( + swh_storage: StorageInterface, + swh_storage_backend_config: Dict, + kafka_prefix: str, + kafka_server: str, + consumer: Consumer, + provenance, + postgres_provenance, +) -> None: + """Test revision journal client cli""" + + # Prepare storage data + data = load_repo_data("cmdbts2") + assert len(data["origin"]) >= 1 + assert len(data["release"]) >= 1 + fill_storage(swh_storage, data) + + # Prepare configuration for cli call + swh_storage_backend_config.pop("journal_writer", None) # no need for that config + storage_config_dict = swh_storage_backend_config + cfg = { + "journal_client": { + "cls": "kafka", + "brokers": [kafka_server], + "group_id": "toto", + "prefix": kafka_prefix, + "stop_on_eof": True, + }, + "provenance": { + "archive": { + "cls": "api", + "storage": storage_config_dict, + }, + "storage": { + "cls": "postgresql", + "db": postgres_provenance.dsn, + }, + }, + } + + release_revs = [rel["target"] for rel in data["release"]] + result = provenance.storage.revision_get(release_revs) + assert not result + + # call the cli 'swh provenance revision from-journal' + cli_result = invoke( + ["revision", "from-journal", "--object-type", "release"], config=cfg + ) + assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}" + + result = provenance.storage.revision_get(release_revs) + assert set(result.keys()) == set(release_revs) + + # ensure only these release revisions have been ingested + result = provenance.storage.revision_get([rev["id"] for rev in data["revision"]]) + assert set(result.keys()) == set(release_revs) diff --git a/tox.ini b/tox.ini index 7ca3132bac9cc00fbf4dfc144a91ebdc2cc4ccc7..cb554fc7f5c503c88438654fc19e2f0dad5f3ee8 100644 --- a/tox.ini +++ b/tox.ini @@ -48,7 +48,7 @@ commands = extras = testing deps = - mypy==0.942 + mypy==1.0.1 commands = mypy swh