|
|
|
@@ -0,0 +1,299 @@
|
|
|
|
|
// [TSM.ID].[11031972] — XCU QUIC/WebTransport Engine (Unified Transport)
|
|
|
|
|
// Room-aware broadcast + QCG Handshake + JWT Verify + Dual MoqRelayer/ROOMS routing
|
|
|
|
|
use anyhow::Result;
|
|
|
|
|
use quinn::{Endpoint, ServerConfig};
|
|
|
|
|
use rcgen::generate_simple_self_signed;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tracing::{info, warn, error};
|
|
|
|
|
use std::net::SocketAddr;
|
|
|
|
|
use sha2::{Sha256, Digest};
|
|
|
|
|
use std::sync::OnceLock;
|
|
|
|
|
use std::path::Path;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use tokio::sync::{broadcast, Mutex};
|
|
|
|
|
|
|
|
|
|
pub static CERT_HASH: OnceLock<String> = OnceLock::new();
|
|
|
|
|
pub static TLS_MODE: OnceLock<String> = OnceLock::new();
|
|
|
|
|
pub static NEXUS_TX: OnceLock<tokio::sync::broadcast::Sender<Vec<u8>>> = OnceLock::new();
|
|
|
|
|
|
|
|
|
|
/// Shared type for ROOMS — same as main.rs WebSocket handler
|
|
|
|
|
pub type RoomsMap = Arc<Mutex<HashMap<String, broadcast::Sender<Vec<u8>>>>>;
|
|
|
|
|
|
|
|
|
|
/// Dual-Mode TLS Provider for XCU QUIC Engine
|
|
|
|
|
/// Mode 1: LETSENCRYPT (default) — reads from /etc/letsencrypt/live/{domain}/
|
|
|
|
|
/// Mode 2: SELFSIGNED — generates ephemeral certs via rcgen (for offline/sovereign operation)
|
|
|
|
|
///
|
|
|
|
|
/// Controlled by env var XCU_TLS_MODE: "LETSENCRYPT" (default) or "SELFSIGNED"
|
|
|
|
|
/// Domain controlled by env var XCU_DOMAIN: default "mesh.ultramodul.xyz"
|
|
|
|
|
|
|
|
|
|
fn load_letsencrypt_certs(domain: &str) -> Result<(Vec<rustls::Certificate>, rustls::PrivateKey)> {
|
|
|
|
|
let cert_path = format!("/etc/letsencrypt/live/{}/fullchain.pem", domain);
|
|
|
|
|
let key_path = format!("/etc/letsencrypt/live/{}/privkey.pem", domain);
|
|
|
|
|
|
|
|
|
|
info!("[TLS] Loading Let's Encrypt certificates from {}", cert_path);
|
|
|
|
|
|
|
|
|
|
if !Path::new(&cert_path).exists() || !Path::new(&key_path).exists() {
|
|
|
|
|
anyhow::bail!(
|
|
|
|
|
"Let's Encrypt certificates not found at {}. Run: certbot certonly --standalone -d {}",
|
|
|
|
|
cert_path, domain
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let cert_pem = std::fs::read(&cert_path)?;
|
|
|
|
|
let key_pem = std::fs::read(&key_path)?;
|
|
|
|
|
|
|
|
|
|
// Parse PEM → DER
|
|
|
|
|
let certs: Vec<rustls::Certificate> = rustls_pemfile::certs(&mut &cert_pem[..])
|
|
|
|
|
.filter_map(|r| r.ok())
|
|
|
|
|
.map(|der| rustls::Certificate(der.to_vec()))
|
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
|
|
let key = rustls_pemfile::private_key(&mut &key_pem[..])?
|
|
|
|
|
.ok_or_else(|| anyhow::anyhow!("No private key found in {}", key_path))?;
|
|
|
|
|
|
|
|
|
|
let private_key = rustls::PrivateKey(key.secret_der().to_vec());
|
|
|
|
|
|
|
|
|
|
// Compute SHA-256 hash of first cert for Quantum Trust
|
|
|
|
|
if let Some(first_cert) = certs.first() {
|
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
|
|
hasher.update(&first_cert.0);
|
|
|
|
|
let hash_hex = hex::encode(hasher.finalize());
|
|
|
|
|
info!("[TLS] QUANTUM TRUST HASH (Let's Encrypt): {}", hash_hex);
|
|
|
|
|
let _ = CERT_HASH.set(hash_hex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let _ = TLS_MODE.set("LETSENCRYPT".to_string());
|
|
|
|
|
info!("[TLS] Let's Encrypt certificates loaded successfully for {}", domain);
|
|
|
|
|
Ok((certs, private_key))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn generate_selfsigned_certs(domain: &str) -> Result<(Vec<rustls::Certificate>, rustls::PrivateKey)> {
|
|
|
|
|
warn!("[TLS] Generating ephemeral self-signed certificates (Sovereign Mode)");
|
|
|
|
|
let subject_alt_names = vec![
|
|
|
|
|
"localhost".to_string(),
|
|
|
|
|
domain.to_string(),
|
|
|
|
|
"xc.ultramodul.xyz".to_string(),
|
|
|
|
|
];
|
|
|
|
|
let cert = generate_simple_self_signed(subject_alt_names)?;
|
|
|
|
|
let cert_der = cert.serialize_der()?;
|
|
|
|
|
let priv_key_der = cert.serialize_private_key_der();
|
|
|
|
|
|
|
|
|
|
// Compute SHA-256 hash
|
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
|
|
hasher.update(&cert_der);
|
|
|
|
|
let hash_hex = hex::encode(hasher.finalize());
|
|
|
|
|
warn!("[TLS] QUANTUM TRUST HASH (Self-Signed): {}", hash_hex);
|
|
|
|
|
let _ = CERT_HASH.set(hash_hex);
|
|
|
|
|
let _ = TLS_MODE.set("SELFSIGNED".to_string());
|
|
|
|
|
|
|
|
|
|
let cert_chain = vec![rustls::Certificate(cert_der)];
|
|
|
|
|
let key = rustls::PrivateKey(priv_key_der);
|
|
|
|
|
|
|
|
|
|
warn!("[TLS] Self-signed certificates generated. Browser trust requires manual CA install.");
|
|
|
|
|
Ok((cert_chain, key))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Start QUIC Listener — Unified with ROOMS broadcast channel
|
|
|
|
|
/// Now accepts `rooms` parameter to share broadcast channels with WebSocket handler.
|
|
|
|
|
/// Also accepts `jwt_secret` for QCG handshake verification.
|
|
|
|
|
pub async fn start_quic_listener(
|
|
|
|
|
addr: &str,
|
|
|
|
|
rooms: RoomsMap,
|
|
|
|
|
jwt_secret: String,
|
|
|
|
|
moq_relayer: Arc<xcu_sfu::moq::MoqRelayer>,
|
|
|
|
|
) -> Result<()> {
|
|
|
|
|
let domain = std::env::var("XCU_DOMAIN").unwrap_or_else(|_| "mesh.ultramodul.xyz".to_string());
|
|
|
|
|
let tls_mode = std::env::var("XCU_TLS_MODE").unwrap_or_else(|_| "LETSENCRYPT".to_string());
|
|
|
|
|
|
|
|
|
|
info!("===================================================");
|
|
|
|
|
info!(" XCU QUIC/WebTransport Engine — Unified Transport ");
|
|
|
|
|
info!(" Mode: {} | Domain: {} | Bind: {}",tls_mode, domain, addr);
|
|
|
|
|
info!("===================================================");
|
|
|
|
|
|
|
|
|
|
// Resolve TLS credentials based on mode
|
|
|
|
|
let (cert_chain, key) = match tls_mode.as_str() {
|
|
|
|
|
"SELFSIGNED" => {
|
|
|
|
|
warn!("[TLS] Sovereign Mode activated — Self-signed certificates");
|
|
|
|
|
generate_selfsigned_certs(&domain)?
|
|
|
|
|
}
|
|
|
|
|
_ => {
|
|
|
|
|
// Default: LETSENCRYPT — with automatic fallback to SELFSIGNED
|
|
|
|
|
match load_letsencrypt_certs(&domain) {
|
|
|
|
|
Ok(creds) => creds,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("[TLS] Let's Encrypt load failed: {}. Auto-fallback to Self-Signed.", e);
|
|
|
|
|
generate_selfsigned_certs(&domain)?
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Setup ServerConfig Quinn
|
|
|
|
|
let mut server_crypto = rustls::ServerConfig::builder()
|
|
|
|
|
.with_safe_defaults()
|
|
|
|
|
.with_no_client_auth()
|
|
|
|
|
.with_single_cert(cert_chain, key)?;
|
|
|
|
|
server_crypto.alpn_protocols = vec![b"h3".to_vec()]; // WebTransport over HTTP/3
|
|
|
|
|
|
|
|
|
|
let server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
|
|
|
|
|
|
|
|
|
|
// Bind QUIC Endpoint
|
|
|
|
|
let parsed_addr: SocketAddr = addr.parse()?;
|
|
|
|
|
let endpoint = Endpoint::server(server_config, parsed_addr)?;
|
|
|
|
|
|
|
|
|
|
info!("[QUIC] WebTransport Engine ONLINE at {} (TLS: {})",
|
|
|
|
|
addr, TLS_MODE.get().unwrap_or(&"UNKNOWN".to_string()));
|
|
|
|
|
|
|
|
|
|
// Accept Loop
|
|
|
|
|
while let Some(incoming) = endpoint.accept().await {
|
|
|
|
|
let remote = incoming.remote_address();
|
|
|
|
|
info!("[QUIC] Incoming connection from {}", remote);
|
|
|
|
|
let rooms_clone = rooms.clone();
|
|
|
|
|
let moq_clone = moq_relayer.clone();
|
|
|
|
|
let jwt_secret_clone = jwt_secret.clone();
|
|
|
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
match incoming.await {
|
|
|
|
|
Ok(conn) => {
|
|
|
|
|
info!("[QUIC] Connection established: {}", conn.remote_address());
|
|
|
|
|
handle_quic_connection(conn, rooms_clone, jwt_secret_clone, moq_clone).await;
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
warn!("[QUIC] Connection failed: {}", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle a single QUIC connection with QCG handshake + room-aware broadcast
|
|
|
|
|
async fn handle_quic_connection(
|
|
|
|
|
conn: quinn::Connection,
|
|
|
|
|
rooms: RoomsMap,
|
|
|
|
|
_jwt_secret: String, // Reserved for QCG JWT verification (Phase 2)
|
|
|
|
|
moq_relayer: Arc<xcu_sfu::moq::MoqRelayer>,
|
|
|
|
|
) {
|
|
|
|
|
let remote = conn.remote_address();
|
|
|
|
|
|
|
|
|
|
// =========================================================
|
|
|
|
|
// STEP 1: Wait for registration datagram to identify room
|
|
|
|
|
// Client sends: [99, 0, pId_lo, pId_hi, room_len, ...room_bytes]
|
|
|
|
|
// =========================================================
|
|
|
|
|
let (participant_id, room_name) = match conn.read_datagram().await {
|
|
|
|
|
Ok(datagram) => {
|
|
|
|
|
if datagram.len() >= 5 && datagram[0] == 99 {
|
|
|
|
|
let pid = u16::from_le_bytes([datagram[2], datagram[3]]);
|
|
|
|
|
let room_len = datagram[4] as usize;
|
|
|
|
|
if datagram.len() >= 5 + room_len {
|
|
|
|
|
let room = String::from_utf8_lossy(&datagram[5..5 + room_len]).to_string();
|
|
|
|
|
info!("[QUIC] Registration: participant={} room={} from {}", pid, room, remote);
|
|
|
|
|
(pid, room)
|
|
|
|
|
} else {
|
|
|
|
|
warn!("[QUIC] Invalid registration datagram (room truncated) from {}", remote);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
warn!("[QUIC] Expected registration frame (type=99), got type={} from {}",
|
|
|
|
|
datagram.first().unwrap_or(&0), remote);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
warn!("[QUIC] Failed to read registration datagram from {}: {}", remote, e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// =========================================================
|
|
|
|
|
// STEP 2: Join ROOMS broadcast channel (shared with WS)
|
|
|
|
|
// =========================================================
|
|
|
|
|
let tx = {
|
|
|
|
|
let mut rooms_guard = rooms.lock().await;
|
|
|
|
|
let tx = rooms_guard.entry(room_name.clone()).or_insert_with(|| {
|
|
|
|
|
let (tx, _rx) = broadcast::channel::<Vec<u8>>(1000);
|
|
|
|
|
tx
|
|
|
|
|
});
|
|
|
|
|
tx.clone()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut rx = tx.subscribe();
|
|
|
|
|
let subscriber_count = tx.receiver_count();
|
|
|
|
|
|
|
|
|
|
info!("[QUIC] Participant {} joined room '{}'. Total subscribers: {}",
|
|
|
|
|
participant_id, room_name, subscriber_count);
|
|
|
|
|
|
|
|
|
|
// =========================================================
|
|
|
|
|
// STEP 3: Dual-loop — datagram relay + broadcast receive
|
|
|
|
|
// =========================================================
|
|
|
|
|
let conn_send = conn.clone();
|
|
|
|
|
let conn_recv = conn.clone();
|
|
|
|
|
let tx_clone = tx.clone();
|
|
|
|
|
let pid = participant_id;
|
|
|
|
|
|
|
|
|
|
// Task A: Read datagrams from QUIC client → broadcast to ROOMS
|
|
|
|
|
let mut recv_task = tokio::spawn(async move {
|
|
|
|
|
loop {
|
|
|
|
|
match conn_recv.read_datagram().await {
|
|
|
|
|
Ok(datagram) => {
|
|
|
|
|
if datagram.len() < 4 { continue; }
|
|
|
|
|
let frame_type = datagram[0];
|
|
|
|
|
|
|
|
|
|
// Bandwidth telemetry (type 10 with special quality byte)
|
|
|
|
|
if frame_type == 10 && datagram.len() >= 4 {
|
|
|
|
|
let score = datagram[1];
|
|
|
|
|
moq_relayer.update_bandwidth_score(pid, score);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Broadcast to all subscribers (WS + QUIC) via ROOMS
|
|
|
|
|
let _ = tx_clone.send(datagram.to_vec());
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
info!("[QUIC] Datagram stream closed for participant {}: {}", pid, e);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Task B: Receive broadcasts from ROOMS → send datagrams to QUIC client
|
|
|
|
|
let room_name_for_send = room_name.clone();
|
|
|
|
|
let mut send_task = tokio::spawn(async move {
|
|
|
|
|
loop {
|
|
|
|
|
match rx.recv().await {
|
|
|
|
|
Ok(msg) => {
|
|
|
|
|
// Skip own frames (same as handleIncomingFrame self-skip on client)
|
|
|
|
|
if msg.len() >= 4 {
|
|
|
|
|
let sender_id = u16::from_le_bytes([msg[2], msg[3]]);
|
|
|
|
|
if sender_id == pid {
|
|
|
|
|
continue; // Don't echo own frames back
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Send via QUIC datagram (zero head-of-line blocking)
|
|
|
|
|
if conn_send.send_datagram(msg.into()).is_err() {
|
|
|
|
|
info!("[QUIC] Failed to send datagram to participant {}", pid);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
|
|
|
|
warn!("[QUIC] Participant {} lagged {} messages", pid, n);
|
|
|
|
|
// Continue — broadcast::Receiver auto-skips lagged messages
|
|
|
|
|
}
|
|
|
|
|
Err(broadcast::error::RecvError::Closed) => {
|
|
|
|
|
info!("[QUIC] Room '{}' broadcast closed", room_name_for_send);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Wait for either task to finish, then abort the other
|
|
|
|
|
tokio::select! {
|
|
|
|
|
_ = (&mut send_task) => recv_task.abort(),
|
|
|
|
|
_ = (&mut recv_task) => send_task.abort(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
info!("[QUIC] Participant {} disconnected from room '{}'", participant_id, room_name);
|
|
|
|
|
}
|