From 4c4e63841116caa70fe1db1c6c6376542a9e6705 Mon Sep 17 00:00:00 2001
From: Nicolas Dandrimont <nicolas@dandrimont.eu>
Date: Mon, 24 Feb 2025 18:48:13 +0100
Subject: [PATCH 1/3] Add swh scheduler origin peek-next command

This command checks which tasks would be picked up next by the given
policy and arguments. It is implemented with a dry_run flag on
`grab_next_visits`.
---
 swh/scheduler/backend.py    |  68 +++++++++++---------
 swh/scheduler/cli/origin.py | 125 ++++++++++++++++++++++++++++++++++++
 swh/scheduler/in_memory.py  |   5 +-
 swh/scheduler/interface.py  |   2 +
 4 files changed, 167 insertions(+), 33 deletions(-)

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index cc7080b..c011dcf 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -437,6 +437,7 @@ class SchedulerBackend:
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
         not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
         tablesample: Optional[float] = None,
+        dry_run: bool = False,
         db=None,
         cur=None,
     ) -> List[ListedOrigin]:
@@ -540,23 +541,25 @@ class SchedulerBackend:
                 ]
             )
 
-            # fmt: off
+            if not dry_run:
+                # fmt: off
+
+                # This policy requires updating the global queue position for this
+                # visit type
+                common_table_expressions.append(("update_queue_position", """
+                    INSERT INTO
+                      visit_scheduler_queue_position(visit_type, position)
+                    SELECT
+                      visit_type, COALESCE(MAX(next_visit_queue_position), 0)
+                    FROM selected_origins
+                    GROUP BY visit_type
+                    ON CONFLICT(visit_type) DO UPDATE
+                      SET position=GREATEST(
+                        visit_scheduler_queue_position.position, EXCLUDED.position
+                      )
+                """))
+                # fmt: on
 
