Skip to content
Snippets Groups Projects
Commit 46fdf4c6 authored by vlorentz's avatar vlorentz Committed by vlorentz
Browse files

storage_checker: Add more complete typing

parent 229c7f4f
Branches cassandra
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ import dataclasses
import datetime
import json
import logging
from typing import Iterable, Optional, Tuple, Union
from typing import Any, Iterable, NoReturn, Optional, Tuple, Union
import psycopg2
import tenacity
......@@ -31,7 +31,7 @@ from swh.model.model import (
from swh.storage.algos.directory import directory_get_many
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.storage.cassandra.storage import CassandraStorage
from swh.storage.interface import StorageInterface
from swh.storage.interface import PagedResult, StorageInterface
from swh.storage.postgresql.storage import Storage as PostgresqlStorage
from .db import Datastore, ScrubberDb
......@@ -41,6 +41,16 @@ logger = logging.getLogger(__name__)
ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content]
def assert_never(value: NoReturn, msg) -> NoReturn:
"""mypy makes sure this function is never called, through exhaustive checking
of ``value`` in the parent function.
See https://mypy.readthedocs.io/en/latest/literal_types.html#exhaustive-checks
for details.
"""
assert False, msg
@contextlib.contextmanager
def postgresql_storage_db(storage):
db = storage.get_db()
......@@ -204,27 +214,41 @@ class StorageChecker:
) -> None:
page_token = None
while True:
if object_type in (swhids.ObjectType.RELEASE, swhids.ObjectType.REVISION):
method = getattr(self.storage, f"{self.object_type}_get_partition")
page = method(partition_id, self.nb_partitions, page_token=page_token)
page: PagedResult[Any]
objects: Iterable[ScrubbableObject]
if object_type is swhids.ObjectType.REVISION:
page = self.storage.revision_get_partition(
partition_id, self.nb_partitions, page_token=page_token
)
objects = page.results
elif object_type is swhids.ObjectType.RELEASE:
page = self.storage.release_get_partition(
partition_id, self.nb_partitions, page_token=page_token
)
objects = page.results
elif object_type == swhids.ObjectType.DIRECTORY:
elif object_type is swhids.ObjectType.DIRECTORY:
page = self.storage.directory_get_id_partition(
partition_id, self.nb_partitions, page_token=page_token
)
directory_ids = page.results
objects = list(directory_get_many(self.storage, directory_ids))
elif object_type == swhids.ObjectType.SNAPSHOT:
objects = []
for dir_ in directory_get_many(self.storage, directory_ids):
assert dir_ is not None, directory_ids
objects.append(dir_)
elif object_type is swhids.ObjectType.SNAPSHOT:
page = self.storage.snapshot_get_id_partition(
partition_id, self.nb_partitions, page_token=page_token
)
snapshot_ids = page.results
objects = [
snapshot_get_all_branches(self.storage, snapshot_id)
for snapshot_id in snapshot_ids
]
objects = []
for snapshot_id in snapshot_ids:
snp = snapshot_get_all_branches(self.storage, snapshot_id)
assert snp is not None
objects.append(snp)
elif object_type is swhids.ObjectType.CONTENT:
assert False, "storage_checker does not support content objects yet"
else:
assert False, f"Unexpected object type: {object_type}"
assert_never(object_type, f"Unexpected object type: {object_type}")
with self.statsd().timed(
"batch_duration_seconds", tags={"operation": "check_hashes"}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment