Architecture - julianvb03/MOM-Implementation GitHub Wiki

Architecture

Table of Contents

  • Architecture Design
  • Architecture Diagram
  • Programming Decisions

Architecture Design

The MOM Server is a distributed system designed to manage message queues and topics with high availability, scalability, and fault tolerance. It leverages a combination of RESTful APIs, message-oriented middleware (MOM), gRPC replication, and centralized coordination to ensure robust message handling across multiple instances. Below is an overview of the architecture:

  • API Layer: A FastAPI-based REST API serves as the entry point for creating and managing queues and topics. It runs on three instances (e.g., 172.31.1.10:8080, 172.31.1.11:8080, 172.31.1.12:8080), each deployed in a Docker container.
  • Local Redis: Each instance has a local Redis database (redis:6379) to store queues (mom:queues:<name>), topics (mom:topics:<name>), and associated metadata, subscribers, offsets, and messages.
  • gRPC Replication: When a queue or topic is created, the action is replicated to other nodes via gRPC. A separate gRPC server application runs on each instance to receive and process these replication requests, ensuring data consistency across nodes.
  • Centralized Databases:
    • Users Database: A centralized Redis instance (e.g., 172.31.1.20:6379) manages user data for authentication and authorization across all API instances.
    • Backup Database: Another centralized Redis instance (e.g., 172.31.1.21:6379) stores backups of queues and topics, used for failback when an instance restarts.
  • ZooKeeper: A ZooKeeper cluster tracks the location of each queue and topic (e.g., which instance owns it), enabling failover (reassigning to another instance) and failback (restoring from backup) while supporting partitioning for load distribution.
  • NGINX Load Balancer: An NGINX instance (e.g., <nginx-public-ip>:80) balances API requests across the three MOM instances using the least_conn strategy, providing a single endpoint for clients.
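The key naming convention above (`mom:queues:<name>`, `mom:topics:<name>`) can be sketched as a small helper. This is an illustration only, assuming suffixes for the metadata, subscriber, and offset keys; the helper name and suffixes are not taken from the repository:

```python
def mom_keys(name: str, kind: str) -> dict:
    """Build the local Redis key names for a queue or topic.

    `kind` is "queue" or "topic"; the base key follows the
    mom:<kind>s:<name> convention described above. The metadata,
    subscriber, and offset suffixes are illustrative assumptions.
    """
    base = f"mom:{kind}s:{name}"
    return {
        "messages": base,                      # list of pending messages
        "metadata": f"{base}:metadata",        # hash with creation info
        "subscribers": f"{base}:subscribers",  # set of subscriber ids
        "offsets": f"{base}:offsets",          # zset of consumer offsets
    }

print(mom_keys("orders", "queue")["messages"])  # mom:queues:orders
```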

Key interactions:

  1. A client sends a REST request (e.g., POST /api/v1.0.0/MomServer/queue_topic) to the NGINX load balancer.
  2. NGINX forwards the request to one of the three API instances.
  3. The API instance processes the request, stores data in its local Redis, and sends a gRPC replication request to other instances.
  4. The gRPC server on each instance updates its local Redis accordingly.
  5. ZooKeeper updates the queue/topic location metadata.
  6. Backup data is periodically synced to the centralized backup Redis.
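Step 3 above fans each write out to the other nodes. A minimal sketch of selecting the replication targets (pure Python; the instance addresses come from the list above, the function name is an assumption):

```python
INSTANCES = ["172.31.1.10:8080", "172.31.1.11:8080", "172.31.1.12:8080"]

def replication_targets(self_addr: str, instances=INSTANCES) -> list:
    """Every instance except the one that handled the original
    request receives a gRPC ReplicateAction call."""
    return [addr for addr in instances if addr != self_addr]

# The instance at 172.31.1.10 replicates to the other two nodes:
print(replication_targets("172.31.1.10:8080"))
```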

Architecture Diagram

The following diagram illustrates the system architecture:

[Architecture diagram (draw.io)](https://drive.google.com/file/d/1dMI5pt-zb3-6fGOj3wS2n5AbqVAasvdI/view)

Programming Decisions

The architecture and implementation reflect several key programming decisions to meet the project’s goals of scalability, reliability, and maintainability:

  • FastAPI for REST API:

    • Chosen for its asynchronous capabilities, automatic OpenAPI documentation (Swagger), and ease of integration with Python-based Redis and gRPC clients.
    • Example endpoint:
      @router.post("/queue_topic")
      async def create_queue_topic(data: QueueTopic):
          # Persist the new queue/topic in the local Redis instance
          redis_client.rpush(f"mom:{data.type}s:{data.name}", "initial_message")
          # Replicate the creation to the other nodes over gRPC
          grpc_stub.ReplicateAction(ReplicationRequest(name=data.name, type=data.type))
          return {"status": "created"}
  • Redis as Local MOM Store:

    • Selected for its simplicity, speed, and support for multiple data structures (lists for queues, sets for subscribers, zsets for offsets).
    • Local instances reduce latency, while replication ensures consistency.
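The list-based queue semantics can be illustrated without a running Redis server: RPUSH appends at the tail and LPOP removes from the head, giving FIFO delivery. The class below is an in-memory stand-in for those two commands, not the project's actual client code:

```python
from collections import defaultdict, deque

class MiniRedis:
    """In-memory stand-in for the two Redis list commands the queues rely on."""
    def __init__(self):
        self.lists = defaultdict(deque)

    def rpush(self, key, value):       # append at the tail (producer side)
        self.lists[key].append(value)

    def lpop(self, key):               # pop from the head (consumer side)
        return self.lists[key].popleft() if self.lists[key] else None

r = MiniRedis()
r.rpush("mom:queues:orders", "msg-1")
r.rpush("mom:queues:orders", "msg-2")
print(r.lpop("mom:queues:orders"))  # msg-1 (FIFO order)
```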
  • gRPC for Replication:

    • Used over REST for its performance (HTTP/2, protobuf) and type safety, critical for reliable queue/topic replication across nodes.
    • Example .proto:
      service ReplicationService {
          rpc ReplicateAction (ReplicationRequest) returns (ReplicationResponse) {}
      }
      message ReplicationRequest {
          string name = 1;
          string type = 2;
      }
      message ReplicationResponse {
          string status = 1;
      }
  • Centralized Databases:

    • Users DB: Centralizes authentication data to avoid duplication and ensure consistency across instances.
    • Backup DB: Provides a failback mechanism, with the backup_database function copying data from the backup Redis to local instances on restart.
      def backup_database(elements: List[QueueTopic]):
          # Clients for the centralized users DB and the centralized backup DB
          client = ObjectFactory.get_instance(Database, ObjectFactory.USERS_DATABASE).get_client()
          backup_client = ObjectFactory.get_instance(Database, ObjectFactory.BACK_UP_DATABASE).get_client()
          for element in elements:
              # Every key associated with this queue/topic (messages, metadata, ...)
              keys = generate_keys(element.name, element.type)
              for key in keys:
                  data = backup_client.get(key)
                  if data:  # restore only keys that were actually backed up
                      client.set(key, data)
  • ZooKeeper for Failover/Failback:

    • Chosen for its proven distributed coordination capabilities, tracking queue/topic ownership and enabling automatic reassignment during failures.
    • Example ZooKeeper node: /mom/queues/queue-example with data instance: 172.31.1.10.
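Failover amounts to rewriting the ownership data at a queue's znode. A minimal sketch of that bookkeeping, assuming the path layout from the example above (the in-memory dict stands in for ZooKeeper; field and function names are assumptions):

```python
def znode_path(kind: str, name: str) -> str:
    """Path layout from the example above: /mom/<kind>s/<name>."""
    return f"/mom/{kind}s/{name}"

# Simulated znode store: path -> owning instance
ownership = {znode_path("queue", "queue-example"): "172.31.1.10"}

def failover(path: str, live_instances: list) -> str:
    """Reassign a queue/topic to the first surviving instance."""
    ownership[path] = live_instances[0]
    return ownership[path]

path = znode_path("queue", "queue-example")
print(failover(path, ["172.31.1.11", "172.31.1.12"]))  # 172.31.1.11
```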
  • NGINX Load Balancer:

    • Implements least_conn to distribute load evenly across three instances, improving throughput and resilience.
    • Config:
      upstream api_backend {
          least_conn;
          server 172.31.1.10:8080 max_fails=5 fail_timeout=60s;
          server 172.31.1.11:8080 max_fails=5 fail_timeout=60s;
          server 172.31.1.12:8080 max_fails=5 fail_timeout=60s;
      }
      server {
          listen 80;
          location / {
              proxy_pass http://api_backend;
          }
      }
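least_conn routes each new request to the upstream with the fewest active connections. A small simulation of that selection rule (illustrative only; NGINX's real implementation also honors server weights and the max_fails/fail_timeout health checks shown above):

```python
def least_conn(active: dict) -> str:
    """Pick the backend with the fewest in-flight connections."""
    return min(active, key=active.get)

active = {
    "172.31.1.10:8080": 4,
    "172.31.1.11:8080": 1,
    "172.31.1.12:8080": 3,
}
print(least_conn(active))  # 172.31.1.11:8080
```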
  • Docker Deployment:

    • Containers (mom-server, redis, zookeeper, nginx) ensure consistent environments and easy scaling.
    • Multi-Dockerfile approach (Dockerfile.api, Dockerfile.nginx) supports separate builds for API and load balancer.
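The four containers could be wired together per instance roughly as follows. This is a sketch, not the repository's actual compose file: the service names come from the bullet above, but the image tags, build contexts, and port mappings are assumptions.

```yaml
# Sketch of a per-instance docker-compose.yml (images/ports are assumptions)
services:
  mom-server:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "8080:8080"
    depends_on:
      - redis
  redis:
    image: redis:7
    ports:
      - "6379:6379"
  zookeeper:
    image: zookeeper:3.8
  nginx:
    build:
      context: .
      dockerfile: Dockerfile.nginx
    ports:
      - "80:80"
```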