Commit 91daa8eb authored by vlorentz

Add ListFilesByName

parent 9ad66c42
Merge request !317: Add ListFilesByName
Pipeline #3559 canceled
/*
* Copyright (c) 2023 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
package org.softwareheritage.graph.utils;
import it.unimi.dsi.big.webgraph.labelling.ArcLabelledNodeIterator;
import it.unimi.dsi.big.webgraph.LazyLongIterator;
import it.unimi.dsi.fastutil.longs.LongBigArrayBigList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.logging.ProgressLogger;
import org.softwareheritage.graph.*;
import org.softwareheritage.graph.labels.DirEntry;
import java.io.IOException;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.concurrent.*;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/* From every refs/heads/master, refs/heads/main, or HEAD branch in any snapshot,
 * browses the whole directory tree looking for files named <filename>, and lists
 * them to stdout.
 */
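/*
 * Illustrative invocation (paths are hypothetical, not from this repository):
 *   java org.softwareheritage.graph.utils.ListFilesByName /srv/graph/graph README.md 96 10000 > files.csv
 * Output is RFC 4180 CSV with the header:
 *   snp_SWHID,branch_name,dir_SWHID,file_name,cnt_SWHID
 */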
public class ListFilesByName {
private class MyLongBigArrayBigList extends LongBigArrayBigList {
public MyLongBigArrayBigList(long size) {
super(size);
// Allow setting directly in the array without repeatedly calling
// .add() first
this.size = size;
}
}
private int numThreads;
private int batchSize; /* Number of revisions to process in each task */
private String filename;
private long filenameId; // the label id of the filename we are looking for
private final String[] branchNames = {"refs/heads/master", "refs/heads/main", "HEAD"};
private Vector<Long> branchNameIds;
private SwhUnidirectionalGraph graph;
private ThreadLocal<SwhUnidirectionalGraph> threadGraph;
final static Logger logger = LoggerFactory.getLogger(ListFilesByName.class);
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
if (args.length != 4) {
System.err.println(
"Syntax: java org.softwareheritage.graph.utils.ListFilesByName <path/to/graph> <filename> <num_threads> <batch_size>");
System.exit(1);
}
ListFilesByName lfbn = new ListFilesByName();
String graphPath = args[0];
lfbn.filename = args[1];
lfbn.numThreads = Integer.parseInt(args[2]);
lfbn.batchSize = Integer.parseInt(args[3]);
System.err.println("Loading graph " + graphPath + " ...");
lfbn.graph = SwhUnidirectionalGraph.loadLabelledMapped(graphPath);
System.err.println("Loading labels for " + graphPath + " ...");
lfbn.graph.loadLabelNames();
lfbn.threadGraph = new ThreadLocal<SwhUnidirectionalGraph>();
lfbn.run();
}
public void run() throws IOException, InterruptedException, ExecutionException {
ProgressLogger pl = new ProgressLogger(logger);
pl.itemsName = "filenames";
pl.start("Looking for filename and branch ids...");
filenameId = -1L;
String label;
branchNameIds = new Vector<Long>();
for (int j = 0; j < branchNames.length; j++) {
branchNameIds.add(-1L);
}
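// Scan every label name; graph.getLabelName throws IndexOutOfBoundsException past
// the last label id, which terminates the scan.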
for (long i = 0;; i++) {
try {
label = new String(graph.getLabelName(i));
} catch (IndexOutOfBoundsException e) {
break;
}
if (label.equals(filename)) {
filenameId = i;
if (!branchNameIds.contains(-1L)) {
break;
}
}
for (int j = 0; j < branchNames.length; j++) {
if (label.equals(branchNames[j])) {
branchNameIds.set(j, i);
if (filenameId != -1L && !branchNameIds.contains(-1L)) {
break;
}
}
}
pl.lightUpdate();
}
pl.done();
if (filenameId == -1) {
System.err.println("Failed to find filename id for " + filename);
System.exit(7);
}
if (branchNameIds.contains(-1L)) {
for (int j = 0; j < branchNames.length; j++) {
if (branchNameIds.get(j) == -1L) {
System.err.println("Failed to find branch name id for " + branchNames[j]);
// FIXME: this fails tests, but we should probably error in prod
// System.exit(7);
}
}
}
BufferedWriter bufferedStdout = new BufferedWriter(new OutputStreamWriter(System.out));
CSVPrinter csvPrinter = new CSVPrinter(bufferedStdout, CSVFormat.RFC4180);
csvPrinter.printRecord("snp_SWHID", "branch_name", "dir_SWHID", "file_name", "cnt_SWHID");
csvPrinter.flush();
bufferedStdout.flush();
listFiles();
}
private void listFiles() throws InterruptedException, ExecutionException {
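// Split the node range into many small chunks (1000 per thread) so that a few
// expensive chunks do not dominate the tail of the run.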
final long numChunks = numThreads * 1000;
System.err.println("Initializing traversals...");
ExecutorService service = Executors.newFixedThreadPool(numThreads);
Vector<Future> futures = new Vector<Future>();
List<Long> range = new ArrayList<Long>();
for (long i = 0; i < numChunks; i++) {
range.add(i);
}
Collections.shuffle(range); // Make workload homogeneous over time
ProgressLogger pl = new ProgressLogger(logger);
pl.logInterval = 60000;
pl.itemsName = "nodes";
pl.expectedUpdates = graph.numNodes();
pl.start("Visiting snapshots' files...");
for (long i : range) {
final long chunkId = i;
futures.add(service.submit(() -> {
try {
listFilesInSnapshotChunk(chunkId, numChunks, pl);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}));
}
// Error if any exception occurred
for (Future future : futures) {
future.get();
}
service.shutdown();
service.awaitTermination(365, TimeUnit.DAYS);
pl.done();
}
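/* Writes a whole chunk's buffer at once; synchronizing on System.out keeps records
 * from concurrently-running chunks from interleaving. */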
private void flushToStdout(StringBuffer buffer) throws IOException {
synchronized (System.out) {
System.out.write(buffer.toString().getBytes());
}
}
private void listFilesInSnapshotChunk(long chunkId, long numChunks, ProgressLogger pl) throws IOException {
if (threadGraph.get() == null) {
threadGraph.set(this.graph.copy());
}
SwhUnidirectionalGraph graph = threadGraph.get();
long numNodes = graph.numNodes();
long chunkSize = numNodes / numChunks;
long chunkStart = chunkSize * chunkId;
long chunkEnd = chunkId == numChunks - 1 ? numNodes : chunkSize * (chunkId + 1);
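// e.g. numNodes=100, numChunks=8: chunkSize=12, chunks cover [0,12), [12,24), ...,
// [72,84), and the last chunk absorbs the remainder, covering [84,100).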
StringBuffer buf = new StringBuffer(batchSize * 100000);
CSVPrinter csvPrinter = new CSVPrinter(buf, CSVFormat.RFC4180);
long flushThreshold = batchSize * 1000000L; // Avoids wasting RAM for too long (long literal prevents int overflow)
for (long i = chunkStart; i < chunkEnd; i++) {
if (graph.getNodeType(i) != SwhType.SNP) {
continue;
}
try {
listFilesInSnapshot(graph, i, csvPrinter);
csvPrinter.flush();
} catch (OutOfMemoryError e) {
System.err.format("OOMed while processing %s (buffer grew to %d): %s\n", graph.getSWHID(i),
buf.length(), e);
throw new RuntimeException(e);
}
if (buf.length() > flushThreshold) {
csvPrinter.flush();
flushToStdout(buf);
buf = new StringBuffer(batchSize * 1140);
csvPrinter = new CSVPrinter(buf, CSVFormat.RFC4180);
}
}
csvPrinter.flush();
flushToStdout(buf);
synchronized (pl) {
pl.update(chunkSize);
}
}
/* Traverses the directory tree under each matching branch (iterative DFS with an
 * explicit stack), listing entries whose name matches the target filename. */
private void listFilesInSnapshot(SwhUnidirectionalGraph graph, long snpId, CSVPrinter csvPrinter)
throws IOException {
ArcLabelledNodeIterator.LabelledArcIterator s = graph.labelledSuccessors(snpId);
long nodeId;
while ((nodeId = s.nextLong()) >= 0) {
if (graph.getNodeType(nodeId) != SwhType.REV && graph.getNodeType(nodeId) != SwhType.REL) {
continue;
}
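// Arc labels on snapshot->revision/release arcs carry the branch name ids;
// only branches named refs/heads/master, refs/heads/main, or HEAD are followed.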
DirEntry[] labels = (DirEntry[]) s.label().get();
for (DirEntry label : labels) {
if (branchNameIds.contains(label.filenameId)) {
listFilesInDirectory(graph, snpId, label.filenameId, nodeId, csvPrinter);
}
}
}
}
private void listFilesInDirectory(SwhUnidirectionalGraph graph, long snpId, long branchNameId, long rootId,
CSVPrinter csvPrinter) throws IOException {
// TODO: reuse these across calls instead of reallocating?
LongArrayList stack = new LongArrayList();
LongOpenHashSet visited = new LongOpenHashSet();
stack.push(rootId);
long nodeId, successorId;
while (!stack.isEmpty()) {
nodeId = stack.popLong();
if (visited.contains(nodeId)) {
continue;
}
visited.add(nodeId);
if (graph.getNodeType(nodeId) == SwhType.DIR) {
ArcLabelledNodeIterator.LabelledArcIterator s = graph.labelledSuccessors(nodeId);
while ((successorId = s.nextLong()) >= 0) {
if (graph.getNodeType(successorId) == SwhType.DIR && !visited.contains(successorId)) {
stack.add(successorId);
} else {
DirEntry[] labels = (DirEntry[]) s.label().get();
for (DirEntry label : labels) {
if (filenameId == label.filenameId) {
// snp_SWHID,branch_name,dir_SWHID,file_name,cnt_SWHID
csvPrinter.printRecord(graph.getSWHID(snpId),
new String(graph.getLabelName(branchNameId)), graph.getSWHID(nodeId), filename,
graph.getSWHID(successorId));
}
}
}
}
} else {
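// Revision/release nodes: follow plain successors (e.g. a revision's root
// directory) and push only the directories they point to.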
LazyLongIterator it = graph.successors(nodeId);
while ((successorId = it.nextLong()) != -1) {
if (graph.getNodeType(successorId) == SwhType.DIR && !visited.contains(successorId)) {
stack.add(successorId);
}
}
}
}
}
}
@@ -4,10 +4,10 @@
# See top-level LICENSE file for more information
"""
Luigi tasks for producing the most common names of every content
================================================================
Luigi tasks for producing the most common names of every content and datasets based on file names
=================================================================================================
"""
""" # noqa
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
@@ -80,6 +80,7 @@ class PopularContentNames(luigi.Task):
self.local_graph_path / self.graph_name,
str(self.max_results_per_content),
str(self.popularity_threshold),
max_ram=self._max_ram(),
)
| pv
| Command.zstdmt("-19")
@@ -228,3 +229,69 @@ class PopularContentNamesOrcToS3(_CsvToOrcToS3ToAthenaTask):
def _athena_table_name(self) -> str:
return "popular_contents"
class ListFilesByName(luigi.Task):
"""Creates a CSV file that contains the most popular name(s) of each content"""
local_graph_path = luigi.PathParameter()
graph_name = luigi.Parameter(default="graph")
output_path = luigi.PathParameter()
file_name = luigi.Parameter()
num_threads = luigi.IntParameter(96)
batch_size = luigi.IntParameter(10000)
def _max_ram(self):
nb_nodes = count_nodes(
self.local_graph_path, self.graph_name, "ori,snp,rel,rev,dir,cnt"
)
graph_size = nb_nodes * 8
num_threads = self.num_threads
# see listFilesInSnapshotChunk
csv_buffers = self.batch_size * num_threads * 1000000
# Does not keep any large array for all nodes, but uses temporary stacks
# and hash sets.
# 1GB per thread should be more than enough.
bfs_buffers = 1_000_000_000 * num_threads
spare_space = 1_000_000_000
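# Illustrative ballpark (hypothetical graph size): with 30e9 nodes, 96 threads
# and batch_size=10000, this is ~240 GB (graph) + ~960 GB (csv buffers)
# + 96 GB (bfs) + 1 GB (spare), i.e. roughly 1.3 TB.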
return graph_size + csv_buffers + bfs_buffers + spare_space
@property
def resources(self):
"""Return the estimated RAM use of this task."""
import socket
return {f"{socket.getfqdn()}_ram_mb": self._max_ram() / 1_000_000}
def requires(self) -> List[luigi.Task]:
"""Returns an instance of :class:`LocalGraph`."""
return [LocalGraph(local_graph_path=self.local_graph_path)]
def output(self) -> luigi.Target:
""".csv.zst file that contains the topological order."""
return luigi.LocalTarget(self.output_path)
def run(self) -> None:
"""Runs org.softwareheritage.graph.utils.PopularContentNames and compresses"""
from .shell import AtomicFileSink, Command, Java
class_name = "org.softwareheritage.graph.utils.ListFilesByName"
# fmt: on
(
Java(
class_name,
self.local_graph_path / self.graph_name,
self.file_name,
str(self.num_threads),
str(self.batch_size),
max_ram=self._max_ram(),
)
| Command.zstdmt("-19")
> AtomicFileSink(self.output())
).run()
# fmt: off
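# Illustrative CLI invocation (hypothetical paths):
#   luigi --module swh.graph.luigi.file_names ListFilesByName \
#       --local-graph-path /srv/graph/compressed --graph-name graph \
#       --file-name README.md --output-path /tmp/readme_files.csv.zst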
@@ -6,12 +6,17 @@
import itertools
from pathlib import Path
import subprocess
import textwrap
import pytest
import pyzstd
from swh.graph.example_dataset import DATASET_DIR
from swh.graph.luigi.file_names import PopularContentNames, PopularContentPaths
from swh.graph.luigi.file_names import (
ListFilesByName,
PopularContentNames,
PopularContentPaths,
)
EXPECTED_LINES_DEPTH1 = """\
swh:1:cnt:0000000000000000000000000000000000000005,1337,parser.c,1
@@ -129,3 +134,58 @@ def test_popularcontentpaths(tmpdir, depth, subset):
assert False, depth
assert list(sorted(rows)) == list(sorted(expected_lines))
@pytest.mark.parametrize("file_name", ["README.md", "parser.c", "TODO.txt", "tests"])
def test_listfilesbyname(tmpdir, file_name):
tmpdir = Path(tmpdir)
output_path = tmpdir / "files.csv.zst"
task = ListFilesByName(
local_graph_path=DATASET_DIR / "compressed",
graph_name="example",
output_path=output_path,
file_name=file_name,
batch_size=100, # faster
num_threads=1, # faster and uses less RAM
)
task.run()
csv_text = subprocess.check_output(["zstdcat", output_path]).decode()
(header, *rows) = csv_text.split("\r\n")
assert header == "snp_SWHID,branch_name,dir_SWHID,file_name,cnt_SWHID"
if file_name == "README.md":
assert set(rows) == set(
textwrap.dedent(
"""\
swh:1:snp:0000000000000000000000000000000000000022,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000008,README.md,swh:1:cnt:0000000000000000000000000000000000000001
swh:1:snp:0000000000000000000000000000000000000022,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000006,README.md,swh:1:cnt:0000000000000000000000000000000000000004
swh:1:snp:0000000000000000000000000000000000000020,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000008,README.md,swh:1:cnt:0000000000000000000000000000000000000001
swh:1:snp:0000000000000000000000000000000000000020,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000006,README.md,swh:1:cnt:0000000000000000000000000000000000000004
"""
).split( # noqa
"\n"
)
)
elif file_name == "parser.c":
assert set(rows) == set(
textwrap.dedent(
"""\
swh:1:snp:0000000000000000000000000000000000000022,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000008,parser.c,swh:1:cnt:0000000000000000000000000000000000000007
swh:1:snp:0000000000000000000000000000000000000022,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000006,parser.c,swh:1:cnt:0000000000000000000000000000000000000005
swh:1:snp:0000000000000000000000000000000000000020,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000008,parser.c,swh:1:cnt:0000000000000000000000000000000000000007
swh:1:snp:0000000000000000000000000000000000000020,refs/heads/master,swh:1:dir:0000000000000000000000000000000000000006,parser.c,swh:1:cnt:0000000000000000000000000000000000000005
"""
).split( # noqa
"\n"
)
)
elif file_name == "TODO.txt":
assert rows == [""]
else:
assert rows == [""]