System Data Flow Diagram - sgajbi/portfolio-analytics-system GitHub Wiki

System Data Flow Diagram

End-to-End Event Flow with Correlation IDs


📌 Overview

The Portfolio Analytics System processes financial data through a chain of microservices connected by Kafka topics. Every event carries a correlation ID, so a single request can be traced across logs, the database, and monitoring tools.


🔄 Data Flow

graph TD
    subgraph Ingestion
        A["Ingestion Service"]
    end
    
    subgraph Persistence
        B["Persistence Service"]
    end
    
    subgraph Calculators
        C1["Cost Calculator"]
        C2["Cashflow Calculator"]
        C3["Position Calculator"]
        C4["Valuation Calculator"]
        C5["Performance Calculator"]
    end
    
    subgraph Query
        D["Query Service (API)"]
    end

    %% Flow
    A -- corr_id=ING:uuid --> B
    B -- corr_id=ING:uuid --> C1
    B -- corr_id=ING:uuid --> C2
    C1 -- corr_id=ING:uuid --> C3
    C2 -- corr_id=ING:uuid --> C3
    C3 -- corr_id=ING:uuid --> C4
    C4 -- corr_id=ING:uuid --> C5
    C5 -- corr_id=ING:uuid --> D
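As a rough illustration of the graph above, the edges can be written as an adjacency map and walked breadth-first to list every service a correlation ID is expected to pass through. The service names come from the diagram; the `EDGES` map and the `downstream` helper are hypothetical names introduced only for this sketch and are not part of the system's code.

```python
from collections import deque

# Edges copied from the diagram; this map is illustrative only.
EDGES = {
    "Ingestion Service": ["Persistence Service"],
    "Persistence Service": ["Cost Calculator", "Cashflow Calculator"],
    "Cost Calculator": ["Position Calculator"],
    "Cashflow Calculator": ["Position Calculator"],
    "Position Calculator": ["Valuation Calculator"],
    "Valuation Calculator": ["Performance Calculator"],
    "Performance Calculator": ["Query Service (API)"],
}


def downstream(start):
    """Return start plus every service reachable from it, in breadth-first order."""
    seen = [start]
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in EDGES.get(node, []):
            if nxt not in seen:
                seen.append(nxt)
                queue.append(nxt)
    return seen


if __name__ == "__main__":
    for service in downstream("Ingestion Service"):
        print(service)
```

When a trace stops partway, comparing the services that logged the ID against this list can hint at where the event stalled.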

📂 Event Sequence with Correlation IDs

  1. Ingestion Service

    • Accepts API calls.
    • Uses the correlation ID supplied by the caller, or generates one in the form ING:<uuid> if none is provided.
    • Publishes raw_transactions to Kafka with correlation ID in headers.
  2. Persistence Service

    • Consumes raw_transactions.
    • Stores transactions in DB.
    • Writes (event_id, service_name, correlation_id) to processed_events.
    • Emits raw_transactions_completed.
  3. Calculator Services

    • Cost Calculator → Calculates cost basis, emits processed_transactions_completed.
    • Cashflow Calculator → Calculates cashflows, emits cashflow_calculated.
    • Position Calculator → Updates positions, emits position_history_persisted.
    • Valuation Calculator → Marks positions to market, emits valuation_calculated.
    • Performance Calculator → Computes returns, emits performance_calculated.
    • All calculators log and propagate the same correlation ID.
  4. Query Service

    • Serves API requests (positions, valuations, performance).
    • Returns X-Correlation-ID in every response.
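The sequence above can be sketched in a few lines of Python: mint an ID at ingestion when the caller sent none, attach it to the outgoing message headers, and have each downstream service copy it through unchanged. This is a minimal sketch, not the project's implementation. The header key `correlation_id` and all function names are assumptions; the `(key, bytes)` tuple shape is the one the kafka-python producer accepts for headers.

```python
import uuid
from typing import List, Optional, Tuple

# Hypothetical header key; the real key used by the services is not stated here.
CORRELATION_HEADER = "correlation_id"

Headers = List[Tuple[str, bytes]]


def ensure_correlation_id(incoming: Optional[str]) -> str:
    """Reuse the caller's ID, or mint a new one in the form ING:<uuid>."""
    return incoming if incoming else f"ING:{uuid.uuid4()}"


def to_kafka_headers(correlation_id: str) -> Headers:
    """Encode the ID as a list of (key, bytes) header pairs."""
    return [(CORRELATION_HEADER, correlation_id.encode("utf-8"))]


def from_kafka_headers(headers: Optional[Headers]) -> Optional[str]:
    """Extract the ID from consumed message headers, if present."""
    for key, value in headers or []:
        if key == CORRELATION_HEADER:
            return value.decode("utf-8")
    return None


def propagate(inbound: Optional[Headers]) -> Headers:
    """Downstream services copy the inbound ID to the event they emit."""
    corr_id = from_kafka_headers(inbound)
    if corr_id is None:
        raise ValueError("inbound message has no correlation ID header")
    return to_kafka_headers(corr_id)


if __name__ == "__main__":
    headers = to_kafka_headers(ensure_correlation_id(None))
    # A calculator consuming this message would emit with the same ID.
    print(from_kafka_headers(propagate(headers)))
```

Raising on a missing header is one possible design choice; a service could instead log a warning and mint a fresh ID, at the cost of breaking the end-to-end trace.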

🔍 Debugging with Correlation IDs

Example trace:

corr_id=ING:550e8400-e29b-41d4-a716-446655440000
  • Logs: Search Splunk/ELK for corr_id=ING:...
  • Database: Query processed_events by correlation ID to see which services processed the event.
  • API: Send an X-Correlation-ID header with the request to trace it end-to-end.
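The database check can be illustrated with a small, self-contained query. The sketch below uses an in-memory SQLite table as a stand-in, since the actual database engine is not specified here. The columns follow the (event_id, service_name, correlation_id) tuple described for processed_events; the event IDs, service name values, and the `services_for` helper are made up for illustration.

```python
import sqlite3


def services_for(conn, correlation_id):
    """List the distinct services that recorded the given correlation ID."""
    rows = conn.execute(
        "SELECT DISTINCT service_name FROM processed_events "
        "WHERE correlation_id = ? ORDER BY service_name",
        (correlation_id,),
    ).fetchall()
    return [row[0] for row in rows]


# In-memory stand-in for the real processed_events table (assumed schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE processed_events "
    "(event_id TEXT, service_name TEXT, correlation_id TEXT)"
)

corr_id = "ING:550e8400-e29b-41d4-a716-446655440000"
conn.executemany(
    "INSERT INTO processed_events VALUES (?, ?, ?)",
    [
        ("evt-1", "persistence", corr_id),        # hypothetical rows
        ("evt-1", "cost-calculator", corr_id),
        ("evt-2", "persistence", "ING:other"),
    ],
)

print(services_for(conn, corr_id))
```

A service missing from the result is a reasonable first place to look when an event appears to have stalled.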