Skip to content
Snippets Groups Projects
Commit c0a4d44b authored by David Douard's avatar David Douard
Browse files

Add a CLI command to get statistics for a given config entry

Also add a command to list the partitions currently being checked.

For example:

```
$ swh scrubber check stats snapshot_16 -j
{
  "config": {
    "name": "snapshot_16",
    "datastore": {
      "package": "storage",
      "cls": "postgresql",
      "instance": "postgresql:///?service=swh-storage"
    },
    "object_type": "snapshot",
    "nb_partitions": 65536,
    "check_hashes": true,
    "check_references": true
  },
  "min_duration": 0.002196,
  "max_duration": 0.107398,
  "avg_duration": 0.005969,
  "checked_partition": 65536,
  "running_partition": 0,
  "missing_object": 0,
  "missing_object_reference": 0,
  "corrupt_object": 0
}

$ swh scrubber check running cfg1

Running partitions for cfg1 [id=1, type=snapshot]:
0:	running since today (20 minutes)

```
parent 781f84a1
No related branches found
No related tags found
1 merge request!55Add a cli command to get statistics for a given config entry
Pipeline #4738 passed
......@@ -304,6 +304,127 @@ def scrubber_check_stalled(
)
@scrubber_check_cli_group.command(name="running")
@click.argument(
    "name",
    type=str,
    default=None,
    required=False,  # can be given by config_id instead
)
@click.option(
    "--config-id",
    type=int,
)
@click.pass_context
def scrubber_check_running(ctx, name: str, config_id: int):
    """List partitions being checked for the check session <name>"""
    # NOTE: the docstring above is what click prints as the command's help
    # text, so implementation notes live in comments instead.
    #
    # The check session is selected either by NAME or by --config-id; an
    # explicit --config-id takes precedence over the name lookup.
    import datetime

    from humanize import naturaldate, naturaldelta

    db = ctx.obj["db"]

    # Resolve the session name to an id only when no id was given explicitly.
    if config_id is None and name:
        config_id = db.config_get_by_name(name)
    if config_id is None:
        raise click.ClickException("A valid configuration name/id must be set")

    cfg = db.config_get(config_id)
    session_label = (
        f"{cfg.name} [id={config_id}, type={cfg.object_type.name.lower()}]"
    )

    running_partitions = list(db.checked_partition_get_running(config_id))
    if not running_partitions:
        click.echo(f"No running partition found for {session_label}")
        return

    click.echo(f"Running partitions for {session_label}:")
    # Timezone-aware "now", subtracted from each partition's start date below.
    current_time = datetime.datetime.now(tz=datetime.timezone.utc)
    for partition_id, started_at in running_partitions:
        elapsed = current_time - started_at
        click.echo(
            f"{partition_id}:\trunning since {naturaldate(started_at)} "
            f"({naturaldelta(elapsed)})"
        )
@scrubber_check_cli_group.command(name="stats")
@click.argument(
    "name",
    type=str,
    default=None,
    required=False,  # can be given by config_id instead
)
@click.option(
    "--config-id",
    type=int,
)
@click.option(
    "-j",
    "--json",
    "json_format",
    is_flag=True,
)
@click.pass_context
def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool):
    """Display statistics for the check session <name>"""
    # NOTE: the docstring above is what click prints as the command's help
    # text, so implementation notes live in comments instead.
    #
    # The check session is selected either by NAME or by --config-id; an
    # explicit --config-id takes precedence over the name lookup.  With
    # -j/--json the statistics are dumped as a JSON document, otherwise a
    # human readable report is printed.
    from dataclasses import asdict
    from json import dumps

    from humanize import naturaldelta

    db = ctx.obj["db"]

    if name and config_id is None:
        config_id = db.config_get_by_name(name)
    if config_id is None:
        raise click.ClickException("A valid configuration name/id must be set")

    cfg = db.config_get(config_id)
    nb_partitions = cfg.nb_partitions
    stats = db.config_get_stats(config_id)

    if json_format:
        # The config entry is a dataclass and its object type an enum; turn
        # both into plain JSON-serializable values.
        stats["config"] = asdict(stats["config"])
        stats["config"]["object_type"] = stats["config"]["object_type"].name.lower()
        click.echo(dumps(stats, indent=2))
        return

    checked = stats["checked_partition"]
    # Guard the ratio so a config without partitions cannot crash the report.
    percentage = checked / nb_partitions * 100.0 if nb_partitions else 0.0

    # Use the name stored in the config entry rather than the NAME argument:
    # the latter is None when the session was selected with --config-id.
    report = [
        f"Check session {cfg.name} ({config_id}):",
        f"  object type: {cfg.object_type.name}",
        f"  datastore: {cfg.datastore.instance}",
        f"  check hashes: {cfg.check_hashes}",
        f"  check references: {cfg.check_references}",
        "  partitions:",
        f"    total: {nb_partitions}",
        f"    running: {stats['running_partition']}",
        f"    done: {checked} ({percentage:.2f}%)",
    ]
    if checked:
        # Durations only make sense once at least one partition is finished.
        report += [
            "  duration:",
            f"    min: {naturaldelta(stats['min_duration'])}",
            f"    avg: {naturaldelta(stats['avg_duration'])}",
            f"    max: {naturaldelta(stats['max_duration'])}",
        ]
    if cfg.check_hashes:
        report.append(f"  corrupted objects: {stats['corrupt_object']}")
    if cfg.check_references:
        report.append(f"  missing objects: {stats['missing_object']}")
        report.append(f"  from references: {stats['missing_object_reference']}")

    click.echo("\n".join(report))
@scrubber_check_cli_group.command(name="storage")
@click.argument(
"name",
......
......@@ -7,7 +7,7 @@
import dataclasses
import datetime
import functools
from typing import Iterable, Iterator, List, Optional, Tuple
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple
import psycopg2
......@@ -292,6 +292,95 @@ class ScrubberDb(BaseDb):
),
)
def config_get_stats(
    self,
    config_id: int,
) -> Dict[str, Any]:
    """Return statistics for the check configuration <config_id>.

    Args:
        config_id: identifier of the check configuration entry.

    Returns:
        A dictionary with the following keys, in this order:

        - ``config``: the value returned by ``self.config_get(config_id)``
        - ``min_duration``, ``max_duration``, ``avg_duration``: duration (in
          seconds) of the finished partitions, 0.0 when none is finished
        - ``checked_partition``: number of partitions with an end date
        - ``running_partition``: number of partitions without an end date
        - ``missing_object``: number of rows in ``missing_object``
        - ``missing_object_reference``: number of distinct ``reference_id``
          in ``missing_object_reference``
        - ``corrupt_object``: number of rows in ``corrupt_object``
    """
    stats: Dict[str, Any] = {"config": self.config_get(config_id)}

    def as_seconds(delta) -> float:
        # Aggregates over an empty set yield NULL; report those as 0.0.
        return delta.total_seconds() if delta is not None else 0.0

    # Remaining counters are one "SELECT count(...)" per table; the tuple
    # order is the insertion order of the keys in the result.
    counters = (
        (
            "missing_object",
            "SELECT count(*) FROM missing_object WHERE config_id=%s",
        ),
        (
            "missing_object_reference",
            "SELECT count(distinct reference_id) "
            "FROM missing_object_reference WHERE config_id=%s",
        ),
        (
            "corrupt_object",
            "SELECT count(*) FROM corrupt_object WHERE config_id=%s",
        ),
    )

    with self.transaction() as cur:
        # All partition figures come from a single statement: one scan of
        # checked_partition, and the checked/running counts are computed
        # from the same snapshot.  Rows without an end date give a NULL
        # duration, which min/avg/max ignore.
        cur.execute(
            """
            SELECT
              min(end_date - start_date),
              avg(end_date - start_date),
              max(end_date - start_date),
              count(*) FILTER (WHERE end_date IS NOT NULL),
              count(*) FILTER (WHERE end_date IS NULL)
            FROM checked_partition
            WHERE config_id=%s
            """,
            (config_id,),
        )
        row = cur.fetchone()
        # An aggregate query without GROUP BY always yields exactly one row.
        assert row is not None
        shortest, average, longest, nb_checked, nb_running = row
        stats["min_duration"] = as_seconds(shortest)
        stats["max_duration"] = as_seconds(longest)
        stats["avg_duration"] = as_seconds(average)
        stats["checked_partition"] = nb_checked
        stats["running_partition"] = nb_running

        for key, query in counters:
            cur.execute(query, (config_id,))
            row = cur.fetchone()
            assert row is not None
            stats[key] = row[0]

    return stats
####################################
# Checkpointing/progress tracking
####################################
......
......@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import tempfile
from unittest.mock import MagicMock, call
......@@ -398,6 +399,170 @@ Stuck partitions for cfg1 [id=1, type=snapshot]:
assert result.output == expected, result.output
def test_check_running(mocker, scrubber_db, swh_storage):
    """Check the output of ``swh scrubber check running``.

    The command is run against a freshly initialized config without any
    partition, then with one and with two partitions that have a start date
    but no end date.
    """
    mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)

    # no check session exists yet
    result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
    assert result.exit_code == 0, result.output
    assert result.output == ""

    result = invoke(
        scrubber_db,
        [
            "check",
            "init",
            "storage",
            "--object-type",
            "snapshot",
            "--nb-partitions",
            "4",
            "--name",
            "cfg1",
        ],
        storage=swh_storage,
    )
    assert result.exit_code == 0, result.output

    running_cmd = ["check", "running", "cfg1"]

    result = invoke(scrubber_db, running_cmd, storage=swh_storage)
    assert result.exit_code == 0, result.output
    expected = "No running partition found for cfg1 [id=1, type=snapshot]\n"
    assert result.output == expected, result.output

    # insert a partition started 20mn ago
    with scrubber_db.transaction() as cur:
        cur.execute(
            "INSERT INTO checked_partition VALUES (1, 0, now() - '20m'::interval, NULL);"
        )
    result = invoke(scrubber_db, running_cmd, storage=swh_storage)
    assert result.exit_code == 0, result.output
    # The command separates the partition id from the message with a tab;
    # spell it as an explicit escape so it cannot be lost to reformatting.
    # NOTE(review): "today" presumably depends on the current date, so these
    # expectations may not hold when the test runs shortly after midnight.
    expected = (
        "Running partitions for cfg1 [id=1, type=snapshot]:\n"
        "0:\trunning since today (20 minutes)\n"
    )
    assert result.output == expected, result.output

    # insert another partition, started 2h ago
    with scrubber_db.transaction() as cur:
        cur.execute(
            "INSERT INTO checked_partition VALUES (1, 1, now() - '2h'::interval, NULL);"
        )
    result = invoke(scrubber_db, running_cmd, storage=swh_storage)
    assert result.exit_code == 0, result.output
    expected = (
        "Running partitions for cfg1 [id=1, type=snapshot]:\n"
        "0:\trunning since today (20 minutes)\n"
        "1:\trunning since today (2 hours)\n"
    )
    assert result.output == expected, result.output
def test_check_stats(mocker, scrubber_db, swh_storage):
    """Check the output of ``swh scrubber check stats``.

    Three configs are initialized and their (empty) statistics are compared
    with the expected JSON document; checked and running partitions are then
    inserted for two of them to verify the reported figures.
    """
    from swh.scrubber.storage_checker import get_datastore

    mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)

    # no check session exists yet
    result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
    assert result.exit_code == 0, result.output
    assert result.output == ""

    object_types = ("snapshot", "revision", "release")
    for otype in object_types:
        result = invoke(
            scrubber_db,
            [
                "check",
                "init",
                "storage",
                "--object-type",
                otype,
                "--nb-partitions",
                "4",
                "--name",
                f"cfg_{otype}",
            ],
            storage=swh_storage,
        )
        assert result.exit_code == 0, result.output

    # the human readable output must at least work on an empty session
    result = invoke(
        scrubber_db, ["check", "stats", "cfg_snapshot"], storage=swh_storage
    )
    assert result.exit_code == 0, result.output

    for otype in object_types:
        result = invoke(
            scrubber_db,
            ["check", "stats", "--json", f"cfg_{otype}"],
            storage=swh_storage,
        )
        # fail with the command output rather than with a JSON decoding error
        assert result.exit_code == 0, result.output
        stats = json.loads(result.output)
        assert stats == {
            "config": {
                "name": f"cfg_{otype}",
                "datastore": {
                    "package": "storage",
                    "cls": "postgresql",
                    "instance": get_datastore(swh_storage).instance,
                },
                "object_type": otype,
                "nb_partitions": 4,
                "check_hashes": True,
                "check_references": True,
            },
            "min_duration": 0,
            "max_duration": 0,
            "avg_duration": 0,
            "checked_partition": 0,
            "running_partition": 0,
            "missing_object": 0,
            "missing_object_reference": 0,
            "corrupt_object": 0,
        }

    partition_rows = [
        # a pair of checked partitions
        "(1, 0, now() - '40m'::interval, now() - '20m'::interval)",
        "(1, 1, now() - '20m'::interval, now() - '10m'::interval)",
        # and a pair of running ones
        "(1, 2, now() - '10m'::interval, NULL)",
        "(1, 3, now() - '20m'::interval, NULL)",
        # also add checked partitions for another config entry
        "(2, 0, now() - '40m'::interval, now() - '20m'::interval)",
        "(2, 1, now() - '40m'::interval, now() - '20m'::interval)",
        # and add a running checker for another config entry
        "(2, 3, now() - '20m'::interval, NULL)",
    ]
    with scrubber_db.transaction() as cur:
        for values in partition_rows:
            cur.execute(f"INSERT INTO checked_partition VALUES {values}")

    result = invoke(
        scrubber_db, ["check", "stats", "-j", "cfg_snapshot"], storage=swh_storage
    )
    assert result.exit_code == 0, result.output
    stats = json.loads(result.output)
    assert stats["config"]["name"] == "cfg_snapshot"
    assert stats["min_duration"] == 600
    assert stats["max_duration"] == 1200
    assert stats["avg_duration"] == 900
    assert stats["checked_partition"] == 2
    assert stats["running_partition"] == 2

    # The rows inserted with config id 2 must only show up in that config's
    # statistics.  This assumes ids are handed out in creation order, which
    # makes cfg_revision the config with id 2.
    result = invoke(
        scrubber_db, ["check", "stats", "-j", "cfg_revision"], storage=swh_storage
    )
    assert result.exit_code == 0, result.output
    stats = json.loads(result.output)
    assert stats["config"]["name"] == "cfg_revision"
    assert stats["checked_partition"] == 2
    assert stats["running_partition"] == 1
def test_check_reset(mocker, scrubber_db, swh_storage):
mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)
result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment