Monitoring Metrics - ruvnet/ruv-FANN GitHub Wiki
Comprehensive guide to observability, performance monitoring, and metrics collection in the ruv-FANN ecosystem.
- Overview
- System Metrics Collection
- Performance Monitoring
- Distributed Tracing
- Log Aggregation
- Alert Configuration
- Dashboard Design
- Metrics Implementation
- Best Practices
ruv-FANN provides comprehensive monitoring and metrics capabilities for tracking neural network performance, swarm coordination, and system health across distributed deployments.
- Real-time Performance Metrics: Sub-millisecond metric collection
- Distributed Tracing: End-to-end request tracking across swarms
- Smart Alerting: ML-powered anomaly detection
- Custom Dashboards: Flexible visualization options
- Low Overhead: < 1% performance impact
use ruv_fann::metrics::{MetricsCollector, MetricType};
use std::time::Instant;
// Initialize metrics collector
// NOTE(review): `Duration` is used below but not imported in this snippet —
// assumes `use std::time::Duration;` is in scope.
let mut collector = MetricsCollector::new()
.with_namespace("ruv_fann")
.with_flush_interval(Duration::from_secs(10))
.build()?;
// System metrics
// Host-level gauges; the helper fns (cpu_usage_percent, memory_usage_mb, ...)
// are assumed to be defined elsewhere in the project.
collector.gauge("system.cpu_usage", cpu_usage_percent());
collector.gauge("system.memory_usage_mb", memory_usage_mb());
collector.gauge("system.disk_io_mbps", disk_io_mbps());
collector.gauge("system.network_bandwidth_mbps", network_bandwidth());
// Neural network metrics
// Counters are monotonic; histograms capture latency distributions.
collector.counter("nn.forward_passes", 1);
collector.histogram("nn.inference_time_ms", inference_time.as_millis() as f64);
collector.gauge("nn.model_parameters", model.parameter_count());
collector.gauge("nn.gradient_norm", gradient_norm);// GPU metrics
// GPU gauges are only compiled in when the `cuda` feature is enabled.
#[cfg(feature = "cuda")]
{
collector.gauge("gpu.utilization_percent", gpu_utilization());
collector.gauge("gpu.memory_used_mb", gpu_memory_used());
collector.gauge("gpu.temperature_celsius", gpu_temperature());
collector.gauge("gpu.power_watts", gpu_power_consumption());
}
// SIMD metrics
collector.gauge("simd.utilization_percent", simd_utilization());
collector.counter("simd.operations", simd_op_count);
collector.histogram("simd.speedup_factor", speedup_factor);// Process-level metrics
// Per-process resource gauges; gc_pause_ms presumably comes from a managed
// runtime embedded alongside — TODO confirm its source.
collector.gauge("process.threads", thread_count());
collector.gauge("process.file_descriptors", fd_count());
collector.gauge("process.heap_size_mb", heap_size_mb());
collector.histogram("process.gc_pause_ms", gc_pause_ms);use ruv_fann::metrics::performance::{PerformanceMonitor, PerfConfig};
// Configure performance monitoring
// NOTE(review): `TraceLevel` is referenced but not imported in this snippet.
let perf_monitor = PerformanceMonitor::new(PerfConfig {
sample_rate: 0.1, // Sample 10% of operations
trace_level: TraceLevel::Detailed,
enable_profiling: true,
});
// Monitor training performance
// One epoch monitor wraps the whole pass; each batch gets its own timer,
// subdivided into named phases (forward/loss/backward/optimizer).
let mut epoch_monitor = perf_monitor.start_epoch();
for batch in dataset.batches() {
let batch_timer = epoch_monitor.start_batch();
// Forward pass monitoring
let forward_timer = batch_timer.start_phase("forward");
let output = model.forward(&batch.inputs);
forward_timer.end();
// Loss calculation monitoring
let loss_timer = batch_timer.start_phase("loss");
let loss = criterion.compute_loss(&output, &batch.targets);
loss_timer.end();
// Backward pass monitoring
let backward_timer = batch_timer.start_phase("backward");
model.backward(&loss);
backward_timer.end();
// Optimizer step monitoring
let optimizer_timer = batch_timer.start_phase("optimizer");
optimizer.step();
optimizer_timer.end();
batch_timer.end();
}
// Get performance report
// Aggregated per-phase timings for the epoch (format defined by the crate).
let report = epoch_monitor.generate_report();
println!("Epoch Performance: {:#?}", report);use ruv_swarm::metrics::{SwarmMonitor, AgentMetrics};
// Monitor swarm coordination
// Builder-style configuration: each track_* call opts into one metric family.
let swarm_monitor = SwarmMonitor::new()
.track_communication_latency()
.track_consensus_time()
.track_task_distribution()
.build();
// Agent-level metrics
// Pattern: wrap task execution so per-task metrics are recorded even for
// custom agents. `Agent`, `Task`, `TaskResult` are ruv-swarm types.
impl Agent for CustomAgent {
async fn execute_task(&mut self, task: Task) -> Result<TaskResult> {
let metrics = AgentMetrics::start();
// Task execution
let result = self.process_task(&task).await?;
// Record metrics
// NOTE(review): on the `?` error path above, no metrics are recorded —
// confirm whether failed tasks should also be measured.
metrics.record_execution_time();
metrics.record_memory_usage();
metrics.record_cpu_cycles();
Ok(result)
}
}// SIMD operation monitoring
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;
// Counts SIMD vs scalar executions using relaxed atomics (monotonic counters,
// no ordering requirements between them).
struct SimdMonitor {
vector_ops: AtomicU64,
scalar_fallbacks: AtomicU64,
// NOTE(review): `cache_misses` is never updated in this snippet — confirm
// whether it is populated elsewhere or dead.
cache_misses: AtomicU64,
}
impl SimdMonitor {
// Multiplies `a * b`, picking the SIMD path when available, and records
// both wall-clock time and raw TSC cycles for the operation.
fn monitor_matrix_multiply(&self, a: &Matrix, b: &Matrix) -> Matrix {
let start = Instant::now();
// SAFETY: `_rdtsc` is available on x86_64 (gated by the cfg above) and
// has no memory-safety preconditions; note it is not serializing, so
// cycle counts are approximate.
let start_cycles = unsafe { _rdtsc() };
let result = if is_simd_available() {
self.vector_ops.fetch_add(1, Ordering::Relaxed);
simd_matrix_multiply(a, b)
} else {
self.scalar_fallbacks.fetch_add(1, Ordering::Relaxed);
scalar_matrix_multiply(a, b)
};
let cycles = unsafe { _rdtsc() } - start_cycles;
let duration = start.elapsed();
// Record performance metrics
METRICS.histogram("simd.matrix_multiply_cycles", cycles as f64);
METRICS.histogram("simd.matrix_multiply_us", duration.as_micros() as f64);
result
}
}use opentelemetry::{global, trace::{Tracer, Span}};
use opentelemetry_jaeger::JaegerExporter;
// Initialize distributed tracing
/// Initializes distributed tracing for the process.
///
/// Installs a global tracer provider that exports spans to a local Jaeger
/// agent; afterwards any code may obtain a named tracer via
/// `global::tracer("ruv-fann")`.
///
/// # Errors
/// Returns an error if the Jaeger exporter cannot be built.
fn init_tracing() -> Result<()> {
    // Export spans to the Jaeger agent's default UDP endpoint.
    let exporter = JaegerExporter::builder()
        .with_agent_endpoint("localhost:6831")
        .with_service_name("ruv-fann")
        .build()?;
    // Build the tracer *provider* and register it globally.
    // Fix: `global::set_tracer_provider` expects the provider itself; the
    // original chained `.get_tracer(...)` and passed the resulting `Tracer`,
    // which does not implement `TracerProvider` and would not compile.
    let provider = opentelemetry::sdk::trace::TracerProvider::builder()
        .with_simple_exporter(exporter)
        .build();
    global::set_tracer_provider(provider);
    Ok(())
}
// Trace neural network operations
// Runs a forward pass inside an OpenTelemetry span, attaching model and
// tensor-shape attributes for later analysis. The span is ended on the
// success path; on the `?` error path it is dropped (SDKs typically end the
// span on drop — TODO confirm for this SDK version).
pub fn trace_forward_pass<T>(
model: &NeuralNetwork,
input: &Tensor<T>,
) -> Result<Tensor<T>> {
let tracer = global::tracer("ruv-fann");
let mut span = tracer.start("neural_network.forward");
span.set_attribute("model.name", model.name());
span.set_attribute("input.shape", format!("{:?}", input.shape()));
span.set_attribute("model.parameters", model.parameter_count() as i64);
let result = model.forward(input)?;
span.set_attribute("output.shape", format!("{:?}", result.shape()));
span.end();
Ok(result)
}// Distributed swarm tracing
// Orchestrates a task across the swarm with a parent span for the whole
// orchestration and one child span per agent subtask.
impl SwarmOrchestrator {
async fn orchestrate_with_tracing(&mut self, task: Task) -> Result<TaskResult> {
let tracer = global::tracer("ruv-swarm");
let mut span = tracer.start("swarm.orchestrate");
span.set_attribute("task.id", task.id.clone());
span.set_attribute("task.priority", task.priority as i64);
span.set_attribute("swarm.agents", self.agent_count() as i64);
// Trace agent assignments
let assignments = self.assign_agents(&task);
for (agent_id, subtask) in &assignments {
// NOTE(review): `agent_span` likely needs to be `mut` for
// `set_attribute`/`end` — confirm against the OTel API in use.
let agent_span = tracer.start(format!("agent.{}.execute", agent_id));
agent_span.set_attribute("subtask.type", subtask.task_type());
// Execute with tracing context
let result = self.execute_subtask(agent_id, subtask).await?;
agent_span.end();
}
span.end();
// NOTE(review): `result` is scoped to the loop body above, so this will
// not compile as written; presumably subtask results should be collected
// and aggregated into a single TaskResult — confirm intended semantics.
Ok(result)
}
}use slog::{Logger, o, info, warn, error};
use slog_async::Async;
use slog_json::Json;
// Initialize structured logging
// Builds the root structured logger: JSON-formatted records written to
// stdout through an async drain, tagged with service/version/environment.
fn init_logging() -> Logger {
// JSON drain: every record is emitted as one JSON object on stdout.
let drain = Json::new(std::io::stdout())
.add_default_keys()
.build()
.fuse();
// Async drain decouples logging from hot paths; on buffer overflow the
// NEWEST records are dropped (oldest are kept) — NOTE(review): confirm
// this is the intended trade-off vs DropOldest.
let drain = Async::new(drain)
.overflow_strategy(slog_async::OverflowStrategy::DropNewest)
.build()
.fuse();
// Context keys attached to every log record from this logger.
Logger::root(drain, o!(
"service" => "ruv-fann",
"version" => env!("CARGO_PKG_VERSION"),
"environment" => std::env::var("ENV").unwrap_or_else(|_| "development".to_string()),
))
}
// Log neural network events
// Emits one structured log record per training lifecycle event, with
// event-specific key/value context.
impl NeuralNetwork {
fn log_training_event(&self, logger: &Logger, event: TrainingEvent) {
match event {
// Start of an epoch: record hyperparameters in effect.
TrainingEvent::EpochStart { epoch } => {
info!(logger, "Training epoch started";
"epoch" => epoch,
"learning_rate" => self.learning_rate(),
"batch_size" => self.batch_size(),
);
}
// Periodic training-metric update (loss/accuracy plus current
// gradient norm read from the model).
TrainingEvent::LossUpdate { loss, accuracy } => {
info!(logger, "Training metrics updated";
"loss" => loss,
"accuracy" => accuracy,
"gradient_norm" => self.gradient_norm(),
);
}
// End-of-validation summary metrics.
TrainingEvent::ValidationComplete { metrics } => {
info!(logger, "Validation completed";
"val_loss" => metrics.loss,
"val_accuracy" => metrics.accuracy,
"val_f1_score" => metrics.f1_score,
);
}
}
}
}// Centralized log aggregation
// Buffers log entries and periodically flushes them to all configured
// backends in parallel.
struct LogAggregator {
backends: Vec<Box<dyn LogBackend>>,
// In-memory staging buffer; `RingBuffer` is a project type, presumably
// bounded (oldest entries overwritten when full) — TODO confirm.
buffer: RingBuffer<LogEntry>,
flush_interval: Duration,
}
impl LogAggregator {
// Runs forever: ticks at `flush_interval`, drains the buffer, and fans
// the batch out to every backend concurrently.
async fn run(&mut self) {
let mut interval = interval(self.flush_interval);
loop {
interval.tick().await;
// Batch process logs
let logs: Vec<_> = self.buffer.drain().collect();
if logs.is_empty() {
continue;
}
// Send to all backends in parallel
// NOTE(review): join_all results are discarded, so per-backend send
// failures are silently ignored — confirm this is intentional.
let futures: Vec<_> = self.backends.iter()
.map(|backend| backend.send_batch(&logs))
.collect();
futures::future::join_all(futures).await;
}
}
}
// Elasticsearch backend
// Ships log batches to Elasticsearch via the bulk API.
struct ElasticsearchBackend {
client: elasticsearch::Elasticsearch,
index_pattern: String,
}
// NOTE(review): `async fn` in a trait impl presumably relies on
// `#[async_trait]` on the `LogBackend` trait — confirm.
impl LogBackend for ElasticsearchBackend {
async fn send_batch(&self, logs: &[LogEntry]) -> Result<()> {
// Bulk body alternates action lines ({"index": ...}) with documents.
// NOTE(review): per-doc index comes from `get_index_name()` while the
// bulk URL uses `index_pattern` — confirm these agree; also the
// `unwrap()` will panic if a LogEntry fails to serialize.
let body: Vec<_> = logs.iter()
.flat_map(|log| {
vec![
json!({ "index": { "_index": self.get_index_name() } }),
serde_json::to_value(log).unwrap(),
]
})
.collect();
self.client
.bulk(elasticsearch::BulkParts::Index(&self.index_pattern))
.body(body)
.send()
.await?;
Ok(())
}
}# alerts.yaml
groups:
- name: neural_network_alerts
interval: 30s
rules:
- alert: HighInferenceLatency
expr: histogram_quantile(0.95, nn_inference_time_ms) > 100
for: 5m
labels:
severity: warning
annotations:
summary: "High neural network inference latency"
description: "95th percentile inference time is {{ $value }}ms"
- alert: LowGPUUtilization
expr: gpu_utilization_percent < 50
for: 10m
labels:
severity: info
annotations:
summary: "Low GPU utilization"
description: "GPU utilization is {{ $value }}%"
- alert: MemoryLeak
expr: rate(process_heap_size_mb[5m]) > 10
for: 15m
labels:
severity: critical
annotations:
summary: "Potential memory leak detected"
description: "Heap size growing at {{ $value }}MB/min"
- name: swarm_alerts
interval: 10s
rules:
- alert: SwarmConsensusFailure
expr: swarm_consensus_failures_total > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Swarm consensus failures detected"
description: "{{ $value }} consensus failures in the last minute"
- alert: HighAgentFailureRate
expr: rate(agent_task_failures_total[5m]) / rate(agent_task_total[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High agent failure rate"
description: "{{ $value | humanizePercentage }} of tasks failing"use ruv_fann::alerts::{AlertManager, AlertRule, AlertAction};
// Configure alert manager
// Rules pair a boolean condition over the latest metrics snapshot with a
// severity and a cooldown (minimum time between re-fires of the same rule).
// Actions fan out every fired alert to all configured channels.
let alert_manager = AlertManager::builder()
.add_rule(AlertRule {
name: "high_loss",
condition: |metrics| metrics.training_loss > 1.0,
severity: Severity::Warning,
cooldown: Duration::from_secs(300),
})
.add_rule(AlertRule {
// Fires when loss stops improving; NOTE(review): `loss_improvement`
// semantics (per-step? per-epoch?) are defined elsewhere — confirm.
name: "training_stalled",
condition: |metrics| metrics.loss_improvement < 0.001,
severity: Severity::Critical,
cooldown: Duration::from_secs(600),
})
.add_action(AlertAction::Email {
to: "[email protected]",
template: "alert_email.html",
})
.add_action(AlertAction::Webhook {
url: "https://hooks.slack.com/services/...",
method: "POST",
})
.build();
// Monitor training
model.set_alert_manager(alert_manager);{
"dashboard": {
"title": "ruv-FANN Neural Network Monitoring",
"panels": [
{
"title": "Training Metrics",
"gridPos": {"x": 0, "y": 0, "w": 12, "h": 8},
"targets": [
{
"expr": "nn_training_loss",
"legendFormat": "Loss"
},
{
"expr": "nn_training_accuracy",
"legendFormat": "Accuracy"
}
],
"type": "graph"
},
{
"title": "Inference Performance",
"gridPos": {"x": 12, "y": 0, "w": 12, "h": 8},
"targets": [
{
"expr": "histogram_quantile(0.95, nn_inference_time_ms)",
"legendFormat": "P95 Latency"
},
{
"expr": "rate(nn_forward_passes[1m])",
"legendFormat": "Throughput"
}
],
"type": "graph"
},
{
"title": "Resource Utilization",
"gridPos": {"x": 0, "y": 8, "w": 8, "h": 6},
"targets": [
{
"expr": "system_cpu_usage",
"legendFormat": "CPU %"
},
{
"expr": "gpu_utilization_percent",
"legendFormat": "GPU %"
}
],
"type": "graph"
},
{
"title": "Swarm Health",
"gridPos": {"x": 8, "y": 8, "w": 8, "h": 6},
"targets": [
{
"expr": "swarm_active_agents",
"legendFormat": "Active Agents"
},
{
"expr": "rate(swarm_messages_total[1m])",
"legendFormat": "Message Rate"
}
],
"type": "graph"
},
{
"title": "Error Rate",
"gridPos": {"x": 16, "y": 8, "w": 8, "h": 6},
"targets": [
{
"expr": "rate(errors_total[5m])",
"legendFormat": "{{ error_type }}"
}
],
"type": "graph"
}
]
}
}use ruv_fann::dashboard::{Dashboard, Widget, Layout};
// Create custom dashboard
// Programmatic dashboard: widgets reference metric names recorded elsewhere
// and are arranged by the grid layout at the end.
let dashboard = Dashboard::builder()
.title("Neural Network Training Dashboard")
.add_widget(Widget::LineChart {
title: "Loss Over Time",
metric: "training.loss",
aggregation: Aggregation::Average,
// NOTE(review): `Duration::from_hours` is not in stable std —
// presumably a project extension or re-export; confirm.
time_window: Duration::from_hours(1),
})
.add_widget(Widget::Heatmap {
title: "Layer Activation Patterns",
metric: "nn.layer_activations",
color_scheme: ColorScheme::Viridis,
})
.add_widget(Widget::Gauge {
title: "GPU Memory Usage",
metric: "gpu.memory_percent",
// (threshold, color) pairs: color applies from that value upward.
thresholds: vec![
(0.0, Color::Green),
(70.0, Color::Yellow),
(90.0, Color::Red),
],
})
.add_widget(Widget::Table {
title: "Top Resource Consumers",
query: "sort_desc(topk(10, process_memory_mb))",
columns: vec!["Process", "Memory (MB)", "CPU %"],
})
.layout(Layout::Grid {
columns: 3,
row_height: 300,
})
.build();
// Serve dashboard
dashboard.serve("0.0.0.0:3000").await?;use ruv_fann::metrics::{Metric, MetricRegistry};
// Define custom metrics
// The derive macro generates registry plumbing from the #[metric] attributes;
// each field becomes one named metric of the declared type.
#[derive(Metric)]
struct CartanMatrixMetrics {
#[metric(type = "gauge", help = "Orthogonality measure of Cartan matrix")]
orthogonality: f64,
#[metric(type = "histogram", help = "Eigenvalue distribution")]
eigenvalues: Vec<f64>,
#[metric(type = "counter", help = "Number of matrix updates")]
update_count: u64,
}
// Implement metric collection
// Snapshot of the matrix's current state; computations are delegated to the
// CartanMatrix type defined elsewhere.
impl CartanMatrix {
fn collect_metrics(&self) -> CartanMatrixMetrics {
CartanMatrixMetrics {
orthogonality: self.compute_orthogonality(),
eigenvalues: self.compute_eigenvalues(),
update_count: self.updates,
}
}
}
// Register with global registry
METRIC_REGISTRY.register::<CartanMatrixMetrics>();use prometheus::{Encoder, TextEncoder, Counter, Gauge, Histogram};
use warp::Filter;
// Metrics endpoint
// Serves a Prometheus scrape endpoint at GET /metrics on the given port,
// encoding everything registered with the default prometheus registry.
pub async fn serve_metrics(port: u16) {
let metrics_route = warp::path!("metrics")
.and(warp::get())
.map(|| {
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = vec![];
// unwrap: encoding into an in-memory Vec only fails on a formatter
// bug, treated as unrecoverable here.
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
});
warp::serve(metrics_route)
.run(([0, 0, 0, 0], port))
.await;
}
// Export custom metrics
// Process-wide metric singletons registered lazily on first use.
// NOTE(review): `HistogramOpts` is used below but missing from the
// `use prometheus::{...}` list above — confirm the import.
lazy_static! {
static ref NN_FORWARD_COUNTER: Counter = Counter::new(
"nn_forward_passes_total", "Total number of forward passes"
).unwrap();
static ref NN_LOSS_GAUGE: Gauge = Gauge::new(
"nn_training_loss", "Current training loss"
).unwrap();
static ref NN_INFERENCE_HISTOGRAM: Histogram = Histogram::with_opts(
HistogramOpts::new("nn_inference_duration_seconds", "Inference time distribution")
.buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0])
).unwrap();
}// Good metric names
"nn_training_loss" // Clear namespace and purpose
"swarm_agent_task_duration_seconds" // Units included
"gpu_memory_used_bytes" // Specific measurement
// Bad metric names
"loss" // Too generic
"agent_time" // Missing units
"memory" // Ambiguous// Use sampling for high-frequency metrics
// Probabilistic sampling for high-frequency metrics: each metric name may
// have a per-name sample rate; unlisted metrics are always recorded.
impl MetricCollector {
// Returns true when this observation should be kept. `rate` is the keep
// probability in [0, 1]; a uniform random draw below it keeps the sample.
fn should_sample(&self, metric_name: &str) -> bool {
match self.sampling_rates.get(metric_name) {
Some(rate) => rand::random::<f64>() < *rate,
None => true,
}
}
// Buffers a sampled observation with its capture time.
// NOTE(review): `Metric { name, ... }` stores the borrowed `&str`, which
// implies `Metric` carries a lifetime — confirm against its definition.
fn record(&mut self, name: &str, value: f64) {
if self.should_sample(name) {
self.buffer.push(Metric { name, value, timestamp: Instant::now() });
}
}
}# Dashboard hierarchy
root:
overview:
- system_health
- key_metrics
- alerts
neural_networks:
- training_progress
- model_performance
- resource_usage
swarm_coordination:
- agent_status
- task_distribution
- consensus_metrics
infrastructure:
- compute_resources
- network_metrics
- storage_metrics// Smart alert suppression
// Smart alert suppression: swallows repeat firings of the same alert that
// occur within a configurable quiet window, preventing alert fatigue.
struct AlertSuppressor {
    // Alert name -> the instant it last actually fired.
    recent_alerts: HashMap<String, Instant>,
    // Minimum quiet period required between two firings of one alert.
    suppression_window: Duration,
}
impl AlertSuppressor {
    /// Decides whether `alert_name` may fire right now.
    ///
    /// Returns `true` — and records the firing time — when the alert has
    /// never fired before or last fired more than `suppression_window` ago.
    /// Returns `false` otherwise, leaving the recorded timestamp untouched
    /// so the window is measured from the last *actual* firing.
    fn should_alert(&mut self, alert_name: &str) -> bool {
        let still_suppressed = self
            .recent_alerts
            .get(alert_name)
            .map_or(false, |last| last.elapsed() <= self.suppression_window);
        if still_suppressed {
            return false;
        }
        self.recent_alerts
            .insert(alert_name.to_string(), Instant::now());
        true
    }
}
// Asynchronous metric collection
// Moves costly metric computation off the caller's path by spawning it onto
// the tokio runtime; results land in the global METRIC_BUFFER.
impl MetricCollector {
async fn collect_expensive_metrics(&self) {
// Offload to background task
// NOTE(review): the JoinHandle is dropped, so the task is detached and
// its panics/errors are unobserved; `lock().unwrap()` will also panic
// if the buffer mutex is poisoned — confirm both are acceptable.
tokio::spawn(async move {
let metrics = expensive_computation().await;
METRIC_BUFFER.lock().unwrap().extend(metrics);
});
}
}
// Batch metric updates
// Flushes buffered metrics when either the size threshold or the time
// threshold is reached, amortizing backend round-trips.
impl MetricBuffer {
fn flush(&mut self) {
// Size- OR time-based trigger: whichever condition trips first.
if self.buffer.len() >= self.batch_size || self.last_flush.elapsed() > self.flush_interval {
// NOTE(review): send_batch's result is ignored; a failed send still
// clears the buffer, dropping those metrics — confirm intended.
self.backend.send_batch(&self.buffer);
self.buffer.clear();
self.last_flush = Instant::now();
}
}
}Effective monitoring and metrics are essential for operating ruv-FANN systems in production. This guide provides comprehensive patterns for observability across all system components, from neural networks to distributed swarms.
Key takeaways:
- Implement structured logging and distributed tracing
- Use sampling for high-frequency metrics
- Design informative dashboards for different audiences
- Configure smart alerts to prevent fatigue
- Minimize performance impact through batching and async collection
For additional monitoring resources, see the Production Deployment and Performance Benchmarks guides.