Code Examples - ruvnet/ruv-FANN GitHub Wiki
This page provides comprehensive code examples for the ruv-FANN Neural Network Framework, covering everything from basic usage to advanced optimizations.
- Basic Neural Network Examples
- Advanced SIMD Optimizations
- Swarm Coordination Examples
- WebAssembly Integration
- GPU Acceleration Examples
- Distributed Training Examples
use ruv_fann::{Network, ActivationFunction, TrainingData};
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a 3-layer network: 2 inputs, 3 hidden, 1 output
let mut net = Network::new(&[2, 3, 1])?;
// Configure activation functions
net.set_activation_function_hidden(ActivationFunction::Sigmoid);
net.set_activation_function_output(ActivationFunction::Linear);
// Set learning parameters
net.set_learning_rate(0.7);
net.set_training_algorithm(TrainingAlgorithm::Rprop);
// Create training data for XOR problem
let training_data = TrainingData::new(
vec![
vec![0.0, 0.0], vec![0.0, 1.0],
vec![1.0, 0.0], vec![1.0, 1.0]
],
vec![
vec![0.0], vec![1.0],
vec![1.0], vec![0.0]
]
)?;
// Train the network
net.train_on_data(&training_data, 500, 10, 0.001)?;
// Test the network
let output = net.run(&[1.0, 0.0])?;
println!("XOR(1,0) = {:.3}", output[0]);
Ok(())
}
use ruv_fann::{ConvNet, ConvLayer, PoolLayer, DenseLayer, ActivationFunction};
fn create_cnn_mnist() -> Result<ConvNet, Box<dyn std::error::Error>> {
let mut cnn = ConvNet::new();
// Input: 28x28x1 grayscale images
cnn.add_layer(ConvLayer::new(
32, // 32 filters
(3, 3), // 3x3 kernel
(1, 1), // stride
"same", // padding
ActivationFunction::ReLU
)?);
cnn.add_layer(PoolLayer::new(
(2, 2), // 2x2 pool size
(2, 2), // stride
"max" // max pooling
)?);
cnn.add_layer(ConvLayer::new(
64, // 64 filters
(3, 3), // 3x3 kernel
(1, 1), // stride
"same", // padding
ActivationFunction::ReLU
)?);
cnn.add_layer(PoolLayer::new(
(2, 2), // 2x2 pool size
(2, 2), // stride
"max" // max pooling
)?);
// Flatten and add dense layers
cnn.add_layer(DenseLayer::new(128, ActivationFunction::ReLU)?);
cnn.add_layer(DenseLayer::new(10, ActivationFunction::Softmax)?); // 10 classes
// Configure optimizer
cnn.set_optimizer(Optimizer::Adam {
learning_rate: 0.001,
beta1: 0.9,
beta2: 0.999,
epsilon: 1e-8
});
Ok(cnn)
}
use ruv_fann::{LSTM, RNNCell, SequenceData};
fn create_lstm_time_series() -> Result<LSTM, Box<dyn std::error::Error>> {
let mut lstm = LSTM::new();
// LSTM layer with 50 hidden units
lstm.add_layer(RNNCell::LSTM {
hidden_size: 50,
return_sequences: true,
dropout: 0.2
})?;
// Another LSTM layer
lstm.add_layer(RNNCell::LSTM {
hidden_size: 50,
return_sequences: false,
dropout: 0.2
})?);
// Dense output layer
lstm.add_layer(DenseLayer::new(1, ActivationFunction::Linear)?);
// Configure for time series prediction
lstm.set_sequence_length(10); // Look back 10 time steps
lstm.set_optimizer(Optimizer::RMSprop {
learning_rate: 0.001,
decay: 0.9
});
Ok(lstm)
}
use ruv_fann::simd::{SimdMatrix, simd_gemm};
use std::arch::x86_64::*;
#[cfg(target_arch = "x86_64")]
unsafe fn optimized_matrix_multiply(
a: &SimdMatrix<f32>,
b: &SimdMatrix<f32>,
c: &mut SimdMatrix<f32>
) -> Result<(), Box<dyn std::error::Error>> {
// Ensure matrices are properly aligned for SIMD
assert!(a.is_aligned(32));
assert!(b.is_aligned(32));
assert!(c.is_aligned(32));
let m = a.rows();
let n = b.cols();
let k = a.cols();
// Use AVX2 for 8 float32s at once
for i in (0..m).step_by(8) {
for j in (0..n).step_by(8) {
let mut acc = [_mm256_setzero_ps(); 8];
for l in (0..k).step_by(8) {
// Load 8x8 block from matrix A
let a_block = [
_mm256_load_ps(a.ptr().add(i * k + l)),
_mm256_load_ps(a.ptr().add((i + 1) * k + l)),
_mm256_load_ps(a.ptr().add((i + 2) * k + l)),
_mm256_load_ps(a.ptr().add((i + 3) * k + l)),
_mm256_load_ps(a.ptr().add((i + 4) * k + l)),
_mm256_load_ps(a.ptr().add((i + 5) * k + l)),
_mm256_load_ps(a.ptr().add((i + 6) * k + l)),
_mm256_load_ps(a.ptr().add((i + 7) * k + l)),
];
// Load 8x8 block from matrix B
let b_block = [
_mm256_load_ps(b.ptr().add(l * n + j)),
_mm256_load_ps(b.ptr().add((l + 1) * n + j)),
_mm256_load_ps(b.ptr().add((l + 2) * n + j)),
_mm256_load_ps(b.ptr().add((l + 3) * n + j)),
_mm256_load_ps(b.ptr().add((l + 4) * n + j)),
_mm256_load_ps(b.ptr().add((l + 5) * n + j)),
_mm256_load_ps(b.ptr().add((l + 6) * n + j)),
_mm256_load_ps(b.ptr().add((l + 7) * n + j)),
];
// Perform fused multiply-add
for row in 0..8 {
for col in 0..8 {
acc[row] = _mm256_fmadd_ps(
_mm256_broadcast_ss(&a_block[row] as *const __m256 as *const f32),
b_block[col],
acc[row]
);
}
}
}
// Store results
for row in 0..8 {
_mm256_store_ps(c.mut_ptr().add((i + row) * n + j), acc[row]);
}
}
}
Ok(())
}
use ruv_fann::simd::SimdVector;
use std::arch::x86_64::*;

/// AVX2/FMA-accelerated in-place activation functions.
///
/// NOTE(review): every loop below steps 8 floats at a time with unaligned
/// loads/stores and has no scalar tail — if `input.len()` is not a multiple of
/// 8, the final iteration reads and writes past the end of the buffer. Confirm
/// callers guarantee `len % 8 == 0` (or pad buffers), otherwise this is
/// out-of-bounds. `simd_exp` and `horizontal_sum_avx2` are assumed to be
/// crate-provided helpers; they are not visible in this snippet.
#[cfg(target_arch = "x86_64")]
pub struct SimdActivations;

impl SimdActivations {
    /// In-place ReLU: x = max(x, 0), 8 lanes per iteration.
    ///
    /// # Safety
    /// Caller must ensure AVX2 and FMA are available on the running CPU
    /// (required by `#[target_feature]`).
    #[target_feature(enable = "avx2,fma")]
    pub unsafe fn relu_avx2(input: &mut SimdVector<f32>) {
        let zero = _mm256_setzero_ps();
        let len = input.len();
        let ptr = input.as_mut_ptr();
        for i in (0..len).step_by(8) {
            let vals = _mm256_loadu_ps(ptr.add(i));
            // max(x, 0) is exactly ReLU, lane-wise.
            let result = _mm256_max_ps(vals, zero);
            _mm256_storeu_ps(ptr.add(i), result);
        }
    }

