From 67a743d067323a3f41b4402d2f82ef4a0c37eff7 Mon Sep 17 00:00:00 2001
From: David Douard <david.douard@sdfa3.org>
Date: Wed, 5 Jul 2023 15:07:12 +0200
Subject: [PATCH] Add a `swh scrubber check stalled` command listing stalled
 partitions

For a given configuration (i.e. storage, object_type and partition scheme),
list the partitions that have had a start_date but no end_date for a long
enough time.
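
For example (a sketch; the configuration name, partition id and durations
are illustrative), checking a configuration named "cfg1":

    $ swh scrubber check stalled cfg1
    Stuck partitions for cfg1 [id=1, type=snapshot]:
      1: today (2 hours)

The delay can also be passed explicitly, in seconds, with the --for option,
e.g. `swh scrubber check stalled --for 8000 cfg1`.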

By default, the delay after which a partition is considered stalled is
computed from the last 10 partitions checked for the given
configuration.
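
For instance (illustrative numbers): if the slowest of the last 10 completed
checks took 15 minutes, a running partition is only reported as stalled once
it has been running for more than 30 minutes; if no completed check exists
yet for the configuration, the threshold falls back to 1 hour.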
---
 requirements.txt               |  1 +
 swh/scrubber/cli.py            | 59 +++++++++++++++++++++++++
 swh/scrubber/db.py             | 67 ++++++++++++++++++++++++++++
 swh/scrubber/tests/test_cli.py | 79 ++++++++++++++++++++++++++++++++++
 swh/scrubber/tests/test_db.py  | 35 +++++++++++++++
 5 files changed, 241 insertions(+)

diff --git a/requirements.txt b/requirements.txt
index a5f4288..a1d7dcd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,5 +3,6 @@
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 
 dulwich
+humanize
 psycopg2
 tenacity
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
index 10c711e..5a826f1 100644
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -188,6 +188,65 @@ def scrubber_check_list(
             )
 
 
+@scrubber_check_cli_group.command(name="stalled")
+@click.argument(
+    "name",
+    type=str,
+    default=None,
+    required=False,  # can be given by config_id instead
+)
+@click.option(
+    "--config-id",
+    type=int,
+)
+@click.option(
+    "--for",
+    "delay",
+    type=str,
+    default="auto",
+    help="Delay for a partition to be considered as stuck; in seconds or 'auto'",
+)
+@click.pass_context
+def scrubber_check_stalled(
+    ctx,
+    name: str,
+    config_id: int,
+    delay: Optional[str],
+):
+    """List the stuck partitions for a given config"""
+    import datetime
+
+    from humanize import naturaldate, naturaldelta
+
+    db = ctx.obj["db"]
+    if name and config_id is None:
+        config_id = db.config_get_by_name(name)
+
+    if config_id is None:
+        raise click.ClickException("A valid configuration name/id must be set")
+
+    cfg = db.config_get(config_id)
+    delay_td: Optional[datetime.timedelta]
+    if delay == "auto":
+        delay_td = None
+    elif delay:
+        delay_td = datetime.timedelta(seconds=int(delay))
+    in_flight = list(db.checked_partition_get_stuck(config_id, delay_td))
+    if in_flight:
+        click.echo(
+            f"Stuck partitions for {cfg.name} [id={config_id}, type={cfg.object_type}]:"
+        )
+        now = datetime.datetime.now(tz=datetime.timezone.utc)
+        for partition, stuck_since in in_flight:
+            click.echo(
+                f"  {partition}: {naturaldate(stuck_since)} ({naturaldelta(now-stuck_since)})"
+            )
+    else:
+        click.echo(
+            f"No stuck partition found for {cfg.name} [id={config_id}, type={cfg.object_type}]"
+        )
+
+
 @scrubber_check_cli_group.command(name="storage")
 @click.argument(
     "name",
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
index b20fc47..67b29d4 100644
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -355,6 +355,73 @@ class ScrubberDb(BaseDb):
                 (date,) = res
                 return date
 
+    def checked_partition_get_running(
+        self,
+        config_id: int,
+    ) -> Iterator[Tuple[int, datetime.datetime]]:
+        """Yields the partitions which are currently being checked; i.e. which have a
+        start_date but no end_date.
+        """
+        with self.transaction() as cur:
+            cur.execute(
+                """
+                SELECT partition_id, start_date
+                FROM checked_partition
+                WHERE config_id=%s AND end_date is NULL
+                """,
+                (config_id,),
+            )
+
+            for partition_id, start_date in cur:
+                yield (partition_id, start_date)
+
+    def checked_partition_get_stuck(
+        self,
+        config_id: int,
+        since: Optional[datetime.timedelta] = None,
+    ) -> Iterator[Tuple[int, datetime.datetime]]:
+        """Yields the partitions which are currently running for more than `since`; if
+        not set, automatically guess a reasonable delay from completed partitions.
+        If no such a delay can be extracted, fall back to 1 hour.
+
+        The heuristic for the automatic delay is 2x max(end_date-start_date)
+        for the last 10 partitions checked.
+
+        """
+        with self.transaction() as cur:
+            if since is None:
+                cur.execute(
+                    """
+                    WITH delays as
+                    (
+                    SELECT end_date - start_date as delay
+                    FROM checked_partition
+                    WHERE config_id=%s AND end_date is not NULL
+                    ORDER BY start_date DESC
+                    LIMIT 10
+                    )
+                    SELECT 2*max(delay) from delays
+                    """,
+                    (config_id,),
+                )
+                res = cur.fetchone()
+                assert res is not None
+                (since,) = res
+            if since is None:
+                since = datetime.timedelta(hours=1)
+
+            cur.execute(
+                """
+                SELECT partition_id, start_date
+                FROM checked_partition
+                WHERE config_id=%s AND end_date is NULL AND start_date < %s
+                """,
+                (config_id, now() - since),
+            )
+
+            for partition_id, start_date in cur:
+                yield (partition_id, start_date)
+
     def checked_partition_iter(
         self, config_id: int
     ) -> Iterator[Tuple[int, int, datetime.datetime, Optional[datetime.datetime]]]:
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
index 917b563..dffaaf5 100644
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -175,6 +175,85 @@ def test_check_list(mocker, scrubber_db, swh_storage):
     assert result.output == expected, result.output
 
 
+def test_check_stalled(mocker, scrubber_db, swh_storage):
+    mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db)
+    result = invoke(scrubber_db, ["check", "list"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    result = invoke(
+        scrubber_db,
+        [
+            "check",
+            "init",
+            "--object-type",
+            "snapshot",
+            "--nb-partitions",
+            "4",
+            "--name",
+            "cfg1",
+        ],
+        storage=swh_storage,
+    )
+    assert result.exit_code == 0, result.output
+
+    result = invoke(scrubber_db, ["check", "stalled", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    expected = "No stuck partition found for cfg1 [id=1, type=snapshot]\n"
+    assert result.output == expected, result.output
+
+    # insert a partition check started 20 minutes ago
+    with scrubber_db.transaction() as cur:
+        cur.execute(
+            "INSERT INTO checked_partition VALUES (1, 0, now() - '20m'::interval, NULL);"
+        )
+
+    # there is no completed partition yet, so the stalled threshold defaults to 1h
+    # and a partition started 20 minutes ago is not reported as stalled
+    result = invoke(scrubber_db, ["check", "stalled", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    expected = "No stuck partition found for cfg1 [id=1, type=snapshot]\n"
+    assert result.output == expected, result.output
+
+    # insert a partition check started 2 hours ago
+    with scrubber_db.transaction() as cur:
+        cur.execute(
+            "INSERT INTO checked_partition VALUES (1, 1, now() - '2h'::interval, NULL);"
+        )
+    # it is considered stalled with the default 1h threshold
+    result = invoke(scrubber_db, ["check", "stalled", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    expected = """\
+Stuck partitions for cfg1 [id=1, type=snapshot]:
+  1: today (2 hours)
+"""
+    assert result.output == expected, result.output
+
+    # explicitly specify a stalled delay > 2h: no partition is reported
+    result = invoke(
+        scrubber_db, ["check", "stalled", "--for", "8000", "cfg1"], storage=swh_storage
+    )
+    assert result.exit_code == 0, result.output
+    expected = "No stuck partition found for cfg1 [id=1, type=snapshot]\n"
+    assert result.output == expected, result.output
+
+    # insert a completed partition check that took 5 minutes to run
+    with scrubber_db.transaction() as cur:
+        cur.execute(
+            "INSERT INTO checked_partition "
+            "VALUES (1, 2, now() - '2h'::interval, now() - '1h55m'::interval);"
+        )
+    # with the auto delay (2 x 5 minutes), partitions 0 and 1 are now both stalled
+    result = invoke(scrubber_db, ["check", "stalled", "cfg1"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    expected = """\
+Stuck partitions for cfg1 [id=1, type=snapshot]:
+  0: today (20 minutes)
+  1: today (2 hours)
+"""
+    assert result.output == expected, result.output
+
+
 def test_check_journal(
     mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group
 ):
diff --git a/swh/scrubber/tests/test_db.py b/swh/scrubber/tests/test_db.py
index e6cc573..c74e9a5 100644
--- a/swh/scrubber/tests/test_db.py
+++ b/swh/scrubber/tests/test_db.py
@@ -159,3 +159,38 @@ def test_checked_partition_get(
     scrubber_db.checked_partition_upsert(config_id, part_id, DATE)
 
     assert scrubber_db.checked_partition_get_last_date(config_id, part_id) == DATE
+
+
+def test_checked_partition_get_running(
+    datastore: Datastore, scrubber_db: ScrubberDb, config_id: int
+):
+    assert list(scrubber_db.checked_partition_get_running(config_id)) == []
+    with patch("swh.scrubber.db.now", return_value=DATE):
+        dir_part_gen = scrubber_db.checked_partition_iter_next(config_id)
+        part_id1 = next(dir_part_gen)
+        part_id2 = next(dir_part_gen)
+        part_id3 = next(dir_part_gen)
+
+    assert scrubber_db.checked_partition_get_last_date(config_id, part_id1) is None
+    assert scrubber_db.checked_partition_get_last_date(config_id, part_id2) is None
+    assert scrubber_db.checked_partition_get_last_date(config_id, part_id3) is None
+
+    assert list(scrubber_db.checked_partition_get_running(config_id)) == [
+        (part_id1, DATE),
+        (part_id2, DATE),
+        (part_id3, DATE),
+    ]
+
+    scrubber_db.checked_partition_upsert(config_id, part_id2, DATE + ONE_MINUTE)
+    assert list(scrubber_db.checked_partition_get_running(config_id)) == [
+        (part_id1, DATE),
+        (part_id3, DATE),
+    ]
+
+    scrubber_db.checked_partition_upsert(config_id, part_id1, DATE + ONE_MINUTE)
+    assert list(scrubber_db.checked_partition_get_running(config_id)) == [
+        (part_id3, DATE),
+    ]
+
+    scrubber_db.checked_partition_upsert(config_id, part_id3, DATE + ONE_MINUTE)
+    assert list(scrubber_db.checked_partition_get_running(config_id)) == []
-- 
GitLab