Distributed Computing - ruvnet/ruv-FANN GitHub Wiki
A comprehensive guide to distributed systems architecture, protocols, and implementation strategies for building scalable, fault-tolerant distributed applications.
- Overview
- Distributed Architecture Patterns
- Consensus Protocols
- Data Synchronization
- Fault Tolerance
- Scalability Strategies
- Network Communication
- Implementation Considerations
- Best Practices
Distributed computing involves multiple computers working together to solve computational problems or provide services. These systems are characterized by:
- Geographic Distribution: Components are spread across multiple locations
- Concurrent Execution: Multiple processes run simultaneously
- Independent Failures: Components can fail independently
- Lack of Global Clock: No shared time reference
- Asynchronous Communication: Message passing with varying delays
- Network Partitions: Connectivity issues between nodes
- Consistency: Maintaining data coherence across nodes
- Availability: Ensuring system remains operational
- Partition Tolerance: Continuing operation despite network failures
- Latency: Managing communication delays
- Load Distribution: Balancing work across nodes
Pattern: Centralized server handles requests from multiple clients.
Client --\
Client ---+--> Server <---> Database
Client --/
Characteristics:
- Simple to implement and reason about
- Central point of control and failure
- Limited scalability due to server bottleneck
Use Cases: Web applications, database systems, file servers
Pattern: All nodes are equal and can act as both clients and servers.
Node A <---> Node B
  |            |
  v            v
Node D <---> Node C
Characteristics:
- No single point of failure
- Self-organizing and self-healing
- Complex coordination and consistency
Use Cases: File sharing (BitTorrent), blockchain networks, distributed hash tables
Pattern: Master node coordinates work, slave nodes execute tasks.
Master Node
|
|-------- Slave 1
|-------- Slave 2
|-------- Slave 3
Characteristics:
- Clear hierarchy and control flow
- Master becomes bottleneck and single point of failure
- Easy to implement load balancing
Use Cases: Database replication, MapReduce, distributed computing frameworks
Pattern: Application split into small, independent services.
API Gateway
|
|-------- User Service
|-------- Order Service
|-------- Payment Service
|-------- Inventory Service
Characteristics:
- High modularity and independence
- Complex inter-service communication
- Technology diversity support
Use Cases: Large-scale web applications, cloud-native systems
Pattern: Components communicate through events and message queues.
Producer --> Message Queue --> Consumer
Producer --> Event Bus -----> Multiple Consumers
Characteristics:
- Loose coupling between components
- Asynchronous communication
- Complex event ordering and delivery guarantees
Use Cases: Real-time systems, IoT applications, financial trading systems
Consensus protocols ensure all nodes in a distributed system agree on a common value or state, even in the presence of failures.
Purpose: Leader-based consensus for replicated state machines.
Leader Election:
1. Nodes start as followers
2. Election timeout triggers candidate state
3. Candidate requests votes from other nodes
4. Node with majority votes becomes leader
5. Leader sends heartbeats to maintain leadership
Log Replication:
1. Client sends command to leader
2. Leader appends entry to its log
3. Leader replicates entry to followers
4. Once majority confirms, leader commits entry
5. Leader notifies followers to commit
Safety Properties:
- Election Safety: At most one leader per term
- Leader Append-Only: Leader never overwrites log entries
- Log Matching: Identical entries at same index across logs
- Leader Completeness: Committed entries appear in future leaders
- State Machine Safety: Servers apply same commands in same order
struct RaftNode {
    id: NodeId,
    current_term: u64,
    voted_for: Option<NodeId>,
    log: Vec<LogEntry>,
    commit_index: usize,
    last_applied: usize,
    // Leader state (reinitialized after each election)
    next_index: HashMap<NodeId, usize>,
    match_index: HashMap<NodeId, usize>,
}
impl RaftNode {
fn start_election(&mut self) {
self.current_term += 1;
self.voted_for = Some(self.id);
// Send RequestVote RPCs to all nodes
}
fn append_entries(&mut self, entries: Vec<LogEntry>) {
// Replicate entries to followers
// Wait for majority confirmation
// Commit entries and apply to state machine
}
}

Purpose: Consensus in presence of arbitrary (malicious) failures.
Three-Phase Protocol:
- Pre-prepare: Primary broadcasts request sequence
- Prepare: Replicas agree on request ordering
- Commit: Replicas commit to executing request
Safety Requirements:
- Requires 3f + 1 replicas to tolerate f Byzantine failures
- All non-faulty replicas agree on same sequence of requests
- Clients eventually receive replies to their requests
Client --> Primary --> All Replicas (Pre-prepare)
All Replicas <--> All Replicas (Prepare)
All Replicas <--> All Replicas (Commit)
All Replicas --> Client (Reply)
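The replica and quorum sizes implied by the 3f + 1 bound are easy to compute; a minimal sketch (the function names are illustrative, not from any PBFT implementation):

```python
def pbft_parameters(f):
    """Replica counts for tolerating f Byzantine failures."""
    n = 3 * f + 1        # minimum total replicas
    quorum = 2 * f + 1   # matching PREPARE/COMMIT messages required
    return n, quorum

def is_committed(f, matching_commits):
    # A request commits once a 2f+1 quorum agrees; any two such quorums
    # overlap in at least f+1 replicas, so at least one honest replica
    # is common to both, which prevents conflicting decisions.
    return matching_commits >= 2 * f + 1
```

For example, tolerating a single Byzantine replica (f = 1) requires 4 replicas and quorums of 3.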
HotStuff:
- Linear communication complexity
- Pipelined consensus phases
- View synchronization for liveness
Tendermint:
- Immediate finality
- Fork accountability
- Application-agnostic design
Purpose: Epidemic-style information dissemination.
def gossip_round():
# Select random subset of peers
peers = random.sample(known_peers, fanout)
# Send updates to selected peers
for peer in peers:
send_updates(peer, local_state)
# Receive updates from peers
for update in received_updates:
        merge_update(update)

Properties:
- Probabilistic delivery guarantees
- Scalable to large networks
- Resilient to node failures
- Eventually consistent
Variants:
- Push Gossip: Nodes push updates to others
- Pull Gossip: Nodes request updates from others
- Push-Pull Gossip: Combination of both approaches
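The push-pull variant can be sketched in a few lines. Here peer state is modeled as in-process dicts of `key -> (version, value)` pairs standing in for real RPCs; the function name and version-number scheme are illustrative assumptions:

```python
import random

def push_pull_round(local_state, peers, fanout=3):
    """One push-pull gossip round: exchange state with random peers.

    `peers` maps peer id -> that peer's state dict (a stand-in for RPC).
    Each entry is (version, value); higher versions win on conflict.
    """
    sample = random.sample(list(peers.values()), min(fanout, len(peers)))
    for peer_state in sample:
        # Push our entries and pull the peer's in one exchange
        for key in set(local_state) | set(peer_state):
            best = max(local_state.get(key, (0, None)),
                       peer_state.get(key, (0, None)))
            local_state[key] = best
            peer_state[key] = best
    return local_state
```

After enough rounds every reachable node converges on the highest-versioned entry for each key, which is exactly the eventual-consistency guarantee gossip provides.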
- Operations appear to execute atomically at some point between start and completion
- Real-time ordering is preserved
- Strongest consistency model
- Operations appear to execute in some sequential order
- All processes see same ordering
- Per-process program order is preserved
- System will become consistent over time
- No guarantees on when consistency will be achieved
- Allows for temporary inconsistencies
- Causally related operations are seen in same order by all processes
- Concurrent operations may be seen in different orders
- Weaker than sequential, stronger than eventual
# Last-write-wins (LWW): keep the value with the newer timestamp
def resolve_conflict(local_value, remote_value):
    if remote_value.timestamp > local_value.timestamp:
        return remote_value
    return local_value

class VectorClock:
    def __init__(self, node_id, num_nodes):
        self.clock = [0] * num_nodes
        self.node_id = node_id
    def tick(self):
        self.clock[self.node_id] += 1
    def update(self, other_clock):
        # Merge an incoming clock, then count the receive as a local event
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.tick()
    def happened_before(self, other_clock):
        # True if every component is <= and at least one is strictly <;
        # otherwise the two events are concurrent
        return (all(a <= b for a, b in zip(self.clock, other_clock))
                and list(self.clock) != list(other_clock))

G-Counter (Grow-only Counter):
class GCounter:
def __init__(self, node_id, num_nodes):
self.counts = [0] * num_nodes
self.node_id = node_id
def increment(self):
self.counts[self.node_id] += 1
def merge(self, other):
for i in range(len(self.counts)):
self.counts[i] = max(self.counts[i], other.counts[i])
def value(self):
        return sum(self.counts)

OR-Set (Observed-Remove Set):
class ORSet:
def __init__(self):
self.added = {} # element -> set of unique tags
self.removed = set() # set of removed tags
def add(self, element):
tag = generate_unique_tag()
if element not in self.added:
self.added[element] = set()
self.added[element].add(tag)
def remove(self, element):
if element in self.added:
self.removed.update(self.added[element])
def contains(self, element):
if element not in self.added:
return False
        return bool(self.added[element] - self.removed)

- Process stops executing and doesn't resume
- Fail-stop: failure is detectable
- Fail-silent: failure may not be immediately detectable
- Process fails to send or receive messages
- Send omission: fails to send messages
- Receive omission: fails to receive messages
- Process responds too late or too early
- Common in real-time systems
- Can lead to system degradation
- Process exhibits arbitrary behavior
- May send conflicting information
- Most general and difficult to handle
class HeartbeatMonitor:
def __init__(self, timeout=5.0):
self.timeout = timeout
self.last_heartbeat = {}
def record_heartbeat(self, node_id):
self.last_heartbeat[node_id] = time.time()
def is_alive(self, node_id):
if node_id not in self.last_heartbeat:
return False
        return time.time() - self.last_heartbeat[node_id] < self.timeout

- Perfect Failure Detector: Never makes mistakes
- Eventually Perfect: Eventually stops making mistakes
- Strong: Detects all crashes, may have false positives
- Weak: Detects some crashes eventually
class CheckpointManager:
def __init__(self, checkpoint_interval=1000):
self.interval = checkpoint_interval
self.operation_count = 0
self.checkpoints = []
def execute_operation(self, operation):
result = apply_operation(operation)
self.operation_count += 1
if self.operation_count % self.interval == 0:
self.create_checkpoint()
return result
def create_checkpoint(self):
state = serialize_current_state()
self.checkpoints.append({
'timestamp': time.time(),
'operation_count': self.operation_count,
'state': state
        })

- Pessimistic Logging: Log messages before processing
- Optimistic Logging: Log messages asynchronously
- Causal Logging: Log only causally related messages
- Active Replication: All replicas process all requests
- Passive Replication: Primary processes, backups receive state updates
- Semi-Active Replication: Hybrid approach with partial processing
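The passive (primary-backup) strategy can be sketched as follows; backups here are in-process objects rather than remote nodes, and the update push is synchronous — both simplifying assumptions:

```python
class BackupReplica:
    """Passive replica: never serves client requests, only applies
    state updates shipped by the primary."""
    def __init__(self):
        self.state = {}

    def apply_update(self, key, value):
        self.state[key] = value

class PrimaryReplica:
    def __init__(self, backups):
        self.state = {}
        self.backups = backups

    def write(self, key, value):
        # Execute the request, then ship the resulting state change
        # (not the request itself) to every backup
        self.state[key] = value
        for backup in self.backups:
            backup.apply_update(key, value)

    def read(self, key):
        return self.state.get(key)
```

Shipping state instead of requests means backups never re-execute nondeterministic operations, at the cost of larger update messages.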
class LoadBalancer:
def __init__(self, servers):
self.servers = servers
self.current = 0
def round_robin(self, request):
server = self.servers[self.current]
self.current = (self.current + 1) % len(self.servers)
return server
def weighted_round_robin(self, request):
# Select server based on weights and current load
pass
def least_connections(self, request):
        return min(self.servers, key=lambda s: s.active_connections)

import bisect
import hashlib

class ConsistentHashing:
    def __init__(self, nodes, replicas=3):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            self.add_node(node)
    def hash(self, key):
        # Stable hash so every node computes identical ring positions
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    def add_node(self, node):
        for i in range(self.replicas):
            key = self.hash(f"{node}:{i}")
            self.ring[key] = node
        self.sorted_keys = sorted(self.ring.keys())
    def get_node(self, key):
        if not self.ring:
            return None
        hash_key = self.hash(key)
        # First virtual node clockwise from the key's position
        idx = bisect.bisect_right(self.sorted_keys, hash_key)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

- CPU optimization through parallel processing
- Memory optimization through caching strategies
- I/O optimization through batching and asynchronous operations
class DistributedCache:
def __init__(self, cache_nodes):
self.nodes = cache_nodes
self.hash_ring = ConsistentHashing(cache_nodes)
def get(self, key):
node = self.hash_ring.get_node(key)
return node.get(key)
def set(self, key, value, ttl=None):
node = self.hash_ring.get_node(key)
return node.set(key, value, ttl)
def invalidate(self, key):
node = self.hash_ring.get_node(key)
        return node.delete(key)

class AutoScaler:
def __init__(self, min_nodes=1, max_nodes=10):
self.min_nodes = min_nodes
self.max_nodes = max_nodes
self.current_nodes = min_nodes
self.metrics_history = []
def scale_decision(self, current_metrics):
self.metrics_history.append(current_metrics)
if len(self.metrics_history) < 5:
return 0 # Need more data
avg_cpu = sum(m['cpu'] for m in self.metrics_history[-5:]) / 5
avg_memory = sum(m['memory'] for m in self.metrics_history[-5:]) / 5
if avg_cpu > 80 or avg_memory > 85:
return min(2, self.max_nodes - self.current_nodes)
elif avg_cpu < 30 and avg_memory < 40:
return -min(1, self.current_nodes - self.min_nodes)
        return 0

- Machine learning models to predict load
- Time-based scaling for known patterns
- Event-driven scaling for anticipated spikes
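Time-based scaling for a known daily pattern can be as simple as a schedule lookup; a minimal sketch (the peak window and node counts are invented for illustration):

```python
from datetime import datetime

def time_based_target(now, base_nodes=2, peak_nodes=8):
    """Pre-provision capacity for a known weekday business-hours peak."""
    # Assumed peak window: Monday-Friday, 09:00-18:00 UTC
    if now.weekday() < 5 and 9 <= now.hour < 18:
        return peak_nodes
    return base_nodes
```

Because the scale-up happens before the load arrives, this avoids the reactive lag of threshold-based autoscaling during predictable spikes.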
class SynchronousClient:
def __init__(self, timeout=30):
self.timeout = timeout
def request(self, server, message):
try:
response = server.send_request(message, timeout=self.timeout)
return response
except TimeoutError:
            raise CommunicationError("Request timed out")

import asyncio
class AsynchronousClient:
async def request(self, server, message):
try:
response = await server.send_request(message)
return response
except asyncio.TimeoutError:
raise CommunicationError("Request timed out")
async def batch_requests(self, requests):
tasks = [self.request(server, msg) for server, msg in requests]
        return await asyncio.gather(*tasks, return_exceptions=True)

class MessageQueue:
def __init__(self, max_size=1000):
self.queue = asyncio.Queue(maxsize=max_size)
async def publish(self, message):
await self.queue.put(message)
async def consume(self):
return await self.queue.get()
async def batch_consume(self, batch_size=10):
batch = []
for _ in range(batch_size):
try:
message = self.queue.get_nowait()
batch.append(message)
except asyncio.QueueEmpty:
break
        return batch

- Reliable, ordered delivery
- Connection-oriented
- Flow control and congestion control
- Higher latency due to connection setup
- Unreliable, unordered delivery
- Connectionless
- Lower latency
- Suitable for real-time applications
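The trade-off is visible directly in Python's socket API: a UDP exchange needs no connection setup at all. This sketch runs both ends in one process over loopback, where datagrams are effectively never lost (on a real network the `recvfrom` could block forever):

```python
import socket

def udp_roundtrip(message):
    # No connect/accept handshake: bind a receiver and fire a datagram
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    receiver.bind(("127.0.0.1", 0))        # OS picks a free port
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sender.sendto(message.encode(), receiver.getsockname())
    data, _addr = receiver.recvfrom(1024)  # no ordering/delivery guarantee
    sender.close()
    receiver.close()
    return data.decode()
```

A TCP equivalent would additionally need `listen`, `accept`, and `connect` calls before any data moves, which is the latency cost the bullets above refer to.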
class RESTClient:
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
def get(self, endpoint, params=None):
url = f"{self.base_url}/{endpoint}"
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()
def post(self, endpoint, data=None):
url = f"{self.base_url}/{endpoint}"
response = self.session.post(url, json=data)
response.raise_for_status()
        return response.json()

service DistributedService {
rpc ProcessRequest(RequestMessage) returns (ResponseMessage);
rpc StreamData(stream DataMessage) returns (stream DataMessage);
}
message RequestMessage {
string id = 1;
bytes payload = 2;
int64 timestamp = 3;
}

class DNSServiceDiscovery:
def __init__(self, domain):
self.domain = domain
def discover_services(self, service_name):
try:
answers = dns.resolver.resolve(f"{service_name}.{self.domain}", 'SRV')
services = []
for answer in answers:
services.append({
'host': str(answer.target),
'port': answer.port,
'priority': answer.priority,
'weight': answer.weight
})
return services
except dns.resolver.NXDOMAIN:
            return []

class ServiceRegistry:
def __init__(self):
self.services = {}
self.health_checks = {}
def register(self, service_name, host, port, health_check_url=None):
if service_name not in self.services:
self.services[service_name] = []
service_info = {
'host': host,
'port': port,
'registered_at': time.time(),
'last_heartbeat': time.time()
}
self.services[service_name].append(service_info)
if health_check_url:
self.health_checks[f"{host}:{port}"] = health_check_url
def discover(self, service_name):
if service_name not in self.services:
return []
# Filter out unhealthy services
healthy_services = []
for service in self.services[service_name]:
if self.is_healthy(service):
healthy_services.append(service)
return healthy_services
def is_healthy(self, service):
key = f"{service['host']}:{service['port']}"
if key not in self.health_checks:
return True # Assume healthy if no health check
try:
response = requests.get(self.health_checks[key], timeout=5)
return response.status_code == 200
        except requests.RequestException:
            return False

The CAP theorem states that in a distributed system, you can only guarantee two of:
- Consistency: All nodes see the same data at the same time
- Availability: System remains operational
- Partition Tolerance: System continues despite network failures
CP Systems (Consistency + Partition Tolerance):
- MongoDB, Redis Cluster, HBase
- Sacrifice availability during partitions
- Strong consistency guarantees
AP Systems (Availability + Partition Tolerance):
- DynamoDB, Cassandra, CouchDB
- Eventually consistent
- Always available for reads/writes
CA Systems (Consistency + Availability):
- Traditional RDBMS in single-node deployments
- Not truly distributed
- Cannot handle network partitions
class BatchProcessor:
def __init__(self, batch_size=100, flush_interval=1.0):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.batch = []
self.last_flush = time.time()
def add_operation(self, operation):
self.batch.append(operation)
if (len(self.batch) >= self.batch_size or
time.time() - self.last_flush >= self.flush_interval):
self.flush()
def flush(self):
if self.batch:
self.process_batch(self.batch)
self.batch = []
            self.last_flush = time.time()

class ConnectionPool:
def __init__(self, host, port, max_connections=10):
self.host = host
self.port = port
self.max_connections = max_connections
self.pool = asyncio.Queue(maxsize=max_connections)
# Initialize pool with connections
for _ in range(max_connections):
conn = self.create_connection()
self.pool.put_nowait(conn)
async def get_connection(self):
return await self.pool.get()
async def return_connection(self, connection):
if connection.is_healthy():
await self.pool.put(connection)
else:
# Create new connection to replace unhealthy one
new_conn = self.create_connection()
            await self.pool.put(new_conn)

class DistributedTracer:
def __init__(self, service_name):
self.service_name = service_name
self.active_spans = {}
    def start_span(self, operation_name, parent_span_id=None, trace_id=None):
        span_id = self.generate_span_id()
        # A child span inherits its parent's trace_id; a span_id is
        # never reused as a trace_id
        trace_id = trace_id or self.generate_trace_id()
span = {
'span_id': span_id,
'trace_id': trace_id,
'operation_name': operation_name,
'service_name': self.service_name,
'start_time': time.time(),
'parent_span_id': parent_span_id,
'tags': {},
'logs': []
}
self.active_spans[span_id] = span
return span
def finish_span(self, span_id):
if span_id in self.active_spans:
span = self.active_spans[span_id]
span['end_time'] = time.time()
span['duration'] = span['end_time'] - span['start_time']
# Send span to tracing backend
self.send_span(span)
            del self.active_spans[span_id]

class MetricsCollector:
def __init__(self):
self.counters = {}
self.gauges = {}
self.histograms = {}
def increment_counter(self, name, value=1, tags=None):
key = self.build_key(name, tags)
self.counters[key] = self.counters.get(key, 0) + value
def set_gauge(self, name, value, tags=None):
key = self.build_key(name, tags)
self.gauges[key] = value
def record_histogram(self, name, value, tags=None):
key = self.build_key(name, tags)
if key not in self.histograms:
self.histograms[key] = []
        self.histograms[key].append(value)

- Each service should have one clear responsibility
- Avoid creating monolithic distributed services
- Clear service boundaries and interfaces
- Minimize dependencies between services
- Use message queues for asynchronous communication
- Implement circuit breakers for external dependencies
- Related functionality should be grouped together
- Minimize data transfer between services
- Design for local processing where possible
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.timeout:
self.state = 'HALF_OPEN'
else:
raise CircuitBreakerOpenError()
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise e
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

class RetryHandler:
def __init__(self, max_retries=3, base_delay=1.0, max_delay=60.0):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
async def retry(self, func, *args, **kwargs):
for attempt in range(self.max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == self.max_retries:
raise e
delay = min(
self.base_delay * (2 ** attempt),
self.max_delay
)
# Add jitter to prevent thundering herd
jitter = random.uniform(0, delay * 0.1)
                await asyncio.sleep(delay + jitter)

class ResourcePool:
def __init__(self, pool_size=10):
self.pool_size = pool_size
self.semaphore = asyncio.Semaphore(pool_size)
self.active_requests = 0
async def execute(self, func, *args, **kwargs):
async with self.semaphore:
self.active_requests += 1
try:
return await func(*args, **kwargs)
finally:
                self.active_requests -= 1

class JWTAuthenticator:
def __init__(self, secret_key, algorithm='HS256'):
self.secret_key = secret_key
self.algorithm = algorithm
def generate_token(self, user_id, expiration_hours=24):
payload = {
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(hours=expiration_hours),
'iat': datetime.utcnow()
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token):
try:
payload = jwt.decode(token, self.secret_key,
algorithms=[self.algorithm])
return payload['user_id']
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token has expired")
except jwt.InvalidTokenError:
            raise AuthenticationError("Invalid token")

- Use TLS/SSL for all network communication
- Implement mutual TLS (mTLS) for service-to-service communication
- Regular certificate rotation and management
- Encrypt sensitive data at rest and in transit
- Implement proper key management
- Regular security audits and penetration testing
class MockNetworkService:
def __init__(self, responses=None, failures=None):
self.responses = responses or {}
self.failures = failures or set()
self.call_count = 0
async def send_request(self, endpoint, data):
self.call_count += 1
if endpoint in self.failures:
raise NetworkError(f"Simulated failure for {endpoint}")
if endpoint in self.responses:
return self.responses[endpoint]
        return {"status": "success", "data": "mock_response"}

class DistributedSystemTestHarness:
def __init__(self):
self.services = {}
self.message_queues = {}
def start_service(self, service_name, config):
service = ServiceFactory.create(service_name, config)
self.services[service_name] = service
service.start()
def stop_service(self, service_name):
if service_name in self.services:
self.services[service_name].stop()
del self.services[service_name]
def simulate_network_partition(self, service_group_a, service_group_b):
# Block communication between two groups of services
pass
def verify_consistency(self):
# Check that all services have consistent state
        pass

class ChaosMonkey:
def __init__(self, services):
self.services = services
self.failures = []
def kill_random_service(self):
service = random.choice(list(self.services.keys()))
self.services[service].stop()
self.failures.append({
'type': 'service_kill',
'service': service,
'timestamp': time.time()
})
def introduce_network_latency(self, min_delay=100, max_delay=1000):
delay = random.randint(min_delay, max_delay)
# Add network delay to random service
pass
def corrupt_data(self, service_name, corruption_rate=0.01):
# Introduce data corruption in storage
        pass

Distributed computing presents unique challenges and opportunities. Success requires careful consideration of:
- Architecture Patterns: Choose the right pattern for your use case
- Consistency Models: Balance consistency with availability and performance
- Fault Tolerance: Design for failure and implement robust recovery mechanisms
- Scalability: Plan for growth and implement appropriate scaling strategies
- Monitoring: Implement comprehensive observability for system health
- Security: Protect data and communications in a distributed environment
The key to successful distributed systems is understanding trade-offs and making informed decisions based on your specific requirements and constraints.