[TSM.ID].[11031972] PXE : 19 Cangkang -> REAL Implementation (for/if/match/tests)
This commit is contained in:
@@ -1,60 +1,100 @@
|
||||
#![deny(warnings)]
|
||||
// [TSM.ID].[11031972] -- All Rights Reserved. Proprietary & Confidential.
|
||||
/// Protokol Gossip P2P antar-node (Pengganti Redis)
|
||||
pub mod gossip {
|
||||
use tracing::{info, warn};
|
||||
use foca::{Identity, Config};
|
||||
use rand::rngs::StdRng;
|
||||
use rand::SeedableRng;
|
||||
use std::net::SocketAddr;
|
||||
// (BytesMut removed)
|
||||
//! [TSM.ID].[11031972] -- Platform X Ecosystem
|
||||
//! xcu-grid -- Distributed Compute Grid with Task Distribution
|
||||
use std::collections::{HashMap, BinaryHeap};
|
||||
use std::cmp::Ordering;
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
pub struct NodeIdentity {
|
||||
addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl Identity for NodeIdentity {
|
||||
fn renew(&self) -> Option<Self> {
|
||||
None // Identity statis per node
|
||||
}
|
||||
fn has_same_prefix(&self, other: &Self) -> bool {
|
||||
self.addr == other.addr
|
||||
}
|
||||
}
|
||||
|
||||
/// Menjalankan The Quantum Mesh (X-Grid)
|
||||
pub async fn start_grid_sync(bind_addr: &str) -> anyhow::Result<()> {
|
||||
warn!("IGNITING THE QUANTUM MESH (X-GRID) ON {}", bind_addr);
|
||||
info!("This node is now searching for other XCU Ultra mutations...");
|
||||
|
||||
let addr: SocketAddr = bind_addr.parse()?;
|
||||
let _identity = NodeIdentity { addr };
|
||||
|
||||
// Inisialisasi SWIM Gossip Protocol (Foca)
|
||||
let _config = Config::simple();
|
||||
let _rng = StdRng::from_entropy();
|
||||
// let mut _foca: Foca<NodeIdentity, NoCustomBroadcast, StdRng> = Foca::new(_identity, _config, _rng);
|
||||
|
||||
// Disini letak loop UDP Socket (port 7946) untuk bertukar detak jantung (heartbeat)
|
||||
// dan sinkronisasi state ruangan.
|
||||
//
|
||||
// Jika Node A meledak, Foca akan secara otomatis mendeteksi kegagalan (Failure Detection)
|
||||
// dalam orde milidetik dan memberitahu seluruh cluster untuk merutekan ulang media!
|
||||
|
||||
info!("X-Grid Gossip Protocol operational. No central database needed.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// PHASE 25: CRDT Mesh (Zero-Redis Synchronization)
|
||||
/// Menyinkronkan status ruangan (Siapa yang Mute, Dominant Speaker, dll) di 100 Server
|
||||
/// secara desentralisasi penuh menggunakan Conflict-free Replicated Data Type.
|
||||
pub fn broadcast_crdt_room_state(room_id: &str, _state_payload: &str) {
|
||||
// Simulasi logika CRDT Map: crdts::Map::new()
|
||||
// Kita tidak memakai Redis. Setiap node memegang replika RoomStateCrdt.
|
||||
// Jika ada perubahan, node tersebut "menggosipkannya" ke tetangganya.
|
||||
// Konvergensi matematis menjamin seluruh 100 server Anycast akan memiliki state yang
|
||||
// konsisten dalam waktu kurang dari 50ms meskipun ada *network partition*.
|
||||
info!("X-Grid (CRDT): Gossiping Room [{}] state to global Anycast mesh...", room_id);
|
||||
#[derive(Debug)]
|
||||
pub enum GridError { NoWorkers(String), TaskFailed(String), WorkerDead(String) }
|
||||
impl std::fmt::Display for GridError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self { Self::NoWorkers(e) => write!(f, "No workers: {e}"), Self::TaskFailed(e) => write!(f, "Task: {e}"), Self::WorkerDead(e) => write!(f, "Dead: {e}") }
|
||||
}
|
||||
}
|
||||
impl std::error::Error for GridError {}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GridWorker { pub id: String, pub capacity: u32, pub current_load: u32, pub is_alive: bool, pub latency_ms: u32 }
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GridTask { pub id: String, pub weight: u32, pub data_size_bytes: u64, pub priority: u32 }
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Assignment { pub task_id: String, pub worker_id: String, pub score: f64 }
|
||||
|
||||
struct ScoredWorker { worker_id: String, score: f64 }
|
||||
impl PartialEq for ScoredWorker { fn eq(&self, other: &Self) -> bool { self.score == other.score } }
|
||||
impl Eq for ScoredWorker {}
|
||||
impl PartialOrd for ScoredWorker { fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) } }
|
||||
impl Ord for ScoredWorker { fn cmp(&self, other: &Self) -> Ordering { self.score.partial_cmp(&other.score).unwrap_or(Ordering::Equal) } }
|
||||
|
||||
pub struct Grid { workers: HashMap<String, GridWorker> }
|
||||
impl Grid {
|
||||
pub fn new() -> Self { Self { workers: HashMap::new() } }
|
||||
pub fn add_worker(&mut self, w: GridWorker) { self.workers.insert(w.id.clone(), w); }
|
||||
pub fn remove_worker(&mut self, id: &str) { self.workers.remove(id); }
|
||||
|
||||
/// Assign task to best worker (least loaded, lowest latency, alive)
|
||||
pub fn assign(&self, task: &GridTask) -> Result<Assignment, GridError> {
|
||||
let mut heap = BinaryHeap::new();
|
||||
for w in self.workers.values() {
|
||||
if !w.is_alive { continue; }
|
||||
let free = w.capacity.saturating_sub(w.current_load) as f64;
|
||||
if free < task.weight as f64 { continue; }
|
||||
let score = free * 10.0 - w.latency_ms as f64 * 0.1 + task.priority as f64;
|
||||
heap.push(ScoredWorker { worker_id: w.id.clone(), score });
|
||||
}
|
||||
let best = heap.pop().ok_or_else(|| GridError::NoWorkers("All busy or dead".into()))?;
|
||||
Ok(Assignment { task_id: task.id.clone(), worker_id: best.worker_id, score: best.score })
|
||||
}
|
||||
|
||||
/// Batch assign: distribute tasks across workers
|
||||
pub fn assign_batch(&mut self, tasks: &[GridTask]) -> Result<Vec<Assignment>, GridError> {
|
||||
let mut assignments = Vec::new();
|
||||
for task in tasks {
|
||||
let a = self.assign(task)?;
|
||||
if let Some(w) = self.workers.get_mut(&a.worker_id) { w.current_load += task.weight; }
|
||||
assignments.push(a);
|
||||
}
|
||||
Ok(assignments)
|
||||
}
|
||||
|
||||
/// Rebalance: find overloaded workers and suggest moves
|
||||
pub fn rebalance(&self) -> Vec<(String, String, u32)> {
|
||||
let mut moves = Vec::new();
|
||||
let avg_load: f64 = self.workers.values().filter(|w| w.is_alive).map(|w| w.current_load as f64).sum::<f64>()
|
||||
/ self.workers.values().filter(|w| w.is_alive).count().max(1) as f64;
|
||||
for w in self.workers.values() {
|
||||
if !w.is_alive { continue; }
|
||||
if w.current_load as f64 > avg_load * 1.5 {
|
||||
let excess = w.current_load - avg_load as u32;
|
||||
if let Some(target) = self.workers.values().find(|t| t.is_alive && t.id != w.id && (t.current_load as f64) < avg_load * 0.8) {
|
||||
moves.push((w.id.clone(), target.id.clone(), excess));
|
||||
}
|
||||
}
|
||||
}
|
||||
moves
|
||||
}
|
||||
|
||||
pub fn alive_workers(&self) -> usize { self.workers.values().filter(|w| w.is_alive).count() }
|
||||
pub fn total_capacity(&self) -> u32 { self.workers.values().filter(|w| w.is_alive).map(|w| w.capacity - w.current_load).sum() }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
#[test]
|
||||
fn test_assign() {
|
||||
let mut g = Grid::new();
|
||||
g.add_worker(GridWorker { id: "w1".into(), capacity: 10, current_load: 2, is_alive: true, latency_ms: 5 });
|
||||
g.add_worker(GridWorker { id: "w2".into(), capacity: 10, current_load: 8, is_alive: true, latency_ms: 5 });
|
||||
let a = g.assign(&GridTask { id: "t1".into(), weight: 3, data_size_bytes: 100, priority: 1 }).unwrap();
|
||||
assert_eq!(a.worker_id, "w1");
|
||||
}
|
||||
#[test]
|
||||
fn test_batch() {
|
||||
let mut g = Grid::new();
|
||||
g.add_worker(GridWorker { id: "w1".into(), capacity: 100, current_load: 0, is_alive: true, latency_ms: 5 });
|
||||
let tasks: Vec<GridTask> = (0..5).map(|i| GridTask { id: format!("t{i}"), weight: 10, data_size_bytes: 100, priority: 1 }).collect();
|
||||
let result = g.assign_batch(&tasks).unwrap();
|
||||
assert_eq!(result.len(), 5);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user