Skip to content
Snippets Groups Projects
Commit a2e9cf16 authored by Antoine Cezar
Browse files

add swh-hg-identify a cli to identify hg objects

parent bd914dec
No related branches found
No related tags found
No related merge requests found
......@@ -8,3 +8,4 @@ python-dateutil
python-hglib
retrying
sqlitedict
mercurial
......@@ -54,6 +54,8 @@ setup(
entry_points="""
[swh.workers]
loader.mercurial=swh.loader.mercurial:register
[console_scripts]
swh-hg-identify=swh.loader.mercurial.identify:main
""",
classifiers=[
"Programming Language :: Python :: 3",
......
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import re
import subprocess
from codecs import escape_decode # type: ignore
from pathlib import Path
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from swh.model.cli import identify_object
from swh.model.hashutil import hash_to_bytehex
from swh.model.identifiers import normalize_timestamp, swhid
from swh.model.model import RevisionType
# Matches one line of a `.hgtags` file: a 40-character hexadecimal node id,
# one or more spaces, then the tag name (the remainder of the line).
TAG_PATTERN = re.compile(b"([0-9A-Fa-f]{40}) +(.+)")
class HgAuthor(NamedTuple):
    """Author of a Mercurial revision, with every field kept as bytes."""

    # Full name of the author
    fullname: bytes
    # Name of the author, when available
    name: Optional[bytes]
    # Email of the author, when available
    email: Optional[bytes]

    @staticmethod
    def from_bytes(data: bytes) -> "HgAuthor":
        """Build an HgAuthor from a raw author string.

        The expected format is "name <email>"; the parsing itself is delegated
        to `parse_author`.
        """
        # Function-level import, presumably to keep the cli startup time low.
        from swh.loader.mercurial.converters import parse_author

        parsed = parse_author(data)
        return HgAuthor(parsed["fullname"], parsed["name"], parsed["email"])

    def to_dict(self) -> Dict[str, Optional[bytes]]:
        """Return the author as a plain dict keyed by field name."""
        return dict(zip(self._fields, self))
# Template passed to `hg log -T`: it emits one "key:value" line per field,
# the `extras:` prefix being repeated for every extra entry.
# `HgRevision.from_bytes` parses this exact layout.
HG_REVISION_TEMPLATE = "\n".join(
    [
        "node_id:{node}",
        "author:{author}",
        "timestamp_offset:{date|json}",
        "p1:{p1.node}",
        "p2:{p2.node}",
        "extras:{join(extras, '\nextras:')}",
    ]
)  # Log template for HgRevision.from_bytes
# 40 ASCII zeros; `HgRevision.from_bytes` drops any `p1`/`p2` equal to it.
NULL_NODE_ID = b"0" * 40  # Value used when no parent
class HgRevision(NamedTuple):
    """Represent a Mercurial revision.

    Instances are normally built by `HgRevision.from_bytes` from the output of
    an `hg log` call using `HG_REVISION_TEMPLATE`.
    """

    # Node id of the revision, as printed by hg (hexadecimal digits, as bytes)
    node_id: bytes
    # Author of the revision
    author: HgAuthor
    # Timestamp of the revision: first item of the JSON-decoded
    # `timestamp_offset` pair, hence a number and not bytes
    timestamp: Union[int, float]
    # Offset of the revision: second item of the same JSON pair
    # NOTE(review): assumed to be a whole number of seconds -- confirm
    offset: int
    # Node ids of the revision's parents (null parents are left out)
    parents: List[bytes]
    # Metadata of the revision
    extras: Dict[bytes, bytes]
    # Description of the revision
    description: bytes

    @staticmethod
    def from_bytes(data: bytes, description: bytes) -> "HgRevision":
        """Convert bytes to an HgRevision named tuple.

        Expected data format:
        '''
        node_id:{node}
        author:{author}
        timestamp_offset:[{timestamp}, {offset}]
        p1:{p1}
        p2:{p2}
        extras:{key1}={value1}
        ...
        extras:{keyn}={value}
        '''

        Args:
            data: the "key:value" lines describing the revision
            description: the description, stored as is

        Returns:
            the parsed revision

        Raises:
            ValueError: if a non-empty line has no ":" separator or a
                non-empty extra has no "=" separator
            TypeError: if a required field is missing or an unknown key is
                found
        """
        fields: Dict[str, Any] = {
            "parents": [],
            "extras": {},
            "description": description,
        }
        for line in data.split(b"\n"):
            if not line:
                # Blank or trailing lines carry no field; skip them instead of
                # failing to unpack the missing "key:value" pair.
                continue
            key, value = line.split(b":", 1)
            if key == b"timestamp_offset":
                timestamp, offset = json.loads(value)
                fields["timestamp"] = timestamp
                fields["offset"] = offset
            elif key in (b"p1", b"p2"):
                # The null node id only means "no parent on this side"
                if value != NULL_NODE_ID:
                    fields["parents"].append(value)
            elif key == b"extras":
                if not value:
                    # An empty `extras:` line means there is no extra at all
                    continue
                extra_key, extra_value = value.split(b"=", 1)
                fields["extras"][extra_key] = extra_value
            elif key == b"author":
                fields["author"] = HgAuthor.from_bytes(value)
            else:
                fields[key.decode()] = value
        return HgRevision(**fields)

    def branch(self) -> bytes:
        """Return the `branch` extra, or `b"default"` when it is not set."""
        return self.extras.get(b"branch", b"default")

    def to_dict(self) -> Dict:
        """Convert a HgRevision to a dict for SWHID computation.

        The committer and committer date are the same objects as the author
        and date. `parents` still holds the Mercurial node ids.
        """
        date = normalize_timestamp(int(self.timestamp))
        extra_headers = [
            (b"time_offset_seconds", str(self.offset).encode("utf-8")),
        ]
        for key, value in self.extras.items():
            if key == b"branch" and value == b"default":
                # branch default is skipped to match historical implementation
                continue
            if key == b"transplant_source":
                # transplant_source is converted to hex
                # to match historical implementation
                value = hash_to_bytehex(escape_decode(value)[0])
            extra_headers.append((key, value))
        author = self.author.to_dict()
        return {
            "author": author,
            "date": date,
            "committer": author,
            "committer_date": date,
            "type": RevisionType.MERCURIAL.value,
            "message": self.description,
            "metadata": {"node": self.node_id},
            "extra_headers": tuple(extra_headers),
            "synthetic": False,
            "parents": self.parents,
        }
class HgBranch(NamedTuple):
    """A named branch of a Mercurial repository."""

    # Name of the branch
    name: bytes
    # Hash of the revision the branch points to
    node_id: bytes
class HgTag(NamedTuple):
    """A tag of a Mercurial repository."""

    # Name of the tag
    name: bytes
    # Hexadecimal node id of the tagged revision, as bytes
    node_id: bytes
class Hg:
    """Provide methods to extract data from a Mercurial repository.

    Every operation spawns the `hg` command line client with the repository
    root as working directory.
    """

    def __init__(self, repository_root: Path) -> None:
        """Remember `repository_root` as the directory `hg` is run from."""
        self._root = repository_root

    def _output(self, *args) -> bytes:
        """Return the output of a `hg` call.

        Raises:
            subprocess.CalledProcessError: if `hg` exits with a non-zero status
        """
        return subprocess.check_output(["hg", *args], cwd=self._root)

    def _call(self, *args) -> None:
        """Perform a `hg` call, discarding what it prints.

        Raises:
            subprocess.CalledProcessError: if `hg` exits with a non-zero status
        """
        # `subprocess.run` drains the pipes it captures. `check_call` never
        # reads them, so the child could block once a pipe buffer fills up.
        subprocess.run(
            ["hg", *args],
            cwd=self._root,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            check=True,
        )

    def root(self) -> Path:
        """Return the root of the Mercurial repository."""
        return self._root

    def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]:
        """Return the specified revisions of the Mercurial repository.

        Mercurial revsets are supported. (See `hg help revsets`)
        If no revision range is specified, return all revisions.

        The revisions are returned in the reverse of the order in which
        `hg log` prints them.
        """
        if rev:
            node_ids = self._output("log", "-r", rev, "-T", "{node}\n").splitlines()
        else:
            node_ids = self._output("log", "-T", "{node}\n").splitlines()
        return [self._revision(node_id) for node_id in reversed(node_ids)]

    def _revision(self, revision: bytes) -> HgRevision:
        """Load one revision, given its node id."""
        data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE)
        # hg log strips the description so the raw description has to be taken
        # from debugdata
        # The description follows some metadata and is separated from them
        # by an empty line
        _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1)
        return HgRevision.from_bytes(data, desc)

    def up(self, rev: bytes) -> None:
        """Update the repository working directory to the specified revision."""
        self._call("up", rev)

    def branches(self) -> List[HgBranch]:
        """List the repository named branches.

        An empty list is returned when `hg branches` prints nothing.
        """
        output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip()
        branches = []
        for block in output.split(b"\n\n"):
            if not block:
                # Empty output splits into a single empty block that cannot
                # be unpacked into a (name, node id) pair.
                continue
            name, node_id = block.splitlines()
            branches.append(HgBranch(name=name, node_id=node_id))
        return branches

    def tip(self) -> HgRevision:
        """Return the `tip` revision."""
        return self.log("tip")[0]

    def tags(self) -> List[HgTag]:
        """Return the repository's tags as defined in the `.hgtags` file.

        `.hgtags` being like any other repository's tracked file, its content can vary
        from revision to revision. The returned value therefore depends on the current
        revision of the repository.

        When a tag name appears on several lines the last line wins, and a tag
        whose last line targets the null node id is treated as removed.
        Several tags may target the same revision.
        """
        hgtags = self._root / ".hgtags"
        if not hgtags.is_file():
            return []
        # Keyed by tag name: keying by node id would keep only one of the tags
        # sharing a revision, and list a moved tag once per target.
        targets: Dict[bytes, bytes] = {}
        for line in hgtags.read_bytes().splitlines():
            match = TAG_PATTERN.match(line)
            if match is None:
                continue
            node_id, name = match.groups()
            targets[name] = node_id
        return [
            HgTag(name=name, node_id=node_id)
            for name, node_id in targets.items()
            if node_id != NULL_NODE_ID
        ]
