From 76eac7c2781a02d8eaba915cdd9a85f604b493a7 Mon Sep 17 00:00:00 2001
From: Valentin Lorentz <vlorentz@softwareheritage.org>
Date: Fri, 28 Mar 2025 12:01:58 +0100
Subject: [PATCH] Add methods SwhGraph::{iter_nodes, par_iter_nodes}

---
 rust/src/graph.rs                     | 43 +++++++++++++++++++++++++++
 tools/topology/src/bin/edges.rs       | 14 ++++-----
 tools/topology/src/bin/generations.rs | 39 ++++++++++--------------
 3 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/rust/src/graph.rs b/rust/src/graph.rs
index 97afdd249..0746b9259 100644
--- a/rust/src/graph.rs
+++ b/rust/src/graph.rs
@@ -15,9 +15,12 @@
 use std::borrow::Borrow;
 use std::iter::Iterator;
 use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, Context, Result};
 use dsi_bitstream::prelude::BE;
+use dsi_progress_logger::ProgressLog;
+use rayon::prelude::*;
 use webgraph::graphs::vec_graph::LabeledVecGraph;
 use webgraph::prelude::*;
 
@@ -31,6 +34,8 @@ use crate::labels::{EdgeLabel, UntypedEdgeLabel};
 use crate::mph::SwhidMphf;
 use crate::properties;
 pub use crate::underlying_graph::DefaultUnderlyingGraph;
+use crate::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog};
+use crate::utils::shuffle::par_iter_shuffled_range;
 use crate::utils::suffix_path;
 
 /// Alias for [`usize`], which may become a newtype in a future version.
@@ -159,6 +164,44 @@ pub trait SwhGraph {
 
     /// Return whether there is an arc going from `src_node_id` to `dst_node_id`.
     fn has_arc(&self, src_node_id: NodeId, dst_node_id: NodeId) -> bool;
+
+    /// Returns an iterator on the ids of all nodes for which `has_node`
+    /// is true, in no guaranteed order.
+    ///
+    /// The progress logger is ticked for every node id from 0 to `self.num_nodes()`,
+    /// whether or not that id is then filtered out.
+    fn iter_nodes<'a>(
+        &'a self,
+        mut pl: impl ProgressLog + 'a,
+    ) -> impl Iterator<Item = NodeId> + 'a {
+        (0..self.num_nodes()).filter(move |&node_id| {
+            pl.light_update();
+            self.has_node(node_id)
+        })
+    }
+
+    /// Returns a parallel iterator on all the nodes, in no guaranteed order.
+    ///
+    /// Updates the progress logger on every node id from 0 to `self.num_nodes()`,
+    /// even those that are filtered out.
+    fn par_iter_nodes<'a>(
+        &'a self,
+        pl: &'a mut (impl ProgressLog + Send),
+    ) -> impl ParallelIterator<Item = NodeId> + 'a
+    where
+        Self: Sync,
+    {
+        par_iter_shuffled_range(0..self.num_nodes())
+            .map_with(
+                BufferedProgressLogger::new(Arc::new(Mutex::new(pl))),
+                move |pl, node_id| {
+                    // Tick before filtering so that skipped ids are counted too.
+                    pl.light_update();
+                    node_id
+                },
+            )
+            .filter(|&node_id| self.has_node(node_id))
+    }
 }
 
 pub trait SwhForwardGraph: SwhGraph {
diff --git a/tools/topology/src/bin/edges.rs b/tools/topology/src/bin/edges.rs
index 857471349..a4253bb81 100644
--- a/tools/topology/src/bin/edges.rs
+++ b/tools/topology/src/bin/edges.rs
@@ -4,7 +4,7 @@
 // See top-level LICENSE file for more information
 
 use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use anyhow::{Context, Result};
 use clap::Parser;
@@ -17,8 +17,6 @@ use swh_graph::graph::*;
 use swh_graph::labels::{Branch, DirEntry, Visit, VisitStatus};
 use swh_graph::mph::DynMphf;
 use swh_graph::utils::parse_allowed_node_types;
-use swh_graph::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog};
-use swh_graph::utils::shuffle::par_iter_shuffled_range;
 use swh_graph::views::Subgraph;
 use swh_graph::{NodeType, SWHID};
 
@@ -158,9 +156,9 @@ pub fn main() -> Result<()> {
     );
     pl.start("Listing nodes and edges...");
 
-    par_iter_shuffled_range(0..graph.num_nodes()).try_for_each_with(
-        BufferedProgressLogger::new(Arc::new(Mutex::new(&mut pl))),
-        |thread_pl, node| -> Result<()> {
+    graph
+        .par_iter_nodes(&mut pl)
+        .try_for_each(|node| -> Result<()> {
             let node_type = graph.properties().node_type(node);
 
             match node_type {
@@ -195,10 +193,8 @@ pub fn main() -> Result<()> {
                     &mut snapshot_writers.get_thread_writer().unwrap(),
                 )?,
             }
-            thread_pl.light_update();
             Ok::<_, anyhow::Error>(())
-        },
-    )?;
+        })?;
 
     content_writers.close()?;
     directory_writers.close()?;
diff --git a/tools/topology/src/bin/generations.rs b/tools/topology/src/bin/generations.rs
index 3d0d0be77..db8e5ea21 100644
--- a/tools/topology/src/bin/generations.rs
+++ b/tools/topology/src/bin/generations.rs
@@ -19,7 +19,6 @@
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::mpsc::{sync_channel, SyncSender};
-use std::sync::{Arc, Mutex};
 
 use anyhow::{anyhow, bail, Context, Result};
 use clap::{Parser, ValueEnum};
@@ -30,8 +29,6 @@ use rayon::prelude::*;
 use swh_graph::graph::*;
 use swh_graph::mph::DynMphf;
 use swh_graph::utils::parse_allowed_node_types;
-use swh_graph::utils::progress_logger::BufferedProgressLogger;
-use swh_graph::utils::shuffle::par_iter_shuffled_range;
 use swh_graph::views::{Subgraph, Transposed};
 use swh_graph::NodeType;
 
@@ -188,36 +185,32 @@ where
     let mut total_arcs = 0;
     let mut leaves = Vec::new();
 
-    par_iter_shuffled_range(0..graph.num_nodes())
+    graph
+        .par_iter_nodes(&mut pl)
         .try_fold_with(
             (
-                BufferedProgressLogger::new(Arc::new(Mutex::new(&mut pl))),
                 0,          // total_arcs
                 Vec::new(), // leaves
             ),
-            |(mut thread_pl, mut thread_total_arcs, mut thread_leaves), node| {
-                use swh_graph::utils::progress_logger::MinimalProgressLog;
-                thread_pl.light_update();
-                if graph.has_node(node) {
-                    let outdegree = graph.indegree(node);
-                    thread_total_arcs += outdegree;
-                    let outdegree: u32 = outdegree.try_into().with_context(|| {
-                        format!(
-                            "{} has outdegree {outdegree}, which does not fit in u32",
-                            graph.properties().swhid(node)
-                        )
-                    })?;
-                    num_unvisited_predecessors[node].store(outdegree, Ordering::Relaxed);
-                    if outdegree == 0 {
-                        thread_leaves.push(node);
-                    }
+            |(mut thread_total_arcs, mut thread_leaves), node| {
+                let outdegree = graph.indegree(node);
+                thread_total_arcs += outdegree;
+                let outdegree: u32 = outdegree.try_into().with_context(|| {
+                    format!(
+                        "{} has outdegree {outdegree}, which does not fit in u32",
+                        graph.properties().swhid(node)
+                    )
+                })?;
+                num_unvisited_predecessors[node].store(outdegree, Ordering::Relaxed);
+                if outdegree == 0 {
+                    thread_leaves.push(node);
                 }
-                Ok((thread_pl, thread_total_arcs, thread_leaves))
+                Ok((thread_total_arcs, thread_leaves))
             },
         )
         .collect::<Result<Vec<_>>>()?
         .into_iter()
-        .for_each(|(_thread_pl, thread_total_arcs, thread_leaves)| {
+        .for_each(|(thread_total_arcs, thread_leaves)| {
             total_arcs += thread_total_arcs;
             leaves.extend(thread_leaves);
         });
-- 
GitLab