    /// In-place tanh via the identity tanh(x) = (e^{2x} - 1) / (e^{2x} + 1).
    ///
    /// # Safety
    /// Same CPU-feature requirements as `relu_avx2`.
    #[target_feature(enable = "avx2,fma")]
    pub unsafe fn tanh_avx2(input: &mut SimdVector<f32>) {
        let len = input.len();
        let ptr = input.as_mut_ptr();
        // Constants for the tanh identity below.
        let one = _mm256_set1_ps(1.0);
        let two = _mm256_set1_ps(2.0);
        // NOTE(review): `neg_two` is unused — presumably left over from an
        // alternative formulation using e^{-2x}.
        let neg_two = _mm256_set1_ps(-2.0);
        for i in (0..len).step_by(8) {
            let x = _mm256_loadu_ps(ptr.add(i));
            // tanh(x) ≈ (exp(2x) - 1) / (exp(2x) + 1)
            let exp_2x = simd_exp(_mm256_mul_ps(two, x));
            let numerator = _mm256_sub_ps(exp_2x, one);
            let denominator = _mm256_add_ps(exp_2x, one);
            let result = _mm256_div_ps(numerator, denominator);
            _mm256_storeu_ps(ptr.add(i), result);
        }
    }

    /// In-place softmax with the standard max-subtraction trick for
    /// numerical stability.
    ///
    /// # Safety
    /// Same CPU-feature requirements as `relu_avx2`.
    #[target_feature(enable = "avx2,fma")]
    pub unsafe fn softmax_avx2(input: &mut SimdVector<f32>) {
        let len = input.len();
        let ptr = input.as_mut_ptr();
        // Pass 1 (scalar): find the maximum so exp() cannot overflow.
        let mut max_val = f32::NEG_INFINITY;
        for i in 0..len {
            max_val = max_val.max(*ptr.add(i));
        }
        let max_vec = _mm256_set1_ps(max_val);
        // Pass 2: overwrite each element with exp(x - max) and accumulate a
        // lane-wise partial sum.
        let mut sum = _mm256_setzero_ps();
        for i in (0..len).step_by(8) {
            let x = _mm256_loadu_ps(ptr.add(i));
            let shifted = _mm256_sub_ps(x, max_vec);
            let exp_x = simd_exp(shifted);
            _mm256_storeu_ps(ptr.add(i), exp_x);
            sum = _mm256_add_ps(sum, exp_x);
        }
        // Reduce the 8 partial sums into one scalar, then broadcast it back.
        let sum_scalar = horizontal_sum_avx2(sum);
        let sum_vec = _mm256_set1_ps(sum_scalar);
        // Pass 3: normalize so the outputs sum to 1.
        for i in (0..len).step_by(8) {
            let vals = _mm256_loadu_ps(ptr.add(i));
            let result = _mm256_div_ps(vals, sum_vec);
            _mm256_storeu_ps(ptr.add(i), result);
        }
    }
}
use ruv_fann::swarm::{SwarmCoordinator, SwarmNode, ConsensusProtocol};
use tokio::net::TcpListener;

// Entry point for the distributed-training demo: starts a coordinator server,
// then drives the training loop as a client of it.
// NOTE(review): `TrainingConfig` is used below but is not in this snippet's
// imports — confirm where it is defined/re-exported.
#[tokio::main]
async fn distributed_training_example() -> Result<(), Box<dyn std::error::Error>> {
    // Coordinator node that workers will connect to.
    let mut coordinator = SwarmCoordinator::new("coordinator-1".to_string());
    // Byzantine fault tolerance: survive up to 1 faulty/malicious node,
    // with a 5-second consensus timeout.
    coordinator.set_consensus_protocol(ConsensusProtocol::Byzantine {
        fault_tolerance: 1, // Can handle 1 faulty node
        timeout_ms: 5000,
    });
    // Hyperparameters shared with every worker.
    let training_config = TrainingConfig {
        batch_size: 32,
        learning_rate: 0.001,
        epochs: 100,
        gradient_sync_frequency: 10, // sync gradients every 10 local steps
    };
    // Listen for worker connections on all interfaces.
    let listener = TcpListener::bind("0.0.0.0:8080").await?;
    println!("Swarm coordinator listening on port 8080");
    // Run the coordination loop in the background (moves `coordinator` and
    // `training_config` into the task).
    tokio::spawn(async move {
        coordinator.run_coordination_loop(listener, training_config).await
    });
    // Crude startup barrier: give nodes two seconds to connect.
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    // Drive the training loop as a client of the coordinator.
    let training_handle = tokio::spawn(async move {
        coordinate_distributed_training().await
    });
    // First `?` propagates a join/panic error, second `?` a training error.
    training_handle.await??;
    Ok(())
}
/// Runs the swarm training loop as a client of the coordinator at 127.0.0.1:8080.
async fn coordinate_distributed_training() -> Result<(), Box<dyn std::error::Error>> {
    let coordinator = SwarmCoordinator::connect("127.0.0.1:8080").await?;

    // Push the initial parameter set to every node before training begins.
    coordinator.broadcast_model_parameters().await?;

    for current_epoch in 0..100 {
        println!("Starting epoch {}", current_epoch);

        // One local optimization step per node on its own data shard.
        let step_gradients = coordinator.local_training_step().await?;

        // Fault-tolerant aggregation: the Byzantine median discards outlier gradients.
        let merged = coordinator
            .aggregate_gradients(step_gradients, ConsensusAlgorithm::ByzantineMedian)
            .await?;

        // Apply the agreed-upon update everywhere.
        coordinator.update_model_parameters(merged).await?;

        // Periodic validation (every 10th epoch, including epoch 0).
        if current_epoch % 10 == 0 {
            let metrics = coordinator.evaluate_model().await?;
            println!("Epoch {}: Loss = {:.4}, Accuracy = {:.2}%",
                current_epoch, metrics.loss, metrics.accuracy * 100.0);
        }
    }
    Ok(())
}
use ruv_fann::swarm::{HierarchicalSwarm, SwarmLevel, NodeRole};

/// Builds and starts a three-level (master / regional / worker) training swarm.
async fn hierarchical_swarm_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut swarm = HierarchicalSwarm::new(3);

    // Level 0: a single master coordinator.
    let master = SwarmNode::new(
        "master-1",
        NodeRole::Coordinator,
        "192.168.1.100:8080"
    );
    swarm.add_node(SwarmLevel::Master, master).await?;

    // Level 1: one regional coordinator per region (subnets .2, .3, .4).
    for region in 0..3 {
        let regional = SwarmNode::new(
            &format!("regional-{}", region),
            NodeRole::RegionalCoordinator,
            &format!("192.168.{}.100:8080", region + 2)
        );
        swarm.add_node(SwarmLevel::Regional(region), regional).await?;
    }

    // Level 2: four workers per region (hosts .10-.13 on the region's subnet).
    for region in 0..3 {
        for worker in 0..4 {
            let node = SwarmNode::new(
                &format!("worker-{}-{}", region, worker),
                NodeRole::Worker,
                &format!("192.168.{}.{}:8080", region + 2, worker + 10)
            );
            swarm.add_node(SwarmLevel::Worker(region), node).await?;
        }
    }

    // The tree topology mirrors the 3-level hierarchy (fan-out 4 at each node).
    swarm.set_communication_topology(CommunicationTopology::Tree {
        branching_factor: 4,
        max_depth: 3,
    });

    swarm.initialize().await?;

    // Kick off training: weighted averaging every 30 s, tolerating one Byzantine fault.
    swarm.start_hierarchical_training(HierarchicalConfig {
        aggregation_strategy: AggregationStrategy::WeightedAverage,
        synchronization_frequency: Duration::from_secs(30),
        fault_tolerance: FaultTolerance::Byzantine { max_faults: 1 },
    }).await?;
    Ok(())
}
use ruv_fann::swarm::{GossipProtocol, GossipMessage, PeerInfo};
use rand::seq::SliceRandom;

/// Gossip-based model-synchronization state for one swarm node.
/// NOTE(review): `HashMap`, `Duration`, `SystemTime`, and `ModelUpdate` are
/// used by this snippet but not imported here — confirm surrounding context.
pub struct ModelGossip {
    node_id: String,                             // this node's unique identifier
    peers: Vec<PeerInfo>,                        // known gossip partners
    model_updates: HashMap<String, ModelUpdate>, // latest accepted update per sender
    gossip_frequency: Duration,                  // interval between gossip rounds
}
impl ModelGossip {
pub fn new(node_id: String, gossip_frequency: Duration) -> Self {
Self {
node_id,
peers: Vec::new(),
model_updates: HashMap::new(),
gossip_frequency,
}
}
pub async fn start_gossip_loop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let mut interval = tokio::time::interval(self.gossip_frequency);
loop {
interval.tick().await;
// Select random subset of peers for gossip
let mut rng = rand::thread_rng();
let gossip_peers: Vec<_> = self.peers
.choose_multiple(&mut rng, 3)
.cloned()
.collect();
// Send model updates to selected peers
for peer in gossip_peers {
let message = GossipMessage::ModelUpdate {
sender: self.node_id.clone(),
timestamp: SystemTime::now(),
model_delta: self.get_latest_model_delta(),
version: self.get_model_version(),
};
if let Err(e) = self.send_gossip_message(&peer, message).await {
eprintln!("Failed to gossip with peer {}: {}", peer.id, e);
// Mark peer as potentially faulty
self.mark_peer_suspicious(&peer.id);
}
}
// Clean up old model updates
self.cleanup_old_updates();
}
}
pub async fn handle_gossip_message(&mut self, message: GossipMessage) -> Result<(), Box<dyn std::error::Error>> {
match message {
GossipMessage::ModelUpdate { sender, timestamp, model_delta, version } => {
// Validate message authenticity and freshness
if self.validate_model_update(&sender, &model_delta, version, timestamp) {
// Apply model update with conflict resolution
self.merge_model_update(sender, model_delta, version).await?;
// Propagate to other peers with some probability
if rand::random::<f32>() < 0.7 {
self.propagate_update(message).await?;
}
}
},
GossipMessage::PeerDiscovery { sender, peers } => {
// Update peer list
for peer in peers {
if peer.id != self.node_id && !self.peers.contains(&peer) {
self.peers.push(peer);
}
}
},
GossipMessage::Heartbeat { sender, timestamp } => {
// Update peer liveness
self.update_peer_liveness(&sender, timestamp);
}
}
Ok(())
}
async fn merge_model_update(
&mut self,
sender: String,
model_delta: ModelDelta,
version: u64
) -> Result<(), Box<dyn std::error::Error>> {
// Implement CRDT-based conflict resolution
let current_update = self.model_updates.entry(sender.clone()).or_insert_with(|| {
ModelUpdate::new(version, model_delta.clone())
});
if version > current_update.version {
// Newer update, apply directly
current_update.apply_delta(model_delta)?;
current_update.version = version;
} else if version == current_update.version {
// Concurrent update, merge using CRDT rules
current_update.merge_concurrent_delta(model_delta)?;
}
// Ignore older updates
Ok(())
}
}
use wasm_bindgen::prelude::*;
use ruv_fann::{Network, ActivationFunction};

/// JavaScript-visible wrapper that owns a native `Network` behind wasm-bindgen.
#[wasm_bindgen]
pub struct WasmNeuralNetwork {
    network: Network, // the wrapped native network; not exposed to JS directly
}
#[wasm_bindgen]
impl WasmNeuralNetwork {
#[wasm_bindgen(constructor)]
pub fn new(layers: &[u32]) -> Result<WasmNeuralNetwork, JsValue> {
console_error_panic_hook::set_once();
let network = Network::new(layers)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
Ok(WasmNeuralNetwork { network })
}
#[wasm_bindgen]
pub fn set_activation_function(&mut self, function: &str) -> Result<(), JsValue> {
let activation = match function {
"sigmoid" => ActivationFunction::Sigmoid,
"tanh" => ActivationFunction::Tanh,
"relu" => ActivationFunction::ReLU,
"leaky_relu" => ActivationFunction::LeakyReLU(0.01),
"softmax" => ActivationFunction::Softmax,
_ => return Err(JsValue::from_str("Unknown activation function"))
};
self.network.set_activation_function_hidden(activation);
Ok(())
}
#[wasm_bindgen]
pub fn train_batch(&mut self, inputs: &[f32], targets: &[f32], batch_size: usize) -> Result<f32, JsValue> {
let input_size = self.network.get_num_input();
let output_size = self.network.get_num_output();
if inputs.len() % (batch_size * input_size) != 0 {
return Err(JsValue::from_str("Invalid input batch size"));
}
if targets.len() % (batch_size * output_size) != 0 {
return Err(JsValue::from_str("Invalid target batch size"));
}
let mut total_error = 0.0;
for batch in 0..batch_size {
let input_start = batch * input_size;
let input_end = input_start + input_size;
let target_start = batch * output_size;
let target_end = target_start + output_size;
let batch_input = &inputs[input_start..input_end];
let batch_target = &targets[target_start..target_end];
let error = self.network.train(batch_input, batch_target)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
total_error += error;
}
Ok(total_error / batch_size as f32)
}
#[wasm_bindgen]
pub fn predict(&mut self, inputs: &[f32]) -> Result<Vec<f32>, JsValue> {
let outputs = self.network.run(inputs)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
Ok(outputs)
}
#[wasm_bindgen]
pub fn get_weights(&self) -> Vec<f32> {
self.network.get_weights()
}
#[wasm_bindgen]
pub fn set_weights(&mut self, weights: &[f32]) -> Result<(), JsValue> {
self.network.set_weights(weights)
.map_err(|e| JsValue::from_str(&e.to_string()))
}
#[wasm_bindgen]
pub fn save_to_bytes(&self) -> Result<Vec<u8>, JsValue> {
self.network.serialize()
.map_err(|e| JsValue::from_str(&e.to_string()))
}
#[wasm_bindgen]
pub fn load_from_bytes(data: &[u8]) -> Result<WasmNeuralNetwork, JsValue> {
let network = Network::deserialize(data)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
Ok(WasmNeuralNetwork { network })
}
}
// neural-network-worker.js
import init, { WasmNeuralNetwork } from './ruv_fann_wasm.js';

/**
 * Thin wrapper around the WASM neural network for use inside a Web Worker.
 * Public methods return `{ status, ... }` result objects; the guard checks
 * throw so the message handler can wrap them uniformly.
 */
class NeuralNetworkWorker {
    constructor() {
        this.network = null;
        this.isInitialized = false;
    }

    /** Loads and instantiates the WASM module; must complete before anything else. */
    async initialize() {
        await init();
        this.isInitialized = true;
    }

    /** Throws unless a network instance already exists. */
    _requireNetwork() {
        if (!this.network) {
            throw new Error('Network not created');
        }
    }

    /** Builds a fresh network from a layer-size array, e.g. [2, 4, 1]. */
    createNetwork(layers) {
        if (!this.isInitialized) {
            throw new Error('WASM module not initialized');
        }
        this.network = new WasmNeuralNetwork(new Uint32Array(layers));
        return { status: 'success', message: 'Network created' };
    }

    /** Runs one training pass over a flat batch of inputs/targets. */
    trainBatch(inputs, targets) {
        this._requireNetwork();
        const inputArray = new Float32Array(inputs);
        const targetArray = new Float32Array(targets);
        // Batch size is inferred from the flat input length and the input width.
        const batchSize = inputs.length / this.network.get_num_input();
        try {
            const error = this.network.train_batch(inputArray, targetArray, batchSize);
            return { status: 'success', error };
        } catch (e) {
            return { status: 'error', message: e.toString() };
        }
    }

    /** Feeds one input vector forward and returns the outputs as a plain array. */
    predict(inputs) {
        this._requireNetwork();
        try {
            const outputs = this.network.predict(new Float32Array(inputs));
            return { status: 'success', outputs: Array.from(outputs) };
        } catch (e) {
            return { status: 'error', message: e.toString() };
        }
    }

    /** Serializes the current model to a plain (structured-clonable) byte array. */
    saveModel() {
        this._requireNetwork();
        try {
            const bytes = this.network.save_to_bytes();
            return { status: 'success', data: Array.from(bytes) };
        } catch (e) {
            return { status: 'error', message: e.toString() };
        }
    }

