diff --git a/java/src/main/java/org/softwareheritage/graph/utils/CountPaths.java b/java/src/main/java/org/softwareheritage/graph/utils/CountPaths.java new file mode 100644 index 0000000000000000000000000000000000000000..0471fca69eb143dfd74343700f316b0e54a401fe --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/utils/CountPaths.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2023 The Software Heritage developers + * See the AUTHORS file at the top-level directory of this distribution + * License: GNU General Public License version 3, or any later version + * See top-level LICENSE file for more information + */ + +package org.softwareheritage.graph.utils; + +import com.martiansoftware.jsap.*; +import it.unimi.dsi.big.webgraph.LazyLongIterator; +import it.unimi.dsi.fastutil.longs.LongBigArrayBigList; +import it.unimi.dsi.logging.ProgressLogger; +import org.softwareheritage.graph.*; + +import java.io.IOException; +import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* Counts the number of (non-singleton) paths reaching each node, from all other nodes. + * + * Sample invocation: + * + * $ zstdcat /poolswh/softwareheritage/vlorentz/2022-04-25_toposort_ori,snp,rel,rev,dir.txt.zst + * | pv --line-mode --wait + * | java -cp ~/swh-environment/swh-graph/java/target/swh-graph-*.jar -Xmx1000G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G -XX:+UseLargePages -XX:+UseTransparentHugePages -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB org.softwareheritage.graph.utils.CountPaths /dev/shm/swh-graph/default/graph forward \ + * | zstdmt \ + * > /poolswh/softwareheritage/vlorentz/2022-04-25_path_counts_forward_ori,snp,rel,rev,dir.txt.zst + */ + +public class CountPaths { + private SwhBidirectionalGraph graph; + private LongBigArrayBigList countsFromRoots; + private LongBigArrayBigList countsFromAll; + + final static Logger logger = LoggerFactory.getLogger(TopoSort.class); + + public static void main(String[] args) throws IOException, ClassNotFoundException { + if (args.length != 2) { + System.err.println( + "Syntax: java org.softwareheritage.graph.utils.CountPaths <path/to/graph> {forward|backward}"); + System.exit(1); + } + + String graphPath = args[0]; + String directionString = args[1]; + + CountPaths countPaths = new CountPaths(); + + countPaths.loadGraph(graphPath); + + if (directionString.equals("backward")) { + countPaths.graph = countPaths.graph.transpose(); + } else if (!directionString.equals("forward")) { + System.err.println("Invalid direction " + directionString); + System.exit(1); + } + System.err.println("Starting..."); + + countPaths.countPaths(); + } + + public void loadGraph(String graphBasename) throws IOException { + System.err.println("Loading graph " + graphBasename + " ..."); + graph = SwhBidirectionalGraph.loadMapped(graphBasename); + } + + public void countPaths() { + Scanner stdin = new Scanner(System.in); + + String firstLine = stdin.nextLine().strip(); + if (!firstLine.equals("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2")) { + System.err.format("Unexpected header: %s\n", firstLine); + System.exit(2); + } + + long numNodes = graph.numNodes(); + countsFromRoots = new LongBigArrayBigList(numNodes); + countsFromAll = new LongBigArrayBigList(numNodes); + + ProgressLogger pl = new ProgressLogger(logger); + pl.logInterval = 60000; + pl.itemsName = "nodes"; + pl.expectedUpdates = graph.numNodes(); + pl.start("Initializing counts..."); + for (long i = 0; i < numNodes; i++) { + pl.lightUpdate(); + countsFromRoots.add(0); + countsFromAll.add(0); + } + pl.done(); + + pl = new ProgressLogger(logger); + pl.itemsName = "nodes"; + pl.expectedUpdates = graph.numNodes(); + pl.start("Counting paths..."); + + System.out.println("swhid,paths_from_roots,all_paths"); + while (stdin.hasNextLine()) { + pl.lightUpdate(); + String cells[] = stdin.nextLine().strip().split(",", -1); + SWHID nodeSWHID = new SWHID(cells[0]); + long nodeId = graph.getNodeId(nodeSWHID); + long countFromRoots = countsFromRoots.getLong(nodeId); + long countFromAll = countsFromAll.getLong(nodeId); + + /* Print counts for this node */ + System.out.format("%s,%d,%d\n", cells[0], countFromRoots, countFromAll); + + /* Add counts of paths coming from this node to all successors */ + countFromAll++; + if (countFromRoots == 0) { + /* This node is itself a root */ + countFromRoots++; + } + LazyLongIterator it = graph.successors(nodeId); + for (long successorId; (successorId = it.nextLong()) != -1;) { + countsFromAll.set(successorId, countsFromAll.getLong(successorId) + countFromAll); + countsFromRoots.set(successorId, countsFromRoots.getLong(successorId) + countFromRoots); + } + + } + + pl.done(); + } +} diff --git a/swh/graph/luigi/misc_datasets.py b/swh/graph/luigi/misc_datasets.py index 7f500628b266ca385f37d0319f643cb7718e0841..7075d9f0bbc842e8379fc50037566c0af4fea5cb 100644 --- a/swh/graph/luigi/misc_datasets.py +++ b/swh/graph/luigi/misc_datasets.py @@ -35,7 +35,7 @@ And optionally:: # WARNING: do not import unnecessary things here to keep cli startup time under # control from pathlib import Path -from typing import List +from typing import Dict, List import luigi @@ -114,3 +114,57 @@ class PopularContents(luigi.Task): | zstdmt -19 """ # noqa run_script(script, Path(self.output().path)) + + +class CountPaths(luigi.Task): + """Creates a file that lists: + + * the number of paths leading to each node, and starting from all leaves, and + * the number of paths leading to each node, and starting from all other nodes + + Singleton paths are not counted. + """ + + local_graph_path = luigi.PathParameter() + topological_order_dir = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + object_types = luigi.Parameter() + direction = luigi.ChoiceParameter(choices=["forward", "backward"]) + max_ram = luigi.Parameter(default="200G") + + def requires(self) -> Dict[str, luigi.Task]: + """Returns an instance of :class:`LocalGraph` and one of :class:`TopoSort`.""" + return { + "graph": LocalGraph(local_graph_path=self.local_graph_path), + "toposort": TopoSort( + local_graph_path=self.local_graph_path, + graph_name=self.graph_name, + topological_order_dir=self.topological_order_dir, + object_types=self.object_types, + direction=self.direction, + ), + } + + def output(self) -> luigi.Target: + """.csv.zst file that contains the counts.""" + return luigi.LocalTarget( + self.topological_order_dir + / f"path_counts_{self.direction}_{self.object_types}.csv.zst" + ) + + def run(self) -> None: + """Runs org.softwareheritage.graph.utils.CountPaths and compresses""" + invalid_object_types = set(self.object_types.split(",")) - OBJECT_TYPES + if invalid_object_types: + raise ValueError(f"Invalid object types: {invalid_object_types}") + class_name = "org.softwareheritage.graph.utils.CountPaths" + topological_order_path = self.input()["toposort"].path + # TODO: pass max_ram to run_script() correctly so it can pass it to + # check_config(), instead of hardcoding it on the command line here + script = f""" + zstdcat '{topological_order_path}' \ + | pv --line-mode --wait --size $(zstdcat '{topological_order_path}' | wc -l) \ + | java -Xmx{self.max_ram} {class_name} '{self.local_graph_path}/{self.graph_name}' '{self.direction}' \ + | zstdmt -19 + """ # noqa + run_script(script, Path(self.output().path)) diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py index 174877bbc02a2e176e5b3d338cc20e69419f5e15..b3138f86932b48f0c8b53f2ebcb8bcb945306e23 100644 --- a/swh/graph/tests/test_origin_contributors.py +++ b/swh/graph/tests/test_origin_contributors.py @@ -22,7 +22,7 @@ from swh.model.model import ( TimestampWithTimezone, ) -from .test_toposort import EXPECTED_BACKWARD as TOPOLOGICAL_ORDER +from .test_toposort import TOPO_ORDER_BACKWARD as TOPOLOGICAL_ORDER DATA_DIR = Path(__file__).parents[0] / "dataset" diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py index ec32264ca8beb61f8c92d4d5dbe16db0f66db96f..c2475687ae838e80d75e1316e6896ad41af00a8e 100644 --- a/swh/graph/tests/test_toposort.py +++ b/swh/graph/tests/test_toposort.py @@ -9,7 +9,7 @@ import subprocess import pytest -from swh.graph.luigi.misc_datasets import TopoSort +from swh.graph.luigi.misc_datasets import CountPaths, TopoSort DATA_DIR = Path(__file__).parents[0] / "dataset" @@ -17,7 +17,7 @@ DATA_DIR = Path(__file__).parents[0] / "dataset" # FIXME: the order of sample ancestors should not be hardcoded # FIXME: swh:1:snp:0000000000000000000000000000000000000022,3,1,swh has three possible # sample ancestors; they should not be hardecoded here -EXPECTED_BACKWARD = """\ +TOPO_ORDER_BACKWARD = """\ SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2 swh:1:rev:0000000000000000000000000000000000000003,0,1,, swh:1:rev:0000000000000000000000000000000000000009,1,4,swh:1:rev:0000000000000000000000000000000000000003, @@ -32,7 +32,7 @@ swh:1:snp:0000000000000000000000000000000000000022,3,1,swh:1:rev:000000000000000 swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1,0,swh:1:snp:0000000000000000000000000000000000000022, """ -EXPECTED_FORWARD = """\ +TOPO_ORDER_FORWARD = """\ SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2 swh:1:rel:0000000000000000000000000000000000000019,0,1,, swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0,1,, @@ -47,6 +47,36 @@ swh:1:rev:0000000000000000000000000000000000000009,4,1,swh:1:snp:000000000000000 swh:1:rev:0000000000000000000000000000000000000003,1,0,swh:1:rev:0000000000000000000000000000000000000009, """ +PATH_COUNTS_BACKWARD = """\ +swhid,paths_from_roots,all_paths +swh:1:rev:0000000000000000000000000000000000000003,0,0 +swh:1:rev:0000000000000000000000000000000000000009,1,1 +swh:1:rel:0000000000000000000000000000000000000010,1,2 +swh:1:snp:0000000000000000000000000000000000000020,2,5 +swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,2,6 +swh:1:rev:0000000000000000000000000000000000000013,1,2 +swh:1:rev:0000000000000000000000000000000000000018,1,3 +swh:1:rel:0000000000000000000000000000000000000019,1,4 +swh:1:rel:0000000000000000000000000000000000000021,1,4 +swh:1:snp:0000000000000000000000000000000000000022,3,10 +swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,3,11 +""" + +PATH_COUNTS_FORWARD = """\ +swhid,paths_from_roots,all_paths +swh:1:rel:0000000000000000000000000000000000000019,0,0 +swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0,0 +swh:1:snp:0000000000000000000000000000000000000020,1,1 +swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,0,0 +swh:1:snp:0000000000000000000000000000000000000022,1,1 +swh:1:rel:0000000000000000000000000000000000000021,1,2 +swh:1:rev:0000000000000000000000000000000000000018,2,4 +swh:1:rev:0000000000000000000000000000000000000013,2,5 +swh:1:rel:0000000000000000000000000000000000000010,2,4 +swh:1:rev:0000000000000000000000000000000000000009,6,15 +swh:1:rev:0000000000000000000000000000000000000003,6,16 +""" + @pytest.mark.parametrize( "direction,algorithm", itertools.product(["backward", "forward"], ["bfs", "dfs"]) @@ -71,7 +101,7 @@ def test_toposort(tmpdir, direction: str, algorithm: str): csv_text = subprocess.check_output(["zstdcat", topological_order_path]).decode() - expected = EXPECTED_BACKWARD if direction == "backward" else EXPECTED_FORWARD + expected = TOPO_ORDER_BACKWARD if direction == "backward" else TOPO_ORDER_FORWARD (header, *rows) = csv_text.split("\n") (expected_header, *expected_lines) = expected.split("\n") @@ -107,3 +137,33 @@ def test_toposort(tmpdir, direction: str, algorithm: str): "swh:1:rev:0000000000000000000000000000000000000003,1,0," "swh:1:rev:0000000000000000000000000000000000000009," ) + + +@pytest.mark.parametrize("direction", ["backward", "forward"]) +def test_countpaths(tmpdir, direction: str): + tmpdir = Path(tmpdir) + + topological_order_path = ( + tmpdir / f"topological_order_dfs_{direction}_rev,rel,snp,ori.csv.zst" + ) + path_counts_path = tmpdir / f"path_counts_{direction}_rev,rel,snp,ori.csv.zst" + + topological_order_path.write_text( + TOPO_ORDER_BACKWARD if direction == "backward" else TOPO_ORDER_FORWARD + ) + + task = CountPaths( + local_graph_path=DATA_DIR / "compressed", + topological_order_dir=tmpdir, + direction=direction, + object_types="rev,rel,snp,ori", + graph_name="example", + ) + + task.run() + + csv_text = subprocess.check_output(["zstdcat", path_counts_path]).decode() + + expected = PATH_COUNTS_BACKWARD if direction == "backward" else PATH_COUNTS_FORWARD + + assert csv_text == expected