# Data Flow
This document describes how data moves through the Antimetal System Agent, from collection sources to the Antimetal platform. Understanding these flows is essential for troubleshooting, performance optimization, and extending the system.
The System Agent processes two primary types of data:
- **Kubernetes Resources** - Objects from the K8s API (pods, nodes, services, etc.)
- **Performance Metrics** - System metrics from Linux kernel interfaces
Both data types flow through the Resource Store event system before being streamed to the Antimetal platform via gRPC.
## Kubernetes Resource Flow

```mermaid
flowchart LR
    K8S["Kubernetes<br/>API Server<br/><br/>REST API<br/>HTTP/JSON<br/>Resources"] --> INF["Informers<br/>(Watchers)<br/><br/>Add/Update/<br/>Delete<br/>Events"]
    INF --> CTRL["Controller<br/>Reconcile<br/><br/>Normalize<br/>Transform<br/>Enrich"]
    CTRL --> STORE["Resource<br/>Store<br/><br/>Store<br/>BadgerDB<br/>Transaction"]
```
**Detailed Steps:**

- **Kubernetes API** - The System Agent watches K8s resources using informers
- **Informers** - Cache objects and detect changes, emitting Add/Update/Delete events (see the sketch below)
- **Controller Reconcile** - Processes events, normalizes data, and adds metadata
- **Resource Store** - Persists to BadgerDB and emits internal events for subscribers
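As a rough illustration of the first two steps, the following client-go sketch (not the agent's actual code; it assumes in-cluster credentials and watches pods only) wires informer events toward a reconcile step:

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The shared informer caches objects and detects changes.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("ADD %s/%s\n", pod.Namespace, pod.Name) // hand off to reconcile
		},
		UpdateFunc: func(_, newObj interface{}) {
			pod := newObj.(*corev1.Pod)
			fmt.Printf("UPDATE %s/%s\n", pod.Namespace, pod.Name)
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("DELETE event received")
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop) // begin watching the K8s API
	cache.WaitForCacheSync(stop, podInformer.HasSynced)
	select {} // block; a real agent runs until shutdown
}
```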
## Performance Metrics Flow

```mermaid
flowchart LR
    KERNEL["Linux<br/>Kernel<br/><br/>/proc/stat<br/>/proc/meminfo<br/>/sys/block<br/>eBPF progs"] --> COLL["Collectors<br/>(14 types)<br/><br/>Parse &<br/>Structure<br/>Data"]
    COLL --> PERF["Performance<br/>Manager<br/><br/>Batch &<br/>Aggregate<br/>Metrics<br/>Transforms"]
    PERF --> STORE["Resource<br/>Store<br/><br/>Store as<br/>Events<br/>(DB)"]
```
**Detailed Steps:**

- **Linux Kernel** - Data sources: the /proc and /sys filesystems, plus eBPF programs
- **Collectors** - 14 different collectors parse and structure the raw data (see the interface sketch below)
- **Performance Manager** - Coordinates collection and batches metrics
- **Resource Store** - Stores metrics as events for upstream consumption
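The collector abstraction can be pictured roughly as below; the interface and function names are hypothetical, not the agent's actual types:

```go
package collector

import "context"

// Collector is a hypothetical interface: one implementation per data
// source (/proc/stat, /proc/meminfo, /sys/block, eBPF programs, ...).
type Collector interface {
	Name() string
	Collect(ctx context.Context) (any, error)
}

// CollectAll runs every registered collector; a failure in one does not
// stop the others (see Error Handling below).
func CollectAll(ctx context.Context, cs []Collector) map[string]any {
	out := make(map[string]any, len(cs))
	for _, c := range cs {
		data, err := c.Collect(ctx)
		if err != nil {
			continue // log and skip; other collectors proceed
		}
		out[c.Name()] = data
	}
	return out
}
```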
## Event System Flow

```mermaid
flowchart LR
    STORE["Resource<br/>Store<br/><br/>Add/Update/<br/>Delete<br/>Operations"] --> EVENT["Event<br/>Generation<br/><br/>Event with<br/>Resource<br/>Metadata"]
    EVENT --> FILTER["Type<br/>Filtering<br/><br/>Route to<br/>Interested<br/>Subscribers"]
    FILTER --> SUB["Subscriber<br/>Components<br/><br/>Intake<br/>Worker<br/>(Primary)"]
```
**Detailed Steps:**

- **Resource Store Transaction** - Any change to stored data triggers an event
- **Event Generation** - Creates an event carrying the operation type and resource data
- **Type Filtering** - Routes events to subscribers based on resource type
- **Subscribers** - The Intake Worker receives events for upstream transmission (a minimal sketch of this pattern follows)
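A minimal sketch of the publish/filter/subscribe pattern described above; the type names (`Event`, `Bus`) are illustrative, not the agent's actual event API:

```go
package events

type Op string

const (
	OpAdd    Op = "add"
	OpUpdate Op = "update"
	OpDelete Op = "delete"
)

// Event carries the operation type and resource data.
type Event struct {
	Op       Op
	Type     string // resource type, e.g. "pod" or "cpu_stats"
	Resource any
}

// Bus routes events to subscribers interested in a resource type.
type Bus struct {
	subs map[string][]chan Event
}

func NewBus() *Bus { return &Bus{subs: make(map[string][]chan Event)} }

// Subscribe registers a subscriber (e.g. the intake worker) for one type.
func (b *Bus) Subscribe(resourceType string) <-chan Event {
	ch := make(chan Event, 64) // bounded queue; see Memory Management below
	b.subs[resourceType] = append(b.subs[resourceType], ch)
	return ch
}

// Publish is called on every store transaction.
func (b *Bus) Publish(e Event) {
	for _, ch := range b.subs[e.Type] {
		select {
		case ch <- e:
		default: // queue full: apply back-pressure / drop policy here
		}
	}
}
```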
## Upstream Data Flow

```mermaid
flowchart LR
    EVENTS["Event<br/>Subscribers<br/><br/>K8s + Perf<br/>Events<br/>(Mixed)"] --> INTAKE["Intake<br/>Worker<br/><br/>Convert to<br/>Protobuf<br/>Messages"]
    INTAKE --> BATCH["Batch<br/>Queue<br/><br/>Size/Time<br/>Based<br/>Batching"]
    BATCH --> PLATFORM["Antimetal<br/>Platform<br/><br/>gRPC Stream<br/>with TLS<br/>& Auth"]
```
**Detailed Steps:**

- **Event Subscribers** - Receive both K8s resource and performance events
- **Intake Worker** - Converts events to protobuf messages for transmission
- **Batch Queue** - Accumulates messages based on size/time thresholds
- **Antimetal Platform** - Receives data via a secure gRPC stream (see the connection sketch below)
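Since the intake service's protobuf definitions aren't reproduced here, the following sketch shows only the transport side: a TLS-authenticated gRPC client with per-message compression. It is illustrative; the `dial` helper and its options are not the agent's actual code:

```go
package main

import (
	"crypto/tls"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding/gzip" // registers the gzip compressor
)

// dial opens a TLS-secured gRPC connection; real auth token handling and
// the generated intake client are omitted.
func dial(addr string) (*grpc.ClientConn, error) {
	creds := credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})
	return grpc.NewClient(addr,
		grpc.WithTransportCredentials(creds),
		// Compress every message on the stream (see Throughput Optimization).
		grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)),
	)
}
```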
Input Format (from K8s API):
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "example-pod",
"namespace": "default",
"labels": {...},
"annotations": {...}
},
"spec": {...},
"status": {...}
}
**Normalized Format (in Resource Store):**

```go
type Resource struct {
	ID        string
	Type      string
	Name      string
	Namespace string
	Cluster   string
	Region    string
	Provider  string
	Metadata  map[string]string
	Spec      interface{}
	Status    interface{}
	Timestamp time.Time
}
```
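As a hedged illustration, the pod above might normalize to something like the snippet below. All field values are invented; the cluster, region, and provider presumably come from the agent's environment rather than the pod object itself:

```go
// Illustrative values only, not output from a real cluster.
r := Resource{
	ID:        "default/example-pod",
	Type:      "pod",
	Name:      "example-pod",
	Namespace: "default",
	Cluster:   "prod-cluster", // hypothetical
	Region:    "us-east-1",    // hypothetical
	Provider:  "aws",          // hypothetical
	Metadata:  map[string]string{"app": "example"},
	Timestamp: time.Now(),
}
```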
**Input Format (from /proc/stat):**

```text
cpu 1234 56 789 10000 200 30 40 50 60 70
cpu0 600 30 400 5000 100 15 20 25 30 35
```

**Structured Format (from CPU Collector):**

```go
type CPUStats struct {
	CPUIndex  int32
	User      uint64
	Nice      uint64
	System    uint64
	Idle      uint64
	IOWait    uint64
	IRQ       uint64
	SoftIRQ   uint64
	Steal     uint64
	Guest     uint64
	GuestNice uint64
}
```
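A minimal parsing sketch for the lines above (not the agent's actual parser). It builds on the `CPUStats` struct and assumes the standard /proc/stat field order: user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice, all in USER_HZ ticks:

```go
import (
	"fmt"
	"strconv"
	"strings"
)

// parseCPULine converts one "cpu" or "cpuN" line from /proc/stat into
// CPUStats. The aggregate "cpu" line is reported with CPUIndex = -1.
func parseCPULine(line string) (CPUStats, error) {
	fields := strings.Fields(line)
	if len(fields) < 11 || !strings.HasPrefix(fields[0], "cpu") {
		return CPUStats{}, fmt.Errorf("unexpected /proc/stat line: %q", line)
	}
	var s CPUStats
	if idx := strings.TrimPrefix(fields[0], "cpu"); idx == "" {
		s.CPUIndex = -1 // aggregate "cpu" line
	} else {
		n, err := strconv.ParseInt(idx, 10, 32)
		if err != nil {
			return CPUStats{}, err
		}
		s.CPUIndex = int32(n)
	}
	vals := make([]uint64, 10)
	for i := range vals {
		v, err := strconv.ParseUint(fields[i+1], 10, 64)
		if err != nil {
			return CPUStats{}, err
		}
		vals[i] = v
	}
	s.User, s.Nice, s.System, s.Idle, s.IOWait = vals[0], vals[1], vals[2], vals[3], vals[4]
	s.IRQ, s.SoftIRQ, s.Steal, s.Guest, s.GuestNice = vals[5], vals[6], vals[7], vals[8], vals[9]
	return s, nil
}
```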
## Error Handling

**Collection errors:**

- **Informer Disconnection** - Automatic reconnection with exponential backoff
- **Missing Proc Files** - Graceful degradation: log warnings and continue collection
- **Collector Errors** - Individual collector failures don't affect others

**Transmission errors:**

- **Network Issues** - Exponential backoff with jitter (sketched below)
- **gRPC Stream Errors** - Automatic stream recreation
- **Batch Failures** - Retry individual messages; a dead letter queue handles persistent failures
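A sketch of "exponential backoff with jitter" as listed above; the constants are illustrative, not the agent's actual tuning:

```go
package retry

import (
	"math/rand"
	"time"
)

// Backoff returns the delay before retry attempt n (0-based):
// base*2^n, capped at max, with up to 50% random jitter added so that
// many agents reconnecting at once don't synchronize.
func Backoff(n int) time.Duration {
	const (
		base = 500 * time.Millisecond // illustrative
		max  = 30 * time.Second       // illustrative cap
	)
	d := base << n // base * 2^n
	if d > max || d <= 0 {
		d = max // also guards against shift overflow for large n
	}
	jitter := time.Duration(rand.Int63n(int64(d) / 2))
	return d + jitter
}
```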
## Batching Strategy

- **Size Threshold** - 1MB default batch size
- **Time Threshold** - 30-second maximum batch age
- **Adaptive** - Adjusts based on event rate and network conditions (see the sketch below)
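A minimal sketch of combining the size and time thresholds (names are illustrative; the ticker approximates the 30-second maximum age rather than tracking each batch's exact creation time):

```go
package intake

import "time"

// runBatcher accumulates encoded messages until either the size or the
// time threshold is reached, then hands the batch to flush.
func runBatcher(in <-chan []byte, flush func(batch [][]byte)) {
	const (
		maxBytes = 1 << 20          // 1MB size threshold (documented default)
		maxAge   = 30 * time.Second // time threshold (documented default)
	)
	var (
		batch [][]byte
		size  int
	)
	ticker := time.NewTicker(maxAge)
	defer ticker.Stop()
	for {
		select {
		case msg, ok := <-in:
			if !ok { // input closed: flush the remainder and exit
				if len(batch) > 0 {
					flush(batch)
				}
				return
			}
			batch = append(batch, msg)
			size += len(msg)
			if size >= maxBytes { // size threshold reached
				flush(batch)
				batch, size = nil, 0
			}
		case <-ticker.C: // time threshold reached
			if len(batch) > 0 {
				flush(batch)
				batch, size = nil, 0
			}
		}
	}
}
```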
## Memory Management

- **Event Queues** - Bounded queues with back-pressure
- **Batch Buffers** - Pre-allocated pools to reduce GC pressure (sketched below)
- **Resource Store** - LRU eviction for old data
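The "pre-allocated pools" idea can be sketched with `sync.Pool`, recycling batch buffers instead of reallocating them; the buffer size here is illustrative:

```go
package intake

import "sync"

// bufPool recycles batch buffers to reduce GC pressure.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, 1<<20) // pre-size to the 1MB batch threshold
		return &b
	},
}

func getBuf() *[]byte  { return bufPool.Get().(*[]byte) }
func putBuf(b *[]byte) { *b = (*b)[:0]; bufPool.Put(b) } // reset length, keep capacity
```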
## Throughput Optimization

- **Concurrent Processing** - Parallel collector execution (sketched below)
- **Pipeline Parallelism** - Overlapping collection, processing, and transmission
- **Compression** - gRPC message compression reduces bandwidth
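Building on the hypothetical `Collector` interface sketched earlier, parallel collector execution could look like this (one goroutine per collector, results gathered over a channel; again illustrative, not the agent's actual scheduler):

```go
package collector

import (
	"context"
	"sync"
)

type result struct {
	name string
	data any
}

// CollectParallel runs all collectors concurrently; failed collectors are
// simply skipped, so one failure does not affect the others.
func CollectParallel(ctx context.Context, cs []Collector) map[string]any {
	results := make(chan result, len(cs))
	var wg sync.WaitGroup
	for _, c := range cs {
		wg.Add(1)
		go func(c Collector) {
			defer wg.Done()
			if data, err := c.Collect(ctx); err == nil {
				results <- result{c.Name(), data}
			}
		}(c)
	}
	wg.Wait()
	close(results)
	out := make(map[string]any, len(cs))
	for r := range results {
		out[r.name] = r.data
	}
	return out
}
```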
## Key Metrics

- `events_generated_total{type, operation}` - Events created, by type and operation
- `batch_queue_size` - Current items waiting for transmission
- `upload_duration_seconds` - Time to transmit batches
- `collector_errors_total{collector}` - Collection failure rates
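For illustration, these metrics could be declared with the Prometheus Go client as below; the agent's actual registration code may differ:

```go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	EventsGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "events_generated_total",
		Help: "Events created, by resource type and operation.",
	}, []string{"type", "operation"})

	BatchQueueSize = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "batch_queue_size",
		Help: "Current items waiting for transmission.",
	})

	UploadDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name: "upload_duration_seconds",
		Help: "Time to transmit batches.",
	})

	CollectorErrors = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "collector_errors_total",
		Help: "Collection failures, by collector.",
	}, []string{"collector"})
)
```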
## Troubleshooting

- **Slow Collection** - Check collector-specific metrics and /proc filesystem access
- **Event Backlog** - Monitor batch queue size and upstream connectivity
- **Missing Data** - Verify informer health and RBAC permissions
## Related Documentation

- **Component Diagram** - Visual system architecture
- **Architecture Overview** - Detailed component descriptions
- **Performance Monitoring** - Collector system details
- **Troubleshooting** - Common data flow issues
This document describes the data flow patterns as implemented. For configuration options, see Configuration Guide.