From 76eac7c2781a02d8eaba915cdd9a85f604b493a7 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz <vlorentz@softwareheritage.org> Date: Fri, 28 Mar 2025 12:01:58 +0100 Subject: [PATCH] Add methods SwhGraph::{iter_nodes, par_iter_nodes} --- rust/src/graph.rs | 43 +++++++++++++++++++++++++++ tools/topology/src/bin/edges.rs | 14 ++++----- tools/topology/src/bin/generations.rs | 39 ++++++++++-------------- 3 files changed, 64 insertions(+), 32 deletions(-) diff --git a/rust/src/graph.rs b/rust/src/graph.rs index 97afdd249..0746b9259 100644 --- a/rust/src/graph.rs +++ b/rust/src/graph.rs @@ -15,9 +15,12 @@ use std::borrow::Borrow; use std::iter::Iterator; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{bail, Context, Result}; use dsi_bitstream::prelude::BE; +use dsi_progress_logger::ProgressLog; +use rayon::prelude::*; use webgraph::graphs::vec_graph::LabeledVecGraph; use webgraph::prelude::*; @@ -31,6 +34,8 @@ use crate::labels::{EdgeLabel, UntypedEdgeLabel}; use crate::mph::SwhidMphf; use crate::properties; pub use crate::underlying_graph::DefaultUnderlyingGraph; +use crate::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog}; +use crate::utils::shuffle::par_iter_shuffled_range; use crate::utils::suffix_path; /// Alias for [`usize`], which may become a newtype in a future version. @@ -159,6 +164,44 @@ pub trait SwhGraph { /// Return whether there is an arc going from `src_node_id` to `dst_node_id`. fn has_arc(&self, src_node_id: NodeId, dst_node_id: NodeId) -> bool; + + /// Returns an iterator on all the nodes + /// + /// Order is not guaranteed. + /// + /// Updates the progress logger on every node id from 0 to `self.num_nodes()`, + /// even those that are filtered out. + fn iter_nodes<'a>( + &'a self, + mut pl: impl ProgressLog + 'a, + ) -> impl Iterator<Item = NodeId> + 'a { + (0..self.num_nodes()) + .inspect(move |_| pl.light_update()) + .filter(|&node_id| self.has_node(node_id)) + } + /// Returns a parallel iterator on all the nodes + /// + /// Order is not guaranteed. + /// + /// Updates the progress logger on every node id from 0 to `self.num_nodes()`, + /// even those that are filtered out. + fn par_iter_nodes<'a>( + &'a self, + pl: &'a mut (impl ProgressLog + Send), + ) -> impl ParallelIterator<Item = NodeId> + 'a + where + Self: Sync, + { + par_iter_shuffled_range(0..self.num_nodes()) + .map_with( + BufferedProgressLogger::new(Arc::new(Mutex::new(pl))), + move |pl, node_id| { + pl.update(); + node_id + }, + ) + .filter(|&node_id| self.has_node(node_id)) + } } pub trait SwhForwardGraph: SwhGraph { diff --git a/tools/topology/src/bin/edges.rs b/tools/topology/src/bin/edges.rs index 857471349..a4253bb81 100644 --- a/tools/topology/src/bin/edges.rs +++ b/tools/topology/src/bin/edges.rs @@ -4,7 +4,7 @@ // See top-level LICENSE file for more information use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use anyhow::{Context, Result}; use clap::Parser; @@ -17,8 +17,6 @@ use swh_graph::graph::*; use swh_graph::labels::{Branch, DirEntry, Visit, VisitStatus}; use swh_graph::mph::DynMphf; use swh_graph::utils::parse_allowed_node_types; -use swh_graph::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog}; -use swh_graph::utils::shuffle::par_iter_shuffled_range; use swh_graph::views::Subgraph; use swh_graph::{NodeType, SWHID}; @@ -158,9 +156,9 @@ pub fn main() -> Result<()> { ); pl.start("Listing nodes and edges..."); - par_iter_shuffled_range(0..graph.num_nodes()).try_for_each_with( - BufferedProgressLogger::new(Arc::new(Mutex::new(&mut pl))), - |thread_pl, node| -> Result<()> { + graph + .par_iter_nodes(&mut pl) + .try_for_each(|node| -> Result<()> { let node_type = graph.properties().node_type(node); match node_type { @@ -195,10 +193,8 @@ pub fn main() -> Result<()> { &mut snapshot_writers.get_thread_writer().unwrap(), )?, } - thread_pl.light_update(); Ok::<_, anyhow::Error>(()) - }, - )?; + })?; content_writers.close()?; directory_writers.close()?; diff --git a/tools/topology/src/bin/generations.rs b/tools/topology/src/bin/generations.rs index 3d0d0be77..db8e5ea21 100644 --- a/tools/topology/src/bin/generations.rs +++ b/tools/topology/src/bin/generations.rs @@ -19,7 +19,6 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::mpsc::{sync_channel, SyncSender}; -use std::sync::{Arc, Mutex}; use anyhow::{anyhow, bail, Context, Result}; use clap::{Parser, ValueEnum}; @@ -30,8 +29,6 @@ use rayon::prelude::*; use swh_graph::graph::*; use swh_graph::mph::DynMphf; use swh_graph::utils::parse_allowed_node_types; -use swh_graph::utils::progress_logger::BufferedProgressLogger; -use swh_graph::utils::shuffle::par_iter_shuffled_range; use swh_graph::views::{Subgraph, Transposed}; use swh_graph::NodeType; @@ -188,36 +185,32 @@ where let mut total_arcs = 0; let mut leaves = Vec::new(); - par_iter_shuffled_range(0..graph.num_nodes()) + graph + .par_iter_nodes(&mut pl) .try_fold_with( ( - BufferedProgressLogger::new(Arc::new(Mutex::new(&mut pl))), 0, // total_arcs Vec::new(), // leaves ), - |(mut thread_pl, mut thread_total_arcs, mut thread_leaves), node| { - use swh_graph::utils::progress_logger::MinimalProgressLog; - thread_pl.light_update(); - if graph.has_node(node) { - let outdegree = graph.indegree(node); - thread_total_arcs += outdegree; - let outdegree: u32 = outdegree.try_into().with_context(|| { - format!( - "{} has outdegree {outdegree}, which does not fit in u32", - graph.properties().swhid(node) - ) - })?; - num_unvisited_predecessors[node].store(outdegree, Ordering::Relaxed); - if outdegree == 0 { - thread_leaves.push(node); - } + |(mut thread_total_arcs, mut thread_leaves), node| { + let outdegree = graph.indegree(node); + thread_total_arcs += outdegree; + let outdegree: u32 = outdegree.try_into().with_context(|| { + format!( + "{} has outdegree {outdegree}, which does not fit in u32", + graph.properties().swhid(node) + ) + })?; + num_unvisited_predecessors[node].store(outdegree, Ordering::Relaxed); + if outdegree == 0 { + thread_leaves.push(node); } - Ok((thread_pl, thread_total_arcs, thread_leaves)) + Ok((thread_total_arcs, thread_leaves)) }, ) .collect::<Result<Vec<_>>>()? .into_iter() - .for_each(|(_thread_pl, thread_total_arcs, thread_leaves)| { + .for_each(|(thread_total_arcs, thread_leaves)| { total_arcs += thread_total_arcs; leaves.extend(thread_leaves); }); -- GitLab