Inter Agent Communication - reza899/AutoSDLC GitHub Wiki

Inter-Agent Communication Protocol

#AutoSDLC #Agent #Communication #Protocol

← Back to Index | ← Workflow Engine

Overview

The Inter-Agent Communication Protocol defines how AutoSDLC agents communicate with each other to collaborate on software development tasks. This protocol leverages MCP (Model Context Protocol) for structured messaging, Agent_Output.md files for status sharing, and event-driven architectures for real-time coordination.

Communication Architecture

Multi-Channel Communication

graph TB
    %% Overview of how agents, communication channels and infrastructure relate.
    %% The four channels over which information is exchanged.
    subgraph "Communication Channels"
        MCP[MCP Messages]
        STATUS[Agent_Output.md]
        EVENTS[Event Bus]
        SHARED[Shared Storage]
    end
    
    %% The five agent roles taking part in the protocol.
    subgraph "Agents"
        CA[Customer Agent]
        PM[PM Agent]
        CD[Coder Agent]
        CR[Reviewer Agent]
        TA[Tester Agent]
    end
    
    %% Backing services behind the channels.
    subgraph "Infrastructure"
        MQ[Message Queue]
        FS[File System]
        ES[Event Store]
        SYNC[Sync Service]
    end
    
    %% Every agent sends MCP messages and writes an Agent_Output.md status file.
    %% NOTE(review): no agent edge targets EVENTS in this diagram - confirm whether
    %% agents publish to the event bus directly or only via MCP messages.
    CA & PM & CD & CR & TA --> MCP
    CA & PM & CD & CR & TA --> STATUS
    %% Channel-to-infrastructure mapping; status files reach shared storage via the sync service.
    MCP --> MQ
    STATUS --> FS
    EVENTS --> ES
    FS --> SYNC --> SHARED
Loading

Communication Patterns

/**
 * Names of the interaction styles agents can use with each other.
 *
 * The string values are the serialized form of each pattern.
 */
enum CommunicationPattern {
  REQUEST_RESPONSE = 'request_response',
  PUBLISH_SUBSCRIBE = 'publish_subscribe',
  BROADCAST = 'broadcast',
  WORKFLOW_COORDINATION = 'workflow_coordination',
  STATUS_POLLING = 'status_polling'
}

/**
 * Groups the four channel handles an agent has at its disposal.
 *
 * The channel types themselves (MCPChannel, StatusChannel, ...) are declared
 * elsewhere.
 */
interface CommunicationChannel {
  mcp: MCPChannel;           // Direct agent-to-agent
  status: StatusChannel;     // Via Agent_Output.md
  events: EventChannel;      // Event-driven updates
  shared: SharedChannel;     // Shared file system
}

Message Protocol

Message Structure

/**
 * Envelope for every message exchanged between agents.
 *
 * Serialized with JSON.stringify when sent through the message queue, so
 * `timestamp` travels as an ISO string on the wire (see the JSON examples).
 */
interface AgentMessage {
  // Message metadata
  id: string;
  version: '1.0';
  timestamp: Date;
  
  // Routing information
  source: AgentIdentifier;
  // Either one specific agent or the literal 'broadcast' for all agents
  destination: AgentIdentifier | 'broadcast';
  replyTo?: string;
  // On a reply: the id of the request being answered (used to match pending requests)
  correlationId?: string;
  
  // Message content
  type: MessageType;
  priority: MessagePriority;
  payload: any;
  
  // Delivery guarantees
  requiresAck: boolean;
  // NOTE(review): unit looks like milliseconds - sendRequest stores its 30000 ms
  // timeout here and the queue forwards it as the AMQP `expiration` - but the
  // task-assignment JSON example uses 3600. Confirm the intended unit.
  ttl?: number;
  retryPolicy?: RetryPolicy;
  
  // Security
  signature?: string;
  encrypted?: boolean;
}

/**
 * Identifies one agent instance as the source or destination of a message.
 */
interface AgentIdentifier {
  // e.g. "coder-001" in the JSON examples
  agentId: string;
  // Role of the agent; the message queue uses it as the direct-exchange routing key
  agentType: AgentType;
  // e.g. "coder-001-instance-1" in the JSON examples
  instanceId: string;
  // Free-form capability tags, e.g. "tdd_implementation", "typescript"
  capabilities: string[];
}

/**
 * Discriminator carried in `AgentMessage.type`.
 *
 * The string values are what appears on the wire (e.g. "task_assignment" in
 * the JSON examples). Members are grouped by purpose below.
 */
enum MessageType {
  // Task management
  TASK_ASSIGNMENT = 'task_assignment',
  TASK_ACCEPTANCE = 'task_acceptance',
  TASK_REJECTION = 'task_rejection',
  TASK_UPDATE = 'task_update',
  TASK_COMPLETION = 'task_completion',
  
  // Coordination
  COORDINATION_REQUEST = 'coordination_request',
  COORDINATION_RESPONSE = 'coordination_response',
  
  // Information sharing
  STATUS_UPDATE = 'status_update',
  INFORMATION_REQUEST = 'information_request',
  INFORMATION_RESPONSE = 'information_response',
  
  // Workflow
  WORKFLOW_EVENT = 'workflow_event',
  PHASE_TRANSITION = 'phase_transition',
  
  // TDD specific
  TEST_SPEC_READY = 'test_spec_ready',
  TESTS_RED_CONFIRMED = 'tests_red_confirmed',
  TESTS_GREEN_ACHIEVED = 'tests_green_achieved',
  REFACTORING_COMPLETE = 'refactoring_complete',
  
  // Collaboration
  REVIEW_REQUEST = 'review_request',
  REVIEW_FEEDBACK = 'review_feedback',
  CLARIFICATION_REQUEST = 'clarification_request',
  CLARIFICATION_RESPONSE = 'clarification_response',
  
  // System
  HEARTBEAT = 'heartbeat',
  ERROR_REPORT = 'error_report',
  SYSTEM_ANNOUNCEMENT = 'system_announcement'
}

/**
 * Relative urgency of a message; a larger number means more urgent.
 *
 * The values are numeric on purpose: they are compared with `<=` by the
 * message batcher and passed through as the AMQP `priority` property.
 * Note that LOW is 0 and therefore falsy - use `??`, not `||`, when
 * defaulting a priority.
 */
enum MessagePriority {
  LOW = 0,
  NORMAL = 1,
  HIGH = 2,
  URGENT = 3,
  CRITICAL = 4
}

Message Examples

Task Assignment Message

{
  "id": "msg-123e4567-e89b-12d3-a456-426614174000",
  "version": "1.0",
  "timestamp": "2024-12-28T10:30:00Z",
  "source": {
    "agentId": "pm-001",
    "agentType": "pm",
    "instanceId": "pm-001-instance-1",
    "capabilities": ["task_management", "github_integration"]
  },
  "destination": {
    "agentId": "coder-001",
    "agentType": "coder",
    "instanceId": "coder-001-instance-1",
    "capabilities": ["tdd_implementation", "typescript"]
  },
  "type": "task_assignment",
  "priority": 2,
  "payload": {
    "taskId": "task-789",
    "taskType": "implement_feature",
    "githubIssue": 123,
    "testSpecification": "path/to/test-spec.ts",
    "deadline": "2024-12-28T18:00:00Z",
    "requirements": {
      "feature": "User Authentication",
      "acceptanceCriteria": [
        "Users can login with email/password",
        "JWT tokens are generated",
        "Sessions expire after 24 hours"
      ],
      "testCoverage": 100
    }
  },
  "requiresAck": true,
  "ttl": 3600
}

TDD Phase Transition Message

{
  "id": "msg-234e5678-f89c-23e4-b567-537625285111",
  "version": "1.0",
  "timestamp": "2024-12-28T11:45:00Z",
  "source": {
    "agentId": "coder-001",
    "agentType": "coder",
    "instanceId": "coder-001-instance-1",
    "capabilities": ["tdd_implementation"]
  },
  "destination": "broadcast",
  "type": "phase_transition",
  "priority": 1,
  "payload": {
    "taskId": "task-789",
    "fromPhase": "red",
    "toPhase": "green",
    "details": {
      "totalTests": 45,
      "passingTests": 45,
      "failingTests": 0,
      "coverage": 98.5,
      "duration": 1847
    }
  },
  "requiresAck": false
}

Communication Patterns

1. Request-Response Pattern

/**
 * Sends a message to another agent and waits for the correlated reply.
 *
 * Each outgoing request is tracked in `pendingRequests` under its message id.
 * `handleResponse` resolves the entry whose id equals the reply's
 * `correlationId`.
 */
class RequestResponseHandler {
  // Initialized inline so the handler is usable without extra setup.
  private pendingRequests: Map<string, PendingRequest> = new Map();
  private timeout: number = 30000; // 30 seconds
  
  /**
   * Sends `payload` to `destination` and resolves with the reply's payload.
   *
   * @param destination Agent that should receive the request.
   * @param payload Request body, forwarded unchanged.
   * @param options Optional message type, priority and timeout overrides.
   * @returns The `payload` of the response message, cast to `T` (not validated).
   * @throws Whatever `mcpClient.send` or the pending request's promise rejects with.
   */
  async sendRequest<T>(
    destination: AgentIdentifier,
    payload: any,
    options: RequestOptions = {}
  ): Promise<T> {
    // A timeout of 0 is treated as "not set", as before.
    const timeoutMs = options.timeout || this.timeout;
    
    const message: AgentMessage = {
      id: this.generateMessageId(),
      version: '1.0',
      timestamp: new Date(),
      source: this.agentIdentifier,
      destination: destination,
      type: options.type ?? MessageType.INFORMATION_REQUEST,
      // `??` rather than `||`: MessagePriority.LOW is 0 and must not be
      // silently upgraded to NORMAL.
      priority: options.priority ?? MessagePriority.NORMAL,
      payload: payload,
      requiresAck: true,
      ttl: timeoutMs
    };
    
    // Register before sending so a fast reply cannot arrive unmatched.
    const pending = new PendingRequest(message.id, timeoutMs);
    this.pendingRequests.set(message.id, pending);
    
    try {
      // Sending happens inside the try block so that a failed send also
      // removes the pending entry instead of leaking it.
      // NOTE(review): if PendingRequest rejects on its own timer, a failed send
      // may leave an unobserved rejection behind - confirm against PendingRequest.
      await this.mcpClient.send(message);
      
      // Wait for response
      const response = await pending.promise;
      return response.payload as T;
    } finally {
      this.pendingRequests.delete(message.id);
    }
  }
  
  /**
   * Resolves the pending request that `message` answers, if there is one.
   *
   * Messages without a `correlationId`, or whose id is no longer pending,
   * are ignored.
   */
  async handleResponse(message: AgentMessage): Promise<void> {
    if (!message.correlationId) {
      return;
    }
    
    const pending = this.pendingRequests.get(message.correlationId);
    if (pending) {
      pending.resolve(message);
    }
  }
}

2. Publish-Subscribe Pattern

/**
 * Topic-based publish/subscribe on top of the MCP client.
 *
 * Local handlers are kept per topic. The MCP subscription is opened when a
 * topic gets its first handler and closed when its last handler is removed.
 */
class PubSubHandler {
  // Initialized inline so the handler is usable without extra setup.
  private subscriptions: Map<string, Set<SubscriptionHandler>> = new Map();
  
  /**
   * Registers `handler` for `topic`.
   *
   * @returns A function that removes this handler again; removing the last
   *          handler of a topic also unsubscribes from the MCP topic.
   * @throws Whatever `mcpClient.subscribe` rejects with. In that case no
   *         local state for the topic is kept.
   */
  async subscribe(
    topic: string,
    handler: SubscriptionHandler
  ): Promise<Unsubscribe> {
    let handlers = this.subscriptions.get(topic);
    
    if (!handlers) {
      handlers = new Set();
      this.subscriptions.set(topic, handlers);
      
      try {
        // Subscribe to MCP topic
        await this.mcpClient.subscribe(topic);
      } catch (error) {
        // Roll back the placeholder entry. Otherwise later subscribers would
        // find the topic "already subscribed" and never open the MCP
        // subscription.
        if (this.subscriptions.get(topic) === handlers) {
          this.subscriptions.delete(topic);
        }
        throw error;
      }
    }
    
    handlers.add(handler);
    
    // Return unsubscribe function
    return async () => {
      const current = this.subscriptions.get(topic);
      if (!current) {
        return;
      }
      
      current.delete(handler);
      if (current.size === 0) {
        this.subscriptions.delete(topic);
        await this.mcpClient.unsubscribe(topic);
      }
    };
  }
  
  /**
   * Broadcasts `data` under `topic` as a WORKFLOW_EVENT message.
   *
   * The message is sent with normal priority and without acknowledgement.
   */
  async publish(topic: string, data: any): Promise<void> {
    const message: AgentMessage = {
      id: this.generateMessageId(),
      version: '1.0',
      timestamp: new Date(),
      source: this.agentIdentifier,
      destination: 'broadcast',
      type: MessageType.WORKFLOW_EVENT,
      priority: MessagePriority.NORMAL,
      payload: {
        topic: topic,
        data: data
      },
      requiresAck: false
    };
    
    await this.mcpClient.broadcast(message);
  }
}

3. Status Polling Pattern

/**
 * Periodically reads other agents' status files from the shared status
 * directory and emits 'status:changed' when a status differs from the
 * previously seen one.
 */
class StatusPollingHandler {
  // ReturnType<typeof setInterval> replaces the deprecated NodeJS.Timer alias.
  private pollingIntervals: Map<string, ReturnType<typeof setInterval>> = new Map();
  private statusCache: Map<string, AgentStatus> = new Map();
  
  /**
   * Starts polling the status file of `agentType`, replacing any poll that is
   * already running for it.
   *
   * @param agentType Agent whose `<agentType>_status.md` file is read.
   * @param interval Poll period in milliseconds (default 5000).
   */
  async startPolling(
    agentType: string,
    interval: number = 5000
  ): Promise<void> {
    // Stop existing polling if any
    this.stopPolling(agentType);
    
    // Start new polling interval
    const timer = setInterval(async () => {
      try {
        const status = await this.readAgentStatus(agentType);
        
        // Decide whether the status changed BEFORE overwriting the cache.
        // Updating first would make the comparison see the new value on both
        // sides, so the event would never fire.
        // NOTE(review): assumes hasStatusChanged compares against statusCache -
        // confirm against its implementation.
        const changed = this.hasStatusChanged(agentType, status);
        this.updateStatusCache(agentType, status);
        
        // Emit status change event if changed
        if (changed) {
          this.emit('status:changed', { agentType, status });
        }
      } catch (error) {
        // A failed poll is logged and retried on the next tick.
        console.error(`Failed to poll ${agentType} status:`, error);
      }
    }, interval);
    
    this.pollingIntervals.set(agentType, timer);
  }
  
  /**
   * Reads and parses `<sharedStatusDir>/<agentType>_status.md`.
   *
   * @throws If the file cannot be read.
   */
  private async readAgentStatus(agentType: string): Promise<AgentStatus> {
    const statusPath = path.join(
      this.sharedStatusDir,
      `${agentType}_status.md`
    );
    
    const content = await fs.readFile(statusPath, 'utf-8');
    return this.parseStatusFile(content);
  }
  
  /**
   * Builds an AgentStatus from the markdown status file, reading the
   * 'Last Updated', 'Status' and 'Current Activity' fields plus metrics and
   * recent actions.
   */
  private parseStatusFile(content: string): AgentStatus {
    // Parse markdown status file
    const status: AgentStatus = {
      lastUpdated: this.extractField(content, 'Last Updated'),
      state: this.extractField(content, 'Status'),
      currentActivity: this.extractField(content, 'Current Activity'),
      metrics: this.extractMetrics(content),
      recentActions: this.extractRecentActions(content)
    };
    
    return status;
  }
}

4. Workflow Coordination Pattern

/**
 * Drives a workflow by dispatching its steps, one after another, to agents
 * that have the required capabilities.
 */
class WorkflowCoordinator {
  private activeWorkflows: Map<string, WorkflowState>;
  
  /**
   * Runs every step of `workflow` in order and resolves with the result
   * reported by the workflow state.
   *
   * The workflow is registered as active for the duration of the call and
   * unregistered afterwards, whether it succeeded or failed.
   */
  async coordinateWorkflow(
    workflow: WorkflowDefinition
  ): Promise<WorkflowResult> {
    const id = this.generateWorkflowId();
    const workflowState = new WorkflowState(workflow);
    this.activeWorkflows.set(id, workflowState);
    
    try {
      // Steps run strictly sequentially: each one completes before the next starts.
      for (const currentStep of workflow.steps) {
        await this.executeStep(id, currentStep);
      }
      
      // `await` is required here so the cleanup below runs after completion.
      return await workflowState.waitForCompletion();
    } finally {
      this.activeWorkflows.delete(id);
    }
  }
  
  /**
   * Sends one step as a high-priority COORDINATION_REQUEST to an agent with
   * the step's required capabilities and waits until it is done.
   */
  private async executeStep(
    workflowId: string,
    step: WorkflowStep
  ): Promise<void> {
    const workflowState = this.activeWorkflows.get(workflowId)!;
    const assignee = await this.findCapableAgent(step.requiredCapabilities);
    
    // The workflow id doubles as correlation id so replies can be tied back
    // to this workflow.
    const request: AgentMessage = {
      id: this.generateMessageId(),
      version: '1.0',
      timestamp: new Date(),
      source: this.agentIdentifier,
      destination: assignee,
      type: MessageType.COORDINATION_REQUEST,
      priority: MessagePriority.HIGH,
      payload: {
        workflowId,
        step,
        context: workflowState.getContext()
      },
      requiresAck: true,
      correlationId: workflowId
    };
    
    await this.sendAndWaitForCompletion(request);
  }
}

Agent-Specific Communication

Customer Agent Communication

/**
 * Communication helper of the Customer Agent, which primarily talks to the
 * PM Agent.
 */
class CustomerAgentCommunication {
  /**
   * Reports the validation outcome for `implementation` to the PM Agent.
   *
   * NOTE(review): the reported result is a fixed, approving sample and does
   * not depend on `implementation` beyond its task id.
   */
  async validateImplementation(
    implementation: Implementation
  ): Promise<ValidationResult> {
    // The PM Agent's status is read for context; the value itself is not used below.
    const pmStatus = await this.statusReader.readAgentStatus('pm');
    
    const validationMessage: AgentMessage = {
      id: this.generateMessageId(),
      version: '1.0',
      timestamp: new Date(),
      source: this.identifier,
      destination: this.pmAgentIdentifier,
      type: MessageType.INFORMATION_RESPONSE,
      priority: MessagePriority.HIGH,
      payload: {
        taskId: implementation.taskId,
        validationType: 'implementation_validation',
        result: {
          approved: true,
          feedback: [
            'All acceptance criteria met',
            'User experience is intuitive',
            'Edge cases handled properly'
          ],
          suggestions: [
            'Consider adding loading states',
            'Improve error messages'
          ]
        }
      },
      requiresAck: true
    };
    
    const outcome = await this.sendRequest(validationMessage);
    return outcome;
  }
}

PM Agent Communication Hub

/**
 * Communication hub of the PM Agent, which acts as the central coordinator:
 * it hands out tasks and reacts to task updates.
 */
class PMAgentCommunication {
  /**
   * Assigns `task` to the agent chosen for it and records the assignment in
   * the PM Agent's status.
   *
   * For 'implement_feature' tasks a test specification is generated and
   * stored on the task object itself before it is sent.
   */
  async distributeTask(task: Task): Promise<void> {
    const assignee = this.determineTargetAgent(task);
    
    // TDD: feature work must ship with its test specification.
    if (task.type === 'implement_feature') {
      task.testSpecification = await this.generateTestSpecification(task);
    }
    
    const assignment: AgentMessage = {
      id: this.generateMessageId(),
      version: '1.0',
      timestamp: new Date(),
      source: this.identifier,
      destination: assignee,
      type: MessageType.TASK_ASSIGNMENT,
      priority: this.calculatePriority(task),
      payload: task,
      requiresAck: true,
      retryPolicy: {
        maxRetries: 3,
        backoff: 'exponential'
      }
    };
    await this.sendRequest(assignment);
    
    // Reflect the assignment in Agent_Output.md
    await this.updateStatus({
      action: 'task_assigned',
      taskId: task.id,
      assignedTo: assignee.agentType,
      timestamp: new Date()
    });
  }
  
  /**
   * Applies a task update, sends the resulting notifications one at a time,
   * and triggers blocked-task handling when the update reports 'blocked'.
   */
  async handleTaskUpdate(update: TaskUpdate): Promise<void> {
    await this.updateTaskStatus(update);
    
    // Notifications are sent sequentially, in the order they were determined.
    const pendingNotifications = this.determineNotifications(update);
    for (const item of pendingNotifications) {
      await this.sendNotification(item);
    }
    
    // Only blocked tasks require a workflow adjustment.
    if (update.status !== 'blocked') {
      return;
    }
    await this.handleBlockedTask(update);
  }
}

Coder Agent TDD Communication

/**
 * Communication helper of the Coder Agent for TDD work: announces phase
 * transitions and asks the PM Agent for clarification.
 */
class CoderAgentTDDCommunication {
  /**
   * Broadcasts the given TDD phase for the current task, then writes the
   * detailed status to Agent_Output.md.
   */
  async reportTDDPhase(phase: TDDPhase): Promise<void> {
    // Captured in this order so id and timestamp are taken before metrics
    // collection starts.
    const id = this.generateMessageId();
    const timestamp = new Date();
    const source = this.identifier;
    const taskId = this.currentTask.id;
    const metrics = await this.collectTDDMetrics();
    
    const transition: AgentMessage = {
      id,
      version: '1.0',
      timestamp,
      source,
      destination: 'broadcast',
      type: MessageType.PHASE_TRANSITION,
      priority: MessagePriority.NORMAL,
      payload: { taskId, phase, metrics },
      requiresAck: false
    };
    await this.broadcast(transition);
    
    // Detailed status for Agent_Output.md; implementation counts as complete
    // only in the 'green' phase.
    const testStatus = await this.getTestStatus();
    const implementation = phase === 'green' ? 'complete' : 'in-progress';
    const coverage = await this.getTestCoverage();
    await this.updateAgentOutput({
      tddPhase: phase,
      testStatus,
      implementation,
      coverage
    });
  }
  
  /**
   * Asks the PM Agent a question that blocks the current task and resolves
   * with the clarification contained in the reply.
   */
  async requestClarification(
    question: string,
    context: any
  ): Promise<Clarification> {
    const inquiry: AgentMessage = {
      id: this.generateMessageId(),
      version: '1.0',
      timestamp: new Date(),
      source: this.identifier,
      destination: this.pmAgentIdentifier,
      type: MessageType.CLARIFICATION_REQUEST,
      priority: MessagePriority.HIGH,
      payload: {
        question,
        context,
        blockingTask: this.currentTask.id
      },
      requiresAck: true
    };
    
    const { clarification } = await this.sendRequest<ClarificationResponse>(inquiry);
    return clarification;
  }
}

Message Queue Implementation

RabbitMQ Integration

// infrastructure/MessageQueue.ts
import amqp from 'amqplib';

/**
 * RabbitMQ transport for agent messages.
 *
 * Topology created by `initialize`:
 * - `agent.direct` (direct): one routing key per agent type
 * - `agent.topic` (topic): pattern `*.<agentType>.*`
 * - `agent.broadcast` (fanout): every agent queue is bound to it
 * - `agent.dlx` (fanout) -> `agent.dead-letter`: destination of messages that
 *   expire, overflow, or fail processing too often
 */
export class AgentMessageQueue {
  /** Total number of times a message is handed to the handler before it is dead-lettered. */
  private static readonly MAX_DELIVERY_ATTEMPTS = 3;
  /** Exchange named by every agent queue's `x-dead-letter-exchange` argument. */
  private static readonly DEAD_LETTER_EXCHANGE = 'agent.dlx';
  /** Queue that collects dead-lettered messages for inspection. */
  private static readonly DEAD_LETTER_QUEUE = 'agent.dead-letter';
  
  private connection: amqp.Connection;
  private channel: amqp.Channel;
  
  /**
   * Connects to the broker at RABBITMQ_URL and declares exchanges and queues.
   *
   * @throws If the connection or any declaration fails.
   */
  async initialize(): Promise<void> {
    // NOTE(review): if RABBITMQ_URL is unset, amqplib presumably falls back to
    // its default URL - confirm that this is intended.
    this.connection = await amqp.connect(process.env.RABBITMQ_URL!);
    this.channel = await this.connection.createChannel();
    
    // Create exchanges
    await this.channel.assertExchange('agent.direct', 'direct', { durable: true });
    await this.channel.assertExchange('agent.topic', 'topic', { durable: true });
    await this.channel.assertExchange('agent.broadcast', 'fanout', { durable: true });
    
    // The agent queues name this exchange as their dead-letter target, so it
    // has to exist and have a queue bound to it. Otherwise dead-lettered
    // messages are discarded by the broker.
    await this.channel.assertExchange(
      AgentMessageQueue.DEAD_LETTER_EXCHANGE,
      'fanout',
      { durable: true }
    );
    await this.channel.assertQueue(AgentMessageQueue.DEAD_LETTER_QUEUE, { durable: true });
    await this.channel.bindQueue(
      AgentMessageQueue.DEAD_LETTER_QUEUE,
      AgentMessageQueue.DEAD_LETTER_EXCHANGE,
      ''
    );
    
    // Create agent-specific queues
    for (const agentType of ['customer', 'pm', 'coder', 'reviewer', 'tester']) {
      await this.createAgentQueue(agentType);
    }
  }
  
  /**
   * Declares the durable queue `agent.<agentType>` and binds it to the
   * direct, topic and broadcast exchanges.
   */
  private async createAgentQueue(agentType: string): Promise<void> {
    const queueName = `agent.${agentType}`;
    
    // Create queue
    // NOTE(review): no 'x-max-priority' is set, so the per-message `priority`
    // published below is most likely ignored by the broker. Adding the
    // argument to an already existing queue fails the declaration, so this
    // needs a planned migration rather than an in-place change.
    await this.channel.assertQueue(queueName, {
      durable: true,
      arguments: {
        'x-message-ttl': 3600000, // 1 hour TTL
        'x-max-length': 10000,
        'x-dead-letter-exchange': AgentMessageQueue.DEAD_LETTER_EXCHANGE
      }
    });
    
    // Bind to direct exchange
    await this.channel.bindQueue(queueName, 'agent.direct', agentType);
    
    // Bind to topic exchange for pattern matching
    await this.channel.bindQueue(queueName, 'agent.topic', `*.${agentType}.*`);
    
    // Bind to broadcast exchange
    await this.channel.bindQueue(queueName, 'agent.broadcast', '');
  }
  
  /**
   * Publishes `message` as persistent JSON.
   *
   * A 'broadcast' destination goes to the fanout exchange; any other
   * destination is routed on the direct exchange by the destination's agent
   * type. A `ttl` on the message is forwarded as the AMQP per-message
   * expiration in both cases.
   */
  async sendMessage(message: AgentMessage): Promise<void> {
    const body = Buffer.from(JSON.stringify(message));
    
    const exchange = message.destination === 'broadcast'
      ? 'agent.broadcast'
      : 'agent.direct';
    const routingKey = message.destination === 'broadcast'
      ? ''
      : message.destination.agentType;
    
    await this.channel.publish(exchange, routingKey, body, {
      persistent: true,
      priority: message.priority,
      messageId: message.id,
      timestamp: Date.now(),
      // Previously only direct messages honoured ttl; broadcasts now do too.
      expiration: message.ttl?.toString()
    });
  }
  
  /**
   * Consumes `agent.<agentType>` and passes each parsed message to `handler`.
   *
   * Failure handling: a message whose handler throws (or that cannot be
   * parsed) is re-published to the same queue after an exponential delay
   * (2s, 4s) with an incremented 'x-retry-count' header. After
   * MAX_DELIVERY_ATTEMPTS failed deliveries it is rejected without requeue,
   * which hands it to the queue's dead-letter exchange instead of silently
   * dropping it.
   */
  async consumeMessages(
    agentType: string,
    handler: MessageHandler
  ): Promise<void> {
    const queueName = `agent.${agentType}`;
    
    await this.channel.consume(queueName, async (msg) => {
      // amqplib delivers null when the consumer is cancelled by the broker.
      if (!msg) return;
      
      try {
        const message = JSON.parse(msg.content.toString()) as AgentMessage;
        
        // Process message
        await handler(message);
        
        // Acknowledge message
        this.channel.ack(msg);
      } catch (error) {
        console.error('Failed to process message:', error);
        
        const attempt = (msg.properties.headers?.['x-retry-count'] ?? 0) + 1;
        
        if (attempt < AgentMessageQueue.MAX_DELIVERY_ATTEMPTS) {
          // Requeue a copy with delay. The copy only exists in this process
          // until the timer fires, so a crash in between loses the retry.
          setTimeout(() => {
            try {
              this.channel.sendToQueue(queueName, msg.content, {
                ...msg.properties,
                headers: {
                  ...msg.properties.headers,
                  'x-retry-count': attempt
                }
              });
            } catch (requeueError) {
              // E.g. the channel was closed in the meantime. An exception
              // must not escape the timer callback.
              console.error('Failed to requeue message:', requeueError);
            }
          }, Math.pow(2, attempt) * 1000);
          
          // Remove the original delivery; the delayed copy replaces it.
          this.channel.ack(msg);
        } else {
          // Retries exhausted: reject without requeue so the broker
          // dead-letters the message.
          this.channel.nack(msg, false, false);
        }
      }
    });
  }
}

Event-Driven Communication

Event Bus Implementation

// infrastructure/EventBus.ts
/**
 * In-process event bus: persists every published event to the event store,
 * then invokes all handlers whose subscription matches the event type.
 *
 * Matching rules:
 * - a string subscription matches an event type that is exactly equal to it;
 * - a RegExp subscription matches when the expression tests true.
 */
export class AgentEventBus {
  private eventStore: EventStore;
  // Keyed by a type-tagged string so the string "/a/" and the RegExp /a/ do
  // not collide. The original pattern object is kept for matching, because a
  // RegExp cannot be rebuilt from its toString() form.
  private subscribers: Map<
    string,
    { pattern: string | RegExp; handlers: Set<EventHandler> }
  > = new Map();
  
  /**
   * Appends `event` to the event store and then runs all matching handlers
   * concurrently.
   *
   * @throws If storing the event fails or any handler invocation rejects.
   */
  async publish(event: AgentEvent): Promise<void> {
    // Store event
    await this.eventStore.append(event);
    
    // Notify subscribers
    const handlers = this.getHandlers(event.type);
    
    await Promise.all(
      Array.from(handlers).map(handler => 
        this.invokeHandler(handler, event)
      )
    );
  }
  
  /**
   * Registers `handler` for an exact event type (string) or for every event
   * type matched by a regular expression.
   *
   * @returns A function that removes this handler again. Calling it more
   *          than once is harmless.
   */
  subscribe(
    eventType: string | RegExp,
    handler: EventHandler
  ): Unsubscribe {
    const key = typeof eventType === 'string'
      ? `string:${eventType}`
      : `regexp:${eventType.toString()}`;
    
    let entry = this.subscribers.get(key);
    if (!entry) {
      entry = { pattern: eventType, handlers: new Set() };
      this.subscribers.set(key, entry);
    }
    
    entry.handlers.add(handler);
    
    return () => {
      const current = this.subscribers.get(key);
      if (!current) {
        return;
      }
      
      current.handlers.delete(handler);
      // Drop empty entries so unused patterns are not tested on every publish.
      if (current.handlers.size === 0) {
        this.subscribers.delete(key);
      }
    };
  }
  
  /**
   * Collects the distinct handlers whose subscription matches `eventType`.
   */
  private getHandlers(eventType: string): Set<EventHandler> {
    const matched = new Set<EventHandler>();
    
    for (const { pattern, handlers } of this.subscribers.values()) {
      let isMatch: boolean;
      if (typeof pattern === 'string') {
        isMatch = pattern === eventType;
      } else {
        // Global/sticky expressions keep state between test() calls.
        pattern.lastIndex = 0;
        isMatch = pattern.test(eventType);
      }
      
      if (isMatch) {
        handlers.forEach(handler => matched.add(handler));
      }
    }
    
    return matched;
  }
}

// Event definitions
/**
 * Base shape of every event published on the agent event bus.
 *
 * Specific events narrow `type` to a string literal and `data` to a
 * concrete structure.
 */
interface AgentEvent {
  id: string;
  // Event name; subscriptions are matched against this value
  type: string;
  source: AgentIdentifier;
  timestamp: Date;
  data: any;
  metadata?: EventMetadata;
}

/**
 * Event of type 'tdd.phase.transition': a task moved from one TDD phase to
 * another, together with the test results at that point.
 */
interface TDDPhaseEvent extends AgentEvent {
  type: 'tdd.phase.transition';
  data: {
    taskId: string;
    fromPhase: TDDPhase;
    toPhase: TDDPhase;
    testResults: TestResults;
  };
}

/**
 * Event of type 'task.completed': carries the task's result, how long it
 * took, and the artifacts it produced.
 */
interface TaskCompletionEvent extends AgentEvent {
  type: 'task.completed';
  data: {
    taskId: string;
    result: TaskResult;
    // NOTE(review): unit is not stated in this document - confirm (ms vs. s)
    duration: number;
    // NOTE(review): presumably paths or identifiers of produced artifacts - confirm
    artifacts: string[];
  };
}

Status File Protocol

Agent_Output.md Format

# Agent Status: Coder Agent
**Last Updated**: 2024-12-28 14:32:15 UTC
**Agent ID**: coder-001
**Status**: BUSY

## Current Activity
- **Task**: Implementing user authentication feature
- **Task ID**: task-789
- **Started**: 2024-12-28 14:15:00 UTC
- **Progress**: 65%
- **TDD Phase**: GREEN (implementing to pass tests)

## Communication Log
### Recent Messages Sent
1. [14:30] -> PM Agent: Status update (task progress)
2. [14:25] -> Reviewer Agent: Review request for auth module
3. [14:20] -> Broadcast: TDD phase transition (RED -> GREEN)

### Recent Messages Received
1. [14:15] <- PM Agent: Task assignment (task-789)
2. [14:10] <- Tester Agent: Test environment ready

## Collaboration Status
- **Waiting For**: None
- **Blocking**: reviewer-agent (pending PR review)
- **Available For**: clarification requests

## Inter-Agent Dependencies
- **PM Agent**: Task provider
- **Reviewer Agent**: Code review pending
- **Tester Agent**: Will need test execution after review

