From c0a4d44b7d3d98689e41e26923ba0a974b45ad6a Mon Sep 17 00:00:00 2001
From: David Douard <david.douard@sdfa3.org>
Date: Tue, 17 Oct 2023 13:42:04 +0200
Subject: [PATCH] Add a cli command to get statistics for a given config entry

Also add a command to list the partitions currently being checked.

For example:

```
$ swh scrubber check stats snapshot_16 -j
{
  "config": {
    "name": "snapshot_16",
    "datastore": {
      "package": "storage",
      "cls": "postgresql",
      "instance": "postgresql:///?service=swh-storage"
    },
    "object_type": "snapshot",
    "nb_partitions": 65536,
    "check_hashes": true,
    "check_references": true
  },
  "min_duration": 0.002196,
  "max_duration": 0.107398,
  "avg_duration": 0.005969,
  "checked_partition": 65536,
  "running_partition": 0,
  "missing_object": 0,
  "missing_object_reference": 0,
  "corrupt_object": 0
}

$ swh scrubber check running cfg1

Running partitions for cfg1 [id=1, type=snapshot]:
0:	running since today (20 minutes)

```
---
 swh/scrubber/cli.py            | 121 ++++++++++++++++++++++++
 swh/scrubber/db.py             |  91 +++++++++++++++++-
 swh/scrubber/tests/test_cli.py | 165 +++++++++++++++++++++++++++++++++
 3 files changed, 376 insertions(+), 1 deletion(-)

diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index e3c00e2..e279228 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -304,6 +304,127 @@ def scrubber_check_stalled(
         )
 
 
+@scrubber_check_cli_group.command(name="running")
+@click.argument(
+    "name",
+    type=str,
+    default=None,
+    required=False,  # can be given by config_id instead
+)
+@click.option(
+    "--config-id",
+    type=int,
+)
+@click.pass_context
+def scrubber_check_running(ctx, name: str, config_id: int):
+    """List partitions being checked for the check session <name>"""
+    import datetime
+
+    from humanize import naturaldate, naturaldelta
+
+    db = ctx.obj["db"]
+    if name and config_id is None:  # an explicit --config-id wins over <name>
+        config_id = db.config_get_by_name(name)
+
+    if config_id is None:  # neither given nor resolved from <name>
+        raise click.ClickException("A valid configuration name/id must be set")
+
+    cfg = db.config_get(config_id)
+    in_flight = list(db.checked_partition_get_running(config_id))
+    if in_flight:
+        click.echo(
+            f"Running partitions for {cfg.name} [id={config_id}, "
+            f"type={cfg.object_type.name.lower()}]:"
+        )
+        now = datetime.datetime.now(tz=datetime.timezone.utc)  # taken once for all rows
+        for partition, running_since in in_flight:
+            click.echo(
+                f"{partition}:\trunning since {naturaldate(running_since)} "
+                f"({naturaldelta(now-running_since)})"
+            )
+    else:
+        click.echo(
+            f"No running partition found for {cfg.name} [id={config_id}, "
+            f"type={cfg.object_type.name.lower()}]"
+        )
+
+
+@scrubber_check_cli_group.command(name="stats")
+@click.argument(
+    "name",
+    type=str,
+    default=None,
+    required=False,  # can be given by config_id instead
+)
+@click.option(
+    "--config-id",
+    type=int,
+)
+@click.option(
+    "-j",
+    "--json",
+    "json_format",
+    is_flag=True,
+)
+@click.pass_context
+def scrubber_check_stats(ctx, name: str, config_id: int, json_format: bool):
+    """Display statistics for the check session <name>"""
+    from dataclasses import asdict
+    from json import dumps
+    import textwrap
+
+    from humanize import naturaldelta
+
+    db = ctx.obj["db"]
+    if name and config_id is None:  # an explicit --config-id wins over <name>
+        config_id = db.config_get_by_name(name)
+
+    if config_id is None:  # neither given nor resolved from <name>
+        raise click.ClickException("A valid configuration name/id must be set")
+
+    cfg = db.config_get(config_id)
+    nb_partitions = cfg.nb_partitions
+    stats = db.config_get_stats(config_id)
+
+    if json_format:
+        # make the config entry JSON-friendly: dataclass -> dict, object_type -> str
+        stats["config"] = asdict(stats["config"])
+        stats["config"]["object_type"] = stats["config"]["object_type"].name.lower()
+        click.echo(dumps(stats, indent=2))
+    else:
+        percentage = stats["checked_partition"] / nb_partitions * 100.0
+        click.echo(
+            textwrap.dedent(
+                f"""\
+                Check session {cfg.name} ({config_id}):
+                  object type: {cfg.object_type.name}
+                  datastore: {cfg.datastore.instance}
+                  check hashes: {cfg.check_hashes}
+                  check references: {cfg.check_references}
+                  partitions:
+                    total: {nb_partitions}
+                    running: {stats['running_partition']}
+                    done: {stats['checked_partition']} ({percentage:.2f}%)"""
+            )
+        )
+        if stats["checked_partition"]:  # no durations until a partition is done
+            click.echo(
+                "  "
+                + textwrap.dedent(
+                    f"""\
+                    duration:
+                        min: {naturaldelta(stats['min_duration'])}
+                        avg: {naturaldelta(stats['avg_duration'])}
+                        max: {naturaldelta(stats['max_duration'])}"""
+                )
+            )
+        if cfg.check_hashes:  # only report what this session actually checks
+            click.echo(f"  corrupted objects: {stats['corrupt_object']}")
+        if cfg.check_references:
+            click.echo(f"  missing objects: {stats['missing_object']}")
+            click.echo(f"  from references: {stats['missing_object_reference']}")
+
+
 @scrubber_check_cli_group.command(name="storage")
 @click.argument(
     "name",
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
index 3c472b5..1f55b89 100644
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -7,7 +7,7 @@
 import dataclasses
 import datetime
 import functools
-from typing import Iterable, Iterator, List, Optional, Tuple
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple
 
 import psycopg2
 
@@ -292,6 +292,95 @@ class ScrubberDb(BaseDb):
                     ),
                 )
 
+    def config_get_stats(
+        self,
+        config_id: int,
+    ) -> Dict[str, Any]:
+        """Return statistics for the check configuration <config_id>."""
+        config = self.config_get(config_id)
+        stats: Dict[str, Any] = {"config": config}
+
+        with self.transaction() as cur:
+            cur.execute(
+                """
+                SELECT
+                  min(end_date - start_date),
+                  avg(end_date - start_date),
+                  max(end_date - start_date)
+                FROM checked_partition
+                WHERE config_id=%s AND end_date is not NULL
+                """,
+                (config_id,),
+            )
+            row = cur.fetchone()
+            assert row  # an aggregate without GROUP BY always yields one row
+            minv, avgv, maxv = row  # all None when no partition is finished
+            stats["min_duration"] = minv.total_seconds() if minv is not None else 0.0
+            stats["max_duration"] = maxv.total_seconds() if maxv is not None else 0.0
+            stats["avg_duration"] = avgv.total_seconds() if avgv is not None else 0.0
+
+            cur.execute(
+                """
+                SELECT count(*)
+                FROM checked_partition
+                WHERE config_id=%s AND end_date is not NULL
+                """,
+                (config_id,),
+            )
+            row = cur.fetchone()
+            assert row
+            stats["checked_partition"] = row[0]  # rows with an end_date
+
+            cur.execute(
+                """
+                SELECT count(*)
+                FROM checked_partition
+                WHERE config_id=%s AND end_date is NULL
+                """,
+                (config_id,),
+            )
+            row = cur.fetchone()
+            assert row
+            stats["running_partition"] = row[0]  # rows without an end_date
+
+            cur.execute(
+                """
+                SELECT count(*)
+                FROM missing_object
+                WHERE config_id=%s
+                """,
+                (config_id,),
+            )
+            row = cur.fetchone()
+            assert row
+            stats["missing_object"] = row[0]
+
+            cur.execute(
+                """
+                SELECT count(distinct reference_id)
+                FROM missing_object_reference
+                WHERE config_id=%s
+                """,
+                (config_id,),
+            )
+            row = cur.fetchone()
+            assert row
+            stats["missing_object_reference"] = row[0]  # distinct reference_id values
+
+            cur.execute(
+                """
+                SELECT count(*)
+                FROM corrupt_object
+                WHERE config_id=%s
+                """,
+                (config_id,),
+            )
+            row = cur.fetchone()
+            assert row
+            stats["corrupt_object"] = row[0]
+
+        return stats
+
     ####################################
     # Checkpointing/progress tracking
     ####################################
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index da4a1d5..53f9796 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import json
 import tempfile
 from unittest.mock import MagicMock, call
 
@@ -398,6 +399,170 @@ Stuck partitions for cfg1 [id=1, type=snapshot]:
     assert result.output == expected, result.output
 
 
+def test_check_running(mocker, scrubber_db, swh_storage):
+    mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)
+    result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    result = invoke(
+        scrubber_db,
+        [
+            "check",
+            "init",
+            "storage",
+            "--object-type",
+            "snapshot",
+            "--nb-partitions",
+            "4",
+            "--name",
+            "cfg1",
+        ],
+        storage=swh_storage,
+    )
+    assert result.exit_code == 0, result.output
+
+    result = invoke(scrubber_db, ["check", "running", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    expected = "No running partition found for cfg1 [id=1, type=snapshot]\n"
+    assert result.output == expected, result.output
+
+    # insert a partition started 20 minutes ago and still running (NULL end date)
+    with scrubber_db.transaction() as cur:
+        cur.execute(
+            "INSERT INTO checked_partition VALUES (1, 0, now() - '20m'::interval, NULL);"
+        )
+    result = invoke(scrubber_db, ["check", "running", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    expected = """\
+Running partitions for cfg1 [id=1, type=snapshot]:
+0:	running since today (20 minutes)
+"""
+    assert result.output == expected, result.output
+
+    # insert another running partition, started 2 hours ago
+    with scrubber_db.transaction() as cur:
+        cur.execute(
+            "INSERT INTO checked_partition VALUES (1, 1, now() - '2h'::interval, NULL);"
+        )
+    result = invoke(scrubber_db, ["check", "running", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    expected = """\
+Running partitions for cfg1 [id=1, type=snapshot]:
+0:	running since today (20 minutes)
+1:	running since today (2 hours)
+"""
+    assert result.output == expected, result.output
+
+
+def test_check_stats(mocker, scrubber_db, swh_storage):
+    from swh.scrubber.storage_checker import get_datastore
+
+    mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)
+    result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    for otype in ("snapshot", "revision", "release"):
+        result = invoke(
+            scrubber_db,
+            [
+                "check",
+                "init",
+                "storage",
+                "--object-type",
+                otype,
+                "--nb-partitions",
+                "4",
+                "--name",
+                f"cfg_{otype}",
+            ],
+            storage=swh_storage,
+        )
+        assert result.exit_code == 0, result.output
+
+    result = invoke(
+        scrubber_db, ["check", "stats", "cfg_snapshot"], storage=swh_storage
+    )
+    assert result.exit_code == 0, result.output
+
+    for otype in ("snapshot", "revision", "release"):
+        result = invoke(
+            scrubber_db,
+            ["check", "stats", "--json", f"cfg_{otype}"],
+            storage=swh_storage,
+        )
+        assert result.exit_code == 0, result.output  # fail here, not in json.loads
+        stats = json.loads(result.output)
+        assert stats == {
+            "config": {
+                "name": f"cfg_{otype}",
+                "datastore": {
+                    "package": "storage",
+                    "cls": "postgresql",
+                    "instance": get_datastore(swh_storage).instance,
+                },
+                "object_type": otype,
+                "nb_partitions": 4,
+                "check_hashes": True,
+                "check_references": True,
+            },
+            "min_duration": 0,
+            "max_duration": 0,
+            "avg_duration": 0,
+            "checked_partition": 0,
+            "running_partition": 0,
+            "missing_object": 0,
+            "missing_object_reference": 0,
+            "corrupt_object": 0,
+        }
+
+    with scrubber_db.transaction() as cur:
+        # insert a pair of checked partitions
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (1, 0, now() - '40m'::interval, now() - '20m'::interval)"
+        )
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (1, 1, now() - '20m'::interval, now() - '10m'::interval)"
+        )
+        # and a pair of running ones
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (1, 2, now() - '10m'::interval, NULL)"
+        )
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (1, 3, now() - '20m'::interval, NULL)"
+        )
+        # also add checked partitions for another config entry
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (2, 0, now() - '40m'::interval, now() - '20m'::interval)"
+        )
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (2, 1, now() - '40m'::interval, now() - '20m'::interval)"
+        )
+        # and add a running checker for another config entry
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (2, 3, now() - '20m'::interval, NULL)"
+        )
+    result = invoke(
+        scrubber_db, ["check", "stats", "-j", "cfg_snapshot"], storage=swh_storage
+    )
+    assert result.exit_code == 0, result.output
+    stats = json.loads(result.output)
+    assert stats["config"]["name"] == "cfg_snapshot"
+    assert stats["min_duration"] == 600  # partition 1: 10 minutes
+    assert stats["max_duration"] == 1200  # partition 0: 20 minutes
+    assert stats["avg_duration"] == 900
+    assert stats["checked_partition"] == 2
+    assert stats["running_partition"] == 2
+
+
 def test_check_reset(mocker, scrubber_db, swh_storage):
     mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)
     result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
-- 
GitLab