Skip to content
Snippets Groups Projects
Commit cce4e144 authored by Antoine Lambert's avatar Antoine Lambert
Browse files

cli: Add support for scrubbing an object storage

Allow configuring and triggering the scrubbing of an object storage with
the swh-scrubber CLI, either using partitions of contents provided by a
storage or by consuming the content kafka topic from a SWH journal
(in that case, the --use-journal flag must be provided to the "swh check run"
command).

Related to #4694.
parent 68d754ae
No related branches found
No related tags found
No related merge requests found
......@@ -112,7 +112,7 @@ def scrubber_check_cli_group(ctx):
@scrubber_check_cli_group.command(name="init")
@click.argument("backend", type=click.Choice(["storage", "journal"]))
@click.argument("backend", type=click.Choice(["storage", "journal", "objstorage"]))
@click.option(
"--object-type",
type=click.Choice(
......@@ -123,6 +123,7 @@ def scrubber_check_cli_group(ctx):
"revision",
"release",
"directory",
"content",
# TODO:
# "raw_extrinsic_metadata",
# "extid",
......@@ -148,11 +149,11 @@ def scrubber_check_init(
A checker configuration configuration consists simply in a set of:
- backend: the datastore type being scrubbed (storage or journal),
- backend: the datastore type being scrubbed (storage, objstorage or journal),
- object-type: the type of object being checked,
- nb-pertitions: the number of partitions the hash space is divided
- nb-partitions: the number of partitions the hash space is divided
in; must be a power of 2,
- name: an unique name for easier reference,
......@@ -197,6 +198,23 @@ def scrubber_check_init(
datastore = get_journal_datastore(journal_cfg=conf["journal"])
db.datastore_get_or_add(datastore)
nb_partitions = 1
elif backend == "objstorage":
if check_references is None:
check_references = True
if object_type != "content":
raise click.ClickException(
"Object storage scrubber can only check content objects, "
f"not {object_type} ones."
)
if "objstorage" not in conf:
raise click.ClickException(
"You must have an object storage configured in your config file."
)
from .objstorage_checker import get_objstorage_datastore
datastore = get_objstorage_datastore(objstorage_config=conf["objstorage"])
else:
raise click.ClickException(f"Backend type {backend} is not supported")
......@@ -455,12 +473,23 @@ def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool):
default=None,
help="Config ID (is config name is not given as argument)",
)
@click.option(
"--use-journal",
is_flag=True,
default=False,
help=(
"Flag only relevant for running an object storage scrubber, "
"if set content ids are consumed from a kafka topic of SWH journal "
"instead of getting them from a storage"
),
)
@click.option("--limit", default=0, type=int)
@click.pass_context
def scrubber_check_run(
ctx,
name: Optional[str],
config_id: Optional[int],
use_journal: bool,
limit: int,
):
"""Run the scrubber checker configured as `name` and reports corrupt
......@@ -505,6 +534,36 @@ def scrubber_check_run(
config_id=config_id,
limit=limit,
)
elif datastore.package == "objstorage":
if not use_journal and "storage" not in conf:
ctx.fail("You must have a storage configured in your config file.")
if use_journal and "journal" not in conf:
ctx.fail("You must have a journal configured in your config file.")
if "objstorage" not in conf:
ctx.fail("You must have an object storage configured in your config file.")
from swh.objstorage.factory import get_objstorage
if use_journal:
from .objstorage_checker import ObjectStorageCheckerFromJournal
checker = ObjectStorageCheckerFromJournal(
db=db,
journal_client_config=conf["journal"],
objstorage=get_objstorage(**conf["objstorage"]),
config_id=config_id,
)
else:
from swh.storage import get_storage
from .objstorage_checker import ObjectStorageCheckerFromStoragePartition
checker = ObjectStorageCheckerFromStoragePartition(
db=db,
storage=get_storage(**conf["storage"]),
objstorage=get_objstorage(**conf["objstorage"]),
config_id=config_id,
limit=limit,
)
elif datastore.package == "journal":
if "journal" not in conf:
ctx.fail("You must have a journal configured in your config file.")
......
......@@ -19,6 +19,7 @@ def invoke(
scrubber_db,
args,
storage=None,
objstorage=None,
kafka_server=None,
kafka_prefix=None,
kafka_consumer_group=None,
......@@ -36,6 +37,8 @@ def invoke(
"db": db.conn.dsn,
"objstorage": {"cls": "memory"},
}
if objstorage:
config["objstorage"] = {"cls": "memory"}
assert (
(kafka_server is None)
......@@ -113,7 +116,7 @@ def test_help_check(mocker, scrubber_db, swh_storage):
assert command in commands
def test_check_init(mocker, scrubber_db, swh_storage):
def test_check_init_storage(mocker, scrubber_db, swh_storage):
mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)
result = invoke(
scrubber_db,
......@@ -205,6 +208,75 @@ def test_check_init_storage_flags(mocker, scrubber_db, swh_storage):
assert cfg_entry.check_references is True
def test_check_init_objstorage(mocker, scrubber_db, swh_storage, swh_objstorage):
    """Exercise ``check init objstorage``: one success and two error paths.

    In order, the test covers:

    - creating a new configuration for content objects,
    - re-using an already registered configuration name,
    - invoking the command without passing an object storage to ``invoke``.
    """
    config_name = "cfg1"
    mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)

    def init_args(nb_partitions):
        # Only the partition count varies between the invocations below.
        return [
            "check",
            "init",
            "objstorage",
            "--object-type",
            "content",
            "--nb-partitions",
            nb_partitions,
            "--name",
            config_name,
        ]

    # nominal case: both a storage and an object storage are provided
    created = invoke(
        scrubber_db,
        init_args("4"),
        storage=swh_storage,
        objstorage=swh_objstorage,
    )
    assert created.exit_code == 0, created.output
    expected_output = f"Created configuration {config_name} [1] for checking content in memory objstorage"
    assert created.output.strip() == expected_output

    config_id = scrubber_db.config_get_by_name(config_name)
    cfg_entry = scrubber_db.config_get(config_id)
    assert cfg_entry.check_hashes is True
    assert cfg_entry.check_references is True

    # error: config name already exists
    duplicate = invoke(
        scrubber_db,
        init_args("8"),
        storage=swh_storage,
        objstorage=swh_objstorage,
    )
    assert duplicate.exit_code == 1, duplicate.output
    expected_output = f"Error: Configuration {config_name} already exists"
    assert duplicate.output.strip() == expected_output

    # error: missing objstorage config (no objstorage passed to invoke)
    unconfigured = invoke(
        scrubber_db,
        init_args("8"),
        storage=swh_storage,
    )
    assert unconfigured.exit_code == 1, unconfigured.output
    expected_output = (
        "Error: You must have an object storage configured in your config file."
    )
    assert unconfigured.output.strip() == expected_output
def test_check_init_journal_flags(
mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
):
......@@ -288,6 +360,136 @@ def test_check_run_storage(mocker, scrubber_db, swh_storage):
assert result.output == ""
def test_check_run_objstorage_partition(
    mocker, scrubber_db, swh_storage, swh_objstorage
):
    """Run an object storage check fed by storage partitions through the CLI.

    A configuration is first created with ``check init objstorage``, then
    ``check run`` is invoked by configuration name and by ``--config-id``.
    The checker class is mocked, so the test only verifies how it is
    constructed and that ``run()`` is the sole method called on it.
    """
    config_name = "cfg1"
    checker_instance = MagicMock()
    checker_cls = mocker.patch(
        "swh.scrubber.objstorage_checker.ObjectStorageCheckerFromStoragePartition",
        return_value=checker_instance,
    )
    get_scrubber_db = mocker.patch(
        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
    )
    # Every invocation in this test uses the same pair of backends.
    backends = {"storage": swh_storage, "objstorage": swh_objstorage}

    init_result = invoke(
        scrubber_db,
        [
            "check",
            "init",
            "objstorage",
            "--object-type",
            "content",
            "--nb-partitions",
            "4",
            "--name",
            config_name,
        ],
        **backends,
    )
    assert init_result.exit_code == 0, init_result.output
    expected_output = f"Created configuration {config_name} [1] for checking content in memory objstorage"
    assert init_result.output.strip() == expected_output

    run_result = invoke(scrubber_db, ["check", "run", config_name], **backends)
    assert run_result.exit_code == 0, run_result.output
    assert run_result.output == ""

    get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn)
    # The storage and objstorage instances are built inside the CLI, so they
    # are read back from the recorded call rather than compared to fixtures.
    recorded_kwargs = checker_cls.mock_calls[0][2]
    checker_cls.assert_called_once_with(
        db=scrubber_db,
        config_id=1,
        storage=recorded_kwargs["storage"],
        objstorage=recorded_kwargs["objstorage"],
        limit=0,
    )
    assert checker_instance.method_calls == [call.run()]

    # using the config id instead of the config name
    run_by_id_result = invoke(
        scrubber_db, ["check", "run", "--config-id", "1"], **backends
    )
    assert run_by_id_result.exit_code == 0, run_by_id_result.output
    assert run_by_id_result.output == ""
def test_check_run_objstorage_journal(
    mocker,
    scrubber_db,
    kafka_server,
    kafka_prefix,
    kafka_consumer_group,
    swh_objstorage,
):
    """Run an object storage check fed by a journal through the CLI.

    A configuration is created with ``check init objstorage`` and then
    executed with ``check run --use-journal``. The checker class is mocked,
    so the test only verifies the keyword arguments it is constructed with
    and that ``run()`` is the sole method called on it.
    """
    config_name = "cfg1"
    checker_instance = MagicMock()
    checker_cls = mocker.patch(
        "swh.scrubber.objstorage_checker.ObjectStorageCheckerFromJournal",
        return_value=checker_instance,
    )
    get_scrubber_db = mocker.patch(
        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
    )
    # Both invocations use the same journal and object storage settings;
    # no storage is passed to invoke in this test.
    invoke_kwargs = {
        "kafka_server": kafka_server,
        "kafka_prefix": kafka_prefix,
        "kafka_consumer_group": kafka_consumer_group,
        "objstorage": swh_objstorage,
    }

    init_result = invoke(
        scrubber_db,
        [
            "check",
            "init",
            "objstorage",
            "--object-type",
            "content",
            "--name",
            config_name,
        ],
        **invoke_kwargs,
    )
    assert init_result.exit_code == 0, init_result.output
    expected_output = f"Created configuration {config_name} [1] for checking content in memory objstorage"
    assert init_result.output.strip() == expected_output

    run_result = invoke(
        scrubber_db,
        ["check", "run", "--use-journal", config_name],
        **invoke_kwargs,
    )
    assert run_result.exit_code == 0, run_result.output
    assert run_result.output == ""

    # One scrubber db lookup per CLI invocation (init, then run).
    assert get_scrubber_db.call_count == 2
    get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn)

    expected_journal_config = {
        "brokers": kafka_server,
        "cls": "kafka",
        "group_id": kafka_consumer_group,
        "prefix": kafka_prefix,
        "on_eof": "stop",
    }
    # The objstorage instance is built inside the CLI, so it is read back
    # from the recorded call rather than compared to the fixture.
    recorded_kwargs = checker_cls.mock_calls[0][2]
    checker_cls.assert_called_once_with(
        db=scrubber_db,
        journal_client_config=expected_journal_config,
        config_id=1,
        objstorage=recorded_kwargs["objstorage"],
    )
    assert checker_instance.method_calls == [call.run()]
def test_check_run_journal(
mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment