diff --git a/rust/src/utils/sort.rs b/rust/src/utils/sort.rs index e7a71cf982c97417402fb9511e1269d7e04576e3..ab97f388a7141aa4d2fc467f07c5e12304e33669 100644 --- a/rust/src/utils/sort.rs +++ b/rust/src/utils/sort.rs @@ -112,6 +112,9 @@ where }) .count(); + log::info!("Counted {} rows", num_rows); + log::info!("Flushing remaining buffers"); + let is_empty = num_rows == 0; // Write remaining buffers @@ -121,6 +124,8 @@ where flush_buffer(state) } + log::info!("Sorting"); + // Notify sorters they reached the end of their inputs for state in thread_states.iter_mut() { // This is safe because other threads ended @@ -150,7 +155,7 @@ where .with_context(|| format!("Could not create directory {}", target_dir.display()))?; if is_empty { - // No persons; write an empty file so the rest of the pipeline does not + // No items at all; write an empty file so the rest of the pipeline does not // need special-casing for the absence of files. let path = target_dir.join("0.csv.zst"); let file = std::fs::File::create(&path) @@ -168,6 +173,8 @@ where assert!(sorted_files.len() > 0, "Sorters did not run"); + log::info!("Merging sorted rows and writing to disk"); + // Spawn sort * | pv | split // TODO: it would be nice to start merging without waiting for all sorters @@ -211,6 +218,8 @@ where merge.wait().with_context(|| "pv crashed")?; split.wait().with_context(|| "split/zstdmt crashed")?; + log::info!("Done sorting rows and writing to disk"); + Ok(()) } }