
Performance Monitoring

The System Agent includes a comprehensive performance monitoring system that collects system and hardware metrics from Linux hosts. This page covers the architecture, available collectors, and how to build custom collectors.

Architecture Overview

The performance monitoring system follows a pluggable collector pattern with two main interfaces:

// One-shot data collection
type PointCollector interface {
    Collect(ctx context.Context) (any, error)
}

// Continuous streaming collection
type ContinuousCollector interface {
    Start(ctx context.Context) (<-chan any, error)
    Stop() error
}
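
For example, a consumer can drain a ContinuousCollector until it is stopped or its context is cancelled. This is a minimal sketch rather than the agent's actual wiring; the process callback stands in for whatever the caller does with samples:

func consume(ctx context.Context, c ContinuousCollector, process func(any)) error {
    ch, err := c.Start(ctx)
    if err != nil {
        return err
    }
    defer c.Stop() // stops the collection goroutine and closes the channel

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case data, ok := <-ch:
            if !ok {
                return nil // channel closed by Stop
            }
            process(data)
        }
    }
}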

Available Collectors

System Metrics

Collector | Type       | Source          | Description
----------|------------|-----------------|------------------------------
CPU       | Continuous | /proc/stat      | CPU usage, time distribution
Memory    | Continuous | /proc/meminfo   | Memory usage, buffers, cache
Load      | Continuous | /proc/loadavg   | System load averages
Network   | Continuous | /proc/net/dev   | Interface statistics
Disk      | Continuous | /proc/diskstats | Disk I/O statistics
TCP       | Continuous | /proc/net/tcp*  | TCP connection states
Process   | Continuous | /proc/[pid]/    | Per-process metrics
Kernel    | Continuous | /dev/kmsg       | Kernel messages
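
As a concrete illustration of what these collectors read, here is a minimal sketch of loadavg parsing. LoadStats and the helper are simplified stand-ins for illustration, not the agent's actual types:

type LoadStats struct {
    Load1, Load5, Load15 float64
}

func readLoadAvg(procPath string) (*LoadStats, error) {
    data, err := os.ReadFile(filepath.Join(procPath, "loadavg"))
    if err != nil {
        return nil, fmt.Errorf("failed to read loadavg: %w", err)
    }

    // /proc/loadavg looks like: "0.52 0.58 0.59 1/189 12345"
    fields := strings.Fields(string(data))
    if len(fields) < 3 {
        return nil, fmt.Errorf("unexpected loadavg format: %q", data)
    }

    stats := &LoadStats{}
    if stats.Load1, err = strconv.ParseFloat(fields[0], 64); err != nil {
        return nil, err
    }
    if stats.Load5, err = strconv.ParseFloat(fields[1], 64); err != nil {
        return nil, err
    }
    if stats.Load15, err = strconv.ParseFloat(fields[2], 64); err != nil {
        return nil, err
    }
    return stats, nil
}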

Hardware Information

Collector    | Type | Source                    | Description
-------------|------|---------------------------|------------------------------
CPU Info     | Once | /proc/cpuinfo             | CPU details, cores, features
Memory Info  | Once | /proc/meminfo             | Total memory, types
Disk Info    | Once | /sys/block/*/             | Disk hardware details
Network Info | Once | /sys/class/net/*/         | Network interface details
NUMA         | Once | /sys/devices/system/node/ | NUMA topology

Advanced Collectors

Collector | Type       | Source | Description
----------|------------|--------|----------------------------
Execsnoop | Continuous | eBPF   | Process execution tracking

Collector Implementation

Basic Collector Structure

package collectors

import (
    "context"
    "fmt"
    "os"
    "path/filepath"

    "github.com/go-logr/logr"
    "github.com/antimetal/system-agent/pkg/performance"
)

// Compile-time interface check
var _ performance.PointCollector = (*CustomCollector)(nil)

type CustomCollector struct {
    performance.BaseCollector
    // Collector-specific fields
    dataPath string
}

// Constructor follows standard pattern
func NewCustomCollector(logger logr.Logger, config performance.CollectionConfig) (*CustomCollector, error) {
    // Validate paths are absolute
    if !filepath.IsAbs(config.HostProcPath) {
        return nil, fmt.Errorf("HostProcPath must be absolute: %q", config.HostProcPath)
    }
    
    // Define capabilities
    capabilities := performance.CollectorCapabilities{
        SupportsOneShot:    true,
        SupportsContinuous: false,
        RequiresRoot:       false,
        RequiresEBPF:       false,
        MinKernelVersion:   "2.6.0",
    }
    
    return &CustomCollector{
        BaseCollector: performance.NewBaseCollector(
            performance.MetricTypeCustom,
            "custom",
            logger,
            config,
            capabilities,
        ),
        dataPath: filepath.Join(config.HostProcPath, "custom_data"),
    }, nil
}

// Implement collection logic
func (c *CustomCollector) Collect(ctx context.Context) (any, error) {
    // Read data from filesystem
    data, err := os.ReadFile(c.dataPath)
    if err != nil {
        return nil, fmt.Errorf("failed to read %s: %w", c.dataPath, err)
    }
    
    // Parse and return structured data
    return c.parseData(data)
}
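
Collect delegates parsing to a collector-specific helper. A hypothetical parseData for a file holding a single numeric counter might look like this (CustomStats and the file format are assumptions for illustration):

// CustomStats is a hypothetical result type for this example collector.
type CustomStats struct {
    Value uint64
}

func (c *CustomCollector) parseData(data []byte) (*CustomStats, error) {
    value, err := strconv.ParseUint(strings.TrimSpace(string(data)), 10, 64)
    if err != nil {
        return nil, fmt.Errorf("failed to parse custom data: %w", err)
    }
    return &CustomStats{Value: value}, nil
}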

Continuous Collector Pattern

type ContinuousCustomCollector struct {
    performance.BaseContinuousCollector
    interval time.Duration
    ch       chan any
    stopped  chan struct{}
    wg       sync.WaitGroup
}

func (c *ContinuousCustomCollector) Start(ctx context.Context) (<-chan any, error) {
    if c.Status() != performance.CollectorStatusDisabled {
        return nil, fmt.Errorf("collector already running")
    }
    
    c.SetStatus(performance.CollectorStatusActive)
    c.ch = make(chan any)
    c.stopped = make(chan struct{})
    
    c.wg.Add(1)
    go c.runCollection(ctx)
    return c.ch, nil
}

func (c *ContinuousCustomCollector) Stop() error {
    if c.stopped != nil {
        close(c.stopped)
        c.stopped = nil
    }
    
    // Wait for the collection goroutine to exit before closing the channel,
    // so there is no window where it could send on a closed channel
    c.wg.Wait()
    
    if c.ch != nil {
        close(c.ch)
        c.ch = nil
    }
    
    c.SetStatus(performance.CollectorStatusDisabled)
    return nil
}

func (c *ContinuousCustomCollector) runCollection(ctx context.Context) {
    defer c.wg.Done()
    
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-c.stopped:
            return
        case <-ticker.C:
            data, err := c.collect(ctx)
            if err != nil {
                c.Logger().Error(err, "Collection failed")
                continue
            }
            
            select {
            case c.ch <- data:
            case <-ctx.Done():
                return
            case <-c.stopped:
                return
            }
        }
    }
}
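
Two details make this pattern safe: the send into c.ch is wrapped in its own select, so the goroutine can always exit even if the consumer stops reading, and Stop waits on the WaitGroup before closing the channel, so runCollection can never send on a closed channel.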

Registering Collectors

func init() {
    // For PointCollectors, wrap with continuous adapter
    performance.Register(
        performance.MetricTypeCustom,
        performance.PartialNewContinuousPointCollector(
            func(logger logr.Logger, config performance.CollectionConfig) (performance.PointCollector, error) {
                return NewCustomCollector(logger, config)
            },
        ),
    )
}

Data Types

CPU Metrics

type CPUStats struct {
    User        uint64            // Time in user mode
    Nice        uint64            // Time in user mode (nice)
    System      uint64            // Time in system mode
    Idle        uint64            // Time idle
    IOWait      uint64            // Time waiting for I/O
    IRQ         uint64            // Time servicing interrupts
    SoftIRQ     uint64            // Time servicing soft interrupts
    Steal       uint64            // Time stolen by hypervisor
    Guest       uint64            // Time running guest OS
    GuestNice   uint64            // Time running nice guest OS
    PerCPU      []CPUCoreStat    // Per-core statistics
}
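
The counters in /proc/stat are cumulative jiffies, so utilization has to be derived from the delta between two samples. A small sketch; the busy/idle split below is one common convention, not necessarily the agent's exact formula:

// cpuUtilization returns the busy percentage between two samples.
// It assumes counters increased monotonically between the samples.
func cpuUtilization(prev, curr *CPUStats) float64 {
    prevIdle := prev.Idle + prev.IOWait
    currIdle := curr.Idle + curr.IOWait

    prevTotal := prev.User + prev.Nice + prev.System + prevIdle +
        prev.IRQ + prev.SoftIRQ + prev.Steal
    currTotal := curr.User + curr.Nice + curr.System + currIdle +
        curr.IRQ + curr.SoftIRQ + curr.Steal

    totalDelta := float64(currTotal - prevTotal)
    idleDelta := float64(currIdle - prevIdle)
    if totalDelta == 0 {
        return 0
    }
    return 100 * (totalDelta - idleDelta) / totalDelta
}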

Memory Metrics

type MemoryStats struct {
    Total          uint64  // Total memory
    Free           uint64  // Free memory
    Available      uint64  // Available memory
    Buffers        uint64  // Buffer cache
    Cached         uint64  // Page cache
    SwapTotal      uint64  // Total swap
    SwapFree       uint64  // Free swap
    Dirty          uint64  // Dirty pages
    Writeback      uint64  // Pages being written
    AnonPages      uint64  // Anonymous pages
    Mapped         uint64  // Mapped pages
    Shmem          uint64  // Shared memory
    KernelStack    uint64  // Kernel stack
    PageTables     uint64  // Page table pages
    HugePages      HugePagesStats
}
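
Note that Available, not Free, is the right basis for a "used memory" figure, because buffers and page cache are reclaimable. For example:

// memoryUsedPercent derives used memory from MemAvailable semantics.
func memoryUsedPercent(m *MemoryStats) float64 {
    if m.Total == 0 {
        return 0
    }
    return 100 * float64(m.Total-m.Available) / float64(m.Total)
}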

Network Metrics

type NetworkStats struct {
    Interfaces []InterfaceStats
}

type InterfaceStats struct {
    Name         string
    RxBytes      uint64  // Received bytes
    RxPackets    uint64  // Received packets
    RxErrors     uint64  // Receive errors
    RxDropped    uint64  // Dropped packets
    TxBytes      uint64  // Transmitted bytes
    TxPackets    uint64  // Transmitted packets
    TxErrors     uint64  // Transmit errors
    TxDropped    uint64  // Dropped packets
    Collisions   uint64  // Collisions
    CarrierLoss  uint64  // Carrier losses
}

Error Handling

Collectors must distinguish between critical and optional data:

func (c *NetworkCollector) collectNetworkStats() (*NetworkStats, error) {
    // Critical file - return error if missing
    procNetDev, err := os.Open(c.procNetDevPath)
    if err != nil {
        return nil, fmt.Errorf("failed to open %s: %w", c.procNetDevPath, err)
    }
    defer procNetDev.Close()
    
    stats := &NetworkStats{}
    scanner := bufio.NewScanner(procNetDev)
    
    for scanner.Scan() {
        iface, err := c.parseInterface(scanner.Text())
        if err != nil {
            // Log but continue - don't fail on single interface
            c.Logger().V(2).Info("Failed to parse interface", "error", err)
            continue
        }
        
        // Optional metadata - graceful degradation
        if metadata, err := c.readInterfaceMetadata(iface.Name); err == nil {
            iface.Metadata = metadata
        } else {
            c.Logger().V(2).Info("Metadata unavailable", "interface", iface.Name)
        }
        
        stats.Interfaces = append(stats.Interfaces, iface)
    }
    
    return stats, scanner.Err()
}
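
The parseInterface helper above is collector-specific. A hypothetical version for a /proc/net/dev data line could look like the sketch below; /proc/net/dev header lines contain no ":" and therefore fail to parse, which the caller's "log but continue" loop handles naturally. Only a subset of the 16 counters is mapped, and the optional Metadata field used above is omitted:

func (c *NetworkCollector) parseInterface(line string) (InterfaceStats, error) {
    name, counters, ok := strings.Cut(line, ":")
    if !ok {
        return InterfaceStats{}, fmt.Errorf("malformed line: %q", line)
    }
    fields := strings.Fields(counters)
    if len(fields) < 16 {
        return InterfaceStats{}, fmt.Errorf("expected 16 counters, got %d", len(fields))
    }

    // Parse errors are reported as zero to keep the sketch short;
    // a real parser would propagate them
    u := func(s string) uint64 {
        v, _ := strconv.ParseUint(s, 10, 64)
        return v
    }

    return InterfaceStats{
        Name:      strings.TrimSpace(name),
        RxBytes:   u(fields[0]),
        RxPackets: u(fields[1]),
        RxErrors:  u(fields[2]),
        RxDropped: u(fields[3]),
        TxBytes:   u(fields[8]),
        TxPackets: u(fields[9]),
        TxErrors:  u(fields[10]),
        TxDropped: u(fields[11]),
    }, nil
}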

Testing Collectors

Test Structure

func TestCustomCollector_Collect(t *testing.T) {
    tests := []struct {
        name        string
        procContent string
        wantError   bool
        validate    func(t *testing.T, result any)
    }{
        {
            name:        "valid data",
            procContent: "valid test data",
            validate: func(t *testing.T, result any) {
                stats, ok := result.(*CustomStats)
                require.True(t, ok)
                assert.Equal(t, expectedValue, stats.Value)
            },
        },
        {
            name:        "malformed data",
            procContent: "invalid",
            wantError:   true,
        },
    }
    
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            collector, procPath, _ := createTestCollector(t, tt.procContent)
            result, err := collector.Collect(context.Background())
            
            if tt.wantError {
                assert.Error(t, err)
                return
            }
            
            require.NoError(t, err)
            if tt.validate != nil {
                tt.validate(t, result)
            }
        })
    }
}
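
The createTestCollector helper is left implicit above. A hypothetical implementation writes the fixture into a temporary "proc" tree and points the collector at it; the third return value (ignored in the table test) is sketched here as a cleanup function, a no-op because t.TempDir handles removal:

func createTestCollector(t *testing.T, procContent string) (*CustomCollector, string, func()) {
    t.Helper()

    procPath := t.TempDir()
    require.NoError(t,
        os.WriteFile(filepath.Join(procPath, "custom_data"), []byte(procContent), 0o644))

    collector, err := NewCustomCollector(
        testr.New(t), // logr's testing adapter
        performance.CollectionConfig{HostProcPath: procPath},
    )
    require.NoError(t, err)

    return collector, procPath, func() {}
}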

Performance Considerations

Efficient File Reading

// Good: Read once, parse in memory
data, err := os.ReadFile(path)
if err != nil {
    return nil, err
}
return parseData(data), nil

// Bad: Multiple reads of same file
value1 := readLineFromFile(path, 1)
value2 := readLineFromFile(path, 2)

Memory Management

// Pre-allocate slices when size is known
interfaces := make([]InterfaceStats, 0, estimatedCount)

// Reuse buffers for parsing
var buffer bytes.Buffer
for _, line := range lines {
    buffer.Reset()
    // Use buffer...
}

Concurrent Collection

// Collect from multiple sources concurrently
var wg sync.WaitGroup
results := make(chan CollectorResult, len(collectors))

for _, collector := range collectors {
    wg.Add(1)
    go func(c Collector) {
        defer wg.Done()
        result, err := c.Collect(ctx)
        results <- CollectorResult{Data: result, Error: err}
    }(collector)
}

wg.Wait()
close(results)
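
Draining the results channel then looks like this (aggregate and logger are illustrative placeholders):

// Ordering is not guaranteed, so each result should carry enough
// context to identify its collector
for r := range results {
    if r.Error != nil {
        logger.Error(r.Error, "collector failed")
        continue
    }
    aggregate(r.Data) // hypothetical aggregation step
}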

Container Considerations

When running in containers, filesystem paths differ:

// Environment variables for container paths
hostProc := os.Getenv("HOST_PROC") // default: /host/proc
hostSys := os.Getenv("HOST_SYS")   // default: /host/sys

# Container volume mounts (compose-style)
volumes:
  - /proc:/host/proc:ro
  - /sys:/host/sys:ro
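
A small helper keeps the fallback logic in one place. This is a sketch; the default values mirror the mounts above:

// hostPath returns the env override or a fallback when unset
func hostPath(envVar, fallback string) string {
    if p := os.Getenv(envVar); p != "" {
        return p
    }
    return fallback
}

hostProc := hostPath("HOST_PROC", "/host/proc")
hostSys := hostPath("HOST_SYS", "/host/sys")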

eBPF Collectors

For kernel-level monitoring:

// Compiled eBPF object, built ahead of time from execsnoop.bpf.c
// (e.g. with clang or bpf2go); cilium/ebpf loads ELF objects, not C source
//go:embed execsnoop.bpf.o
var execsnoopProgram []byte

// Load the eBPF collection spec from the embedded object
spec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(execsnoopProgram))
if err != nil {
    return nil, fmt.Errorf("failed to load eBPF program: %w", err)
}

coll, err := ebpf.NewCollection(spec)
if err != nil {
    return nil, fmt.Errorf("failed to create eBPF collection: %w", err)
}
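
Once loaded, the program still has to be attached; with cilium/ebpf this goes through the link package. A continuation sketch, where the program name "trace_execve" and the execve tracepoint are placeholders that depend on the actual eBPF source:

prog := coll.Programs["trace_execve"]
if prog == nil {
    return nil, fmt.Errorf("program not found in collection")
}

tp, err := link.Tracepoint("syscalls", "sys_enter_execve", prog, nil)
if err != nil {
    return nil, fmt.Errorf("failed to attach tracepoint: %w", err)
}
// Keep tp and close it when the collector stops to detach the program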

Integration with Agent

Collectors are automatically managed by the Performance Manager:

# Configuration
performance:
  enabled: true
  interval: 60s
  collectors:
    - cpu
    - memory
    - network
    - disk
    - custom  # Your custom collector

Best Practices

  1. Path Handling

    • Always validate paths are absolute
    • Use filepath.Join for path construction
    • Handle container vs host paths
  2. Error Handling

    • Fail fast on critical errors
    • Graceful degradation for optional data
    • Clear error messages with context
  3. Performance

    • Minimize file system operations
    • Cache static information (see the sync.Once sketch after this list)
    • Use efficient parsing techniques
  4. Testing

    • Mock filesystem for unit tests
    • Test error conditions
    • Validate container compatibility
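
For point 3, caching static information can be as simple as a sync.Once around the first read. A sketch with hypothetical DiskInfo types:

type DiskInfoCollector struct {
    once sync.Once
    info *DiskInfo // hypothetical static hardware details
    err  error
}

func (c *DiskInfoCollector) Collect(ctx context.Context) (any, error) {
    // Hardware details don't change at runtime, so read sysfs only once
    c.once.Do(func() {
        c.info, c.err = readDiskInfo() // hypothetical sysfs reader
    })
    return c.info, c.err
}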

Next Steps

For collector examples, see pkg/performance/collectors/