Kubernetes Controller
The Kubernetes Controller is the core component responsible for discovering, watching, and collecting Kubernetes resources. Built on the controller-runtime framework, it provides real-time synchronization of cluster state to the Resource Store.
graph TD
subgraph KC["Kubernetes Controller"]
M["Manager<br/>• Runtime<br/>• Leader<br/>• Metrics"]
R["Reconcilers<br/>• Pod<br/>• Node<br/>• Service<br/>• Deployment<br/>• + 7 more"]
I["Informers<br/>• Shared Cache<br/>• Event Handlers<br/>• Indexers"]
M -.-> R
M -.-> I
I -.-> R
end
KC --> RS["Resource Store"]
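At startup the agent wires these pieces together: it builds a manager, registers one reconciler per watched resource type, and starts the manager, which runs the shared informers and work queues. A minimal sketch of that wiring, reusing the NewManager helper shown below (the store constructor and the exact set of reconcilers are assumptions based on this page, not the real entry point):

func main() {
	cfg := ctrl.GetConfigOrDie()

	mgr, err := NewManager(cfg, ctrl.Options{LeaderElection: true})
	if err != nil {
		os.Exit(1)
	}

	// Hypothetical constructor; the real one lives in the store package.
	resourceStore := store.New()

	// Register one reconciler per watched resource type.
	if err := (&PodReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
		Store:  resourceStore,
		Logger: ctrl.Log.WithName("pod"),
	}).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}
	// ... NodeReconciler, ServiceReconciler, DeploymentReconciler, etc.

	// Start informers, controllers, and leader election; blocks until the context is cancelled.
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}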
The controller monitors these Kubernetes resources:
Resource | API Version | Scope | Purpose |
---|---|---|---|
Node | v1 | Cluster | Infrastructure capacity and health |
Pod | v1 | Namespaced | Workload instances |
Service | v1 | Namespaced | Network endpoints |
Deployment | apps/v1 | Namespaced | Application definitions |
DaemonSet | apps/v1 | Namespaced | Node-level workloads |
StatefulSet | apps/v1 | Namespaced | Stateful applications |
ReplicaSet | apps/v1 | Namespaced | Pod replicas |
Job | batch/v1 | Namespaced | Batch workloads |
PersistentVolume | v1 | Cluster | Storage resources |
PersistentVolumeClaim | v1 | Namespaced | Storage requests |
The controller manager is created once at startup, with a shared scheme, metrics and health endpoints, and leader election:
func NewManager(config *rest.Config, options ctrl.Options) (ctrl.Manager, error) {
// Create scheme with Kubernetes types
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
// Configure manager
mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: options.MetricsBindAddress,
Port: options.Port,
HealthProbeBindAddress: options.HealthProbeBindAddress,
LeaderElection: options.LeaderElection,
LeaderElectionID: "antimetal-agent-leader",
// Namespace empty = watch all namespaces
})
return mgr, err
}
Each resource type has a dedicated reconciler:
type PodReconciler struct {
client.Client
Scheme *runtime.Scheme
Store store.Store
Logger logr.Logger
}
// Reconcile handles Pod events
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Logger.WithValues("pod", req.NamespacedName)
// Fetch the Pod instance
var pod corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
if apierrors.IsNotFound(err) {
// Pod deleted - remove from store
return r.handleDelete(ctx, req)
}
log.Error(err, "Unable to fetch Pod")
return ctrl.Result{}, err
}
// Convert to generic resource
resource := r.podToResource(&pod)
// Store in Resource Store
if err := r.Store.AddResource(resource); err != nil {
if errors.Is(err, store.ErrResourceExists) {
// Update existing resource
return ctrl.Result{}, r.Store.UpdateResource(resource)
}
return ctrl.Result{}, err
}
// Process relationships
return ctrl.Result{}, r.processRelationships(ctx, &pod, resource)
}
// SetupWithManager registers the reconciler
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 10,
}).
Complete(r)
}
Kubernetes objects are converted into the store's generic resource representation:
func (r *PodReconciler) podToResource(pod *corev1.Pod) *store.Resource {
// Create resource reference
ref := &store.ResourceRef{
Group: "v1",
Version: "v1",
Kind: "Pod",
Namespace: pod.Namespace,
Name: pod.Name,
UID: string(pod.UID),
}
// Marshal pod data
data, _ := anypb.New(&k8s.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
},
ObjectMeta: pod.ObjectMeta,
Spec: pod.Spec,
Status: pod.Status,
})
return &store.Resource{
Ref: ref,
Data: data,
Labels: pod.Labels,
Annotations: pod.Annotations,
CreatedAt: pod.CreationTimestamp.Time,
UpdatedAt: time.Now(),
}
}
The controller tracks relationships between resources:
func (r *PodReconciler) processRelationships(ctx context.Context, pod *corev1.Pod, resource *store.Resource) error {
var relationships []*store.Relationship
// Process owner references
for _, owner := range pod.OwnerReferences {
rel := &store.Relationship{
Subject: &store.ResourceRef{
Group: owner.APIVersion,
Kind: owner.Kind,
Namespace: pod.Namespace,
Name: owner.Name,
UID: string(owner.UID),
},
Predicate: &k8s.OwnerReference{
APIVersion: owner.APIVersion,
Kind: owner.Kind,
Name: owner.Name,
UID: string(owner.UID),
},
Object: resource.Ref,
}
relationships = append(relationships, rel)
}
// Node binding
if pod.Spec.NodeName != "" {
rel := &store.Relationship{
Subject: resource.Ref,
Predicate: &k8s.PodNodeBinding{
NodeName: pod.Spec.NodeName,
},
Object: &store.ResourceRef{
Group: "v1",
Kind: "Node",
Name: pod.Spec.NodeName,
},
}
relationships = append(relationships, rel)
}
return r.Store.AddRelationships(relationships...)
}
// Track Pod-Service relationships via endpoints
func (r *ServiceReconciler) processEndpoints(ctx context.Context, svc *corev1.Service) error {
// Get endpoints for service
var endpoints corev1.Endpoints
if err := r.Get(ctx, types.NamespacedName{
Namespace: svc.Namespace,
Name: svc.Name,
}, &endpoints); err != nil {
return err
}
var relationships []*store.Relationship
// Create service-pod relationships
for _, subset := range endpoints.Subsets {
// Guard against subsets that expose no ports to avoid an index panic
var port int32
if len(subset.Ports) > 0 {
port = subset.Ports[0].Port
}
for _, address := range subset.Addresses {
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
rel := &store.Relationship{
Subject: &store.ResourceRef{
Group: "v1",
Kind: "Service",
Namespace: svc.Namespace,
Name: svc.Name,
},
Predicate: &k8s.ServiceEndpoint{
IP: address.IP,
Port: port,
},
Object: &store.ResourceRef{
Group: "v1",
Kind: "Pod",
Namespace: svc.Namespace,
Name: address.TargetRef.Name,
},
}
relationships = append(relationships, rel)
}
}
}
return r.Store.AddRelationships(relationships...)
}
The controller uses a shared informer cache for efficiency:
// Shared cache reduces API server load
cache := mgr.GetCache()
// Add an index on spec.nodeName for faster lookups (IndexField returns an error)
if err := cache.IndexField(ctx, &corev1.Pod{}, "spec.nodeName",
func(o client.Object) []string {
pod := o.(*corev1.Pod)
if pod.Spec.NodeName == "" {
return nil
}
return []string{pod.Spec.NodeName}
}); err != nil {
log.Error(err, "unable to index spec.nodeName")
}
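Once the index is registered, the cached client can list pods on a given node without a full scan; a short sketch, assuming the reconciler embeds the manager's client:

// listPodsOnNode returns all cached pods scheduled to the given node,
// using the spec.nodeName index registered above.
func (r *PodReconciler) listPodsOnNode(ctx context.Context, nodeName string) ([]corev1.Pod, error) {
	var pods corev1.PodList
	if err := r.List(ctx, &pods, client.MatchingFields{"spec.nodeName": nodeName}); err != nil {
		return nil, err
	}
	return pods.Items, nil
}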
Filter unnecessary reconciliations:
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
WithEventFilter(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true // Process all creates
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldPod := e.ObjectOld.(*corev1.Pod)
newPod := e.ObjectNew.(*corev1.Pod)
// Skip resync events where nothing changed (same resourceVersion)
if oldPod.ResourceVersion == newPod.ResourceVersion {
return false
}
// Process if spec or important metadata changed
return !reflect.DeepEqual(oldPod.Spec, newPod.Spec) ||
!reflect.DeepEqual(oldPod.Labels, newPod.Labels)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return !e.DeleteStateUnknown // Skip unclear deletes
},
}).
Complete(r)
}
// Configure concurrency per controller
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: 10, // Process 10 pods in parallel
RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(
5*time.Millisecond, // Base delay
1000*time.Second, // Max delay
),
}).
Complete(r)
}
// Batch multiple updates together
type batchProcessor struct {
store store.Store
resources []*store.Resource
mu sync.Mutex
ticker *time.Ticker
}
func (b *batchProcessor) add(resource *store.Resource) {
b.mu.Lock()
defer b.mu.Unlock()
b.resources = append(b.resources, resource)
}
func (b *batchProcessor) flush() {
b.mu.Lock()
resources := b.resources
b.resources = nil
b.mu.Unlock()
if len(resources) > 0 {
_ = b.store.AddResourcesBatch(resources)
}
}
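The ticker field above is not wired up in the snippet; one way to drive periodic flushes (a sketch, assuming flush is cheap enough to call on every tick):

// run flushes accumulated resources on every tick until ctx is cancelled,
// then performs a final flush before returning.
func (b *batchProcessor) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			b.flush()
			return
		case <-b.ticker.C:
			b.flush()
		}
	}
}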
Ensure only one controller is active per cluster:
// Manager configuration
mgr, err := ctrl.NewManager(config, ctrl.Options{
LeaderElection: true,
LeaderElectionID: "antimetal-agent-leader",
LeaderElectionNamespace: "antimetal-system",
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
LeaderElectionReleaseOnCancel: true,
})
// Leader election events
mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
<-mgr.Elected()
log.Info("Became leader, starting controllers")
return nil
}))
Required Kubernetes permissions:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: antimetal-agent
rules:
# Core resources
- apiGroups: [""]
resources:
- nodes
- pods
- services
- persistentvolumes
- persistentvolumeclaims
verbs: ["get", "list", "watch"]
# Apps resources
- apiGroups: ["apps"]
resources:
- deployments
- daemonsets
- statefulsets
- replicasets
verbs: ["get", "list", "watch"]
# Batch resources
- apiGroups: ["batch"]
resources:
- jobs
verbs: ["get", "list", "watch"]
# Leader election
- apiGroups: ["coordination.k8s.io"]
resources:
- leases
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
Reconcilers distinguish transient errors, which are requeued with a delay, from permanent ones:
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var pod corev1.Pod
if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil // No retry needed
}
// Transient error - retry with backoff
if apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) {
return ctrl.Result{
RequeueAfter: 5 * time.Second,
}, nil
}
// Permanent error
return ctrl.Result{}, err
}
// Process pod...
return ctrl.Result{}, nil
}
A simple circuit breaker guards downstream calls against repeated failures:
type circuitBreaker struct {
failureThreshold int
resetTimeout time.Duration
failures int
lastFailureTime time.Time
mu sync.Mutex
}
func (cb *circuitBreaker) call(fn func() error) error {
cb.mu.Lock()
defer cb.mu.Unlock()
// Check if circuit is open
if cb.failures >= cb.failureThreshold {
if time.Since(cb.lastFailureTime) < cb.resetTimeout {
return fmt.Errorf("circuit breaker open")
}
// Reset after timeout
cb.failures = 0
}
// Execute function
if err := fn(); err != nil {
cb.failures++
cb.lastFailureTime = time.Now()
return err
}
// Success - reset failures
cb.failures = 0
return nil
}
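A hypothetical usage inside a reconciler, wrapping Resource Store writes so repeated store failures back off instead of hammering a degraded store (the breaker field on the reconciler is an assumption):

// Hypothetical: the reconciler carries a breaker instance as r.breaker.
if err := r.breaker.call(func() error {
	return r.Store.AddResource(resource)
}); err != nil {
	// Store unavailable or breaker open: retry after a delay instead of failing hard.
	return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}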
Controller-runtime provides built-in metrics:
# Reconciliation metrics
controller_runtime_reconcile_total{controller="pod",result="success|error|requeue"}
controller_runtime_reconcile_time_seconds{controller="pod"}
# Work queue metrics
workqueue_adds_total{name="pod"}
workqueue_depth{name="pod"}
workqueue_queue_duration_seconds{name="pod"}
workqueue_retries_total{name="pod"}
# Client metrics
rest_client_requests_total{method="GET",url="/api/v1/pods"}
rest_client_request_duration_seconds{method="GET",url="/api/v1/pods"}
In addition to the built-in metrics, the agent exports custom counters and histograms:
var (
resourcesProcessed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "antimetal_controller_resources_processed_total",
Help: "Total resources processed by type",
},
[]string{"group", "version", "kind", "action"},
)
reconcileDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "antimetal_controller_reconcile_duration_seconds",
Help: "Reconciliation duration by controller",
},
[]string{"controller"},
)
)
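These collectors only appear on the manager's metrics endpoint once they are registered with controller-runtime's registry (sigs.k8s.io/controller-runtime/pkg/metrics); a minimal sketch:

func init() {
	// Register custom collectors alongside the built-in controller-runtime metrics.
	metrics.Registry.MustRegister(resourcesProcessed, reconcileDuration)
}

// Example of recording a value from a reconciler (label values are illustrative):
// resourcesProcessed.WithLabelValues("", "v1", "Pod", "add").Inc()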
Unit tests exercise reconcilers against controller-runtime's fake client:
func TestPodReconciler_Reconcile(t *testing.T) {
// Create fake client with test objects
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
UID: "test-uid",
},
Spec: corev1.PodSpec{
NodeName: "test-node",
},
}
client := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(pod).
Build()
// Create reconciler with mock store
store := &mockStore{}
reconciler := &PodReconciler{
Client: client,
Scheme: scheme,
Store: store,
Logger: logr.Discard(),
}
// Test reconciliation
result, err := reconciler.Reconcile(context.Background(),
ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: "default",
Name: "test-pod",
},
})
assert.NoError(t, err)
assert.False(t, result.Requeue)
assert.Equal(t, 1, len(store.resources))
}
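The mockStore used above is not shown on this page; a minimal sketch that records added resources, assuming store.Store exposes the methods used elsewhere on this page (AddResource, UpdateResource, AddRelationships, AddResourcesBatch). The real interface may require additional methods:

type mockStore struct {
	resources     []*store.Resource
	relationships []*store.Relationship
}

func (m *mockStore) AddResource(r *store.Resource) error {
	m.resources = append(m.resources, r)
	return nil
}

func (m *mockStore) UpdateResource(r *store.Resource) error { return nil }

func (m *mockStore) AddRelationships(rels ...*store.Relationship) error {
	m.relationships = append(m.relationships, rels...)
	return nil
}

func (m *mockStore) AddResourcesBatch(rs []*store.Resource) error {
	m.resources = append(m.resources, rs...)
	return nil
}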
Integration tests run controllers against a real API server via envtest:
func TestControllerIntegration(t *testing.T) {
// Use envtest for real API server
testEnv := &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
}
cfg, err := testEnv.Start()
require.NoError(t, err)
defer testEnv.Stop()
// Create manager
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
})
require.NoError(t, err)
// Setup controllers
err = (&PodReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Store: store,
}).SetupWithManager(mgr)
require.NoError(t, err)
// Start manager
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
err := mgr.Start(ctx)
require.NoError(t, err)
}()
// Create test resources and verify behavior
// ...
}
Troubleshooting common issues:
- High Memory Usage
  # Check cache size
  kubectl top pod -n antimetal-system
  # Reduce watched resources by setting specific namespaces in the configuration
- Slow Reconciliation
  // Increase concurrency
  MaxConcurrentReconciles: 20
  // Add resource indexing
  cache.IndexField(...)
- API Rate Limiting
  // Configure client-side rate limiting
  config.QPS = 50
  config.Burst = 100
- Leader Election Failures
  # Check the lease object
  kubectl get lease -n antimetal-system antimetal-agent-leader -o yaml
  # Verify RBAC permissions
  kubectl auth can-i update leases -n antimetal-system --as system:serviceaccount:antimetal-system:antimetal-agent
Recommended practices:
- Resource Filtering
  - Use label selectors to limit watched resources (see the sketch after this list)
  - Skip unneeded namespaces
  - Filter events with predicates
- Error Handling
  - Don't requeue on permanent errors
  - Use exponential backoff for transient errors
  - Log errors with context
- Performance
  - Enable concurrent reconciliation
  - Use informer cache indexes
  - Batch related operations
- Observability
  - Export metrics for monitoring
  - Use structured logging
  - Track reconciliation duration
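For the resource-filtering point above, controller-runtime can apply a label selector as a predicate so only matching objects reach the reconciler; a sketch (the label key and value are assumptions):

func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Only reconcile pods carrying this (illustrative) label.
	sel, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
		MatchLabels: map[string]string{"antimetal.com/watch": "true"},
	})
	if err != nil {
		return err
	}
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}, builder.WithPredicates(sel)).
		Complete(r)
}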
Related pages:
- Resource Store - Where resources are stored
- Architecture Overview - System design
- gRPC API - How data is streamed
Source code: internal/kubernetes/agent/