diff --git a/swh/scrubber/check_journal.py b/swh/scrubber/check_journal.py
new file mode 100644
index 0000000000000000000000000000000000000000..30c137961e73ae56894df212c4a6c71cd6c854c2
--- /dev/null
+++ b/swh/scrubber/check_journal.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2021-2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Reads all objects in a swh-storage instance and recomputes their checksums."""
+
+import logging
+from typing import Any, Dict, List, Optional
+
+from swh.journal.client import get_journal_client
+from swh.journal.serializers import kafka_to_value
+from swh.model import model
+
+from .db import Datastore, ScrubberDb
+
+logger = logging.getLogger(__name__)
+
+
+class JournalChecker:
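+    """Reads all objects from a kafka journal, recomputes their hashes, and
+    records hash mismatches in the scrubber database."""
+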
+    _datastore: Optional[Datastore] = None
+
+    def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]):
+        self.db = db
+        self.journal_client_config = journal_client
+        self.journal_client = get_journal_client(
+            **journal_client,
+            # Remove the default deserializer, so process_kafka_messages()
+            # gets each message verbatim and can archive it with as few
+            # modifications as possible.
+            value_deserializer=lambda obj_type, msg: msg,
+        )
+
+    def datastore_info(self) -> Datastore:
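+        """Returns a representation of the journal instance being checked,
+        to be stored in the scrubber database alongside corrupt objects."""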
+        if self._datastore is None:
+            config = self.journal_client_config
+            if config["cls"] == "kafka":
+                self._datastore = Datastore(
+                    package="journal",
+                    cls="kafka",
+                    instance=(
+                        f"brokers={config['brokers']!r} prefix={config['prefix']!r}"
+                    ),
+                )
+            else:
+                raise NotImplementedError(
+                    f"StorageChecker(journal_client={self.journal_client_config!r})"
+                    f".datastore()"
+                )
+        return self._datastore
+
+    def check_journal(self):
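+        """Runs the journal client, checking every object in the journal
+        until the client stops (e.g. on EOF, when stop_on_eof is set)."""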
+        self.journal_client.process(self.process_kafka_messages)
+
+    def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
+        for (object_type, messages) in all_messages.items():
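+            # maps the topic name to the corresponding model class,
+            # e.g. "snapshot" -> swh.model.model.Snapshot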
+            cls = getattr(model, object_type.capitalize())
+            for message in messages:
+                object_ = cls.from_dict(kafka_to_value(message))
+                real_id = object_.compute_hash()
+                if object_.id != real_id:
+                    self.db.corrupt_object_add(self.datastore_info(), object_, message)
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index fa58cedeed9f24e03ada9d86b92a99954b30ea3f..f218d26463bdb211cb277fd306b5c9b114d53a7f 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 import os
+from typing import Optional
 
 import click
 
@@ -20,8 +21,41 @@ from swh.core.cli import swh as swh_cli_group
     help="Configuration file.",
 )
 @click.pass_context
-def scrubber_cli_group(ctx, config_file):
+def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
     """main command group of the datastore scrubber
+
+    Expected config format::
+
+        scrubber_db:
+            cls: local
+            db: "service=..."  # libpq DSN
+
+        # for storage checkers only:
+        storage:
+            cls: postgresql  # cannot be remote, as it needs direct access to the pg DB
+            db": "service=..."  # libpq DSN
+            objstorage:
+                cls: memory
+
+        # for journal checkers only:
+        journal_client:
+            # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
+            # for the full list of options
+            sasl.mechanism: SCRAM-SHA-512
+            security.protocol: SASL_SSL
+            sasl.username: ...
+            sasl.password: ...
+            group_id: ...
+            privileged: True
+            message.max.bytes: 524288000
+            brokers:
+              - "broker1.journal.softwareheritage.org:9093
+              - "broker2.journal.softwareheritage.org:9093
+              - "broker3.journal.softwareheritage.org:9093
+              - "broker4.journal.softwareheritage.org:9093
+              - "broker5.journal.softwareheritage.org:9093
+            object_types: [directory, revision, snapshot, release]
+            auto_offset_reset: earliest
     """
     from swh.core import config
 
@@ -74,6 +108,7 @@ def scrubber_check_cli_group(ctx):
 @click.option("--end-object", default="f" * 40)
 @click.pass_context
 def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
+    """Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
     conf = ctx.obj["config"]
     if "storage" not in conf:
         ctx.fail("You must have a storage configured in your config file.")
@@ -91,3 +126,22 @@ def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object:
     )
 
     checker.check_storage()
+
+
+@scrubber_check_cli_group.command(name="journal")
+@click.pass_context
+def scrubber_check_journal(ctx) -> None:
+    """Reads a complete kafka journal, and reports corrupt objects to
+    the scrubber DB."""
+    conf = ctx.obj["config"]
+    if "journal_client" not in conf:
+        ctx.fail("You must have a journal_client configured in your config file.")
+
+    from .check_journal import JournalChecker
+
+    checker = JournalChecker(
+        db=ctx.obj["db"],
+        journal_client=conf["journal_client"],
+    )
+
+    checker.check_journal()
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index 50889a58977ec87979e57a8a2353c34ea45d160d..29f10ec7f94fd46fa58ab59f5b315612b80915b3 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -3,7 +3,6 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import copy
 import tempfile
 from unittest.mock import MagicMock
 
@@ -13,24 +12,45 @@ import yaml
 from swh.scrubber.check_storage import storage_db
 from swh.scrubber.cli import scrubber_cli_group
 
-CLI_CONFIG = {
-    "storage": {
-        "cls": "postgresql",
-        "db": "<replaced at runtime>",
-        "objstorage": {"cls": "memory"},
-    },
-    "scrubber_db": {"cls": "local", "db": "<replaced at runtime>"},
-}
 
-
-def invoke(swh_storage, scrubber_db, args):
+def invoke(
+    scrubber_db,
+    args,
+    storage=None,
+    kafka_server=None,
+    kafka_prefix=None,
+    kafka_consumer_group=None,
+):
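+    """Runs the scrubber CLI with a temporary config file built from the
+    given databases and optional kafka connection details."""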
     runner = CliRunner()
 
-    config = copy.deepcopy(CLI_CONFIG)
-    with storage_db(swh_storage) as db:
-        config["storage"]["db"] = db.conn.dsn
+    config = {
+        "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+    }
+    if storage:
+        with storage_db(storage) as db:
+            config["storage"] = {
+                "cls": "postgresql",
+                "db": db.conn.dsn,
+                "objstorage": {"cls": "memory"},
+            }
 
-    config["scrubber_db"]["db"] = scrubber_db.conn.dsn
+    assert (
+        (kafka_server is None)
+        == (kafka_prefix is None)
+        == (kafka_consumer_group is None)
+    )
+    if kafka_server:
+        config["journal_client"] = dict(
+            cls="kafka",
+            brokers=kafka_server,
+            group_id=kafka_consumer_group,
+            prefix=kafka_prefix,
+            stop_on_eof=True,
+        )
 
     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
         yaml.dump(config, config_fd)
@@ -40,7 +56,7 @@ def invoke(swh_storage, scrubber_db, args):
     return result
 
 
-def test_check_storage(swh_storage, mocker, scrubber_db):
+def test_check_storage(mocker, scrubber_db, swh_storage):
     storage_checker = MagicMock()
     StorageChecker = mocker.patch(
         "swh.scrubber.check_storage.StorageChecker", return_value=storage_checker
@@ -49,7 +65,7 @@ def test_check_storage(swh_storage, mocker, scrubber_db):
         "swh.scrubber.get_scrubber_db", return_value=scrubber_db
     )
     result = invoke(
-        swh_storage, scrubber_db, ["check", "storage", "--object-type=snapshot"]
+        scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage
     )
     assert result.exit_code == 0, result.output
     assert result.output == ""
@@ -62,3 +78,36 @@ def test_check_storage(swh_storage, mocker, scrubber_db):
         start_object="0" * 40,
         end_object="f" * 40,
     )
+
+
+def test_check_journal(
+    mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+    journal_checker = MagicMock()
+    JournalChecker = mocker.patch(
+        "swh.scrubber.check_journal.JournalChecker", return_value=journal_checker
+    )
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    result = invoke(
+        scrubber_db,
+        ["check", "journal"],
+        kafka_server=kafka_server,
+        kafka_prefix=kafka_prefix,
+        kafka_consumer_group=kafka_consumer_group,
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    JournalChecker.assert_called_once_with(
+        db=scrubber_db,
+        journal_client={
+            "brokers": kafka_server,
+            "cls": "kafka",
+            "group_id": kafka_consumer_group,
+            "prefix": kafka_prefix,
+            "stop_on_eof": True,
+        },
+    )
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
new file mode 100644
index 0000000000000000000000000000000000000000..5f455f20fd51fdd18adb9e868fa875867d4e3d96
--- /dev/null
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+import attr
+import pytest
+
+from swh.journal.serializers import kafka_to_value
+from swh.journal.writer import get_journal_writer
+from swh.model import swhids
+from swh.model.tests import swh_model_data
+from swh.scrubber.check_journal import JournalChecker
+
+
+def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group):
+    return dict(
+        cls="kafka",
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_on_eof=True,
+    )
+
+
+def journal_writer(kafka_server, kafka_prefix):
+    return get_journal_writer(
+        cls="kafka",
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=False,
+    )
+
+
+def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group):
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("directory", swh_model_data.DIRECTORIES)
+    writer.write_additions("revision", swh_model_data.REVISIONS)
+    writer.write_additions("release", swh_model_data.RELEASES)
+    writer.write_additions("snapshot", swh_model_data.SNAPSHOTS)
+
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+
+    assert list(scrubber_db.corrupt_object_iter()) == []
+
+
+@pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
+def test_corrupt_snapshot(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, corrupt_idx
+):
+    snapshots = list(swh_model_data.SNAPSHOTS)
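+    # overwrite the snapshot's id with zeros, so the recorded id no longer
+    # matches the recomputed hash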
+    snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("snapshot", snapshots)
+
+    before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+    after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 1
+    assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
+        "swh:1:snp:0000000000000000000000000000000000000000"
+    )
+    assert corrupt_objects[0].datastore.package == "journal"
+    assert corrupt_objects[0].datastore.cls == "kafka"
+    assert (
+        corrupt_objects[0].datastore.instance
+        == f"brokers='{kafka_server}' prefix='{kafka_prefix}'"
+    )
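+    # allow a few seconds of slack, as first_occurrence is set by the scrubber
+    # database, whose clock may not exactly match this machine's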
+    assert (
+        before_date - datetime.timedelta(seconds=5)
+        <= corrupt_objects[0].first_occurrence
+        <= after_date + datetime.timedelta(seconds=5)
+    )
+    assert (
+        kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict()
+    )
+
+
+def test_corrupt_snapshots(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
+):
+    snapshots = list(swh_model_data.SNAPSHOTS)
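+    # corrupt the first two snapshots by overwriting their ids with
+    # repeated 0x00 and 0x01 bytes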
+    for i in (0, 1):
+        snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("snapshot", snapshots)
+
+    JournalChecker(
+        db=scrubber_db,
+        journal_client=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+    ).check_journal()
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 2
+    assert {co.id for co in corrupt_objects} == {
+        swhids.CoreSWHID.from_string(swhid)
+        for swhid in [
+            "swh:1:snp:0000000000000000000000000000000000000000",
+            "swh:1:snp:0101010101010101010101010101010101010101",
+        ]
+    }