diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index cc7080bbdce67de44e06dab90047222b508e6a22..df0bc4dd051fbeee25d4748cb23fa6b8c3beedb4 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -437,6 +437,7 @@ class SchedulerBackend:
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
         not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
         tablesample: Optional[float] = None,
+        dry_run: bool = False,
         db=None,
         cur=None,
     ) -> List[ListedOrigin]:
@@ -452,6 +453,7 @@ class SchedulerBackend:
         query_args: List[Any] = []
 
         where_clauses = []
+        order_by: List[str]
 
         # list of (name, query) handled as CTEs before the main query
         common_table_expressions: List[Tuple[str, str]] = []
@@ -503,14 +505,22 @@ class SchedulerBackend:
             query_args.append(timestamp - not_found_cooldown)
 
         if policy == "oldest_scheduled_first":
-            order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
+            order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"]
+        elif policy == "stop_after_success":
+            where_clauses.append(
+                "origin_visit_stats.last_visit_status is distinct from 'successful'"
+            )
+            order_by = [
+                "listed_origins.last_update NULLS LAST",
+                "listed_origins.first_seen",
+            ]
         elif policy == "never_visited_oldest_update_first":
             # never visited origins have a NULL last_snapshot
             where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
 
             # order by increasing last_update (oldest first)
             where_clauses.append("listed_origins.last_update IS NOT NULL")
-            order_by = "listed_origins.last_update"
+            order_by = ["listed_origins.last_update"]
 
         elif policy == "already_visited_order_by_lag":
             # TODO: store "visit lag" in a materialized view?
@@ -524,39 +534,39 @@ class SchedulerBackend:
             )
 
             # order by decreasing visit lag
-            order_by = (
+            order_by = [
                 "listed_origins.last_update - origin_visit_stats.last_successful DESC"
-            )
+            ]
         elif policy == "origins_without_last_update":
             where_clauses.append("last_update IS NULL")
-            order_by = ", ".join(
-                [
-                    # By default, sort using the queue position. If the queue
-                    # position is null, then the origin has never been visited,
-                    # which we want to handle first
-                    "origin_visit_stats.next_visit_queue_position nulls first",
-                    # Schedule unknown origins in the order we've seen them
-                    "listed_origins.first_seen",
-                ]
-            )
-
-            # fmt: off
+            order_by = [
+                # By default, sort using the queue position. If the queue
+                # position is null, then the origin has never been visited,
+                # which we want to handle first
+                "origin_visit_stats.next_visit_queue_position nulls first",
+                # Schedule unknown origins in the order we've seen them
+                "listed_origins.first_seen",
+            ]
+
+            if not dry_run:
+                # fmt: off
+
+                # This policy requires updating the global queue position for this
+                # visit type
+                common_table_expressions.append(("update_queue_position", """
+                    INSERT INTO
+                        visit_scheduler_queue_position(visit_type, position)
+                    SELECT
+                        visit_type, COALESCE(MAX(next_visit_queue_position), 0)
+                    FROM selected_origins
+                    GROUP BY visit_type
+                    ON CONFLICT(visit_type) DO UPDATE
+                        SET position=GREATEST(
+                            visit_scheduler_queue_position.position, EXCLUDED.position
+                        )
+                """))
+                # fmt: on
-            # This policy requires updating the global queue position for this
-            # visit type
-            common_table_expressions.append(("update_queue_position", """
-                INSERT INTO
-                    visit_scheduler_queue_position(visit_type, position)
-                SELECT
-                    visit_type, COALESCE(MAX(next_visit_queue_position), 0)
-                FROM selected_origins
-                GROUP BY visit_type
-                ON CONFLICT(visit_type) DO UPDATE
-                    SET position=GREATEST(
-                        visit_scheduler_queue_position.position, EXCLUDED.position
-                    )
-            """))
-            # fmt: on
         elif policy == "first_visits_after_listing":
             assert lister_uuid is not None or (
                 lister_name is not None and lister_instance_name is not None
             )
@@ -574,7 +584,7 @@ class SchedulerBackend:
                 where_clause += " OR origin_visit_stats.last_scheduled < %s"
                 query_args.append(lister.last_listing_finished_at)
             where_clauses.append(where_clause)
-            order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
+            order_by = ["origin_visit_stats.last_scheduled NULLS FIRST"]
         else:
             raise UnknownPolicy(f"Unknown scheduling policy {policy}")
 
@@ -602,6 +612,8 @@ class SchedulerBackend:
             f"left join {table} {clause}" for table, clause in joins.items()
         )
 
+        order_by_clause = ", ".join(order_by)
+
         # fmt: off
         common_table_expressions.insert(0, ("selected_origins", f"""
             SELECT
@@ -612,7 +624,7 @@ class SchedulerBackend:
             WHERE
                 ({") AND (".join(where_clauses)})
             ORDER BY
-                {order_by}
+                {order_by_clause}
             LIMIT %s
         """))
         # fmt: on
@@ -628,23 +640,24 @@ class SchedulerBackend:
         """))
         # fmt: on
 
-        # fmt: off
-        common_table_expressions.append(("update_stats", """
-            INSERT INTO
-                origin_visit_stats (url, visit_type, last_scheduled)
-            SELECT
-                url, visit_type, %s
-            FROM
-                deduplicated_selected_origins
-            ON CONFLICT (url, visit_type) DO UPDATE
-            SET last_scheduled = GREATEST(
-                origin_visit_stats.last_scheduled,
-                EXCLUDED.last_scheduled
-            )
-        """))
-        # fmt: on
+        if not dry_run:
+            # fmt: off
+            common_table_expressions.append(("update_stats", """
+                INSERT INTO
+                    origin_visit_stats (url, visit_type, last_scheduled)
+                SELECT
+                    url, visit_type, %s
+                FROM
+                    deduplicated_selected_origins
+                ON CONFLICT (url, visit_type) DO UPDATE
+                SET last_scheduled = GREATEST(
+                    origin_visit_stats.last_scheduled,
+                    EXCLUDED.last_scheduled
+                )
+            """))
+            # fmt: on
 
-        query_args.append(timestamp)
+            query_args.append(timestamp)
 
         formatted_ctes = ",\n".join(
             f"{name} AS (\n{cte}\n)" for name, cte in common_table_expressions
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
index b7cb60f7d17761c00f3a225465b7fe9691f28b35..40ea46d98dfa5dc2bf5f8f24c586807cc6d49a42 100644
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -269,6 +269,131 @@ def send_from_scheduler_to_celery_cli(
     )
 
 
+@origin.command("peek-next")
+@click.option(
+    "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
+)
+@click.option(
+    "--tablesample",
+    help="Table sampling percentage",
+    type=float,
+)
+@click.option(
+    "--only-enabled/--only-disabled",
+    "enabled",
+    is_flag=True,
+    default=True,
+    help="""Determine whether we want to schedule enabled or disabled origins. By default, we
+    want to reasonably deal with enabled origins. For some edge cases though, we
+    might want the disabled ones.""",
+)
+@click.option(
+    "--lister-name",
+    default=None,
+    help="Limit origins to those listed from lister with provided name",
+)
+@click.option(
+    "--lister-instance-name",
+    default=None,
+    help="Limit origins to those listed from lister with instance name",
+)
+@click.option(
+    "--absolute-cooldown",
+    "absolute_cooldown_str",
+    default="12 hours",
+    help="Minimal interval between two visits of the same origin",
+)
+@click.option(
+    "--scheduled-cooldown",
+    "scheduled_cooldown_str",
+    default="7 days",
+    help="Minimal interval to wait before scheduling the same origins again",
+)
+@click.option(
+    "--not-found-cooldown",
+    "not_found_cooldown_str",
+    default="14 days",
+    help="Minimal interval to wait before rescheduling not_found origins",
+)
+@click.option(
+    "--failed-cooldown",
+    "failed_cooldown_str",
+    default="31 days",
+    help="Minimal interval to wait before rescheduling failed origins",
+)
+@click.option(
+    "--fields", "-f", default=None, help="Listed origin fields to print on output"
+)
+@click.option(
+    "--with-header/--without-header",
+    is_flag=True,
+    default=True,
+    help="Print the CSV header?",
+)
+@click.argument("type", type=str)
+@click.argument("count", type=int)
+@click.pass_context
+def peek_next(
+    ctx,
+    policy: str,
+    tablesample: Optional[float],
+    type: str,
+    count: int,
+    enabled: bool,
+    fields: Optional[str],
+    with_header: bool,
+    lister_name: Optional[str] = None,
+    lister_instance_name: Optional[str] = None,
+    absolute_cooldown_str: Optional[str] = None,
+    scheduled_cooldown_str: Optional[str] = None,
+    failed_cooldown_str: Optional[str] = None,
+    not_found_cooldown_str: Optional[str] = None,
+):
+    """Get information about the next ``count`` origins that would be scheduled
+    for ``type``."""
+    from .utils import parse_time_interval
+
+    if fields:
+        parsed_fields: Optional[List[str]] = fields.split(",")
+    else:
+        parsed_fields = None
+
+    absolute_cooldown = (
+        parse_time_interval(absolute_cooldown_str) if absolute_cooldown_str else None
+    )
+    scheduled_cooldown = (
+        parse_time_interval(scheduled_cooldown_str) if scheduled_cooldown_str else None
+    )
+    failed_cooldown = (
+        parse_time_interval(failed_cooldown_str) if failed_cooldown_str else None
+    )
+    not_found_cooldown = (
+        parse_time_interval(not_found_cooldown_str) if not_found_cooldown_str else None
+    )
+
+    scheduler = ctx.obj["scheduler"]
+
+    origins = scheduler.grab_next_visits(
+        visit_type=type,
+        count=count,
+        policy=policy,
+        tablesample=tablesample,
+        enabled=enabled,
+        lister_name=lister_name,
+        lister_instance_name=lister_instance_name,
+        absolute_cooldown=absolute_cooldown,
+        scheduled_cooldown=scheduled_cooldown,
+        failed_cooldown=failed_cooldown,
+        not_found_cooldown=not_found_cooldown,
+        dry_run=True,
+    )
+
+    for line in format_origins(
+        ctx, origins, fields=parsed_fields, with_header=with_header
+    ):
+        click.echo(line)
+
+
 @origin.command("update-metrics")
 @click.option("--lister", default=None, help="Only update metrics for this lister")
 @click.option(
diff --git a/swh/scheduler/in_memory.py b/swh/scheduler/in_memory.py
index b576e8f07e94f9acfa5117cb4a0ac319ef14742d..6c2beee8f7032bc47119d1f36ae2dca0d4fd603b 100644
--- a/swh/scheduler/in_memory.py
+++ b/swh/scheduler/in_memory.py
@@ -22,7 +22,13 @@ from swh.scheduler.utils import utcnow
 from .exc import StaleData, UnknownPolicy
 from .interface import ListedOriginPageToken, PaginatedListedOriginList
-from .model import ListedOrigin, Lister, OriginVisitStats, SchedulerMetrics
+from .model import (
+    LastVisitStatus,
+    ListedOrigin,
+    Lister,
+    OriginVisitStats,
+    SchedulerMetrics,
+)
 
 logger = logging.getLogger(__name__)
 
 epoch = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
@@ -212,6 +218,7 @@ class InMemoryScheduler:
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
         not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
         tablesample: Optional[float] = None,
+        dry_run: bool = False,
     ) -> List[ListedOrigin]:
         if timestamp is None:
             timestamp = utcnow()
@@ -289,6 +296,24 @@ class InMemoryScheduler:
                 and e[1].last_scheduled.timestamp()
                 or -1
             )
+        elif policy == "stop_after_success":
+            origins_stats = [
+                (o, s)
+                for (o, s) in origins_stats
+                if (s is None or s.last_visit_status != LastVisitStatus.successful)
+            ]
+
+            def sort_key(origin_stats):
+                (origin, _stats) = origin_stats
+                return (
+                    # the next two entries implement `listed_origins.last_update NULLS LAST`
+                    0 if origin.last_update else 1,
+                    origin.last_update,
+                    # then order by listed_origins.first_seen
+                    origin.first_seen,
+                )
+
+            origins_stats.sort(key=sort_key)
         elif policy == "never_visited_oldest_update_first":
             # never visited origins have a NULL last_snapshot
             origins_stats = [
@@ -392,7 +417,9 @@ class InMemoryScheduler:
             )
             for (o, s) in origins_stats
         ]
-        self.origin_visit_stats_upsert(ovs)
+
+        if not dry_run:
+            self.origin_visit_stats_upsert(ovs)
 
         return [o for (o, _) in origins_stats]
 
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
index 15e35d449bcfe7fa51c2479563786e8e94b858ab..64f9cf56b7c09e5da16eefaec2bf3313a5ef8578 100644
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -483,6 +483,7 @@ class SchedulerInterface(Protocol):
         failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
         not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
         tablesample: Optional[float] = None,
+        dry_run: bool = False,
     ) -> List[ListedOrigin]:
         """Get at most the `count` next origins that need to be visited with
         the `visit_type` loader according to the given scheduling `policy`.
@@ -512,6 +513,7 @@ class SchedulerInterface(Protocol):
               not_found origin
             tablesample: the percentage of the table on which we run the query
              (None: no sampling)
+            dry_run: if True, only return the origins without marking them as scheduled
 
         """
         ...
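
Usage note: the new ``peek-next`` subcommand is essentially a thin wrapper around ``grab_next_visits(..., dry_run=True)``, so the same preview can be obtained programmatically. Below is a minimal sketch against the ``SchedulerInterface``; the ``preview_next_visits`` helper name and the hard-coded policy are illustrative and not part of this change:

    from typing import List

    from swh.scheduler.interface import SchedulerInterface
    from swh.scheduler.model import ListedOrigin


    def preview_next_visits(
        scheduler: SchedulerInterface, visit_type: str, count: int
    ) -> List[ListedOrigin]:
        # Return the origins that would be visited next, without recording them
        # as scheduled (no origin_visit_stats upsert, no queue position update).
        return scheduler.grab_next_visits(
            visit_type=visit_type,
            count=count,
            policy="oldest_scheduled_first",
            dry_run=True,
        )

From the command line, the equivalent would be along the lines of ``swh scheduler origin peek-next git 10``.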