How To Optimize
A comprehensive guide to optimizing neural network performance, swarm coordination, and system resources in ruv-FANN.
ruv-FANN offers multiple optimization layers:
- Neural Network Optimization: Model architecture and training improvements
- SIMD Optimization: CPU vectorization for faster computations
- Memory Optimization: Efficient memory usage and garbage collection
- Swarm Optimization: Distributed processing coordination
- System Optimization: OS-level and hardware-specific tuning
use ruv_fann::prelude::*;
use micro_core::{RootVector, RootSpace};
// Optimal network architecture for different use cases
fn create_optimized_network(use_case: NetworkUseCase) -> Result<NeuralNetwork> {
match use_case {
NetworkUseCase::FastInference => {
// Optimized for speed - fewer layers, more width
NetworkBuilder::new()
.input_layer(784)
.hidden_layer(256, ActivationFunction::ReLU) // Single wide layer
.output_layer(10, ActivationFunction::SoftMax)
.optimizer(OptimizerType::Adam)
.learning_rate(0.001)
.build()
}
NetworkUseCase::HighAccuracy => {
// Optimized for accuracy - more layers, dropout
NetworkBuilder::new()
.input_layer(784)
.hidden_layer(512, ActivationFunction::GELU)
.dropout_layer(0.1)
.hidden_layer(256, ActivationFunction::GELU)
.dropout_layer(0.1)
.hidden_layer(128, ActivationFunction::GELU)
.output_layer(10, ActivationFunction::SoftMax)
.optimizer(OptimizerType::AdamW)
.learning_rate(0.0001)
.build()
}
NetworkUseCase::LowMemory => {
// Optimized for memory usage - narrow deep networks
NetworkBuilder::new()
.input_layer(784)
.hidden_layer(64, ActivationFunction::ReLU)
.hidden_layer(32, ActivationFunction::ReLU)
.hidden_layer(16, ActivationFunction::ReLU)
.output_layer(10, ActivationFunction::SoftMax)
.optimizer(OptimizerType::SGD)
.learning_rate(0.01)
.build()
}
}
}
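A minimal usage sketch of the builder above; predict and its Vec<f32> output are assumed from this guide's other examples rather than verified against the crate:
fn classify_digit(image: &[f32]) -> Result<usize> {
    let network = create_optimized_network(NetworkUseCase::FastInference)?;
    let scores = network.predict(image)?;
    // Pick the class with the highest softmax score
    let predicted = scores.iter().enumerate()
        .max_by(|a, b| a.1.partial_cmp(b.1).unwrap())
        .map(|(class, _)| class)
        .unwrap_or(0);
    Ok(predicted)
}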
// Performance comparison of activation functions
fn benchmark_activations() -> Result<()> {
let input_sizes = vec![128, 256, 512, 1024];
let activation_functions = vec![
(ActivationFunction::ReLU, "ReLU - Fastest"),
(ActivationFunction::LeakyReLU, "LeakyReLU - Good balance"),
(ActivationFunction::GELU, "GELU - Best accuracy"),
(ActivationFunction::Swish, "Swish - Modern choice"),
];
for input_size in input_sizes {
println!("Benchmarking activation functions for input size: {}", input_size);
for (activation, description) in &activation_functions {
let start = std::time::Instant::now();
let network = NetworkBuilder::new()
.input_layer(input_size)
.hidden_layer(input_size / 2, *activation)
.output_layer(input_size / 4, ActivationFunction::Linear)
.build()?;
// Simulate 1000 forward passes
let input = vec![0.5; input_size];
for _ in 0..1000 {
let _ = network.predict(&input)?;
}
let duration = start.elapsed();
println!(" {}: {:?}", description, duration);
}
println!();
}
Ok(())
}
// Result: ReLU is ~3x faster than GELU but GELU provides better accuracy
// Recommendation: Use ReLU for inference, GELU for training
use micro_swarm::{SwarmOrchestrator, BatchProcessor};
struct OptimalBatchSizer {
memory_limit: usize,
model_params: usize,
gradient_accumulation: bool,
}
impl OptimalBatchSizer {
fn calculate_optimal_batch_size(&self, dataset_size: usize) -> usize {
// Calculate based on available memory
let memory_per_sample = self.model_params * 4; // 4 bytes per f32
let max_batch_by_memory = self.memory_limit / memory_per_sample;
// Sweet spot for SGD convergence (typically 32-256)
let convergence_optimal = (dataset_size as f32).sqrt() as usize;
let convergence_optimal = convergence_optimal.clamp(32, 256);
// GPU utilization optimal (power of 2, typically 64-512)
let gpu_optimal = 128; // Good balance for most GPUs
// Choose minimum to avoid memory issues
let optimal = max_batch_by_memory.min(convergence_optimal).min(gpu_optimal);
// Round down to a power of two for better SIMD utilization
if optimal.is_power_of_two() { optimal } else { optimal.next_power_of_two() / 2 }
}
async fn adaptive_batch_training<T: Dataset>(
&self,
network: &mut NeuralNetwork,
dataset: &T,
orchestrator: &mut SwarmOrchestrator,
) -> Result<TrainingMetrics> {
let initial_batch_size = self.calculate_optimal_batch_size(dataset.len());
let mut current_batch_size = initial_batch_size;
let mut best_throughput = 0.0;
println!("Starting adaptive batch size optimization...");
// Test different batch sizes
for multiplier in [1, 2, 4, 8] {
let test_batch_size = initial_batch_size * multiplier;
if test_batch_size * self.model_params * 4 > self.memory_limit {
break; // Would exceed memory limit
}
let start = std::time::Instant::now();
// Train on subset with this batch size
let mut trainer = NetworkTrainer::new()
.batch_size(test_batch_size)
.epochs(5) // Quick test
.early_stopping(false);
if orchestrator.agent_count() > 1 {
trainer = trainer.distributed_training(true);
}
let subset = dataset.subset(0, test_batch_size * 10); // 10 batches
let metrics = trainer.train(network, &subset).await?;
let duration = start.elapsed();
let throughput = (test_batch_size * 10) as f64 / duration.as_secs_f64();
println!("Batch size {}: {:.2} samples/sec", test_batch_size, throughput);
if throughput > best_throughput {
best_throughput = throughput;
current_batch_size = test_batch_size;
}
}
println!("Optimal batch size: {}", current_batch_size);
// Final training with optimal batch size
let trainer = NetworkTrainer::new()
.batch_size(current_batch_size)
.epochs(100)
.early_stopping(true)
.distributed_training(orchestrator.agent_count() > 1);
trainer.train(network, dataset).await
}
}
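To make the heuristic concrete, here is a worked example with assumed numbers (written from inside the defining module, since the struct fields are not public): a 10M-parameter model with a 2 GB activation budget lands on a batch size of 32.
fn example_optimal_batch_size() {
    let sizer = OptimalBatchSizer {
        memory_limit: 2 * 1024 * 1024 * 1024, // 2 GB budget for activations
        model_params: 10_000_000,             // 10M-parameter model
        gradient_accumulation: false,
    };
    // memory_per_sample = 40 MB, so max_batch_by_memory = 53
    // sqrt(100_000) ≈ 316, clamped to 256; gpu_optimal = 128
    // min(53, 256, 128) = 53, rounded down to a power of two = 32
    assert_eq!(sizer.calculate_optimal_batch_size(100_000), 32);
}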
#[derive(Clone)]
pub enum LearningRateSchedule {
Constant(f32),
Linear { start: f32, end: f32 },
Exponential { start: f32, decay: f32 },
CosineAnnealing { max_lr: f32, min_lr: f32, cycle_length: usize },
OnePeriodCycle { max_lr: f32, div_factor: f32, final_div_factor: f32 },
}
impl LearningRateSchedule {
fn get_lr(&self, epoch: usize, total_epochs: usize) -> f32 {
match self {
Self::Constant(lr) => *lr,
Self::Linear { start, end } => {
start + (end - start) * (epoch as f32 / total_epochs as f32)
}
Self::Exponential { start, decay } => {
start * decay.powi(epoch as i32)
}
Self::CosineAnnealing { max_lr, min_lr, cycle_length } => {
let cycle_pos = (epoch % cycle_length) as f32 / *cycle_length as f32;
min_lr + (max_lr - min_lr) * (1.0 + (std::f32::consts::PI * cycle_pos).cos()) / 2.0
}
Self::OnePeriodCycle { max_lr, div_factor, final_div_factor } => {
let progress = epoch as f32 / total_epochs as f32;
if progress < 0.3 {
// Warmup phase
(max_lr / div_factor) + ((max_lr - max_lr / div_factor) * progress / 0.3)
} else if progress < 0.8 {
// High learning rate phase
*max_lr
} else {
// Cool down phase
max_lr / final_div_factor
}
}
}
}
}
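As a quick illustration of the schedule above, evaluating a cosine-annealing configuration at a few epochs shows the oscillation between max_lr and min_lr:
fn print_cosine_schedule() {
    let schedule = LearningRateSchedule::CosineAnnealing {
        max_lr: 1e-3,
        min_lr: 1e-5,
        cycle_length: 10,
    };
    for epoch in [0, 5, 9, 10] {
        // Epochs 0 and 10 start a cycle near max_lr; epoch 5 sits near the midpoint (~5e-4)
        println!("epoch {:>2}: lr = {:.2e}", epoch, schedule.get_lr(epoch, 100));
    }
}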
// Usage example with automatic lr finding
async fn find_optimal_learning_rate(
network: &mut NeuralNetwork,
dataset: &impl Dataset,
) -> Result<f32> {
let mut lr_finder = LearningRateFinder::new()
.start_lr(1e-8)
.end_lr(1e-1)
.num_iterations(100);
let lr_history = lr_finder.find(network, dataset).await?;
// Find lr with steepest decline in loss
let optimal_lr = lr_history
.windows(2)
.map(|pair| {
let (loss1, lr1) = pair[0];
let (loss2, lr2) = pair[1];
// Gradient of the loss with respect to ln(lr) between consecutive measurements
((loss2 - loss1) / (lr2.ln() - lr1.ln()), lr1)
})
.min_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal))
.map(|(_, lr)| lr)
.unwrap_or(1e-3);
println!("Optimal learning rate found: {:.2e}", optimal_lr);
Ok(optimal_lr)
}
// src/simd/optimized_ops.rs
use std::arch::x86_64::*;
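// Note: these kernels assume the CPU supports AVX; verify at runtime with
// is_x86_feature_detected!("avx") (see CpuFeatures below) or build with
// RUSTFLAGS="-C target-feature=+avx".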
#[inline(always)]
pub fn simd_dot_product(a: &[f32], b: &[f32]) -> f32 {
assert_eq!(a.len(), b.len());
assert_eq!(a.len() % 8, 0, "Vector length must be multiple of 8 for AVX");
unsafe {
let mut sum = _mm256_setzero_ps();
for i in (0..a.len()).step_by(8) {
let va = _mm256_loadu_ps(a.as_ptr().add(i));
let vb = _mm256_loadu_ps(b.as_ptr().add(i));
let prod = _mm256_mul_ps(va, vb);
sum = _mm256_add_ps(sum, prod);
}
// Horizontal sum
let sum_high = _mm256_extractf128_ps(sum, 1);
let sum_low = _mm256_castps256_ps128(sum);
let sum128 = _mm_add_ps(sum_high, sum_low);
let sum64 = _mm_add_ps(sum128, _mm_movehl_ps(sum128, sum128));
let sum32 = _mm_add_ss(sum64, _mm_shuffle_ps(sum64, sum64, 0x55));
_mm_cvtss_f32(sum32)
}
}
#[inline(always)]
pub fn simd_vector_add(a: &[f32], b: &[f32], result: &mut [f32]) {
assert_eq!(a.len(), b.len());
assert_eq!(a.len(), result.len());
assert_eq!(a.len() % 8, 0);
unsafe {
for i in (0..a.len()).step_by(8) {
let va = _mm256_loadu_ps(a.as_ptr().add(i));
let vb = _mm256_loadu_ps(b.as_ptr().add(i));
let sum = _mm256_add_ps(va, vb);
_mm256_storeu_ps(result.as_mut_ptr().add(i), sum);
}
}
}
#[inline(always)]
pub fn simd_activation_relu(input: &[f32], output: &mut [f32]) {
assert_eq!(input.len(), output.len());
assert_eq!(input.len() % 8, 0);
unsafe {
let zero = _mm256_setzero_ps();
for i in (0..input.len()).step_by(8) {
let x = _mm256_loadu_ps(input.as_ptr().add(i));
let result = _mm256_max_ps(x, zero);
_mm256_storeu_ps(output.as_mut_ptr().add(i), result);
}
}
}
// Benchmark SIMD vs scalar operations
pub fn benchmark_simd_operations() {
use std::time::Instant;
let sizes = vec![128, 256, 512, 1024, 2048, 4096];
for size in sizes {
let a: Vec<f32> = (0..size).map(|i| i as f32 * 0.01).collect();
let b: Vec<f32> = (0..size).map(|i| (i + size) as f32 * 0.01).collect();
// Scalar dot product
let start = Instant::now();
for _ in 0..10000 {
let _: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
}
let scalar_time = start.elapsed();
// SIMD dot product
let start = Instant::now();
for _ in 0..10000 {
let _ = simd_dot_product(&a, &b);
}
let simd_time = start.elapsed();
let speedup = scalar_time.as_nanos() as f64 / simd_time.as_nanos() as f64;
println!("Size {}: Scalar {:?}, SIMD {:?}, Speedup: {:.2}x",
size, scalar_time, simd_time, speedup);
}
}
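The kernels above require AVX and lengths that are multiples of 8. The following wrapper is a sketch (not part of the module above) that handles arbitrary lengths with a scalar tail and falls back to scalar code when AVX is unavailable:
pub fn dot_product(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len());
    let simd_len = a.len() & !7; // largest multiple of 8 not exceeding len
    let mut sum = if simd_len > 0 && is_x86_feature_detected!("avx") {
        simd_dot_product(&a[..simd_len], &b[..simd_len])
    } else {
        a[..simd_len].iter().zip(&b[..simd_len]).map(|(x, y)| x * y).sum()
    };
    // Scalar tail for the remaining (< 8) elements
    sum += a[simd_len..].iter().zip(&b[simd_len..]).map(|(x, y)| x * y).sum::<f32>();
    sum
}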
use std::arch::x86_64::*;
pub struct CpuFeatures {
pub has_avx: bool,
pub has_avx2: bool,
pub has_fma: bool,
pub has_avx512f: bool,
}
impl CpuFeatures {
pub fn detect() -> Self {
if is_x86_feature_detected!("avx") &&
is_x86_feature_detected!("avx2") &&
is_x86_feature_detected!("fma") {
if is_x86_feature_detected!("avx512f") {
println!("CPU: AVX512 detected - using highest performance path");
CpuFeatures { has_avx: true, has_avx2: true, has_fma: true, has_avx512f: true }
} else {
println!("CPU: AVX2+FMA detected - using high performance path");
CpuFeatures { has_avx: true, has_avx2: true, has_fma: true, has_avx512f: false }
}
} else if is_x86_feature_detected!("avx") {
println!("CPU: AVX detected - using medium performance path");
CpuFeatures { has_avx: true, has_avx2: false, has_fma: false, has_avx512f: false }
} else {
println!("CPU: Using scalar operations - consider upgrading CPU for better performance");
CpuFeatures { has_avx: false, has_avx2: false, has_fma: false, has_avx512f: false }
}
}
pub fn optimal_vector_size(&self) -> usize {
if self.has_avx512f { 16 } // 512-bit vectors = 16 f32s
else if self.has_avx2 { 8 } // 256-bit vectors = 8 f32s
else if self.has_avx { 8 } // 256-bit vectors = 8 f32s
else { 4 } // 128-bit SSE = 4 f32s
}
pub fn select_implementation<T>(&self, avx512: T, avx2: T, avx: T, scalar: T) -> T {
if self.has_avx512f { avx512 }
else if self.has_avx2 { avx2 }
else if self.has_avx { avx }
else { scalar }
}
}
// Usage in network operations: coerce the per-ISA forward-pass methods to a
// common fn-pointer type so select_implementation can pick one at runtime
impl NeuralNetwork {
    fn optimized_forward_pass(&self, input: &[f32]) -> Result<Vec<f32>> {
        let cpu_features = CpuFeatures::detect();
        let forward: fn(&NeuralNetwork, &[f32]) -> Result<Vec<f32>> = cpu_features.select_implementation(
            NeuralNetwork::forward_pass_avx512, // Fastest
            NeuralNetwork::forward_pass_avx2,   // Fast
            NeuralNetwork::forward_pass_avx,    // Medium
            NeuralNetwork::forward_pass_scalar, // Fallback
        );
        forward(self, input)
    }
}
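The detected vector width can also drive loop structure; the sketch below (with hypothetical process_simd/process_scalar steps left as comments) splits work into SIMD-width chunks plus a scalar tail:
fn chunk_by_simd_width() {
    let features = CpuFeatures::detect();
    let lanes = features.optimal_vector_size();
    let data = vec![0.5f32; 1000];
    let (body, tail) = data.split_at(data.len() - data.len() % lanes);
    for chunk in body.chunks_exact(lanes) {
        // process_simd(chunk); // hypothetical wide-path kernel
        let _ = chunk;
    }
    // process_scalar(tail);   // hypothetical scalar handling of the remainder
    let _ = tail;
}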
// Structure of Arrays (SoA) for better cache performance
#[derive(Debug, Clone)]
pub struct OptimizedNeuronLayer {
weights: Vec<f32>, // All weights together
biases: Vec<f32>, // All biases together
outputs: Vec<f32>, // All outputs together
gradients: Vec<f32>, // All gradients together
layer_size: usize,
input_size: usize,
}
impl OptimizedNeuronLayer {
pub fn new(input_size: usize, layer_size: usize) -> Self {
// Note: Vec<f32> does not itself guarantee 64-byte (cache line) alignment;
// use an aligned allocator here if strict cache-line alignment is required
let total_weights = input_size * layer_size;
Self {
weights: vec![0.0; total_weights],
biases: vec![0.0; layer_size],
outputs: vec![0.0; layer_size],
gradients: vec![0.0; total_weights],
layer_size,
input_size,
}
}
// Cache-optimized matrix multiplication
pub fn forward_pass_optimized(&mut self, input: &[f32]) -> &[f32] {
// Block matrix multiplication for better cache locality
const BLOCK_SIZE: usize = 64; // Tuned for L1 cache
// Zero outputs
self.outputs.fill(0.0);
// Blocked matrix multiplication
for i_block in (0..self.layer_size).step_by(BLOCK_SIZE) {
let i_end = (i_block + BLOCK_SIZE).min(self.layer_size);
for k_block in (0..self.input_size).step_by(BLOCK_SIZE) {
let k_end = (k_block + BLOCK_SIZE).min(self.input_size);
for i in i_block..i_end {
for k in k_block..k_end {
self.outputs[i] += self.weights[i * self.input_size + k] * input[k];
}
}
}
}
// Add biases
for i in 0..self.layer_size {
self.outputs[i] += self.biases[i];
}
&self.outputs
}
}
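A quick usage sketch of the layer above:
fn example_blocked_forward_pass() {
    let mut layer = OptimizedNeuronLayer::new(784, 256);
    let input = vec![0.5f32; 784];
    let outputs = layer.forward_pass_optimized(&input);
    assert_eq!(outputs.len(), 256);
}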
// Memory pool for reducing allocations
pub struct MemoryPool {
f32_buffers: Vec<Vec<f32>>,
buffer_sizes: Vec<usize>,
in_use: Vec<bool>,
}
impl MemoryPool {
pub fn new() -> Self {
Self {
f32_buffers: Vec::new(),
buffer_sizes: Vec::new(),
in_use: Vec::new(),
}
}
pub fn get_buffer(&mut self, size: usize) -> PooledBuffer {
    // Find the index of an existing free buffer that is large enough; searching by
    // index first keeps the immutable scan from overlapping the mutable update below
    let existing = self.buffer_sizes.iter()
        .zip(&self.in_use)
        .position(|(&buf_size, &in_use)| !in_use && buf_size >= size);
    if let Some(i) = existing {
        self.in_use[i] = true;
        return PooledBuffer {
            pool: self as *mut MemoryPool,
            index: i,
            buffer: &mut self.f32_buffers[i][..size],
        };
    }
// Create new buffer
let index = self.f32_buffers.len();
self.f32_buffers.push(vec![0.0; size]);
self.buffer_sizes.push(size);
self.in_use.push(true);
PooledBuffer {
pool: self as *mut MemoryPool,
index,
buffer: unsafe { std::slice::from_raw_parts_mut(self.f32_buffers[index].as_mut_ptr(), size) },
}
}
}
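// Safety note: PooledBuffer stores a raw pointer back to its pool, so the pool
// must outlive every buffer it hands out and must not be moved while any
// PooledBuffer is alive.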
pub struct PooledBuffer<'a> {
pool: *mut MemoryPool,
index: usize,
buffer: &'a mut [f32],
}
impl<'a> Drop for PooledBuffer<'a> {
fn drop(&mut self) {
unsafe {
(*self.pool).in_use[self.index] = false;
}
}
}
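An illustrative reuse pattern, assuming same-module access since the buffer field above is private:
fn example_buffer_reuse() {
    let mut pool = MemoryPool::new();
    {
        let scratch = pool.get_buffer(1024);
        scratch.buffer.fill(0.0);
        // ... use scratch.buffer as temporary activation storage ...
    } // dropping the PooledBuffer marks its slot as free again
    let _reused = pool.get_buffer(512); // reuses the 1024-element backing buffer
}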
use micro_swarm::{TopologyType, SwarmMetrics, CoordinationLatency};
use std::time::{Duration, Instant};
pub struct TopologyOptimizer {
metrics_history: Vec<SwarmMetrics>,
current_topology: TopologyType,
optimization_interval: Duration,
}
impl TopologyOptimizer {
pub async fn optimize_topology(
&mut self,
orchestrator: &mut SwarmOrchestrator,
workload: &WorkloadCharacteristics,
) -> Result<TopologyType> {
let candidate_topologies = vec![
TopologyType::Mesh,
TopologyType::Hierarchical,
TopologyType::Ring,
TopologyType::Star,
];
let mut best_topology = self.current_topology;
let mut best_performance = 0.0;
for topology in candidate_topologies {
println!("Testing topology: {:?}", topology);
// Temporarily switch topology
orchestrator.reconfigure_topology(topology).await?;
// Run benchmark workload
let start = Instant::now();
let metrics = self.run_benchmark_workload(orchestrator, workload).await?;
let duration = start.elapsed();
// Calculate performance score
let performance_score = self.calculate_performance_score(&metrics, duration);
println!(" Performance score: {:.2}", performance_score);
if performance_score > best_performance {
best_performance = performance_score;
best_topology = topology;
}
}
// Apply best topology
if best_topology != self.current_topology {
println!("Switching to optimal topology: {:?}", best_topology);
orchestrator.reconfigure_topology(best_topology).await?;
self.current_topology = best_topology;
}
Ok(best_topology)
}
fn calculate_performance_score(&self, metrics: &SwarmMetrics, duration: Duration) -> f64 {
let coordination_efficiency = 1.0 / (metrics.average_coordination_latency.as_secs_f64() + 0.001);
let throughput = metrics.tasks_completed as f64 / duration.as_secs_f64();
let resource_efficiency = 1.0 - metrics.resource_utilization_variance;
let fault_tolerance = 1.0 - (metrics.failed_coordinations as f64 / metrics.total_coordinations as f64);
// Weighted score
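// Note: throughput is in raw tasks/sec while the other terms lie in [0, 1];
// in practice, normalize each term to a comparable range before weighting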
coordination_efficiency * 0.3 +
throughput * 0.4 +
resource_efficiency * 0.2 +
fault_tolerance * 0.1
}
async fn run_benchmark_workload(
&self,
orchestrator: &mut SwarmOrchestrator,
workload: &WorkloadCharacteristics,
) -> Result<SwarmMetrics> {
let benchmark_tasks = self.generate_benchmark_tasks(workload);
let start_time = Instant::now();
for task in benchmark_tasks {
orchestrator.submit_task(task).await?;
}
// Wait for completion
while orchestrator.pending_tasks() > 0 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
orchestrator.collect_metrics(start_time)
}
}
use std::collections::HashMap;
use std::time::Duration;
pub struct LoadBalancer {
agent_loads: HashMap<AgentId, AgentLoad>,
load_threshold: f64,
rebalance_interval: Duration,
}
#[derive(Debug, Clone)]
pub struct AgentLoad {
cpu_usage: f64,
memory_usage: f64,
task_queue_size: usize,
average_task_duration: Duration,
error_rate: f64,
}
impl LoadBalancer {
pub async fn rebalance_agents(
&mut self,
orchestrator: &mut SwarmOrchestrator,
) -> Result<RebalanceResult> {
let current_loads = self.collect_agent_loads(orchestrator).await?;
// Identify overloaded and underloaded agents
let mut overloaded = Vec::new();
let mut underloaded = Vec::new();
for (agent_id, load) in &current_loads {
let load_score = self.calculate_load_score(load);
if load_score > self.load_threshold {
overloaded.push((*agent_id, load_score));
} else if load_score < self.load_threshold * 0.5 {
underloaded.push((*agent_id, load_score));
}
}
// Sort by load intensity
overloaded.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
underloaded.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
let mut rebalanced_tasks = 0;
// Migrate tasks from overloaded to underloaded agents
for (overloaded_agent, _) in overloaded.iter().take(3) { // Top 3 overloaded
for (underloaded_agent, _) in underloaded.iter().take(2) { // Top 2 underloaded
let tasks_to_migrate = self.calculate_optimal_migration_count(
&current_loads[overloaded_agent],
&current_loads[underloaded_agent]
);
if tasks_to_migrate > 0 {
orchestrator.migrate_tasks(*overloaded_agent, *underloaded_agent, tasks_to_migrate).await?;
rebalanced_tasks += tasks_to_migrate;
}
}
}
// Spawn new agents if all agents are overloaded
if overloaded.len() > current_loads.len() / 2 {
let new_agents = self.calculate_required_agents(&current_loads);
for agent_type in new_agents {
orchestrator.spawn_agent(agent_type, "load-balancing").await?;
}
}
// Terminate underutilized agents if many are idle
if underloaded.len() > current_loads.len() * 2 / 3 {
let agents_to_terminate = underloaded.len() / 3;
for (agent_id, _) in underloaded.iter().take(agents_to_terminate) {
orchestrator.graceful_shutdown_agent(*agent_id).await?;
}
}
Ok(RebalanceResult {
tasks_migrated: rebalanced_tasks,
agents_spawned: 0, // Would be calculated based on actual spawning
agents_terminated: 0, // Would be calculated based on actual termination
})
}
fn calculate_load_score(&self, load: &AgentLoad) -> f64 {
// Weighted load calculation
let cpu_weight = 0.4;
let memory_weight = 0.3;
let queue_weight = 0.2;
let error_weight = 0.1;
load.cpu_usage * cpu_weight +
load.memory_usage * memory_weight +
(load.task_queue_size as f64 / 100.0).min(1.0) * queue_weight +
load.error_rate * error_weight
}
}
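As a worked example of the weighting above (assuming utilizations expressed as fractions in [0, 1]): an agent at 80% CPU, 50% memory, 30 queued tasks, and a 2% error rate scores 0.4 * 0.8 + 0.3 * 0.5 + 0.2 * (30 / 100) + 0.1 * 0.02 = 0.532, so it counts as overloaded once load_threshold is at or below 0.5.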
use serde::{Serialize, Deserialize};
use lz4_flex::{compress_prepend_size, decompress_size_prepended};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[derive(Serialize, Deserialize)]
pub struct OptimizedMessage {
pub agent_id: AgentId,
pub message_type: MessageType,
pub compressed: bool,
pub compressed_payload: Vec<u8>,
pub batch_size: usize,
pub timestamp: u64,
}
pub struct MessageOptimizer {
compression_threshold: usize,
batch_size: usize,
batch_timeout: Duration,
pending_messages: Vec<Message>,
last_batch_time: Instant,
}
impl MessageOptimizer {
pub fn optimize_message(&mut self, message: Message) -> Option<OptimizedMessage> {
self.pending_messages.push(message);
let should_send_batch =
self.pending_messages.len() >= self.batch_size ||
self.last_batch_time.elapsed() > self.batch_timeout;
if should_send_batch {
self.create_optimized_batch()
} else {
None
}
}
fn create_optimized_batch(&mut self) -> Option<OptimizedMessage> {
if self.pending_messages.is_empty() {
return None;
}
// Serialize batch
let batch_data = bincode::serialize(&self.pending_messages).ok()?;
// Compress only when the payload is large enough to benefit, and record the decision
let compressed = batch_data.len() > self.compression_threshold;
let compressed_payload = if compressed {
compress_prepend_size(&batch_data)
} else {
batch_data
};
let batch_size = self.pending_messages.len();
let agent_id = self.pending_messages[0].sender_id;
self.pending_messages.clear();
self.last_batch_time = Instant::now();
Some(OptimizedMessage {
agent_id,
message_type: MessageType::Batch,
compressed,
compressed_payload,
batch_size,
timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
})
}
pub fn decompress_batch(&self, optimized_msg: &OptimizedMessage) -> Result<Vec<Message>, Box<dyn std::error::Error>> {
// Decompress only if the batch was flagged as compressed when it was built
// (compress_prepend_size prepends the uncompressed length, not a magic byte, so the payload cannot be sniffed)
let decompressed = if optimized_msg.compressed {
decompress_size_prepended(&optimized_msg.compressed_payload)?
} else {
optimized_msg.compressed_payload.clone()
};
// Deserialize batch
let messages: Vec<Message> = bincode::deserialize(&decompressed)?;
Ok(messages)
}
}
// Benchmark compression effectiveness
pub fn benchmark_message_compression() {
let sample_messages: Vec<Message> = (0..100).map(|i| Message {
sender_id: AgentId::new(i % 10),
recipient_id: AgentId::new((i + 1) % 10),
message_type: MessageType::TaskResult,
payload: vec![0.1f32; 1000], // Typical neural network output
timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
}).collect();
let original_size = bincode::serialize(&sample_messages).unwrap().len();
let compressed_size = compress_prepend_size(&bincode::serialize(&sample_messages).unwrap()).len();
let compression_ratio = original_size as f64 / compressed_size as f64;
println!("Message compression: {} -> {} bytes ({:.2}x reduction)",
original_size, compressed_size, compression_ratio);
}
#!/bin/bash
# system-optimization.sh
echo "Optimizing Linux system for ruv-FANN performance..."
# Network optimizations
echo 'net.core.rmem_max = 268435456' >> /etc/sysctl.conf
echo 'net.core.wmem_max = 268435456' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_rmem = 4096 87380 268435456' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_wmem = 4096 65536 268435456' >> /etc/sysctl.conf
echo 'net.core.netdev_max_backlog = 5000' >> /etc/sysctl.conf
# Memory optimizations
echo 'vm.swappiness = 1' >> /etc/sysctl.conf
echo 'vm.dirty_ratio = 15' >> /etc/sysctl.conf
echo 'vm.dirty_background_ratio = 5' >> /etc/sysctl.conf
echo 'kernel.shmmax = 68719476736' >> /etc/sysctl.conf
echo 'kernel.shmall = 4294967296' >> /etc/sysctl.conf
# CPU optimizations
echo 'kernel.sched_migration_cost_ns = 5000000' >> /etc/sysctl.conf
echo 'kernel.sched_autogroup_enabled = 0' >> /etc/sysctl.conf
# Apply settings
sysctl -p
# CPU governor settings for maximum performance (write to each core's governor file)
for governor in /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor; do
echo performance > "$governor"
done
# Keep Intel turbo boost enabled (writing 1 to no_turbo would disable it)
echo 0 > /sys/devices/system/cpu/intel_pstate/no_turbo
# NUMA optimizations
echo always > /sys/kernel/mm/transparent_hugepage/enabled
echo madvise > /sys/kernel/mm/transparent_hugepage/defrag
echo "System optimization complete. Reboot recommended."
use libc::{cpu_set_t, sched_setaffinity, setpriority, PRIO_PROCESS};
pub struct ProcessOptimizer {
cpu_count: usize,
process_id: u32,
}
impl ProcessOptimizer {
pub fn new() -> Self {
Self {
cpu_count: num_cpus::get(),
process_id: std::process::id(),
}
}
pub fn optimize_for_neural_processing(&self) -> Result<(), Box<dyn std::error::Error>> {
// Set high priority (requires sudo)
unsafe {
setpriority(PRIO_PROCESS, self.process_id, -10);
}
// Pin to performance cores (assuming last half are performance cores)
let performance_cores = self.cpu_count / 2..self.cpu_count;
self.set_cpu_affinity(performance_cores)?;
// Set thread priorities for worker threads
self.optimize_thread_priorities()?;
Ok(())
}
fn set_cpu_affinity(&self, cores: std::ops::Range<usize>) -> Result<(), Box<dyn std::error::Error>> {
unsafe {
let mut cpu_set: cpu_set_t = std::mem::zeroed();
for core in cores {
libc::CPU_SET(core, &mut cpu_set);
}
sched_setaffinity(0, std::mem::size_of::<cpu_set_t>(), &cpu_set);
}
println!("CPU affinity set to performance cores");
Ok(())
}
fn optimize_thread_priorities(&self) -> Result<(), Box<dyn std::error::Error>> {
// Set thread priorities based on role
thread_priority::set_current_thread_priority(thread_priority::ThreadPriority::Max)?;
// For worker threads, use high priority
tokio::task::spawn(async {
thread_priority::set_current_thread_priority(thread_priority::ThreadPriority::Crossplatform(
thread_priority::ThreadPriorityValue::try_from(80u8).unwrap()
)).ok();
});
Ok(())
}
}
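A sketch of applying this tuning at startup; the priority change typically requires elevated privileges, and optimize_thread_priorities assumes a Tokio runtime is already running:
fn tune_process() {
    let optimizer = ProcessOptimizer::new();
    if let Err(e) = optimizer.optimize_for_neural_processing() {
        eprintln!("process optimization skipped: {}", e);
    }
}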
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};
// Performance-tracking allocator
pub struct TrackedAllocator {
allocations: AtomicUsize,
deallocations: AtomicUsize,
bytes_allocated: AtomicUsize,
peak_memory: AtomicUsize,
}
unsafe impl GlobalAlloc for TrackedAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ptr = System.alloc(layout);
if !ptr.is_null() {
self.allocations.fetch_add(1, Ordering::Relaxed);
self.bytes_allocated.fetch_add(layout.size(), Ordering::Relaxed);
// Track peak memory usage
let current = self.bytes_allocated.load(Ordering::Relaxed);
self.peak_memory.fetch_max(current, Ordering::Relaxed);
}
ptr
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
System.dealloc(ptr, layout);
self.deallocations.fetch_add(1, Ordering::Relaxed);
self.bytes_allocated.fetch_sub(layout.size(), Ordering::Relaxed);
}
}
impl TrackedAllocator {
pub const fn new() -> Self {
Self {
allocations: AtomicUsize::new(0),
deallocations: AtomicUsize::new(0),
bytes_allocated: AtomicUsize::new(0),
peak_memory: AtomicUsize::new(0),
}
}
pub fn memory_stats(&self) -> MemoryStats {
MemoryStats {
allocations: self.allocations.load(Ordering::Relaxed),
deallocations: self.deallocations.load(Ordering::Relaxed),
current_bytes: self.bytes_allocated.load(Ordering::Relaxed),
peak_bytes: self.peak_memory.load(Ordering::Relaxed),
}
}
}
// Use custom allocator
#[global_allocator]
static GLOBAL: TrackedAllocator = TrackedAllocator::new();
// Memory optimization utilities
pub fn optimize_memory_usage() {
// Rust has no garbage collector; instead, report peak usage and release freed heap memory back to the OS when it is high
if let Some(peak) = GLOBAL.memory_stats().peak_bytes.checked_div(1024 * 1024) {
println!("Peak memory usage: {} MB", peak);
if peak > 1000 { // > 1GB
// Ask glibc to return freed heap pages to the OS (malloc_trim)
unsafe {
libc::malloc_trim(0);
}
}
}
}
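For completeness, MemoryStats is assumed here to be a plain struct with the four public fields constructed above; reporting them after a run is then straightforward:
fn report_allocator_stats() {
    let stats = GLOBAL.memory_stats();
    println!(
        "allocations: {} (freed: {}), live: {} KB, peak: {} KB",
        stats.allocations,
        stats.deallocations,
        stats.current_bytes / 1024,
        stats.peak_bytes / 1024,
    );
}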
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
pub neural_processing_time: Duration,
pub swarm_coordination_time: Duration,
pub memory_usage: usize,
pub cpu_usage: f64,
pub network_throughput: f64,
pub error_rate: f64,
pub tasks_per_second: f64,
}
pub struct PerformanceMonitor {
metrics: Arc<RwLock<PerformanceMetrics>>,
measurement_interval: Duration,
optimization_threshold: f64,
}
impl PerformanceMonitor {
pub async fn start_monitoring(&self, orchestrator: Arc<SwarmOrchestrator>) {
let metrics = Arc::clone(&self.metrics);
let interval = self.measurement_interval;
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
loop {
interval_timer.tick().await;
let new_metrics = Self::collect_metrics(&orchestrator).await;
// Update metrics
{
let mut metrics_lock = metrics.write().await;
*metrics_lock = new_metrics.clone();
}
// Check for optimization opportunities
if new_metrics.cpu_usage > 90.0 {
println!("High CPU usage detected: {:.1}%", new_metrics.cpu_usage);
Self::suggest_cpu_optimization(&new_metrics);
}
if new_metrics.memory_usage > 1024 * 1024 * 1024 { // 1GB
println!("High memory usage detected: {} MB", new_metrics.memory_usage / 1024 / 1024);
Self::suggest_memory_optimization(&new_metrics);
}
if new_metrics.error_rate > 0.05 { // 5%
println!("High error rate detected: {:.1}%", new_metrics.error_rate * 100.0);
Self::suggest_error_mitigation(&new_metrics);
}
}
});
}
async fn collect_metrics(orchestrator: &SwarmOrchestrator) -> PerformanceMetrics {
// Collect system metrics
let cpu_usage = Self::get_cpu_usage();
let memory_usage = GLOBAL.memory_stats().current_bytes;
// Collect swarm metrics
let swarm_metrics = orchestrator.get_metrics().await;
PerformanceMetrics {
neural_processing_time: swarm_metrics.average_processing_time,
swarm_coordination_time: swarm_metrics.average_coordination_latency,
memory_usage,
cpu_usage,
network_throughput: swarm_metrics.network_throughput,
error_rate: swarm_metrics.error_rate,
tasks_per_second: swarm_metrics.tasks_per_second,
}
}
fn suggest_cpu_optimization(metrics: &PerformanceMetrics) {
println!("CPU Optimization Suggestions:");
println!(" - Consider increasing batch size to reduce per-task overhead");
println!(" - Enable SIMD optimizations if not already active");
println!(" - Reduce coordination frequency in swarm topology");
println!(" - Pin processes to specific CPU cores");
}
fn suggest_memory_optimization(metrics: &PerformanceMetrics) {
println!("Memory Optimization Suggestions:");
println!(" - Reduce batch size to lower memory footprint");
println!(" - Enable gradient checkpointing for large models");
println!(" - Use memory pooling for frequent allocations");
println!(" - Consider model quantization to reduce memory usage");
}
fn suggest_error_mitigation(metrics: &PerformanceMetrics) {
println!("Error Mitigation Suggestions:");
println!(" - Implement circuit breaker pattern for failing agents");
println!(" - Add retry logic with exponential backoff");
println!(" - Increase timeout values for coordination");
println!(" - Enable redundant processing for critical tasks");
}
fn get_cpu_usage() -> f64 {
// Platform-specific CPU usage collection
// This is a simplified version
let load_avg = std::fs::read_to_string("/proc/loadavg")
.unwrap_or_default()
.split_whitespace()
.next()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
(load_avg * 100.0 / num_cpus::get() as f64).min(100.0)
}
}
// Automated optimization based on metrics
pub struct AutoOptimizer {
monitor: PerformanceMonitor,
// Wrapped in Arc so the background optimization task can hold its own reference
optimization_rules: Arc<Vec<OptimizationRule>>,
}
pub struct OptimizationRule {
condition: Box<dyn Fn(&PerformanceMetrics) -> bool + Send + Sync>,
action: Box<dyn Fn(&PerformanceMetrics) -> OptimizationAction + Send + Sync>,
description: String,
}
pub enum OptimizationAction {
AdjustBatchSize(usize),
ReconfigureTopology(TopologyType),
ScaleAgents(i32), // Positive to add, negative to remove
AdjustLearningRate(f32),
EnableSIMD,
OptimizeMemory,
}
impl AutoOptimizer {
pub async fn enable_auto_optimization(&self, orchestrator: Arc<SwarmOrchestrator>) {
// Clone the Arc so the spawned task owns its own handle to the rules
let rules = Arc::clone(&self.optimization_rules);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
let metrics = orchestrator.get_performance_metrics().await;
for rule in rules.iter() {
if (rule.condition)(&metrics) {
let action = (rule.action)(&metrics);
println!("Auto-optimization triggered: {}", rule.description);
match action {
OptimizationAction::AdjustBatchSize(new_size) => {
orchestrator.set_global_batch_size(new_size).await;
}
OptimizationAction::ReconfigureTopology(topology) => {
orchestrator.reconfigure_topology(topology).await;
}
OptimizationAction::ScaleAgents(delta) => {
if delta > 0 {
for _ in 0..delta {
orchestrator.spawn_agent(AgentType::Worker, "auto-scaling").await;
}
} else {
orchestrator.scale_down_agents((-delta) as usize).await;
}
}
_ => {} // Handle other actions
}
}
}
}
});
}
}
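An example rule under assumed thresholds (written from inside the defining module, since the rule fields are private): shrink the global batch size when live memory exceeds 2 GB.
fn memory_pressure_rule() -> OptimizationRule {
    OptimizationRule {
        condition: Box::new(|m: &PerformanceMetrics| m.memory_usage > 2 * 1024 * 1024 * 1024),
        action: Box::new(|_m: &PerformanceMetrics| OptimizationAction::AdjustBatchSize(32)),
        description: "Reduce batch size under memory pressure".to_string(),
    }
}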
This comprehensive optimization guide covers all major performance aspects of ruv-FANN, from low-level SIMD optimizations to high-level swarm coordination strategies.