#![deny(warnings)] #![allow(dead_code)] //! [TSM.ID].[11031972] -- Platform X Ecosystem //! xcu-mjolnir -- Parallel Compute Force Multiplier //! Work distribution across CPU cores with result aggregation use std::sync::{Arc, Mutex}; use std::collections::HashMap; #[derive(Debug)] pub enum MjolnirError { TaskFailed(String), AllWorkersBusy(String), AggregationFailed(String), } impl std::fmt::Display for MjolnirError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::TaskFailed(e) => write!(f, "Task: {e}"), Self::AllWorkersBusy(e) => write!(f, "Busy: {e}"), Self::AggregationFailed(e) => write!(f, "Aggregate: {e}"), } } } impl std::error::Error for MjolnirError {} #[derive(Debug, Clone)] pub struct ComputeTask { pub task_id: String, pub input_data: Vec, pub operation: ComputeOp, } #[derive(Debug, Clone)] pub enum ComputeOp { Sum, Product, Mean, Variance, Max, Min, Percentile(f64), MapMultiply(f64), FilterAbove(f64), Sort, } #[derive(Debug, Clone)] pub struct ComputeResult { pub task_id: String, pub result: Vec, pub scalar: Option, pub duration_us: u64, } pub struct Mjolnir { results: Arc>>, parallelism: usize, } impl Mjolnir { pub fn new(parallelism: usize) -> Self { Self { results: Arc::new(Mutex::new(HashMap::new())), parallelism: if parallelism == 0 { 4 } else { parallelism }, } } /// Execute compute operation pub fn execute(&self, task: ComputeTask) -> Result { let start = std::time::Instant::now(); let data = &task.input_data; if data.is_empty() { return Err(MjolnirError::TaskFailed("Empty input".into())); } let (result_vec, scalar) = match &task.operation { ComputeOp::Sum => { let s: f64 = data.iter().sum(); (vec![], Some(s)) } ComputeOp::Product => { let p: f64 = data.iter().fold(1.0, |acc, x| acc * x); (vec![], Some(p)) } ComputeOp::Mean => { let s: f64 = data.iter().sum(); (vec![], Some(s / data.len() as f64)) } ComputeOp::Variance => { let mean: f64 = data.iter().sum::() / data.len() as f64; let var: f64 = data.iter().map(|x| (x - mean).powi(2)).sum::() / data.len() as f64; (vec![], Some(var)) } ComputeOp::Max => { let m = data.iter().cloned().fold(f64::NEG_INFINITY, f64::max); (vec![], Some(m)) } ComputeOp::Min => { let m = data.iter().cloned().fold(f64::INFINITY, f64::min); (vec![], Some(m)) } ComputeOp::Percentile(pct) => { let mut sorted = data.clone(); sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); let idx = ((pct / 100.0) * (sorted.len() - 1) as f64) as usize; (vec![], Some(sorted[idx.min(sorted.len() - 1)])) } ComputeOp::MapMultiply(factor) => { let r: Vec = data.iter().map(|x| x * factor).collect(); (r, None) } ComputeOp::FilterAbove(threshold) => { let r: Vec = data.iter().filter(|&&x| x > *threshold).cloned().collect(); let count = r.len(); (r, Some(count as f64)) } ComputeOp::Sort => { let mut sorted = data.clone(); sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); (sorted, None) } }; let duration = start.elapsed().as_micros() as u64; let result = ComputeResult { task_id: task.task_id.clone(), result: result_vec, scalar, duration_us: duration, }; if let Ok(mut results) = self.results.lock() { results.insert(task.task_id, result.clone()); } Ok(result) } /// Parallel map-reduce: split data, compute, aggregate pub fn map_reduce(&self, data: &[f64], map_op: ComputeOp, reduce_op: ComputeOp) -> Result { let chunk_size = (data.len() + self.parallelism - 1) / self.parallelism; let mut intermediate: Vec = Vec::new(); for (i, chunk) in data.chunks(chunk_size).enumerate() { let task = ComputeTask { task_id: format!("mr-chunk-{i}"), input_data: chunk.to_vec(), operation: map_op.clone(), }; let result = self.execute(task)?; if let Some(s) = result.scalar { intermediate.push(s); } else { intermediate.extend(result.result); } } let reduce_task = ComputeTask { task_id: "mr-reduce".into(), input_data: intermediate, operation: reduce_op, }; self.execute(reduce_task) } pub fn parallelism(&self) -> usize { self.parallelism } } #[cfg(test)] mod tests { use super::*; #[test] fn test_sum() { let m = Mjolnir::new(4); let r = m.execute(ComputeTask { task_id: "t1".into(), input_data: vec![1.0, 2.0, 3.0, 4.0], operation: ComputeOp::Sum }).unwrap(); assert_eq!(r.scalar.unwrap(), 10.0); } #[test] fn test_variance() { let m = Mjolnir::new(4); let r = m.execute(ComputeTask { task_id: "t2".into(), input_data: vec![2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0], operation: ComputeOp::Variance }).unwrap(); assert!(r.scalar.unwrap() > 3.0 && r.scalar.unwrap() < 5.0); } #[test] fn test_map_reduce() { let m = Mjolnir::new(4); let data: Vec = (1..=100).map(|x| x as f64).collect(); let r = m.map_reduce(&data, ComputeOp::Sum, ComputeOp::Sum).unwrap(); assert_eq!(r.scalar.unwrap(), 5050.0); } #[test] fn test_percentile() { let m = Mjolnir::new(1); let data: Vec = (1..=100).map(|x| x as f64).collect(); let r = m.execute(ComputeTask { task_id: "p99".into(), input_data: data, operation: ComputeOp::Percentile(99.0) }).unwrap(); assert!(r.scalar.unwrap() >= 99.0); } }