    /** Restores a model previously produced by saveModel(). */
    loadModel(data) {
        try {
            this.network = WasmNeuralNetwork.load_from_bytes(new Uint8Array(data));
            return { status: 'success', message: 'Model loaded' };
        } catch (e) {
            return { status: 'error', message: e.toString() };
        }
    }
}
// Worker message handler: routes { id, method, params } requests from the main
// thread to the NeuralNetworkWorker singleton and posts back { id, result }
// on success or { id, error } on failure.
self.onmessage = async function(e) {
    const { id, method, params } = e.data;
    // Lazily construct and initialize the worker on the first message.
    // NOTE(review): if a second message arrives before initialize() resolves,
    // this block can run twice — confirm the WASM init() is idempotent.
    if (!self.neuralWorker) {
        self.neuralWorker = new NeuralNetworkWorker();
        await self.neuralWorker.initialize();
    }
    try {
        let result;
        switch (method) {
            case 'createNetwork':
                result = self.neuralWorker.createNetwork(params.layers);
                break;
            case 'trainBatch':
                result = self.neuralWorker.trainBatch(params.inputs, params.targets);
                break;
            case 'predict':
                result = self.neuralWorker.predict(params.inputs);
                break;
            case 'saveModel':
                result = self.neuralWorker.saveModel();
                break;
            case 'loadModel':
                result = self.neuralWorker.loadModel(params.data);
                break;
            default:
                throw new Error(`Unknown method: ${method}`);
        }
        // Echo the request id so the caller can match responses to requests.
        self.postMessage({ id, result });
    } catch (error) {
        self.postMessage({
            id,
            error: { status: 'error', message: error.message }
        });
    }
};
<!DOCTYPE html>
<!-- Minimal browser demo: create a network, train it on XOR, and test it,
     all through the wasm-bindgen bindings loaded below. -->
<html>
<head>
<title>ruv-FANN Web Demo</title>
</head>
<body>
<div id="app">
<h1>Neural Network Training Demo</h1>
<div>
<label>Network Architecture:</label>
<input type="text" id="layers" value="2,4,1" placeholder="e.g., 2,4,1">
</div>
<div>
<label>Learning Rate:</label>
<input type="number" id="learningRate" value="0.1" step="0.01">
</div>
<button onclick="createNetwork()">Create Network</button>
<button onclick="trainNetwork()">Train XOR</button>
<button onclick="testNetwork()">Test</button>
<div id="output"></div>
<canvas id="visualization" width="400" height="300"></canvas>
</div>
<script type="module">
import init, { WasmNeuralNetwork } from './ruv_fann_wasm.js';
let network = null;
// NOTE(review): initializeWasm() is fired at the bottom of this script but is
// not awaited before the buttons become clickable — clicking "Create Network"
// before init() resolves would use the module uninitialized. Confirm whether
// the buttons should be disabled until initialization completes.
async function initializeWasm() {
await init();
console.log('WASM initialized');
}
// Parse the comma-separated layer sizes and build a sigmoid network.
window.createNetwork = function() {
const layersInput = document.getElementById('layers').value;
const layers = layersInput.split(',').map(x => parseInt(x.trim()));
try {
network = new WasmNeuralNetwork(new Uint32Array(layers));
network.set_activation_function('sigmoid');
document.getElementById('output').innerHTML = 'Network created successfully!';
} catch (e) {
document.getElementById('output').innerHTML = `Error: ${e}`;
}
};
// Train on the XOR truth table for 1000 epochs, logging every 100th error.
window.trainNetwork = async function() {
if (!network) {
alert('Please create a network first');
return;
}
// XOR training data: four 2-element inputs flattened, four 1-element targets.
const inputs = [0, 0, 0, 1, 1, 0, 1, 1];
const targets = [0, 1, 1, 0];
const output = document.getElementById('output');
output.innerHTML = 'Training started...<br>';
for (let epoch = 0; epoch < 1000; epoch++) {
const error = network.train_batch(
new Float32Array(inputs),
new Float32Array(targets),
4
);
if (epoch % 100 === 0) {
output.innerHTML += `Epoch ${epoch}: Error = ${error.toFixed(6)}<br>`;
}
// Yield to the event loop occasionally so the page can repaint.
if (epoch % 50 === 0) {
await new Promise(resolve => setTimeout(resolve, 1));
}
}
output.innerHTML += 'Training completed!<br>';
};
// Evaluate the trained network against all four XOR cases.
window.testNetwork = function() {
if (!network) {
alert('Please create and train a network first');
return;
}
const testCases = [
[0, 0], [0, 1], [1, 0], [1, 1]
];
const expected = [0, 1, 1, 0];
let results = 'Test Results:<br>';
testCases.forEach((input, i) => {
const output = network.predict(new Float32Array(input));
results += `XOR(${input[0]}, ${input[1]}) = ${output[0].toFixed(3)} (expected: ${expected[i]})<br>`;
});
document.getElementById('output').innerHTML = results;
};
// Initialize when page loads
initializeWasm();
</script>
</body>
</html>
use ruv_fann::gpu::{CudaNetwork, CudaMemory, CudaKernel};
use cudarc::driver::{CudaDevice, DeviceRepr};

/// Fully-connected network with all parameters resident in GPU memory.
pub struct GpuNeuralNetwork {
    device: CudaDevice,           // handle to the CUDA device (GPU 0 in `new`)
    weights: CudaMemory<f32>,     // all layer weight matrices, concatenated
    biases: CudaMemory<f32>,      // all per-neuron biases, concatenated
    activations: CudaMemory<f32>, // scratch buffer sized to the largest layer
    gradients: CudaMemory<f32>,   // gradient buffer mirroring `weights`
    layer_sizes: Vec<usize>,      // neurons per layer, input layer first
}
impl GpuNeuralNetwork {
    /// Allocates device buffers for the given architecture and uploads
    /// Xavier-initialized weights to GPU 0.
    pub fn new(layer_sizes: Vec<usize>) -> Result<Self, Box<dyn std::error::Error>> {
        let device = CudaDevice::new(0)?; // Use GPU 0

        // One weight matrix per adjacent layer pair; one bias per non-input neuron.
        let total_weights: usize = layer_sizes.windows(2)
            .map(|pair| pair[0] * pair[1])
            .sum();
        let total_biases: usize = layer_sizes[1..].iter().sum();
        let max_layer_size = *layer_sizes.iter().max().unwrap();

        // Zero-filled device allocations.
        let weights = device.alloc_zeros::<f32>(total_weights)?;
        let biases = device.alloc_zeros::<f32>(total_biases)?;
        let activations = device.alloc_zeros::<f32>(max_layer_size)?;
        let gradients = device.alloc_zeros::<f32>(total_weights)?;

        // Xavier/Glorot uniform init: U(-b, b) with b = sqrt(6 / (fan_in + fan_out)).
        let mut host_weights = vec![0.0f32; total_weights];
        let mut weight_idx = 0;
        for pair in layer_sizes.windows(2) {
            let fan_in = pair[0];
            let fan_out = pair[1];
            let bound = (6.0 / (fan_in + fan_out) as f32).sqrt();
            for _ in 0..(fan_in * fan_out) {
                host_weights[weight_idx] = (rand::random::<f32>() - 0.5) * 2.0 * bound;
                weight_idx += 1;
            }
        }
        device.htod_copy(host_weights, &weights)?;

        Ok(Self {
            device,
            weights,
            biases,
            activations,
            gradients,
            layer_sizes,
        })
    }

    /// Runs one forward pass, returning the output-layer activations on the host.
    pub fn forward_pass(&mut self, input: &[f32]) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
        // FIX: `&current_activations` (and the other `&current_…` references
        // throughout this impl) appeared as the mojibake `¤t_…` in the
        // original — "&curren" had been swallowed by the `&curren;` (¤) HTML
        // entity. Restored to valid Rust.
        let mut current_activations = self.device.alloc_zeros::<f32>(input.len())?;
        self.device.htod_copy(input.to_vec(), &current_activations)?;

        let mut weight_offset = 0;
        let mut bias_offset = 0;
        for layer_idx in 1..self.layer_sizes.len() {
            let input_size = self.layer_sizes[layer_idx - 1];
            let output_size = self.layer_sizes[layer_idx];

            // Destination buffer for this layer's pre-activations.
            let next_activations = self.device.alloc_zeros::<f32>(output_size)?;

            // One thread per output neuron, 256 threads per block.
            let grid_size = (output_size + 255) / 256;
            let block_size = 256;
            unsafe {
                matrix_vector_multiply_kernel::launch(
                    &self.device,
                    (grid_size, 1, 1),
                    (block_size, 1, 1),
                    &[
                        &self.weights.slice(weight_offset..weight_offset + input_size * output_size),
                        &current_activations,
                        &self.biases.slice(bias_offset..bias_offset + output_size),
                        &next_activations,
                        &input_size,
                        &output_size,
                    ],
                )?;
            }

            // ReLU on hidden layers; the output layer stays linear.
            let activation_grid = (output_size + 255) / 256;
            unsafe {
                if layer_idx == self.layer_sizes.len() - 1 {
                    // Output layer — linear activation, nothing to apply.
                } else {
                    relu_activation_kernel::launch(
                        &self.device,
                        (activation_grid, 1, 1),
                        (block_size, 1, 1),
                        &[&next_activations, &output_size],
                    )?;
                }
            }

            current_activations = next_activations;
            weight_offset += input_size * output_size;
            bias_offset += output_size;
        }

        // Copy the final layer's activations back to the host.
        let output_size = self.layer_sizes.last().unwrap();
        let mut output = vec![0.0f32; *output_size];
        self.device.dtoh_sync_copy(&current_activations, &mut output)?;
        Ok(output)
    }

    /// Backward pass: computes errors, applies SGD weight updates, returns the loss.
    ///
    /// NOTE(review): this example reads `self.activations` as "the forward-pass
    /// activations", but `forward_pass` above allocates fresh buffers instead of
    /// writing into `self.activations` — confirm intent before relying on it.
    pub fn backward_pass(&mut self, target: &[f32]) -> Result<f32, Box<dyn std::error::Error>> {
        let output_size = *self.layer_sizes.last().unwrap();
        let output_errors = self.device.alloc_zeros::<f32>(output_size)?;

        // FIX: the original passed the *return value* of `htod_copy` (not a
        // device buffer) as a kernel argument, and re-used `output_errors` after
        // it had been moved. Stage the target in its own device buffer instead.
        let target_device = self.device.alloc_zeros::<f32>(output_size)?;
        self.device.htod_copy(target.to_vec(), &target_device)?;

        // error = f(prediction, target), one thread per output element.
        let grid_size = (output_size + 255) / 256;
        unsafe {
            compute_output_error_kernel::launch(
                &self.device,
                (grid_size, 1, 1),
                (256, 1, 1),
                &[
                    &self.activations,
                    &target_device,
                    &output_errors,
                    &output_size,
                ],
            )?;
        }

        // Walk layers backwards; start `weight_offset` at the last weight matrix
        // (sum of all matrices except the final one).
        let mut current_errors = output_errors;
        let mut weight_offset = 0;
        for pair in self.layer_sizes.windows(2).rev().skip(1) {
            weight_offset += pair[0] * pair[1];
        }

        for layer_idx in (1..self.layer_sizes.len()).rev() {
            let input_size = self.layer_sizes[layer_idx - 1];
            let output_size = self.layer_sizes[layer_idx];

            if layer_idx > 1 {
                // Propagate errors to the previous layer through this layer's weights.
                let prev_errors = self.device.alloc_zeros::<f32>(input_size)?;
                let grid_size = (input_size + 255) / 256;
                unsafe {
                    backprop_error_kernel::launch(
                        &self.device,
                        (grid_size, 1, 1),
                        (256, 1, 1),
                        &[
                            &self.weights.slice(weight_offset..weight_offset + input_size * output_size),
                            &current_errors,
                            &prev_errors,
                            &input_size,
                            &output_size,
                        ],
                    )?;
                }
                current_errors = prev_errors;
            }

            // SGD weight/bias update for this layer.
            let grid_size = (input_size * output_size + 255) / 256;
            unsafe {
                update_weights_kernel::launch(
                    &self.device,
                    (grid_size, 1, 1),
                    (256, 1, 1),
                    &[
                        &self.weights.slice(weight_offset..weight_offset + input_size * output_size),
                        &self.gradients.slice(weight_offset..weight_offset + input_size * output_size),
                        &current_errors,
                        &self.activations, // previous-layer activations (see NOTE above)
                        &0.01f32,          // learning rate (hard-coded in this example)
                        &input_size,
                        &output_size,
                    ],
                )?;
            }

            // Step back to the previous layer's weight matrix.
            weight_offset -= if layer_idx > 1 {
                self.layer_sizes[layer_idx - 2] * self.layer_sizes[layer_idx - 1]
            } else {
                0
            };
        }

        // FIX: the original launched the loss kernel into a throwaway buffer and
        // returned a host vector that was never written (always 0.0). Keep the
        // device buffer and copy the computed loss back before returning.
        let loss_device = self.device.alloc_zeros::<f32>(1)?;
        unsafe {
            compute_loss_kernel::launch(
                &self.device,
                (1, 1, 1),
                (1, 1, 1),
                &[
                    &self.activations,
                    &target_device,
                    &loss_device,
                    &output_size,
                ],
            )?;
        }
        let mut loss = vec![0.0f32; 1];
        self.device.dtoh_sync_copy(&loss_device, &mut loss)?;
        Ok(loss[0])
    }
}
// CUDA kernels (would be compiled separately)
mod cuda_kernels {
    // NOTE(review): `compile_ptx` is imported but not used in this snippet —
    // presumably the kernel sources below are compiled with it at startup.
    use cudarc::nvrtc::compile_ptx;

    /// y = W·x + b for one layer — one CUDA thread per output element.
    pub const MATRIX_VECTOR_MULTIPLY_KERNEL: &str = r#"
extern "C" __global__ void matrix_vector_multiply_kernel(
const float* weights,
const float* input,
const float* biases,
float* output,
int input_size,
int output_size
) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < output_size) {
float sum = biases[idx];
for (int i = 0; i < input_size; i++) {
sum += weights[idx * input_size + i] * input[i];
}
output[idx] = sum;
}
}
"#;

    /// In-place ReLU over a flat array — one CUDA thread per element.
    pub const RELU_ACTIVATION_KERNEL: &str = r#"
extern "C" __global__ void relu_activation_kernel(
float* values,
int size
) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < size) {
values[idx] = fmaxf(0.0f, values[idx]);
}
}
"#;
}
use ruv_fann::gpu::{MultiGpuTrainer, DataParallelism, ModelParallelism};

/// Replicates or partitions `GpuNeuralNetwork`s across several CUDA devices.
pub struct MultiGpuNeuralNetwork {
    devices: Vec<CudaDevice>,                  // one device handle per GPU
    models: Vec<GpuNeuralNetwork>,             // a replica or partition per GPU
    parallelism_strategy: ParallelismStrategy, // data- vs model-parallel mode
}
impl MultiGpuNeuralNetwork {
    /// Creates per-GPU models: full replicas for data parallelism, or
    /// contiguous layer slices for model parallelism.
    pub fn new(
        layer_sizes: Vec<usize>,
        num_gpus: usize,
        strategy: ParallelismStrategy
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let mut devices = Vec::new();
        let mut models = Vec::new();
        for gpu_id in 0..num_gpus {
            let device = CudaDevice::new(gpu_id)?;
            devices.push(device);
            match strategy {
                ParallelismStrategy::DataParallel => {
                    // Each GPU gets a complete copy of the model
                    models.push(GpuNeuralNetwork::new(layer_sizes.clone())?);
                },
                ParallelismStrategy::ModelParallel => {
                    // Split model across GPUs; the last GPU absorbs the remainder
                    // when the layer count doesn't divide evenly.
                    let layers_per_gpu = layer_sizes.len() / num_gpus;
                    let start_layer = gpu_id * layers_per_gpu;
                    let end_layer = if gpu_id == num_gpus - 1 {
                        layer_sizes.len()
                    } else {
                        (gpu_id + 1) * layers_per_gpu
                    };
                    let gpu_layers = layer_sizes[start_layer..end_layer].to_vec();
                    models.push(GpuNeuralNetwork::new(gpu_layers)?);
                }
            }
        }
        Ok(Self {
            devices,
            models,
            parallelism_strategy: strategy,
        })
    }

