Skip to content
Snippets Groups Projects
Commit 60530d3a authored by Antoine Lambert's avatar Antoine Lambert
Browse files

pattern: Store termination date to scheduler database at end of listing

It enables to track last lister execution date and will be used to schedule
first visits with high priority for listed origins.

Related to swh/devel/swh-scheduler#4687.
parent 927aebbd
No related branches found
Tags v0.0.4
No related merge requests found
# Copyright (C) 2022 The Software Heritage developers
# Copyright (C) 2022-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -133,7 +133,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir):
lister = HackageLister(scheduler=swh_scheduler)
# force lister.last_listing_date to not being 'now'
lister.state.last_listing_date = iso8601.parse_date("2022-08-26T02:27:45.073759Z")
lister.set_state_in_scheduler()
lister.set_state_in_scheduler(force=True)
assert lister.get_state_from_scheduler() == HackageListerState(
last_listing_date=iso8601.parse_date("2022-08-26T02:27:45.073759Z")
)
......@@ -157,7 +157,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir):
lister.state.last_listing_date = iso8601.parse_date(
"2022-09-30T08:00:34.348551203Z"
)
lister.set_state_in_scheduler()
lister.set_state_in_scheduler(force=True)
assert lister.get_state_from_scheduler() == HackageListerState(
last_listing_date=iso8601.parse_date("2022-09-30T08:00:34.348551203Z")
)
......
# Copyright (C) 2020-2023 The Software Heritage developers
# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import copy
from dataclasses import dataclass
import logging
from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, Set, TypeVar
......@@ -20,6 +21,7 @@ from swh.core.retry import http_retry
from swh.core.utils import grouper
from swh.scheduler import get_scheduler, model
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.utils import utcnow
from . import USER_AGENT_TEMPLATE
from .utils import is_valid_origin_url
......@@ -247,8 +249,7 @@ class Lister(Generic[StateType, PageType]):
break
finally:
self.finalize()
if self.updated:
self.set_state_in_scheduler()
self.set_state_in_scheduler()
return full_stats
......@@ -262,19 +263,26 @@ class Lister(Generic[StateType, PageType]):
the state retrieved from the scheduler backend
"""
self.lister_obj = self.scheduler.get_or_create_lister(
name=self.LISTER_NAME, instance_name=self.instance
name=self.LISTER_NAME,
instance_name=self.instance,
)
return self.state_from_dict(self.lister_obj.current_state)
return self.state_from_dict(copy.deepcopy(self.lister_obj.current_state))
def set_state_in_scheduler(self) -> None:
def set_state_in_scheduler(self, force: bool = False) -> None:
"""Update the state in the scheduler backend from the state of the current
instance.
Args:
force: Update lister state even when lister has ``updated`` attribute
set to :const:`False`, this is useful for tests
Raises:
swh.scheduler.exc.StaleData: in case of a race condition between
concurrent listers (from :meth:`swh.scheduler.Scheduler.update_lister`).
"""
self.lister_obj.current_state = self.state_to_dict(self.state)
if self.updated or force:
self.lister_obj.current_state = self.state_to_dict(self.state)
self.lister_obj.last_listing_finished_at = utcnow()
self.lister_obj = self.scheduler.update_lister(self.lister_obj)
# State management to/from the scheduler
......
......@@ -413,4 +413,5 @@ class SaveBulkLister(Lister[SaveBulkListerState, SaveBulkListerPage]):
# update scheduler state at each rejected origin to get feedback
# using Web API before end of listing
self.state.rejected_origins = list(self.rejected_origins)
self.updated = True
self.set_state_in_scheduler()
# Copyright (C) 2020-2021 The Software Heritage developers
# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -151,21 +151,34 @@ def test_run(swh_scheduler):
update_date = lister.lister_obj.updated
assert lister.lister_obj.last_listing_finished_at is None
run_result = lister.run()
assert run_result.pages == 2
assert run_result.origins == 20
stored_lister = swh_scheduler.get_or_create_lister(
name="test-pattern-lister", instance_name="example.com"
name=lister.lister_obj.name, instance_name=lister.lister_obj.instance_name
)
# Check that the finalize operation happened
assert stored_lister.updated > update_date
assert stored_lister.current_state["updated"] == "yes"
assert stored_lister.last_listing_finished_at is not None
last_listing_finished_at = stored_lister.last_listing_finished_at
check_listed_origins(swh_scheduler, lister, stored_lister)
lister.run()
stored_lister = swh_scheduler.get_or_create_lister(
name=lister.lister_obj.name, instance_name=lister.lister_obj.instance_name
)
assert stored_lister.last_listing_finished_at > last_listing_finished_at
class InstantiableStatelessLister(pattern.StatelessLister[PageType]):
LISTER_NAME = "test-stateless-lister"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment