diff --git a/Cargo.lock b/Cargo.lock index 38aa5164a98d2defe566cba2af68f1f4f6b4aa40..d0312d0749824fa4335fbf972e94996a51259e39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3022,9 +3022,9 @@ dependencies = [ [[package]] name = "pthash" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581d1ef02a6acc35e26d88a5ca72f2418ed32239fdf5878f02d2f25742d4effa" +checksum = "1ade741cee90eb10e0a32414b1f19c47abaee8aa5d23d5cab97cbaf3059b0b94" dependencies = [ "autocxx", "autocxx-build", @@ -3033,6 +3033,7 @@ dependencies = [ "cxx-build", "log", "rand", + "rayon", "thiserror", ] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 353741dbfb2835d8a121ded73c6888d153aefe0b..baa174b546a23915157038ecaf25f0b038baff7d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -23,7 +23,7 @@ webgraph = "0.1.4" libc = "0.2.147" #ph = { git = "https://github.com/beling/bsuccinct-rs.git", rev = "f6636350a8149bc8deca83527ca195fe22311a4e" } ph = { version = "0.8.0" } -pthash = "0.3.1" +pthash = { version = "0.4.0", features = ["rayon"] } faster-hex = { version = "0.8.0", features = ["std"], default-features = false } rayon = { version = "1.9.0" } sha1 = { version = "0.10.1", optional = true } diff --git a/rust/src/compress/label_names.rs b/rust/src/compress/label_names.rs index 9228911d98eeb1ec1bf8554e2a2839dacdcb8e22..ac44a49dff0a5241fb303b5dcadfde5c632434e9 100644 --- a/rust/src/compress/label_names.rs +++ b/rust/src/compress/label_names.rs @@ -5,7 +5,8 @@ use std::fs::File; use std::io::{BufRead, BufReader}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{ensure, Context, Result}; use dsi_progress_logger::{progress_logger, ProgressLog}; @@ -17,6 +18,7 @@ use rayon::prelude::*; use crate::labels::FilenameId; use crate::map::{MappedPermutation, OwnedPermutation, Permutation}; +use crate::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog}; pub struct LabelName<T: AsRef<[u8]>>(pub T); @@ -31,10 +33,10 @@ impl<T: AsRef<[u8]>> Hashable for LabelName<T> { // graph has just over 2^32 keys pub type LabelNameMphf = PartitionedPhf<Minimal, MurmurHash2_128, DictionaryDictionary>; -fn iter_labels(path: PathBuf) -> Result<impl Iterator<Item = LabelName<Box<[u8]>>>> { +fn iter_labels(path: &Path) -> Result<impl Iterator<Item = LabelName<Box<[u8]>>>> { let base64 = base64_simd::STANDARD; let labels_file = - File::open(&path).with_context(|| format!("Could not open {}", path.display()))?; + File::open(path).with_context(|| format!("Could not open {}", path.display()))?; Ok(BufReader::new(labels_file) .lines() .map(move |label_base64| { @@ -52,16 +54,21 @@ fn iter_labels(path: PathBuf) -> Result<impl Iterator<Item = LabelName<Box<[u8]> /// Reads base64-encoded labels from the path and return a MPH function for them. pub fn build_mphf(path: PathBuf, num_labels: usize) -> Result<LabelNameMphf> { - let mut pl = progress_logger!( - display_memory = true, - item_name = "label", - local_speed = true, - expected_updates = Some(num_labels), - ); - pl.start("Reading labels"); - - let labels: Vec<_> = iter_labels(path)?.inspect(|_| pl.light_update()).collect(); - pl.done(); + let mut pass_counter = 0; + let iter_labels = || { + pass_counter += 1; + let mut pl = progress_logger!( + display_memory = true, + item_name = "label", + local_speed = true, + expected_updates = Some(num_labels), + ); + pl.start(&format!("Reading labels (pass #{})", pass_counter)); + let mut pl = BufferedProgressLogger::new(Arc::new(Mutex::new(Box::new(pl)))); + iter_labels(&path) + .expect("Could not read labels") + .inspect(move |_| MinimalProgressLog::light_update(&mut pl)) + }; let temp_dir = tempfile::tempdir().unwrap(); // From zack's benchmarks on the 2023-09-06 graph (4 billion label names) @@ -79,7 +86,7 @@ pub fn build_mphf(path: PathBuf, num_labels: usize) -> Result<LabelNameMphf> { ); let mut f = LabelNameMphf::new(); - f.build_in_internal_memory_from_bytes(&labels, &config) + f.build_in_internal_memory_from_bytes(iter_labels, &config) .context("Failed to build MPH")?; Ok(f) } @@ -116,8 +123,8 @@ pub fn build_order( pl.start("Reading labels"); let mut order: Vec<_> = (0..num_labels).map(|_| usize::MAX).collect(); - for (i, label) in iter_labels(path)?.enumerate() { - pl.light_update(); + for (i, label) in iter_labels(&path)?.enumerate() { + ProgressLog::light_update(&mut pl); let hash = mphf.hash(&label) as usize; ensure!(hash < num_labels, "{} is not minimal", mphf_path.display()); ensure!( diff --git a/rust/src/compress/mph.rs b/rust/src/compress/mph.rs index 9e4b8abe0538f1ede54d57cf0b816fa3efd38a28..62d8de8eb3ac9adc00f8ba393f282acb51145e5b 100644 --- a/rust/src/compress/mph.rs +++ b/rust/src/compress/mph.rs @@ -4,7 +4,7 @@ // See top-level LICENSE file for more information use std::fs::File; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; @@ -16,26 +16,21 @@ use rayon::prelude::*; use crate::compress::zst_dir::*; use crate::mph::{HashableSWHID, SwhidPthash}; -fn par_iter_swhids( - swhids_dir: &Path, - num_nodes: usize, -) -> impl ParallelIterator<Item = Vec<u8>> + '_ { - let mut pl = progress_logger!( - display_memory = true, - item_name = "SWHID", - local_speed = true, - expected_updates = Some(num_nodes), - ); - pl.start("Reading SWHIDs"); - let pl = Arc::new(Mutex::new(Box::new(pl))); - par_iter_lines_from_dir(swhids_dir, pl) -} - /// Reads textual SWHIDs from the path and return a MPH function for them. pub fn build_swhids_mphf(swhids_dir: PathBuf, num_nodes: usize) -> Result<SwhidPthash> { - let swhids: Vec<_> = par_iter_swhids(&swhids_dir, num_nodes) - .map(HashableSWHID) - .collect(); + let mut pass_counter = 0; + let iter_swhids = || { + pass_counter += 1; + let mut pl = progress_logger!( + display_memory = true, + item_name = "SWHID", + local_speed = true, + expected_updates = Some(num_nodes), + ); + pl.start(&format!("Reading SWHIDs (pass #{})", pass_counter)); + let pl = Arc::new(Mutex::new(Box::new(pl))); + par_iter_lines_from_dir(&swhids_dir, pl).map(HashableSWHID::<Vec<u8>>) + }; let temp_dir = tempfile::tempdir().unwrap(); // Tuned by zack on the 2023-09-06 graph on a machine with two Intel Xeon Gold 6342 CPUs @@ -48,7 +43,7 @@ pub fn build_swhids_mphf(swhids_dir: PathBuf, num_nodes: usize) -> Result<SwhidP log::info!("Building MPH with parameters: {:?}", config); let mut f = PartitionedPhf::new(); - f.build_in_internal_memory_from_bytes(&swhids, &config) + f.par_build_in_internal_memory_from_bytes(iter_swhids, &config) .context("Failed to build MPH")?; Ok(SwhidPthash(f)) } diff --git a/rust/src/compress/persons.rs b/rust/src/compress/persons.rs index 3c3b202150ec7c3d1692213c72f64d2911861d56..b681b69f47a7d4868ad46e549897d23794519eeb 100644 --- a/rust/src/compress/persons.rs +++ b/rust/src/compress/persons.rs @@ -5,7 +5,8 @@ use std::fs::File; use std::io::{BufRead, BufReader}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use dsi_progress_logger::{progress_logger, ProgressLog}; @@ -14,6 +15,8 @@ use pthash::{ Phf, }; +use crate::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog}; + pub struct Person<T: AsRef<[u8]>>(pub T); impl<T: AsRef<[u8]>> Hashable for Person<T> { @@ -27,9 +30,9 @@ impl<T: AsRef<[u8]>> Hashable for Person<T> { // graph has just over 2^32 keys pub type PersonMphf = PartitionedPhf<Minimal, MurmurHash2_64, DictionaryDictionary>; -fn iter_persons(path: PathBuf) -> Result<impl Iterator<Item = Person<Box<[u8]>>>> { +fn iter_persons(path: &Path) -> Result<impl Iterator<Item = Person<Box<[u8]>>>> { let persons_file = - File::open(&path).with_context(|| format!("Could not open {}", path.display()))?; + File::open(path).with_context(|| format!("Could not open {}", path.display()))?; Ok(BufReader::new(persons_file).lines().map(move |person| { Person( person @@ -42,16 +45,21 @@ fn iter_persons(path: PathBuf) -> Result<impl Iterator<Item = Person<Box<[u8]>>> /// Reads base64-encoded persons from the path and return a MPH function for them. pub fn build_mphf(path: PathBuf, num_persons: usize) -> Result<PersonMphf> { - let mut pl = progress_logger!( - display_memory = true, - item_name = "person", - local_speed = true, - expected_updates = Some(num_persons), - ); - pl.start("Reading persons"); - - let persons: Vec<_> = iter_persons(path)?.inspect(|_| pl.light_update()).collect(); - pl.done(); + let mut pass_counter = 0; + let iter_persons = || { + pass_counter += 1; + let mut pl = progress_logger!( + display_memory = true, + item_name = "person", + local_speed = true, + expected_updates = Some(num_persons), + ); + pl.start(&format!("Reading persons (pass #{})", pass_counter)); + let mut pl = BufferedProgressLogger::new(Arc::new(Mutex::new(Box::new(pl)))); + iter_persons(&path) + .expect("Could not read persons") + .inspect(move |_| MinimalProgressLog::light_update(&mut pl)) + }; let temp_dir = tempfile::tempdir().unwrap(); // TODO: tweak those for performance @@ -61,7 +69,7 @@ pub fn build_mphf(path: PathBuf, num_persons: usize) -> Result<PersonMphf> { log::info!("Building MPH with parameters: {:?}", config); let mut f = PersonMphf::new(); - f.build_in_internal_memory_from_bytes(&persons, &config) + f.build_in_internal_memory_from_bytes(iter_persons, &config) .context("Failed to build MPH")?; Ok(f) }