    /// Dispatches one training batch to the strategy-specific implementation.
    /// NOTE(review): `model_parallel_training` is referenced here but not
    /// defined anywhere in this example.
    pub async fn train_batch_parallel(
        &mut self,
        inputs: Vec<Vec<f32>>,
        targets: Vec<Vec<f32>>
    ) -> Result<f32, Box<dyn std::error::Error>> {
        match self.parallelism_strategy {
            ParallelismStrategy::DataParallel => {
                self.data_parallel_training(inputs, targets).await
            },
            ParallelismStrategy::ModelParallel => {
                self.model_parallel_training(inputs, targets).await
            }
        }
    }

    /// Splits the batch across GPUs, trains each shard, and averages the losses.
    ///
    /// NOTE(review): as written this does not compile — each `tokio::spawn`
    /// moves `model` (a `&mut` borrow produced by `iter_mut()`) into a
    /// `'static` task, and the task's `Box<dyn std::error::Error>` is not
    /// `Send`. A working version would hold each model in an
    /// `Arc<Mutex<GpuNeuralNetwork>>` (or avoid spawning). Preserved verbatim
    /// to document the intended structure.
    async fn data_parallel_training(
        &mut self,
        inputs: Vec<Vec<f32>>,
        targets: Vec<Vec<f32>>
    ) -> Result<f32, Box<dyn std::error::Error>> {
        let batch_size = inputs.len();
        let samples_per_gpu = batch_size / self.models.len();
        // Split batch across GPUs; the last GPU absorbs any remainder.
        let mut tasks = Vec::new();
        for (gpu_id, model) in self.models.iter_mut().enumerate() {
            let start_idx = gpu_id * samples_per_gpu;
            let end_idx = if gpu_id == self.models.len() - 1 {
                batch_size
            } else {
                (gpu_id + 1) * samples_per_gpu
            };
            let gpu_inputs = inputs[start_idx..end_idx].to_vec();
            let gpu_targets = targets[start_idx..end_idx].to_vec();
            // Train on this GPU's portion of the batch
            tasks.push(tokio::spawn(async move {
                let mut total_loss = 0.0;
                for (input, target) in gpu_inputs.iter().zip(gpu_targets.iter()) {
                    model.forward_pass(input)?;
                    let loss = model.backward_pass(target)?;
                    total_loss += loss;
                }
                Ok::<f32, Box<dyn std::error::Error>>(total_loss / gpu_inputs.len() as f32)
            }));
        }
        // Wait for all GPUs to complete and sum their mean losses.
        let mut total_loss = 0.0;
        for task in tasks {
            let gpu_loss = task.await??;
            total_loss += gpu_loss;
        }
        // Average gradients across GPUs using NCCL
        self.all_reduce_gradients().await?;
        Ok(total_loss / self.models.len() as f32)
    }

    /// Averages the gradient buffers of all replicas and broadcasts the result.
    ///
    /// NOTE(review): `get_gradients`/`set_gradients` are not defined on
    /// `GpuNeuralNetwork` in this example; a production version would use NCCL
    /// AllReduce rather than staging through the host.
    async fn all_reduce_gradients(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // Implement NCCL AllReduce for gradient synchronization
        // This is a simplified version - real implementation would use NCCL
        let num_gpus = self.models.len();
        // Collect gradients from all GPUs
        let mut all_gradients = Vec::new();
        for model in &self.models {
            let gradients = model.get_gradients()?;
            all_gradients.push(gradients);
        }
        // Element-wise mean across replicas.
        let gradient_size = all_gradients[0].len();
        let mut averaged_gradients = vec![0.0f32; gradient_size];
        for i in 0..gradient_size {
            let mut sum = 0.0;
            for gpu_gradients in &all_gradients {
                sum += gpu_gradients[i];
            }
            averaged_gradients[i] = sum / num_gpus as f32;
        }
        // Broadcast averaged gradients back to all GPUs
        for model in &mut self.models {
            model.set_gradients(&averaged_gradients)?;
        }
        Ok(())
    }
}
/// How a model is distributed across multiple GPUs.
///
/// Fieldless config enum, so it derives `Copy` (the by-value `match`es in
/// `MultiGpuNeuralNetwork` then copy instead of relying on unit patterns)
/// plus `Debug`/`PartialEq`/`Eq` for logging and tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ParallelismStrategy {
    /// Every GPU holds a full model replica; batches are sharded across GPUs.
    DataParallel,
    /// The model's layers are partitioned across GPUs.
    ModelParallel,
}
use ruv_fann::distributed::{ParameterServer, Worker, GradientUpdate};
use tokio::net::{TcpListener, TcpStream};
use serde::{Serialize, Deserialize};

/// One gradient push from a worker to the parameter server.
#[derive(Serialize, Deserialize)]
pub struct ParameterUpdate {
    pub layer_id: usize,     // which layer these gradients belong to
    pub gradients: Vec<f32>, // flattened gradient values for that layer
    pub worker_id: String,   // sending worker, for server-side bookkeeping
    pub version: u64,        // parameter version the gradients were computed against
}
/// Central parameter store applying momentum SGD to pushed gradients.
/// NOTE(review): `HashMap`, `SystemTime`, `Arc`, and `Mutex` are used by this
/// snippet but not imported here — confirm surrounding context.
pub struct DistributedParameterServer {
    parameters: HashMap<usize, Vec<f32>>,       // per-layer parameter vectors
    parameter_versions: HashMap<usize, u64>,    // per-layer version counters
    workers: HashMap<String, WorkerInfo>,       // registered workers, by id
    learning_rate: f32,                         // SGD step size
    momentum: f32,                              // momentum coefficient
    momentum_buffers: HashMap<usize, Vec<f32>>, // per-layer velocity buffers
}
impl DistributedParameterServer {
    /// Creates an empty server with the given momentum-SGD hyperparameters.
    pub fn new(learning_rate: f32, momentum: f32) -> Self {
        Self {
            parameters: HashMap::new(),
            parameter_versions: HashMap::new(),
            workers: HashMap::new(),
            learning_rate,
            momentum,
            momentum_buffers: HashMap::new(),
        }
    }

    /// Accepts worker connections forever and dispatches each to a handler task.
    ///
    /// NOTE(review): `Arc::new(Mutex::new(self))` wraps the `&mut self` borrow
    /// and moves it into a spawned (`'static`) task inside the loop — this does
    /// not compile as written (the borrow is moved on the first iteration and
    /// `&mut Self` is not `'static`). A working version would have the method
    /// take `self: Arc<Mutex<Self>>` and clone the `Arc` per connection.
    pub async fn start_server(&mut self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
        let listener = TcpListener::bind(addr).await?;
        println!("Parameter server listening on {}", addr);
        loop {
            let (stream, addr) = listener.accept().await?;
            println!("New connection from {}", addr);
            let server_handle = Arc::new(Mutex::new(self));
            tokio::spawn(async move {
                if let Err(e) = handle_worker_connection(stream, server_handle).await {
                    eprintln!("Error handling worker connection: {}", e);
                }
            });
        }
    }

    /// Applies one worker's gradients to a layer via momentum SGD, rejecting
    /// updates computed against a version older than the server's.
    pub fn push_gradients(&mut self, update: ParameterUpdate) -> Result<(), Box<dyn std::error::Error>> {
        let layer_id = update.layer_id;
        let gradients = update.gradients;
        // Lazily initialize parameters/version on first contact with a layer.
        let current_params = self.parameters.entry(layer_id).or_insert_with(|| {
            vec![0.0; gradients.len()]
        });
        let current_version = self.parameter_versions.entry(layer_id).or_insert(0);
        // Reject gradients computed against out-of-date parameters.
        if update.version < *current_version {
            return Err("Stale gradient update".into());
        }
        // Lazily initialize the velocity buffer alongside the parameters.
        let momentum_buffer = self.momentum_buffers.entry(layer_id).or_insert_with(|| {
            vec![0.0; gradients.len()]
        });
        // Classic momentum: v = m*v + g;  w -= lr * v.
        for i in 0..current_params.len() {
            momentum_buffer[i] = self.momentum * momentum_buffer[i] + gradients[i];
            current_params[i] -= self.learning_rate * momentum_buffer[i];
        }
        // Advance the layer's version so stale pushes can be detected.
        *current_version += 1;
        // Track per-worker statistics for liveness/monitoring.
        if let Some(worker_info) = self.workers.get_mut(&update.worker_id) {
            worker_info.last_update = SystemTime::now();
            worker_info.updates_sent += 1;
        }
        Ok(())
    }

