diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py index 78b4568c6eaf35072619eab996da7912abf7d632..844cdb99a88c3e3b548b3617a96de25f25d9853a 100644 --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -100,11 +100,7 @@ def scrubber_cli_group(ctx, config_file: Optional[str]) -> None: ) conf["scrubber"] = conf.pop("scrubber_db") - if "scrubber" not in conf: - click.echo( - "WARNING: You must have a scrubber configured in your config file.\n" - ) - else: + if "scrubber" in conf: ctx.obj["db"] = get_scrubber_db(**conf["scrubber"]) @@ -173,6 +169,8 @@ def scrubber_check_init( ) conf = ctx.obj["config"] + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") db = ctx.obj["db"] if backend == "storage": @@ -228,9 +226,10 @@ def scrubber_check_list( ): """List the know configurations""" conf = ctx.obj["config"] + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") - db = ctx.obj["db"] for id_, cfg in db.config_iter(): @@ -279,6 +278,8 @@ def scrubber_check_stalled( from humanize import naturaldate, naturaldelta + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") db = ctx.obj["db"] if name and config_id is None: config_id = db.config_get_by_name(name) @@ -335,6 +336,8 @@ def scrubber_check_running(ctx, name: str, config_id: int): from humanize import naturaldate, naturaldelta + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") db = ctx.obj["db"] if name and config_id is None: config_id = db.config_get_by_name(name) @@ -388,6 +391,8 @@ def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool): from humanize import naturaldelta + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") db = ctx.obj["db"] if name and config_id is None: config_id = db.config_get_by_name(name) @@ -437,7 +442,7 @@ def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool): click.echo(f" from references: {stats['missing_object_reference']}") -@scrubber_check_cli_group.command(name="storage") +@scrubber_check_cli_group.command(name="run") @click.argument( "name", type=str, @@ -447,14 +452,94 @@ def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool): @click.option( "--config-id", type=int, + default=None, + help="Config ID (is config name is not given as argument)", +) +@click.option("--limit", default=0, type=int) +@click.pass_context +def scrubber_check_run( + ctx, + name: Optional[str], + config_id: Optional[int], + limit: int, +): + """Run the scrubber checker configured as `name` and reports corrupt + objects to the scrubber DB. + + This runs a single thread; parallelism is achieved by running this command + multiple times. + + This command references an existing scrubbing configuration (either by name + or by id); the configuration holds the object type, number of partitions + and the storage configuration this scrubbing session will check on. + + """ + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") + db = ctx.obj["db"] + if name and config_id is None: + config_id = db.config_get_by_name(name) + + if config_id is None: + ctx.fail("A valid configuration name/id must be given.") + + from swh.scrubber.base_checker import BaseChecker + + scrubber_cfg = db.config_get(config_id) + datastore = scrubber_cfg.datastore + conf = ctx.obj["config"] + + assert config_id is not None + checker: BaseChecker + + if datastore.package == "storage": + if "storage" not in conf: + ctx.fail("You must have a storage configured in your config file.") + from swh.storage import get_storage + + from .storage_checker import StorageChecker + + checker = StorageChecker( + db=db, + storage=get_storage(**conf["storage"]), + config_id=config_id, + limit=limit, + ) + elif datastore.package == "journal": + if "journal" not in conf: + ctx.fail("You must have a journal configured in your config file.") + from .journal_checker import JournalChecker + + checker = JournalChecker( + db=db, + journal_client_config=conf["journal"], + config_id=config_id, + ) + else: + ctx.fail(f"Unsupported scruber package {datastore.package}") + + checker.run() + + +@scrubber_check_cli_group.command(name="storage", deprecated=True, hidden=True) +@click.argument( + "name", + type=str, + default=None, + required=False, # can be given by config_id instead +) +@click.option( + "--config-id", + type=int, + default=None, help="Config ID (is config name is not given as argument)", ) @click.option("--limit", default=0, type=int) @click.pass_context def scrubber_check_storage( ctx, - name: str, - config_id: int, + name: Optional[str], + config_id: Optional[int], limit: int, ): """Reads a swh-storage instance, and reports corrupt objects to the scrubber DB. @@ -473,8 +558,10 @@ def scrubber_check_storage( check session is stored in the database, so the number of concurrent workers can be dynamically adjusted. - """ # noqa + """ conf = ctx.obj["config"] + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") db = ctx.obj["db"] @@ -506,7 +593,7 @@ def scrubber_check_storage( checker.run() -@scrubber_check_cli_group.command(name="journal") +@scrubber_check_cli_group.command(name="journal", deprecated=True, hidden=True) @click.argument( "name", type=str, @@ -523,6 +610,8 @@ def scrubber_check_journal(ctx, name, config_id) -> None: """Reads a complete kafka journal, and reports corrupt objects to the scrubber DB.""" conf = ctx.obj["config"] + if "db" not in ctx.obj: + ctx.fail("You must have a scrubber configured in your config file.") if "journal" not in conf: ctx.fail("You must have a journal configured in your config file.") db = ctx.obj["db"] diff --git a/swh/scrubber/interface.py b/swh/scrubber/interface.py new file mode 100644 index 0000000000000000000000000000000000000000..86802743c233f5cbd1ac1f3d77378856e84c957b --- /dev/null +++ b/swh/scrubber/interface.py @@ -0,0 +1,29 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from typing_extensions import Protocol, runtime_checkable + +from swh.core.statsd import Statsd + +from .db import ConfigEntry, Datastore + + +@runtime_checkable +class CheckerInterface(Protocol): + @property + def config(self) -> ConfigEntry: + ... + + @property + def datastore(self) -> Datastore: + ... + + @property + def statsd(self) -> Statsd: + ... + + def run(self) -> None: + ... diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py index 6ccbfdbb8ebc7e7e8c3b21d49a17211f5a439f6e..90a6f2b8f3ed5fecddd91593fa4901ba1cc8db23 100644 --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -92,24 +92,24 @@ def test_help_check(mocker, scrubber_db, swh_storage): # With older click version (e.g. 7.0-1), the text wrapping can be different, # resulting in some docstring text included in this command list, so check we find # the expected commands instead - for command in ["init", "journal", "list", "stalled", "storage"]: + for command in ["init", "list", "run", "stalled"]: assert command in commands + for command in ["storage", "journal"]: + assert command not in commands - # without a config file, --help should still work but with an extra message + # without a config file, --help should still work result = CliRunner().invoke( scrubber_cli_group, ["check", "--help"], catch_exceptions=False ) output = result.output.splitlines(keepends=False) - msg = "WARNING: You must have a scrubber configured in your config file." - assert output[0] == msg msg = "Usage: scrubber check [OPTIONS] COMMAND [ARGS]..." - assert output[2] == msg + assert output[0] == msg assert "Commands:" in output commands = [cmd.split()[0] for cmd in output[output.index("Commands:") + 1 :]] # With older click version (e.g. 7.0-1), the text wrapping can be different, # resulting in some docstring text included in this command list, so check we find # the expected commands instead - for command in ["init", "journal", "list", "stalled", "storage"]: + for command in ["init", "list", "run", "stalled"]: assert command in commands @@ -233,7 +233,14 @@ def test_check_init_journal_flags( assert cfg_entry.check_references is False -def test_check_storage(mocker, scrubber_db, swh_storage): +def test_check_run_ko(mocker, scrubber_db, swh_storage): + # using the config id instead of the config name + result = invoke(scrubber_db, ["check", "run"], storage=swh_storage) + assert result.exit_code == 2, result.output + assert "Error: A valid configuration name/id must be given." in result.output + + +def test_check_run_storage(mocker, scrubber_db, swh_storage): storage_checker = MagicMock() StorageChecker = mocker.patch( "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker @@ -260,7 +267,7 @@ def test_check_storage(mocker, scrubber_db, swh_storage): msg = "Created configuration cfg1 [1] for checking snapshot in postgresql storage" assert result.output.strip() == msg - result = invoke(scrubber_db, ["check", "storage", "cfg1"], storage=swh_storage) + result = invoke(scrubber_db, ["check", "run", "cfg1"], storage=swh_storage) assert result.exit_code == 0, result.output assert result.output == "" @@ -275,17 +282,68 @@ def test_check_storage(mocker, scrubber_db, swh_storage): # using the config id instead of the config name result = invoke( - scrubber_db, ["check", "storage", "--config-id", "1"], storage=swh_storage + scrubber_db, ["check", "run", "--config-id", "1"], storage=swh_storage ) assert result.exit_code == 0, result.output assert result.output == "" -def test_check_storage_ko(mocker, scrubber_db, swh_storage): - # using the config id instead of the config name - result = invoke(scrubber_db, ["check", "storage"], storage=swh_storage) - assert result.exit_code == 1, result.output - assert result.output == "Error: A valid configuration name/id must be set\n" +def test_check_run_journal( + mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group +): + journal_checker = MagicMock() + JournalChecker = mocker.patch( + "swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker + ) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + result = invoke( + scrubber_db, + [ + "check", + "init", + "journal", + "--object-type", + "snapshot", + "--nb-partitions", + "4", + "--name", + "cfg1", + ], + kafka_server=kafka_server, + kafka_prefix=kafka_prefix, + kafka_consumer_group=kafka_consumer_group, + ) + assert result.exit_code == 0, result.output + msg = "Created configuration cfg1 [1] for checking snapshot in kafka journal" + assert result.output.strip() == msg + + result = invoke( + scrubber_db, + ["check", "run", "cfg1"], + kafka_server=kafka_server, + kafka_prefix=kafka_prefix, + kafka_consumer_group=kafka_consumer_group, + ) + assert result.exit_code == 0, result.output + assert result.output == "" + + assert get_scrubber_db.call_count == 2 + get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn) + + JournalChecker.assert_called_once_with( + db=scrubber_db, + journal_client_config={ + "brokers": kafka_server, + "cls": "kafka", + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, + "on_eof": "stop", + }, + config_id=1, + ) + assert journal_checker.method_calls == [call.run()] def test_check_list(mocker, scrubber_db, swh_storage): @@ -642,6 +700,108 @@ Stuck partitions for cfg1 [id=1, type=snapshot]: assert next(scrubber_db.checked_partition_iter_next(1)) == 3 +def test_locate_origins(mocker, scrubber_db, swh_storage, naive_graph_client): + origin_locator = MagicMock() + OriginLocator = mocker.patch( + "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator + ) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + mocker.patch( + "swh.graph.http_client.RemoteGraphClient", + return_value=naive_graph_client, + ) + + result = invoke(scrubber_db, ["locate"], storage=swh_storage) + assert result.exit_code == 0, result.output + assert result.output == "" + + get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn) + OriginLocator.assert_called_once_with( + db=scrubber_db, + storage=OriginLocator.mock_calls[0][2]["storage"], + graph=OriginLocator.mock_calls[0][2]["graph"], + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + assert origin_locator.method_calls == [call.run()] + + +def test_fix_objects(mocker, scrubber_db): + fixer = MagicMock() + Fixer = mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + result = invoke(scrubber_db, ["fix"]) + assert result.exit_code == 0, result.output + assert result.output == "" + + get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn) + Fixer.assert_called_once_with( + db=scrubber_db, + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + assert fixer.method_calls == [call.run()] + + +# deprecated commands +def test_check_storage(mocker, scrubber_db, swh_storage): + storage_checker = MagicMock() + StorageChecker = mocker.patch( + "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker + ) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + result = invoke( + scrubber_db, + [ + "check", + "init", + "storage", + "--object-type", + "snapshot", + "--nb-partitions", + "4", + "--name", + "cfg1", + ], + storage=swh_storage, + ) + assert result.exit_code == 0, result.output + msg = "Created configuration cfg1 [1] for checking snapshot in postgresql storage" + assert result.output.strip() == msg + + result = invoke(scrubber_db, ["check", "storage", "cfg1"], storage=swh_storage) + assert result.exit_code == 0, result.output + assert ( + result.output.strip() + == "DeprecationWarning: The command 'storage' is deprecated." + ) + + get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn) + StorageChecker.assert_called_once_with( + db=scrubber_db, + config_id=1, + storage=StorageChecker.mock_calls[0][2]["storage"], + limit=0, + ) + assert storage_checker.method_calls == [call.run()] + + # using the config id instead of the config name + result = invoke( + scrubber_db, ["check", "storage", "--config-id", "1"], storage=swh_storage + ) + assert result.exit_code == 0, result.output + assert ( + result.output.strip() + == "DeprecationWarning: The command 'storage' is deprecated." + ) + + def test_check_journal( mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group ): @@ -681,7 +841,10 @@ def test_check_journal( kafka_consumer_group=kafka_consumer_group, ) assert result.exit_code == 0, result.output - assert result.output == "" + assert ( + result.output.strip() + == "DeprecationWarning: The command 'journal' is deprecated." + ) assert get_scrubber_db.call_count == 2 get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn) @@ -698,50 +861,3 @@ def test_check_journal( config_id=1, ) assert journal_checker.method_calls == [call.run()] - - -def test_locate_origins(mocker, scrubber_db, swh_storage, naive_graph_client): - origin_locator = MagicMock() - OriginLocator = mocker.patch( - "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator - ) - get_scrubber_db = mocker.patch( - "swh.scrubber.get_scrubber_db", return_value=scrubber_db - ) - mocker.patch( - "swh.graph.http_client.RemoteGraphClient", - return_value=naive_graph_client, - ) - - result = invoke(scrubber_db, ["locate"], storage=swh_storage) - assert result.exit_code == 0, result.output - assert result.output == "" - - get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn) - OriginLocator.assert_called_once_with( - db=scrubber_db, - storage=OriginLocator.mock_calls[0][2]["storage"], - graph=OriginLocator.mock_calls[0][2]["graph"], - start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), - end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), - ) - assert origin_locator.method_calls == [call.run()] - - -def test_fix_objects(mocker, scrubber_db): - fixer = MagicMock() - Fixer = mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer) - get_scrubber_db = mocker.patch( - "swh.scrubber.get_scrubber_db", return_value=scrubber_db - ) - result = invoke(scrubber_db, ["fix"]) - assert result.exit_code == 0, result.output - assert result.output == "" - - get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn) - Fixer.assert_called_once_with( - db=scrubber_db, - start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), - end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), - ) - assert fixer.method_calls == [call.run()]