@click.group()
@click.option(
    "--directory",
    "-d",
    help=("Path to the Mercurial repository. If unset, the current directory is used"),
)
@click.pass_context
def main(ctx, directory=None):
    """Compute the Software Heritage persistent identifier (SWHID) for the given
    source code object(s).
    For more details about SWHIDs see:
    https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
    """
    # NOTE: the docstring above doubles as the command line help text.
    # Ensure that ctx.obj exists and is a dict, whichever way `main` is invoked.
    ctx.ensure_object(dict)
    root = Path(directory) if directory else Path()
    if not root.exists():
        raise IOError(f"{root!r} does not exist")
    # The subcommands read the repository location from the context object.
    ctx.obj["HG_ROOT"] = root
def identify_directory(path: Path) -> str:
    """Identify the directory at `path`, leaving `.hg` out.

    Only the last colon-separated component of the identifier computed by
    `identify_object` is returned.
    """
    identified = identify_object(
        "directory", follow_symlinks=True, exclude_patterns=[".hg"], obj=str(path)
    )
    directory_uri = identified[1]
    *_, object_id = directory_uri.split(":")
    return object_id
class RevisionIdentity(NamedTuple):
    """Identity of a revision in the Software Heritage model."""

    # Raw bytes of the revision SWHID
    swhid: bytes
    # Hexadecimal node id of the Mercurial revision, as bytes
    node_id: bytes
    # Raw bytes of the SWHID of the revision's directory
    directory_swhid: bytes

    def dir_uri(self) -> str:
        """Return the directory SWHID uri and the node id, tab-separated."""
        return f"swh:1:dir:{self.directory_swhid.hex()}\t{self.node_id.decode()}"

    def __str__(self) -> str:
        """Return the revision SWHID uri and the node id, tab-separated."""
        uri = swhid("revision", self.swhid.hex())
        return f"{uri}\t{self.node_id.decode()}"
