diff --git a/swh/provenance/storage/journal.py b/swh/provenance/storage/journal.py index 8e96bd6ad6d18076031bccac37dd5d1fa4d553e1..fac6e08889aa3ce68c34caec11f2bb6ca4b4bc7b 100644 --- a/swh/provenance/storage/journal.py +++ b/swh/provenance/storage/journal.py @@ -5,8 +5,8 @@ from __future__ import annotations -from dataclasses import asdict from datetime import datetime +import hashlib from types import TracebackType from typing import Dict, Generator, Iterable, List, Optional, Set, Type @@ -23,9 +23,10 @@ from swh.provenance.storage.interface import ( class JournalMessage: - def __init__(self, id, value): + def __init__(self, id, value, add_id=True): self.id = id self.value = value + self.add_id = add_id def anonymize(self): return None @@ -34,10 +35,13 @@ class JournalMessage: return self.id def to_dict(self): - return { - "id": self.id, - "value": self.value, - } + if self.add_id: + return { + "id": self.id, + "value": self.value, + } + else: + return self.value class ProvenanceStorageJournal: @@ -83,7 +87,11 @@ class ProvenanceStorageJournal: def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: self.journal.write_additions( "directory", - [JournalMessage(key, asdict(value)) for (key, value) in dirs.items()], + [ + JournalMessage(key, value.date) + for (key, value) in dirs.items() + if value.date is not None + ], ) return self.storage.directory_add(dirs) @@ -99,9 +107,6 @@ class ProvenanceStorageJournal: return self.storage.entity_get_all(entity) def location_add(self, paths: Dict[Sha1Git, bytes]) -> bool: - self.journal.write_additions( - "location", [JournalMessage(key, value) for (key, value) in paths.items()] - ) return self.storage.location_add(paths) def location_get_all(self) -> Dict[Sha1Git, bytes]: @@ -119,7 +124,11 @@ class ProvenanceStorageJournal: def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool: self.journal.write_additions( "revision", - [JournalMessage(key, asdict(value)) for (key, value) in revs.items()], + [ + JournalMessage(key, value.date) + for (key, value) in revs.items() + if value.date is not None + ], ) return self.storage.revision_add(revs) @@ -129,13 +138,17 @@ class ProvenanceStorageJournal: def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: - self.journal.write_additions( - relation.value, - [ - JournalMessage(key, [asdict(reldata) for reldata in value]) - for (key, value) in data.items() - ], - ) + for src, relations in data.items(): + for reldata in relations: + key = hashlib.sha1(src + reldata.dst + (reldata.path or b"")).digest() + self.journal.write_addition( + relation.value, + JournalMessage( + key, + {"src": src, "dst": reldata.dst, "path": reldata.path}, + add_id=False, + ), + ) return self.storage.relation_add(relation, data) def relation_get( diff --git a/swh/provenance/storage/replay.py b/swh/provenance/storage/replay.py index 78e2a7f8bf6f03ed8e82b3e7b86b285e5c5a93c0..0f19e7c7de986a146947095a49b2be0052342a1a 100644 --- a/swh/provenance/storage/replay.py +++ b/swh/provenance/storage/replay.py @@ -3,8 +3,10 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import defaultdict +from datetime import datetime import logging -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple, Union try: from systemd.daemon import notify @@ -18,6 +20,7 @@ from swh.provenance.storage.interface import ( RelationData, RelationType, RevisionData, + Sha1Git, ) from .interface import ProvenanceStorageInterface @@ -29,11 +32,11 @@ REPLAY_DURATION_METRIC = "swh_provenance_replayer_duration_seconds" def cvrt_directory(msg_d): - return (msg_d["id"], DirectoryData(**msg_d["value"])) + return (msg_d["id"], DirectoryData(date=msg_d["value"], flat=False)) def cvrt_revision(msg_d): - return (msg_d["id"], RevisionData(**msg_d["value"])) + return (msg_d["id"], RevisionData(date=msg_d["value"], origin=None)) def cvrt_default(msg_d): @@ -41,14 +44,13 @@ def cvrt_default(msg_d): def cvrt_relation(msg_d): - return (msg_d["id"], {RelationData(**v) for v in msg_d["value"]}) + return (msg_d["src"], RelationData(dst=msg_d["dst"], path=msg_d["path"])) OBJECT_CONVERTERS: Dict[str, Callable[[Dict], Tuple[bytes, Any]]] = { "directory": cvrt_directory, "revision": cvrt_revision, "content": cvrt_default, - "location": cvrt_default, "content_in_revision": cvrt_relation, "content_in_directory": cvrt_relation, "directory_in_revision": cvrt_relation, @@ -100,8 +102,21 @@ def _insert_objects( logger.warning("Received a series of %s, this should not happen", object_type) return - data = dict(objects) if "_in_" in object_type: - storage.relation_add(relation=RelationType(object_type), data=data) + reldata = defaultdict(set) + for k, v in objects: + reldata[k].add(v) + storage.relation_add(relation=RelationType(object_type), data=reldata) + elif object_type in ("revision", "directory"): + entitydata: Dict[Sha1Git, Union[RevisionData, DirectoryData]] = {} + for k, v in objects: + if k not in entitydata or entitydata[k].date > v.date: + entitydata[k] = v + getattr(storage, f"{object_type}_add")(entitydata) else: + data: Dict[Sha1Git, datetime] = {} + for k, v in objects: + assert isinstance(v, datetime) + if k not in data or data[k] > v: + data[k] = v getattr(storage, f"{object_type}_add")(data) diff --git a/swh/provenance/tests/test_provenance_journal_writer.py b/swh/provenance/tests/test_provenance_journal_writer.py index ec11c4b826c6685bbb282e5d8299e6ec742c0330..edfa53689ad6e21bc1835cb1ca0493c33ccd0f38 100644 --- a/swh/provenance/tests/test_provenance_journal_writer.py +++ b/swh/provenance/tests/test_provenance_journal_writer.py @@ -3,7 +3,6 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from dataclasses import asdict from typing import Dict, Generator import pytest @@ -63,19 +62,6 @@ class TestProvenanceStorageJournal(_TestProvenanceStorage): } assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == journal_objs - def test_provenance_storage_location(self, provenance_storage): - super().test_provenance_storage_location(provenance_storage) - assert provenance_storage.journal - objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} - assert objtypes == {"location"} - - journal_objs = { - obj.id: obj.value - for (objtype, obj) in provenance_storage.journal.objects - if objtype == "location" - } - assert provenance_storage.location_get_all() == journal_objs - def test_provenance_storage_origin(self, provenance_storage): super().test_provenance_storage_origin(provenance_storage) assert provenance_storage.journal @@ -100,94 +86,102 @@ class TestProvenanceStorageJournal(_TestProvenanceStorage): for (objtype, obj) in provenance_storage.journal.objects if objtype == "revision" } - assert provenance_storage.entity_get_all(EntityType.REVISION) == journal_objs + all_revisions = provenance_storage.revision_get( + provenance_storage.entity_get_all(EntityType.REVISION) + ) + + assert { + id for (id, value) in all_revisions.items() if value.date is not None + } == journal_objs def test_provenance_storage_relation_revision_layer(self, provenance_storage): super().test_provenance_storage_relation_revision_layer(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == { - "location", "content", "directory", - "revision", "content_in_revision", "content_in_directory", "directory_in_revision", } journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "content_in_revision" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ).items() + for relation in v } assert prov_rels == journal_rels journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "content_in_directory" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.CNT_IN_DIR ).items() + for relation in v } assert prov_rels == journal_rels journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "directory_in_revision" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.DIR_IN_REV ).items() + for relation in v } assert prov_rels == journal_rels def test_provenance_storage_relation_origin_layer(self, provenance_storage): - super().test_provenance_storage_relation_orign_layer(provenance_storage) + super().test_provenance_storage_relation_origin_layer(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == { "origin", - "revision", "revision_in_origin", "revision_before_revision", } journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "revision_in_origin" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.REV_IN_ORG ).items() + for relation in v } assert prov_rels == journal_rels journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "revision_before_revision" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.REV_BEFORE_REV ).items() + for relation in v } assert prov_rels == journal_rels