Resource Store - antimetal/system-agent GitHub Wiki
The Resource Store is the central data hub of the System Agent, providing persistent storage, event routing, and relationship management for all monitored resources.
Built on BadgerDB, the Resource Store provides:
- High-performance storage - In-memory key-value store
- Event-driven updates - Publish/subscribe for real-time changes
- Relationship tracking - RDF-style triplets connect resources
- Type-safe operations - Schema validation and versioning
- Atomic transactions - Consistent state management
graph TB
subgraph ResourceStore ["Resource Store"]
subgraph Components ["Core Components"]
StorageEngine["Storage Engine<br/>BadgerDB<br/>Key-Value"]
EventRouter["Event Router<br/>Type-based<br/>Filtering"]
RelationshipManager["Relationship Manager<br/>RDF Triplets<br/>Indexing"]
end
subgraph Indices ["Indices"]
TypeIndex["Type Index"]
NamespaceIndex["Namespace Index"]
RelationshipIndex["Relationship Index"]
end
StorageEngine --- Indices
EventRouter --- Indices
RelationshipManager --- Indices
end
// Store is the public contract of the resource store. It groups single-resource
// operations, type-scoped listing, relationship management, event
// subscription, and lifecycle control.
type Store interface {
	// AddResource stores rsrc as a new resource.
	AddResource(rsrc *Resource) error
	// UpdateResource replaces the stored copy of rsrc.
	UpdateResource(rsrc *Resource) error
	// DeleteResource removes the resource identified by ref.
	DeleteResource(ref *ResourceRef) error
	// GetResource returns the resource identified by ref.
	GetResource(ref *ResourceRef) (*Resource, error)

	// ListResources returns the resources of the type described by typeDef,
	// adjusted by any ListOption values supplied.
	ListResources(typeDef *TypeDescriptor, opts ...ListOption) ([]*Resource, error)

	// AddRelationships records one or more subject/predicate/object triplets.
	AddRelationships(rels ...*Relationship) error
	// GetRelationships looks up relationships for the given subject, object and
	// predicate type. The usage examples pass nil for subject or object to
	// leave that side unconstrained.
	GetRelationships(subject, object *ResourceRef, predicate proto.Message) ([]*Relationship, error)
	// DeleteRelationships removes the given relationships.
	DeleteRelationships(rels ...*Relationship) error

	// Subscribe returns a receive-only channel of events for resources of the
	// type described by typeDef.
	Subscribe(typeDef *TypeDescriptor) <-chan Event

	// Run starts the store; it takes the context governing its lifetime.
	Run(ctx context.Context) error
	// Close shuts the store down.
	Close() error
}
// Resource is one entry in the store: an identifying reference, a protobuf
// payload, free-form metadata, lifecycle timestamps and a version counter.
type Resource struct {
	// Ref uniquely identifies the resource.
	Ref *ResourceRef

	// Data is the resource payload, wrapped in a protobuf Any.
	Data *anypb.Any

	// Labels and Annotations carry string key/value metadata.
	Labels      map[string]string
	Annotations map[string]string

	// Lifecycle timestamps. DeletedAt is a pointer and may therefore be nil.
	CreatedAt time.Time
	UpdatedAt time.Time
	DeletedAt *time.Time

	// Version is the counter compared during updates for optimistic locking.
	Version uint64
}
// ResourceRef identifies a resource by its API coordinates, namespace, name
// and unique identifier.
type ResourceRef struct {
	// Group is the API group (e.g., "v1", "apps").
	Group string
	// Version is the API version (e.g., "v1", "v1beta1").
	Version string
	// Kind is the resource kind (e.g., "Pod", "Service").
	Kind string
	// Namespace is the owning namespace; empty for cluster-scoped resources.
	Namespace string
	// Name is the resource name.
	Name string
	// UID is the unique identifier of the resource.
	UID string
}
// Relationship is an RDF-style triplet linking two resources:
// Subject --Predicate--> Object.
type Relationship struct {
	// Subject is the source resource of the triplet.
	Subject *ResourceRef
	// Predicate is a protobuf message describing the relationship type.
	Predicate proto.Message
	// Object is the target resource of the triplet.
	Object *ResourceRef

	// Metadata timestamps for the relationship itself.
	CreatedAt time.Time
	UpdatedAt time.Time
}
// Example predicates
//
// Predicate messages are used as the Predicate of a Relationship triplet.

// OwnerReference is an example ownership predicate; it carries the kind, name
// and uid associated with the owner.
message OwnerReference {
string kind = 1;
string name = 2;
string uid = 3;
}
// PodNodeBinding is an example predicate carrying a node name.
// NOTE(review): presumably links a pod to the node it runs on — confirm.
message PodNodeBinding {
string node_name = 1;
}
// EventType enumerates the kinds of change an Event can describe.
type EventType int

// Event kinds, numbered consecutively from zero in declaration order.
const (
	// EventTypeAdd marks an added resource.
	EventTypeAdd EventType = iota
	// EventTypeUpdate marks an updated resource.
	EventTypeUpdate
	// EventTypeDelete marks a deleted resource.
	EventTypeDelete
)
// Event describes a single change delivered to subscribers.
type Event struct {
	// Type says whether the resource was added, updated or deleted.
	Type EventType
	// Resource is the resource the event is about.
	Resource *Resource
	// Previous is the prior state of the resource; set for updates.
	Previous *Resource
}
// Describe the resource type to watch, then open a subscription for it.
typeDef := &TypeDescriptor{Group: "v1", Version: "v1", Kind: "Pod"}
events := store.Subscribe(typeDef)

// Drain the subscription, reporting each change by kind.
for ev := range events {
	switch ev.Type {
	case EventTypeAdd:
		fmt.Printf("New pod: %s\n", ev.Resource.Ref.Name)
	case EventTypeUpdate:
		fmt.Printf("Pod updated: %s\n", ev.Resource.Ref.Name)
	case EventTypeDelete:
		fmt.Printf("Pod deleted: %s\n", ev.Resource.Ref.Name)
	}
}
// eventRouter is the internal fan-out table used to deliver events to
// subscribers.
type eventRouter struct {
	// subscribers maps a type key to the channels subscribed to that type.
	subscribers map[string][]chan Event
	// mu is held for reading while events are routed.
	mu sync.RWMutex
}
// route delivers event to every channel subscribed to the event resource's
// type key. Delivery never blocks: when a subscriber's channel cannot accept
// the event immediately, that subscriber simply misses it.
func (r *eventRouter) route(event Event) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	targets := r.subscribers[formatType(event.Resource.Ref)]
	for i := range targets {
		select {
		case targets[i] <- event:
			// Delivered.
		default:
			// Channel not ready; drop the event for this subscriber.
		}
	}
}
Resources are stored with structured keys:
/resources/{group}/{version}/{kind}/{namespace}/{name} --> Resource
/indices/type/{group}/{version}/{kind} --> ResourceRef set
/indices/namespace/{namespace} --> ResourceRef set
/relationships/{subject_key}/{predicate_type} --> Relationship set
// UpdateResource replaces the stored copy of rsrc using optimistic locking.
//
// Inside a single BadgerDB read-write transaction it loads the current
// resource, returns ErrVersionConflict when rsrc.Version differs from the
// stored version, then increments rsrc.Version, stamps rsrc.UpdatedAt and
// writes the resource and its indices.
//
// The update event is queued only after the transaction has committed, so
// subscribers never observe a change that was rolled back (for example when
// the commit itself fails). If the transaction fails, rsrc.Version and
// rsrc.UpdatedAt are restored to the values the caller passed in, so a retry
// with the same object does not trip a spurious version conflict.
func (s *store) UpdateResource(rsrc *Resource) error {
	origVersion, origUpdatedAt := rsrc.Version, rsrc.UpdatedAt

	// previous is captured inside the transaction and used for the event
	// once the commit has succeeded.
	var previous *Resource

	err := s.db.Update(func(txn *badger.Txn) error {
		// Get existing resource
		existing, err := s.getResourceTxn(txn, rsrc.Ref)
		if err != nil {
			return err
		}

		// Version check for optimistic locking
		if rsrc.Version != existing.Version {
			return ErrVersionConflict
		}

		// Update resource
		rsrc.Version++
		rsrc.UpdatedAt = time.Now()

		// Store updated resource
		if err := s.setResourceTxn(txn, rsrc); err != nil {
			return err
		}

		// Update indices
		if err := s.updateIndicesTxn(txn, rsrc); err != nil {
			return err
		}

		previous = existing
		return nil
	})
	if err != nil {
		// Nothing was persisted; undo the in-memory bump on the caller's object.
		rsrc.Version, rsrc.UpdatedAt = origVersion, origUpdatedAt
		return err
	}

	// Queue event only now that the write is durable.
	s.queueEvent(Event{
		Type:     EventTypeUpdate,
		Resource: rsrc,
		Previous: previous,
	})
	return nil
}
// updateTypeIndex adds ref to, or removes ref from, the type index stored
// under /indices/type/{group}/{version}/{kind}.
//
// When add is true the reference is appended if not already present, creating
// the index entry if it does not exist yet. When add is false the reference is
// removed; removing from a missing index is a no-op. Any error from reading,
// parsing or writing the index is returned.
func (s *store) updateTypeIndex(txn *badger.Txn, ref *ResourceRef, add bool) error {
	key := fmt.Sprintf("/indices/type/%s/%s/%s", ref.Group, ref.Version, ref.Kind)

	// Get current index
	item, err := txn.Get([]byte(key))
	switch {
	case err == badger.ErrKeyNotFound:
		if !add {
			return nil // Nothing to remove
		}
		// Create new index
		return s.setIndex(txn, key, []*ResourceRef{ref})
	case err != nil:
		// Any other read failure leaves item unusable; surface it instead of
		// handing a nil item to parseIndex.
		return err
	}

	// Update existing index
	refs, err := s.parseIndex(item)
	if err != nil {
		return err
	}
	if add {
		refs = appendUnique(refs, ref)
	} else {
		refs = removeRef(refs, ref)
	}
	return s.setIndex(txn, key, refs)
}
// Add ownership relationship
ownerRef := &OwnerReference{
Kind: "Deployment",
Name: "my-app",
UID: "abc123",
}
relationship := &Relationship{
Subject: deploymentRef,
Predicate: ownerRef,
Object: podRef,
}
err := store.AddRelationships(relationship)
// Find all pods owned by a deployment: fix the subject and the predicate
// type, and leave the object unconstrained.
var anyObject *ResourceRef // nil: any object
relationships, err := store.GetRelationships(deploymentRef, anyObject, &OwnerReference{})
// Find what owns a pod: fix the object and the predicate type, and leave the
// subject unconstrained.
var anySubject *ResourceRef // nil: any subject
relationships, err := store.GetRelationships(anySubject, podRef, &OwnerReference{})
// Bidirectional indexing for fast queries
/relationships/subject/{subject_key}/{predicate_type} --> [objects]
/relationships/object/{object_key}/{predicate_type} --> [subjects]
// store holds the BadgerDB handle together with in-memory acceleration
// structures: a cache of recently accessed resources and pre-computed indices.
type store struct {
	// db is the underlying BadgerDB database.
	db *badger.DB
	// cache keeps recently accessed resources.
	cache *lru.Cache

	// Pre-computed indices.
	// NOTE(review): indexMu presumably guards both index maps — confirm.
	typeIndex      map[string][]*ResourceRef
	namespaceIndex map[string][]*ResourceRef
	indexMu        sync.RWMutex
}
// AddResourcesBatch writes all given resources through a single BadgerDB
// write batch. Each resource is marshalled and stored under its resource key;
// the first marshalling or batching error aborts the call, and the deferred
// Cancel releases the batch. On success the batch is flushed and the flush
// result is returned.
func (s *store) AddResourcesBatch(resources []*Resource) error {
	wb := s.db.NewWriteBatch()
	defer wb.Cancel()

	for i := range resources {
		current := resources[i]
		k := s.resourceKey(current.Ref)

		payload, marshalErr := proto.Marshal(current)
		if marshalErr != nil {
			return marshalErr
		}

		setErr := wb.Set(k, payload)
		if setErr != nil {
			return setErr
		}
	}

	return wb.Flush()
}
// ListResources returns the resources of the type described by typeDef by
// iterating over the /resources/{group}/{version}/{kind}/ key prefix inside a
// read-only transaction.
//
// The supplied ListOption values are applied to a listOptions struct first.
// When a filter is set, only resources accepted by it are returned. When a
// limit greater than zero is set, iteration stops once that many accepted
// resources have been collected.
//
// If parsing any stored item fails, the error is returned together with a nil
// slice rather than a partially filled result.
func (s *store) ListResources(typeDef *TypeDescriptor, opts ...ListOption) ([]*Resource, error) {
	options := &listOptions{}
	for _, opt := range opts {
		opt(options)
	}

	prefix := fmt.Sprintf("/resources/%s/%s/%s/",
		typeDef.Group, typeDef.Version, typeDef.Kind)

	var resources []*Resource
	err := s.db.View(func(txn *badger.Txn) error {
		// Named iterOpts so it does not shadow the variadic opts parameter.
		iterOpts := badger.DefaultIteratorOptions
		iterOpts.Prefix = []byte(prefix)

		it := txn.NewIterator(iterOpts)
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			// The limit counts accepted resources, not scanned keys.
			if options.limit > 0 && len(resources) >= options.limit {
				break
			}

			rsrc, err := s.parseResource(it.Item())
			if err != nil {
				return err
			}
			if options.filter != nil && !options.filter(rsrc) {
				continue
			}
			resources = append(resources, rsrc)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return resources, nil
}
// Reconcile mirrors a single Kubernetes pod into the resource store.
//
// If the pod no longer exists in Kubernetes, the matching resource is deleted
// from the store. Otherwise the pod is converted to a store Resource and
// added. When the resource already exists, the stored copy is fetched first so
// the update carries the current Version (UpdateResource rejects mismatched
// versions) and keeps the original CreatedAt. After the resource has been
// added or updated, one ownership relationship per pod owner reference is
// recorded. Any store error is returned so the request can be retried.
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Get pod from Kubernetes
	var pod v1.Pod
	if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
		if !apierrors.IsNotFound(err) {
			return ctrl.Result{}, err
		}
		// The pod is gone: delete it from the store.
		ref := &ResourceRef{
			Group:     "v1",
			Version:   "v1",
			Kind:      "Pod",
			Namespace: req.Namespace,
			Name:      req.Name,
		}
		return ctrl.Result{}, r.store.DeleteResource(ref)
	}

	// Convert to store resource
	resource := &Resource{
		Ref: &ResourceRef{
			Group:     "v1",
			Version:   "v1",
			Kind:      "Pod",
			Namespace: pod.Namespace,
			Name:      pod.Name,
			UID:       string(pod.UID),
		},
		Data:        mustMarshalAny(&pod),
		Labels:      pod.Labels,
		Annotations: pod.Annotations,
	}

	// Store or update
	if err := r.store.AddResource(resource); err != nil {
		if !errors.Is(err, ErrResourceExists) {
			return ctrl.Result{}, err
		}

		// A freshly built Resource has Version 0; adopt the stored version so
		// the optimistic-locking check in UpdateResource can succeed.
		existing, err := r.store.GetResource(resource.Ref)
		if err != nil {
			return ctrl.Result{}, err
		}
		resource.Version = existing.Version
		resource.CreatedAt = existing.CreatedAt

		if err := r.store.UpdateResource(resource); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Add relationships (for both newly added and updated pods).
	rels := make([]*Relationship, 0, len(pod.OwnerReferences))
	for _, owner := range pod.OwnerReferences {
		rels = append(rels, &Relationship{
			Subject: &ResourceRef{
				// NOTE(review): owner.APIVersion is a combined "group/version"
				// string (e.g. "apps/v1"), yet it is stored in Group and
				// Version is left empty — confirm this matches how owner
				// resources are keyed elsewhere.
				Group:     owner.APIVersion,
				Kind:      owner.Kind,
				Namespace: pod.Namespace,
				Name:      owner.Name,
				UID:       string(owner.UID),
			},
			Predicate: &OwnerReference{
				Kind: owner.Kind,
				Name: owner.Name,
				UID:  string(owner.UID),
			},
			Object: resource.Ref,
		})
	}
	if len(rels) > 0 {
		if err := r.store.AddRelationships(rels...); err != nil {
			return ctrl.Result{}, err
		}
	}

	return ctrl.Result{}, nil
}
// Create a store client
store := resourcestore.New(resourcestore.Config{
Path: "/var/lib/antimetal/store",
InMemory: false,
})
// Start the store
ctx := context.Background()
go store.Run(ctx)
// Add custom resources
resource := &Resource{
Ref: &ResourceRef{
Group: "custom.io",
Version: "v1",
Kind: "MyResource",
Name: "example",
},
Data: mustMarshalAny(&MyCustomResource{
Spec: MyResourceSpec{
Field: "value",
},
}),
}
err := store.AddResource(resource)
// Subscribe to changes
events := store.Subscribe(&TypeDescriptor{
Group: "custom.io",
Version: "v1",
Kind: "MyResource",
})
for event := range events {
// Handle events
}
// Config collects the settings used to construct a resource store.
type Config struct {
	// Path is the storage path (empty for in-memory).
	Path string

	// InMemory selects in-memory only mode.
	InMemory bool

	// CacheSize configures the cache.
	CacheSize int

	// EventBufferSize is the event buffer size.
	EventBufferSize int

	// GCInterval is the garbage collection interval.
	GCInterval time.Duration

	// BadgerOptions holds BadgerDB options.
	BadgerOptions badger.Options
}
// DefaultConfig returns the default configuration: in-memory mode, a cache
// size of 10000, an event buffer of 1000 and a five-minute garbage collection
// interval. Path and BadgerOptions are left at their zero values.
func DefaultConfig() Config {
	var cfg Config
	cfg.InMemory = true
	cfg.CacheSize = 10000
	cfg.EventBufferSize = 1000
	cfg.GCInterval = 5 * time.Minute
	return cfg
}
- Resource Design
  - Use consistent naming conventions
  - Include sufficient metadata in labels
  - Version resources for compatibility
- Event Handling
  - Use buffered channels for subscribers
  - Handle channel overflow gracefully
  - Unsubscribe when done
- Performance
  - Batch operations when possible
  - Use transactions for consistency
  - Monitor memory usage
- Error Handling
  - Check for version conflicts
  - Handle missing resources
  - Log transaction failures
// resourceCount is a gauge of resources, labelled by group, version and kind.
// NOTE(review): Prometheus naming conventions reserve the _total suffix for
// counters; the name is kept as-is because renaming a metric breaks dashboards.
var resourceCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "resourcestore_resources_total",
		Help: "Total number of resources by type",
	},
	[]string{"group", "version", "kind"},
)

// eventsSent is a counter of events sent, labelled by type.
var eventsSent = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "resourcestore_events_sent_total",
		Help: "Total events sent by type",
	},
	[]string{"type"},
)

// storageSize is a gauge of the storage size in bytes.
var storageSize = prometheus.NewGauge(
	prometheus.GaugeOpts{
		Name: "resourcestore_storage_bytes",
		Help: "Storage size in bytes",
	},
)
- High Memory Usage
  - Reduce cache size
  - Enable garbage collection
  - Use persistent storage
- Slow Queries
  - Check index usage
  - Use type-specific queries
  - Limit result sets
- Event Overflow
  - Increase buffer size
  - Add backpressure handling
  - Filter unnecessary events
- Architecture Overview - System design
- Hardware Discovery - Hardware graph topology
- Runtime Discovery - Container and process graph topology
- Kubernetes Controller - K8s integration
- gRPC API - Intake service protocol
Source code: pkg/resource/store/