def identify_revision(
    hg: Hg,
    rev: Optional[bytes] = None,
    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
) -> Iterator[RevisionIdentity]:
    """Return the repository revision identities.

    Computing an identity updates the repository working directory to the
    revision being identified.

    Args:
        hg: A `Hg` repository instance
        rev: An optional revision or Mercurial revsets (See `hg help revsets`)
            If not provided all the repository revisions will be computed.
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            It will be updated in place with new mappings.

    Yields:
        one `RevisionIdentity` per revision returned by `hg.log(rev)`
    """
    from swh.model.hashutil import hash_to_bytes
    from swh.model.model import Revision

    if node_id_2_swhid is None:
        node_id_2_swhid = {}
    cache = node_id_2_swhid

    def identify(revision: HgRevision) -> RevisionIdentity:
        """Identify one revision whose parents are all in the cache."""
        data = revision.to_dict()
        hg.up(revision.node_id)
        directory_swhid = hash_to_bytes(identify_directory(hg.root()))
        data["directory"] = directory_swhid
        data["parents"] = [cache[parent] for parent in data["parents"]]
        revision_swhid = hash_to_bytes(Revision.from_dict(data).id)
        cache[revision.node_id] = revision_swhid
        return RevisionIdentity(
            swhid=revision_swhid,
            node_id=revision.node_id,
            directory_swhid=directory_swhid,
        )

    for revision in hg.log(rev):
        # Ancestors missing from the cache are resolved with an explicit stack
        # rather than recursion: one recursive call per ancestor exceeds the
        # interpreter recursion limit on long histories.
        pending = [(revision.node_id, revision)]
        while True:
            key, current = pending[-1]
            missing = [parent for parent in current.parents if parent not in cache]
            if missing:
                pending.extend((parent, hg.log(parent)[0]) for parent in missing)
                continue
            pending.pop()
            if not pending:
                # Only the requested revision is left; it is identified below.
                break
            if key not in cache:
                # A shared ancestor can be stacked twice; identify it once.
                cache[key] = identify(current).swhid
        yield identify(revision)
