Skip to content
Snippets Groups Projects

package.debian: Re-implement debian loader within the package loader mechanism

1 file
+ 94
94
Compare changes
  • Side-by-side
  • Inline
+ 94
94
@@ -22,6 +22,100 @@ logger = logging.getLogger(__name__)
UPLOADERS_SPLIT = re.compile(r'(?<=\>)\s*,\s*')
class DebianLoader(PackageLoader):
"""Load debian origins into swh archive.
"""
visit_type = 'debian'
def __init__(self, url: str, date: str, packages: Mapping[str, Dict]):
super().__init__(url=url)
self._info = None
self.packages = packages
def get_versions(self) -> Sequence[str]:
"""Returns the keys of the packages input (e.g.
stretch/contrib/0.7.2-3, etc...)
"""
return self.packages.keys()
def get_default_release(self) -> str:
"""Take the first version as default release
"""
return list(self.packages.keys())[0]
def get_artifacts(self, version: str) -> Generator[
Tuple[Mapping[str, Any], Dict], None, None]:
a_metadata = self.packages[version]
artifacts_package_info = a_metadata.copy()
artifacts_package_info['filename'] = version
yield artifacts_package_info, a_metadata
def resolve_revision_from(
self, known_artifacts: Dict, artifact_metadata: Dict) \
-> Optional[bytes]:
pass # for now
def download_package(self, a_p_info: str, tmpdir: str) -> Tuple[str, Dict]:
"""Contrary to other package loaders (1 package, 1 artifact),
`a_metadata` represents the package's datafiles set to fetch:
- <package-version>.orig.tar.gz
- <package-version>.dsc
- <package-version>.diff.gz
This is delegated to the `download_package` function.
"""
logger.debug('debian: artifactS_package_info: %s', a_p_info)
return tmpdir, download_package(a_p_info, tmpdir)
def uncompress(self, a_path: str, tmpdir: str, a_metadata: Dict) -> str:
return extract_package(a_metadata, tmpdir)
def read_intrinsic_metadata(self, a_metadata: Dict,
a_uncompressed_path: str) -> Dict:
_, dsc_name = dsc_information(a_metadata)
dsc_path = path.join(path.dirname(a_uncompressed_path), dsc_name)
return get_package_metadata(
a_metadata, dsc_path, a_uncompressed_path)
def build_revision(
self, a_metadata: Dict, i_metadata: Dict) -> Dict:
dsc_url, _ = dsc_information(a_metadata)
logger.debug('i_metadata: %s', i_metadata)
logger.debug('a_metadata: %s', a_metadata)
msg = 'Synthetic revision for Debian source package %s version %s' % (
a_metadata['name'], a_metadata['version'])
date = i_metadata['changelog']['date']
author = prepare_person(i_metadata['changelog']['person'])
# inspired from swh.loader.debian.converters.package_metadata_to_revision # noqa
return {
'type': 'dsc',
'message': msg.encode('utf-8'),
'author': author,
'date': date,
'committer': author,
'committer_date': date,
'parents': [],
'metadata': {
'intrinsic': {
'tool': 'dsc',
'raw': i_metadata,
},
'extrinsic': {
'provider': dsc_url,
'when': self.visit_date.isoformat(),
'raw': a_metadata,
},
}
}
def uid_to_person(uid: str) -> Mapping[str, str]:
"""Convert an uid to a person suitable for insertion.
@@ -221,97 +315,3 @@ def get_package_metadata(package: Mapping[str, Any], dsc_path: str,
package_info['maintainers'] = maintainers
return package_info
class DebianLoader(PackageLoader):
"""Load debian origins into swh archive.
"""
visit_type = 'debian'
def __init__(self, url: str, date: str, packages: Mapping[str, Dict]):
super().__init__(url=url)
self._info = None
self.packages = packages
def get_versions(self) -> Sequence[str]:
"""Returns the keys of the packages input (e.g.
stretch/contrib/0.7.2-3, etc...)
"""
return self.packages.keys()
def get_default_release(self) -> str:
"""Take the first version as default release
"""
return list(self.packages.keys())[0]
def get_artifacts(self, version: str) -> Generator[
Tuple[Mapping[str, Any], Dict], None, None]:
a_metadata = self.packages[version]
artifacts_package_info = a_metadata.copy()
artifacts_package_info['filename'] = version
yield artifacts_package_info, a_metadata
def resolve_revision_from(
self, known_artifacts: Dict, artifact_metadata: Dict) \
-> Optional[bytes]:
pass # for now
def download_package(self, a_p_info: str, tmpdir: str) -> Tuple[str, Dict]:
"""Contrary to other package loaders (1 package, 1 artifact),
`a_metadata` represents the package's datafiles set to fetch:
- <package-version>.orig.tar.gz
- <package-version>.dsc
- <package-version>.diff.gz
This is delegated to the `download_package` function.
"""
logger.debug('debian: artifactS_package_info: %s', a_p_info)
return tmpdir, download_package(a_p_info, tmpdir)
def uncompress(self, a_path: str, tmpdir: str, a_metadata: Dict) -> str:
return extract_package(a_metadata, tmpdir)
def read_intrinsic_metadata(self, a_metadata: Dict,
a_uncompressed_path: str) -> Dict:
_, dsc_name = dsc_information(a_metadata)
dsc_path = path.join(path.dirname(a_uncompressed_path), dsc_name)
return get_package_metadata(
a_metadata, dsc_path, a_uncompressed_path)
def build_revision(
self, a_metadata: Dict, i_metadata: Dict) -> Dict:
dsc_url, _ = dsc_information(a_metadata)
logger.debug('i_metadata: %s', i_metadata)
logger.debug('a_metadata: %s', a_metadata)
msg = 'Synthetic revision for Debian source package %s version %s' % (
a_metadata['name'], a_metadata['version'])
date = i_metadata['changelog']['date']
author = prepare_person(i_metadata['changelog']['person'])
# inspired from swh.loader.debian.converters.package_metadata_to_revision # noqa
return {
'type': 'dsc',
'message': msg.encode('utf-8'),
'author': author,
'date': date,
'committer': author,
'committer_date': date,
'parents': [],
'metadata': {
'intrinsic': {
'tool': 'dsc',
'raw': i_metadata,
},
'extrinsic': {
'provider': dsc_url,
'when': self.visit_date.isoformat(),
'raw': a_metadata,
},
}
}
Loading