Skip to content
Snippets Groups Projects
Verified Commit 80317628 authored by Antoine R. Dumont
Browse files

swh.loader.core.loader: Add stateless loader base class

Related T907
parent da1ed5c4
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,7 @@
import datetime
import logging
import os
import psycopg2
import requests
import traceback
......@@ -126,6 +127,9 @@ class SWHLoader(config.SWHConfig, metaclass=ABCMeta):
'send_releases': ('bool', True),
'send_occurrences': ('bool', True),
'save_data': ('bool', False),
'save_data_path': ('str', ''),
# Number of contents
'content_packet_size': ('int', 10000),
# packet of 100Mib contents
......@@ -188,6 +192,36 @@ class SWHLoader(config.SWHConfig, metaclass=ABCMeta):
'occurrences': 0,
}
# Make sure the config is sane
save_data = self.config.get('save_data')
if save_data:
path = self.config['save_data_path']
os.stat(path)
if not os.access(path, os.R_OK | os.W_OK):
raise PermissionError("Permission denied: %r" % path)
def save_data(self):
    """Persist the data associated with the current load.

    The base implementation is only a placeholder and always raises;
    concrete loaders are expected to override it.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError()
def get_save_data_path(self):
    """Return the directory in which the data of the current load is saved.

    The directory is built under ``self.config['save_data_path']`` as
    ``<origin_id % 10000, 4 digits>/<origin_id, 8 digits>/<visit year>``
    and is created on first use. The result is cached on the instance so
    later calls return the same path without touching the filesystem.

    Returns:
        str: path of the (existing) directory for this origin and year.

    """
    # NOTE: the cache attribute deliberately uses a single leading
    # underscore. A double-underscore attribute is name-mangled inside a
    # class body, so a ``hasattr(self, '__save_data_path')`` check (which
    # uses the unmangled string) would never find it and the cache would
    # never hit.
    cached_path = getattr(self, '_save_data_path', None)
    if cached_path is None:
        origin_id = self.origin_id
        year = str(self.visit_date.year)

        cached_path = os.path.join(
            self.config['save_data_path'],
            "%04d" % (origin_id % 10000),
            "%08d" % origin_id,
            year,
        )

        os.makedirs(cached_path, exist_ok=True)
        self._save_data_path = cached_path

    return cached_path
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_origin(self, origin):
log_id = str(uuid.uuid4())
......@@ -880,3 +914,92 @@ class SWHLoader(config.SWHConfig, metaclass=ABCMeta):
self.cleanup()
return self.load_status()
class SWHStatelessLoader(SWHLoader):
    """Base class acting as a pattern for stateless loaders.

    Stateless loaders are able to load all the data in one go. For
    example, the loader defined in swh-loader-git
    :class:`BulkUpdater`.

    For other loaders (stateful ones, e.g. :class:`SWHSvnLoader`),
    inherit directly from :class:`SWHLoader`.

    Subclasses implement the ``get_*`` methods (and optionally override
    the ``has_*`` ones); :meth:`store_data` then drives the sending of
    every kind of object.

    """
    ADDITIONAL_CONFIG = {}

    def __init__(self, logging_class=None, config=None):
        # Forward the caller-provided configuration to the parent class;
        # passing a literal ``None`` here would silently discard it.
        super().__init__(logging_class=logging_class, config=config)
        self.visit_date = None  # possibly overridden in self.prepare method

    def cleanup(self):
        """Clean up an eventual state installed for computations.

        Does nothing by default.

        """
        pass

    def has_contents(self):
        """Checks whether we need to load contents (True by default)"""
        return True

    def get_contents(self):
        """Get the contents that need to be loaded

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def has_directories(self):
        """Checks whether we need to load directories (True by default)"""
        return True

    def get_directories(self):
        """Get the directories that need to be loaded

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def has_revisions(self):
        """Checks whether we need to load revisions (True by default)"""
        return True

    def get_revisions(self):
        """Get the revisions that need to be loaded

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def has_releases(self):
        """Checks whether we need to load releases (True by default)"""
        return True

    def get_releases(self):
        """Get the releases that need to be loaded

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def has_occurrences(self):
        """Checks whether we need to load occurrences (True by default)"""
        return True

    def get_occurrences(self):
        """Get the occurrences that need to be loaded

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def get_fetch_history_result(self):
        """Return the data to store in fetch_history for the current loader

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def eventful(self):
        """Whether the load was eventful

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def save_data(self):
        """Save the data associated to the current load

        Raises:
            NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError

    def store_data(self):
        """Save (if configured) and send all the objects of the load.

        When the ``save_data`` configuration flag is set,
        :meth:`save_data` is called first. Then, for each kind of object
        (contents, directories, revisions, releases, occurrences, in that
        order), the objects returned by the matching ``get_*`` method are
        sent in batch, provided the ``send_*`` configuration flag is set
        and the matching ``has_*`` method returns a true value.

        """
        if self.config['save_data']:
            self.save_data()

        # The config flag is tested first so that has_*() is not even
        # called for object kinds whose sending is disabled.
        if self.config['send_contents'] and self.has_contents():
            self.send_batch_contents(self.get_contents())
        if self.config['send_directories'] and self.has_directories():
            self.send_batch_directories(self.get_directories())
        if self.config['send_revisions'] and self.has_revisions():
            self.send_batch_revisions(self.get_revisions())
        if self.config['send_releases'] and self.has_releases():
            self.send_batch_releases(self.get_releases())
        if self.config['send_occurrences'] and self.has_occurrences():
            self.send_batch_occurrences(self.get_occurrences())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment