Managers And Remote Executors - nshaibu/volnux GitHub Wiki

Volnux Framework: Remote Task Execution Technical Documentation

  • Document Version: 1.0
  • Date: November 08, 2025
  • Author: nshaibu
  • Purpose: This document provides a comprehensive technical overview of the framework's remote execution manager for contributors.

Overview

The Remote Task Execution subsystem is the core engine of the Volnux orchestration framework responsible for securely receiving, validating, resolving, executing, and returning results for tasks submitted by remote clients (Executors). It is designed to operate in distributed, multi-tenant environments with strong emphasis on security, isolation, observability, and scalability.

This document details the end-to-end flow of a remote task from submission to completion, with a focus on the Remote Manager, the server-side component that executes tasks. It is intended for software engineers contributing to or integrating with the Volnux framework.


Key Design Goals

| Goal          | Description                                                                             |
|---------------|-----------------------------------------------------------------------------------------|
| Security      | End-to-end integrity, authentication, sandboxing, and least-privilege execution.        |
| Isolation     | No task can affect another or the host system.                                          |
| Reliability   | Fail-fast validation, structured error handling, and guaranteed response delivery.      |
| Observability | Full audit trail, metrics, and tracing for every task lifecycle stage.                  |
| Scalability   | Vertical scaling via worker pools; horizontal via protocol-specialized Manager instances. |

Core Components in Remote Execution

| Component          | Responsibility                                                                                   |
|--------------------|--------------------------------------------------------------------------------------------------|
| Protocol Manager   | Ingress point (gRPC, XML-RPC, TCP). Handles TLS, deserialization, and response routing.          |
| BaseManager        | Shared execution engine. Manages validation, registry, task queue, workers, and response routing. |
| Payload Validator  | Verifies HMAC integrity and schema of incoming JSON payloads.                                    |
| Event Registry     | Maps event names to executable classes; supports local and remote sourcing.                      |
| Task Instantiator  | Creates TaskNode from resolved event class and arguments.                                        |
| Execution Pool     | Worker processes that spawn isolated subprocesses for task execution.                            |
| ClientTaskRegistry | Tracks task correlation IDs and client-specific response queues.                                 |
| Cleanup Scheduler  | Manages TTL-based eviction of ephemeral event code.                                              |

End-to-End Execution Flow

The following is the canonical flow of a remote task from client submission to result delivery.

1. Task Submission (Client → Manager)

  • Client: Remote Executor constructs a JSON payload:
    {
      "event_name": "DataProcessEvent",
      "args": {
        "input_file": "s3://bucket/data.csv",
        "threshold": 0.75
      }
    }
    
  • Checksum: Client computes HMAC-SHA256(payload, secret_key) and attaches it.
  • Transport: Submits over TLS-encrypted channel using chosen protocol:
    • gRPC: Unary RPC with metadata
    • XML-RPC: Method call with struct
    • TCP: Length-prefixed JSON frame
  • Authentication: mTLS client certificate or JWT in header.
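
The checksum step above can be sketched as follows. This is a minimal illustration, not the framework's client code: it assumes both sides canonicalise the payload as compact JSON with sorted keys (the source does not specify a canonical form), and the `checksum` envelope field and the `sign_payload` helper are hypothetical names.

```python
import hashlib
import hmac
import json


def canonical_bytes(payload: dict) -> bytes:
    # Assumption: sorted keys and no extra whitespace, so the same payload
    # always serialises to the same bytes on client and server.
    return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")


def sign_payload(payload: dict, secret_key: bytes) -> dict:
    # HMAC-SHA256(payload, secret_key), hex-encoded for transport.
    digest = hmac.new(secret_key, canonical_bytes(payload), hashlib.sha256).hexdigest()
    # "checksum" is a hypothetical field name for the attached HMAC.
    return {"payload": payload, "checksum": digest}


envelope = sign_payload(
    {
        "event_name": "DataProcessEvent",
        "args": {"input_file": "s3://bucket/data.csv", "threshold": 0.75},
    },
    secret_key=b"example-secret",  # placeholder; a real key comes from a secret store
)
```

Signing a canonical byte form matters because the server recomputes the HMAC over bytes, so any difference in key order or whitespace would otherwise cause a spurious INVALID_CHECKSUM.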

2. Ingress & Protocol Handling

  • Protocol Manager receives the request.
  • Extracts raw payload and HMAC.
  • Deserialises into internal task representation.
  • Forwards to BaseManager.handle_task() with protocol-specific handle (for sync responses).

3. Payload Validation

BaseManager performs fail-fast validation:

  1. HMAC Verification:

    • Recompute HMAC-SHA256(payload, secret_key)
    • Compare with received value
    • Reject if mismatch → return INVALID_CHECKSUM
  2. Schema Validation:

    • Ensure event_name is a string
    • Ensure args is a JSON-serializable object
    • Optional: per-event validation with jsonschema (or pydantic / pydantic-mini)
  3. Whitelist Check:

    • event_name must be in allowed_events config
    • Reject if not → EVENT_NOT_WHITELISTED
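
The three checks can be sketched in order as below. This is an illustrative stand-in rather than the BaseManager implementation: the `validate` function, its return shape, and the mapping of schema failures to INVALID_ARGS are assumptions.

```python
import hashlib
import hmac
import json

# Mirrors the allowed_events setting; hard-coded here only for illustration.
ALLOWED_EVENTS = {"DataProcessEvent", "TrainModelEvent"}


def validate(raw_payload: bytes, received_hmac: str, secret_key: bytes):
    """Return (payload, None) on success or (None, error_code) on failure."""
    # 1. HMAC verification over the exact bytes received.
    expected = hmac.new(secret_key, raw_payload, hashlib.sha256).hexdigest()
    # compare_digest gives a constant-time comparison; bytes avoid the
    # ASCII-only restriction that applies to str arguments.
    if not hmac.compare_digest(expected.encode(), received_hmac.encode()):
        return None, "INVALID_CHECKSUM"

    # 2. Schema validation (assumed to map to INVALID_ARGS).
    try:
        payload = json.loads(raw_payload)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return None, "INVALID_ARGS"
    if not isinstance(payload, dict):
        return None, "INVALID_ARGS"
    if not isinstance(payload.get("event_name"), str):
        return None, "INVALID_ARGS"
    if not isinstance(payload.get("args"), dict):
        return None, "INVALID_ARGS"

    # 3. Whitelist check.
    if payload["event_name"] not in ALLOWED_EVENTS:
        return None, "EVENT_NOT_WHITELISTED"
    return payload, None
```

Checking the HMAC before parsing keeps unauthenticated input away from the JSON parser, which fits the fail-fast ordering described above.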

4. Task Correlation & Registration

  • Generate correlation_id = UUID4()
  • Register in ClientTaskRegistry:
    {
      "correlation_id": "abc123...",
      "client_id": "executor-7",
      "client_socket": "<socket object>",
      "status": "pending",
      "created_at": "2025-11-14T10:00:00Z",
      "expires_at": "2025-11-14T10:10:00Z"
    }
    
  • Attach correlation_id to task for response routing.
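
A minimal in-memory sketch of this registration step is shown below. The class name `ClientTaskRegistrySketch` and its methods are hypothetical and only mirror the entry fields listed above; the real ClientTaskRegistry may differ.

```python
import threading
import uuid
from datetime import datetime, timedelta, timezone


class ClientTaskRegistrySketch:
    """Illustrative stand-in; not the framework's ClientTaskRegistry."""

    def __init__(self, ttl: timedelta = timedelta(minutes=10)):
        self._ttl = ttl
        self._entries = {}
        self._lock = threading.Lock()  # the registry is shared across threads

    def register(self, client_id: str, client_handle: object) -> str:
        correlation_id = str(uuid.uuid4())
        now = datetime.now(timezone.utc)
        entry = {
            "correlation_id": correlation_id,
            "client_id": client_id,
            "client_socket": client_handle,  # protocol-specific handle
            "status": "pending",
            "created_at": now,
            "expires_at": now + self._ttl,
        }
        with self._lock:
            self._entries[correlation_id] = entry
        return correlation_id

    def lookup(self, correlation_id: str):
        # Returns None for unknown or already evicted IDs.
        with self._lock:
            return self._entries.get(correlation_id)
```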

5. Event Resolution

  • Query EventRegistry for the class mapped to event_name, using the same lookup as the local orchestrator

  • If not found:

    • Return structured error: EVENT_NOT_REGISTERED
    • Include hint: available events (if debug enabled)
  • Optimization:

    • Before submitting a task, the Executor should be able to query the Manager for available events
    • The Manager responds with a status indicating whether the event exists
    • This avoids sending a full task payload for an event that cannot be resolved, since the query payload is much smaller
  • If expired (ephemeral):

    • Acquire local lock
    • Pull from remote source (GitHub/PyPI/Event-Hub) if not available
    • Re-register class
    • Update last-accessed timestamp
  • Success: Return resolved EventClass
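
The resolution rules above can be sketched as a small registry with a lock-protected ephemeral cache. Everything here is illustrative: `EventRegistrySketch`, `register_remote`, `exists`, and the loader callables are hypothetical names, and the loader stands in for whatever actually pulls code from GitHub, PyPI, or Event-Hub.

```python
import threading
import time
from typing import Callable, Dict, Optional, Tuple


class EventRegistrySketch:
    """Illustrative stand-in for the Event Registry."""

    def __init__(self, ephemeral_ttl: float = 120.0, clock=time.monotonic):
        self._ttl = ephemeral_ttl
        self._clock = clock
        self._lock = threading.Lock()
        self._permanent: Dict[str, type] = {}
        self._loaders: Dict[str, Callable[[], type]] = {}
        # name -> (class, last-accessed timestamp)
        self._ephemeral: Dict[str, Tuple[type, float]] = {}

    def register(self, name: str, cls: type) -> None:
        self._permanent[name] = cls

    def register_remote(self, name: str, loader: Callable[[], type]) -> None:
        self._loaders[name] = loader

    def exists(self, name: str) -> bool:
        # Cheap pre-submission query described under "Optimization".
        return name in self._permanent or name in self._loaders

    def resolve(self, name: str) -> Optional[type]:
        if name in self._permanent:
            return self._permanent[name]
        if name not in self._loaders:
            return None  # caller maps this to EVENT_NOT_REGISTERED
        with self._lock:  # one pull at a time per registry
            now = self._clock()
            cached = self._ephemeral.get(name)
            if cached is None or now - cached[1] > self._ttl:
                cls = self._loaders[name]()  # pull from the remote source
            else:
                cls = cached[0]
            self._ephemeral[name] = (cls, now)  # update last-accessed
            return cls
```

Injecting the clock keeps the TTL logic testable without sleeping.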


6. Task Instantiation

  • Instantiate event:
    event_instance = EventClass(**task_payload["args"])
    
  • Wrap in TaskNode:
    • id, event, correlation_id, timeout, created_at
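
As a sketch of this step, the listed TaskNode fields map naturally onto a dataclass. The names `TaskNodeSketch` and `instantiate`, the example `DataProcessEvent` body, and the choice to surface bad arguments as INVALID_ARGS are assumptions for illustration only.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class TaskNodeSketch:
    """Illustrative container with the fields listed above."""
    event: Any
    correlation_id: str
    timeout: float = 300.0  # matches the documented task_timeout default
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class DataProcessEvent:
    """Hypothetical event class used only for this example."""

    def __init__(self, input_file: str, threshold: float):
        self.input_file = input_file
        self.threshold = threshold

    def execute(self) -> dict:
        return {"input_file": self.input_file, "threshold": self.threshold}


def instantiate(event_class: type, task_payload: dict, correlation_id: str) -> TaskNodeSketch:
    try:
        event_instance = event_class(**task_payload["args"])
    except TypeError as exc:
        # Missing or unexpected keyword arguments surface as TypeError.
        raise ValueError(f"INVALID_ARGS: {exc}") from exc
    return TaskNodeSketch(event=event_instance, correlation_id=correlation_id)
```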

7. Dispatch to Execution Pool

  • Enqueue TaskNode to shared task_queue (bounded, e.g., max 100)
  • If queue full → return QUEUE_FULL (503-like)
  • In the simple case, a ProcessPoolExecutor can handle both queue management and task execution.
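
The bounded-queue behaviour can be sketched with the standard library. This uses an in-process `queue.Queue` for brevity; a queue shared with worker processes would more likely be a `multiprocessing.Queue`, and the `enqueue` helper and its return shape are hypothetical.

```python
import queue

# Bounded queue; maxsize mirrors max_pending_tasks from the configuration.
task_queue = queue.Queue(maxsize=100)


def enqueue(task, q=task_queue) -> dict:
    try:
        # put_nowait raises queue.Full instead of blocking the ingress thread.
        q.put_nowait(task)
    except queue.Full:
        return {"status": "error", "code": "QUEUE_FULL"}
    return {"status": "queued"}
```

Rejecting immediately rather than blocking gives the client a fast back-pressure signal it can retry against.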

8. Worker Pickup & Subprocess Execution

  • Worker Process (one of N pre-forked) dequeues task
  • Spawns isolated subprocess using multiprocessing.Pool(1)
  • Executes:
    result = event_instance.execute()
    
  • Sandboxing (if enabled):
    • RestrictedPython: whitelist safe builtins
    • Container: spawn Docker/K8s Job with resource limits
  • Timeout Enforcement:
    • Kill subprocess after task_timeout (default 300s)
    • Return TASK_TIMEOUT
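
Timeout enforcement can be illustrated with a child process that is killed when it overruns. Note the swap: the text above describes multiprocessing.Pool(1), whereas this sketch uses `subprocess.run` with a fresh interpreter because its timeout-and-kill behaviour is simple to show. The `run_with_timeout` helper and its return shape are hypothetical.

```python
import subprocess
import sys


def run_with_timeout(code: str, task_timeout: float = 300.0) -> dict:
    """Run a Python snippet in a separate interpreter with a hard timeout."""
    try:
        completed = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=task_timeout,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child and waits for it before re-raising.
        return {"status": "error", "code": "TASK_TIMEOUT"}
    if completed.returncode != 0:
        return {
            "status": "error",
            "code": "EXECUTION_FAILED",
            "message": completed.stderr.strip(),
        }
    return {"status": "success", "result": completed.stdout.strip()}
```

A separate interpreter also means a crash in task code cannot take down the worker that launched it.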

9. Result Capture & HMAC

  • Capture result or exception
  • Both the result and the exception must be serializable
  • Construct response:
    {
      "correlation_id": "abc123...",
      "status": "success",
      "result": { ... },
      "completed_at": "2025-11-14T10:02:30Z"
    }
    
  • Compute HMAC-SHA256(response, secret_key)
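
Result capture and signing might look like the sketch below. The `build_response` helper is hypothetical, and the choices to stringify exceptions and to treat a non-serializable result as EXECUTION_FAILED are assumptions, not documented behaviour.

```python
import hashlib
import hmac
import json
from datetime import datetime, timezone


def _utc_now() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_response(correlation_id: str, run, secret_key: bytes):
    """Call run(), capture its result or exception, and sign the response."""
    body = {"correlation_id": correlation_id}
    try:
        result = run()
        json.dumps(result)  # fails early if the result is not JSON-serializable
        body.update(status="success", result=result, completed_at=_utc_now())
    except Exception as exc:
        # Exceptions are reduced to a string so the response stays serializable.
        body.update(
            status="error",
            code="EXECUTION_FAILED",
            message=f"{type(exc).__name__}: {exc}",
            timestamp=_utc_now(),
        )
    raw = json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")
    checksum = hmac.new(secret_key, raw, hashlib.sha256).hexdigest()
    return raw, checksum
```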

10. Response Routing

  • Enqueue response to response_queue
  • Response Router Thread (in BaseManager):
    • Dequeues response
    • Looks up correlation_id in ClientTaskRegistry
    • Routes via protocol-specific mechanism:
      | Protocol | Sync                    | Async              |
      |----------|-------------------------|--------------------|
      | gRPC     | Return via RPC response | Stream or callback |
      | XML-RPC  | Return fault/result     | N/A                |
      | TCP      | Push frame              | Polling (/poll/{id}) |
  • For polling:
    • Client calls POST /poll/{correlation_id}
    • Server returns result or 202 Pending

11. Cleanup & Eviction

  • ClientTaskRegistry: Evict entry after response delivery or TTL (10 min)
  • Ephemeral Events: Delete pulled code after idle TTL (default 120s)
  • Logs & Metrics: Emit completion event
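
A periodic sweep for idle ephemeral events could be as small as the helper below. The function name `evict_idle` and the dict of last-accessed timestamps are hypothetical; actual deletion of pulled code from disk is out of scope for this sketch.

```python
import time
from typing import Dict, List, Optional


def evict_idle(
    last_accessed: Dict[str, float],
    ttl_seconds: float,
    now: Optional[float] = None,
) -> List[str]:
    """Remove entries idle for longer than ttl_seconds; return their names."""
    now = time.monotonic() if now is None else now
    expired = [name for name, ts in last_accessed.items() if now - ts > ttl_seconds]
    for name in expired:
        del last_accessed[name]
    return expired
```

The same shape would work for ClientTaskRegistry entries by passing the response TTL in seconds instead of the ephemeral TTL.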

Error Handling (Structured Responses)

All errors are serialized with HMAC and include:

{
  "correlation_id": "...",
  "status": "error",
  "code": "INVALID_CHECKSUM | EVENT_NOT_REGISTERED | TASK_TIMEOUT | ...",
  "message": "Human-readable explanation",
  "timestamp": "ISO8601"
}

Common error codes:

  • INVALID_CHECKSUM
  • EVENT_NOT_WHITELISTED
  • EVENT_NOT_REGISTERED
  • INVALID_ARGS
  • QUEUE_FULL
  • TASK_TIMEOUT
  • EXECUTION_FAILED

Security Model

| Layer               | Mechanism                                          |
|---------------------|----------------------------------------------------|
| Transport           | TLS 1.3 + mTLS (mutual auth)                       |
| Integrity           | HMAC-SHA256 on all payloads and responses          |
| Authentication      | Client certs or JWT with sub claim                 |
| Authorization       | allowed_events whitelist + RBAC via JWT scopes     |
| Code Safety         | No dynamic eval; remote code scanned and ephemeral |
| Execution Isolation | Subprocess + optional container sandbox            |
| Resource Limits     | CPU/memory caps via resource module or container   |
| Audit               | Log every resolution, execution, and response      |

Observability

Logging (Structured JSON)

{
  "level": "info",
  "event": "task_completed",
  "task_id": "def456",
  "correlation_id": "abc123",
  "event_name": "DataProcessEvent",
  "duration_ms": 2100,
  "status": "success",
  "protocol": "grpc",
  "client_id": "executor-7"
}
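
One way to emit log lines in this shape with the standard library is a custom formatter. This is a sketch only: the `JsonFormatter` class, the `volnux.remote` logger name, and the convention of passing structured fields under `extra={"fields": ...}` are assumptions.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {"level": record.levelname.lower(), "event": record.getMessage()}
        # Structured fields arrive via extra={"fields": {...}} (a convention
        # chosen for this sketch, not a logging built-in).
        entry.update(getattr(record, "fields", {}))
        return json.dumps(entry)


logger = logging.getLogger("volnux.remote")  # hypothetical logger name
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info(
    "task_completed",
    extra={"fields": {"task_id": "def456", "correlation_id": "abc123", "duration_ms": 2100}},
)
```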

Metrics (Prometheus)

| Metric                       | Labels                       | Type      |
|------------------------------|------------------------------|-----------|
| volnux_tasks_total           | status, protocol, event_name | Counter   |
| volnux_task_duration_seconds | status, protocol             | Histogram |
| volnux_queue_depth           | protocol                     | Gauge     |
| volnux_ephemeral_pulls_total | source                       | Counter   |
| volnux_errors_total          | code, protocol               | Counter   |

Tracing

  • OpenTelemetry spans:
    • task.submit
    • task.validate
    • event.resolve
    • task.execute
    • task.respond
  • traceparent propagated in protocol headers

Health Endpoint

GET /healthz → 200
{
  "status": "healthy",
  "workers": 8,
  "queue_depth": 12,
  "uptime_seconds": 3600
}

Configuration (Critical Settings)

# Core
secret_key: "..."  # HMAC key (from secret store)
allowed_events: ["DataProcessEvent", "TrainModelEvent"]

# Execution
worker_count: 8
task_timeout: 300
max_pending_tasks: 100
sandbox_mode: true

# Ephemeral Events
ephemeral_ttl: 120
remote_events_config:
  - name: "MLTrainEvent"
    url: "https://github.com...
    type: "github"

# Response
response_ttl_minutes: 10

# Protocol (example: gRPC)
bind_address: "0.0.0.0:50051"

Deployment & Protocol Specialization

  • One Manager type per server/VM
    • GrpcManager → high-performance clusters
    • TcpManager → lightweight edge nodes
    • XmlRpcManager → legacy integration
  • Orchestrators route tasks based on endpoint registry (e.g., Consul, K8s Service)

Summary: Remote Task Execution Flow (One-Liner)

Client submits HMAC-protected JSON task → Protocol Manager validates TLS/auth → BaseManager verifies HMAC/whitelist → Resolves event (pull if needed) → Instantiates TaskNode → Dispatches to worker → Executes in isolated subprocess → Captures result → Routes via correlation ID → Returns HMAC-protected response → Cleans up


For Contributors

  • Add New Protocol: Implement XManager composing BaseManager
  • Add Event Source: Extend EventRegistry.pull_remote()
  • Improve Sandbox: Plug in seccomp, gvisor, or Firecracker