Commit fef8a513 authored by vlorentz

db: Add table checked_ranges

It will be used by the storage_checker to 'remember' what ranges
it already checked recently across runs (and crashes), and to
monitor progress.
parent 630001ce
1 merge request: !24 db: Add table checked_ranges
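As the commit message explains, the new table lets a checker skip ranges it already checked recently and resume after a crash. Below is a minimal sketch of that pattern in Python, using the ScrubberDb methods added by this commit; the maybe_check_range helper, the check_range callable, and the 30-day recheck period are illustrative assumptions, not part of this commit:

import datetime

from swh.model import swhids
from swh.scrubber.db import Datastore, ScrubberDb

RECHECK_PERIOD = datetime.timedelta(days=30)  # assumed policy, not from this commit


def maybe_check_range(
    db: ScrubberDb,
    datastore: Datastore,
    range_start: swhids.CoreSWHID,
    range_end: swhids.CoreSWHID,
    check_range,  # hypothetical callable that actually scrubs the range
) -> None:
    """Scrub a range of SWHIDs unless it was already checked recently."""
    start = datetime.datetime.now(tz=datetime.timezone.utc)
    last_date = db.checked_range_get_last_date(datastore, range_start, range_end)
    if last_date is not None and start - last_date < RECHECK_PERIOD:
        return  # checked recently enough; skipping here is what makes runs resumable
    check_range(range_start, range_end)
    # Record the date this scrub *started*, matching the last_date column comment.
    db.checked_range_upsert(datastore, range_start, range_end, start)
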
@@ -7,7 +7,7 @@
import dataclasses
import datetime
import functools
-from typing import Iterable, Iterator, List, Optional
+from typing import Iterable, Iterator, List, Optional, Tuple
import psycopg2
@@ -60,7 +60,7 @@ class ScrubberDb(BaseDb):
class ScrubberDb(BaseDb):
-    current_version = 3
+    current_version = 4

    ####################################
    # Shared tables
@@ -98,6 +98,81 @@ class ScrubberDb(BaseDb):
            (id_,) = res
            return id_

    ####################################
    # Checkpointing/progress tracking
    ####################################

    def checked_range_upsert(
        self,
        datastore: Datastore,
        range_start: CoreSWHID,
        range_end: CoreSWHID,
        date: datetime.datetime,
    ) -> None:
        """
        Records in the database that the given range was last checked at the
        given date.
        """
        datastore_id = self.datastore_get_or_add(datastore)

        with self.transaction() as cur:
            cur.execute(
                """
                INSERT INTO checked_range(datastore, range_start, range_end, last_date)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (datastore, range_start, range_end) DO UPDATE
                SET last_date = GREATEST(checked_range.last_date, EXCLUDED.last_date)
                """,
                (datastore_id, str(range_start), str(range_end), date),
            )

    def checked_range_get_last_date(
        self, datastore: Datastore, range_start: CoreSWHID, range_end: CoreSWHID
    ) -> Optional[datetime.datetime]:
        """
        Returns the last date the given range was checked in the given datastore,
        or :const:`None` if it was never checked.

        Currently, this matches range boundaries exactly, with no regard for
        ranges that contain or are contained by the given one.
        """
        datastore_id = self.datastore_get_or_add(datastore)

        with self.transaction() as cur:
            cur.execute(
                """
                SELECT last_date
                FROM checked_range
                WHERE datastore=%s AND range_start=%s AND range_end=%s
                """,
                (datastore_id, str(range_start), str(range_end)),
            )
            res = cur.fetchone()
            if res is None:
                return None
            else:
                (date,) = res
                return date

    def checked_range_iter(
        self, datastore: Datastore
    ) -> Iterator[Tuple[CoreSWHID, CoreSWHID, datetime.datetime]]:
        datastore_id = self.datastore_get_or_add(datastore)

        with self.transaction() as cur:
            cur.execute(
                """
                SELECT range_start, range_end, last_date
                FROM checked_range
                WHERE datastore=%s
                """,
                (datastore_id,),
            )
            for (range_start, range_end, last_date) in cur:
                yield (
                    CoreSWHID.from_string(range_start),
                    CoreSWHID.from_string(range_end),
                    last_date,
                )

    ####################################
    # Inventory of objects with issues
    ####################################
@@ -19,6 +19,23 @@ comment on column datastore.class is 'For datastores with multiple backends, nam
comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.';
-------------------------------------
-- Checkpointing/progress tracking
-------------------------------------
create table checked_range
(
  datastore int not null,
  range_start swhid not null,
  range_end swhid not null,
  last_date timestamptz not null
);
comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.';
comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).';
comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).';
comment on column checked_range.last_date is 'Date the last scrub of that range *started*.';
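
Combined with the commit message's goal of monitoring progress, these columns make a simple report possible. A minimal sketch in Python, built on the checked_range_iter method added above (the report_progress name and the one-week window are assumptions for illustration):

import datetime

from swh.scrubber.db import Datastore, ScrubberDb


def report_progress(db: ScrubberDb, datastore: Datastore) -> None:
    """Print a rough summary derived from the checked_range table."""
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    ranges = list(db.checked_range_iter(datastore))
    recent = [r for r in ranges if now - r[2] < datetime.timedelta(days=7)]
    print(f"{len(ranges)} ranges checked so far, {len(recent)} in the last week")
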
-------------------------------------
-- Inventory of objects with issues
-------------------------------------
@@ -9,6 +9,12 @@ alter table datastore add primary key using index datastore_pkey;
create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
-------------------------------------
-- Checkpointing/progress tracking
-------------------------------------
create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end);
alter table checked_range add primary key using index checked_range_pkey;
-------------------------------------
-- Inventory of objects with issues
-- SWH Scrubber DB schema upgrade
-- from_version: 3
-- to_version: 4
-- description: Add checked_range
create table checked_range
(
  datastore int not null,
  range_start swhid not null,
  range_end swhid not null,
  last_date timestamptz not null
);
comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.';
comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).';
comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).';
comment on column checked_range.last_date is 'Date the last scrub of that range *started*.';
create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end);
alter table checked_range add primary key using index checked_range_pkey;
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime

from swh.model import swhids

from swh.scrubber.db import Datastore, ScrubberDb

DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh-test")
SNP_SWHID1 = swhids.CoreSWHID.from_string(
    "swh:1:snp:5000000000000000000000000000000000000000"
)
SNP_SWHID2 = swhids.CoreSWHID.from_string(
    "swh:1:snp:e000000000000000000000000000000000000000"
)
DATE = datetime.datetime(2022, 10, 4, 12, 1, 23, tzinfo=datetime.timezone.utc)


def test_checked_range_insert(scrubber_db: ScrubberDb):
    scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE)

    assert list(scrubber_db.checked_range_iter(DATASTORE)) == [
        (SNP_SWHID1, SNP_SWHID2, DATE)
    ]


def test_checked_range_update(scrubber_db: ScrubberDb):
    scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE)

    date2 = DATE + datetime.timedelta(days=1)
    scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date2)

    assert list(scrubber_db.checked_range_iter(DATASTORE)) == [
        (SNP_SWHID1, SNP_SWHID2, date2)
    ]

    date3 = DATE + datetime.timedelta(days=-1)
    scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date3)

    assert list(scrubber_db.checked_range_iter(DATASTORE)) == [
        (SNP_SWHID1, SNP_SWHID2, date2)  # newest date wins
    ]


def test_checked_range_get(scrubber_db: ScrubberDb):
    assert (
        scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2)
        is None
    )

    scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE)

    assert (
        scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2)
        == DATE
    )