cli: Use temporary scheduler as fallback when no configuration detected
In order to simplify the testing of listers, allow calling the run command
of the swh-lister
CLI without a scheduler configuration. In that case, a temporary
scheduler instance with a PostgreSQL backend is created and used.
This makes it easy to test a lister with the following command:
$ swh -l DEBUG lister run <lister_name> url=<forge_url>
Used by swh-scheduler!349 (merged)
Merge request reports
Activity
Jenkins job DLS/gitlab-builds #94 failed .
See Console Output and Coverage Report for more details. Added 1 commit:
- 397397a7 - debian: Fix mypy error 'Unused "type: ignore" comment'
added 5 commits
-
397397a7...ad6644a6 - 3 commits from branch
swh/devel:master
- e7111c7d - cli: Use temporary scheduler as fallback when no configuration detected
- b7523862 - cli: Print lister stats at the end of the run command
-
397397a7...ad6644a6 - 3 commits from branch
Jenkins job DLS/gitlab-builds #95 succeeded .
See Console Output and Coverage Report for more details. Jenkins job DLS/gitlab-builds #96 succeeded.
See Console Output and Coverage Report for more details. I don't think we should be doing this by default, and I don't think we should make our runtime depend on a PostgreSQL testing harness.
If we're to keep this (I'm not sure we should have swh-scheduler-specific initialization code in swh-lister, feels a bit weird), this should be opted-into explicitly
I opened swh-scheduler!349 (merged) adding a temporary scheduler backend. I will update that MR accordingly.
Alternatively, here is a minimal scheduler which doesn't need postgresql and is good enough to run a lister:
# $ cat swh/scheduler/noop_backend.py
# Copyright (C) 2015-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import json
import logging
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import uuid

import attr
from psycopg2.errors import CardinalityViolation
from psycopg2.extensions import AsIs
import psycopg2.extras
import psycopg2.pool

from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
from swh.scheduler.utils import utcnow

from .exc import SchedulerException, StaleData, UnknownPolicy
from .interface import ListedOriginPageToken, PaginatedListedOriginList
from .model import (
    LastVisitStatus,
    ListedOrigin,
    Lister,
    OriginVisitStats,
    SchedulerMetrics,
)

logger = logging.getLogger(__name__)


