Skip to content
Snippets Groups Projects
Commit eadb7044 authored by Antoine Lambert
Browse files

pattern: Ensure termination date is set at the end of listing process

Previously, the termination date could be set by any call to the
`set_state_in_scheduler` method.

This led to side effects in the save bulk lister, which updates the
scheduler state whenever it encounters an invalid or not found origin,
and thus the listing failed.

Fixes #4712.
parent 99f64ddb
No related branches found
Tags v6.9.1
1 merge request: !537 pattern: Ensure termination date is set at the end of listing process
Pipeline #11641 passed
......@@ -133,7 +133,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir):
lister = HackageLister(scheduler=swh_scheduler)
# force lister.last_listing_date to a value other than 'now'
lister.state.last_listing_date = iso8601.parse_date("2022-08-26T02:27:45.073759Z")
lister.set_state_in_scheduler(force=True)
lister.set_state_in_scheduler(force_state=True)
assert lister.get_state_from_scheduler() == HackageListerState(
last_listing_date=iso8601.parse_date("2022-08-26T02:27:45.073759Z")
)
......@@ -157,7 +157,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir):
lister.state.last_listing_date = iso8601.parse_date(
"2022-09-30T08:00:34.348551203Z"
)
lister.set_state_in_scheduler(force=True)
lister.set_state_in_scheduler(force_state=True)
assert lister.get_state_from_scheduler() == HackageListerState(
last_listing_date=iso8601.parse_date("2022-09-30T08:00:34.348551203Z")
)
......
......@@ -251,7 +251,7 @@ class Lister(Generic[StateType, PageType]):
break
finally:
self.finalize()
self.set_state_in_scheduler()
self.set_state_in_scheduler(with_listing_finished_date=True)
return full_stats
......@@ -271,21 +271,26 @@ class Lister(Generic[StateType, PageType]):
)
return self.state_from_dict(copy.deepcopy(self.lister_obj.current_state))
def set_state_in_scheduler(self, force: bool = False) -> None:
def set_state_in_scheduler(
self, with_listing_finished_date: bool = False, force_state: bool = False
) -> None:
"""Update the state in the scheduler backend from the state of the current
instance.
Args:
force: Update lister state even when lister has ``updated`` attribute
with_listing_finished_date: Update the ``last_listing_finished_at`` column
value for the lister in the scheduler database if set to :const:`True`.
force_state: Update lister state even when lister has ``updated`` attribute
set to :const:`False`, this is useful for tests
Raises:
swh.scheduler.exc.StaleData: in case of a race condition between
concurrent listers (from :meth:`swh.scheduler.Scheduler.update_lister`).
"""
if self.updated or force:
if self.updated or force_state:
self.lister_obj.current_state = self.state_to_dict(self.state)
self.lister_obj.last_listing_finished_at = utcnow()
if with_listing_finished_date:
self.lister_obj.last_listing_finished_at = utcnow()
self.lister_obj = self.scheduler.update_lister(self.lister_obj)
# State management to/from the scheduler
......
......@@ -133,6 +133,9 @@ def test_bulk_lister_not_found_origins(swh_scheduler, requests_mock, mocker):
scheduler=swh_scheduler,
per_page=PER_PAGE,
)
set_state_in_scheduler = mocker.spy(lister_bulk, "set_state_in_scheduler")
stats = lister_bulk.run()
assert stats == ListerStats(pages=len(SUBMITTED_ORIGINS) // PER_PAGE, origins=0)
......@@ -162,6 +165,12 @@ def test_bulk_lister_not_found_origins(swh_scheduler, requests_mock, mocker):
)
)
# check scheduler state is updated at each not found origin
# plus at the end of the listing process to set the termination date
expected_calls = [mocker.call()] * len(SUBMITTED_ORIGINS)
expected_calls.append(mocker.call(with_listing_finished_date=True))
assert set_state_in_scheduler.mock_calls == expected_calls
def test_bulk_lister_connection_errors(swh_scheduler, requests_mock, mocker):
requests_mock.head(
......@@ -178,6 +187,9 @@ def test_bulk_lister_connection_errors(swh_scheduler, requests_mock, mocker):
scheduler=swh_scheduler,
per_page=PER_PAGE,
)
set_state_in_scheduler = mocker.spy(lister_bulk, "set_state_in_scheduler")
stats = lister_bulk.run()
assert stats == ListerStats(pages=len(SUBMITTED_ORIGINS) // PER_PAGE, origins=0)
......@@ -207,6 +219,12 @@ def test_bulk_lister_connection_errors(swh_scheduler, requests_mock, mocker):
)
)
# check scheduler state is updated at each origin connection error
# plus at the end of the listing process to set the termination date
expected_calls = [mocker.call()] * len(SUBMITTED_ORIGINS)
expected_calls.append(mocker.call(with_listing_finished_date=True))
assert set_state_in_scheduler.mock_calls == expected_calls
def test_bulk_lister_invalid_origins(swh_scheduler, requests_mock, mocker):
requests_mock.head(re.compile(".*"), status_code=200)
......@@ -236,6 +254,9 @@ def test_bulk_lister_invalid_origins(swh_scheduler, requests_mock, mocker):
scheduler=swh_scheduler,
per_page=PER_PAGE,
)
set_state_in_scheduler = mocker.spy(lister_bulk, "set_state_in_scheduler")
stats = lister_bulk.run()
assert stats == ListerStats(pages=len(SUBMITTED_ORIGINS) // PER_PAGE, origins=1)
......@@ -264,3 +285,10 @@ def test_bulk_lister_invalid_origins(swh_scheduler, requests_mock, mocker):
key=attrgetter("origin_url"),
)
)
# check scheduler state is updated at each invalid origin
# plus at the end of the listing process to set the termination date
expected_calls = [mocker.call()] * (len(SUBMITTED_ORIGINS) - 1)
expected_calls.append(mocker.call(with_listing_finished_date=True))
assert set_state_in_scheduler.mock_calls == expected_calls
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment