From 49fcf52f29839a4c0d989479f4ab67769b48ac2f Mon Sep 17 00:00:00 2001 From: Valentin Lorentz <vlorentz@softwareheritage.org> Date: Tue, 27 Aug 2024 11:32:45 +0200 Subject: [PATCH 1/2] Don't materialize vec of values while building MPHs Instead, pthash will iterate on the values (while we read them from disk) and only materialize a vec of hashes, which is much smaller than a vec of values. When building the SWHIDs MPH, this reduces memory usage from 3.6 to 0.6TB. --- Cargo.lock | 5 ++-- rust/Cargo.toml | 2 +- rust/src/compress/label_names.rs | 39 +++++++++++++++++++------------- rust/src/compress/mph.rs | 35 ++++++++++++---------------- rust/src/compress/persons.rs | 34 +++++++++++++++++----------- 5 files changed, 63 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38aa5164..d0312d07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3022,9 +3022,9 @@ dependencies = [ [[package]] name = "pthash" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581d1ef02a6acc35e26d88a5ca72f2418ed32239fdf5878f02d2f25742d4effa" +checksum = "1ade741cee90eb10e0a32414b1f19c47abaee8aa5d23d5cab97cbaf3059b0b94" dependencies = [ "autocxx", "autocxx-build", @@ -3033,6 +3033,7 @@ dependencies = [ "cxx-build", "log", "rand", + "rayon", "thiserror", ] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 353741db..baa174b5 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -23,7 +23,7 @@ webgraph = "0.1.4" libc = "0.2.147" #ph = { git = "https://github.com/beling/bsuccinct-rs.git", rev = "f6636350a8149bc8deca83527ca195fe22311a4e" } ph = { version = "0.8.0" } -pthash = "0.3.1" +pthash = { version = "0.4.0", features = ["rayon"] } faster-hex = { version = "0.8.0", features = ["std"], default-features = false } rayon = { version = "1.9.0" } sha1 = { version = "0.10.1", optional = true } diff --git a/rust/src/compress/label_names.rs b/rust/src/compress/label_names.rs index 9228911d..ac44a49d 100644 --- a/rust/src/compress/label_names.rs +++ b/rust/src/compress/label_names.rs @@ -5,7 +5,8 @@ use std::fs::File; use std::io::{BufRead, BufReader}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{ensure, Context, Result}; use dsi_progress_logger::{progress_logger, ProgressLog}; @@ -17,6 +18,7 @@ use rayon::prelude::*; use crate::labels::FilenameId; use crate::map::{MappedPermutation, OwnedPermutation, Permutation}; +use crate::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog}; pub struct LabelName<T: AsRef<[u8]>>(pub T); @@ -31,10 +33,10 @@ impl<T: AsRef<[u8]>> Hashable for LabelName<T> { // graph has just over 2^32 keys pub type LabelNameMphf = PartitionedPhf<Minimal, MurmurHash2_128, DictionaryDictionary>; -fn iter_labels(path: PathBuf) -> Result<impl Iterator<Item = LabelName<Box<[u8]>>>> { +fn iter_labels(path: &Path) -> Result<impl Iterator<Item = LabelName<Box<[u8]>>>> { let base64 = base64_simd::STANDARD; let labels_file = - File::open(&path).with_context(|| format!("Could not open {}", path.display()))?; + File::open(path).with_context(|| format!("Could not open {}", path.display()))?; Ok(BufReader::new(labels_file) .lines() .map(move |label_base64| { @@ -52,16 +54,21 @@ fn iter_labels(path: PathBuf) -> Result<impl Iterator<Item = LabelName<Box<[u8]> /// Reads base64-encoded labels from the path and return a MPH function for them. pub fn build_mphf(path: PathBuf, num_labels: usize) -> Result<LabelNameMphf> { - let mut pl = progress_logger!( - display_memory = true, - item_name = "label", - local_speed = true, - expected_updates = Some(num_labels), - ); - pl.start("Reading labels"); - - let labels: Vec<_> = iter_labels(path)?.inspect(|_| pl.light_update()).collect(); - pl.done(); + let mut pass_counter = 0; + let iter_labels = || { + pass_counter += 1; + let mut pl = progress_logger!( + display_memory = true, + item_name = "label", + local_speed = true, + expected_updates = Some(num_labels), + ); + pl.start(&format!("Reading labels (pass #{})", pass_counter)); + let mut pl = BufferedProgressLogger::new(Arc::new(Mutex::new(Box::new(pl)))); + iter_labels(&path) + .expect("Could not read labels") + .inspect(move |_| MinimalProgressLog::light_update(&mut pl)) + }; let temp_dir = tempfile::tempdir().unwrap(); // From zack's benchmarks on the 2023-09-06 graph (4 billion label names) @@ -79,7 +86,7 @@ pub fn build_mphf(path: PathBuf, num_labels: usize) -> Result<LabelNameMphf> { ); let mut f = LabelNameMphf::new(); - f.build_in_internal_memory_from_bytes(&labels, &config) + f.build_in_internal_memory_from_bytes(iter_labels, &config) .context("Failed to build MPH")?; Ok(f) } @@ -116,8 +123,8 @@ pub fn build_order( pl.start("Reading labels"); let mut order: Vec<_> = (0..num_labels).map(|_| usize::MAX).collect(); - for (i, label) in iter_labels(path)?.enumerate() { - pl.light_update(); + for (i, label) in iter_labels(&path)?.enumerate() { + ProgressLog::light_update(&mut pl); let hash = mphf.hash(&label) as usize; ensure!(hash < num_labels, "{} is not minimal", mphf_path.display()); ensure!( diff --git a/rust/src/compress/mph.rs b/rust/src/compress/mph.rs index 9e4b8abe..62d8de8e 100644 --- a/rust/src/compress/mph.rs +++ b/rust/src/compress/mph.rs @@ -4,7 +4,7 @@ // See top-level LICENSE file for more information use std::fs::File; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; @@ -16,26 +16,21 @@ use rayon::prelude::*; use crate::compress::zst_dir::*; use crate::mph::{HashableSWHID, SwhidPthash}; -fn par_iter_swhids( - swhids_dir: &Path, - num_nodes: usize, -) -> impl ParallelIterator<Item = Vec<u8>> + '_ { - let mut pl = progress_logger!( - display_memory = true, - item_name = "SWHID", - local_speed = true, - expected_updates = Some(num_nodes), - ); - pl.start("Reading SWHIDs"); - let pl = Arc::new(Mutex::new(Box::new(pl))); - par_iter_lines_from_dir(swhids_dir, pl) -} - /// Reads textual SWHIDs from the path and return a MPH function for them. pub fn build_swhids_mphf(swhids_dir: PathBuf, num_nodes: usize) -> Result<SwhidPthash> { - let swhids: Vec<_> = par_iter_swhids(&swhids_dir, num_nodes) - .map(HashableSWHID) - .collect(); + let mut pass_counter = 0; + let iter_swhids = || { + pass_counter += 1; + let mut pl = progress_logger!( + display_memory = true, + item_name = "SWHID", + local_speed = true, + expected_updates = Some(num_nodes), + ); + pl.start(&format!("Reading SWHIDs (pass #{})", pass_counter)); + let pl = Arc::new(Mutex::new(Box::new(pl))); + par_iter_lines_from_dir(&swhids_dir, pl).map(HashableSWHID::<Vec<u8>>) + }; let temp_dir = tempfile::tempdir().unwrap(); // Tuned by zack on the 2023-09-06 graph on a machine with two Intel Xeon Gold 6342 CPUs @@ -48,7 +43,7 @@ pub fn build_swhids_mphf(swhids_dir: PathBuf, num_nodes: usize) -> Result<SwhidP log::info!("Building MPH with parameters: {:?}", config); let mut f = PartitionedPhf::new(); - f.build_in_internal_memory_from_bytes(&swhids, &config) + f.par_build_in_internal_memory_from_bytes(iter_swhids, &config) .context("Failed to build MPH")?; Ok(SwhidPthash(f)) } diff --git a/rust/src/compress/persons.rs b/rust/src/compress/persons.rs index 3c3b2021..ea52a6b3 100644 --- a/rust/src/compress/persons.rs +++ b/rust/src/compress/persons.rs @@ -5,7 +5,8 @@ use std::fs::File; use std::io::{BufRead, BufReader}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use dsi_progress_logger::{progress_logger, ProgressLog}; @@ -14,6 +15,8 @@ use pthash::{ Phf, }; +use crate::utils::progress_logger::{BufferedProgressLogger, MinimalProgressLog}; + pub struct Person<T: AsRef<[u8]>>(pub T); impl<T: AsRef<[u8]>> Hashable for Person<T> { @@ -27,7 +30,7 @@ impl<T: AsRef<[u8]>> Hashable for Person<T> { // graph has just over 2^32 keys pub type PersonMphf = PartitionedPhf<Minimal, MurmurHash2_64, DictionaryDictionary>; -fn iter_persons(path: PathBuf) -> Result<impl Iterator<Item = Person<Box<[u8]>>>> { +fn iter_persons(path: &Path) -> Result<impl Iterator<Item = Person<Box<[u8]>>>> { let persons_file = File::open(&path).with_context(|| format!("Could not open {}", path.display()))?; Ok(BufReader::new(persons_file).lines().map(move |person| { @@ -42,16 +45,21 @@ fn iter_persons(path: PathBuf) -> Result<impl Iterator<Item = Person<Box<[u8]>>> /// Reads base64-encoded persons from the path and return a MPH function for them. pub fn build_mphf(path: PathBuf, num_persons: usize) -> Result<PersonMphf> { - let mut pl = progress_logger!( - display_memory = true, - item_name = "person", - local_speed = true, - expected_updates = Some(num_persons), - ); - pl.start("Reading persons"); - - let persons: Vec<_> = iter_persons(path)?.inspect(|_| pl.light_update()).collect(); - pl.done(); + let mut pass_counter = 0; + let iter_persons = || { + pass_counter += 1; + let mut pl = progress_logger!( + display_memory = true, + item_name = "person", + local_speed = true, + expected_updates = Some(num_persons), + ); + pl.start(&format!("Reading persons (pass #{})", pass_counter)); + let mut pl = BufferedProgressLogger::new(Arc::new(Mutex::new(Box::new(pl)))); + iter_persons(&path) + .expect("Could not read persons") + .inspect(move |_| MinimalProgressLog::light_update(&mut pl)) + }; let temp_dir = tempfile::tempdir().unwrap(); // TODO: tweak those for performance @@ -61,7 +69,7 @@ pub fn build_mphf(path: PathBuf, num_persons: usize) -> Result<PersonMphf> { log::info!("Building MPH with parameters: {:?}", config); let mut f = PersonMphf::new(); - f.build_in_internal_memory_from_bytes(&persons, &config) + f.build_in_internal_memory_from_bytes(iter_persons, &config) .context("Failed to build MPH")?; Ok(f) } -- GitLab From 89e18b0e72fbdb3d5896ea1b4354b78bd63ffd24 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz <vlorentz@softwareheritage.org> Date: Tue, 27 Aug 2024 14:44:01 +0200 Subject: [PATCH 2/2] Happy clippy --- rust/src/compress/persons.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/compress/persons.rs b/rust/src/compress/persons.rs index ea52a6b3..b681b69f 100644 --- a/rust/src/compress/persons.rs +++ b/rust/src/compress/persons.rs @@ -32,7 +32,7 @@ pub type PersonMphf = PartitionedPhf<Minimal, MurmurHash2_64, DictionaryDictiona fn iter_persons(path: &Path) -> Result<impl Iterator<Item = Person<Box<[u8]>>>> { let persons_file = - File::open(&path).with_context(|| format!("Could not open {}", path.display()))?; + File::open(path).with_context(|| format!("Could not open {}", path.display()))?; Ok(BufReader::new(persons_file).lines().map(move |person| { Person( person -- GitLab