From d8c04aaa6ca8732a47c324f868ebeec0833e8c45 Mon Sep 17 00:00:00 2001
From: Antoine Lambert <anlambert@softwareheritage.org>
Date: Thu, 11 Apr 2024 12:04:19 +0200
Subject: [PATCH] tests: Wrap journal writer and client config in pytest
 fixtures

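The Kafka journal checker tests previously built their journal client
configuration and journal writer through module-level helper functions
in test_journal_kafka.py, so every test had to take kafka_server,
kafka_prefix and kafka_consumer_group parameters and call the helpers
itself. Turn both helpers into pytest fixtures in conftest.py: tests
now simply request journal_writer and journal_client_config, which
shortens test signatures and removes the repeated setup boilerplate.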
---
 swh/scrubber/tests/conftest.py           |  27 +++++-
 swh/scrubber/tests/test_journal_kafka.py | 113 +++++++----------------
 2 files changed, 61 insertions(+), 79 deletions(-)

diff --git a/swh/scrubber/tests/conftest.py b/swh/scrubber/tests/conftest.py
index 56bf2c8..5af6bcc 100644
--- a/swh/scrubber/tests/conftest.py
+++ b/swh/scrubber/tests/conftest.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2022  The Software Heritage developers
+# Copyright (C) 2022-2024  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -12,6 +12,7 @@ from pytest_postgresql import factories
 
 from swh.core.db.db_utils import initialize_database_for_module
 from swh.journal.serializers import value_to_kafka
+from swh.journal.writer import get_journal_writer
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Directory, DirectoryEntry
 from swh.model.swhids import ObjectType
@@ -110,3 +111,27 @@ def corrupt_object(scrubber_db, config_entry):
         first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc),
         object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()),
     )
+
+
+@pytest.fixture
+def journal_client_config(
+    kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
+):
+    return dict(
+        cls="kafka",
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        on_eof="stop",
+    )
+
+
+@pytest.fixture
+def journal_writer(kafka_server: str, kafka_prefix: str):
+    return get_journal_writer(
+        cls="kafka",
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=False,
+    )
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
index 6f5096f..f6f7a61 100644
--- a/swh/scrubber/tests/test_journal_kafka.py
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -10,7 +10,6 @@ import attr
 import pytest
 
 from swh.journal.serializers import kafka_to_value
-from swh.journal.writer import get_journal_writer
 from swh.model import model, swhids
 from swh.model.swhids import ObjectType
 from swh.model.tests import swh_model_data
@@ -18,55 +17,26 @@ from swh.scrubber.db import Datastore
 from swh.scrubber.journal_checker import JournalChecker, get_datastore
 
 
-def journal_client_config(
-    kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
-):
-    return dict(
-        cls="kafka",
-        brokers=kafka_server,
-        group_id=kafka_consumer_group,
-        prefix=kafka_prefix,
-        on_eof="stop",
-    )
-
-
-def journal_writer(kafka_server: str, kafka_prefix: str):
-    return get_journal_writer(
-        cls="kafka",
-        brokers=[kafka_server],
-        client_id="kafka_writer",
-        prefix=kafka_prefix,
-        anonymize=False,
-    )
-
-
 @pytest.fixture
-def datastore(
-    kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
-) -> Datastore:
-    journal_config = journal_client_config(
-        kafka_server, kafka_prefix, kafka_consumer_group
-    )
-    datastore = get_datastore(journal_config)
-    return datastore
+def datastore(journal_client_config) -> Datastore:
+    return get_datastore(journal_client_config)
 
 
 def test_no_corruption(
-    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore
+    scrubber_db,
+    datastore,
+    journal_writer,
+    journal_client_config,
 ):
-    writer = journal_writer(kafka_server, kafka_prefix)
-    writer.write_additions("directory", swh_model_data.DIRECTORIES)
-    writer.write_additions("revision", swh_model_data.REVISIONS)
-    writer.write_additions("release", swh_model_data.RELEASES)
-    writer.write_additions("snapshot", swh_model_data.SNAPSHOTS)
+    journal_writer.write_additions("directory", swh_model_data.DIRECTORIES)
+    journal_writer.write_additions("revision", swh_model_data.REVISIONS)
+    journal_writer.write_additions("release", swh_model_data.RELEASES)
+    journal_writer.write_additions("snapshot", swh_model_data.SNAPSHOTS)
 
-    journal_cfg = journal_client_config(
-        kafka_server, kafka_prefix, kafka_consumer_group
-    )
-    gid = journal_cfg["group_id"] + "_"
+    gid = journal_client_config["group_id"] + "_"
 
     for object_type in ("directory", "revision", "release", "snapshot"):
-        journal_cfg["group_id"] = gid + object_type
+        journal_client_config["group_id"] = gid + object_type
         config_id = scrubber_db.config_add(
             name=f"cfg_{object_type}",
             datastore=datastore,
@@ -77,7 +47,7 @@ def test_no_corruption(
         jc = JournalChecker(
             db=scrubber_db,
             config_id=config_id,
-            journal_client_config=journal_cfg,
+            journal_client_config=journal_client_config,
         )
         jc.run()
         jc.journal_client.close()
@@ -88,10 +58,9 @@ def test_no_corruption(
 @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
 def test_corrupt_snapshot(
     scrubber_db,
-    kafka_server,
-    kafka_prefix,
-    kafka_consumer_group,
     datastore,
+    journal_writer,
+    journal_client_config,
     corrupt_idx,
 ):
     config_id = scrubber_db.config_add(
@@ -104,16 +73,13 @@ def test_corrupt_snapshot(
     snapshots = list(swh_model_data.SNAPSHOTS)
     snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20)
 
-    writer = journal_writer(kafka_server, kafka_prefix)
-    writer.write_additions("snapshot", snapshots)
+    journal_writer.write_additions("snapshot", snapshots)
 
     before_date = datetime.datetime.now(tz=datetime.timezone.utc)
     JournalChecker(
         db=scrubber_db,
         config_id=config_id,
-        journal_client_config=journal_client_config(
-            kafka_server, kafka_prefix, kafka_consumer_group
-        ),
+        journal_client_config=journal_client_config,
     ).run()
     after_date = datetime.datetime.now(tz=datetime.timezone.utc)
 
@@ -136,10 +102,9 @@ def test_corrupt_snapshot(
 
 def test_corrupt_snapshots(
     scrubber_db,
-    kafka_server,
-    kafka_prefix,
-    kafka_consumer_group,
     datastore,
+    journal_writer,
+    journal_client_config,
 ):
     config_id = scrubber_db.config_add(
         name="cfg_snapshot",
@@ -152,15 +117,12 @@ def test_corrupt_snapshots(
     for i in (0, 1):
         snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20)
 
-    writer = journal_writer(kafka_server, kafka_prefix)
-    writer.write_additions("snapshot", snapshots)
+    journal_writer.write_additions("snapshot", snapshots)
 
     JournalChecker(
         db=scrubber_db,
         config_id=config_id,
-        journal_client_config=journal_client_config(
-            kafka_server, kafka_prefix, kafka_consumer_group
-        ),
+        journal_client_config=journal_client_config,
     ).run()
 
     corrupt_objects = list(scrubber_db.corrupt_object_iter())
@@ -176,10 +138,10 @@ def test_corrupt_snapshots(
 
 def test_duplicate_directory_entries(
     scrubber_db,
-    kafka_server,
-    kafka_prefix,
-    kafka_consumer_group,
     datastore,
+    journal_writer,
+    kafka_prefix,
+    journal_client_config,
 ):
     config_id = scrubber_db.config_add(
         name="cfg_directory",
@@ -213,23 +175,22 @@ def test_duplicate_directory_entries(
         + b"0 filename\x00"
         + b"\x02" * 20
     )
-    dupe_directory = {
+    dup_directory = {
         "id": hashlib.sha1(raw_manifest).digest(),
         "entries": corrupt_directory["entries"],
         "raw_manifest": raw_manifest,
     }
 
-    writer = journal_writer(kafka_server, kafka_prefix)
-    writer.send(f"{kafka_prefix}.directory", directory.id, directory.to_dict())
-    writer.send(f"{kafka_prefix}.directory", corrupt_directory["id"], corrupt_directory)
-    writer.send(f"{kafka_prefix}.directory", dupe_directory["id"], dupe_directory)
+    journal_writer.send(f"{kafka_prefix}.directory", directory.id, directory.to_dict())
+    journal_writer.send(
+        f"{kafka_prefix}.directory", corrupt_directory["id"], corrupt_directory
+    )
+    journal_writer.send(f"{kafka_prefix}.directory", dup_directory["id"], dup_directory)
 
     JournalChecker(
         db=scrubber_db,
         config_id=config_id,
-        journal_client_config=journal_client_config(
-            kafka_server, kafka_prefix, kafka_consumer_group
-        ),
+        journal_client_config=journal_client_config,
     ).run()
 
     corrupt_objects = list(scrubber_db.corrupt_object_iter())
@@ -238,17 +199,15 @@ def test_duplicate_directory_entries(
         swhids.CoreSWHID.from_string(swhid)
         for swhid in [
             "swh:1:dir:0000000000000000000000000000000000000000",
-            f"swh:1:dir:{dupe_directory['id'].hex()}",
+            f"swh:1:dir:{dup_directory['id'].hex()}",
         ]
     }
 
 
 def test_check_references_raises(
     scrubber_db,
-    kafka_server,
-    kafka_prefix,
-    kafka_consumer_group,
     datastore,
+    journal_client_config,
 ):
     config_id = scrubber_db.config_add(
         name="cfg_snapshot",
@@ -257,12 +216,10 @@ def test_check_references_raises(
         nb_partitions=1,
         check_references=True,
     )
-    journal_config = journal_client_config(
-        kafka_server, kafka_prefix, kafka_consumer_group
-    )
+
     with pytest.raises(ValueError):
         JournalChecker(
             db=scrubber_db,
             config_id=config_id,
-            journal_client_config=journal_config,
+            journal_client_config=journal_client_config,
         )
-- 
GitLab