Skip to content
Snippets Groups Projects
Commit d2e5c3df authored by vlorentz's avatar vlorentz
Browse files

cli: Rename config key 'journal_client' to 'journal'

parent a89cd673
Branches mypy-1.0
No related tags found
No related merge requests found
......@@ -42,7 +42,7 @@ def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
cls: memory
# for journal checkers only:
journal_client:
journal:
# see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
# for the full list of options
sasl.mechanism: SCRAM-SHA-512
......@@ -137,14 +137,14 @@ def scrubber_check_journal(ctx) -> None:
"""Reads a complete kafka journal, and reports corrupt objects to
the scrubber DB."""
conf = ctx.obj["config"]
if "journal_client" not in conf:
ctx.fail("You must have a journal_client configured in your config file.")
if "journal" not in conf:
ctx.fail("You must have a journal configured in your config file.")
from .journal_checker import JournalChecker
checker = JournalChecker(
db=ctx.obj["db"],
journal_client=conf["journal_client"],
journal=conf["journal"],
)
checker.run()
......
......@@ -23,11 +23,11 @@ class JournalChecker:
_datastore = None
def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]):
def __init__(self, db: ScrubberDb, journal: Dict[str, Any]):
self.db = db
self.journal_client_config = journal_client
self.journal_client_config = journal
self.journal_client = get_journal_client(
**journal_client,
**journal,
# Remove default deserializer; so process_kafka_values() gets the message
# verbatim so it can archive it with as few modifications as possible.
value_deserializer=lambda obj_type, msg: msg,
......
......@@ -42,7 +42,7 @@ def invoke(
== (kafka_consumer_group is None)
)
if kafka_server:
config["journal_client"] = dict(
config["journal"] = dict(
cls="kafka",
brokers=kafka_server,
group_id=kafka_consumer_group,
......@@ -106,7 +106,7 @@ def test_check_journal(
get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn)
JournalChecker.assert_called_once_with(
db=scrubber_db,
journal_client={
journal={
"brokers": kafka_server,
"cls": "kafka",
"group_id": kafka_consumer_group,
......
......@@ -44,9 +44,7 @@ def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_g
JournalChecker(
db=scrubber_db,
journal_client=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group),
).run()
assert list(scrubber_db.corrupt_object_iter()) == []
......@@ -65,9 +63,7 @@ def test_corrupt_snapshot(
before_date = datetime.datetime.now(tz=datetime.timezone.utc)
JournalChecker(
db=scrubber_db,
journal_client=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group),
).run()
after_date = datetime.datetime.now(tz=datetime.timezone.utc)
......@@ -104,9 +100,7 @@ def test_corrupt_snapshots(
JournalChecker(
db=scrubber_db,
journal_client=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group),
).run()
corrupt_objects = list(scrubber_db.corrupt_object_iter())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment