Skip to content
Snippets Groups Projects
Commit 9657658b authored by Antoine Pietri's avatar Antoine Pietri
Browse files

Merge branch 'lowlevel_api'

parents 8c1c7cd5 c94deee9
No related branches found
Tags v0.1.0
No related merge requests found
package org.softwareheritage.graph;
import java.util.ArrayList;
import java.util.Map;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import py4j.GatewayServer;
import org.softwareheritage.graph.algo.NodeIdConsumer;
import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.algo.Stats;
import org.softwareheritage.graph.algo.NodeIdsConsumer;
import org.softwareheritage.graph.algo.Traversal;
public class Entry {
Graph graph;
private Graph graph;
final long PATH_SEPARATOR_ID = -1;
private final long PATH_SEPARATOR_ID = -1;
public void load_graph(String graphBasename) throws IOException {
System.err.println("Loading graph " + graphBasename + " ...");
......@@ -27,18 +23,46 @@ public class Entry {
System.err.println("Graph loaded.");
}
/**
 * Returns a copy of the loaded graph, as produced by {@code Graph.copy()}.
 *
 * @return the result of calling {@code copy()} on the graph held by this entry point
 */
public Graph get_graph() {
    Graph graphCopy = graph.copy();
    return graphCopy;
}
/**
 * Builds a {@code Stats} object from the path of the loaded graph and
 * serializes it to a JSON string using snake_case property names.
 *
 * @return the JSON serialization of the graph statistics
 * @throws RuntimeException if building or serializing the statistics fails
 *         with an {@link IOException}; the original exception is kept as the cause
 */
public String stats() {
    try {
        Stats stats = new Stats(graph.getPath());
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
        return objectMapper.writeValueAsString(stats);
    } catch (IOException e) {
        // Chain the IOException so its stack trace is not lost.
        throw new RuntimeException("Cannot read stats: " + e, e);
    }
}
/**
 * Callback shape shared by the traversal methods used for counting: it takes
 * a source node id and a consumer that is handed the node ids.
 */
@FunctionalInterface
private interface NodeCountVisitor {
    /**
     * Runs the visit from {@code srcNodeId}, passing node ids to {@code nodeIdConsumer}.
     */
    void accept(long srcNodeId, NodeIdConsumer nodeIdConsumer);
}
/**
 * Runs the visitor from {@code srcNodeId} and counts how many times it
 * invokes the node id consumer.
 *
 * @param f the visit to run
 * @param srcNodeId the node id the visit starts from
 * @return the number of consumer invocations made by the visit
 */
private int count_visitor(NodeCountVisitor f, long srcNodeId) {
    // Single-element array: a mutable counter the lambda is allowed to capture.
    final int[] visited = new int[1];
    f.accept(srcNodeId, nodeId -> {
        visited[0] += 1;
    });
    return visited[0];
}
/**
 * Counts the node ids reported by {@code Traversal.leavesVisitor} when
 * started from {@code srcNodeId}.
 *
 * @param direction traversal direction, passed to the {@code Traversal} constructor
 * @param edgesFmt edge restriction string, passed to the {@code Traversal} constructor
 * @param srcNodeId node id the visit starts from
 * @return the number of node ids reported by the visit
 */
public int count_leaves(String direction, String edgesFmt, long srcNodeId) {
    final Traversal traversal = new Traversal(this.graph, direction, edgesFmt);
    return count_visitor(
            (nodeId, consumer) -> traversal.leavesVisitor(nodeId, consumer),
            srcNodeId);
}
/**
 * Counts the node ids reported by {@code Traversal.neighborsVisitor} when
 * started from {@code srcNodeId}.
 *
 * @param direction traversal direction, passed to the {@code Traversal} constructor
 * @param edgesFmt edge restriction string, passed to the {@code Traversal} constructor
 * @param srcNodeId node id the visit starts from
 * @return the number of node ids reported by the visit
 */
public int count_neighbors(String direction, String edgesFmt, long srcNodeId) {
    final Traversal traversal = new Traversal(this.graph, direction, edgesFmt);
    return count_visitor(
            (nodeId, consumer) -> traversal.neighborsVisitor(nodeId, consumer),
            srcNodeId);
}
/**
 * Counts the node ids reported by {@code Traversal.visitNodesVisitor} when
 * started from {@code srcNodeId}.
 *
 * @param direction traversal direction, passed to the {@code Traversal} constructor
 * @param edgesFmt edge restriction string, passed to the {@code Traversal} constructor
 * @param srcNodeId node id the visit starts from
 * @return the number of node ids reported by the visit
 */
public int count_visit_nodes(String direction, String edgesFmt, long srcNodeId) {
    final Traversal traversal = new Traversal(this.graph, direction, edgesFmt);
    return count_visitor(
            (nodeId, consumer) -> traversal.visitNodesVisitor(nodeId, consumer),
            srcNodeId);
}
/**
 * Creates a {@code QueryHandler} working on its own copy of the graph.
 *
 * @param clientFIFO value forwarded unchanged to the {@code QueryHandler} constructor
 * @return a new handler built from {@code graph.copy()} and {@code clientFIFO}
 */
public QueryHandler get_handler(String clientFIFO) {
    Graph handlerGraph = this.graph.copy();
    return new QueryHandler(handlerGraph, clientFIFO);
}
......
......@@ -5,10 +5,11 @@
import asyncio
import contextlib
import json
import io
import os
import pathlib
import struct
import subprocess
import sys
import tempfile
......@@ -25,7 +26,7 @@ PID2NODE_EXT = 'pid2node.bin'
def find_graph_jar():
swh_graph_root = pathlib.Path(__file__).parents[3]
swh_graph_root = pathlib.Path(__file__).parents[2]
try_paths = [
swh_graph_root / 'java/server/target/',
pathlib.Path(sys.prefix) / 'share/swh-graph/',
......@@ -37,6 +38,16 @@ def find_graph_jar():
raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")
def _get_pipe_stderr():
# Get stderr if possible, or pipe to stdout if running with Jupyter.
try:
sys.stderr.fileno()
except io.UnsupportedOperation:
return subprocess.STDOUT
else:
return sys.stderr
class Backend:
def __init__(self, graph_path):
self.gateway = None
......@@ -49,13 +60,14 @@ class Backend:
classpath=find_graph_jar(),
die_on_exit=True,
redirect_stdout=sys.stdout,
redirect_stderr=sys.stderr,
redirect_stderr=_get_pipe_stderr(),
)
self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
self.entry.load_graph(self.graph_path)
self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT)
self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT)
self.stream_proxy = JavaStreamProxy(self.entry)
return self
def __exit__(self, exc_type, exc_value, tb):
self.gateway.shutdown()
......@@ -63,37 +75,35 @@ class Backend:
def stats(self):
return self.entry.stats()
def count(self, ttype, direction, edges_fmt, src):
    """Call the ``count_<ttype>`` method of the Java entry point.

    The remaining arguments are forwarded unchanged and the Java method's
    result is returned as is.
    """
    counter = getattr(self.entry, 'count_{}'.format(ttype))
    return counter(direction, edges_fmt, src)
async def simple_traversal(self, ttype, direction, edges_fmt, src):
assert ttype in ('leaves', 'neighbors', 'visit_nodes', 'visit_paths')
src_id = self.pid2node[src]
assert ttype in ('leaves', 'neighbors', 'visit_nodes')
method = getattr(self.stream_proxy, ttype)
async for node_id in method(direction, edges_fmt, src_id):
if node_id == PATH_SEPARATOR_ID:
yield None
else:
yield self.node2pid[node_id]
async for node_id in method(direction, edges_fmt, src):
yield node_id
async def walk(self, direction, edges_fmt, algo, src, dst):
src_id = self.pid2node[src]
if dst in PID_TYPES:
it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
src_id, dst)
src, dst)
else:
dst_id = self.pid2node[dst]
it = self.stream_proxy.walk(direction, edges_fmt, algo,
src_id, dst_id)
src, dst)
async for node_id in it:
yield self.node2pid[node_id]
async def visit_paths(self, *args):
buffer = []
async for res_pid in self.simple_traversal('visit_paths', *args):
if res_pid is None: # Path separator, flush
yield json.dumps(buffer)
buffer = []
yield node_id
async def visit_paths(self, direction, edges_fmt, src):
path = []
async for node in self.stream_proxy.visit_paths(
direction, edges_fmt, src):
if node == PATH_SEPARATOR_ID:
yield path
path = []
else:
buffer.append(res_pid)
path.append(node)
class JavaStreamProxy:
......@@ -117,7 +127,12 @@ class JavaStreamProxy:
async def read_node_ids(self, fname):
loop = asyncio.get_event_loop()
with (await loop.run_in_executor(None, open, fname, 'rb')) as f:
open_thread = loop.run_in_executor(None, open, fname, 'rb')
# Since the open() call on the FIFO is blocking until it is also opened
# on the Java side, we await it with a timeout in case there is an
# exception that prevents the write-side open().
with (await asyncio.wait_for(open_thread, timeout=2)) as f:
while True:
data = await loop.run_in_executor(None, f.read, BUF_SIZE)
if not data:
......@@ -155,7 +170,17 @@ class JavaStreamProxy:
async def java_call_iterator(*args, **kwargs):
with self.get_handler() as (handler, reader):
java_task = getattr(handler, name)(*args, **kwargs)
async for value in reader:
yield value
try:
async for value in reader:
yield value
except asyncio.TimeoutError:
# If the read-side open() timeouts, an exception on the
# Java side probably happened that prevented the
# write-side open(). We propagate this exception here if
# that is the case.
task_exc = java_task.exception()
if task_exc:
raise task_exc
raise
await java_task
return java_call_iterator
......@@ -11,7 +11,7 @@ from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.graph import client
from swh.graph.pid import PidToIntMap, IntToPidMap
from swh.graph.server.app import make_app
from swh.graph.server.backend import Backend
from swh.graph.backend import Backend
@click.group(name='graph', context_settings=CONTEXT_SETTINGS,
......
......@@ -80,3 +80,27 @@ class RemoteGraphClient(RPCClient):
'traversal': traversal,
'direction': direction
})
def count_leaves(self, src, edges="*", direction="forward"):
    """GET ``leaves/count/<src>`` with the edge and direction filters."""
    endpoint = 'leaves/count/{}'.format(src)
    query = {'edges': edges, 'direction': direction}
    return self.get(endpoint, params=query)
def count_neighbors(self, src, edges="*", direction="forward"):
    """GET ``neighbors/count/<src>`` with the edge and direction filters."""
    endpoint = 'neighbors/count/{}'.format(src)
    query = {'edges': edges, 'direction': direction}
    return self.get(endpoint, params=query)
def count_visit_nodes(self, src, edges="*", direction="forward"):
    """GET ``visit/nodes/count/<src>`` with the edge and direction filters."""
    endpoint = 'visit/nodes/count/{}'.format(src)
    query = {'edges': edges, 'direction': direction}
    return self.get(endpoint, params=query)
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import lru_cache
import subprocess
import collections
# Shape name associated with each node kind (the third field of a PID).
KIND_TO_SHAPE = dict(
    ori='egg',
    snp='doubleoctagon',
    rel='octagon',
    rev='diamond',
    dir='folder',
    cnt='oval',
)
@lru_cache()
def dot_to_svg(dot):
    """Render a DOT document to SVG by piping it through ``dot -Tsvg``.

    Results are memoized with ``lru_cache``, keyed on the DOT text.

    Raises:
        RuntimeError: if the ``dot`` process exits with a non-zero status;
            the message is the process's stderr output.
    """
    command = ['dot', '-Tsvg']
    try:
        proc = subprocess.run(
            command,
            input=dot,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(exc.stderr) from exc
    else:
        return proc.stdout
def graph_dot(nodes):
    """Build the DOT source of the subgraph induced by ``nodes``.

    Each node must expose ``id``, ``children()``, ``parents()`` and
    ``dot_fragment()``. An edge is emitted only when both of its endpoints
    are in ``nodes``; edges found from both the child side and the parent
    side are emitted once.

    Args:
        nodes: iterable of node objects (it may be a one-shot iterator).

    Returns:
        The DOT document as a string.
    """
    # Materialize once: the nodes are walked several times below, which
    # would silently yield nothing for a generator after the first pass.
    nodes = list(nodes)
    ids = {n.id for n in nodes}

    edges = set()
    for node in nodes:
        for child in node.children():
            if child.id in ids:
                edges.add((node.id, child.id))
        for parent in node.parents():
            if parent.id in ids:
                edges.add((parent.id, node.id))

    # Sort so the generated text does not depend on set iteration order.
    edges_fmt = '\n'.join('{} -> {};'.format(a, b) for a, b in sorted(edges))
    nodes_fmt = '\n'.join(node.dot_fragment() for node in nodes)

    return """digraph G {{
ranksep=1;
nodesep=0.5;
{nodes}
{edges}
}}""".format(nodes=nodes_fmt, edges=edges_fmt)
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import functools
from swh.graph.backend import Backend
from swh.graph.dot import dot_to_svg, graph_dot, KIND_TO_SHAPE
# Root of the archive browse URLs; a per-kind fragment is appended to it.
BASE_URL = 'https://archive.softwareheritage.org/browse'

# URL path template for each node kind; ``{}`` is filled with the object hash.
KIND_TO_URL_FRAGMENT = dict(
    ori='/origin/{}',
    snp='/snapshot/{}',
    rel='/release/{}',
    rev='/revision/{}',
    dir='/directory/{}',
    cnt='/content/sha1_git:{}/',
)
def call_async_gen(generator, *args, **kwargs):
    """Drive an async generator from synchronous code.

    Calls ``generator(*args, **kwargs)`` and yields its items one by one,
    running each ``__anext__()`` to completion on the current event loop.

    When the consumer stops early (or this generator is closed), the
    underlying async iterator is closed too, so that its ``finally`` blocks
    and context managers run instead of being left pending.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop in this thread: create and install one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    it = generator(*args, **kwargs).__aiter__()
    try:
        while True:
            try:
                res = loop.run_until_complete(it.__anext__())
            except StopAsyncIteration:
                break
            yield res
    finally:
        # Plain async iterators may not define aclose(); async generators do.
        aclose = getattr(it, 'aclose', None)
        if (aclose is not None
                and not loop.is_closed()
                and not loop.is_running()):
            loop.run_until_complete(aclose())
class Neighbors:
    """Iterator over neighbor nodes whose length comes from ``length_func``.

    ``iterator`` must provide ``nextLong()``; a returned ``-1`` marks the
    end of the iteration. Every other value is wrapped in a ``GraphNode``.
    """

    def __init__(self, graph, iterator, length_func):
        self.graph = graph
        self.iterator = iterator
        self.length_func = length_func

    def __iter__(self):
        return self

    def __next__(self):
        node_id = self.iterator.nextLong()
        if node_id != -1:
            return GraphNode(self.graph, node_id)
        raise StopIteration

    def __len__(self):
        # Delegated so the length is known without consuming the iterator.
        return self.length_func()
class GraphNode:
    """Node in the SWH graph"""

    def __init__(self, graph, node_id):
        self.graph = graph
        self.id = node_id

    def children(self):
        """Successors of this node, sized by the Java graph's outdegree."""
        return Neighbors(
            self.graph,
            self.graph.java_graph.successors(self.id),
            lambda: self.graph.java_graph.outdegree(self.id))

    def parents(self):
        """Predecessors of this node, sized by the Java graph's indegree."""
        return Neighbors(
            self.graph,
            self.graph.java_graph.predecessors(self.id),
            lambda: self.graph.java_graph.indegree(self.id))

    def simple_traversal(self, ttype, direction='forward', edges='*'):
        """Run the backend traversal ``ttype`` from this node.

        Yields the graph's node object for each value the backend produces.
        """
        for node in call_async_gen(
            self.graph.backend.simple_traversal,
            ttype, direction, edges, self.id
        ):
            yield self.graph[node]

    def leaves(self, *args, **kwargs):
        """Shortcut for ``simple_traversal('leaves', ...)``."""
        yield from self.simple_traversal('leaves', *args, **kwargs)

    def visit_nodes(self, *args, **kwargs):
        """Shortcut for ``simple_traversal('visit_nodes', ...)``."""
        yield from self.simple_traversal('visit_nodes', *args, **kwargs)

    def visit_paths(self, direction='forward', edges='*'):
        """Yield each path produced by the backend as a list of node objects."""
        for path in call_async_gen(
            self.graph.backend.visit_paths,
            direction, edges, self.id
        ):
            yield [self.graph[node] for node in path]

    def walk(self, dst, direction='forward', edges='*', traversal='dfs'):
        """Yield the node objects of the backend walk from this node to ``dst``."""
        for node in call_async_gen(
            self.graph.backend.walk,
            direction, edges, traversal, self.id, dst
        ):
            yield self.graph[node]

    def _count(self, ttype, direction='forward', edges='*'):
        return self.graph.backend.count(ttype, direction, edges, self.id)

    # Explicit methods rather than functools.partialmethod(_count, ttype=...):
    # with the partial form a positional ``direction`` landed in ``ttype``
    # and raised TypeError.
    def count_leaves(self, direction='forward', edges='*'):
        """Backend count for the 'leaves' traversal from this node."""
        return self._count('leaves', direction, edges)

    def count_neighbors(self, direction='forward', edges='*'):
        """Backend count for the 'neighbors' traversal from this node."""
        return self._count('neighbors', direction, edges)

    def count_visit_nodes(self, direction='forward', edges='*'):
        """Backend count for the 'visit_nodes' traversal from this node."""
        return self._count('visit_nodes', direction, edges)

    @property
    def pid(self):
        """PID string of this node, looked up in the graph's node2pid map."""
        return self.graph.node2pid[self.id]

    @property
    def kind(self):
        """Third ``:``-separated field of the PID (e.g. 'rev', 'dir')."""
        return self.pid.split(':')[2]

    def __str__(self):
        return self.pid

    def __repr__(self):
        return '<{}>'.format(self.pid)

    def dot_fragment(self):
        """DOT node statement for this node (label, link and shape)."""
        # Unpacking also enforces that the PID has exactly four fields.
        _scheme, _version, kind, obj_hash = self.pid.split(':')
        label = '{}:{}..{}'.format(kind, obj_hash[0:2], obj_hash[-2:])
        url = BASE_URL + KIND_TO_URL_FRAGMENT[kind].format(obj_hash)
        shape = KIND_TO_SHAPE[kind]
        return ('{} [label="{}", href="{}", target="_blank", shape="{}"];'
                .format(self.id, label, url, shape))

    def _repr_svg_(self):
        """SVG rendering of this node together with its children and parents."""
        nodes = [self, *self.children(), *self.parents()]
        dot = graph_dot(nodes)
        svg = dot_to_svg(dot)
        return svg
class Graph:
    """Python-side view of a graph loaded by a backend.

    Nodes are obtained by indexing, either with an integer node id or with
    a PID string.
    """

    def __init__(self, backend, node2pid, pid2node):
        self.backend = backend
        self.java_graph = backend.entry.get_graph()
        self.node2pid = node2pid
        self.pid2node = pid2node

    def stats(self):
        """Return the backend's statistics for this graph."""
        return self.backend.stats()

    @property
    def path(self):
        return self.java_graph.getPath()

    def __len__(self):
        return self.java_graph.getNbNodes()

    def __getitem__(self, node_id):
        """Return the ``GraphNode`` for an integer node id or a PID string.

        Lookup errors from ``node2pid`` / ``pid2node`` propagate unchanged.

        Raises:
            TypeError: if ``node_id`` is neither an ``int`` nor a ``str``
                (instead of silently returning ``None``).
        """
        if isinstance(node_id, int):
            self.node2pid[node_id]  # check existence
            return GraphNode(self, node_id)
        elif isinstance(node_id, str):
            node_id = self.pid2node[node_id]
            return GraphNode(self, node_id)
        raise TypeError(
            'graph indices must be int node ids or str PIDs, not {}'
            .format(type(node_id).__name__))
@contextlib.contextmanager
def load(graph_path):
    """Context manager yielding a ``Graph`` backed by ``Backend(graph_path)``.

    The backend is used as a context manager around the ``yield``, so it is
    exited when the ``with`` block using this function ends.
    """
    with Backend(graph_path) as backend:
        graph = Graph(backend, backend.node2pid, backend.pid2node)
        yield graph
......@@ -8,10 +8,12 @@ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using
FIFO as a transport to stream integers between the two languages.
"""
import json
import contextlib
import aiohttp.web
from swh.core.api.asynchronous import RPCServerApp
from swh.model.identifiers import PID_TYPES
@contextlib.asynccontextmanager
......@@ -45,14 +47,18 @@ async def stats(request):
def get_simple_traversal_handler(ttype):
async def simple_traversal(request):
backend = request.app['backend']
src = request.match_info['src']
edges = request.query.get('edges', '*')
direction = request.query.get('direction', 'forward')
src_node = backend.pid2node[src]
async with stream_response(request) as response:
async for res_pid in request.app['backend'].simple_traversal(
ttype, direction, edges, src
async for res_node in backend.simple_traversal(
ttype, direction, edges, src_node
):
res_pid = backend.node2pid[res_node]
await response.write('{}\n'.format(res_pid).encode())
return response
......@@ -60,35 +66,64 @@ def get_simple_traversal_handler(ttype):
async def walk(request):
backend = request.app['backend']
src = request.match_info['src']
dst = request.match_info['dst']
edges = request.query.get('edges', '*')
direction = request.query.get('direction', 'forward')
algo = request.query.get('traversal', 'dfs')
it = request.app['backend'].walk(direction, edges, algo, src, dst)
src_node = backend.pid2node[src]
if dst not in PID_TYPES:
dst = backend.pid2node[dst]
async with stream_response(request) as response:
async for res_pid in it:
async for res_node in backend.walk(
direction, edges, algo, src_node, dst
):
res_pid = backend.node2pid[res_node]
await response.write('{}\n'.format(res_pid).encode())
return response
async def visit_paths(request):
backend = request.app['backend']
src = request.match_info['src']
edges = request.query.get('edges', '*')
direction = request.query.get('direction', 'forward')
it = request.app['backend'].visit_paths(direction, edges, src)
src_node = backend.pid2node[src]
it = backend.visit_paths(direction, edges, src_node)
async with stream_response(request) as response:
async for res_pid in it:
await response.write('{}\n'.format(res_pid).encode())
async for res_path in it:
res_path_pid = [backend.node2pid[n] for n in res_path]
line = json.dumps(res_path_pid)
await response.write('{}\n'.format(line).encode())
return response
def get_count_handler(ttype):
    """Build the request handler answering count queries of kind ``ttype``.

    The handler resolves the ``src`` PID from the URL to a node id, asks the
    backend for the count and returns it as the response body.
    """
    async def count(request):
        backend = request.app['backend']
        src_node = backend.pid2node[request.match_info['src']]
        query = request.query
        result = backend.count(
            ttype,
            query.get('direction', 'forward'),
            query.get('edges', '*'),
            src_node,
        )
        return aiohttp.web.Response(body=str(result),
                                    content_type='application/json')

    return count
def make_app(backend, **kwargs):
app = RPCServerApp(**kwargs)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/graph/stats', stats)
app.router.add_route('GET', '/graph/leaves/{src}',
get_simple_traversal_handler('leaves'))
app.router.add_route('GET', '/graph/neighbors/{src}',
......@@ -98,5 +133,12 @@ def make_app(backend, **kwargs):
app.router.add_route('GET', '/graph/visit/paths/{src}', visit_paths)
app.router.add_route('GET', '/graph/walk/{src}/{dst}', walk)
app.router.add_route('GET', '/graph/neighbors/count/{src}',
get_count_handler('neighbors'))
app.router.add_route('GET', '/graph/leaves/count/{src}',
get_count_handler('leaves'))
app.router.add_route('GET', '/graph/visit/nodes/count/{src}',
get_count_handler('visit_nodes'))
app['backend'] = backend
return app
......@@ -4,8 +4,9 @@ from pathlib import Path
from aiohttp.test_utils import TestServer, TestClient, loop_context
from swh.graph.graph import load as graph_load
from swh.graph.client import RemoteGraphClient
from swh.graph.server.backend import Backend
from swh.graph.backend import Backend
from swh.graph.server.app import make_app
SWH_GRAPH_ROOT = Path(__file__).parents[3]
......@@ -21,7 +22,7 @@ class GraphServerProcess(multiprocessing.Process):
backend = Backend(graph_path=str(TEST_GRAPH_PATH))
with backend:
with loop_context() as loop:
app = make_app(backend=backend)
app = make_app(backend=backend, debug=True)
client = TestClient(TestServer(app), loop=loop)
loop.run_until_complete(client.start_server())
url = client.make_url('/graph/')
......@@ -37,3 +38,9 @@ def graph_client():
url = queue.get()
yield RemoteGraphClient(str(url))
server.terminate()
@pytest.fixture(scope="module")
def graph():
    """Module-scoped fixture yielding the graph loaded from TEST_GRAPH_PATH."""
    with graph_load(str(TEST_GRAPH_PATH)) as loaded_graph:
        yield loaded_graph
......@@ -102,3 +102,21 @@ def test_walk(graph_client):
'swh:1:rel:0000000000000000000000000000000000000019'
]
assert set(actual) == set(expected)
def test_count(graph_client):
print(graph_client)
actual = graph_client.count_leaves(
'swh:1:ori:0000000000000000000000000000000000000021'
)
assert actual == 4
actual = graph_client.count_visit_nodes(
'swh:1:rel:0000000000000000000000000000000000000010',
edges='rel:rev,rev:rev'
)
assert actual == 3
actual = graph_client.count_neighbors(
'swh:1:rev:0000000000000000000000000000000000000009',
direction='backward'
)
assert actual == 3
import pytest
def test_graph(graph):
assert len(graph) == 21
obj = 'swh:1:dir:0000000000000000000000000000000000000008'
node = graph[obj]
assert str(node) == obj
assert len(node.children()) == 3
assert len(node.parents()) == 2
actual = {p.pid for p in node.children()}
expected = {
'swh:1:cnt:0000000000000000000000000000000000000001',
'swh:1:dir:0000000000000000000000000000000000000006',
'swh:1:cnt:0000000000000000000000000000000000000007'
}
assert expected == actual
actual = {p.pid for p in node.parents()}
expected = {
'swh:1:rev:0000000000000000000000000000000000000009',
'swh:1:dir:0000000000000000000000000000000000000012',
}
assert expected == actual
def test_invalid_pid(graph):
    """Unknown node ids raise IndexError; an unknown PID raises KeyError."""
    with pytest.raises(IndexError):
        graph[1337]

    with pytest.raises(IndexError):
        past_the_end = len(graph) + 1
        graph[past_the_end]

    unknown_pid = 'swh:1:dir:0000000000000000000000000000000420000012'
    with pytest.raises(KeyError):
        graph[unknown_pid]
def test_leaves(graph):
actual = list(graph['swh:1:ori:0000000000000000000000000000000000000021']
.leaves())
actual = [p.pid for p in actual]
expected = [
'swh:1:cnt:0000000000000000000000000000000000000001',
'swh:1:cnt:0000000000000000000000000000000000000004',
'swh:1:cnt:0000000000000000000000000000000000000005',
'swh:1:cnt:0000000000000000000000000000000000000007'
]
assert set(actual) == set(expected)
def test_visit_nodes(graph):
actual = list(graph['swh:1:rel:0000000000000000000000000000000000000010']
.visit_nodes(edges='rel:rev,rev:rev'))
actual = [p.pid for p in actual]
expected = [
'swh:1:rel:0000000000000000000000000000000000000010',
'swh:1:rev:0000000000000000000000000000000000000009',
'swh:1:rev:0000000000000000000000000000000000000003'
]
assert set(actual) == set(expected)
def test_visit_paths(graph):
actual = list(graph['swh:1:snp:0000000000000000000000000000000000000020']
.visit_paths(edges='snp:*,rev:*'))
actual = [tuple(n.pid for n in path) for path in actual]
expected = [
(
'swh:1:snp:0000000000000000000000000000000000000020',
'swh:1:rev:0000000000000000000000000000000000000009',
'swh:1:rev:0000000000000000000000000000000000000003',
'swh:1:dir:0000000000000000000000000000000000000002'
),
(
'swh:1:snp:0000000000000000000000000000000000000020',
'swh:1:rev:0000000000000000000000000000000000000009',
'swh:1:dir:0000000000000000000000000000000000000008'
),
(
'swh:1:snp:0000000000000000000000000000000000000020',
'swh:1:rel:0000000000000000000000000000000000000010'
)
]
assert set(actual) == set(expected)
def test_walk(graph):
actual = list(graph['swh:1:dir:0000000000000000000000000000000000000016']
.walk('rel',
edges='dir:dir,dir:rev,rev:*',
direction='backward',
traversal='bfs'))
actual = [p.pid for p in actual]
expected = [
'swh:1:dir:0000000000000000000000000000000000000016',
'swh:1:dir:0000000000000000000000000000000000000017',
'swh:1:rev:0000000000000000000000000000000000000018',
'swh:1:rel:0000000000000000000000000000000000000019'
]
assert set(actual) == set(expected)
def test_count(graph):
assert (graph['swh:1:ori:0000000000000000000000000000000000000021']
.count_leaves() == 4)
assert (graph['swh:1:rel:0000000000000000000000000000000000000010']
.count_visit_nodes(edges='rel:rev,rev:rev') == 3)
assert (graph['swh:1:rev:0000000000000000000000000000000000000009']
.count_neighbors(direction='backward') == 3)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment