Event Handling & Messaging - green-ecolution/backend GitHub Wiki

The backend employs an event-driven messaging system to manage complex dependencies efficiently. This approach allows different parts of the application to communicate asynchronously, reducing direct dependencies between components. By using events, the system can be more modular, scalable, and easier to maintain. It ensures that features can evolve independently while still responding to system-wide changes in a structured and reliable manner.

Overview: Events and Subscribers

  • Events represent significant actions or state changes within the system.
  • Subscribers listen for specific events and execute predefined handler logic in response.
  • Handlers are functions that process a specific event. A handler is called by its subscriber.
  • Event Dispatching allows the system to propagate changes without tightly coupling components. This setup facilitates extensibility, enabling new features to be added without modifying existing logic extensively.
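
To make the relationship between events, subscribers and handlers concrete, here is a minimal sketch. It is not the backend's implementation: `Dispatcher`, `Subscribe`, `Publish` and `treeUpdated` are hypothetical names chosen for illustration, and the dispatch here is synchronous to keep the example short.

```go
package main

import "fmt"

// EventType names a kind of event (mirrors the terminology of this page).
type EventType string

// Event is anything that can report its type.
type Event interface {
	Type() EventType
}

// Handler is the function that runs when a matching event arrives.
type Handler func(e Event) error

// Dispatcher is a hypothetical, synchronous stand-in for an event manager.
type Dispatcher struct {
	handlers map[EventType][]Handler
}

func NewDispatcher() *Dispatcher {
	return &Dispatcher{handlers: make(map[EventType][]Handler)}
}

// Subscribe registers a handler for one event type.
func (d *Dispatcher) Subscribe(t EventType, h Handler) {
	d.handlers[t] = append(d.handlers[t], h)
}

// Publish calls every handler registered for the event's type and
// stops at the first error.
func (d *Dispatcher) Publish(e Event) error {
	for _, h := range d.handlers[e.Type()] {
		if err := h(e); err != nil {
			return err
		}
	}
	return nil
}

// treeUpdated is an illustrative event, not a type from the backend.
type treeUpdated struct{ id int }

func (treeUpdated) Type() EventType { return "update tree" }

// runDemo publishes one event and returns what the handler recorded.
func runDemo() []string {
	var log []string
	d := NewDispatcher()
	d.Subscribe("update tree", func(e Event) error {
		log = append(log, fmt.Sprintf("handled %s", e.Type()))
		return nil
	})
	_ = d.Publish(treeUpdated{id: 1})
	return log
}

func main() {
	fmt.Println(runDemo())
}
```

The publisher only knows the event type, not who reacts to it, which is the loose coupling described above.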

List of all events

Name | Triggered When
update tree | When a tree's data is modified by a user or automated system
create tree | When a new tree is added to the database manually or via an import
delete tree | When a tree is deleted due to removal, errors, or other administrative actions
update tree cluster | When a tree cluster's data is modified by a user or automated system
receive sensor data | When new sensor data is received from an IoT device
update watering plan | When a watering plan is modified by a user or automated system

List of all handlers

A handler defines the logic that runs when a subscriber captures an event. This section lists each event with its handler function. To avoid duplicating documentation, each entry links to the Go package documentation of the function and only summarises what the function does.

Event | Purpose | Doc
update tree | If the tree's sensor was previously linked to a different tree, the old cluster's watering status is updated. If the tree has moved to a different cluster, both the old and new clusters are updated. | Go Doc
create tree | If the sensor was previously connected to a different tree with a tree cluster, the cluster's watering status is recalculated. If the new tree has a tree cluster, an update for that cluster is triggered. | Go Doc
delete tree | If the status is updated, an update event is published. | Go Doc
receive sensor data | The function does three things: it retrieves the tree associated with the given sensor ID, calculates the new watering status, and updates the status if it has changed. If the status is updated, an update event is published. | Go Doc
receive sensor data | The function finds the tree connected to the given sensor ID and checks whether it is part of a tree cluster. If it is, the watering status of the whole cluster is calculated using the latest sensor data. If the calculated status differs from the current status, the tree cluster is updated and an update event is published. | Go Doc
update watering plan | The function finds the tree connected to the given sensor ID and checks whether it is part of a tree cluster. If it is, the watering status of the whole cluster is calculated using the latest sensor data. If the calculated status differs from the current status, the tree cluster is updated and an update event is published. | Go Doc
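
Several handlers above share one pattern: recalculate a status, write it only if it changed, and publish an update event only in that case. The sketch below illustrates that pattern under loud assumptions: `statusFromHumidity`, its threshold, the `Tree` fields and the `publish` callback are all invented for this example and do not reflect the real calculation in the backend's services.

```go
package main

import "fmt"

// WateringStatus is a simplified, hypothetical status type.
type WateringStatus string

const (
	StatusGood WateringStatus = "good"
	StatusBad  WateringStatus = "bad"
)

// Tree holds only the fields this sketch needs (assumed shape).
type Tree struct {
	ID     int
	Status WateringStatus
}

// statusFromHumidity is a placeholder rule. The 0.3 threshold is made up
// for the example and is not the backend's logic.
func statusFromHumidity(humidity float64) WateringStatus {
	if humidity >= 0.3 {
		return StatusGood
	}
	return StatusBad
}

// handleSensorData recalculates the status, updates the tree only when the
// status changed, and publishes an update event only in that case.
// It reports whether an update happened.
func handleSensorData(t *Tree, humidity float64, publish func(eventName string)) bool {
	next := statusFromHumidity(humidity)
	if next == t.Status {
		return false // nothing changed, so no write and no event
	}
	t.Status = next
	publish("update tree")
	return true
}

func main() {
	tree := &Tree{ID: 1, Status: StatusGood}
	publish := func(name string) { fmt.Println("published:", name) }

	fmt.Println(handleSensorData(tree, 0.5, publish)) // status unchanged
	fmt.Println(handleSensorData(tree, 0.1, publish)) // status changes
}
```

Publishing only on change keeps subscribers from reacting to sensor readings that do not alter the status.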

How to create new events?

  1. Define a new EventType.
  2. Create an event struct that embeds BasicEvent and includes the necessary attributes (e.g., previous and new states).
  3. Implement a subscriber struct that conforms to the Subscriber interface. This determines how the event is processed.

Example

We will go through the code and define a new event. This event will be fired when a tree is updated. First we need to define a new EventType.

//! internal/entities/events.go

const (
  EventTypeUpdateTree         EventType = "update tree" // the event is created here
  // [...]
  EventTypeUpdateTreeCluster  EventType = "update tree cluster"
  EventTypeNewSensorData      EventType = "receive sensor data"
  EventTypeUpdateWateringPlan EventType = "update watering plan"
)

Then we create a new struct that embeds BasicEvent (Go has no inheritance, so "extending" here means struct embedding). This struct can hold the state the event needs (e.g. the previous and new state of the tree).

//! internal/entities/events.go

// EventUpdateTree is the struct that embeds BasicEvent and holds the attributes of the event
type EventUpdateTree struct {
  BasicEvent
  Prev         *Tree
  New          *Tree
  PrevOfSensor *Tree
}
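
The following sketch shows why embedding BasicEvent is enough for the new struct to be usable as an event. The shape of `BasicEvent`, the `Event` interface with a `Type()` method, the `Tree` fields and the `NewEventUpdateTree` constructor are assumptions made for this example; check internal/entities/events.go for the real definitions.

```go
package main

import "fmt"

type EventType string

const EventTypeUpdateTree EventType = "update tree"

// Event is an assumed interface: anything that reports its type.
type Event interface {
	Type() EventType
}

// BasicEvent is an assumed shape for the shared base of all events.
type BasicEvent struct {
	eventType EventType
}

// Type is promoted to every struct that embeds BasicEvent.
func (e BasicEvent) Type() EventType { return e.eventType }

// Tree is reduced to two illustrative fields.
type Tree struct {
	ID        int
	ClusterID int
}

// EventUpdateTree embeds BasicEvent and adds event-specific state.
type EventUpdateTree struct {
	BasicEvent
	Prev *Tree
	New  *Tree
}

// NewEventUpdateTree is a hypothetical constructor that sets the type once.
func NewEventUpdateTree(prev, next *Tree) EventUpdateTree {
	return EventUpdateTree{
		BasicEvent: BasicEvent{eventType: EventTypeUpdateTree},
		Prev:       prev,
		New:        next,
	}
}

func main() {
	// EventUpdateTree satisfies Event through the promoted Type method.
	var e Event = NewEventUpdateTree(
		&Tree{ID: 1, ClusterID: 1},
		&Tree{ID: 1, ClusterID: 2},
	)
	fmt.Println(e.Type())
}
```

Because the method is promoted from the embedded struct, each new event only has to declare its own fields.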

To subscribe to the event, we need to define a Subscriber that implements the appropriate interface.

//! internal/worker/subscriber.go

// UpdateTreeSubscriber is the subscriber that implements the subscriber interface. It can hold some internal information such as a service
type UpdateTreeSubscriber struct {
  tcs service.TreeClusterService
}

func NewUpdateTreeSubscriber(tcs service.TreeClusterService) *UpdateTreeSubscriber {
  return &UpdateTreeSubscriber{
    tcs: tcs,
  }
}

func (s *UpdateTreeSubscriber) EventType() entities.EventType {
  return entities.EventTypeUpdateTree
}

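// HandleEvent decides what action to take when the event is fired. Here it calls 'HandleUpdateTree' in the TreeCluster service.
func (s *UpdateTreeSubscriber) HandleEvent(ctx context.Context, e entities.Event) error {
  // The comma-ok form returns an error instead of panicking if an unexpected event arrives (needs the "fmt" import).
  event, ok := e.(entities.EventUpdateTree)
  if !ok {
    return fmt.Errorf("unexpected event type %T", e)
  }
  return s.tcs.HandleUpdateTree(ctx, &event)
}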

At this point we have defined an event and a subscriber for that event. Now we need to tell the application to run this subscriber. In the main function there is a section where all subscribers are listed and each one is started so that it subscribes to its event. We will go through this in the next section.

How to listen to an event?

Subscribers register themselves to listen for specific events and handle them asynchronously. The RunSubscription method is responsible for managing these subscriptions by listening for incoming events of a specific type and forwarding them to the appropriate subscriber for processing. It ensures proper cleanup when the context is canceled or an error occurs. Each subscription runs in a separate goroutine to allow parallel event handling.
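
The loop described above can be sketched as follows. This is a hypothetical stand-in, not the real RunSubscription: the function name `runSubscription`, the channel-based delivery, and the `Event` and `Subscriber` interfaces are assumptions made to keep the example self-contained.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type EventType string

type Event interface{ Type() EventType }

// Subscriber is an assumed version of the interface used on this page.
type Subscriber interface {
	EventType() EventType
	HandleEvent(ctx context.Context, e Event) error
}

// runSubscription forwards matching events to the subscriber until the
// context is canceled, the channel is closed, or a handler fails.
func runSubscription(ctx context.Context, events <-chan Event, sub Subscriber) error {
	for {
		select {
		case <-ctx.Done():
			return nil // canceled: stop listening
		case e, ok := <-events:
			if !ok {
				return nil // channel closed: no more events
			}
			if e.Type() != sub.EventType() {
				continue // not this subscriber's event
			}
			if err := sub.HandleEvent(ctx, e); err != nil {
				return err
			}
		}
	}
}

// simpleEvent and recorder exist only for the demo below.
type simpleEvent struct {
	t    EventType
	name string
}

func (e simpleEvent) Type() EventType { return e.t }

type recorder struct{ handled []string }

func (r *recorder) EventType() EventType { return "update tree" }

func (r *recorder) HandleEvent(_ context.Context, e Event) error {
	se, ok := e.(simpleEvent)
	if !ok {
		return errors.New("unexpected event")
	}
	r.handled = append(r.handled, se.name)
	return nil
}

// runDemo queues three events, closes the channel, and returns what the
// subscriber handled.
func runDemo() ([]string, error) {
	events := make(chan Event, 3)
	events <- simpleEvent{t: "update tree", name: "first"}
	events <- simpleEvent{t: "create tree", name: "ignored"}
	events <- simpleEvent{t: "update tree", name: "second"}
	close(events)

	rec := &recorder{}
	err := runSubscription(context.Background(), events, rec)
	return rec.handled, err
}

func main() {
	handled, err := runDemo()
	fmt.Println(handled, err)
}
```

In the backend each such loop runs in its own goroutine, so one slow handler does not block subscribers of other event types.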

For detailed documentation, visit the GoDoc reference.

Starting Subscribers

//! main.go

// All subscribers are listed here
subscribers := []worker.Subscriber{
 subscriber.NewUpdateTreeSubscriber(services.TreeClusterService),
 subscriber.NewCreateTreeSubscriber(services.TreeClusterService),
 subscriber.NewDeleteTreeSubscriber(services.TreeClusterService),
 subscriber.NewSensorDataSubscriber(services.TreeClusterService, services.TreeService),
 subscriber.NewUpdateWateringPlanSubscriber(services.TreeClusterService),
}

for _, sub := range subscribers {
 wg.Add(1)
 go func(sub worker.Subscriber) {
  defer wg.Done()
  if err := em.RunSubscription(ctx, sub); err != nil { // 'RunSubscription' will start the subscriber
   slog.Error("stop subscription with err", "eventType", sub.EventType(), "err", err)
  }
 }(sub)
}
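
The snippet above relies on a context and a wait group defined earlier in main. A minimal sketch of how such a setup shuts down cleanly is shown below, assuming `ctx` is cancelable and `wg` is a `sync.WaitGroup`. The `startWorkers` helper is hypothetical: each worker simply blocks until cancellation, standing in for a running subscription.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// startWorkers launches n goroutines that block until ctx is canceled.
// Each one stands in for a running subscription.
func startWorkers(ctx context.Context, wg *sync.WaitGroup, n int, stopped chan<- int) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ctx.Done()  // wait for shutdown
			stopped <- id // report that this worker exited
		}(i)
	}
}

// runDemo starts three workers, cancels the context, waits for all of
// them, and returns how many reported a clean stop.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	stopped := make(chan int, 3) // buffered so workers never block on send

	startWorkers(ctx, &wg, 3, stopped)

	cancel()  // signal every worker to stop
	wg.Wait() // block until all workers have returned
	close(stopped)

	count := 0
	for range stopped {
		count++
	}
	return count
}

func main() {
	fmt.Println("workers stopped:", runDemo())
}
```

Canceling first and then calling `wg.Wait()` ensures the process does not exit while a subscriber is still handling an event.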