Verified Commit 461bf099 authored by Antoine R. Dumont

Drop no longer used swh.lister.core.{indexing,page_by_page}_lister

The listers depending on them have been ported to the new lister API.
parent e09ad272
# Copyright (C) 2015-2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
from datetime import datetime
from itertools import count
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
import dateutil.parser
from requests import Response
from sqlalchemy import func
from .lister_base import ListerBase
from .lister_transports import ListerHttpTransport
logger = logging.getLogger(__name__)
class IndexingLister(ListerBase):
"""Lister* intermediate class for any service that follows the pattern:
- The service must report at least one stable unique identifier, known
herein as the UID value, for every listed repository.
- If the service splits the list of repositories into sublists, it must
report at least one stable and sorted index identifier for every listed
repository, known herein as the indexable value, which can be used as
part of the service endpoint query to request a sublist beginning from
that index. This might be the UID if the UID is monotonic.
- Client sends a request to list repositories starting from a given
index.
- Client receives structured (json/xml/etc) response with information about
a sequential series of repositories starting from that index and, if
necessary/available, some indication of the URL or index for fetching the
next series of repository data.
See :class:`swh.lister.core.lister_base.ListerBase` for more details.
This class cannot be instantiated. To create a new Lister for a source
code listing service that follows the model described above, you must
subclass this class and provide the required overrides in addition to
any unmet implementation/override requirements of this class's base.
(see parent class and member docstrings for details)
Required Overrides::
def get_next_target_from_response
"""
flush_packet_db = 20
"""Number of iterations in-between write flushes of lister repositories to
db (see fn:`run`).
"""
default_min_bound = ""
"""Default initialization value for the minimum boundary index to use when
undefined (see fn:`run`).
"""
@abc.abstractmethod
def get_next_target_from_response(
self, response: Response
) -> Union[Optional[datetime], Optional[str], Optional[int]]:
"""Find the next server endpoint identifier given the entire response.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
Args:
response (transport response): response page from the server
Returns:
index of next page, possibly extracted from a next href url
"""
pass
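# A hypothetical override, shown only as a sketch (not part of the
# original class), for a service that advertises the next sublist via
# an RFC 5988 "Link" header, which requests parses into response.links:
#
#     def get_next_target_from_response(self, response):
#         next_link = response.links.get("next")
#         if next_link is None:
#             return None
#         # e.g. https://api.example.org/repos?since=1234 (made-up URL)
#         return int(next_link["url"].rsplit("since=", 1)[-1])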
# You probably don't need to override anything below this line.
def filter_before_inject(
self, models_list: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Overrides ListerBase.filter_before_inject
Bounds query results by this Lister's set max_index.
"""
models_list = [
m
for m in models_list
if self.is_within_bounds(m["indexable"], None, self.max_index)
]
return models_list
def db_query_range(self, start, end):
"""Look in the db for a range of repositories with indexable
values in the range [start, end]
Args:
start (model indexable type): start of desired indexable range
end (model indexable type): end of desired indexable range
Returns:
a sqlalchemy query of model objects (declarative_base
subclasses) with indexable values within the given range
"""
retlist = self.db_session.query(self.MODEL)
if start is not None:
retlist = retlist.filter(self.MODEL.indexable >= start)
if end is not None:
retlist = retlist.filter(self.MODEL.indexable <= end)
return retlist
def db_partition_indices(
self, partition_size: int
) -> List[Tuple[Optional[int], Optional[int]]]:
"""Describe an index-space compartmentalization of the db table
in equal sized chunks. This is used to describe min&max bounds for
parallelizing fetch tasks.
Args:
partition_size (int): desired size to make each partition
Returns:
a list of tuples (begin, end) of indexable value that
declare approximately equal-sized ranges of existing
repos
"""
n = max(self.db_num_entries(), 10)
partition_size = min(partition_size, n)
n_partitions = n // partition_size
min_index = self.db_first_index()
max_index = self.db_last_index()
if min_index is None or max_index is None:
# Nothing to list
return []
if isinstance(min_index, str):
def format_bound(bound):
return bound.isoformat()
min_index = dateutil.parser.parse(min_index)
max_index = dateutil.parser.parse(max_index)
elif isinstance(max_index - min_index, int):
def format_bound(bound):
return int(bound)
else:
def format_bound(bound):
return bound
partition_width = (max_index - min_index) / n_partitions
# Generate n_partitions + 1 bounds for n_partitions partitions
bounds = [
format_bound(min_index + i * partition_width)
for i in range(n_partitions + 1)
]
# Trim duplicate bounds
bounds.append(None)
bounds = [cur for cur, next in zip(bounds[:-1], bounds[1:]) if cur != next]
# Make the lowest and highest partitions open-ended
bounds[0] = bounds[-1] = None
return list(zip(bounds[:-1], bounds[1:]))
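# Worked example (illustrative only), matching the tests below: with 1000
# entries indexed from 1 to 10001 and partition_size=100, n_partitions is
# 10 and partition_width is (10001 - 1) / 10 = 1000, so the raw bounds are
# 1, 1001, ..., 10001; opening up the extremes yields
# [(None, 1001), (1001, 2001), ..., (9001, None)].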
def db_first_index(self):
"""Look in the db for the smallest indexable value
Returns:
the smallest indexable value of all repos in the db
"""
t = self.db_session.query(func.min(self.MODEL.indexable)).first()
if t:
return t[0]
return None
def db_last_index(self):
"""Look in the db for the largest indexable value
Returns:
the largest indexable value of all repos in the db
"""
t = self.db_session.query(func.max(self.MODEL.indexable)).first()
if t:
return t[0]
return None
def disable_deleted_repo_tasks(self, start, end, keep_these):
"""Disable tasks for repos that no longer exist between start and end.
Args:
start: beginning of range to disable
end: end of range to disable
keep_these (uid list): do not disable repos with uids in this list
"""
if end is None:
end = self.db_last_index()
if not self.is_within_bounds(end, None, self.max_index):
end = self.max_index
deleted_repos = self.winnow_models(
self.db_query_range(start, end), self.MODEL.uid, keep_these
)
tasks_to_disable = [
repo.task_id for repo in deleted_repos if repo.task_id is not None
]
if tasks_to_disable:
self.scheduler.disable_tasks(tasks_to_disable)
for repo in deleted_repos:
repo.task_id = None
def run(self, min_bound=None, max_bound=None):
"""Main entry function. Sequentially fetches repository data
from the service according to the basic outline in the class
docstring, continually fetching sublists until either there
is no next index reference given or the given next index is greater
than the desired max_bound.
Args:
min_bound (indexable type): optional index to start from
max_bound (indexable type): optional index to stop at
Returns:
a dict with the listing status: 'eventful' or 'uneventful'
"""
status = "uneventful"
self.min_index = min_bound
self.max_index = max_bound
def ingest_indexes():
index = min_bound or self.default_min_bound
for i in count(1):
response, injected_repos = self.ingest_data(index)
if not response and not injected_repos:
logger.info("No response from api server, stopping")
return
next_index = self.get_next_target_from_response(response)
# Determine if any repos were deleted, and disable their tasks.
keep_these = list(injected_repos.keys())
self.disable_deleted_repo_tasks(index, next_index, keep_these)
# termination condition
if next_index is None or next_index == index:
logger.info("stopping after index %s, no next link found", index)
return
index = next_index
logger.debug("Index: %s", index)
yield i
for i in ingest_indexes():
if (i % self.flush_packet_db) == 0:
logger.debug("Flushing updates at index %s", i)
self.db_session.commit()
self.db_session = self.mk_session()
status = "eventful"
self.db_session.commit()
self.db_session = self.mk_session()
return {"status": status}
class IndexingHttpLister(ListerHttpTransport, IndexingLister):
"""Convenience class for ensuring right lookup and init order
when combining IndexingLister and ListerHttpTransport."""
def __init__(self, url=None, override_config=None):
IndexingLister.__init__(self, override_config=override_config)
ListerHttpTransport.__init__(self, url=url)
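# --- Illustrative sketch, not part of the original module ----------------
# A minimal concrete lister built on IndexingHttpLister, assuming a
# hypothetical JSON API whose pages are lists of {"id": ...} objects.
# The endpoint and PATH_TEMPLATE are made up; a real lister would also
# define a proper sqlalchemy MODEL (see ListerBase).
class ExampleIndexingHttpLister(IndexingHttpLister):
    LISTER_NAME = "example"
    PATH_TEMPLATE = "/repositories?since=%d"  # hypothetical endpoint
    MODEL = type(None)  # placeholder; a real lister defines a db model
    default_min_bound = 0

    def get_next_target_from_response(self, response):
        repos = response.json()
        # The highest repository id on the page indexes the next sublist;
        # returning None terminates the loop in IndexingLister.run.
        return repos[-1]["id"] if repos else None

# Typical invocation (normally driven by the scheduler):
#
#     lister = ExampleIndexingHttpLister(url="https://api.example.org")
#     result = lister.run(min_bound=0, max_bound=10000)
#     assert result["status"] in ("eventful", "uneventful")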
# Copyright (C) 2015-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import logging
from .lister_base import ListerBase
from .lister_transports import ListerHttpTransport
class PageByPageLister(ListerBase):
"""Lister* intermediate class for any service that follows the simple
pagination page pattern.
- Client sends a request to list repositories starting from a
given page identifier.
- Client receives structured (json/xml/etc) response with
information about a sequential series of repositories (per page)
starting from a given index. And, if available, some indication
of the next page index for fetching the remaining repository
data.
See :class:`swh.lister.core.lister_base.ListerBase` for more
details.
This class cannot be instantiated. To create a new Lister for a
source code listing service that follows the model described
above, you must subclass this class. Then provide the required
overrides in addition to any unmet implementation/override
requirements of this class's base (see parent class and member
docstrings for details).
Required Overrides::
def get_next_target_from_response
"""
@abc.abstractmethod
def get_next_target_from_response(self, response):
"""Find the next server endpoint page given the entire response.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
For example, some APIs provide the next page through the
response's Link headers.
Args:
response (transport response): response page from the server
Returns:
index of next page, possibly extracted from a next href url
"""
pass
@abc.abstractmethod
def get_pages_information(self):
"""Find the total number of pages.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
For example, some APIs use dedicated headers:
- x-total-pages to provide the total number of pages
- x-total to provide the total number of repositories
- x-per-page to provide the number of elements per page
Returns:
tuple (total number of repositories, total number of
pages, per_page)
"""
pass
# You probably don't need to override anything below this line.
def do_additional_checks(self, models_list):
"""Potentially check for existence of repositories in models_list.
This will be called only if check_existence is flipped on in
the run method below.
"""
for m in models_list:
sql_repo = self.db_query_equal("uid", m["uid"])
if sql_repo:
return False
return models_list
def run(self, min_bound=None, max_bound=None, check_existence=False):
"""Main entry function. Sequentially fetches repository data from the
service according to the basic outline in the class
docstring. Continually fetching sublists until either there
is no next page reference given or the given next page is
greater than the desired max_page.
Args:
min_bound: optional page to start from
max_bound: optional page to stop at
check_existence (bool): optional existence check (for
incremental listers whose sort order is inverted)
Returns:
a dict with the listing status: 'eventful' or 'uneventful'
"""
status = "uneventful"
page = min_bound or 0
loop_count = 0
self.min_page = min_bound
self.max_page = max_bound
while self.is_within_bounds(page, self.min_page, self.max_page):
logging.info("listing repos starting at %s" % page)
response, injected_repos = self.ingest_data(page, checks=check_existence)
if not response and not injected_repos:
logging.info("No response from api server, stopping")
break
elif not injected_repos:
logging.info("Repositories already seen, stopping")
break
status = "eventful"
next_page = self.get_next_target_from_response(response)
# termination condition
if (next_page is None) or (next_page == page):
logging.info("stopping after page %s, no next link found" % page)
break
else:
page = next_page
loop_count += 1
if loop_count == 20:
logging.info("flushing updates")
loop_count = 0
self.db_session.commit()
self.db_session = self.mk_session()
self.db_session.commit()
self.db_session = self.mk_session()
return {"status": status}
class PageByPageHttpLister(ListerHttpTransport, PageByPageLister):
"""Convenience class for ensuring right lookup and init order when
combining PageByPageLister and ListerHttpTransport.
"""
def __init__(self, url=None, override_config=None):
PageByPageLister.__init__(self, override_config=override_config)
ListerHttpTransport.__init__(self, url=url)
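# --- Illustrative sketch, not part of the original module ----------------
# A minimal concrete pager built on PageByPageHttpLister, assuming a
# GitLab-like API that exposes the next page through an RFC 5988 "Link"
# header (parsed by requests into response.links) and reports totals via
# x-total / x-total-pages / x-per-page headers. All names are placeholders.
class ExamplePageByPageHttpLister(PageByPageHttpLister):
    LISTER_NAME = "example-pages"
    PATH_TEMPLATE = "/repositories?page=%d"  # hypothetical endpoint
    MODEL = type(None)  # placeholder; a real lister defines a db model

    def get_next_target_from_response(self, response):
        next_link = response.links.get("next")
        if next_link is None:
            return None
        # e.g. https://api.example.org/repositories?page=3 (made-up URL)
        return int(next_link["url"].rsplit("page=", 1)[-1])

    def get_pages_information(self):
        # Assumes the transport kept the latest response around as
        # self.latest_response -- a hypothetical helper for this sketch.
        h = self.latest_response.headers
        return (
            int(h["x-total"]),
            int(h["x-total-pages"]),
            int(h["x-per-page"]),
        )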
# Copyright (C) 2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from swh.lister.core.indexing_lister import IndexingLister
class MockedIndexingListerDbPartitionIndices(IndexingLister):
# Abstract Attribute boilerplate
LISTER_NAME = "DbPartitionIndices"
MODEL = type(None)
# ABC boilerplate
def get_next_target_from_response(self, *args, **kwargs):
pass
def __init__(self, num_entries, first_index, last_index):
self.num_entries = num_entries
self.first_index = first_index
self.last_index = last_index
def db_num_entries(self):
return self.num_entries
def db_first_index(self):
return self.first_index
def db_last_index(self):
return self.last_index
def test_db_partition_indices():
m = MockedIndexingListerDbPartitionIndices(
num_entries=1000, first_index=1, last_index=10001,
)
assert m
partitions = m.db_partition_indices(100)
# 1000 entries with indices 1 - 10001, partitions of 100 entries
assert len(partitions) == 10
assert partitions[0] == (None, 1001)
assert partitions[-1] == (9001, None)
def test_db_partition_indices_zero_first():
m = MockedIndexingListerDbPartitionIndices(
num_entries=1000, first_index=0, last_index=10000,
)
assert m
partitions = m.db_partition_indices(100)
# 1000 entries with indices 0 - 10000, partitions of 100 entries
assert len(partitions) == 10
assert partitions[0] == (None, 1000)
assert partitions[-1] == (9000, None)
def test_db_partition_indices_small_index_range():
m = MockedIndexingListerDbPartitionIndices(
num_entries=5000, first_index=0, last_index=5,
)
assert m
partitions = m.db_partition_indices(100)
assert partitions == [(None, 1), (1, 2), (2, 3), (3, 4), (4, None)]
def test_db_partition_indices_date_indices():
# 24 hour delta
first = datetime.datetime.fromisoformat("2019-11-01T00:00:00+00:00")
last = datetime.datetime.fromisoformat("2019-11-02T00:00:00+00:00")
m = MockedIndexingListerDbPartitionIndices(
# one entry per second
num_entries=24 * 3600,
first_index=first,
last_index=last,
)
assert m
# 3600 entries per partition => 1 partition per hour
partitions = m.db_partition_indices(3600)
assert len(partitions) == 24
expected_bounds = [first + datetime.timedelta(hours=i) for i in range(25)]
expected_bounds[0] = expected_bounds[-1] = None
assert partitions == list(zip(expected_bounds[:-1], expected_bounds[1:]))
def test_db_partition_indices_float_index_range():
m = MockedIndexingListerDbPartitionIndices(
num_entries=10000, first_index=0.0, last_index=1.0,
)
assert m
partitions = m.db_partition_indices(1000)
assert len(partitions) == 10
expected_bounds = [0.1 * i for i in range(11)]
expected_bounds[0] = expected_bounds[-1] = None
assert partitions == list(zip(expected_bounds[:-1], expected_bounds[1:]))
def test_db_partition_indices_uneven_int_index_range():
m = MockedIndexingListerDbPartitionIndices(
num_entries=5641, first_index=0, last_index=10000,
)
assert m
partitions = m.db_partition_indices(500)
assert len(partitions) == 5641 // 500
for i, (start, end) in enumerate(partitions):
assert isinstance(start, int) or (i == 0 and start is None)
assert isinstance(end, int) or (i == 10 and end is None)
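# The tests above are self-contained (no database or network access, since
# the mock overrides all db_* accessors); assuming pytest, the project's
# usual test runner, they can be exercised with e.g.:
#
#     pytest -k db_partition_indices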