Skip to content
Snippets Groups Projects

cli: Use temporary scheduler as fallback when no configuration detected

Merged Antoine Lambert requested to merge anlambert/swh-lister:cli-improve-run into master
1 unresolved thread

In order to simplify the testing of listers, allow to call the run command of swh-lister CLI without scheduler configuration. In that case a temporary scheduler instance with a postgresql backend is created and used.

It enables to easily test a lister with the following command:

$ swh -l DEBUG lister run <lister_name> url=<forge_url>

Used by swh-scheduler!349 (merged)

Edited by vlorentz

Merge request reports

Pipeline #5031 passed

Pipeline passed for 2eb32234 on anlambert:cli-improve-run

Approval is optional

Merged by Antoine LambertAntoine Lambert 1 year ago (Nov 8, 2023 9:42am UTC)

Merge details

  • Changes merged into with 2eb32234.
  • Did not delete the source branch.

Pipeline #5043 passed

Pipeline passed for 2eb32234 on master

Activity

Filter activity
  • Approvals
  • Assignees & reviewers
  • Comments (from bots)
  • Comments (from users)
  • Commits & branches
  • Edits
  • Labels
  • Lock status
  • Mentions
  • Merge request status
  • Tracking
  • Jenkins job DLS/gitlab-builds #94 failed .
    See Console Output and Coverage Report for more details.

  • Antoine Lambert added 1 commit

    added 1 commit

    • 397397a7 - debian: Fix mypy error 'Unused "type: ignore" comment'

    Compare with previous version

  • Antoine Lambert added 5 commits

    added 5 commits

    • 397397a7...ad6644a6 - 3 commits from branch swh/devel:master
    • e7111c7d - cli: Use temporary scheduler as fallback when no configuration detected
    • b7523862 - cli: Print lister stats at the end of the run command

    Compare with previous version

  • Jenkins job DLS/gitlab-builds #95 succeeded .
    See Console Output and Coverage Report for more details.

  • Jenkins job DLS/gitlab-builds #96 succeeded .
    See Console Output and Coverage Report for more details.

  • I don't think we should be doing this by default, and I don't think we should make our runtime depend on a postgresql testing harness.

    If we're to keep this (I'm not sure we should have swh-scheduler-specific initialization code in swh-lister, feels a bit weird), this should be opted-into explicitly

    • In practical terms maybe it would feel cleaner if this was added to swh.scheduler as a cls=temporary option?

    • Author Maintainer

      Good idea, it would make that feature reusable elsewhere. I will move that code in a new scheduler backend then.

    • Author Maintainer

      I opened swh-scheduler!349 (merged) adding a temporary scheduler backend. I will update that MR accordingly.

    • Alternatively, here is a minimal scheduler which doesn't need postgresql and is good enough to run a lister:

      $ cat swh/scheduler/noop_backend.py
      # Copyright (C) 2015-2022  The Software Heritage developers
      # See the AUTHORS file at the top-level directory of this distribution
      # License: GNU General Public License version 3, or any later version
      # See top-level LICENSE file for more information
      
      import datetime
      import json
      import logging
      from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
      import uuid
      
      import attr
      from psycopg2.errors import CardinalityViolation
      from psycopg2.extensions import AsIs
      import psycopg2.extras
      import psycopg2.pool
      
      from swh.core.db import BaseDb
      from swh.core.db.common import db_transaction
      from swh.scheduler.utils import utcnow
      
      from .exc import SchedulerException, StaleData, UnknownPolicy
      from .interface import ListedOriginPageToken, PaginatedListedOriginList
      from .model import (
          LastVisitStatus,
          ListedOrigin,
          Lister,
          OriginVisitStats,
          SchedulerMetrics,
      )
      
      logger = logging.getLogger(__name__)
      
      
      class NoopBackend:
          """Dummy Backend for the Software Heritage scheduler."""
      
          def __init__(self):
              pass
      
          def create_task_type(self, task_type):
              pass
      
          def get_task_type(self, task_type_name):
              return None
      
          def get_task_types(self):
              return []
      
          def get_listers(self) -> List[Lister]:
              return []
      
          def get_listers_by_id(
              self, lister_ids: List[str]
          ) -> List[Lister]:
              return []
      
          def get_lister(
              self, name: str, instance_name: Optional[str] = None
          ) -> Optional[Lister]:
              return Lister(name=name, instance_name=instance_name, id=uuid.uuid4())
      
          def get_or_create_lister(
              self, name: str, instance_name: Optional[str] = None
          ) -> Lister:
              return Lister(name=name, instance_name=instance_name, id=uuid.uuid4())
      
          def update_lister(self, lister: Lister) -> Lister:
              return lister
      
          def record_listed_origins(
              self, listed_origins: Iterable[ListedOrigin]
          ) -> List[ListedOrigin]:
              for listed_origin in listed_origins:
                  print("recording origin:", listed_origin)
              return listed_origins
      
          def get_listed_origins(
              self,
              lister_id: Optional[uuid.UUID] = None,
              url: Optional[str] = None,
              enabled: Optional[bool] = True,
              limit: int = 1000,
              page_token: Optional[ListedOriginPageToken] = None,
          ) -> PaginatedListedOriginList:
              return PaginatedListedOriginList([], None)
      
          def grab_next_visits(
              self,
              visit_type: str,
              count: int,
              policy: str,
              enabled: bool = True,
              lister_uuid: Optional[str] = None,
              lister_name: Optional[str] = None,
              lister_instance_name: Optional[str] = None,
              timestamp: Optional[datetime.datetime] = None,
              absolute_cooldown: Optional[datetime.timedelta] = datetime.timedelta(hours=12),
              scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
              failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
              not_found_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
              tablesample: Optional[float] = None,
          ) -> List[ListedOrigin]:
              return []
      
          def create_tasks(self, tasks, policy="recurring"):
              for task in tasks:
                  print("creating task:", task)
              return []
      
          def set_status_tasks(
              self,
              task_ids: List[int],
              status: str = "disabled",
              next_run: Optional[datetime.datetime] = None,
          ):
              pass
      
          def disable_tasks(self, task_ids):
              pass
      
          def search_tasks(
              self,
              task_id=None,
              task_type=None,
              status=None,
              priority=None,
              policy=None,
              before=None,
              after=None,
              limit=None,
          ):
              return []
      
          def get_tasks(self, task_ids):
              return []
      
          def peek_ready_tasks(
              self,
              task_type: str,
              timestamp: Optional[datetime.datetime] = None,
              num_tasks: Optional[int] = None,
          ) -> List[Dict]:
              return []
      
          def grab_ready_tasks(
              self,
              task_type: str,
              timestamp: Optional[datetime.datetime] = None,
              num_tasks: Optional[int] = None,
          ) -> List[Dict]:
              return []
      
          def peek_ready_priority_tasks(
              self,
              task_type: str,
              timestamp: Optional[datetime.datetime] = None,
              num_tasks: Optional[int] = None,
          ) -> List[Dict]:
              return []
      
          def grab_ready_priority_tasks(
              self,
              task_type: str,
              timestamp: Optional[datetime.datetime] = None,
              num_tasks: Optional[int] = None,
          ) -> List[Dict]:
              return []
      
          def schedule_task_run(
              self, task_id, backend_id, metadata=None, timestamp=None
          ):
              pass
      
          def mass_schedule_task_runs(self, task_runs):
              pass
      
          def start_task_run(
              self, backend_id, metadata=None, timestamp=None
          ):
              pass
      
          def end_task_run(
              self,
              backend_id,
              status,
              metadata=None,
              timestamp=None,
              result=None,
          ):
              pass
      
          def filter_task_to_archive(
              self,
              after_ts: str,
              before_ts: str,
              limit: int = 10,
              page_token: Optional[str] = None,
          ) -> Dict[str, Any]:
              pass
      
          def delete_archived_tasks(self, task_ids):
              pass
      
          def get_task_runs(self, task_ids, limit=None):
              return []
      
          def origin_visit_stats_upsert(
              self, origin_visit_stats: Iterable[OriginVisitStats]
          ) -> None:
              for stat in origin_visit_stats:
                  print("upserting visit stats:", stat)
      
          def origin_visit_stats_get(
              self, ids: Iterable[Tuple[str, str]]
          ) -> List[OriginVisitStats]:
              return []
      
          def visit_scheduler_queue_position_get(self) -> Dict[str, int]:
              return {}
      
          def visit_scheduler_queue_position_set(
              self,
              visit_type: str,
              position: int,
          ) -> None:
              pass
      
          def update_metrics(
              self,
              lister_id: Optional[uuid.UUID] = None,
              timestamp: Optional[datetime.datetime] = None,
              db=None,
              cur=None,
          ) -> List[SchedulerMetrics]:
              return []
      
          def get_metrics(
              self,
              lister_id: Optional[uuid.UUID] = None,
              visit_type: Optional[str] = None,
          ) -> List[SchedulerMetrics]:
              return []
      
      
      
      $ git diff | cat
      diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py
      index cfb15fa..ae9005d 100644
      --- a/swh/scheduler/__init__.py
      +++ b/swh/scheduler/__init__.py
      @@ -28,6 +28,7 @@
       
       BACKEND_TYPES: Dict[str, str] = {
           "postgresql": ".backend.SchedulerBackend",
      +    "noop": ".noop_backend.NoopBackend",
           "remote": ".api.client.RemoteScheduler",
           # deprecated
           "local": ".backend.SchedulerBackend",
      Edited by vlorentz
    • Please register or sign in to reply
  • Antoine Lambert added 2 commits

    added 2 commits

    • 5a6f1272 - cli: Use temporary scheduler as fallback when no configuration detected
    • 9fb8658f - cli: Print lister stats at the end of the run command

    Compare with previous version

  • Jenkins job DLS/gitlab-builds #97 succeeded .
    See Console Output and Coverage Report for more details.

  • mentioned in merge request swh-scheduler!349 (merged)

  • Antoine Lambert changed the description

    changed the description

  • vlorentz changed the description

    changed the description

  • Antoine Lambert added 52 commits

    added 52 commits

    • 9fb8658f...7344d264 - 50 commits from branch swh/devel:master
    • 7092e4e4 - cli: Use temporary scheduler as fallback when no configuration detected
    • 2eb32234 - cli: Print lister stats at the end of the run command

    Compare with previous version

  • Author Maintainer

    ^ Bump swh.scheduler in reqirements

  • Jenkins job DLS/gitlab-builds #276 succeeded .
    See Console Output and Coverage Report for more details.

  • Nicolas Dandrimont approved this merge request

    approved this merge request

Please register or sign in to reply
Loading