diff --git a/swh/scrubber/base_checker.py b/swh/scrubber/base_checker.py
index 5cf7fc39093b79446d62a2226915ea430fd1d543..3e7c36f81b5511d0a9e0ad136f80cf2b4ea73b6a 100644
--- a/swh/scrubber/base_checker.py
+++ b/swh/scrubber/base_checker.py
@@ -33,6 +33,12 @@ class BaseChecker(ABC):
 
         self._config: Optional[ConfigEntry] = None
         self._statsd: Optional[Statsd] = None
+        self.statsd_constant_tags = {
+            "object_type": self.object_type.name.lower(),
+            "datastore_package": self.datastore.package,
+            "datastore_cls": self.datastore.cls,
+            "datastore_instance": self.datastore.instance,
+        }
 
     @property
     def config(self) -> ConfigEntry:
@@ -59,6 +65,11 @@ class BaseChecker(ABC):
             )
         return self._statsd
 
+    @property
+    def object_type(self) -> swhids.ObjectType:
+        """Returns the type of object being checked."""
+        return self.config.object_type
+
     @property
     def check_hashes(self) -> bool:
         return self.config.check_hashes
@@ -84,17 +95,7 @@ class BasePartitionChecker(BaseChecker):
     ):
         super().__init__(db=db, config_id=config_id)
         self.limit = limit
-        self.statsd_constant_tags = {
-            "object_type": self.object_type,
-            "nb_partitions": self.nb_partitions,
-            "datastore_package": self.datastore.package,
-            "datastore_cls": self.datastore.cls,
-        }
-
-    @property
-    def object_type(self) -> swhids.ObjectType:
-        """Returns the type of object being checked."""
-        return self.config.object_type
+        self.statsd_constant_tags["nb_partitions"] = self.nb_partitions
 
     @property
     def nb_partitions(self) -> int:
diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py
index dae1aae2c2c6bedc3c058bc53b8b70cbeb836cb0..3ee04dc314b93f8e5882ccfdc9812552fc878941 100644
--- a/swh/scrubber/journal_checker.py
+++ b/swh/scrubber/journal_checker.py
@@ -7,32 +7,41 @@
 
 import json
 import logging
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
 
 import attr
 
 from swh.journal.client import get_journal_client
 from swh.journal.serializers import kafka_to_value
 from swh.model import model
+from swh.objstorage.factory import get_objstorage
 
 from .base_checker import BaseChecker
 from .db import Datastore, ScrubberDb
+from .objstorage_checker import check_content
 
 logger = logging.getLogger(__name__)
 
 
-def get_datastore(journal_cfg) -> Datastore:
+def get_datastore(
+    journal_cfg: Dict[str, Any], objstorage_cfg: Optional[Dict[str, Any]] = None
+) -> Datastore:
     if journal_cfg.get("cls") == "kafka":
+        package = "journal"
+        cls = "kafka"
+        instance = {
+            "brokers": journal_cfg["brokers"],
+            "group_id": journal_cfg["group_id"],
+            "prefix": journal_cfg["prefix"],
+        }
+        if objstorage_cfg is not None:
+            # in that case, objstorage integrity is checked using the journal content topic
+            instance = {"journal": instance, "objstorage": objstorage_cfg}
+            cls = objstorage_cfg.get("cls", "")
         datastore = Datastore(
-            package="journal",
-            cls="kafka",
-            instance=json.dumps(
-                {
-                    "brokers": journal_cfg["brokers"],
-                    "group_id": journal_cfg["group_id"],
-                    "prefix": journal_cfg["prefix"],
-                }
-            ),
+            package=package,
+            cls=cls,
+            instance=json.dumps(instance),
         )
     else:
         raise NotImplementedError(
@@ -46,15 +55,19 @@ class JournalChecker(BaseChecker):
     reports errors in a separate database."""
 
     def __init__(
-        self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any]
+        self,
+        db: ScrubberDb,
+        config_id: int,
+        journal_client_config: Dict[str, Any],
+        objstorage_config: Optional[Dict[str, Any]] = None,
     ):
         super().__init__(db=db, config_id=config_id)
-        self.statsd_constant_tags = {
-            "datastore_package": self.datastore.package,
-            "datastore_cls": self.datastore.cls,
-        }
 
-        if self.config.check_references:
+        object_type = self.object_type.name.lower()
+
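+        # reference checks (missing object reporting) are only supported for
+        # contents, which are checked against an objstorage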
+        if self.config.check_references and object_type != "content":
             raise ValueError(
                 "The journal checker cannot check for references, please set "
                 "the 'check_references' to False in the config entry %s.",
@@ -66,15 +79,23 @@
                 "The journal_client configuration entry should not define the "
                 "object_types field; this is handled by the scrubber configuration entry"
             )
-        self.journal_client_config["object_types"] = [
-            self.config.object_type.name.lower()
-        ]
+        self.journal_client_config["object_types"] = [object_type]
         self.journal_client = get_journal_client(
             **self.journal_client_config,
             # Remove default deserializer; so process_kafka_values() gets the message
             # verbatim so it can archive it with as few modifications a possible.
             value_deserializer=lambda obj_type, msg: msg,
         )
+        self.objstorage = (
+            get_objstorage(**objstorage_config)
+            if objstorage_config is not None
+            else None
+        )
+        if object_type == "content" and self.objstorage is None:
+            raise ValueError(
+                "The journal checker cannot check content objects if an objstorage "
+                "configuration is not provided."
+            )
 
     def run(self) -> None:
         """Runs a journal client with the given configuration.
@@ -106,9 +127,21 @@
                             "duplicate_directory_entries_total",
                             tags={"object_type": "directory"},
                         )
+                elif object_type == "content":
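+                    # contents are checked against the objstorage; the constructor
+                    # guarantees self.objstorage is set when object_type is "content"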
+                    content = model.Content.from_dict(kafka_to_value(message))
+                    assert self.objstorage is not None
+                    check_content(
+                        content, self.objstorage, self.db, self.config, self.statsd
+                    )
                 else:
                     object_ = cls.from_dict(kafka_to_value(message))
                     has_duplicate_dir_entries = False
-                real_id = object_.compute_hash()
-                if object_.id != real_id or has_duplicate_dir_entries:
-                    self.db.corrupt_object_add(object_.swhid(), self.config, message)
+
+                if object_type != "content":
+                    real_id = object_.compute_hash()
+                    if object_.id != real_id or has_duplicate_dir_entries:
+                        self.db.corrupt_object_add(
+                            object_.swhid(), self.config, message
+                        )
diff --git a/swh/scrubber/objstorage_checker.py b/swh/scrubber/objstorage_checker.py
index de08a88f1286a25ea565d1ec5c8c90e536a830f0..cbbebdb5133d88bfc90cc7fedba5180a44af2c4b 100644
--- a/swh/scrubber/objstorage_checker.py
+++ b/swh/scrubber/objstorage_checker.py
@@ -7,6 +7,7 @@ import json
 import logging
 from typing import Iterable, Optional
 
+from swh.core.statsd import Statsd
 from swh.journal.serializers import value_to_kafka
 from swh.model.model import Content
 from swh.model.swhids import ObjectType
@@ -15,7 +16,7 @@ from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
 from swh.storage.interface import StorageInterface
 
 from .base_checker import BasePartitionChecker
-from .db import Datastore, ScrubberDb
+from .db import ConfigEntry, Datastore, ScrubberDb
 
 logger = logging.getLogger(__name__)
 
@@ -29,6 +30,36 @@
     )
 
 
+def check_content(
+    content: Content,
+    objstorage: ObjStorageInterface,
+    db: ScrubberDb,
+    config: ConfigEntry,
+    statsd: Statsd,
+) -> None:
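+    """Check a content object against an object storage: depending on the config,
+    report it in the scrubber database as missing or as corrupt (hash mismatch)."""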
+    content_hashes = objid_from_dict(content.hashes())
+    try:
+        content_bytes = objstorage.get(content_hashes)
+    except ObjNotFoundError:
+        if config.check_references:
+            statsd.increment("missing_object_total")
+            db.missing_object_add(id=content.swhid(), reference_ids={}, config=config)
+    else:
+        if config.check_hashes:
+            recomputed_hashes = objid_from_dict(
+                Content.from_data(content_bytes).hashes()
+            )
+            if content_hashes != recomputed_hashes:
+                statsd.increment("hash_mismatch_total")
+                db.corrupt_object_add(
+                    id=content.swhid(),
+                    config=config,
+                    serialized_object=value_to_kafka(content.to_dict()),
+                )
+
+
 class ObjectStorageChecker(BasePartitionChecker):
     """A checker to detect missing and corrupted contents in an object storage.
 
@@ -79,24 +110,4 @@
 
     def check_contents(self, contents: Iterable[Content]) -> None:
         for content in contents:
-            content_hashes = objid_from_dict(content.hashes())
-            try:
-                content_bytes = self.objstorage.get(content_hashes)
-            except ObjNotFoundError:
-                if self.check_references:
-                    self.statsd.increment("missing_object_total")
-                    self.db.missing_object_add(
-                        id=content.swhid(), reference_ids={}, config=self.config
-                    )
-            else:
-                if self.check_hashes:
-                    recomputed_hashes = objid_from_dict(
-                        Content.from_data(content_bytes).hashes()
-                    )
-                    if content_hashes != recomputed_hashes:
-                        self.statsd.increment("hash_mismatch_total")
-                        self.db.corrupt_object_add(
-                            id=content.swhid(),
-                            config=self.config,
-                            serialized_object=value_to_kafka(content.to_dict()),
-                        )
+            check_content(content, self.objstorage, self.db, self.config, self.statsd)
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
index 6f5096faaa514c82bf8f7dec072a3c0a0fba1548..e4faf41e830134b97d30307cc7d44b54c8f46cda 100644
--- a/swh/scrubber/tests/test_journal_kafka.py
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -40,9 +40,21 @@ def journal_writer(kafka_server: str, kafka_prefix: str):
     )
 
 
+@pytest.fixture
+def swh_objstorage_config(tmpdir):
+    return {
+        "cls": "pathslicing",
+        "root": str(tmpdir),
+        "slicing": "0:2/2:4/4:6",
+        "compression": "gzip",
+    }
+
+
 @pytest.fixture
 def datastore(
-    kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
+    kafka_server: str,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
 ) -> Datastore:
     journal_config = journal_client_config(
         kafka_server, kafka_prefix, kafka_consumer_group
@@ -51,8 +63,26 @@ def datastore(
     return datastore
 
 
+@pytest.fixture
+def objstorage_datastore(
+    kafka_server: str,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    swh_objstorage_config,
+) -> Datastore:
+    journal_config = journal_client_config(
+        kafka_server, kafka_prefix, kafka_consumer_group
+    )
+    datastore = get_datastore(journal_config, swh_objstorage_config)
+    return datastore
+
+
 def test_no_corruption(
-    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    datastore,
 ):
     writer = journal_writer(kafka_server, kafka_prefix)
     writer.write_additions("directory", swh_model_data.DIRECTORIES)
@@ -85,6 +115,67 @@ def test_no_corruption(
     assert list(scrubber_db.corrupt_object_iter()) == []
 
 
+def test_contents_check_missing_objstorage_config(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore
+):
+    journal_cfg = journal_client_config(
+        kafka_server, kafka_prefix, kafka_consumer_group
+    )
+    gid = journal_cfg["group_id"] + "_"
+
+    object_type = "content"
+    journal_cfg["group_id"] = gid + object_type
+    config_id = scrubber_db.config_add(
+        name=f"cfg_{object_type}",
+        datastore=datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    with pytest.raises(
+        ValueError, match="journal checker cannot check content objects"
+    ):
+        JournalChecker(
+            db=scrubber_db, config_id=config_id, journal_client_config=journal_cfg
+        )
+
+
+def test_contents_no_corruption(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    objstorage_datastore,
+    swh_objstorage_config,
+):
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("content", swh_model_data.CONTENTS)
+
+    journal_cfg = journal_client_config(
+        kafka_server, kafka_prefix, kafka_consumer_group
+    )
+    gid = journal_cfg["group_id"] + "_"
+
+    object_type = "content"
+    journal_cfg["group_id"] = gid + object_type
+    config_id = scrubber_db.config_add(
+        name=f"cfg_{object_type}",
+        datastore=objstorage_datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    jc = JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal_client_config=journal_cfg,
+        objstorage_config=swh_objstorage_config,
+    )
+    jc.objstorage.add_batch({c.sha1: c.data for c in swh_model_data.CONTENTS})
+    jc.run()
+    jc.journal_client.close()
+
+    assert list(scrubber_db.corrupt_object_iter()) == []
+
+
 @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
 def test_corrupt_snapshot(
     scrubber_db,
@@ -174,6 +265,108 @@
     }
 
 
+@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_corrupt_content(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    objstorage_datastore,
+    corrupt_idx,
+    swh_objstorage_config,
+):
+    config_id = scrubber_db.config_add(
+        name="cfg_content",
+        datastore=objstorage_datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    contents = list(swh_model_data.CONTENTS)
+    contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("content", contents)
+
+    before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    jc = JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal_client_config=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+        objstorage_config=swh_objstorage_config,
+    )
+    jc.objstorage.add_batch({c.sha1: c.data for c in contents})
+    jc.run()
+    after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 1
+    assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
+        "swh:1:cnt:0000000000000000000000000000000000000000"
+    )
+    assert corrupt_objects[0].config.datastore.package == "journal"
+    assert corrupt_objects[0].config.datastore.cls == swh_objstorage_config["cls"]
+    assert '"journal":' in corrupt_objects[0].config.datastore.instance
+    assert '"objstorage":' in corrupt_objects[0].config.datastore.instance
+    assert (
+        before_date - datetime.timedelta(seconds=5)
+        <= corrupt_objects[0].first_occurrence
+        <= after_date + datetime.timedelta(seconds=5)
+    )
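+    # content data is not written to the journal, so drop it before comparing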
+    corrupted_content = contents[corrupt_idx].to_dict()
+    corrupted_content.pop("data")
+    assert kafka_to_value(corrupt_objects[0].object_) == corrupted_content
+
+
+@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_missing_content(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    objstorage_datastore,
+    missing_idx,
+    swh_objstorage_config,
+):
+    config_id = scrubber_db.config_add(
+        name="cfg_content",
+        datastore=objstorage_datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    contents = list(swh_model_data.CONTENTS)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("content", contents)
+
+    before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    jc = JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal_client_config=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+        objstorage_config=swh_objstorage_config,
+    )
+    jc.objstorage.add_batch(
+        {c.sha1: c.data for i, c in enumerate(contents) if i != missing_idx}
+    )
+    jc.run()
+    after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    missing_objects = list(scrubber_db.missing_object_iter())
+    assert len(missing_objects) == 1
+    assert missing_objects[0].id == contents[missing_idx].swhid()
+    assert missing_objects[0].config.datastore == objstorage_datastore
+    assert (
+        before_date - datetime.timedelta(seconds=5)
+        <= missing_objects[0].first_occurrence
+        <= after_date + datetime.timedelta(seconds=5)
+    )
+
+
 def test_duplicate_directory_entries(
     scrubber_db,
     kafka_server,