Custom Collectors - antimetal/system-agent GitHub Wiki

Custom Collectors

This guide explains how to build custom performance collectors for the Antimetal System Agent. Custom collectors allow you to extend the agent's monitoring capabilities for your specific needs.

Overview

The collector system provides two interfaces:

  • PointCollector: One-shot data collection
  • ContinuousCollector: Streaming data collection with lifecycle management

Most collectors implement PointCollector and are wrapped for continuous collection.

Basic Collector Structure

Step 1: Define Your Data Type

// pkg/performance/types.go
package performance

// Define your metric type
const MetricTypeCustom MetricType = "custom"

// Define your data structure
type CustomStats struct {
    Timestamp   time.Time          `json:"timestamp"`
    Values      map[string]float64 `json:"values"`
    Metadata    map[string]string  `json:"metadata"`
    SampleCount int                `json:"sample_count"`
}

Step 2: Implement the Collector

// pkg/performance/collectors/custom_collector.go
package collectors

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "strings"
    
    "github.com/go-logr/logr"
    "github.com/antimetal/system-agent/pkg/performance"
)

// Compile-time interface check
var _ performance.PointCollector = (*CustomCollector)(nil)

type CustomCollector struct {
    performance.BaseCollector
    dataPath   string
    configPath string
}

// Constructor with standard pattern
func NewCustomCollector(logger logr.Logger, config performance.CollectionConfig) (*CustomCollector, error) {
    // Validate paths are absolute
    if !filepath.IsAbs(config.HostProcPath) {
        return nil, fmt.Errorf("HostProcPath must be absolute: %q", config.HostProcPath)
    }
    
    // Define collector capabilities
    capabilities := performance.CollectorCapabilities{
        SupportsOneShot:    true,
        SupportsContinuous: false,
        RequiresRoot:       false,
        RequiresEBPF:       false,
        MinKernelVersion:   "3.10.0",
    }
    
    // Create collector instance
    return &CustomCollector{
        BaseCollector: performance.NewBaseCollector(
            performance.MetricTypeCustom,
            "custom",
            logger,
            config,
            capabilities,
        ),
        dataPath:   filepath.Join(config.HostProcPath, "custom_data"),
        configPath: filepath.Join(config.HostSysPath, "custom/config"),
    }, nil
}

// Implement the Collect method
func (c *CustomCollector) Collect(ctx context.Context) (any, error) {
    c.Logger().V(1).Info("Collecting custom metrics")
    
    // Check context cancellation
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    
    // Read data from filesystem
    data, err := c.readData()
    if err != nil {
        return nil, fmt.Errorf("failed to read data: %w", err)
    }
    
    // Parse and process data
    stats, err := c.parseData(data)
    if err != nil {
        return nil, fmt.Errorf("failed to parse data: %w", err)
    }
    
    // Add timestamp
    stats.Timestamp = time.Now()
    
    c.Logger().V(2).Info("Custom metrics collected", 
        "sample_count", stats.SampleCount)
    
    return stats, nil
}

// Private helper methods
func (c *CustomCollector) readData() ([]byte, error) {
    // Read critical file - return error if missing
    data, err := os.ReadFile(c.dataPath)
    if err != nil {
        return nil, fmt.Errorf("failed to read %s: %w", c.dataPath, err)
    }
    
    return data, nil
}

func (c *CustomCollector) parseData(data []byte) (*performance.CustomStats, error) {
    stats := &performance.CustomStats{
        Values:   make(map[string]float64),
        Metadata: make(map[string]string),
    }
    
    lines := strings.Split(string(data), "\n")
    for _, line := range lines {
        line = strings.TrimSpace(line)
        if line == "" || strings.HasPrefix(line, "#") {
            continue
        }
        
        // Parse your data format
        parts := strings.Fields(line)
        if len(parts) >= 2 {
            key := parts[0]
            value, err := strconv.ParseFloat(parts[1], 64)
            if err != nil {
                c.Logger().V(2).Info("Failed to parse value", 
                    "key", key, "error", err)
                continue
            }
            stats.Values[key] = value
            stats.SampleCount++
        }
    }
    
    // Read optional metadata
    if metadata, err := c.readMetadata(); err == nil {
        stats.Metadata = metadata
    } else {
        c.Logger().V(2).Info("Metadata unavailable", "error", err)
    }
    
    return stats, nil
}

func (c *CustomCollector) readMetadata() (map[string]string, error) {
    metadata := make(map[string]string)
    
    // Read optional configuration
    if data, err := os.ReadFile(c.configPath); err == nil {
        // Parse configuration
        lines := strings.Split(string(data), "\n")
        for _, line := range lines {
            if parts := strings.SplitN(line, "=", 2); len(parts) == 2 {
                metadata[parts[0]] = parts[1]
            }
        }
    }
    
    return metadata, nil
}

// Register the collector
func init() {
    performance.Register(
        performance.MetricTypeCustom,
        performance.PartialNewContinuousPointCollector(
            func(logger logr.Logger, config performance.CollectionConfig) (performance.PointCollector, error) {
                return NewCustomCollector(logger, config)
            },
        ),
    )
}

Step 3: Write Tests

// pkg/performance/collectors/custom_collector_test.go
package collectors_test

import (
    "context"
    "os"
    "path/filepath"
    "testing"
    
    "github.com/go-logr/logr"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    
    "github.com/antimetal/system-agent/pkg/performance"
    "github.com/antimetal/system-agent/pkg/performance/collectors"
)

// Test constructor validation
func TestCustomCollector_Constructor(t *testing.T) {
    tests := []struct {
        name      string
        config    performance.CollectionConfig
        wantError bool
        errorMsg  string
    }{
        {
            name: "valid absolute paths",
            config: performance.CollectionConfig{
                HostProcPath: "/proc",
                HostSysPath:  "/sys",
            },
            wantError: false,
        },
        {
            name: "relative proc path",
            config: performance.CollectionConfig{
                HostProcPath: "proc",
                HostSysPath:  "/sys",
            },
            wantError: true,
            errorMsg:  "HostProcPath must be absolute",
        },
    }
    
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            _, err := collectors.NewCustomCollector(logr.Discard(), tt.config)
            
            if tt.wantError {
                assert.Error(t, err)
                assert.Contains(t, err.Error(), tt.errorMsg)
            } else {
                assert.NoError(t, err)
            }
        })
    }
}

// Test data collection
func TestCustomCollector_Collect(t *testing.T) {
    tests := []struct {
        name        string
        procContent string
        sysContent  string
        wantError   bool
        validate    func(t *testing.T, result any)
    }{
        {
            name: "valid data",
            procContent: `metric1 100.5
metric2 200.0
metric3 300.75`,
            sysContent: "version=1.0.0\nmode=production",
            validate: func(t *testing.T, result any) {
                stats, ok := result.(*performance.CustomStats)
                require.True(t, ok, "result should be CustomStats")
                
                assert.Equal(t, 3, stats.SampleCount)
                assert.Equal(t, 100.5, stats.Values["metric1"])
                assert.Equal(t, 200.0, stats.Values["metric2"])
                assert.Equal(t, 300.75, stats.Values["metric3"])
                assert.Equal(t, "1.0.0", stats.Metadata["version"])
                assert.Equal(t, "production", stats.Metadata["mode"])
            },
        },
        {
            name: "empty data",
            procContent: "",
            validate: func(t *testing.T, result any) {
                stats, ok := result.(*performance.CustomStats)
                require.True(t, ok)
                assert.Equal(t, 0, stats.SampleCount)
                assert.Empty(t, stats.Values)
            },
        },
        {
            name:      "missing file",
            wantError: true,
        },
    }
    
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            collector, procPath, sysPath := createTestCustomCollector(t, tt.procContent, tt.sysContent)
            
            result, err := collector.Collect(context.Background())
            
            if tt.wantError {
                assert.Error(t, err)
                return
            }
            
            require.NoError(t, err)
            if tt.validate != nil {
                tt.validate(t, result)
            }
        })
    }
}

