diff --git a/swh/web/client/client.py b/swh/web/client/client.py
index f59c2ff1fffd9cfdc962dccd3297cd33126c53de..9833e90bdd34688e09d2565f753347013f41e2ec 100644
--- a/swh/web/client/client.py
+++ b/swh/web/client/client.py
@@ -916,6 +916,162 @@ class WebAPIClient:
         with self._call(q, http_method="get") as r:
             return r.text
 
+    def _get_one_leaf(
+        self,
+        node: str,
+        return_types: str,
+        direction: str = "forward",
+        edges: str = "*",
+        resolve_origins: bool = True,
+    ) -> Optional[str]:
+        """Return one leaf reachable from ``node`` in the graph, if any.
+
+        Internal helper used by get_provenance_info. It queries the
+        ``graph/leaves`` endpoint with ``max_matching_nodes=1``.
+
+        Args:
+            node: identifier of the node the traversal starts from
+            return_types: value of the ``return_types`` query parameter
+                (``"rel"``, ``"rev"`` or ``"ori"`` in get_provenance_info)
+            direction: value of the ``direction`` query parameter,
+                ``"forward"`` by default
+            edges: value of the ``edges`` query parameter, ``"*"`` by default
+            resolve_origins: if true, add ``resolve_origins=true`` to the query
+
+        Returns:
+            The response body without trailing newlines, or None when the
+            body is empty or the graph answered with a 400 or 404 status.
+
+        Raises:
+            requests.HTTPError: re-raised unless the graph answered 400 or 404
+        """
+        query = (
+            f"graph/leaves/{node}/?direction={direction}"
+            f"&edges={edges}"
+            f"&return_types={return_types}"
+            f"&max_matching_nodes=1"
+        )
+        if resolve_origins:
+            query += "&resolve_origins=true"
+        try:
+            with self._call(query, http_method="get") as r:
+                value = r.text.rstrip("\n")
+        except requests.HTTPError as fail:
+            # The graph answers 404 for unknown nodes, so we have to catch
+            # 404 for now (400 is handled the same way), see
+            # https://gitlab.softwareheritage.org/swh/devel/swh-graph/-/issues/4763
+            response = fail.response
+            if response is None or response.status_code not in (400, 404):
+                raise
+            return None
+        # an empty body means that no matching leaf was found
+        return value or None
+
+    def get_provenance_info(self, swhid: CoreSWHID) -> Dict[str, Dict[str, Any]]:
+        """Find a revision, a release and an origin containing this object.
+
+        Revision and Release might not be found, we prioritize finding a
+        Release over finding a Revision when possible.
+
+        note: The quality of the result is not guaranteed whatsoever. Since the
+        definition of "best" likely varies from one usage to the next, this API
+        will evolve in the future when this notion gets better defined.
+
+        For example, if we are looking for provenance information to detect
+        prior art, we search for the first appearance of a content. So the
+        "best answer" is the oldest content, something a bit tricky to
+        determine as we can't fully trust the date of revisions. On the other
+        hand, if we try to know which libraries are used and at which version,
+        to detect CVEs or outdated dependencies, the best answer is the most
+        recent release/revision in the authoritative origin relevant to a
+        content. Finding the authoritative origin is a challenge in itself.
+
+        This method exists for the swh-scanner and is likely to change
+        significantly and/or be replaced, we do not recommend using it.
+
+        Args:
+            swhid: the SWHID of the Content or Directory to find info for
+
+        Returns:
+            {"revision": rev, "release": rel, "origin": ori}
+
+            rev: information about the revision, unset if none found
+            rel: information about the release, unset if none found
+            ori: information about the origin, unset if none found
+
+            rev and rel are the result of get() with typify=False plus a
+            "swhid" key, ori is a dict with a single "url" key.
+
+            For unknown content, an empty dict will be returned.
+
+        Raises:
+            ValueError: if swhid is neither a Content nor a Directory
+            requests.HTTPError: if HTTP request fails
+        """
+        if swhid.object_type not in (ObjectType.DIRECTORY, ObjectType.CONTENT):
+            msg = "swhid should be %r or %r as parameter, not: %r"
+            msg %= (ObjectType.DIRECTORY, ObjectType.CONTENT, swhid.object_type)
+            raise ValueError(msg)
+
+        content_or_dir = str(swhid)
+
+        # XXX: If we have a content, the provenance API could search for a rev
+        # or rel more efficiently. However it does not work for Directory and
+        # only covers some of the nodes, so we need to call the graph anyway.
+
+        # XXX: The graph can also lag behind the archive so it is possible that
+        # we identify a known content without being able to find an origin.
+
+        # Try to find a release first
+        top_id = release = self._get_one_leaf(
+            node=content_or_dir,
+            direction="backward",
+            edges="dir:dir,cnt:dir,dir:rev,rev:rel,dir:rel,cnt:rel",
+            return_types="rel",
+        )
+        if release is not None:
+            revision = self._get_one_leaf(
+                node=release,
+                edges="rel:rev",
+                return_types="rev",
+            )
+        else:
+            # We did not find a release,
+            # directly search for a revision instead.
+            top_id = revision = self._get_one_leaf(
+                node=content_or_dir,
+                direction="backward",
+                edges="dir:dir,cnt:dir,dir:rev",
+                return_types="rev",
+            )
+
+        if top_id is None:
+            # could not find anything, give up
+            return {}
+
+        # now search the origin associated with the release (or revision)
+        origin = self._get_one_leaf(
+            node=top_id,
+            direction="backward",
+            edges="*:snp,*:ori",
+            return_types="ori",
+        )
+
+        info: Dict[str, Dict[str, Any]] = {}
+        if revision is not None:
+            rev_info = self.get(revision, typify=False)
+            rev_info["swhid"] = CoreSWHID.from_string(revision)
+            info["revision"] = rev_info
+        if release is not None:
+            rel_info = self.get(release, typify=False)
+            rel_info["swhid"] = CoreSWHID.from_string(release)
+            info["release"] = rel_info
+        if origin is not None:
+            info["origin"] = {
+                "url": origin,
+            }
+        return info
+
     def cooking_request(
         self, bundle_type: str, swhid: SWHIDish, email: Optional[str] = None, **req_args
     ) -> Dict[str, Any]: