diff --git a/swh/lister/hackage/tests/test_lister.py b/swh/lister/hackage/tests/test_lister.py index f44e9add15ba485878d084d35868f2d9c27ae09c..48224f0f5f2bdf49d3d92ce000bf108ea02f9494 100644 --- a/swh/lister/hackage/tests/test_lister.py +++ b/swh/lister/hackage/tests/test_lister.py @@ -133,7 +133,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir): lister = HackageLister(scheduler=swh_scheduler) # force lister.last_listing_date to not being 'now' lister.state.last_listing_date = iso8601.parse_date("2022-08-26T02:27:45.073759Z") - lister.set_state_in_scheduler(force=True) + lister.set_state_in_scheduler(force_state=True) assert lister.get_state_from_scheduler() == HackageListerState( last_listing_date=iso8601.parse_date("2022-08-26T02:27:45.073759Z") ) @@ -157,7 +157,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir): lister.state.last_listing_date = iso8601.parse_date( "2022-09-30T08:00:34.348551203Z" ) - lister.set_state_in_scheduler(force=True) + lister.set_state_in_scheduler(force_state=True) assert lister.get_state_from_scheduler() == HackageListerState( last_listing_date=iso8601.parse_date("2022-09-30T08:00:34.348551203Z") ) diff --git a/swh/lister/pattern.py b/swh/lister/pattern.py index 61ed8d7c1202e4a2d1a10808c1c2e3224d9da144..ec697e80bd5ee66f5e4ac7104b369f33a89bd1bb 100644 --- a/swh/lister/pattern.py +++ b/swh/lister/pattern.py @@ -251,7 +251,7 @@ class Lister(Generic[StateType, PageType]): break finally: self.finalize() - self.set_state_in_scheduler() + self.set_state_in_scheduler(with_listing_finished_date=True) return full_stats @@ -271,21 +271,26 @@ class Lister(Generic[StateType, PageType]): ) return self.state_from_dict(copy.deepcopy(self.lister_obj.current_state)) - def set_state_in_scheduler(self, force: bool = False) -> None: + def set_state_in_scheduler( + self, with_listing_finished_date: bool = False, force_state: bool = False + ) -> None: """Update the state in the scheduler backend from 
the state of the current instance. Args: - force: Update lister state even when lister has ``updated`` attribute + with_listing_finished_date: Update the ``last_listing_finished_at`` column + value for the lister in scheduler database if set to :const:`True`. + force_state: Update lister state even when lister has ``updated`` attribute set to :const:`False`, this is useful for tests Raises: swh.scheduler.exc.StaleData: in case of a race condition between concurrent listers (from :meth:`swh.scheduler.Scheduler.update_lister`). """ - if self.updated or force: + if self.updated or force_state: self.lister_obj.current_state = self.state_to_dict(self.state) - self.lister_obj.last_listing_finished_at = utcnow() + if with_listing_finished_date: + self.lister_obj.last_listing_finished_at = utcnow() self.lister_obj = self.scheduler.update_lister(self.lister_obj) # State management to/from the scheduler diff --git a/swh/lister/save_bulk/tests/test_lister.py b/swh/lister/save_bulk/tests/test_lister.py index b3365965aa9f326aa16e4485c7fb1881d6f31011..5b3a8e6dca39c063b31bb30a2b7058d6b29955f8 100644 --- a/swh/lister/save_bulk/tests/test_lister.py +++ b/swh/lister/save_bulk/tests/test_lister.py @@ -133,6 +133,9 @@ def test_bulk_lister_not_found_origins(swh_scheduler, requests_mock, mocker): scheduler=swh_scheduler, per_page=PER_PAGE, ) + + set_state_in_scheduler = mocker.spy(lister_bulk, "set_state_in_scheduler") + stats = lister_bulk.run() assert stats == ListerStats(pages=len(SUBMITTED_ORIGINS) // PER_PAGE, origins=0) @@ -162,6 +165,12 @@ def test_bulk_lister_not_found_origins(swh_scheduler, requests_mock, mocker): ) ) + # check scheduler state is updated at each not found origin + # plus at the end of the listing process to set the termination date + expected_calls = [mocker.call()] * len(SUBMITTED_ORIGINS) + expected_calls.append(mocker.call(with_listing_finished_date=True)) + assert set_state_in_scheduler.mock_calls == expected_calls + def 
test_bulk_lister_connection_errors(swh_scheduler, requests_mock, mocker): requests_mock.head( @@ -178,6 +187,9 @@ def test_bulk_lister_connection_errors(swh_scheduler, requests_mock, mocker): scheduler=swh_scheduler, per_page=PER_PAGE, ) + + set_state_in_scheduler = mocker.spy(lister_bulk, "set_state_in_scheduler") + stats = lister_bulk.run() assert stats == ListerStats(pages=len(SUBMITTED_ORIGINS) // PER_PAGE, origins=0) @@ -207,6 +219,12 @@ def test_bulk_lister_connection_errors(swh_scheduler, requests_mock, mocker): ) ) + # check scheduler state is updated at each origin connection error + # plus at the end of the listing process to set the termination date + expected_calls = [mocker.call()] * len(SUBMITTED_ORIGINS) + expected_calls.append(mocker.call(with_listing_finished_date=True)) + assert set_state_in_scheduler.mock_calls == expected_calls + def test_bulk_lister_invalid_origins(swh_scheduler, requests_mock, mocker): requests_mock.head(re.compile(".*"), status_code=200) @@ -236,6 +254,9 @@ def test_bulk_lister_invalid_origins(swh_scheduler, requests_mock, mocker): scheduler=swh_scheduler, per_page=PER_PAGE, ) + + set_state_in_scheduler = mocker.spy(lister_bulk, "set_state_in_scheduler") + stats = lister_bulk.run() assert stats == ListerStats(pages=len(SUBMITTED_ORIGINS) // PER_PAGE, origins=1) @@ -264,3 +285,10 @@ def test_bulk_lister_invalid_origins(swh_scheduler, requests_mock, mocker): key=attrgetter("origin_url"), ) ) + + # check scheduler state is updated at each invalid origin + # plus at the end of the listing process to set the termination date + expected_calls = [mocker.call()] * (len(SUBMITTED_ORIGINS) - 1) + expected_calls.append(mocker.call(with_listing_finished_date=True)) + + assert set_state_in_scheduler.mock_calls == expected_calls