implement multithreading using rayon and dashmap

jacekpoz 2024-05-09 01:01:54 +02:00
parent 98d90239bf
commit 27f23e67b9
Signed by: poz
SSH key fingerprint: SHA256:JyLeVWE4bF3tDnFeUpUaJsPsNlJyBldDGV/dIKSLyN8
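Note on the pattern in the diff below: the commit drops the hand-rolled std::thread workers that shared an Arc<RwLock<HashMap>> and instead walks the input sizes with a rayon parallel iterator, storing results in a concurrent dashmap::DashMap. The sketch that follows shows only that pattern in isolation and is not code from this repository: it assumes rayon and dashmap as dependencies, measure() is a hypothetical stand-in for the NormalSelect/RandomizedSelect runs, and it condenses the commit's insert-then-get_mut into DashMap's entry API.

// Sketch only: rayon workers writing into a concurrent map, the pattern this commit switches to.
use std::sync::Arc;

use dashmap::DashMap;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

// hypothetical stand-in for one NormalSelect/RandomizedSelect run on an input of size n
fn measure(n: u64) -> (u64, u64) {
    (n, n / 2) // pretend (comparisons, swaps)
}

fn main() {
    // DashMap shards its keys internally, so rayon workers can insert
    // concurrently without funnelling every write through one RwLock.
    let results: Arc<DashMap<u64, Vec<(u64, u64)>>> = Arc::new(DashMap::new());
    let results_clone = Arc::clone(&results);

    // step_by over an inclusive range is not a rayon iterator, so the sizes
    // are collected into a Vec first, as the commit does.
    (100u64..=1000).step_by(100).collect::<Vec<_>>()
        .into_par_iter()
        .for_each(move |n| {
            // each worker touches its own key; entry() condenses the commit's
            // insert-then-get_mut into a single call
            results_clone.entry(n).or_default().push(measure(n));
        });

    for entry in results.iter() {
        println!("n = {}: {} samples", entry.key(), entry.value().len());
    }
}

Rayon sizes its worker pool to the available cores on its own, which is what replaces the manual available_parallelism() minus 4 thread spawning from the old version, and DashMap's per-shard locks avoid serializing every insert behind a single RwLock write guard.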


@@ -1,88 +1,70 @@
-use std::{collections::HashMap, env::args, fs::{self, OpenOptions}, io::{self, Write}, sync::{Arc, RwLock}, thread, time::Duration};
+use std::{env::args, fs::{self, OpenOptions}, io::{self, Write}, sync::Arc};
+use dashmap::DashMap;
 use libgen::gen_rand;
 use libselect::{normal_select::NormalSelect, randomized_select::RandomizedSelect, Select};
+use rayon::{current_thread_index, iter::{IntoParallelIterator, ParallelIterator}};

-fn comp_avg(results_map: &HashMap<u64, Vec<(u64, u64)>>) -> HashMap<u64, f64> {
+fn comp_avg(results_map: &DashMap<u64, Vec<(u64, u64)>>) -> DashMap<u64, f64> {
     results_map.iter()
-        .map(|(i, results)|
+        .map(|ref_multi| {
+            let (i, results) = ref_multi.pair();
             (*i, (results.iter()
                 .map(|res| res.0)
-                .sum::<u64>() as f64 / results.len() as f64)))
+                .sum::<u64>() as f64 / results.len() as f64))
+            }
+        )
         .collect()
 }

-fn swap_avg(results_map: &HashMap<u64, Vec<(u64, u64)>>) -> HashMap<u64, f64> {
+fn swap_avg(results_map: &DashMap<u64, Vec<(u64, u64)>>) -> DashMap<u64, f64> {
     results_map.iter()
-        .map(|(i, results)|
+        .map(|ref_multi| {
+            let (i, results) = ref_multi.pair();
             (*i, (results.iter()
                 .map(|res| res.1)
-                .sum::<u64>() as f64 / results.len() as f64)))
+                .sum::<u64>() as f64 / results.len() as f64))
+            }
+        )
         .collect()
 }

 fn main() -> io::Result<()> {
-    let k = args().nth(1)
-        .expect(format!("usage: {} <k>", args().nth(0).unwrap()).as_str())
-        .parse::<usize>()
-        .expect("k must be usize");

     let m = 50;

-    let normal_results: Arc<RwLock<HashMap<u64, Vec<(u64, u64)>>>> = Arc::new(RwLock::new(HashMap::new()));
-    let randomized_results: Arc<RwLock<HashMap<u64, Vec<(u64, u64)>>>> = Arc::new(RwLock::new(HashMap::new()));
+    let normal_results: Arc<DashMap<u64, Vec<(u64, u64)>>> = Arc::new(DashMap::new());
+    let randomized_results: Arc<DashMap<u64, Vec<(u64, u64)>>> = Arc::new(DashMap::new());

-    let mut thread_handles = vec![];
+    let normal_results_clone = Arc::clone(&normal_results);
+    let randomized_results_clone = Arc::clone(&randomized_results);

-    let num_cpus = thread::available_parallelism()?.get();
+    (100u64..=50000u64).step_by(100).collect::<Vec<_>>().into_par_iter().for_each(move |n| {
+        if normal_results_clone.get(&n).is_some() {
+            return;
+        }
+        if randomized_results_clone.get(&n).is_some() {
+            return;
+        }
+        println!("{}: starting n: {n}", current_thread_index().unwrap());
+        let input = gen_rand(n);

-    let num_cpus = num_cpus - 4;
+        normal_results_clone.insert(n, vec![]);
+        randomized_results_clone.insert(n, vec![]);

-    for cpu in 0..num_cpus {
-        let cpu = cpu.clone();
+        let mut normal = NormalSelect::new(false);
+        let mut randomized = RandomizedSelect::new(false);

-        let normal_results = Arc::clone(&normal_results);
-        let randomized_results = Arc::clone(&randomized_results);
-        let handle = thread::spawn(move || {
-            for n in ((100 + cpu * 100) as u64..=50000 as u64).step_by(100 * num_cpus) {
-                if normal_results.read().expect("can't read insertion results small map").get(&(n as u64)).is_some() {
-                    continue;
-                }
-                let mut nr = normal_results.write().expect("can't write to insertion results small");
-                if randomized_results.read().expect("can't read quick results small map").get(&(n as u64)).is_some() {
-                    continue;
-                }
-                let mut rr = randomized_results.write().expect("can't write to quick results small");
-                println!("cpu {cpu}: {n}");
-                let input = gen_rand(n);
-                nr.insert(n, vec![]);
-                rr.insert(n, vec![]);
-                let mut normal = NormalSelect::new(false);
-                let mut randomized = RandomizedSelect::new(false);
-                for i in 0..m {
-                    print!("{i} ");
-                    _ = std::io::stdout().flush();
-                    normal.select(&input, k);
-                    randomized.select(&input, k);
-                    nr.get_mut(&n).unwrap().push((normal.num_comp(), normal.num_swap()));
-                    rr.get_mut(&n).unwrap().push((randomized.num_comp(), randomized.num_swap()));
-                }
-                println!();
-            }
-        });
-        thread_handles.push(handle);
-    }
-    for handle in thread_handles {
-        handle.join().expect("couldn't join handle");
-    }
-    let normal_results = normal_results.read().expect("can't access results");
-    let randomized_results = randomized_results.read().expect("can't access results");
+        let k = 1;
+        for _ in 0..m {
+            _ = std::io::stdout().flush();
+            normal.select(&input, k);
+            randomized.select(&input, k);
+            normal_results_clone.get_mut(&n).unwrap().push((normal.num_comp(), normal.num_swap()));
+            randomized_results_clone.get_mut(&n).unwrap().push((randomized.num_comp(), randomized.num_swap()));
+        }
+        println!("{}: finished n: {n}", current_thread_index().unwrap());
+    });

     let normal_comp_averages = comp_avg(&normal_results);
@@ -97,18 +79,18 @@ fn main() -> io::Result<()> {
     let mut results_file = OpenOptions::new()
         .create(true)
         .append(true)
-        .open(format!("./results/k{k}"))?;
+        .open(format!("./results/k1"))?;

     for n in (100..=50000).step_by(100) {
         writeln!(results_file, "{n} {} {} {} {} {} {} {} {}",
-            normal_comp_averages[&n],
-            normal_swap_averages[&n],
-            randomized_comp_averages[&n],
-            randomized_swap_averages[&n],
-            normal_comp_averages[&n] / n as f64,
-            normal_swap_averages[&n] / n as f64,
-            randomized_comp_averages[&n] / n as f64,
-            randomized_swap_averages[&n] / n as f64,
+            normal_comp_averages.get(&n).unwrap().value(),
+            normal_swap_averages.get(&n).unwrap().value(),
+            randomized_comp_averages.get(&n).unwrap().value(),
+            randomized_swap_averages.get(&n).unwrap().value(),
+            normal_comp_averages.get(&n).unwrap().value() / n as f64,
+            normal_swap_averages.get(&n).unwrap().value() / n as f64,
+            randomized_comp_averages.get(&n).unwrap().value() / n as f64,
+            randomized_swap_averages.get(&n).unwrap().value() / n as f64,
         )?;
     }
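One detail from the second hunk worth spelling out: DashMap does not support the map[&n] indexing that HashMap allowed, since a lookup has to hand back a guard that keeps the shard read-locked, so every read goes through get() and value(). A minimal sketch of that access pattern, assuming only the dashmap crate (the averages map here is illustrative, not repository code):

use dashmap::DashMap;

fn main() {
    let averages: DashMap<u64, f64> = DashMap::new();
    averages.insert(100, 42.0);

    let n = 100u64;
    // get() returns Option<Ref<_, _>>; the Ref keeps its shard locked until dropped
    let avg = *averages.get(&n).unwrap().value();
    println!("{n} {avg} {}", avg / n as f64);
}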