From 27f23e67b974c520b5ac28f6326667a00ac1a61d Mon Sep 17 00:00:00 2001 From: jacekpoz Date: Thu, 9 May 2024 01:01:54 +0200 Subject: [PATCH] implement multithreading using rayon and dashmap --- lab3/zad2/src/main.rs | 120 ++++++++++++++++++------------------------ 1 file changed, 51 insertions(+), 69 deletions(-) diff --git a/lab3/zad2/src/main.rs b/lab3/zad2/src/main.rs index 7fb411d..936a734 100644 --- a/lab3/zad2/src/main.rs +++ b/lab3/zad2/src/main.rs @@ -1,88 +1,70 @@ -use std::{collections::HashMap, env::args, fs::{self, OpenOptions}, io::{self, Write}, sync::{Arc, RwLock}, thread, time::Duration}; +use std::{env::args, fs::{self, OpenOptions}, io::{self, Write}, sync::Arc}; +use dashmap::DashMap; use libgen::gen_rand; use libselect::{normal_select::NormalSelect, randomized_select::RandomizedSelect, Select}; +use rayon::{current_thread_index, iter::{IntoParallelIterator, ParallelIterator}}; -fn comp_avg(results_map: &HashMap>) -> HashMap { +fn comp_avg(results_map: &DashMap>) -> DashMap { results_map.iter() - .map(|(i, results)| + .map(|ref_multi| { + let (i, results) = ref_multi.pair(); (*i, (results.iter() .map(|res| res.0) - .sum::() as f64 / results.len() as f64))) + .sum::() as f64 / results.len() as f64)) + } + ) .collect() } -fn swap_avg(results_map: &HashMap>) -> HashMap { +fn swap_avg(results_map: &DashMap>) -> DashMap { results_map.iter() - .map(|(i, results)| + .map(|ref_multi| { + let (i, results) = ref_multi.pair(); (*i, (results.iter() .map(|res| res.1) - .sum::() as f64 / results.len() as f64))) + .sum::() as f64 / results.len() as f64)) + } + ) .collect() } fn main() -> io::Result<()> { - let k = args().nth(1) - .expect(format!("usage: {} ", args().nth(0).unwrap()).as_str()) - .parse::() - .expect("k must be usize"); - let m = 50; - let normal_results: Arc>>> = Arc::new(RwLock::new(HashMap::new())); - let randomized_results: Arc>>> = Arc::new(RwLock::new(HashMap::new())); + let normal_results: Arc>> = Arc::new(DashMap::new()); + let randomized_results: Arc>> = Arc::new(DashMap::new()); - let mut thread_handles = vec![]; + let normal_results_clone = Arc::clone(&normal_results); + let randomized_results_clone = Arc::clone(&randomized_results); - let num_cpus = thread::available_parallelism()?.get(); + (100u64..=50000u64).step_by(100).collect::>().into_par_iter().for_each(move |n| { + if normal_results_clone.get(&n).is_some() { + return; + } + if randomized_results_clone.get(&n).is_some() { + return; + } + println!("{}: starting n: {n}", current_thread_index().unwrap()); + let input = gen_rand(n); - let num_cpus = num_cpus - 4; + normal_results_clone.insert(n, vec![]); + randomized_results_clone.insert(n, vec![]); - for cpu in 0..num_cpus { - let cpu = cpu.clone(); + let mut normal = NormalSelect::new(false); + let mut randomized = RandomizedSelect::new(false); - let normal_results = Arc::clone(&normal_results); - let randomized_results = Arc::clone(&randomized_results); - let handle = thread::spawn(move || { - for n in ((100 + cpu * 100) as u64..=50000 as u64).step_by(100 * num_cpus) { - if normal_results.read().expect("can't read insertion results small map").get(&(n as u64)).is_some() { - continue; - } - let mut nr = normal_results.write().expect("can't write to insertion results small"); - if randomized_results.read().expect("can't read quick results small map").get(&(n as u64)).is_some() { - continue; - } - let mut rr = randomized_results.write().expect("can't write to quick results small"); - println!("cpu {cpu}: {n}"); - let input = gen_rand(n); - - nr.insert(n, vec![]); - rr.insert(n, vec![]); - - let mut normal = NormalSelect::new(false); - let mut randomized = RandomizedSelect::new(false); - for i in 0..m { - print!("{i} "); - _ = std::io::stdout().flush(); - normal.select(&input, k); - randomized.select(&input, k); - nr.get_mut(&n).unwrap().push((normal.num_comp(), normal.num_swap())); - rr.get_mut(&n).unwrap().push((randomized.num_comp(), randomized.num_swap())); - } - println!(); - } - }); - - thread_handles.push(handle); - } - - for handle in thread_handles { - handle.join().expect("couldn't join handle"); - } - - let normal_results = normal_results.read().expect("can't access results"); - let randomized_results = randomized_results.read().expect("can't access results"); + let k = 1; + for _ in 0..m { + _ = std::io::stdout().flush(); + normal.select(&input, k); + randomized.select(&input, k); + normal_results_clone.get_mut(&n).unwrap().push((normal.num_comp(), normal.num_swap())); + randomized_results_clone.get_mut(&n).unwrap().push((randomized.num_comp(), randomized.num_swap())); + } + println!("{}: finished n: {n}", current_thread_index().unwrap()); + }); let normal_comp_averages = comp_avg(&normal_results); @@ -97,18 +79,18 @@ fn main() -> io::Result<()> { let mut results_file = OpenOptions::new() .create(true) .append(true) - .open(format!("./results/k{k}"))?; + .open(format!("./results/k1"))?; for n in (100..=50000).step_by(100) { writeln!(results_file, "{n} {} {} {} {} {} {} {} {}", - normal_comp_averages[&n], - normal_swap_averages[&n], - randomized_comp_averages[&n], - randomized_swap_averages[&n], - normal_comp_averages[&n] / n as f64, - normal_swap_averages[&n] / n as f64, - randomized_comp_averages[&n] / n as f64, - randomized_swap_averages[&n] / n as f64, + normal_comp_averages.get(&n).unwrap().value(), + normal_swap_averages.get(&n).unwrap().value(), + randomized_comp_averages.get(&n).unwrap().value(), + randomized_swap_averages.get(&n).unwrap().value(), + normal_comp_averages.get(&n).unwrap().value() / n as f64, + normal_swap_averages.get(&n).unwrap().value() / n as f64, + randomized_comp_averages.get(&n).unwrap().value() / n as f64, + randomized_swap_averages.get(&n).unwrap().value() / n as f64, )?; }