From 13181d59ccad649c97865853f5d3321b5bccd78c Mon Sep 17 00:00:00 2001
From: David Douard <david.douard@sdfa3.org>
Date: Wed, 13 Mar 2024 15:40:24 +0100
Subject: [PATCH] Replace `swh check journal|storage` by a single `swh check
 run` command

Deprecate the former ones.
---
 swh/scrubber/cli.py            | 111 +++++++++++++--
 swh/scrubber/interface.py      |  29 ++++
 swh/scrubber/tests/test_cli.py | 240 ++++++++++++++++++++++++---------
 3 files changed, 307 insertions(+), 73 deletions(-)
 create mode 100644 swh/scrubber/interface.py

diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index 78b4568..844cdb9 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -100,11 +100,7 @@ def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
         )
         conf["scrubber"] = conf.pop("scrubber_db")
 
-    if "scrubber" not in conf:
-        click.echo(
-            "WARNING: You must have a scrubber configured in your config file.\n"
-        )
-    else:
+    if "scrubber" in conf:
         ctx.obj["db"] = get_scrubber_db(**conf["scrubber"])
 
 
@@ -173,6 +169,8 @@ def scrubber_check_init(
         )
 
     conf = ctx.obj["config"]
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
     db = ctx.obj["db"]
 
     if backend == "storage":
@@ -228,9 +226,10 @@ def scrubber_check_list(
 ):
     """List the know configurations"""
     conf = ctx.obj["config"]
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
     if "storage" not in conf:
         ctx.fail("You must have a storage configured in your config file.")
-
     db = ctx.obj["db"]
 
     for id_, cfg in db.config_iter():
@@ -279,6 +278,8 @@ def scrubber_check_stalled(
 
     from humanize import naturaldate, naturaldelta
 
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
     db = ctx.obj["db"]
     if name and config_id is None:
         config_id = db.config_get_by_name(name)
@@ -335,6 +336,8 @@ def scrubber_check_running(ctx, name: str, config_id: int):
 
     from humanize import naturaldate, naturaldelta
 
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
     db = ctx.obj["db"]
     if name and config_id is None:
         config_id = db.config_get_by_name(name)
@@ -388,6 +391,8 @@ def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool):
 
     from humanize import naturaldelta
 
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
     db = ctx.obj["db"]
     if name and config_id is None:
         config_id = db.config_get_by_name(name)
@@ -437,7 +442,7 @@ def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool):
             click.echo(f"  from references: {stats['missing_object_reference']}")
 
 
-@scrubber_check_cli_group.command(name="storage")
+@scrubber_check_cli_group.command(name="run")
 @click.argument(
     "name",
     type=str,
@@ -447,14 +452,94 @@ def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool):
 @click.option(
     "--config-id",
     type=int,
+    default=None,
+    help="Config ID (is config name is not given as argument)",
+)
+@click.option("--limit", default=0, type=int)
+@click.pass_context
+def scrubber_check_run(
+    ctx,
+    name: Optional[str],
+    config_id: Optional[int],
+    limit: int,
+):
+    """Run the scrubber checker configured as `name` and reports corrupt
+    objects to the scrubber DB.
+
+    This runs a single thread; parallelism is achieved by running this command
+    multiple times.
+
+    This command references an existing scrubbing configuration (either by name
+    or by id); the configuration holds the object type, number of partitions
+    and the storage configuration this scrubbing session will check on.
+
+    """
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
+    db = ctx.obj["db"]
+    if name and config_id is None:
+        config_id = db.config_get_by_name(name)
+
+    if config_id is None:
+        ctx.fail("A valid configuration name/id must be given.")
+
+    from swh.scrubber.base_checker import BaseChecker
+
+    scrubber_cfg = db.config_get(config_id)
+    datastore = scrubber_cfg.datastore
+    conf = ctx.obj["config"]
+
+    assert config_id is not None
+    checker: BaseChecker
+
+    if datastore.package == "storage":
+        if "storage" not in conf:
+            ctx.fail("You must have a storage configured in your config file.")
+        from swh.storage import get_storage
+
+        from .storage_checker import StorageChecker
+
+        checker = StorageChecker(
+            db=db,
+            storage=get_storage(**conf["storage"]),
+            config_id=config_id,
+            limit=limit,
+        )
+    elif datastore.package == "journal":
+        if "journal" not in conf:
+            ctx.fail("You must have a journal configured in your config file.")
+        from .journal_checker import JournalChecker
+
+        checker = JournalChecker(
+            db=db,
+            journal_client_config=conf["journal"],
+            config_id=config_id,
+        )
+    else:
+        ctx.fail(f"Unsupported scruber package {datastore.package}")
+
+    checker.run()
+
+
+@scrubber_check_cli_group.command(name="storage", deprecated=True, hidden=True)
+@click.argument(
+    "name",
+    type=str,
+    default=None,
+    required=False,  # can be given by config_id instead
+)
+@click.option(
+    "--config-id",
+    type=int,
+    default=None,
     help="Config ID (is config name is not given as argument)",
 )
 @click.option("--limit", default=0, type=int)
 @click.pass_context
 def scrubber_check_storage(
     ctx,
-    name: str,
-    config_id: int,
+    name: Optional[str],
+    config_id: Optional[int],
     limit: int,
 ):
     """Reads a swh-storage instance, and reports corrupt objects to the scrubber DB.
@@ -473,8 +558,10 @@ def scrubber_check_storage(
     check session is stored in the database, so the number of concurrent
     workers can be dynamically adjusted.
 
-    """  # noqa
+    """
     conf = ctx.obj["config"]
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
     if "storage" not in conf:
         ctx.fail("You must have a storage configured in your config file.")
     db = ctx.obj["db"]
@@ -506,7 +593,7 @@ def scrubber_check_storage(
     checker.run()
 
 
-@scrubber_check_cli_group.command(name="journal")
+@scrubber_check_cli_group.command(name="journal", deprecated=True, hidden=True)
 @click.argument(
     "name",
     type=str,
@@ -523,6 +610,8 @@ def scrubber_check_journal(ctx, name, config_id) -> None:
     """Reads a complete kafka journal, and reports corrupt objects to
     the scrubber DB."""
     conf = ctx.obj["config"]
+    if "db" not in ctx.obj:
+        ctx.fail("You must have a scrubber configured in your config file.")
     if "journal" not in conf:
         ctx.fail("You must have a journal configured in your config file.")
     db = ctx.obj["db"]
diff --git a/swh/scrubber/interface.py b/swh/scrubber/interface.py
new file mode 100644
index 0000000..8680274
--- /dev/null
+++ b/swh/scrubber/interface.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2024  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from typing_extensions import Protocol, runtime_checkable
+
+from swh.core.statsd import Statsd
+
+from .db import ConfigEntry, Datastore
+
+
+@runtime_checkable
+class CheckerInterface(Protocol):
+    @property
+    def config(self) -> ConfigEntry:
+        ...
+
+    @property
+    def datastore(self) -> Datastore:
+        ...
+
+    @property
+    def statsd(self) -> Statsd:
+        ...
+
+    def run(self) -> None:
+        ...
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index 6ccbfdb..90a6f2b 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -92,24 +92,24 @@ def test_help_check(mocker, scrubber_db, swh_storage):
     # With older click version (e.g. 7.0-1), the text wrapping can be different,
     # resulting in some docstring text included in this command list, so check we find
     # the expected commands instead
-    for command in ["init", "journal", "list", "stalled", "storage"]:
+    for command in ["init", "list", "run", "stalled"]:
         assert command in commands
+    for command in ["storage", "journal"]:
+        assert command not in commands
 
-    # without a config file, --help should still work but with an extra message
+    # without a config file, --help should still work
     result = CliRunner().invoke(
         scrubber_cli_group, ["check", "--help"], catch_exceptions=False
     )
     output = result.output.splitlines(keepends=False)
-    msg = "WARNING: You must have a scrubber configured in your config file."
-    assert output[0] == msg
     msg = "Usage: scrubber check [OPTIONS] COMMAND [ARGS]..."
-    assert output[2] == msg
+    assert output[0] == msg
     assert "Commands:" in output
     commands = [cmd.split()[0] for cmd in output[output.index("Commands:") + 1 :]]
     # With older click version (e.g. 7.0-1), the text wrapping can be different,
     # resulting in some docstring text included in this command list, so check we find
     # the expected commands instead
-    for command in ["init", "journal", "list", "stalled", "storage"]:
+    for command in ["init", "list", "run", "stalled"]:
         assert command in commands
 
 
@@ -233,7 +233,14 @@ def test_check_init_journal_flags(
     assert cfg_entry.check_references is False
 
 
-def test_check_storage(mocker, scrubber_db, swh_storage):
+def test_check_run_ko(mocker, scrubber_db, swh_storage):
+    # using the config id instead of the config name
+    result = invoke(scrubber_db, ["check", "run"], storage=swh_storage)
+    assert result.exit_code == 2, result.output
+    assert "Error: A valid configuration name/id must be given." in result.output
+
+
+def test_check_run_storage(mocker, scrubber_db, swh_storage):
     storage_checker = MagicMock()
     StorageChecker = mocker.patch(
         "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker
@@ -260,7 +267,7 @@ def test_check_storage(mocker, scrubber_db, swh_storage):
     msg = "Created configuration cfg1 [1] for checking snapshot in postgresql storage"
     assert result.output.strip() == msg
 
-    result = invoke(scrubber_db, ["check", "storage", "cfg1"], storage=swh_storage)
+    result = invoke(scrubber_db, ["check", "run", "cfg1"], storage=swh_storage)
     assert result.exit_code == 0, result.output
     assert result.output == ""
 
@@ -275,17 +282,68 @@ def test_check_storage(mocker, scrubber_db, swh_storage):
 
     # using the config id instead of the config name
     result = invoke(
-        scrubber_db, ["check", "storage", "--config-id", "1"], storage=swh_storage
+        scrubber_db, ["check", "run", "--config-id", "1"], storage=swh_storage
     )
     assert result.exit_code == 0, result.output
     assert result.output == ""
 
 
-def test_check_storage_ko(mocker, scrubber_db, swh_storage):
-    # using the config id instead of the config name
-    result = invoke(scrubber_db, ["check", "storage"], storage=swh_storage)
-    assert result.exit_code == 1, result.output
-    assert result.output == "Error: A valid configuration name/id must be set\n"
+def test_check_run_journal(
+    mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+    journal_checker = MagicMock()
+    JournalChecker = mocker.patch(
+        "swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker
+    )
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    result = invoke(
+        scrubber_db,
+        [
+            "check",
+            "init",
+            "journal",
+            "--object-type",
+            "snapshot",
+            "--nb-partitions",
+            "4",
+            "--name",
+            "cfg1",
+        ],
+        kafka_server=kafka_server,
+        kafka_prefix=kafka_prefix,
+        kafka_consumer_group=kafka_consumer_group,
+    )
+    assert result.exit_code == 0, result.output
+    msg = "Created configuration cfg1 [1] for checking snapshot in kafka journal"
+    assert result.output.strip() == msg
+
+    result = invoke(
+        scrubber_db,
+        ["check", "run", "cfg1"],
+        kafka_server=kafka_server,
+        kafka_prefix=kafka_prefix,
+        kafka_consumer_group=kafka_consumer_group,
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    assert get_scrubber_db.call_count == 2
+    get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn)
+
+    JournalChecker.assert_called_once_with(
+        db=scrubber_db,
+        journal_client_config={
+            "brokers": kafka_server,
+            "cls": "kafka",
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+            "on_eof": "stop",
+        },
+        config_id=1,
+    )
+    assert journal_checker.method_calls == [call.run()]
 
 
 def test_check_list(mocker, scrubber_db, swh_storage):
@@ -642,6 +700,108 @@ Stuck partitions for cfg1 [id=1, type=snapshot]:
     assert next(scrubber_db.checked_partition_iter_next(1)) == 3
 
 
+def test_locate_origins(mocker, scrubber_db, swh_storage, naive_graph_client):
+    origin_locator = MagicMock()
+    OriginLocator = mocker.patch(
+        "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator
+    )
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    mocker.patch(
+        "swh.graph.http_client.RemoteGraphClient",
+        return_value=naive_graph_client,
+    )
+
+    result = invoke(scrubber_db, ["locate"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
+    OriginLocator.assert_called_once_with(
+        db=scrubber_db,
+        storage=OriginLocator.mock_calls[0][2]["storage"],
+        graph=OriginLocator.mock_calls[0][2]["graph"],
+        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+    )
+    assert origin_locator.method_calls == [call.run()]
+
+
+def test_fix_objects(mocker, scrubber_db):
+    fixer = MagicMock()
+    Fixer = mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer)
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    result = invoke(scrubber_db, ["fix"])
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
+    Fixer.assert_called_once_with(
+        db=scrubber_db,
+        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+    )
+    assert fixer.method_calls == [call.run()]
+
+
+# deprecated commands
+def test_check_storage(mocker, scrubber_db, swh_storage):
+    storage_checker = MagicMock()
+    StorageChecker = mocker.patch(
+        "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker
+    )
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    result = invoke(
+        scrubber_db,
+        [
+            "check",
+            "init",
+            "storage",
+            "--object-type",
+            "snapshot",
+            "--nb-partitions",
+            "4",
+            "--name",
+            "cfg1",
+        ],
+        storage=swh_storage,
+    )
+    assert result.exit_code == 0, result.output
+    msg = "Created configuration cfg1 [1] for checking snapshot in postgresql storage"
+    assert result.output.strip() == msg
+
+    result = invoke(scrubber_db, ["check", "storage", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    assert (
+        result.output.strip()
+        == "DeprecationWarning: The command 'storage' is deprecated."
+    )
+
+    get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn)
+    StorageChecker.assert_called_once_with(
+        db=scrubber_db,
+        config_id=1,
+        storage=StorageChecker.mock_calls[0][2]["storage"],
+        limit=0,
+    )
+    assert storage_checker.method_calls == [call.run()]
+
+    # using the config id instead of the config name
+    result = invoke(
+        scrubber_db, ["check", "storage", "--config-id", "1"], storage=swh_storage
+    )
+    assert result.exit_code == 0, result.output
+    assert (
+        result.output.strip()
+        == "DeprecationWarning: The command 'storage' is deprecated."
+    )
+
+
 def test_check_journal(
     mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
 ):
@@ -681,7 +841,10 @@ def test_check_journal(
         kafka_consumer_group=kafka_consumer_group,
     )
     assert result.exit_code == 0, result.output
-    assert result.output == ""
+    assert (
+        result.output.strip()
+        == "DeprecationWarning: The command 'journal' is deprecated."
+    )
 
     assert get_scrubber_db.call_count == 2
     get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn)
@@ -698,50 +861,3 @@ def test_check_journal(
         config_id=1,
     )
     assert journal_checker.method_calls == [call.run()]
-
-
-def test_locate_origins(mocker, scrubber_db, swh_storage, naive_graph_client):
-    origin_locator = MagicMock()
-    OriginLocator = mocker.patch(
-        "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator
-    )
-    get_scrubber_db = mocker.patch(
-        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
-    )
-    mocker.patch(
-        "swh.graph.http_client.RemoteGraphClient",
-        return_value=naive_graph_client,
-    )
-
-    result = invoke(scrubber_db, ["locate"], storage=swh_storage)
-    assert result.exit_code == 0, result.output
-    assert result.output == ""
-
-    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
-    OriginLocator.assert_called_once_with(
-        db=scrubber_db,
-        storage=OriginLocator.mock_calls[0][2]["storage"],
-        graph=OriginLocator.mock_calls[0][2]["graph"],
-        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
-        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
-    )
-    assert origin_locator.method_calls == [call.run()]
-
-
-def test_fix_objects(mocker, scrubber_db):
-    fixer = MagicMock()
-    Fixer = mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer)
-    get_scrubber_db = mocker.patch(
-        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
-    )
-    result = invoke(scrubber_db, ["fix"])
-    assert result.exit_code == 0, result.output
-    assert result.output == ""
-
-    get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
-    Fixer.assert_called_once_with(
-        db=scrubber_db,
-        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
-        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
-    )
-    assert fixer.method_calls == [call.run()]
-- 
GitLab