-            # This policy requires updating the global queue position for this
-            # visit type
-            common_table_expressions.append(("update_queue_position", """
-                INSERT INTO
-                  visit_scheduler_queue_position(visit_type, position)
-                SELECT
-                  visit_type, COALESCE(MAX(next_visit_queue_position), 0)
-                FROM selected_origins
-                GROUP BY visit_type
-                ON CONFLICT(visit_type) DO UPDATE
-                  SET position=GREATEST(
-                    visit_scheduler_queue_position.position, EXCLUDED.position
-                  )
-            """))
-            # fmt: on
         elif policy == "first_visits_after_listing":
             assert lister_uuid is not None or (
                 lister_name is not None and lister_instance_name is not None
@@ -628,23 +631,24 @@ class SchedulerBackend:
         """))
         # fmt: on
 
-        # fmt: off
-        common_table_expressions.append(("update_stats", """
-            INSERT INTO
-              origin_visit_stats (url, visit_type, last_scheduled)
-            SELECT
-              url, visit_type, %s
-            FROM
-              deduplicated_selected_origins
-            ON CONFLICT (url, visit_type) DO UPDATE
-              SET last_scheduled = GREATEST(
-                origin_visit_stats.last_scheduled,
-                EXCLUDED.last_scheduled
-              )
-        """))
-        # fmt: on
+        if not dry_run:
+            # fmt: off
+            common_table_expressions.append(("update_stats", """
+                INSERT INTO
+                  origin_visit_stats (url, visit_type, last_scheduled)
+                SELECT
+                  url, visit_type, %s
+                FROM
+                  deduplicated_selected_origins
+                ON CONFLICT (url, visit_type) DO UPDATE
+                  SET last_scheduled = GREATEST(
+                    origin_visit_stats.last_scheduled,
+                    EXCLUDED.last_scheduled
+                  )
+            """))
+            # fmt: on
 
-        query_args.append(timestamp)
+            query_args.append(timestamp)
 
         formatted_ctes = ",\n".join(
             f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
index b7cb60f..40ea46d 100644
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -269,6 +269,131 @@ def send_from_scheduler_to_celery_cli(
     )
 
 
+@origin.command("peek-next")
+@click.option(
+    "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
+)
+@click.option(
+    "--tablesample",
+    help="Table sampling percentage",
+    type=float,
+)
+@click.option(
+    "--only-enabled/--only-disabled",
+    "enabled",
+    is_flag=True,
+    default=True,
+    help="""Determine whether we want to scheduled enabled or disabled origins. As default, we
+            want to reasonably deal with enabled origins. For some edge case though, we
+            might want the disabled ones.""",
+)
+@click.option(
+    "--lister-name",
+    default=None,
+    help="Limit origins to those listed from lister with provided name",
+)
+@click.option(
+    "--lister-instance-name",
+    default=None,
+    help="Limit origins to those listed from lister with instance name",
+)
+@click.option(
+    "--absolute-cooldown",
+    "absolute_cooldown_str",
+    default="12 hours",
+    help="Minimal interval between two visits of the same origin",
+)
+@click.option(
+    "--scheduled-cooldown",
+    "scheduled_cooldown_str",
+    default="7 days",
+    help="Minimal interval to wait before scheduling the same origins again",
+)
+@click.option(
+    "--not-found-cooldown",
+    "not_found_cooldown_str",
+    default="14 days",
+    help="The minimal interval to wait before rescheduling not_found origins",
+)
+@click.option(
+    "--failed-cooldown",
+    "failed_cooldown_str",
+    default="31 days",
+    help="Minimal interval to wait before rescheduling failed origins",
+)
+@click.option(
+    "--fields", "-f", default=None, help="Listed origin fields to print on output"
+)
+@click.option(
+    "--with-header/--without-header",
+    is_flag=True,
+    default=True,
+    help="Print the CSV header?",
+)
+@click.argument("type", type=str)
+@click.argument("count", type=int)
+@click.pass_context
+def peek_next(
+    ctx,
+    policy: str,
+    tablesample: Optional[float],
+    type: str,
+    count: int,
+    enabled: bool,
+    fields: Optional[str],
+    with_header: bool,
+    lister_name: Optional[str] = None,
+    lister_instance_name: Optional[str] = None,
+    absolute_cooldown_str: Optional[str] = None,
+    scheduled_cooldown_str: Optional[str] = None,
+    failed_cooldown_str: Optional[str] = None,
+    not_found_cooldown_str: Optional[str] = None,
+):
+    """Get information about the next ``count`` origins that will be scheduled
+    for ``type``."""
+    from .utils import parse_time_interval
+
+    if fields:
+        parsed_fields: Optional[List[str]] = fields.split(",")
+    else:
+        parsed_fields = None
+
+    absolute_cooldown = (
+        parse_time_interval(absolute_cooldown_str) if absolute_cooldown_str else None
+    )
+    scheduled_cooldown = (
+        parse_time_interval(scheduled_cooldown_str) if scheduled_cooldown_str else None
+    )
+    failed_cooldown = (
+        parse_time_interval(failed_cooldown_str) if failed_cooldown_str else None
+    )
+    not_found_cooldown = (
+        parse_time_interval(not_found_cooldown_str) if not_found_cooldown_str else None
+    )
+
+    scheduler = ctx.obj["scheduler"]
+
+    origins = scheduler.grab_next_visits(
+        visit_type=type,
+        count=count,
+        policy=policy,
+        tablesample=tablesample,
+        enabled=enabled,
+        lister_name=lister_name,
+        lister_instance_name=lister_instance_name,
+        absolute_cooldown=absolute_cooldown,
+        scheduled_cooldown=scheduled_cooldown,
+        failed_cooldown=failed_cooldown,
+        not_found_cooldown=not_found_cooldown,
+        dry_run=True,
+    )
+
+    for line in format_origins(
+        ctx, origins, fields=parsed_fields, with_header=with_header
+    ):
+        click.echo(line)
+
+
 @origin.command("update-metrics")
 @click.option("--lister", default=None, help="Only update metrics for this lister")
 @click.option(
diff --git a/swh/scheduler/in_memory.py b/swh/scheduler/in_memory.py
index b576e8f..c1aee4f 100644
--- a/swh/scheduler/in_memory.py
+++ b/swh/scheduler/in_memory.py
@@ -212,6 +212,7 @@ class InMemoryScheduler:
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
         not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
         tablesample: Optional[float] = None,
+        dry_run: bool = False,
     ) -> List[ListedOrigin]:
         if timestamp is None:
             timestamp = utcnow()
@@ -392,7 +393,9 @@ class InMemoryScheduler:
             )
             for (o, s) in origins_stats
         ]
-        self.origin_visit_stats_upsert(ovs)
+
+        if not dry_run:
+            self.origin_visit_stats_upsert(ovs)
 
         return [o for (o, _) in origins_stats]
 
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
index 15e35d4..64f9cf5 100644
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -483,6 +483,7 @@ class SchedulerInterface(Protocol):
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
         not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
         tablesample: Optional[float] = None,
+        dry_run: bool = False,
     ) -> List[ListedOrigin]:
         """Get at most the `count` next origins that need to be visited with
         the `visit_type` loader according to the given scheduling `policy`.
@@ -512,6 +513,7 @@ class SchedulerInterface(Protocol):
             not_found origin
           tablesample: the percentage of the table on which we run the query
             (None: no sampling)
+          dry_run: if True, only return the origins without marking them as scheduled
 
         """
         ...
-- 
GitLab


From 46764435f8b4e927a78fbd51a6c8510a645b89c0 Mon Sep 17 00:00:00 2001
From: Nicolas Dandrimont <nicolas@dandrimont.eu>
Date: Mon, 24 Feb 2025 19:04:29 +0100
Subject: [PATCH 2/3] grab_next_visits: support multiple ORDER BY clauses

---
 swh/scheduler/backend.py | 33 +++++++++++++++++----------------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index c011dcf..a72fa62 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -453,6 +453,7 @@ class SchedulerBackend:
         query_args: List[Any] = []
 
         where_clauses = []
+        order_by: List[str]
 
         # list of (name, query) handled as CTEs before the main query
         common_table_expressions: List[Tuple[str, str]] = []
@@ -504,14 +505,14 @@ class SchedulerBackend:
             query_args.append(timestamp - not_found_cooldown)
 
         if policy == "oldest_scheduled_first":
-            order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
+            order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"]
         elif policy == "never_visited_oldest_update_first":
             # never visited origins have a NULL last_snapshot
             where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
 
             # order by increasing last_update (oldest first)
             where_clauses.append("listed_origins.last_update IS NOT NULL")
-            order_by = "listed_origins.last_update"
+            order_by = ["listed_origins.last_update"]
         elif policy == "already_visited_order_by_lag":
             # TODO: store "visit lag" in a materialized view?
 
@@ -525,21 +526,19 @@ class SchedulerBackend:
             )
 
             # order by decreasing visit lag
-            order_by = (
+            order_by = [
                 "listed_origins.last_update - origin_visit_stats.last_successful DESC"
-            )
+            ]
         elif policy == "origins_without_last_update":
             where_clauses.append("last_update IS NULL")
-            order_by = ", ".join(
-                [
-                    # By default, sort using the queue position. If the queue
-                    # position is null, then the origin has never been visited,
-                    # which we want to handle first
-                    "origin_visit_stats.next_visit_queue_position nulls first",
-                    # Schedule unknown origins in the order we've seen them
-                    "listed_origins.first_seen",
-                ]
-            )
+            order_by = [
+                # By default, sort using the queue position. If the queue
+                # position is null, then the origin has never been visited,
+                # which we want to handle first
+                "origin_visit_stats.next_visit_queue_position nulls first",
+                # Schedule unknown origins in the order we've seen them
+                "listed_origins.first_seen",
+            ]
 
             if not dry_run:
                 # fmt: off
@@ -577,7 +576,7 @@ class SchedulerBackend:
                 where_clause += " OR origin_visit_stats.last_scheduled < %s"
                 query_args.append(lister.last_listing_finished_at)
             where_clauses.append(where_clause)
-            order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
+            order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"]
         else:
             raise UnknownPolicy(f"Unknown scheduling policy {policy}")
 
@@ -605,6 +604,8 @@ class SchedulerBackend:
             f"left join {table} {clause}" for table, clause in joins.items()
         )
 
+        order_by_clause = ", ".join(order_by)
+
         # fmt: off
         common_table_expressions.insert(0, ("selected_origins", f"""
             SELECT
@@ -615,7 +616,7 @@ class SchedulerBackend:
             WHERE
               ({") AND (".join(where_clauses)})
             ORDER BY
-              {order_by}
+              {order_by_clause}
             LIMIT %s
         """))
         # fmt: on
-- 
GitLab


From 7f17b742aae6106ac60120e0bf9ae0b3a9392534 Mon Sep 17 00:00:00 2001
From: Nicolas Dandrimont <nicolas@dandrimont.eu>
Date: Mon, 24 Feb 2025 19:05:05 +0100
Subject: [PATCH 3/3] grab_next_visits: implement `stop_after_success` policy

This stops scheduling visits to origins when the last visit was
successful. This will help stop triggering visits for unchanging origins
such as tarball-directory, git-checkout, svn-export etc.

Implements #4691.
---
 swh/scheduler/backend.py   |  8 ++++++++
 swh/scheduler/in_memory.py | 26 +++++++++++++++++++++++++-
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index a72fa62..df0bc4d 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -506,6 +506,14 @@ class SchedulerBackend:
 
         if policy == "oldest_scheduled_first":
             order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"]
+        elif policy == "stop_after_success":
+            where_clauses.append(
+                "origin_visit_stats.last_visit_status is distinct from 'successful'"
+            )
+            order_by = [
+                "listed_origins.last_update NULLS LAST",
+                "listed_origins.first_seen",
+            ]
         elif policy == "never_visited_oldest_update_first":
             # never visited origins have a NULL last_snapshot
             where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
diff --git a/swh/scheduler/in_memory.py b/swh/scheduler/in_memory.py
index c1aee4f..6c2beee 100644
--- a/swh/scheduler/in_memory.py
+++ b/swh/scheduler/in_memory.py
@@ -22,7 +22,13 @@ from swh.scheduler.utils import utcnow
 
 from .exc import StaleData, UnknownPolicy
 from .interface import ListedOriginPageToken, PaginatedListedOriginList
-from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics
+from .model import (
+    LastVisitStatus,
+    ListedOrigin,
+    Lister,
+    OriginVisitStats,
+    SchedulerMetrics,
+)
 
 logger = logging.getLogger(__name__)
 epoch = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
@@ -290,6 +296,24 @@ class InMemoryScheduler:
                 and e[1].last_scheduled.timestamp()
                 or -1
             )
+        elif policy == "stop_after_success":
+            origins_stats = [
+                (o, s)
+                for (o, s) in origins_stats
+                if (s is None or s.last_visit_status != LastVisitStatus.successful)
+            ]
+
+            def sort_key(origin_stats):
+                (origin, _stats) = origin_stats
+                return (
+                    # the next two statements implement `listed_origins.last_update NULLS LAST`
+                    0 if origin.last_update else 1,
+                    origin.last_update,
+                    # then order by listed_origins.first_seen
+                    origin.first_seen,
+                )
+
+            origins_stats.sort(key=sort_key)
         elif policy == "never_visited_oldest_update_first":
             # never visited origins have a NULL last_snapshot
             origins_stats = [
-- 
GitLab