    /// Returns a copy of a layer's parameters plus their version, if known.
    pub fn pull_parameters(&self, layer_id: usize) -> Option<(Vec<f32>, u64)> {
        if let (Some(params), Some(version)) = (
            self.parameters.get(&layer_id),
            self.parameter_versions.get(&layer_id)
        ) {
            Some((params.clone(), *version))
        } else {
            None
        }
    }

    /// Registers (or replaces) a worker record with fresh statistics.
    pub fn register_worker(&mut self, worker_id: String, capabilities: WorkerCapabilities) {
        let worker_info = WorkerInfo {
            id: worker_id.clone(),
            capabilities,
            last_update: SystemTime::now(),
            updates_sent: 0,
            is_active: true,
        };
        self.workers.insert(worker_id, worker_info);
    }
}
/// Server-side bookkeeping for one registered worker.
#[derive(Clone)]
struct WorkerInfo {
    /// Worker identifier (duplicated from the registry key for convenience).
    id: String,
    /// Hardware capabilities reported at registration time.
    capabilities: WorkerCapabilities,
    /// Time of the most recent gradient update from this worker.
    last_update: SystemTime,
    /// Count of gradient updates received from this worker.
    updates_sent: u64,
    /// Whether the worker is currently considered live.
    is_active: bool,
}
/// Hardware description a worker reports when registering.
#[derive(Clone, Serialize, Deserialize)]
struct WorkerCapabilities {
    /// Number of GPUs available to the worker.
    gpu_count: usize,
    /// Total memory, in gigabytes.
    memory_gb: f32,
    /// Compute capability identifier — presumably a CUDA compute-capability
    /// string; confirm the expected format with the registration protocol.
    compute_capability: String,
}
/// A training worker that periodically synchronizes with a remote
/// parameter server.
pub struct DistributedWorker {
    /// Unique id sent to the server with every gradient push.
    worker_id: String,
    /// Address of the parameter server (host:port).
    server_addr: String,
    /// GPU-backed model trained locally between syncs.
    local_model: GpuNeuralNetwork,
    /// Last (parameters, version) pulled per layer; the cached version
    /// accompanies gradient pushes so the server can detect staleness.
    parameter_cache: HashMap<usize, (Vec<f32>, u64)>,
    /// Number of training iterations between server syncs.
    sync_frequency: usize,
    /// Iterations completed so far in the current training run.
    current_iteration: usize,
}
impl DistributedWorker {
    /// Creates a worker with a freshly initialized GPU model.
    ///
    /// `model_config` lists layer sizes, as expected by `GpuNeuralNetwork::new`.
    pub fn new(
        worker_id: String,
        server_addr: String,
        model_config: Vec<usize>
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let local_model = GpuNeuralNetwork::new(model_config)?;
        Ok(Self {
            worker_id,
            server_addr,
            local_model,
            parameter_cache: HashMap::new(),
            sync_frequency: 10, // Sync every 10 iterations
            current_iteration: 0,
        })
    }

    /// Trains on `training_data`, periodically exchanging gradients and
    /// parameters with the parameter server over one persistent connection.
    pub async fn train_distributed(
        &mut self,
        training_data: Vec<(Vec<f32>, Vec<f32>)>
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut stream = TcpStream::connect(&self.server_addr).await?;
        // Announce this worker so the server can track its updates.
        self.register_with_server(&mut stream).await?;
        // Start from the server's current global parameters.
        self.pull_all_parameters(&mut stream).await?;
        for (input, target) in training_data {
            let _output = self.local_model.forward_pass(&input)?;
            let _loss = self.local_model.backward_pass(&target)?;
            self.current_iteration += 1;
            // Between syncs the worker trains on (possibly stale) local
            // weights; every `sync_frequency` iterations it reconciles.
            if self.current_iteration % self.sync_frequency == 0 {
                self.sync_with_server(&mut stream).await?;
            }
        }
        Ok(())
    }

    /// Pushes accumulated gradients layer-by-layer, then pulls the server's
    /// updated parameters.
    async fn sync_with_server(&mut self, stream: &mut TcpStream) -> Result<(), Box<dyn std::error::Error>> {
        let gradients = self.local_model.get_gradients()?;
        // Slice the flat gradient vector into per-layer chunks: each window
        // [fan_in, fan_out] of consecutive layer sizes contributes
        // fan_in * fan_out weights.
        // NOTE(review): this accounts only for weight matrices, not bias
        // vectors — confirm get_gradients() uses the same flat layout.
        let mut gradient_offset = 0;
        for (layer_id, pair) in self.local_model.get_layer_sizes().windows(2).enumerate() {
            let layer_gradient_size = pair[0] * pair[1];
            let layer_gradients =
                gradients[gradient_offset..gradient_offset + layer_gradient_size].to_vec();
            let update = ParameterUpdate {
                layer_id,
                gradients: layer_gradients,
                worker_id: self.worker_id.clone(),
                version: self.get_parameter_version(layer_id),
            };
            self.send_gradient_update(stream, update).await?;
            gradient_offset += layer_gradient_size;
        }
        self.pull_all_parameters(stream).await?;
        Ok(())
    }

    /// Pulls every layer's parameters from the server, updating both the
    /// local model and the version cache.
    async fn pull_all_parameters(&mut self, stream: &mut TcpStream) -> Result<(), Box<dyn std::error::Error>> {
        let layer_count = self.local_model.get_layer_count();
        for layer_id in 0..layer_count {
            if let Some((params, version)) = self.pull_layer_parameters(stream, layer_id).await? {
                // Fixed: the original contained the mojibake `¶ms`
                // (an HTML-escaped `&params`), which is not valid Rust.
                self.local_model.set_layer_parameters(layer_id, &params)?;
                self.parameter_cache.insert(layer_id, (params, version));
            }
        }
        Ok(())
    }