## Notes
- Following TDD strictly - all tests were red before implementation
- Test coverage currently at 98.5%
- No blocking issues encountered

Status Synchronization

// services/StatusSynchronization.ts
/**
 * Mirrors each agent's Agent_Output.md into the shared status directory as a
 * read-only copy and announces every successful sync with 'status:changed'.
 */
export class StatusSynchronizationService {
  // Initialized inline so the service is usable without extra setup.
  private watchers: Map<string, fs.FSWatcher> = new Map();
  private statusCache: Map<string, AgentStatus> = new Map();
  // Makes temp file names unique per sync; a single write to the source file
  // can trigger several overlapping 'change' events.
  private syncSequence = 0;
  
  /**
   * Starts watching every configured agent's output file and then starts
   * the synchronization loop.
   */
  async initialize(): Promise<void> {
    // Watch each agent's output file
    for (const agentType of this.agentTypes) {
      await this.watchAgentOutput(agentType);
    }
    
    // Start synchronization loop
    this.startSyncLoop();
  }
  
  /**
   * Watches the output file of `agentType` and syncs it on every 'change'
   * event. The watcher is stored in `watchers`.
   */
  private async watchAgentOutput(agentType: string): Promise<void> {
    const outputPath = this.getAgentOutputPath(agentType);
    
    // NOTE(review): the callback form of watch() belongs to 'fs', while the
    // awaited readFile/chmod calls below belong to 'fs/promises' - confirm
    // how `fs` is imported in this file.
    const watcher = fs.watch(outputPath, async (eventType) => {
      if (eventType === 'change') {
        await this.syncAgentStatus(agentType);
      }
    });
    
    this.watchers.set(agentType, watcher);
  }
  
  /**
   * Reads the agent's output file, caches the parsed status, publishes a
   * read-only copy to the shared directory and emits 'status:changed'.
   *
   * Errors are logged and swallowed, so one failing sync does not stop the
   * watcher.
   */
  private async syncAgentStatus(agentType: string): Promise<void> {
    let tempPath: string | undefined;
    
    try {
      // Read agent output
      const outputPath = this.getAgentOutputPath(agentType);
      const content = await fs.readFile(outputPath, 'utf-8');
      
      // Parse status
      const status = this.parseAgentOutput(content);
      
      // Update cache
      this.statusCache.set(agentType, status);
      
      // Sync to shared directory.
      // The published copy is read-only (0o444), so overwriting it in place
      // would fail with EACCES from the second sync onwards. Instead, write a
      // temp file, make it read-only, and rename it over the target. On POSIX
      // the rename is atomic and does not need write permission on the file.
      const sharedPath = this.getSharedStatusPath(agentType);
      tempPath = `${sharedPath}.${process.pid}.${++this.syncSequence}.tmp`;
      await fs.writeFile(tempPath, content);
      
      // Set read-only permissions
      await fs.chmod(tempPath, 0o444);
      await fs.rename(tempPath, sharedPath);
      tempPath = undefined;
      
      // Emit status change event
      this.emit('status:changed', { agentType, status });
    } catch (error) {
      console.error(`Failed to sync ${agentType} status:`, error);
      
      // Best-effort cleanup of a temp file left behind by a failed sync.
      if (tempPath) {
        await fs.rm(tempPath, { force: true }).catch(() => undefined);
      }
    }
  }
}

Error Handling

Communication Errors

/**
 * Central dispatcher for communication failures: routes each error to the
 * handler for its `type`.
 */
export class CommunicationErrorHandler {
  /**
   * Handles `error`, which occurred while sending or delivering `message`.
   *
   * Known types are TIMEOUT, DELIVERY_FAILED, INVALID_MESSAGE and
   * AGENT_UNAVAILABLE; anything else goes to the unknown-error handler.
   */
  async handleError(
    error: CommunicationError,
    message: AgentMessage
  ): Promise<void> {
    switch (error.type) {
      case 'TIMEOUT':
        await this.handleTimeout(error, message);
        break;
        
      case 'DELIVERY_FAILED':
        await this.handleDeliveryFailure(error, message);
        break;
        
      case 'INVALID_MESSAGE':
        await this.handleInvalidMessage(error, message);
        break;
        
      case 'AGENT_UNAVAILABLE':
        await this.handleAgentUnavailable(error, message);
        break;
        
      default:
        await this.handleUnknownError(error, message);
    }
  }
  
  /**
   * Logs the timeout, then retries the message if it has a retry policy that
   * still allows it; otherwise sends a timeout notification.
   */
  private async handleTimeout(
    error: CommunicationError,
    message: AgentMessage
  ): Promise<void> {
    // Log timeout. The destination is formatted explicitly, because
    // interpolating an AgentIdentifier object prints "[object Object]".
    console.error(
      `Message timeout: ${message.id} to ${this.describeDestination(message.destination)}`
    );
    
    // Check retry policy
    if (message.retryPolicy && this.shouldRetry(message)) {
      await this.retryMessage(message);
    } else {
      // Send timeout notification
      await this.notifyTimeout(message);
    }
  }
  
  /**
   * Queues the message for later if the destination agent reports the status
   * 'shutdown'; any other status is escalated as an unexpected outage.
   */
  private async handleAgentUnavailable(
    error: CommunicationError,
    message: AgentMessage
  ): Promise<void> {
    // Check if agent is expected to be available
    const agentStatus = await this.checkAgentStatus(message.destination);
    
    if (agentStatus.status === 'shutdown') {
      // Agent is intentionally down
      await this.queueForLater(message);
    } else {
      // Unexpected unavailability
      await this.escalateIssue(error, message);
    }
  }
  
  /** Returns a log-friendly name for a destination: 'broadcast' or the agent id. */
  private describeDestination(destination: AgentMessage['destination']): string {
    return typeof destination === 'string' ? destination : destination.agentId;
  }
}

Performance Optimization

Message Batching

/**
 * Groups low-priority, unacknowledged status updates into batches that are
 * flushed after a short interval. Every other message is sent immediately.
 */
export class MessageBatcher {
  // Initialized inline so the batcher is usable without extra setup.
  private batches: Map<string, MessageBatch> = new Map();
  private batchInterval: number = 100; // ms
  
  /**
   * Sends `message`, either as part of a batch or right away, depending on
   * `shouldBatch`.
   */
  async send(message: AgentMessage): Promise<void> {
    if (this.shouldBatch(message)) {
      await this.addToBatch(message);
    } else {
      await this.sendImmediate(message);
    }
  }
  
  /**
   * A message is batched only if all three conditions hold: priority is
   * NORMAL or lower, no acknowledgement is required, and it is a
   * STATUS_UPDATE.
   */
  private shouldBatch(message: AgentMessage): boolean {
    return (
      message.priority <= MessagePriority.NORMAL &&
      !message.requiresAck &&
      message.type === MessageType.STATUS_UPDATE
    );
  }
  
  /**
   * Adds `message` to the batch for its destination and type. The first
   * message of a batch also schedules the flush after `batchInterval` ms.
   */
  private async addToBatch(message: AgentMessage): Promise<void> {
    // 'broadcast' is a plain string without an agentType; reading
    // `.agentType` from it would yield the key "undefined:<type>".
    const target = message.destination === 'broadcast'
      ? 'broadcast'
      : message.destination.agentType;
    const key = `${target}:${message.type}`;
    
    let batch = this.batches.get(key);
    if (!batch) {
      batch = new MessageBatch(key);
      this.batches.set(key, batch);
      
      // Schedule batch send. Failures are logged here because nothing awaits
      // a timer callback; an unhandled rejection could otherwise take the
      // process down.
      setTimeout(async () => {
        try {
          await this.sendBatch(key);
        } catch (error) {
          console.error(`Failed to send batch ${key}:`, error);
        }
      }, this.batchInterval);
    }
    
    batch.add(message);
  }
}

Connection Pooling

/**
 * Keeps at most `maxConnections` agent connections open and reuses a live
 * connection when one exists for the requested agent.
 */
export class ConnectionPool {
  // Initialized inline so the pool is usable without extra setup.
  private connections: Map<string, Connection> = new Map();
  private maxConnections: number = 10;
  
  /**
   * Returns a live connection to `agentId`, creating one if none exists or
   * the pooled one is no longer alive.
   *
   * @throws Whatever `createConnection` rejects with. A dead pooled
   *         connection has already been removed from the pool in that case.
   */
  async getConnection(agentId: string): Promise<Connection> {
    // Check existing connection
    const existing = this.connections.get(agentId);
    if (existing) {
      if (existing.isAlive()) {
        return existing;
      }
      
      // Remove the dead entry first. Map.set on an existing key keeps the
      // key's original insertion position, which would make the replacement
      // look like the oldest connection.
      // NOTE(review): assumes eviction relies on insertion order and that
      // dead connections need no explicit close - confirm.
      this.connections.delete(agentId);
    }
    
    // Create new connection
    const conn = await this.createConnection(agentId);
    this.connections.set(agentId, conn);
    
    // Manage pool size
    if (this.connections.size > this.maxConnections) {
      await this.evictOldestConnection();
    }
    
    return conn;
  }
}

Monitoring & Metrics

Communication Metrics

/**
 * Counters and a latency histogram describing inter-agent messaging.
 *
 * All message metrics are labelled with source, destination and message
 * type; the error counter uses the error type instead of the message type.
 */
export class CommunicationMetrics {
  private metrics = {
    // Number of messages handed to the transport
    messagesSent: new Counter({
      name: 'agent_messages_sent_total',
      help: 'Total messages sent',
      labelNames: ['source', 'destination', 'type']
    }),
    
    // Number of messages delivered to an agent
    messagesReceived: new Counter({
      name: 'agent_messages_received_total',
      help: 'Total messages received',
      labelNames: ['source', 'destination', 'type']
    }),
    
    // Delivery latency in seconds, bucketed from 1 ms up to 5 s
    messageLatency: new Histogram({
      name: 'agent_message_latency_seconds',
      help: 'Message delivery latency',
      labelNames: ['source', 'destination', 'type'],
      buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]
    }),
    
    // Number of communication failures by error type
    communicationErrors: new Counter({
      name: 'agent_communication_errors_total',
      help: 'Total communication errors',
      labelNames: ['source', 'destination', 'error_type']
    })
  };
  
  /**
   * Counts one sent message. Broadcasts are labelled with the destination
   * 'broadcast'; direct messages with the destination's agent type.
   */
  recordMessageSent(message: AgentMessage): void {
    const { source, destination, type } = message;
    
    let destinationLabel;
    if (destination === 'broadcast') {
      destinationLabel = 'broadcast';
    } else {
      destinationLabel = destination.agentType;
    }
    
    this.metrics.messagesSent.inc({
      source: source.agentType,
      destination: destinationLabel,
      type
    });
  }
}

Best Practices

1. Message Design

  • Keep messages small and focused
  • Use appropriate message types
  • Include correlation IDs for tracing
  • Set reasonable TTLs

2. Error Handling

  • Implement retry logic with backoff
  • Handle timeouts gracefully
  • Log all communication errors
  • Provide fallback mechanisms

3. Performance

  • Batch low-priority messages
  • Use connection pooling
  • Implement message compression
  • Monitor queue depths

4. Security

  • Encrypt sensitive message content
  • Validate message sources
  • Implement rate limiting
  • Audit all communications

5. Monitoring

  • Track message latencies
  • Monitor delivery rates
  • Alert on communication failures
  • Analyze communication patterns

Related Documents


Tags: #AutoSDLC #Agent #Communication #Protocol #MCP | Last Updated: 2025-06-09 | Next: API Specification →

⚠️ **GitHub.com Fallback** ⚠️