Cashflow Calculator - sgajbi/portfolio-analytics-system GitHub Wiki

Overview

The Cashflow Calculator service consumes persisted raw transaction events, calculates the corresponding cashflow based on a set of configurable business rules, persists the result, and emits a cashflow_calculated event for downstream analytics.

Recent enhancements:

  • Idempotent processing to prevent duplicate cashflows.
  • Correlation ID propagation for end-to-end traceability.
  • Standardized logging for consistent observability across the pipeline.

Key Responsibilities

  • Consume events from the raw_transactions_completed Kafka topic.

  • Ensure idempotent processing:

    • Check the processed_events table for an existing (event_id, service_name) entry.
    • Skip the event if it has already been processed.
    • Record the correlation_id in processed_events.
  • Apply business logic to determine cashflow attributes (amount, classification, timing, etc.).

  • Persist the calculated cashflow in the cashflows table.

  • Emit a cashflow_calculated event carrying the same correlation ID.

  • Log all actions with a [corr_id=<svc-shortname>:<uuid>] tag for traceability.
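
For each event, these responsibilities run in a fixed order: check, calculate, persist, then emit. The following is a minimal sketch of that ordering only. It assumes in-memory stand-ins for the database tables and the Kafka producer, and InMemoryStore, handle_event, SERVICE_NAME and the event field names are hypothetical, not the service's actual code. The calculation step is a placeholder.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

SERVICE_NAME = "cashflow_calculator"  # hypothetical service_name value


@dataclass
class InMemoryStore:
    """Hypothetical stand-in for the cashflows and processed_events tables."""
    cashflows: List[dict] = field(default_factory=list)
    # (event_id, service_name) -> correlation_id
    processed: Dict[Tuple[str, str], str] = field(default_factory=dict)


def handle_event(event: dict, correlation_id: str,
                 store: InMemoryStore, emitted: List[dict]) -> bool:
    """Process one raw transaction event; return False if it was a duplicate."""
    key = (event["event_id"], SERVICE_NAME)
    # 1. Idempotency check: skip events this service has already processed.
    if key in store.processed:
        return False
    # 2. Apply business rules (placeholder: cashflow amount = gross amount).
    cashflow = {"transaction_id": event["transaction_id"],
                "amount": event["gross_amount"]}
    # 3. Persist the cashflow and mark the event processed
    #    (a single database transaction in a real deployment).
    store.cashflows.append(cashflow)
    store.processed[key] = correlation_id
    # 4. Only after persisting, emit the downstream event with the same correlation ID.
    emitted.append({"payload": cashflow, "correlation_id": correlation_id})
    return True
```

If you call handle_event twice with the same event_id, it stores one cashflow and emits one event. The second call returns False.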


Technology Stack

  • Python Kafka consumer (confluent-kafka-python)
  • SQLAlchemy ORM
  • PostgreSQL for persistence
  • Standard logging utility (portfolio_common.logging_utils) for correlation ID logging

Implementation Details

Kafka Consumer

  • Subscribes to raw_transactions_completed.
  • Extracts the correlation_id from the Kafka message headers, generating a new one if it is missing.
  • Logs the receipt and processing of each event with its correlation ID.
  • Performs idempotency check before calculation.
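
Here is a minimal sketch of the header lookup. It assumes the correlation ID travels in a header named correlation_id, and that newly generated IDs take the CFLOW: prefix seen in the log sample under Logging & Observability. Both are assumptions, and get_or_create_correlation_id is a hypothetical name. The function accepts either the list of (key, value) tuples that confluent-kafka's Message.headers() returns or None, which is what that method returns when a message has no headers.

```python
import uuid
from typing import List, Optional, Tuple

# Hypothetical header key and prefix; the prefix mirrors the CFLOW log sample.
CORRELATION_HEADER = "correlation_id"
SERVICE_PREFIX = "CFLOW"

Headers = Optional[List[Tuple[str, Optional[bytes]]]]


def get_or_create_correlation_id(headers: Headers) -> str:
    """Return the correlation ID from Kafka headers, generating one if absent."""
    for key, value in headers or []:
        if key == CORRELATION_HEADER and value:
            # Propagate the upstream ID unchanged so the trace stays intact.
            return value.decode("utf-8")
    # No usable header: start a new trace for this event.
    return f"{SERVICE_PREFIX}:{uuid.uuid4()}"
```

In a consumer loop, this would be called as get_or_create_correlation_id(msg.headers()) on each message returned by Consumer.poll().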

Calculation Logic

  • Configuration-driven rules for each transaction type define:

    • classification (Investment Inflow, Dividend, Interest, Fee, etc.)
    • timing (BOD/EOD)
    • calculation_method (NET/GROSS)
  • Stateless core logic takes a transaction and its rules as input and produces the cashflow.
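
The rule-driven, stateless calculation can be sketched as a pure function over a rule table. This is an illustration under stated assumptions, not the service's CashflowLogic. The dataclasses, the rule entries, the reading of NET as "gross amount minus fees", and the is_inflow sign flag are all hypothetical.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict


@dataclass(frozen=True)
class CashflowRule:
    classification: str        # e.g. "Investment Inflow", "Dividend", "Fee"
    timing: str                # "BOD" or "EOD"
    calculation_method: str    # "NET" or "GROSS"
    is_inflow: bool            # hypothetical field: sign of the resulting cashflow


@dataclass(frozen=True)
class Transaction:
    transaction_id: str
    transaction_type: str
    gross_amount: Decimal
    fee: Decimal = Decimal("0")


@dataclass(frozen=True)
class Cashflow:
    transaction_id: str
    amount: Decimal
    classification: str
    timing: str


# Illustrative rule table; the real rules come from configuration.
RULES: Dict[str, CashflowRule] = {
    "DEPOSIT": CashflowRule("Investment Inflow", "BOD", "GROSS", True),
    "DIVIDEND": CashflowRule("Dividend", "EOD", "NET", True),
    "FEE": CashflowRule("Fee", "EOD", "GROSS", False),
}


def calculate_cashflow(txn: Transaction, rules: Dict[str, CashflowRule]) -> Cashflow:
    """Pure function: the same transaction and rules always give the same cashflow."""
    rule = rules[txn.transaction_type]  # raises KeyError for unconfigured types
    if rule.calculation_method == "NET":
        magnitude = txn.gross_amount - txn.fee  # assumed meaning of NET
    else:
        magnitude = txn.gross_amount
    amount = magnitude if rule.is_inflow else -magnitude
    return Cashflow(txn.transaction_id, amount, rule.classification, rule.timing)
```

Because calculate_cashflow reads nothing but its arguments, it can be unit-tested across transaction types without Kafka or a database.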

Persistence

  • Inserts the cashflow into the cashflows table.
  • Records (event_id, service_name, correlation_id) in processed_events in the same database transaction, so both writes commit or roll back together.
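
The atomic write can be sketched with Python's built-in sqlite3 module, which stands in for SQLAlchemy and PostgreSQL here so the example runs on its own. The table layouts beyond the columns named above, persist_once, and SERVICE_NAME are hypothetical. Both INSERT statements share one transaction. The composite primary key on processed_events acts as a backstop if two consumers race past the SELECT check.

```python
import sqlite3

SERVICE_NAME = "cashflow_calculator"  # hypothetical service_name value

# Amounts are stored as TEXT to keep exact decimal values in SQLite.
SCHEMA = """
CREATE TABLE IF NOT EXISTS cashflows (
    transaction_id TEXT PRIMARY KEY,
    amount TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS processed_events (
    event_id TEXT NOT NULL,
    service_name TEXT NOT NULL,
    correlation_id TEXT,
    PRIMARY KEY (event_id, service_name)
);
"""


def persist_once(conn: sqlite3.Connection, event_id: str, correlation_id: str,
                 transaction_id: str, amount: str) -> bool:
    """Write the cashflow and its processed_events row together.

    Returns False, writing nothing, if this event was already processed.
    """
    with conn:  # commits on success, rolls back if any statement raises
        already = conn.execute(
            "SELECT 1 FROM processed_events WHERE event_id = ? AND service_name = ?",
            (event_id, SERVICE_NAME),
        ).fetchone()
        if already:
            return False
        conn.execute(
            "INSERT INTO cashflows (transaction_id, amount) VALUES (?, ?)",
            (transaction_id, amount),
        )
        conn.execute(
            "INSERT INTO processed_events (event_id, service_name, correlation_id) "
            "VALUES (?, ?, ?)",
            (event_id, SERVICE_NAME, correlation_id),
        )
    return True
```

With SQLAlchemy, the equivalent is to add both ORM objects inside a single `with session.begin():` block. That block commits on success and rolls back on error.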

Event Emission

  • Publishes a cashflow_calculated event with the correlation_id in its Kafka headers.
  • Downstream calculators read the same correlation ID, keeping the trace intact.
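
Here is a minimal sketch of preparing the outgoing event. It assumes a JSON payload, the header name correlation_id, and the transaction ID as the message key. All three are assumptions, as is the function name build_cashflow_event. The returned tuple lines up with the key, value and headers arguments of confluent-kafka's Producer.produce().

```python
import json
from decimal import Decimal
from typing import List, Tuple

CORRELATION_HEADER = "correlation_id"  # hypothetical header key


def build_cashflow_event(cashflow: dict, correlation_id: str
                         ) -> Tuple[bytes, bytes, List[Tuple[str, bytes]]]:
    """Return (key, value, headers) for a cashflow_calculated message."""
    key = cashflow["transaction_id"].encode("utf-8")  # hypothetical key choice
    # default=str serialises Decimal amounts as strings, preserving precision.
    value = json.dumps(cashflow, default=str).encode("utf-8")
    # Carry the incoming correlation ID forward unchanged.
    headers = [(CORRELATION_HEADER, correlation_id.encode("utf-8"))]
    return key, value, headers
```

For example, `producer.produce("cashflow_calculated", key=key, value=value, headers=headers)`. The topic name is assumed to match the event name.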

Logging & Observability

  • Log lines are enriched with the correlation ID, for example:

    2025-08-01 12:34:56 [INFO] [corr_id=CFLOW:123e4567-e89b-12d3-a456-426614174000] CashflowCalculator - Cashflow calculated for TX123
    
  • Operators can trace a calculation from ingestion through to cashflow output by following its correlation ID.
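
The service uses portfolio_common.logging_utils for this, and that utility's API is not described here. As a stand-in, the following sketch reproduces the sample line's format using only the standard library. A contextvars.ContextVar holds the current correlation ID, and a logging.Filter copies it onto each record. correlation_id_var, CorrelationIdFilter and make_logger are hypothetical names.

```python
import contextvars
import logging

# Holds the correlation ID of the event currently being processed.
correlation_id_var = contextvars.ContextVar("correlation_id", default="<none>")


class CorrelationIdFilter(logging.Filter):
    """Attach the current correlation ID to every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.corr_id = correlation_id_var.get()
        return True  # never drop records, only enrich them


def make_logger(stream) -> logging.Logger:
    """Build a logger whose lines match the sample format above."""
    logger = logging.getLogger("CashflowCalculator")
    handler = logging.StreamHandler(stream)
    handler.addFilter(CorrelationIdFilter())
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] [corr_id=%(corr_id)s] %(name)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate lines if the root logger is configured
    return logger
```

Set correlation_id_var when each message starts and reset it afterwards (ContextVar.set() returns a token for reset()). This stops IDs from leaking between events. Because it is a context variable, the same approach also works under asyncio.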


Testing

  • Unit tests for CashflowLogic across transaction types.

  • Integration tests to verify:

    • Reprocessed events are skipped (idempotency).
    • The correlation ID is persisted in the database and propagated in emitted events.
    • End-to-end cashflow calculation correctness.
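
A table-driven unit test over transaction types can be sketched with unittest's subTest, which reports each failing case separately. cashflow_amount is a hypothetical stand-in for CashflowLogic. The NET-minus-fees rule and the sign convention are assumptions.

```python
import unittest
from decimal import Decimal


def cashflow_amount(gross: Decimal, fee: Decimal, method: str, is_inflow: bool) -> Decimal:
    """Hypothetical stand-in for the CashflowLogic amount calculation."""
    magnitude = gross - fee if method == "NET" else gross
    return magnitude if is_inflow else -magnitude


class CashflowLogicTest(unittest.TestCase):
    # (case name, gross, fee, method, is_inflow, expected amount)
    CASES = [
        ("dividend net of fees", Decimal("100"), Decimal("15"), "NET", True, Decimal("85")),
        ("deposit gross", Decimal("500"), Decimal("0"), "GROSS", True, Decimal("500")),
        ("fee outflow", Decimal("10"), Decimal("0"), "GROSS", False, Decimal("-10")),
    ]

    def test_amount_per_transaction_type(self):
        for name, gross, fee, method, inflow, expected in self.CASES:
            # Each case is reported on its own if it fails.
            with self.subTest(name):
                self.assertEqual(cashflow_amount(gross, fee, method, inflow), expected)
```

Run it with `python -m unittest`. New transaction types then need only a new row in CASES.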