    /// Version of the most recently pulled parameters for `layer_id`,
    /// or 0 if the layer has never been pulled.
    fn get_parameter_version(&self, layer_id: usize) -> u64 {
        self.parameter_cache.get(&layer_id)
            .map(|(_, version)| *version)
            .unwrap_or(0)
    }
}
use ruv_fann::federated::{FederatedServer, FederatedClient, AggregationStrategy};
use differential_privacy::{DifferentialPrivacy, NoiseDistribution};
/// Coordinator for federated learning rounds.
///
/// Holds the global model, selects a client subset each round, aggregates
/// their updates, and adds differential-privacy noise before committing
/// the new global weights.
pub struct FederatedLearningServer {
    /// Current global model distributed to clients each round.
    global_model: Network,
    /// Known clients, keyed by client id.
    client_models: HashMap<String, ClientModel>,
    /// How client updates are combined into new global weights.
    aggregation_strategy: AggregationStrategy,
    /// Total privacy budget (epsilon). NOTE(review): never decremented in
    /// the visible code — confirm budget accounting happens elsewhere.
    privacy_budget: f64,
    /// Noise mechanism applied to aggregated weights (central DP).
    noise_mechanism: NoiseDistribution,
    /// Zero-based index of the next round to run.
    round_number: u32,
    /// Minimum number of clients required for a round to proceed.
    min_clients_per_round: usize,
}
impl FederatedLearningServer {
pub fn new(
model_config: Vec<usize>,
aggregation_strategy: AggregationStrategy,
privacy_budget: f64
) -> Result<Self, Box<dyn std::error::Error>> {
let global_model = Network::new(&model_config)?;
Ok(Self {
global_model,
client_models: HashMap::new(),
aggregation_strategy,
privacy_budget,
noise_mechanism: NoiseDistribution::Gaussian { sigma: 0.1 },
round_number: 0,
min_clients_per_round: 5,
})
}
pub async fn run_federated_round(&mut self) -> Result<(), Box<dyn std::error::Error>> {
println!("Starting federated round {}", self.round_number);
// Select clients for this round
let selected_clients = self.select_clients_for_round().await?;
println!("Selected {} clients for training", selected_clients.len());
if selected_clients.len() < self.min_clients_per_round {
return Err("Not enough clients available for training".into());
}
// Send global model to selected clients
let global_weights = self.global_model.get_weights();
for client_id in &selected_clients {
self.send_model_to_client(client_id, &global_weights).await?;
}
// Wait for client updates
let client_updates = self.collect_client_updates(selected_clients).await?;
// Aggregate client models
let aggregated_weights = self.aggregate_client_models(client_updates)?;
// Apply differential privacy
let private_weights = self.apply_differential_privacy(aggregated_weights)?;
// Update global model
self.global_model.set_weights(&private_weights)?;
self.round_number += 1;
println!("Completed federated round {}", self.round_number - 1);
Ok(())
}
async fn select_clients_for_round(&self) -> Result<Vec<String>, Box<dyn std::error::Error>> {
// Select a random subset of available clients
let available_clients: Vec<String> = self.client_models.keys()
.filter(|client_id| self.is_client_available(client_id))
.cloned()
.collect();
let num_clients_to_select = std::cmp::min(
available_clients.len(),
(available_clients.len() as f32 * 0.3).ceil() as usize // Select 30% of clients
);
let mut rng = rand::thread_rng();
let selected_clients = available_clients
.choose_multiple(&mut rng, num_clients_to_select)
.cloned()
.collect();
Ok(selected_clients)
}
async fn collect_client_updates(
&mut self,
selected_clients: Vec<String>
) -> Result<Vec<ClientUpdate>, Box<dyn std::error::Error>> {
let mut client_updates = Vec::new();
let timeout_duration = Duration::from_secs(300); // 5 minute timeout
for client_id in selected_clients {
// Wait for client to complete training and send update
let update = tokio::time::timeout(
timeout_duration,
self.receive_client_update(&client_id)
).await;
match update {
Ok(Ok(client_update)) => {
client_updates.push(client_update);
},
Ok(Err(e)) => {
eprintln!("Error receiving update from client {}: {}", client_id, e);
},
Err(_) => {
eprintln!("Timeout waiting for update from client {}", client_id);
// Mark client as potentially slow or unavailable
self.mark_client_slow(&client_id);
}
}
}
Ok(client_updates)
}
fn aggregate_client_models(&self, updates: Vec<ClientUpdate>) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
match self.aggregation_strategy {
AggregationStrategy::FederatedAveraging => {
self.federated_averaging(updates)
},
AggregationStrategy::WeightedAveraging => {
self.weighted_averaging(updates)
},
AggregationStrategy::MedianAggregation => {
self.median_aggregation(updates)
}
}
}
fn federated_averaging(&self, updates: Vec<ClientUpdate>) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
if updates.is_empty() {
return Err("No client updates to aggregate".into());
}
let weight_size = updates[0].model_weights.len();
let mut aggregated_weights = vec![0.0; weight_size];
// Calculate total samples across all clients
let total_samples: usize = updates.iter().map(|u| u.num_samples).sum();
// Weighted average based on number of samples
for update in updates {
let weight = update.num_samples as f32 / total_samples as f32;
for i in 0..weight_size {
aggregated_weights[i] += weight * update.model_weights[i];
}
}
Ok(aggregated_weights)
}
fn weighted_averaging(&self, updates: Vec<ClientUpdate>) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
// Similar to federated averaging but uses client-reported performance metrics
let weight_size = updates[0].model_weights.len();
let mut aggregated_weights = vec![0.0; weight_size];
// Calculate weights based on inverse of client loss (better performance = higher weight)
let total_weight: f32 = updates.iter()
.map(|u| 1.0 / (u.training_loss + 1e-8))
.sum();
for update in updates {
let weight = (1.0 / (update.training_loss + 1e-8)) / total_weight;
for i in 0..weight_size {
aggregated_weights[i] += weight * update.model_weights[i];
}
}
Ok(aggregated_weights)
}
fn apply_differential_privacy(&self, weights: Vec<f32>) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
let mut private_weights = weights;
match self.noise_mechanism {
NoiseDistribution::Gaussian { sigma } => {
let mut rng = rand::thread_rng();
let normal = Normal::new(0.0, sigma).unwrap();
for weight in &mut private_weights {
let noise: f32 = normal.sample(&mut rng) as f32;
*weight += noise;
}
},
NoiseDistribution::Laplace { scale } => {
let mut rng = rand::thread_rng();
for weight in &mut private_weights {
let u1: f32 = rng.gen_range(0.0..1.0);
let u2: f32 = rng.gen_range(0.0..1.0);
let laplace_noise = if u1 < 0.5 {
scale * (2.0 * u1).ln()
} else {
-scale * (2.0 * (1.0 - u1)).ln()
};
*weight += laplace_noise;
}
}
}
Ok(private_weights)
}
}
/// A trained-model update a client sends back to the server after one round.
#[derive(Clone)]
struct ClientUpdate {
    /// Id of the client that produced this update.
    client_id: String,
    /// Flattened model weights after local training.
    model_weights: Vec<f32>,
    /// Number of local training samples (used to weight FedAvg).
    num_samples: usize,
    /// Final training loss on the client's local data.
    training_loss: f32,
    /// Final training accuracy on the client's local data.
    training_accuracy: f32,
    /// Wall-clock time the client spent training.
    computation_time: Duration,
}
/// How client model updates are combined into the new global model.
///
/// NOTE(review): this name collides with the `AggregationStrategy` imported
/// from `ruv_fann::federated` earlier on this page — if these snippets were
/// compiled as one module that would be a duplicate-definition error (E0255);
/// keep one or the other.
#[derive(Clone)]
enum AggregationStrategy {
    /// FedAvg: average weighted by each client's sample count.
    FederatedAveraging,
    /// Average weighted by client performance (inverse training loss).
    WeightedAveraging,
    /// Coordinate-wise median, robust to outlier clients.
    MedianAggregation,
}
/// A participant in federated training that trains on private local data.
///
/// NOTE(review): this name collides with the `FederatedClient` imported from
/// `ruv_fann::federated` earlier on this page — a duplicate definition if
/// the snippets shared one module; keep one or the other.
pub struct FederatedClient {
    /// Unique id reported with every update.
    client_id: String,
    /// Local model, re-seeded from the global weights each round.
    local_model: Network,
    /// Private (input, target) training pairs; never leave the client.
    local_data: Vec<(Vec<f32>, Vec<f32>)>,
    /// Address of the federated server.
    server_addr: String,
    /// Whether/how local differential privacy is applied before upload.
    privacy_level: PrivacyLevel,
}
impl FederatedClient {
    /// Runs one full round of federated training on this client: fetch the
    /// global model, fine-tune it on local data, then ship the (optionally
    /// privatized) weights back to the server.
    pub async fn participate_in_round(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // Sync the local model with the server's current global weights.
        let global_weights = self.receive_global_model().await?;
        self.local_model.set_weights(&global_weights)?;

        // Local training, timed so the server can track stragglers.
        let started = Instant::now();
        let (loss, accuracy) = self.train_local_model().await?;
        let elapsed = started.elapsed();

        let mut weights = self.local_model.get_weights();
        // Local DP: add noise on-device before anything leaves the client.
        if let PrivacyLevel::Local { epsilon } = self.privacy_level {
            weights = self.apply_local_privacy(weights, epsilon)?;
        }

        self.send_update_to_server(ClientUpdate {
            client_id: self.client_id.clone(),
            model_weights: weights,
            num_samples: self.local_data.len(),
            training_loss: loss,
            training_accuracy: accuracy,
            computation_time: elapsed,
        })
        .await?;
        Ok(())
    }
}
This Code Examples page provides practical examples covering the major aspects of the ruv-FANN framework, from basic neural networks to advanced distributed training scenarios. Each example illustrates the relevant APIs with error handling and modern Rust patterns; adapt and test them before production use.