class NoopBackend:
    """Dummy Backend for the Software Heritage scheduler.

    Implements the scheduler interface with no persistent storage: read
    operations return empty results, write operations are either printed
    (to ease debugging of listers) or silently discarded. Good enough to
    run a lister without a PostgreSQL instance.
    """

    def __init__(self):
        # No connection pool or database handle needed for a no-op backend.
        pass

    def create_task_type(self, task_type):
        """Discard the task type definition."""
        pass

    def get_task_type(self, task_type_name):
        """No task types are stored; always report "not found"."""
        return None

    def get_task_types(self):
        """Return an empty list: nothing is ever registered."""
        return []

    def get_listers(self) -> List[Lister]:
        """Return an empty list: no listers are persisted."""
        return []

    def get_listers_by_id(self, lister_ids: List[str]) -> List[Lister]:
        """Return an empty list: no listers are persisted."""
        return []

    def get_lister(
        self, name: str, instance_name: Optional[str] = None
    ) -> Optional[Lister]:
        # Deliberately fabricates a fresh Lister (with a random id) instead
        # of returning None, so that callers looking up "their" lister
        # always succeed.
        return Lister(name=name, instance_name=instance_name, id=uuid.uuid4())

    def get_or_create_lister(
        self, name: str, instance_name: Optional[str] = None
    ) -> Lister:
        """Fabricate a fresh Lister; nothing is persisted."""
        return Lister(name=name, instance_name=instance_name, id=uuid.uuid4())

    def update_lister(self, lister: Lister) -> Lister:
        """Accept the update and echo the lister back unchanged."""
        return lister

    def record_listed_origins(
        self, listed_origins: Iterable[ListedOrigin]
    ) -> List[ListedOrigin]:
        """Print each listed origin (for lister debugging) and return them.

        The iterable is materialized first: iterating a generator to print
        it would otherwise consume it, and the annotated ``List`` return
        would silently come back empty.
        """
        origins = list(listed_origins)
        for listed_origin in origins:
            print("recording origin:", listed_origin)
        return origins

    def get_listed_origins(
        self,
        lister_id: Optional[uuid.UUID] = None,
        url: Optional[str] = None,
        enabled: Optional[bool] = True,
        limit: int = 1000,
        page_token: Optional[ListedOriginPageToken] = None,
    ) -> PaginatedListedOriginList:
        """Return an empty, final page of origins."""
        return PaginatedListedOriginList([], None)

    def grab_next_visits(
        self,
        visit_type: str,
        count: int,
        policy: str,
        enabled: bool = True,
        lister_uuid: Optional[str] = None,
        lister_name: Optional[str] = None,
        lister_instance_name: Optional[str] = None,
        timestamp: Optional[datetime.datetime] = None,
        absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12),
        scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
        failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
        not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
        tablesample: Optional[float] = None,
    ) -> List[ListedOrigin]:
        """Nothing is ever scheduled; return no visits."""
        return []

    def create_tasks(self, tasks, policy="recurring"):
        """Print each task (for debugging) and report none as created."""
        for task in tasks:
            print("creating task:", task)
        return []

    def set_status_tasks(
        self,
        task_ids: List[int],
        status: str = "disabled",
        next_run: Optional[datetime.datetime] = None,
    ):
        """Discard the status update."""
        pass

    def disable_tasks(self, task_ids):
        """Discard the request; there are no tasks to disable."""
        pass

    def search_tasks(
        self,
        task_id=None,
        task_type=None,
        status=None,
        priority=None,
        policy=None,
        before=None,
        after=None,
        limit=None,
    ):
        """Return an empty result set for any search."""
        return []

    def get_tasks(self, task_ids):
        """Return an empty list: no tasks are stored."""
        return []

    def peek_ready_tasks(
        self,
        task_type: str,
        timestamp: Optional[datetime.datetime] = None,
        num_tasks: Optional[int] = None,
    ) -> List[Dict]:
        """No tasks are ever ready."""
        return []

    def grab_ready_tasks(
        self,
        task_type: str,
        timestamp: Optional[datetime.datetime] = None,
        num_tasks: Optional[int] = None,
    ) -> List[Dict]:
        """No tasks are ever ready."""
        return []

    def peek_ready_priority_tasks(
        self,
        task_type: str,
        timestamp: Optional[datetime.datetime] = None,
        num_tasks: Optional[int] = None,
    ) -> List[Dict]:
        """No priority tasks are ever ready."""
        return []

    def grab_ready_priority_tasks(
        self,
        task_type: str,
        timestamp: Optional[datetime.datetime] = None,
        num_tasks: Optional[int] = None,
    ) -> List[Dict]:
        """No priority tasks are ever ready."""
        return []

    def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
        """Discard the task-run record."""
        pass

    def mass_schedule_task_runs(self, task_runs):
        """Discard all task-run records."""
        pass

    def start_task_run(self, backend_id, metadata=None, timestamp=None):
        """Discard the start notification."""
        pass

    def end_task_run(
        self,
        backend_id,
        status,
        metadata=None,
        timestamp=None,
        result=None,
    ):
        """Discard the end notification."""
        pass

    def filter_task_to_archive(
        self,
        after_ts: str,
        before_ts: str,
        limit: int = 10,
        page_token: Optional[str] = None,
    ) -> Dict[str, Any]:
        # Honor the annotated return type instead of falling through with
        # an implicit None (the original paste's body was just ``pass``).
        # NOTE(review): result shape mirrors the real backend's
        # {"tasks": ..., "next_page_token": ...} — confirm against
        # swh.scheduler.backend.SchedulerBackend.
        return {"tasks": [], "next_page_token": None}

    def delete_archived_tasks(self, task_ids):
        """Nothing archived, nothing to delete."""
        pass

    def get_task_runs(self, task_ids, limit=None):
        """Return an empty list: no task runs are stored."""
        return []

    def origin_visit_stats_upsert(
        self, origin_visit_stats: Iterable[OriginVisitStats]
    ) -> None:
        """Print each visit-stats record (for debugging); store nothing."""
        for stat in origin_visit_stats:
            print("upserting visit stats:", stat)

    def origin_visit_stats_get(
        self, ids: Iterable[Tuple[str, str]]
    ) -> List[OriginVisitStats]:
        """Return an empty list: no visit stats are stored."""
        return []

    def visit_scheduler_queue_position_get(self) -> Dict[str, int]:
        """Return an empty mapping of visit-type queue positions."""
        return {}

    def visit_scheduler_queue_position_set(
        self,
        visit_type: str,
        position: int,
    ) -> None:
        """Discard the queue-position update."""
        pass

    def update_metrics(
        self,
        lister_id: Optional[uuid.UUID] = None,
        timestamp: Optional[datetime.datetime] = None,
        db=None,
        cur=None,
    ) -> List[SchedulerMetrics]:
        """No metrics are computed; return an empty list."""
        return []

    def get_metrics(
        self,
        lister_id: Optional[uuid.UUID] = None,
        visit_type: Optional[str] = None,
    ) -> List[SchedulerMetrics]:
        """No metrics are stored; return an empty list."""
        return []


# $ git diff | cat
# diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py
# index cfb15fa..ae9005d 100644
# --- a/swh/scheduler/__init__.py
# +++ b/swh/scheduler/__init__.py
# @@ -28,6 +28,7 @@
#  BACKEND_TYPES: Dict[str, str] = {
#      "postgresql": ".backend.SchedulerBackend",
# +    "noop": ".noop_backend.NoopBackend",
#      "remote": ".api.client.RemoteScheduler",
#      # deprecated
#      "local": ".backend.SchedulerBackend",
Edited by vlorentz
Jenkins job DLS/gitlab-builds #97 succeeded .
See Console Output and Coverage Report for more details. Mentioned in merge request swh-scheduler!349 (merged).
assigned to @anlambert
added mr-reviewed-fall-2023 label
added 52 commits
-
9fb8658f...7344d264 - 50 commits from branch
swh/devel:master
- 7092e4e4 - cli: Use temporary scheduler as fallback when no configuration detected
- 2eb32234 - cli: Print lister stats at the end of the run command
-
9fb8658f...7344d264 - 50 commits from branch
Jenkins job DLS/gitlab-builds #276 succeeded .
See Console Output and Coverage Report for more details.