From 4c4e63841116caa70fe1db1c6c6376542a9e6705 Mon Sep 17 00:00:00 2001 From: Nicolas Dandrimont <nicolas@dandrimont.eu> Date: Mon, 24 Feb 2025 18:48:13 +0100 Subject: [PATCH 1/3] Add swh scheduler origin peek-next command This command checks which tasks would be picked up next by the given policy and arguments. It is implemented with a dry_run flag on `grab_next_visits`. --- swh/scheduler/backend.py | 68 +++++++++++--------- swh/scheduler/cli/origin.py | 125 ++++++++++++++++++++++++++++++++++++ swh/scheduler/in_memory.py | 5 +- swh/scheduler/interface.py | 2 + 4 files changed, 167 insertions(+), 33 deletions(-) diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index cc7080b..c011dcf 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -437,6 +437,7 @@ class SchedulerBackend: failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, + dry_run: bool = False, db=None, cur=None, ) -> List[ListedOrigin]: @@ -540,23 +541,25 @@ class SchedulerBackend: ] ) - # fmt: off + if not dry_run: + # fmt: off + + # This policy requires updating the global queue position for this + # visit type + common_table_expressions.append(("update_queue_position", """ + INSERT INTO + visit_scheduler_queue_position(visit_type, position) + SELECT + visit_type, COALESCE(MAX(next_visit_queue_position), 0) + FROM selected_origins + GROUP BY visit_type + ON CONFLICT(visit_type) DO UPDATE + SET position=GREATEST( + visit_scheduler_queue_position.position, EXCLUDED.position + ) + """)) + # fmt: on - # This policy requires updating the global queue position for this - # visit type - common_table_expressions.append(("update_queue_position", """ - INSERT INTO - visit_scheduler_queue_position(visit_type, position) - SELECT - visit_type, COALESCE(MAX(next_visit_queue_position), 0) - FROM selected_origins - GROUP BY visit_type - ON 
CONFLICT(visit_type) DO UPDATE - SET position=GREATEST( - visit_scheduler_queue_position.position, EXCLUDED.position - ) - """)) - # fmt: on elif policy == "first_visits_after_listing": assert lister_uuid is not None or ( lister_name is not None and lister_instance_name is not None @@ -628,23 +631,24 @@ class SchedulerBackend: """)) # fmt: on - # fmt: off - common_table_expressions.append(("update_stats", """ - INSERT INTO - origin_visit_stats (url, visit_type, last_scheduled) - SELECT - url, visit_type, %s - FROM - deduplicated_selected_origins - ON CONFLICT (url, visit_type) DO UPDATE - SET last_scheduled = GREATEST( - origin_visit_stats.last_scheduled, - EXCLUDED.last_scheduled - ) - """)) - # fmt: on + if not dry_run: + # fmt: off + common_table_expressions.append(("update_stats", """ + INSERT INTO + origin_visit_stats (url, visit_type, last_scheduled) + SELECT + url, visit_type, %s + FROM + deduplicated_selected_origins + ON CONFLICT (url, visit_type) DO UPDATE + SET last_scheduled = GREATEST( + origin_visit_stats.last_scheduled, + EXCLUDED.last_scheduled + ) + """)) + # fmt: on - query_args.append(timestamp) + query_args.append(timestamp) formatted_ctes = ",\n".join( f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py index b7cb60f..40ea46d 100644 --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -269,6 +269,131 @@ def send_from_scheduler_to_celery_cli( ) +@origin.command("peek-next") +@click.option( + "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" +) +@click.option( + "--tablesample", + help="Table sampling percentage", + type=float, +) +@click.option( + "--only-enabled/--only-disabled", + "enabled", + is_flag=True, + default=True, + help="""Determine whether we want to scheduled enabled or disabled origins. As default, we + want to reasonably deal with enabled origins. 
For some edge case though, we + might want the disabled ones.""", +) +@click.option( + "--lister-name", + default=None, + help="Limit origins to those listed from lister with provided name", +) +@click.option( + "--lister-instance-name", + default=None, + help="Limit origins to those listed from lister with instance name", +) +@click.option( + "--absolute-cooldown", + "absolute_cooldown_str", + default="12 hours", + help="Minimal interval between two visits of the same origin", +) +@click.option( + "--scheduled-cooldown", + "scheduled_cooldown_str", + default="7 days", + help="Minimal interval to wait before scheduling the same origins again", +) +@click.option( + "--not-found-cooldown", + "not_found_cooldown_str", + default="14 days", + help="The minimal interval to wait before rescheduling not_found origins", +) +@click.option( + "--failed-cooldown", + "failed_cooldown_str", + default="31 days", + help="Minimal interval to wait before rescheduling failed origins", +) +@click.option( + "--fields", "-f", default=None, help="Listed origin fields to print on output" +) +@click.option( + "--with-header/--without-header", + is_flag=True, + default=True, + help="Print the CSV header?", +) +@click.argument("type", type=str) +@click.argument("count", type=int) +@click.pass_context +def peek_next( + ctx, + policy: str, + tablesample: Optional[float], + type: str, + count: int, + enabled: bool, + fields: Optional[str], + with_header: bool, + lister_name: Optional[str] = None, + lister_instance_name: Optional[str] = None, + absolute_cooldown_str: Optional[str] = None, + scheduled_cooldown_str: Optional[str] = None, + failed_cooldown_str: Optional[str] = None, + not_found_cooldown_str: Optional[str] = None, +): + """Get information about the next ``count`` origins that will be scheduled + for ``type``.""" + from .utils import parse_time_interval + + if fields: + parsed_fields: Optional[List[str]] = fields.split(",") + else: + parsed_fields = None + + absolute_cooldown = ( + 
parse_time_interval(absolute_cooldown_str) if absolute_cooldown_str else None + ) + scheduled_cooldown = ( + parse_time_interval(scheduled_cooldown_str) if scheduled_cooldown_str else None + ) + failed_cooldown = ( + parse_time_interval(failed_cooldown_str) if failed_cooldown_str else None + ) + not_found_cooldown = ( + parse_time_interval(not_found_cooldown_str) if not_found_cooldown_str else None + ) + + scheduler = ctx.obj["scheduler"] + + origins = scheduler.grab_next_visits( + visit_type=type, + count=count, + policy=policy, + tablesample=tablesample, + enabled=enabled, + lister_name=lister_name, + lister_instance_name=lister_instance_name, + absolute_cooldown=absolute_cooldown, + scheduled_cooldown=scheduled_cooldown, + failed_cooldown=failed_cooldown, + not_found_cooldown=not_found_cooldown, + dry_run=True, + ) + + for line in format_origins( + ctx, origins, fields=parsed_fields, with_header=with_header + ): + click.echo(line) + + @origin.command("update-metrics") @click.option("--lister", default=None, help="Only update metrics for this lister") @click.option( diff --git a/swh/scheduler/in_memory.py b/swh/scheduler/in_memory.py index b576e8f..c1aee4f 100644 --- a/swh/scheduler/in_memory.py +++ b/swh/scheduler/in_memory.py @@ -212,6 +212,7 @@ class InMemoryScheduler: failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, + dry_run: bool = False, ) -> List[ListedOrigin]: if timestamp is None: timestamp = utcnow() @@ -392,7 +393,9 @@ class InMemoryScheduler: ) for (o, s) in origins_stats ] - self.origin_visit_stats_upsert(ovs) + + if not dry_run: + self.origin_visit_stats_upsert(ovs) return [o for (o, _) in origins_stats] diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py index 15e35d4..64f9cf5 100644 --- a/swh/scheduler/interface.py +++ b/swh/scheduler/interface.py @@ -483,6 +483,7 @@ class 
SchedulerInterface(Protocol): failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14), not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31), tablesample: Optional[float] = None, + dry_run: bool = False, ) -> List[ListedOrigin]: """Get at most the `count` next origins that need to be visited with the `visit_type` loader according to the given scheduling `policy`. @@ -512,6 +513,7 @@ class SchedulerInterface(Protocol): not_found origin tablesample: the percentage of the table on which we run the query (None: no sampling) + dry_run: if True, only return the origins without marking them as scheduled """ ... -- GitLab From 46764435f8b4e927a78fbd51a6c8510a645b89c0 Mon Sep 17 00:00:00 2001 From: Nicolas Dandrimont <nicolas@dandrimont.eu> Date: Mon, 24 Feb 2025 19:04:29 +0100 Subject: [PATCH 2/3] grab_next_visits: support multiple ORDER BY clauses --- swh/scheduler/backend.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index c011dcf..a72fa62 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -453,6 +453,7 @@ class SchedulerBackend: query_args: List[Any] = [] where_clauses = [] + order_by: List[str] # list of (name, query) handled as CTEs before the main query common_table_expressions: List[Tuple[str, str]] = [] @@ -504,14 +505,14 @@ class SchedulerBackend: query_args.append(timestamp - not_found_cooldown) if policy == "oldest_scheduled_first": - order_by = "origin_visit_stats.last_scheduled NULLS FIRST" + order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"] elif policy == "never_visited_oldest_update_first": # never visited origins have a NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NULL") # order by increasing last_update (oldest first) where_clauses.append("listed_origins.last_update IS NOT NULL") - order_by = "listed_origins.last_update" + order_by = 
["listed_origins.last_update"] elif policy == "already_visited_order_by_lag": # TODO: store "visit lag" in a materialized view? @@ -525,21 +526,19 @@ class SchedulerBackend: ) # order by decreasing visit lag - order_by = ( + order_by = [ "listed_origins.last_update - origin_visit_stats.last_successful DESC" - ) + ] elif policy == "origins_without_last_update": where_clauses.append("last_update IS NULL") - order_by = ", ".join( - [ - # By default, sort using the queue position. If the queue - # position is null, then the origin has never been visited, - # which we want to handle first - "origin_visit_stats.next_visit_queue_position nulls first", - # Schedule unknown origins in the order we've seen them - "listed_origins.first_seen", - ] - ) + order_by = [ + # By default, sort using the queue position. If the queue + # position is null, then the origin has never been visited, + # which we want to handle first + "origin_visit_stats.next_visit_queue_position nulls first", + # Schedule unknown origins in the order we've seen them + "listed_origins.first_seen", + ] if not dry_run: # fmt: off @@ -577,7 +576,7 @@ class SchedulerBackend: where_clause += " OR origin_visit_stats.last_scheduled < %s" query_args.append(lister.last_listing_finished_at) where_clauses.append(where_clause) - order_by = "origin_visit_stats.last_scheduled NULLS FIRST" + order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"] else: raise UnknownPolicy(f"Unknown scheduling policy {policy}") @@ -605,6 +604,8 @@ class SchedulerBackend: f"left join {table} {clause}" for table, clause in joins.items() ) + order_by_clause = ", ".join(order_by) + # fmt: off common_table_expressions.insert(0, ("selected_origins", f""" SELECT @@ -615,7 +616,7 @@ class SchedulerBackend: WHERE ({") AND (".join(where_clauses)}) ORDER BY - {order_by} + {order_by_clause} LIMIT %s """)) # fmt: on -- GitLab From 7f17b742aae6106ac60120e0bf9ae0b3a9392534 Mon Sep 17 00:00:00 2001 From: Nicolas Dandrimont <nicolas@dandrimont.eu> 
Date: Mon, 24 Feb 2025 19:05:05 +0100 Subject: [PATCH 3/3] grab_next_visits: implement `stop_after_success` policy This stops scheduling visits to origins when the last visit was successful. This will help stop triggering visits for unchanging origins such as tarball-directory, git-checkout, svn-export etc. Implements #4691. --- swh/scheduler/backend.py | 8 ++++++++ swh/scheduler/in_memory.py | 26 +++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index a72fa62..df0bc4d 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -506,6 +506,14 @@ class SchedulerBackend: if policy == "oldest_scheduled_first": order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"] + elif policy == "stop_after_success": + where_clauses.append( + "origin_visit_stats.last_visit_status is distinct from 'successful'" + ) + order_by = [ + "listed_origins.last_update NULLS LAST", + "listed_origins.first_seen", + ] elif policy == "never_visited_oldest_update_first": # never visited origins have a NULL last_snapshot where_clauses.append("origin_visit_stats.last_snapshot IS NULL") diff --git a/swh/scheduler/in_memory.py b/swh/scheduler/in_memory.py index c1aee4f..6c2beee 100644 --- a/swh/scheduler/in_memory.py +++ b/swh/scheduler/in_memory.py @@ -22,7 +22,13 @@ from swh.scheduler.utils import utcnow from .exc import StaleData, UnknownPolicy from .interface import ListedOriginPageToken, PaginatedListedOriginList -from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics +from .model import ( + LastVisitStatus, + ListedOrigin, + Lister, + OriginVisitStats, + SchedulerMetrics, +) logger = logging.getLogger(__name__) epoch = datetime.datetime.fromtimestamp(0, datetime.timezone.utc) @@ -290,6 +296,24 @@ class InMemoryScheduler: and e[1].last_scheduled.timestamp() or -1 ) + elif policy == "stop_after_success": + origins_stats = [ + (o, s) + for (o, s) in origins_stats + if (s 
is None or s.last_visit_status != LastVisitStatus.successful) + ] + + def sort_key(origin_stats): + (origin, _stats) = origin_stats + return ( + # the next two entries implement `listed_origins.last_update NULLS LAST` + 0 if origin.last_update else 1, + origin.last_update, + # then order by listed_origins.first_seen + origin.first_seen, + ) + + origins_stats.sort(key=sort_key) elif policy == "never_visited_oldest_update_first": # never visited origins have a NULL last_snapshot origins_stats = [ -- GitLab