WebSocket Events - reza899/AutoSDLC GitHub Wiki
#AutoSDLC #WebSocket #RealTime #Events
← Back to Index | ← API Specification
The WebSocket Event System provides real-time bidirectional communication between the AutoSDLC web interface and backend services. This enables live updates of agent status, task progress, system notifications, and interactive debugging capabilities.
graph TB
subgraph "Client Layer"
WEB[Web Dashboard]
CLI[CLI Tool]
MOB[Mobile App]
end
subgraph "Gateway Layer"
LB[Load Balancer]
WS1[WS Server 1]
WS2[WS Server 2]
WS3[WS Server 3]
end
subgraph "Backend Services"
SUB[Subscription Manager]
EVT[Event Processor]
AUTH[Auth Service]
CACHE[Redis PubSub]
end
subgraph "Event Sources"
AG[Agents]
WF[Workflow Engine]
SYS[System Monitor]
GH[GitHub Webhooks]
end
WEB & CLI & MOB --> LB
LB --> WS1 & WS2 & WS3
WS1 & WS2 & WS3 --> SUB
SUB --> EVT
EVT --> CACHE
AG & WF & SYS & GH --> CACHE
WS1 & WS2 & WS3 --> AUTH
interface WebSocketConfig {
url: string;
reconnect: {
enabled: boolean;
delay: number;
maxDelay: number;
attempts: number;
};
heartbeat: {
interval: number;
timeout: number;
};
authentication: {
type: 'jwt' | 'api-key';
token: string;
};
}
class WebSocketConnection {
private ws: WebSocket;
private config: WebSocketConfig;
private reconnectAttempts: number = 0;
private subscriptions: Map<string, Subscription>;
private messageQueue: Message[];
async connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.config.url);
this.ws.onopen = async () => {
console.log('WebSocket connected');
// Authenticate
await this.authenticate();
// Restore subscriptions
await this.restoreSubscriptions();
// Flush message queue
await this.flushMessageQueue();
// Start heartbeat
this.startHeartbeat();
resolve();
};
this.ws.onclose = (event) => {
console.log('WebSocket disconnected:', event.code, event.reason);
if (this.config.reconnect.enabled) {
this.scheduleReconnect();
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
reject(error);
};
this.ws.onmessage = (event) => {
this.handleMessage(event.data);
};
});
}
}
interface WebSocketMessage {
id: string;
type: MessageType;
timestamp: number;
data: any;
metadata?: MessageMetadata;
}
interface MessageMetadata {
version: string;
source?: string;
correlationId?: string;
compressed?: boolean;
encrypted?: boolean;
}
enum MessageType {
// Connection management
CONNECT = 'connect',
DISCONNECT = 'disconnect',
PING = 'ping',
PONG = 'pong',
ERROR = 'error',
// Authentication
AUTH_REQUEST = 'auth_request',
AUTH_SUCCESS = 'auth_success',
AUTH_FAILURE = 'auth_failure',
// Subscriptions
SUBSCRIBE = 'subscribe',
UNSUBSCRIBE = 'unsubscribe',
SUBSCRIPTION_UPDATE = 'subscription_update',
// Agent events
AGENT_STATUS = 'agent_status',
AGENT_TASK_UPDATE = 'agent_task_update',
AGENT_LOG = 'agent_log',
AGENT_METRIC = 'agent_metric',
AGENT_ERROR = 'agent_error',
// TDD events
TDD_PHASE_CHANGE = 'tdd_phase_change',
TEST_RESULT = 'test_result',
COVERAGE_UPDATE = 'coverage_update',
// Workflow events
WORKFLOW_STARTED = 'workflow_started',
WORKFLOW_STEP_COMPLETE = 'workflow_step_complete',
WORKFLOW_COMPLETE = 'workflow_complete',
WORKFLOW_FAILED = 'workflow_failed',
// System events
SYSTEM_NOTIFICATION = 'system_notification',
SYSTEM_ALERT = 'system_alert',
DEPLOYMENT_STATUS = 'deployment_status',
// GitHub events
GITHUB_ISSUE_UPDATE = 'github_issue_update',
GITHUB_PR_UPDATE = 'github_pr_update',
GITHUB_WORKFLOW_UPDATE = 'github_workflow_update'
}
{
"id": "evt-123e4567-e89b-12d3-a456-426614174000",
"type": "agent_status",
"timestamp": 1703775135000,
"data": {
"agentId": "coder-001",
"agentType": "coder",
"status": "busy",
"currentTask": {
"id": "task-789",
"type": "implement_feature",
"progress": 65,
"startTime": 1703774300000
},
"metrics": {
"cpuUsage": 45.2,
"memoryUsage": 1024000000,
"tasksCompleted": 12,
"uptime": 86400
}
},
"metadata": {
"version": "1.0",
"source": "agent-monitor"
}
}
{
"id": "evt-234e5678-f89c-23e4-b567-537625285111",
"type": "tdd_phase_change",
"timestamp": 1703775200000,
"data": {
"agentId": "coder-001",
"taskId": "task-789",
"previousPhase": "red",
"currentPhase": "green",
"testResults": {
"total": 45,
"passing": 45,
"failing": 0,
"skipped": 0
},
"coverage": {
"lines": 98.5,
"branches": 95.2,
"functions": 100,
"statements": 97.8
},
"duration": 1847
}
}
// client/WebSocketClient.ts
export class AutoSDLCWebSocketClient {
private connection: WebSocketConnection;
private eventHandlers: Map<string, Set<EventHandler>>;
private subscriptions: Map<string, SubscriptionConfig>;
constructor(config: ClientConfig) {
this.connection = new WebSocketConnection(config);
this.eventHandlers = new Map();
this.subscriptions = new Map();
}
async connect(): Promise<void> {
await this.connection.connect();
// Setup default handlers
this.setupDefaultHandlers();
}
// Event subscription
on(eventType: string, handler: EventHandler): Unsubscribe {
if (!this.eventHandlers.has(eventType)) {
this.eventHandlers.set(eventType, new Set());
}
this.eventHandlers.get(eventType)!.add(handler);
return () => {
this.eventHandlers.get(eventType)?.delete(handler);
};
}
// Subscribe to specific resources
async subscribe(subscription: SubscriptionConfig): Promise<string> {
const subscriptionId = this.generateSubscriptionId();
this.subscriptions.set(subscriptionId, subscription);
await this.send({
type: MessageType.SUBSCRIBE,
data: {
subscriptionId,
...subscription
}
});
return subscriptionId;
}
// Send message
async send(message: Partial<WebSocketMessage>): Promise<void> {
const fullMessage: WebSocketMessage = {
id: message.id || this.generateMessageId(),
type: message.type!,
timestamp: Date.now(),
data: message.data,
metadata: {
version: '1.0',
...message.metadata
}
};
if (this.connection.isConnected()) {
this.connection.send(JSON.stringify(fullMessage));
} else {
// Queue message for when connection is restored
this.connection.queueMessage(fullMessage);
}
}
}
// client/hooks/useWebSocket.ts
import { useEffect, useState, useCallback, useRef } from 'react';
export function useWebSocket(url: string, options?: WebSocketOptions) {
const [isConnected, setIsConnected] = useState(false);
const [lastMessage, setLastMessage] = useState<WebSocketMessage | null>(null);
const clientRef = useRef<AutoSDLCWebSocketClient>();
useEffect(() => {
const client = new AutoSDLCWebSocketClient({
url,
...options
});
clientRef.current = client;
client.connect()
.then(() => setIsConnected(true))
.catch(console.error);
// Setup message handler
client.on('*', (message) => {
setLastMessage(message);
});
return () => {
client.disconnect();
};
}, [url]);
const sendMessage = useCallback((message: any) => {
clientRef.current?.send(message);
}, []);
const subscribe = useCallback((subscription: SubscriptionConfig) => {
return clientRef.current?.subscribe(subscription);
}, []);
return {
isConnected,
lastMessage,
sendMessage,
subscribe
};
}
// Usage example
function AgentStatusComponent({ agentId }: { agentId: string }) {
const { isConnected, subscribe } = useWebSocket('/ws');
const [agentStatus, setAgentStatus] = useState<AgentStatus>();
useEffect(() => {
if (isConnected) {
const subscriptionId = subscribe({
resource: 'agent',
resourceId: agentId,
events: ['status', 'task_update', 'metric']
});
return () => {
// Unsubscribe on cleanup
};
}
}, [isConnected, agentId]);
return (
<div>
{/* Agent status UI */}
</div>
);
}
// server/WebSocketServer.ts
import { Server } from 'ws';
import { Redis } from 'ioredis';
export class AutoSDLCWebSocketServer {
private wss: Server;
private clients: Map<string, ClientConnection>;
private pubsub: Redis;
private subscriptionManager: SubscriptionManager;
constructor(config: ServerConfig) {
this.wss = new Server({
port: config.port,
verifyClient: this.verifyClient.bind(this)
});
this.clients = new Map();
this.pubsub = new Redis(config.redis);
this.subscriptionManager = new SubscriptionManager();
this.setupHandlers();
this.setupPubSub();
}
private setupHandlers(): void {
this.wss.on('connection', (ws, request) => {
const clientId = this.generateClientId();
const client = new ClientConnection(clientId, ws, request);
this.clients.set(clientId, client);
ws.on('message', (data) => {
this.handleMessage(client, data);
});
ws.on('close', () => {
this.handleDisconnect(client);
});
ws.on('error', (error) => {
console.error(`Client ${clientId} error:`, error);
});
// Send connect confirmation
this.sendToClient(client, {
type: MessageType.CONNECT,
data: { clientId }
});
});
}
private async handleMessage(
client: ClientConnection,
data: WebSocket.Data
): Promise<void> {
try {
const message = JSON.parse(data.toString()) as WebSocketMessage;
switch (message.type) {
case MessageType.AUTH_REQUEST:
await this.handleAuth(client, message);
break;
case MessageType.SUBSCRIBE:
await this.handleSubscribe(client, message);
break;
case MessageType.UNSUBSCRIBE:
await this.handleUnsubscribe(client, message);
break;
case MessageType.PING:
this.sendToClient(client, {
type: MessageType.PONG,
data: { timestamp: Date.now() }
});
break;
default:
console.warn(`Unknown message type: ${message.type}`);
}
} catch (error) {
console.error('Message handling error:', error);
this.sendToClient(client, {
type: MessageType.ERROR,
data: {
error: 'Invalid message format',
details: error.message
}
});
}
}
}
// server/EventBroadcaster.ts
export class EventBroadcaster {
private redis: Redis;
private subscriptions: Map<string, Set<ClientConnection>>;
constructor(redis: Redis) {
this.redis = redis;
this.subscriptions = new Map();
}
async broadcast(event: SystemEvent): Promise<void> {
// Publish to Redis for distribution across servers
await this.redis.publish('system:events', JSON.stringify(event));
// Local broadcast to subscribed clients
const subscribers = this.getSubscribers(event);
const message: WebSocketMessage = {
id: this.generateMessageId(),
type: this.mapEventType(event.type),
timestamp: Date.now(),
data: event.data,
metadata: {
version: '1.0',
source: event.source
}
};
// Send to all subscribed clients
await Promise.all(
Array.from(subscribers).map(client =>
this.sendToClient(client, message)
)
);
}
private getSubscribers(event: SystemEvent): Set<ClientConnection> {
const subscribers = new Set<ClientConnection>();
// Check each subscription
for (const [pattern, clients] of this.subscriptions) {
if (this.matchesPattern(event, pattern)) {
clients.forEach(client => subscribers.add(client));
}
}
return subscribers;
}
private matchesPattern(event: SystemEvent, pattern: string): boolean {
// Pattern matching logic
// e.g., "agent:*:status" matches "agent:coder-001:status"
const regex = new RegExp(
'^' + pattern.replace(/\*/g, '.*').replace(/:/g, '\\:') + '$'
);
return regex.test(event.key);
}
}
// server/SubscriptionManager.ts
export class SubscriptionManager {
private subscriptions: Map<string, Subscription>;
private clientSubscriptions: Map<string, Set<string>>;
private resourceSubscriptions: Map<string, Set<string>>;
async subscribe(
client: ClientConnection,
config: SubscriptionConfig
): Promise<string> {
const subscription: Subscription = {
id: config.subscriptionId || this.generateId(),
clientId: client.id,
config: config,
createdAt: new Date(),
active: true
};
// Store subscription
this.subscriptions.set(subscription.id, subscription);
// Index by client
if (!this.clientSubscriptions.has(client.id)) {
this.clientSubscriptions.set(client.id, new Set());
}
this.clientSubscriptions.get(client.id)!.add(subscription.id);
// Index by resource
const resourceKey = `${config.resource}:${config.resourceId}`;
if (!this.resourceSubscriptions.has(resourceKey)) {
this.resourceSubscriptions.set(resourceKey, new Set());
}
this.resourceSubscriptions.get(resourceKey)!.add(subscription.id);
// Setup event filters
await this.setupFilters(subscription);
return subscription.id;
}
async unsubscribe(subscriptionId: string): Promise<void> {
const subscription = this.subscriptions.get(subscriptionId);
if (!subscription) return;
// Remove from indices
this.clientSubscriptions.get(subscription.clientId)?.delete(subscriptionId);
const resourceKey = `${subscription.config.resource}:${subscription.config.resourceId}`;
this.resourceSubscriptions.get(resourceKey)?.delete(subscriptionId);
// Remove subscription
this.subscriptions.delete(subscriptionId);
// Cleanup filters
await this.cleanupFilters(subscription);
}
getClientSubscriptions(clientId: string): Subscription[] {
const subscriptionIds = this.clientSubscriptions.get(clientId) || new Set();
return Array.from(subscriptionIds)
.map(id => this.subscriptions.get(id))
.filter(sub => sub !== undefined) as Subscription[];
}
}
// Agent status change event
interface AgentStatusEvent {
type: 'agent_status';
data: {
agentId: string;
agentType: AgentType;
previousStatus: AgentStatus;
currentStatus: AgentStatus;
reason?: string;
timestamp: Date;
};
}
// Agent task update event
interface AgentTaskUpdateEvent {
type: 'agent_task_update';
data: {
agentId: string;
taskId: string;
taskType: string;
status: TaskStatus;
progress: number;
details?: any;
};
}
// Agent log event
interface AgentLogEvent {
type: 'agent_log';
data: {
agentId: string;
level: LogLevel;
message: string;
context?: any;
timestamp: Date;
};
}
// Agent metric event
interface AgentMetricEvent {
type: 'agent_metric';
data: {
agentId: string;
metrics: {
cpuUsage: number;
memoryUsage: number;
taskQueueDepth: number;
responseTime: number;
};
timestamp: Date;
};
}
// TDD phase change event
interface TDDPhaseChangeEvent {
type: 'tdd_phase_change';
data: {
agentId: string;
taskId: string;
previousPhase: TDDPhase;
currentPhase: TDDPhase;
testResults: TestResults;
coverage: CoverageReport;
timestamp: Date;
};
}
// Test result event
interface TestResultEvent {
type: 'test_result';
data: {
agentId: string;
taskId: string;
testFile: string;
results: {
total: number;
passing: number;
failing: number;
skipped: number;
duration: number;
};
failures?: TestFailure[];
};
}
// Coverage update event
interface CoverageUpdateEvent {
type: 'coverage_update';
data: {
agentId: string;
taskId: string;
coverage: {
lines: number;
branches: number;
functions: number;
statements: number;
};
uncoveredLines?: number[];
};
}
// Workflow started event
interface WorkflowStartedEvent {
type: 'workflow_started';
data: {
workflowId: string;
workflowType: string;
initiator: string;
parameters: any;
steps: WorkflowStep[];
estimatedDuration: number;
};
}
// Workflow step complete event
interface WorkflowStepCompleteEvent {
type: 'workflow_step_complete';
data: {
workflowId: string;
stepId: string;
stepType: string;
result: StepResult;
duration: number;
nextStep?: string;
};
}
// Workflow complete event
interface WorkflowCompleteEvent {
type: 'workflow_complete';
data: {
workflowId: string;
result: WorkflowResult;
duration: number;
outputs: any;
metrics: WorkflowMetrics;
};
}
// System notification event
interface SystemNotificationEvent {
type: 'system_notification';
data: {
level: 'info' | 'warning' | 'error' | 'success';
title: string;
message: string;
actions?: NotificationAction[];
persistent?: boolean;
};
}
// System alert event
interface SystemAlertEvent {
type: 'system_alert';
data: {
alertId: string;
severity: 'low' | 'medium' | 'high' | 'critical';
title: string;
description: string;
affectedComponents: string[];
recommendedActions: string[];
};
}
// Deployment status event
interface DeploymentStatusEvent {
type: 'deployment_status';
data: {
deploymentId: string;
environment: string;
status: DeploymentStatus;
progress: number;
components: ComponentStatus[];
estimatedCompletion?: Date;
};
}
// server/ClusteredWebSocketServer.ts
import cluster from 'cluster';
import { createAdapter } from '@socket.io/redis-adapter';
export class ClusteredWebSocketServer {
private workers: Worker[] = [];
private stickySession: StickySessionManager;
async start(config: ClusterConfig): Promise<void> {
if (cluster.isMaster) {
// Master process
console.log(`Master ${process.pid} starting`);
// Fork workers
for (let i = 0; i < config.workers; i++) {
const worker = cluster.fork();
this.workers.push(worker);
}
// Setup sticky sessions
this.stickySession = new StickySessionManager(config.redis);
// Handle worker events
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`);
// Restart worker
const newWorker = cluster.fork();
this.workers = this.workers.filter(w => w.id !== worker.id);
this.workers.push(newWorker);
});
} else {
// Worker process
const server = new AutoSDLCWebSocketServer(config);
// Setup Redis adapter for cross-worker communication
server.setAdapter(createAdapter(config.redis));
await server.start();
console.log(`Worker ${process.pid} started`);
}
}
}
// server/MessageCompression.ts
import zlib from 'zlib';
export class MessageCompressor {
private compressionThreshold: number = 1024; // 1KB
async compress(message: WebSocketMessage): Promise<Buffer> {
const json = JSON.stringify(message);
if (json.length < this.compressionThreshold) {
return Buffer.from(json);
}
return new Promise((resolve, reject) => {
zlib.gzip(json, (err, compressed) => {
if (err) reject(err);
else resolve(compressed);
});
});
}
async decompress(data: Buffer): Promise<WebSocketMessage> {
// Check if compressed
if (data[0] === 0x1f && data[1] === 0x8b) {
// GZIP magic numbers
return new Promise((resolve, reject) => {
zlib.gunzip(data, (err, decompressed) => {
if (err) reject(err);
else resolve(JSON.parse(decompressed.toString()));
});
});
}
return JSON.parse(data.toString());
}
}
// server/RateLimiter.ts
export class WebSocketRateLimiter {
private limits: Map<string, RateLimit>;
constructor(private config: RateLimitConfig) {
this.limits = new Map();
}
async checkLimit(clientId: string, messageType: string): Promise<boolean> {
const key = `${clientId}:${messageType}`;
const limit = this.getLimit(messageType);
if (!this.limits.has(key)) {
this.limits.set(key, {
count: 0,
resetAt: Date.now() + limit.window
});
}
const current = this.limits.get(key)!;
// Reset if window expired
if (Date.now() > current.resetAt) {
current.count = 0;
current.resetAt = Date.now() + limit.window;
}
// Check limit
if (current.count >= limit.max) {
return false;
}
current.count++;
return true;
}
private getLimit(messageType: string): { max: number; window: number } {
const defaults = { max: 100, window: 60000 }; // 100 per minute
const specific = {
[MessageType.SUBSCRIBE]: { max: 50, window: 60000 },
[MessageType.AGENT_LOG]: { max: 1000, window: 60000 },
[MessageType.PING]: { max: 60, window: 60000 }
};
return specific[messageType] || defaults;
}
}
// server/WebSocketAuth.ts
export class WebSocketAuthenticator {
async verifyClient(
info: { origin: string; secure: boolean; req: IncomingMessage }
): Promise<boolean> {
try {
// Extract token from query or headers
const token = this.extractToken(info.req);
if (!token) {
return false;
}
// Verify token
const payload = await this.verifyToken(token);
// Check permissions
if (!this.hasWebSocketAccess(payload)) {
return false;
}
// Attach user info to request
(info.req as any).user = payload;
return true;
} catch (error) {
console.error('WebSocket auth error:', error);
return false;
}
}
private extractToken(req: IncomingMessage): string | null {
// Check query parameter
const url = new URL(req.url!, `http://${req.headers.host}`);
const queryToken = url.searchParams.get('token');
if (queryToken) return queryToken;
// Check authorization header
const authHeader = req.headers.authorization;
if (authHeader?.startsWith('Bearer ')) {
return authHeader.substring(7);
}
// Check cookie
const cookies = this.parseCookies(req.headers.cookie || '');
return cookies['auth-token'] || null;
}
}
// server/MessageValidator.ts
import Ajv from 'ajv';
export class MessageValidator {
private ajv: Ajv;
private schemas: Map<string, any>;
constructor() {
this.ajv = new Ajv({ allErrors: true });
this.schemas = new Map();
this.loadSchemas();
}
validate(message: WebSocketMessage): ValidationResult {
const schema = this.schemas.get(message.type);
if (!schema) {
return {
valid: false,
errors: [`Unknown message type: ${message.type}`]
};
}
const valid = this.ajv.validate(schema, message);
if (!valid) {
return {
valid: false,
errors: this.ajv.errors?.map(e => e.message || '') || []
};
}
return { valid: true };
}
private loadSchemas(): void {
// Example schema for agent status event
this.schemas.set(MessageType.AGENT_STATUS, {
type: 'object',
required: ['id', 'type', 'timestamp', 'data'],
properties: {
id: { type: 'string', format: 'uuid' },
type: { const: MessageType.AGENT_STATUS },
timestamp: { type: 'number' },
data: {
type: 'object',
required: ['agentId', 'agentType', 'status'],
properties: {
agentId: { type: 'string' },
agentType: { type: 'string' },
status: { type: 'string' },
currentTask: { type: 'object' },
metrics: { type: 'object' }
}
}
}
});
// Load other schemas...
}
}
// sdk/javascript/AutoSDLCWebSocket.ts
export class AutoSDLCWebSocket {
private client: WebSocketClient;
private eventEmitter: EventEmitter;
constructor(options: SDKOptions) {
this.client = new WebSocketClient(options);
this.eventEmitter = new EventEmitter();
this.setupEventHandlers();
}
// Connect to WebSocket server
async connect(): Promise<void> {
await this.client.connect();
}
// Subscribe to agent events
async subscribeToAgent(
agentId: string,
events?: string[]
): Promise<Subscription> {
const subscription = await this.client.subscribe({
resource: 'agent',
resourceId: agentId,
events: events || ['status', 'task_update', 'log', 'metric']
});
return {
id: subscription.id,
unsubscribe: () => this.client.unsubscribe(subscription.id)
};
}
// Subscribe to workflow events
async subscribeToWorkflow(
workflowId: string
): Promise<Subscription> {
const subscription = await this.client.subscribe({
resource: 'workflow',
resourceId: workflowId,
events: ['started', 'step_complete', 'complete', 'failed']
});
return {
id: subscription.id,
unsubscribe: () => this.client.unsubscribe(subscription.id)
};
}
// Event listeners
on(event: string, handler: EventHandler): void {
this.eventEmitter.on(event, handler);
}
off(event: string, handler: EventHandler): void {
this.eventEmitter.off(event, handler);
}
}
// Usage example
const ws = new AutoSDLCWebSocket({
url: 'wss://api.autosdlc.com/ws',
token: 'your-auth-token'
});
await ws.connect();
// Subscribe to agent
const agentSub = await ws.subscribeToAgent('coder-001');
// Listen for events
ws.on('agent:status', (event) => {
console.log('Agent status changed:', event.data);
});
ws.on('tdd:phase_change', (event) => {
console.log('TDD phase changed:', event.data);
});
# sdk/python/autosdlc_websocket.py
import asyncio
import json
import websockets
from typing import Callable, Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class Subscription:
id: str
unsubscribe: Callable
class AutoSDLCWebSocket:
def __init__(self, url: str, token: str):
self.url = url
self.token = token
self.ws = None
self.handlers = {}
self.subscriptions = {}
async def connect(self):
"""Connect to WebSocket server"""
headers = {"Authorization": f"Bearer {self.token}"}
self.ws = await websockets.connect(self.url, extra_headers=headers)
# Start message handler
asyncio.create_task(self._handle_messages())
async def subscribe_to_agent(
self,
agent_id: str,
events: Optional[List[str]] = None
) -> Subscription:
"""Subscribe to agent events"""
subscription_id = self._generate_id()
await self._send({
"type": "subscribe",
"data": {
"subscriptionId": subscription_id,
"resource": "agent",
"resourceId": agent_id,
"events": events or ["status", "task_update", "log", "metric"]
}
})
return Subscription(
id=subscription_id,
unsubscribe=lambda: self.unsubscribe(subscription_id)
)
def on(self, event: str, handler: Callable):
"""Register event handler"""
if event not in self.handlers:
self.handlers[event] = []
self.handlers[event].append(handler)
async def _handle_messages(self):
"""Handle incoming messages"""
async for message in self.ws:
try:
data = json.loads(message)
event_type = data.get("type")
# Call registered handlers
if event_type in self.handlers:
for handler in self.handlers[event_type]:
await handler(data)
except Exception as e:
print(f"Error handling message: {e}")
# Usage example
async def main():
ws = AutoSDLCWebSocket(
url="wss://api.autosdlc.com/ws",
token="your-auth-token"
)
await ws.connect()
# Subscribe to agent
agent_sub = await ws.subscribe_to_agent("coder-001")
# Register handlers
async def on_status_change(event):
print(f"Agent status changed: {event['data']}")
ws.on("agent_status", on_status_change)
# Keep running
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
// monitoring/WebSocketMetrics.ts
export class WebSocketMetrics {
private metrics = {
connectionsTotal: new Counter({
name: 'websocket_connections_total',
help: 'Total WebSocket connections'
}),
activeConnections: new Gauge({
name: 'websocket_active_connections',
help: 'Currently active WebSocket connections'
}),
messagesReceived: new Counter({
name: 'websocket_messages_received_total',
help: 'Total messages received',
labelNames: ['type']
}),
messagesSent: new Counter({
name: 'websocket_messages_sent_total',
help: 'Total messages sent',
labelNames: ['type']
}),
messageSize: new Histogram({
name: 'websocket_message_size_bytes',
help: 'Message size in bytes',
labelNames: ['type', 'direction'],
buckets: [100, 500, 1000, 5000, 10000, 50000]
}),
subscriptions: new Gauge({
name: 'websocket_active_subscriptions',
help: 'Currently active subscriptions',
labelNames: ['resource']
})
};
recordConnection(): void {
this.metrics.connectionsTotal.inc();
this.metrics.activeConnections.inc();
}
recordDisconnection(): void {
this.metrics.activeConnections.dec();
}
recordMessage(
type: string,
direction: 'in' | 'out',
size: number
): void {
if (direction === 'in') {
this.metrics.messagesReceived.inc({ type });
} else {
this.metrics.messagesSent.inc({ type });
}
this.metrics.messageSize.observe({ type, direction }, size);
}
}
// debug/WebSocketDebugger.ts
export class WebSocketDebugger {
private enabled: boolean;
private filter: DebugFilter;
constructor(config: DebugConfig) {
this.enabled = config.enabled;
this.filter = config.filter || {};
}
logMessage(
direction: 'in' | 'out',
client: ClientConnection,
message: WebSocketMessage
): void {
if (!this.enabled) return;
if (!this.shouldLog(message)) return;
const log = {
timestamp: new Date().toISOString(),
direction,
clientId: client.id,
messageId: message.id,
type: message.type,
size: JSON.stringify(message).length,
data: this.filter.includeData ? message.data : '[FILTERED]'
};
console.log(JSON.stringify(log, null, 2));
}
private shouldLog(message: WebSocketMessage): boolean {
// Apply filters
if (this.filter.types && !this.filter.types.includes(message.type)) {
return false;
}
if (this.filter.excludeTypes?.includes(message.type)) {
return false;
}
return true;
}
}
- Implement automatic reconnection with exponential backoff
- Use heartbeat/ping-pong to detect stale connections
- Handle connection state in UI appropriately
- Queue messages during disconnection
- Keep event payloads small and focused
- Use consistent event naming conventions
- Include timestamps and version information
- Implement event schema validation
- Implement message compression for large payloads
- Use subscription filters to reduce traffic
- Batch related events when possible
- Monitor connection and message metrics
- Always authenticate WebSocket connections
- Validate all incoming messages
- Implement rate limiting per client
- Use TLS for all connections
- Gracefully handle connection errors
- Provide meaningful error messages
- Implement retry logic for failed operations
- Log errors for debugging
Tags: #AutoSDLC #WebSocket #RealTime #Events #Communication Last Updated: 2025-06-09 Next: Database Schema Design →