class ReleaseIdentity(NamedTuple):
    """Identity of a release in the Software Heritage model."""

    # SWHID of the release, as a hexadecimal string
    swhid: str
    # Hexadecimal node id of the tagged Mercurial revision, as bytes
    node_id: bytes
    # Name of the release
    name: bytes

    def __str__(self) -> str:
        """Return the release SWHID uri and the release name, tab-separated."""
        uri = swhid("release", self.swhid)
        return f"{uri}\t{self.name.decode()}"
def identify_release(
    hg: Hg, node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
) -> Iterator[ReleaseIdentity]:
    """Yield the identity of every release of the repository.

    One release is produced per tag returned by `hg.tags()`.

    Args:
        hg: A `Hg` repository instance
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            If not provided it will be computed using `identify_revision`.
    """
    from swh.model.model import ObjectType, Release

    if node_id_2_swhid is None:
        node_id_2_swhid = {}
        for identity in identify_revision(hg):
            node_id_2_swhid[identity.node_id] = identity.swhid

    for tag in hg.tags():
        release = Release.from_dict(
            {
                "name": tag.name,
                "target": node_id_2_swhid[tag.node_id],
                "target_type": ObjectType.REVISION.value,
                "message": None,
                "metadata": None,
                "synthetic": False,
                "author": {"name": None, "email": None, "fullname": b""},
                "date": None,
            }
        )
        yield ReleaseIdentity(release.id, tag.node_id, tag.name)
def identify_snapshot(
    hg: Hg,
    node_id_2_swhid: Optional[Dict[bytes, bytes]] = None,
    releases: Optional[List[ReleaseIdentity]] = None,
) -> str:
    """Compute the identity of the repository snapshot.

    The snapshot holds a `HEAD` alias to the branch of the `tip` revision, one
    entry per named branch and one entry per release.

    Args:
        hg: A `Hg` repository instance
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            If not provided it will be computed using `identify_revision`.
        releases: an optional list of `ReleaseIdentity`.
            If not provided it will be computed using `identify_release`.
    """
    from swh.model.model import Snapshot, TargetType

    if node_id_2_swhid is None:
        node_id_2_swhid = {
            identity.node_id: identity.swhid for identity in identify_revision(hg)
        }
    if releases is None:
        releases = list(identify_release(hg, node_id_2_swhid))

    branches: Dict[bytes, Dict[str, Any]] = {
        b"HEAD": {
            "target": hg.tip().branch(),
            "target_type": TargetType.ALIAS.value,
        }
    }
    for hg_branch in hg.branches():
        branches[hg_branch.name] = {
            "target": node_id_2_swhid[hg_branch.node_id],
            "target_type": TargetType.REVISION.value,
        }
    # Releases come last: an entry sharing a branch name replaces the branch.
    for identity in releases:
        branches[identity.name] = {
            "target": identity.swhid,
            "target_type": TargetType.RELEASE.value,
        }
    snapshot = Snapshot.from_dict({"branches": branches})
    return snapshot.id
@main.command()
@click.argument("rev", required=False)
@click.pass_context
def revision(ctx, rev):
    """Compute the SWHID of a given revision.
    If specified REV allow to select a single or multiple revisions
    (using the Mercurial revsets language: `hg help revsets`)
    """
    # The repository location is stored in the context by the `main` group.
    repository = Hg(ctx.obj["HG_ROOT"])
    identities = identify_revision(repository, rev)
    # One line per revision, rendered by `RevisionIdentity.__str__`.
    for identity in identities:
        click.echo(identity)