// Helper function to create test collector
func createTestCustomCollector(t *testing.T, procContent, sysContent string) (*collectors.CustomCollector, string, string) {
    tmpDir := t.TempDir()
    procPath := filepath.Join(tmpDir, "proc")
    sysPath := filepath.Join(tmpDir, "sys")
    
    // Create directory structure
    require.NoError(t, os.MkdirAll(procPath, 0755))
    require.NoError(t, os.MkdirAll(filepath.Join(sysPath, "custom"), 0755))
    
    // Write test data if provided
    if procContent != "" {
        dataPath := filepath.Join(procPath, "custom_data")
        require.NoError(t, os.WriteFile(dataPath, []byte(procContent), 0644))
    }
    
    if sysContent != "" {
        configPath := filepath.Join(sysPath, "custom/config")
        require.NoError(t, os.WriteFile(configPath, []byte(sysContent), 0644))
    }
    
    config := performance.CollectionConfig{
        HostProcPath: procPath,
        HostSysPath:  sysPath,
    }
    
    collector, err := collectors.NewCustomCollector(logr.Discard(), config)
    require.NoError(t, err)
    
    return collector, procPath, sysPath
}

// Test context cancellation
func TestCustomCollector_ContextCancellation(t *testing.T) {
    collector, _, _ := createTestCustomCollector(t, "test data", "")
    
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // Cancel immediately
    
    _, err := collector.Collect(ctx)
    assert.Error(t, err)
    assert.Equal(t, context.Canceled, err)
}

Advanced Collector Patterns

Continuous Collector

For collectors that need lifecycle management:

type AdvancedCollector struct {
    performance.BaseContinuousCollector
    
    // Collector state
    client     *CustomClient
    interval   time.Duration
    ch         chan any
    stopped    chan struct{}
    
    // Configuration
    endpoint   string
    bufferSize int
}

func NewAdvancedCollector(logger logr.Logger, config performance.CollectionConfig) (*AdvancedCollector, error) {
    return &AdvancedCollector{
        BaseContinuousCollector: performance.NewBaseContinuousCollector(
            performance.MetricTypeAdvanced,
            "advanced",
            logger,
            config,
            performance.CollectorCapabilities{
                SupportsOneShot:    false,
                SupportsContinuous: true,
                RequiresRoot:       true,
                RequiresEBPF:       false,
            },
        ),
        interval:   30 * time.Second,
        bufferSize: 100,
        endpoint:   config.CustomEndpoint,
    }, nil
}

func (c *AdvancedCollector) Start(ctx context.Context) (<-chan any, error) {
    if c.Status() != performance.CollectorStatusDisabled {
        return nil, fmt.Errorf("collector already running")
    }
    
    // Initialize client
    client, err := NewCustomClient(c.endpoint)
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }
    c.client = client
    
    // Create channels
    c.ch = make(chan any, c.bufferSize)
    c.stopped = make(chan struct{})
    
    // Start collection goroutine
    c.SetStatus(performance.CollectorStatusActive)
    go c.runCollection(ctx)
    
    return c.ch, nil
}

func (c *AdvancedCollector) Stop() error {
    if c.Status() == performance.CollectorStatusDisabled {
        return nil
    }
    
    // Signal stop
    if c.stopped != nil {
        close(c.stopped)
        c.stopped = nil
    }
    
    // Cleanup client
    if c.client != nil {
        c.client.Close()
        c.client = nil
    }
    
    // Give goroutine time to exit
    time.Sleep(10 * time.Millisecond)
    
    // Close channel
    if c.ch != nil {
        close(c.ch)
        c.ch = nil
    }
    
    c.SetStatus(performance.CollectorStatusDisabled)
    return nil
}

func (c *AdvancedCollector) runCollection(ctx context.Context) {
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-c.stopped:
            return
        case <-ticker.C:
            data, err := c.client.Fetch()
            if err != nil {
                c.Logger().Error(err, "Failed to fetch data")
                c.SetError(err)
                continue
            }
            
            select {
            case c.ch <- data:
                c.ClearError()
            case <-ctx.Done():
                return
            case <-c.stopped:
                return
            default:
                c.Logger().V(1).Info("Channel full, dropping data")
            }
        }
    }
}

eBPF-Based Collector

For kernel-level monitoring:

//go:build linux

package collectors

import (
    _ "embed"
    
    "github.com/cilium/ebpf"
    "github.com/cilium/ebpf/link"
    "github.com/cilium/ebpf/perf"
)

//go:embed syscall_monitor.bpf.o
var syscallMonitorProgram []byte

type SyscallCollector struct {
    performance.BaseContinuousCollector
    
    // eBPF objects
    collection *ebpf.Collection
    perfReader *perf.Reader
    links      []link.Link
}

func (c *SyscallCollector) Start(ctx context.Context) (<-chan any, error) {
    // Load eBPF program
    spec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(syscallMonitorProgram))
    if err != nil {
        return nil, fmt.Errorf("failed to load eBPF spec: %w", err)
    }
    
    coll, err := ebpf.NewCollection(spec)
    if err != nil {
        return nil, fmt.Errorf("failed to create eBPF collection: %w", err)
    }
    c.collection = coll
    
    // Attach to tracepoints
    tp, err := link.Tracepoint("syscalls", "sys_enter_open", coll.Programs["trace_open"])
    if err != nil {
        return nil, fmt.Errorf("failed to attach tracepoint: %w", err)
    }
    c.links = append(c.links, tp)
    
    // Create perf event reader
    reader, err := perf.NewReader(coll.Maps["events"], 4096)
    if err != nil {
        return nil, fmt.Errorf("failed to create perf reader: %w", err)
    }
    c.perfReader = reader
    
    // Start reading events
    ch := make(chan any)
    go c.readEvents(ctx, ch)
    
    return ch, nil
}

func (c *SyscallCollector) readEvents(ctx context.Context, ch chan<- any) {
    defer close(ch)
    
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        
        record, err := c.perfReader.Read()
        if err != nil {
            if errors.Is(err, perf.ErrClosed) {
                return
            }
            c.Logger().Error(err, "Failed to read perf event")
            continue
        }
        
        // Parse event data
        event := parseSyscallEvent(record.RawSample)
        
        select {
        case ch <- event:
        case <-ctx.Done():
            return
        }
    }
}

Aggregating Collector

For collectors that aggregate data:

type AggregatingCollector struct {
    performance.BaseCollector
    
    // Aggregation state
    mu           sync.Mutex
    accumulator  map[string]*Accumulator
    lastFlush    time.Time
    flushPeriod  time.Duration
}

type Accumulator struct {
    Count   int64
    Sum     float64
    Min     float64
    Max     float64
    Samples []float64
}

func (c *AggregatingCollector) AddSample(key string, value float64) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    acc, exists := c.accumulator[key]
    if !exists {
        acc = &Accumulator{
            Min: value,
            Max: value,
        }
        c.accumulator[key] = acc
    }
    
    acc.Count++
    acc.Sum += value
    if value < acc.Min {
        acc.Min = value
    }
    if value > acc.Max {
        acc.Max = value
    }
    
    // Keep last N samples for percentiles
    if len(acc.Samples) < 100 {
        acc.Samples = append(acc.Samples, value)
    }
}

func (c *AggregatingCollector) Collect(ctx context.Context) (any, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    // Check if we should flush
    if time.Since(c.lastFlush) < c.flushPeriod {
        return nil, performance.ErrNotReady
    }
    
    // Build aggregated stats
    stats := &AggregatedStats{
        Timestamp: time.Now(),
        Period:    c.flushPeriod,
        Metrics:   make(map[string]*MetricSummary),
    }
    
    for key, acc := range c.accumulator {
        summary := &MetricSummary{
            Count:   acc.Count,
            Sum:     acc.Sum,
            Average: acc.Sum / float64(acc.Count),
            Min:     acc.Min,
            Max:     acc.Max,
        }
        
        // Calculate percentiles
        if len(acc.Samples) > 0 {
            sort.Float64s(acc.Samples)
            summary.P50 = percentile(acc.Samples, 0.5)
            summary.P95 = percentile(acc.Samples, 0.95)
            summary.P99 = percentile(acc.Samples, 0.99)
        }
        
        stats.Metrics[key] = summary
    }
    
    // Reset accumulator
    c.accumulator = make(map[string]*Accumulator)
    c.lastFlush = time.Now()
    
    return stats, nil
}

Best Practices

1. Error Handling

func (c *CustomCollector) Collect(ctx context.Context) (any, error) {
    // Distinguish between critical and optional data
    
    // Critical data - fail if unavailable
    criticalData, err := c.readCriticalData()
    if err != nil {
        return nil, fmt.Errorf("critical data unavailable: %w", err)
    }
    
    stats := processData(criticalData)
    
    // Optional data - log but continue
    if optionalData, err := c.readOptionalData(); err == nil {
        enrichStats(stats, optionalData)
    } else {
        c.Logger().V(2).Info("Optional data unavailable", "error", err)
    }
    
    return stats, nil
}

2. Resource Management

type ResourceCollector struct {
    performance.BaseCollector
    
    // Resources that need cleanup
    file   *os.File
    client *http.Client
    cancel context.CancelFunc
}

func (c *ResourceCollector) Collect(ctx context.Context) (any, error) {
    // Ensure cleanup on all paths
    defer func() {
        if c.file != nil {
            c.file.Close()
        }
    }()
    
    // Open resources
    file, err := os.Open(c.path)
    if err != nil {
        return nil, err
    }
    c.file = file
    
    // Use defer for cleanup
    defer file.Close()
    
    // Process data...
}

3. Performance Optimization

// Pre-allocate structures
func (c *CustomCollector) Collect(ctx context.Context) (any, error) {
    // Estimate size to avoid reallocations
    stats := &CustomStats{
        Values: make(map[string]float64, 100),
    }
    
    // Use buffered I/O for large files
    file, err := os.Open(c.dataPath)
    if err != nil {
        return nil, err
    }
    defer file.Close()
    
    reader := bufio.NewReaderSize(file, 64*1024)
    
    // Process efficiently...
}

4. Testing Patterns

// Table-driven tests
func TestCollector_EdgeCases(t *testing.T) {
    tests := []struct {
        name      string
        setup     func() *CustomCollector
        wantError bool
        validate  func(t *testing.T, result any)
    }{
        {
            name: "handles malformed data",
            setup: func() *CustomCollector {
                return createCollectorWithData("invalid\ndata\n")
            },
            wantError: false,
            validate: func(t *testing.T, result any) {
                stats := result.(*CustomStats)
                assert.Empty(t, stats.Values)
            },
        },
        {
            name: "handles huge values",
            setup: func() *CustomCollector {
                return createCollectorWithData("metric 9223372036854775807")
            },
            validate: func(t *testing.T, result any) {
                stats := result.(*CustomStats)
                assert.Equal(t, float64(math.MaxInt64), stats.Values["metric"])
            },
        },
    }
    
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            collector := tt.setup()
            result, err := collector.Collect(context.Background())
            
            if tt.wantError {
                assert.Error(t, err)
            } else {
                assert.NoError(t, err)
                if tt.validate != nil {
                    tt.validate(t, result)
                }
            }
        })
    }
}

Integration with Agent

Configuration

Add your collector to the configuration:

performance:
  collectors:
    - cpu
    - memory
    - custom  # Your collector
  
  settings:
    custom:
      # Collector-specific settings
      endpoint: "http://custom-service:8080"
      timeout: "30s"

Metrics Export

Your collector's data will be available through the agent's metrics:

# HELP antimetal_custom_values Custom collector metrics
# TYPE antimetal_custom_values gauge
antimetal_custom_values{metric="metric1"} 100.5
antimetal_custom_values{metric="metric2"} 200.0

Debugging

Enable debug logging for your collector:

--log-verbosity=custom:3

Examples

System Service Monitor

Monitor systemd services:

type ServiceCollector struct {
    performance.BaseCollector
}

func (c *ServiceCollector) Collect(ctx context.Context) (any, error) {
    cmd := exec.CommandContext(ctx, "systemctl", "list-units", "--type=service", "--no-pager", "--plain")
    output, err := cmd.Output()
    if err != nil {
        return nil, fmt.Errorf("failed to list services: %w", err)
    }
    
    stats := &ServiceStats{
        Services: make(map[string]ServiceStatus),
    }
    
    scanner := bufio.NewScanner(bytes.NewReader(output))
    for scanner.Scan() {
        line := scanner.Text()
        if service := parseServiceLine(line); service != nil {
            stats.Services[service.Name] = *service
        }
    }
    
    return stats, nil
}

Database Connection Pool Monitor

type DBPoolCollector struct {
    performance.BaseCollector
    db *sql.DB
}

func (c *DBPoolCollector) Collect(ctx context.Context) (any, error) {
    stats := c.db.Stats()
    
    return &DBPoolStats{
        OpenConnections: stats.OpenConnections,
        InUse:          stats.InUse,
        Idle:           stats.Idle,
        WaitCount:      stats.WaitCount,
        WaitDuration:   stats.WaitDuration,
        MaxIdleClosed:  stats.MaxIdleClosed,
        MaxLifetimeClosed: stats.MaxLifetimeClosed,
    }, nil
}

Next Steps


For more examples, see pkg/performance/collectors/