diff --git a/swh/scrubber/tests/conftest.py b/swh/scrubber/tests/conftest.py index 56bf2c86f270ea4973b7b517d614610bc59a4887..5af6bccef98e1217b6e963d2a826003577bc0b81 100644 --- a/swh/scrubber/tests/conftest.py +++ b/swh/scrubber/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright (C) 2022 The Software Heritage developers +# Copyright (C) 2022-2024 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -12,6 +12,7 @@ from pytest_postgresql import factories from swh.core.db.db_utils import initialize_database_for_module from swh.journal.serializers import value_to_kafka +from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes from swh.model.model import Directory, DirectoryEntry from swh.model.swhids import ObjectType @@ -110,3 +111,27 @@ def corrupt_object(scrubber_db, config_entry): first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()), ) + + +@pytest.fixture +def journal_client_config( + kafka_server: str, kafka_prefix: str, kafka_consumer_group: str +): + return dict( + cls="kafka", + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + on_eof="stop", + ) + + +@pytest.fixture +def journal_writer(kafka_server: str, kafka_prefix: str): + return get_journal_writer( + cls="kafka", + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + anonymize=False, + ) diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py index 6f5096faaa514c82bf8f7dec072a3c0a0fba1548..f6f7a6141851da374299d7b9dbf5482d66cac7f6 100644 --- a/swh/scrubber/tests/test_journal_kafka.py +++ b/swh/scrubber/tests/test_journal_kafka.py @@ -10,7 +10,6 @@ import attr import pytest from swh.journal.serializers import kafka_to_value -from swh.journal.writer import get_journal_writer from swh.model import model, swhids from swh.model.swhids import ObjectType from swh.model.tests import swh_model_data @@ -18,55 +17,26 @@ from swh.scrubber.db import Datastore from swh.scrubber.journal_checker import JournalChecker, get_datastore -def journal_client_config( - kafka_server: str, kafka_prefix: str, kafka_consumer_group: str -): - return dict( - cls="kafka", - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - on_eof="stop", - ) - - -def journal_writer(kafka_server: str, kafka_prefix: str): - return get_journal_writer( - cls="kafka", - brokers=[kafka_server], - client_id="kafka_writer", - prefix=kafka_prefix, - anonymize=False, - ) - - @pytest.fixture -def datastore( - kafka_server: str, kafka_prefix: str, kafka_consumer_group: str -) -> Datastore: - journal_config = journal_client_config( - kafka_server, kafka_prefix, kafka_consumer_group - ) - datastore = get_datastore(journal_config) - return datastore +def datastore(journal_client_config) -> Datastore: + return get_datastore(journal_client_config) def test_no_corruption( - scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore + scrubber_db, + datastore, + journal_writer, + journal_client_config, ): - writer = journal_writer(kafka_server, kafka_prefix) - writer.write_additions("directory", swh_model_data.DIRECTORIES) - writer.write_additions("revision", swh_model_data.REVISIONS) - writer.write_additions("release", swh_model_data.RELEASES) - writer.write_additions("snapshot", swh_model_data.SNAPSHOTS) + journal_writer.write_additions("directory", swh_model_data.DIRECTORIES) + journal_writer.write_additions("revision", swh_model_data.REVISIONS) + journal_writer.write_additions("release", swh_model_data.RELEASES) + journal_writer.write_additions("snapshot", swh_model_data.SNAPSHOTS) - journal_cfg = journal_client_config( - kafka_server, kafka_prefix, kafka_consumer_group - ) - gid = journal_cfg["group_id"] + "_" + gid = journal_client_config["group_id"] + "_" for object_type in ("directory", "revision", "release", "snapshot"): - journal_cfg["group_id"] = gid + object_type + journal_client_config["group_id"] = gid + object_type config_id = scrubber_db.config_add( name=f"cfg_{object_type}", datastore=datastore, @@ -77,7 +47,7 @@ def test_no_corruption( jc = JournalChecker( db=scrubber_db, config_id=config_id, - journal_client_config=journal_cfg, + journal_client_config=journal_client_config, ) jc.run() jc.journal_client.close() @@ -88,10 +58,9 @@ def test_no_corruption( @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) def test_corrupt_snapshot( scrubber_db, - kafka_server, - kafka_prefix, - kafka_consumer_group, datastore, + journal_writer, + journal_client_config, corrupt_idx, ): config_id = scrubber_db.config_add( @@ -104,16 +73,13 @@ def test_corrupt_snapshot( snapshots = list(swh_model_data.SNAPSHOTS) snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) - writer = journal_writer(kafka_server, kafka_prefix) - writer.write_additions("snapshot", snapshots) + journal_writer.write_additions("snapshot", snapshots) before_date = datetime.datetime.now(tz=datetime.timezone.utc) JournalChecker( db=scrubber_db, config_id=config_id, - journal_client_config=journal_client_config( - kafka_server, kafka_prefix, kafka_consumer_group - ), + journal_client_config=journal_client_config, ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) @@ -136,10 +102,9 @@ def test_corrupt_snapshot( def test_corrupt_snapshots( scrubber_db, - kafka_server, - kafka_prefix, - kafka_consumer_group, datastore, + journal_writer, + journal_client_config, ): config_id = scrubber_db.config_add( name="cfg_snapshot", @@ -152,15 +117,12 @@ def test_corrupt_snapshots( for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) - writer = journal_writer(kafka_server, kafka_prefix) - writer.write_additions("snapshot", snapshots) + journal_writer.write_additions("snapshot", snapshots) JournalChecker( db=scrubber_db, config_id=config_id, - journal_client_config=journal_client_config( - kafka_server, kafka_prefix, kafka_consumer_group - ), + journal_client_config=journal_client_config, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -176,10 +138,10 @@ def test_corrupt_snapshots( def test_duplicate_directory_entries( scrubber_db, - kafka_server, - kafka_prefix, - kafka_consumer_group, datastore, + journal_writer, + kafka_prefix, + journal_client_config, ): config_id = scrubber_db.config_add( name="cfg_directory", @@ -213,23 +175,22 @@ def test_duplicate_directory_entries( + b"0 filename\x00" + b"\x02" * 20 ) - dupe_directory = { + dup_directory = { "id": hashlib.sha1(raw_manifest).digest(), "entries": corrupt_directory["entries"], "raw_manifest": raw_manifest, } - writer = journal_writer(kafka_server, kafka_prefix) - writer.send(f"{kafka_prefix}.directory", directory.id, directory.to_dict()) - writer.send(f"{kafka_prefix}.directory", corrupt_directory["id"], corrupt_directory) - writer.send(f"{kafka_prefix}.directory", dupe_directory["id"], dupe_directory) + journal_writer.send(f"{kafka_prefix}.directory", directory.id, directory.to_dict()) + journal_writer.send( + f"{kafka_prefix}.directory", corrupt_directory["id"], corrupt_directory + ) + journal_writer.send(f"{kafka_prefix}.directory", dup_directory["id"], dup_directory) JournalChecker( db=scrubber_db, config_id=config_id, - journal_client_config=journal_client_config( - kafka_server, kafka_prefix, kafka_consumer_group - ), + journal_client_config=journal_client_config, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -238,17 +199,15 @@ def test_duplicate_directory_entries( swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:dir:0000000000000000000000000000000000000000", - f"swh:1:dir:{dupe_directory['id'].hex()}", + f"swh:1:dir:{dup_directory['id'].hex()}", ] } def test_check_references_raises( scrubber_db, - kafka_server, - kafka_prefix, - kafka_consumer_group, datastore, + journal_client_config, ): config_id = scrubber_db.config_add( name="cfg_snapshot", @@ -257,12 +216,10 @@ def test_check_references_raises( nb_partitions=1, check_references=True, ) - journal_config = journal_client_config( - kafka_server, kafka_prefix, kafka_consumer_group - ) + with pytest.raises(ValueError): JournalChecker( db=scrubber_db, config_id=config_id, - journal_client_config=journal_config, + journal_client_config=journal_client_config, )