@main.command()
@click.pass_context
def snapshot(ctx):
    """Compute the SWHID of the snapshot."""
    # The repository location is stored in the context by the `main` group.
    root = ctx.obj["HG_ROOT"]
    snapshot_swhid = identify_snapshot(Hg(root))
    uri = swhid("snapshot", snapshot_swhid)
    # Output: the snapshot uri and the repository root, tab-separated.
    click.echo(f"{uri}\t{root}")
@main.command()
@click.pass_context
def all(ctx):
    """Compute the SWHID of all the repository objects."""
    root = ctx.obj["HG_ROOT"]
    hg = Hg(root)

    # Everything is computed before anything is printed.
    node_id_2_swhid = {}
    directory_lines = []
    revision_lines = []
    for identity in identify_revision(hg):
        directory_lines.append(identity.dir_uri())
        revision_lines.append(str(identity))
        node_id_2_swhid[identity.node_id] = identity.swhid

    releases = list(identify_release(hg, node_id_2_swhid))
    release_lines = [str(release) for release in releases]
    snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, releases)

    # Output order: directories, revisions, releases, then the snapshot.
    for line in (*directory_lines, *revision_lines, *release_lines):
        click.echo(line)
    uri = swhid("snapshot", snapshot_swhid)
    click.echo(f"{uri}\t{root}")
# Run the command group when this module is executed as a script.
if __name__ == "__main__":
    main()
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from textwrap import dedent
from urllib.parse import urlsplit
from click.testing import CliRunner
from swh.loader.mercurial.identify import main
from swh.loader.tests import prepare_repository_from_archive
def test_all_revisions(datadir: str, tmp_path: str):
    """`revision` without argument prints every revision of the `hello` archive."""
    archive_name = "hello"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repository_path = urlsplit(repo_url).path

    result = CliRunner().invoke(main, ["-d", repository_path, "revision"])

    # One "<revision swhid>\t<node id>" line per revision.
    expected = dedent(
        """
        swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9
        swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2\t82e55d328c8ca4ee16520036c0aaace03a5beb65
        swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27\tb985ae4a07e12ac662f45a171e2d42b13be5b50c
        """
    ).lstrip()
    assert result.output == expected
def test_single_revision(datadir: str, tmp_path: str):
    """`revision NODE` prints the identity of that single revision."""
    archive_name = "hello"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repository_path = urlsplit(repo_url).path

    arguments = [
        "-d",
        repository_path,
        "revision",
        "0a04b987be5ae354b710cefeba0e2d9de7ad41a9",
    ]
    result = CliRunner().invoke(main, arguments)

    expected = (
        "swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940"
        "\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9\n"
    )
    assert result.output == expected
def test_all(datadir: str, tmp_path: str):
    """`all` prints directories, revisions, releases, then the snapshot."""
    archive_name = "hello"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    directory = urlsplit(repo_url).path

    result = CliRunner().invoke(main, ["-d", directory, "all"])

    # The last line pairs the snapshot uri with the repository path.
    expected = dedent(
        f"""
        swh:1:dir:43d727f2f3f2f7cb3b098ddad1d7038464a4cee2\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9
        swh:1:dir:b3f85f210ff86d334575f64cb01c5bf49895b63e\t82e55d328c8ca4ee16520036c0aaace03a5beb65
        swh:1:dir:8f2be433c945384c85920a8e60f2a68d2c0f20fb\tb985ae4a07e12ac662f45a171e2d42b13be5b50c
        swh:1:rev:93b48d515580522a05f389bec93227fc8e43d940\t0a04b987be5ae354b710cefeba0e2d9de7ad41a9
        swh:1:rev:8dd3db5d5519e4947f035d141581d304565372d2\t82e55d328c8ca4ee16520036c0aaace03a5beb65
        swh:1:rev:c3dbe4fbeaaa98dd961834e4007edb3efb0e2a27\tb985ae4a07e12ac662f45a171e2d42b13be5b50c
        swh:1:rel:515c4d72e089404356d0f4b39d60f948b8999140\t0.1
        swh:1:snp:d35668e02e2ba4321dc951cd308cf883786f918a\t{directory}
        """
    ).lstrip()
    assert result.output == expected
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment