Skip to content
Snippets Groups Projects
Commit 87412380 authored by David Douard's avatar David Douard
Browse files

Add a `--reset` flag to the `swh scrubber check stalled` command

This flag resets the partitions identified as stalled by setting
their start_date and end_date to NULL.

This should make the reset partitions eligible to be selected for checking by a
scrubber worker.
parent 67a743d0
No related branches found
No related tags found
1 merge request!43Refactor the checker stack
......@@ -206,12 +206,15 @@ def scrubber_check_list(
default="auto",
help="Delay for a partition to be considered as stuck; in seconds or 'auto'",
)
@click.option(
"--reset",
is_flag=True,
default=False,
help="Reset the stalled partition so it can be grabbed by a scrubber worker",
)
@click.pass_context
def scrubber_check_stalled(
ctx,
name: str,
config_id: int,
delay: Optional[str],
ctx, name: str, config_id: int, delay: Optional[str], reset: bool
):
"""List the stuck partitions for a given config"""
import datetime
......@@ -239,8 +242,15 @@ def scrubber_check_stalled(
now = datetime.datetime.now(tz=datetime.timezone.utc)
for partition, stuck_since in in_flight:
click.echo(
f" {partition}: {naturaldate(stuck_since)} ({naturaldelta(now-stuck_since)})"
f"{partition}:\tstuck since {naturaldate(stuck_since)} "
f"({naturaldelta(now-stuck_since)})"
)
if reset:
if db.checked_partition_reset(config_id, partition):
click.echo("\tpartition reset")
else:
click.echo("\tpartition NOT reset")
else:
click.echo(
f"No stuck partition found for {cfg.name} [id={config_id}, type={cfg.object_type}]"
......
......@@ -300,6 +300,21 @@ class ScrubberDb(BaseDb):
return
yield partition_id
def checked_partition_reset(self, config_id: int, partition_id: int) -> bool:
    """
    Clear the ``start_date`` and ``end_date`` of one checked partition.

    The row is looked up in ``checked_partition`` by ``config_id`` and
    ``partition_id``.

    Returns:
        the truthiness of the cursor's ``rowcount`` after the UPDATE.
    """
    query = """
        UPDATE checked_partition
        SET start_date=NULL, end_date=NULL
        WHERE config_id=%(config_id)s AND partition_id=%(partition_id)s
        """
    params = {"config_id": config_id, "partition_id": partition_id}
    with self.transaction() as cur:
        cur.execute(query, params)
        return bool(cur.rowcount)
def checked_partition_upsert(
self,
config_id: int,
......
......@@ -225,7 +225,7 @@ def test_check_stalled(mocker, scrubber_db, swh_storage):
assert result.exit_code == 0, result.output
expected = """\
Stuck partitions for cfg1 [id=1, type=snapshot]:
1: today (2 hours)
1: stuck since today (2 hours)
"""
assert result.output == expected, result.output
......@@ -248,12 +248,91 @@ Stuck partitions for cfg1 [id=1, type=snapshot]:
assert result.exit_code == 0, result.output
expected = """\
Stuck partitions for cfg1 [id=1, type=snapshot]:
0: today (20 minutes)
1: today (2 hours)
0: stuck since today (20 minutes)
1: stuck since today (2 hours)
"""
assert result.output == expected, result.output
def test_check_reset(mocker, scrubber_db, swh_storage):
    """Check that ``check stalled --reset`` resets the stalled partitions.

    The test creates a 4-partition snapshot config named ``cfg1``, inserts
    two partitions with a start date but no end date (0 and 1) and one with
    both dates set (2), then verifies that:

    - ``check stalled`` lists partitions 0 and 1,
    - ``check stalled --reset`` reports both as reset,
    - rows 0 and 1 end up with NULL ``start_date`` and ``end_date``,
    - the next partitions handed out are 0, 1 and then 3.
    """
    mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)

    # no config exists yet, so listing them prints nothing
    result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
    assert result.exit_code == 0, result.output
    assert result.output == ""

    result = invoke(
        scrubber_db,
        [
            "check",
            "init",
            "--object-type",
            "snapshot",
            "--nb-partitions",
            "4",
            "--name",
            "cfg1",
        ],
        storage=swh_storage,
    )
    assert result.exit_code == 0, result.output

    result = invoke(scrubber_db, ["check", "stalled", "cfg1"], storage=swh_storage)
    assert result.exit_code == 0, result.output
    expected = "No stuck partition found for cfg1 [id=1, type=snapshot]\n"
    assert result.output == expected, result.output

    # insert a few partitions
    with scrubber_db.transaction() as cur:
        cur.execute(
            "INSERT INTO checked_partition "
            "VALUES (1, 0, now() - '20m'::interval, NULL);"
        )
        cur.execute(
            "INSERT INTO checked_partition "
            "VALUES (1, 1, now() - '2h'::interval, NULL);"
        )
        cur.execute(
            "INSERT INTO checked_partition "
            "VALUES (1, 2, now() - '2h'::interval, now() - '1h55m'::interval);"
        )

    # partitions 0 and 1 are considered as stalled
    result = invoke(scrubber_db, ["check", "stalled", "cfg1"], storage=swh_storage)
    assert result.exit_code == 0, result.output
    # The command separates the partition id from the message with a tab;
    # spell it as an explicit escape so the literal stays lint-clean.
    expected = (
        "Stuck partitions for cfg1 [id=1, type=snapshot]:\n"
        "0:\tstuck since today (20 minutes)\n"
        "1:\tstuck since today (2 hours)\n"
    )
    assert result.output == expected, result.output

    # let's reset them
    result = invoke(
        scrubber_db, ["check", "stalled", "--reset", "cfg1"], storage=swh_storage
    )
    assert result.exit_code == 0, result.output
    expected = (
        "Stuck partitions for cfg1 [id=1, type=snapshot]:\n"
        "0:\tstuck since today (20 minutes)\n"
        "\tpartition reset\n"
        "1:\tstuck since today (2 hours)\n"
        "\tpartition reset\n"
    )
    assert result.output == expected, result.output

    with scrubber_db.transaction() as cur:
        # ORDER BY keeps the comparison below independent of the row order
        # the database happens to return.
        cur.execute(
            "SELECT partition_id, end_date "
            "FROM checked_partition "
            "WHERE config_id=1 AND start_date is NULL "
            "ORDER BY partition_id"
        )
        assert cur.fetchall() == [(0, None), (1, None)]

    # for good measure, check the next few selected partitions, expected 0, 1 and 3
    assert next(scrubber_db.checked_partition_iter_next(1)) == 0
    assert next(scrubber_db.checked_partition_iter_next(1)) == 1
    assert next(scrubber_db.checked_partition_iter_next(1)) == 3
def test_check_journal(
mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
):
......
......@@ -125,6 +125,30 @@ def test_checked_partition_get_next(
assert next(dir_part_gen) == 4
def test_checked_partition_get_next_with_hole(
    datastore: Datastore, scrubber_db: ScrubberDb, config_id: int
):
    """Check that reset partitions are handed out again before new ones.

    After 20 partitions have been pulled from the iterator, any partition
    that gets reset must be the next one yielded; once no reset partition
    is left, iteration resumes at partition 20.
    """
    partitions = scrubber_db.checked_partition_iter_next(config_id)

    # fill the checked_partition table by pulling 20 partitions
    for _ in zip(range(20), partitions):
        pass

    # one hole at a time: each reset partition comes straight back
    for hole in range(10):
        assert scrubber_db.checked_partition_reset(config_id, hole)
        assert next(partitions) == hole

    # a series of holes: reset every even partition below 10 first...
    even_holes = range(0, 10, 2)
    for hole in even_holes:
        assert scrubber_db.checked_partition_reset(config_id, hole)
    # ...then expect them back in increasing order
    for hole in even_holes:
        assert next(partitions) == hole

    # all the holes are filled, next partition is 20
    assert next(partitions) == 20
def test_checked_partition_update(
datastore: Datastore, scrubber_db: ScrubberDb, config_id: int
):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment