diff --git a/swh/storage/postgresql/db.py b/swh/storage/postgresql/db.py index 32188f37c753977642d3061842e6767f7855949a..8c7e5657adc503d873a20b3056fe710eba6ff615 100644 --- a/swh/storage/postgresql/db.py +++ b/swh/storage/postgresql/db.py @@ -988,13 +988,29 @@ class Db(BaseDb): assert self.origin_visit_status_cols[-1] == "metadata" cols = self.origin_visit_status_cols[1:-1] cur = self._cursor(cur) + # note: the update section below should replace spurious 'created' OVS + # entries which are the result of how the replayer and pg storage used + # to work, because they later prevent the actual eventful OVS + # (with the same date and with an actual snapshot entry) from being + # inserted. cur.execute( f"WITH origin_id as (select id from origin where url=%s) " f"INSERT INTO origin_visit_status " f"(origin, {', '.join(cols)}, metadata) " f"VALUES ((select id from origin_id), " f"{', '.join(['%s']*len(cols))}, %s) " - f"ON CONFLICT (origin, visit, date) do nothing", + f"ON CONFLICT (origin, visit, date) " + "do update " + " set " + " type=excluded.type, " + " status=excluded.status, " + " snapshot=excluded.snapshot, " + " metadata=excluded.metadata " + " where origin_visit_status.origin=excluded.origin " + " and origin_visit_status.visit=excluded.visit " + " and origin_visit_status.date=excluded.date " + " and origin_visit_status.status='created' " + " and origin_visit_status.status!=excluded.status", [visit_status.origin] + [getattr(visit_status, key) for key in cols] + [jsonize(visit_status.metadata)], diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py index 851ccfa9e818bf007165cd4fa3f01c2a0576cd2e..8fe3c4191a891b4471633f71e2dea39188de0a7e 100644 --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -14,11 +14,14 @@ try: except ImportError: pytest_cov = None -from typing import Iterable +from typing import Any, Dict, Iterable +from swh.journal.client import JournalClient from swh.model.model import BaseContent, Origin 
from swh.model.tests.generate_testdata import gen_contents, gen_origins +from swh.storage import get_storage from swh.storage.interface import StorageInterface +from swh.storage.replay import ModelObjectDeserializer # define tests profile. Full documentation is at: # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles @@ -69,3 +72,31 @@ def swh_origins(swh_storage: StorageInterface) -> Iterable[Origin]: origins = [Origin.from_dict(o) for o in gen_origins(n=100)] swh_storage.origin_add(origins) return origins + + +@pytest.fixture() +def replayer_storage_and_client( + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str +): + journal_writer_config = { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + "auto_flush": False, + } + storage_config: Dict[str, Any] = { + "cls": "memory", + "journal_writer": journal_writer_config, + } + storage = get_storage(**storage_config) + deserializer = ModelObjectDeserializer() + replayer = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_on_eof=True, + value_deserializer=deserializer.convert, + ) + + yield storage, replayer diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py index 91c73f999ebc0ce5f215397bfcbed07417f4a050..23fceef769d72ebebbe167ba29161274ed7543b8 100644 --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -61,34 +61,6 @@ def nullify_ctime(obj): return obj -@pytest.fixture() -def replayer_storage_and_client( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str -): - journal_writer_config = { - "cls": "kafka", - "brokers": [kafka_server], - "client_id": "kafka_writer", - "prefix": kafka_prefix, - "auto_flush": False, - } - storage_config: Dict[str, Any] = { - "cls": "memory", - "journal_writer": journal_writer_config, - } - storage = get_storage(**storage_config) - deserializer = ModelObjectDeserializer() - 
replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_on_eof=True, - value_deserializer=deserializer.convert, - ) - - yield storage, replayer - - @pytest.mark.parametrize( "buffered", [pytest.param(True, id="buffered"), pytest.param(False, id="unbuffered")], diff --git a/swh/storage/tests/test_replay_postgres.py b/swh/storage/tests/test_replay_postgres.py new file mode 100644 index 0000000000000000000000000000000000000000..38ad4ba3e0231e9dfebfdc8b795b986314d50657 --- /dev/null +++ b/swh/storage/tests/test_replay_postgres.py @@ -0,0 +1,100 @@ +import datetime +from functools import partial + +import attr +from pytest_postgresql import factories + +from swh.core.db.db_utils import initialize_database_for_module +from swh.model.model import Origin, OriginVisit, OriginVisitStatus +from swh.storage.postgresql.storage import Storage as StorageDatastore +from swh.storage.pytest_plugin import create_object_references_partition +from swh.storage.replay import process_replay_objects +from swh.storage.tests.test_replay import TEST_OBJECTS, UTC + +swh_storage_postgresql_proc = factories.postgresql_proc( + load=[ + partial( + initialize_database_for_module, + modname="storage", + flavor="mirror", + version=StorageDatastore.current_version, + ), + create_object_references_partition, + ], +) + + +def test_storage_replayer_visit_status_update(replayer_storage_and_client, swh_storage): + """Optimal replayer scenario with origin visit status special case. + + This: + - create a new origin O, visit OV and 'full' visit status OVS, + - add a copy of OV with unset 'visit' attribute in the journal, + - run a replayer session, this should generate an OVS entry in the dst + storage which status is 'created', but with the same date as the correct + OVS for this origin, + - add all test objects (including the new O, OV and OVS above), + - run the replayer + - the artificial 'created' OVS should now be replaced by the 'full' OVS. 
+ + """ + src, replayer = replayer_storage_and_client + dst = swh_storage + + test_objects = {k: v[:] for k, v in TEST_OBJECTS.items()} + # Create a visit and a corresponding eventful status with the exact same date + origin = "https://nowhere.to.be/" + test_objects["origin"].append(Origin(url=origin)) + visit = 1 + snapshot = test_objects["snapshot"][0].id + date = datetime.datetime(2023, 12, 12, 12, 48, 00, tzinfo=UTC) + ov_single = OriginVisit(origin=origin, date=date, type="git", visit=visit) + test_objects["origin_visit"].append(ov_single) + ovs_single_full = OriginVisitStatus( + origin=origin, + visit=visit, + date=date, + type="git", + status="full", + snapshot=snapshot, + metadata=None, + ) + test_objects["origin_visit_status"].append(ovs_single_full) + + # Prefill Kafka with a problematic origin visit status entry; Inserting the + # previously created OriginVisit but with no 'visit' attribute to + # make the dst storage create a 'created' OVS entry, replicating the behavior + # of what could have happened on a mirror some time ago. + visits = [attr.evolve(ov_single, visit=None)] + src.journal_writer.origin_visit_add(visits) + worker_fn = partial(process_replay_objects, storage=dst) + replayer.process(worker_fn) + # at this point we expect to have a 'created' OVS entry for this origin + dst_visits = set(dst.origin_visit_status_get(origin, visit).results) + assert dst_visits == { + OriginVisitStatus( + origin=origin, + visit=visit, + date=date, + type="git", + status="created", + snapshot=None, + metadata=None, + ) + } + + # Fill Kafka using a source storage + for object_type, objects in test_objects.items(): + method = getattr(src, object_type + "_add") + method(objects) + src.journal_writer.journal.flush() + replayer.process(worker_fn) + # The 'created' OVS inserted as a result of replaying the OV with no + # 'visit' set above is now expected to be overwritten by the normal entries + # during the complete replayer session. 
+ visits = set((ovs.origin, ovs.visit) for ovs in test_objects["origin_visit_status"]) + + for origin, visit in visits: + dst_visits = set(dst.origin_visit_status_get(origin, visit).results) + src_visits = set(src.origin_visit_status_get(origin, visit).results) + assert src_visits == dst_visits