Performance Calculator - sgajbi/portfolio-analytics-system GitHub Wiki

Overview

The Performance Calculator service consumes valuation_calculated events, combines valuation data with cashflow information, calculates portfolio returns and contribution metrics, persists the results, and emits performance_calculated events.

Recent enhancements:

  • Idempotent event processing to ensure accurate and non-duplicated performance results.
  • Correlation ID propagation to trace performance metrics back to positions, valuations, and transactions.
  • Standardized logging for consistent observability across the pipeline.

Key Responsibilities

  • Consume valuation_calculated events from Kafka topic calculated_valuations.

  • Ensure idempotent processing:

    • Check processed_events for (event_id, service_name).
    • Skip duplicate processing.
    • Store correlation_id in processed_events.
  • Access historical valuations and cashflows.

  • Calculate:

    • Time-Weighted Return (TWR)
    • Money-Weighted Return (MWR)
    • Contribution by asset class or instrument.
  • Persist results in performance table.

  • Emit performance_calculated events with correlation ID.

  • Log all operations with [corr_id=<svc-shortname>:uuid] for traceability.


Technology Stack

  • Python Kafka consumer (confluent-kafka-python)
  • SQLAlchemy ORM
  • PostgreSQL for persistence
  • Standard logging utility (portfolio_common.logging_utils) for correlation ID logging

Implementation Details

Kafka Consumer

  • Subscribes to valuation_calculated.
  • Extracts correlation_id from Kafka headers (generates if missing).
  • Logs start, calculation steps, and persistence with correlation ID.
  • Idempotency check before calculations.

Calculation Logic

  • Combines valuation history and cashflow data.

  • Calculates:

    • Portfolio-level performance.
    • Contribution by security or asset class.
  • Supports multiple calculation windows (daily, monthly, YTD).

Persistence

  • Inserts results into performance table.
  • Records (event_id, service_name, correlation_id) in processed_events.

Event Emission

  • Publishes performance_calculated event with correlation ID in Kafka headers.
  • Downstream consumers can correlate performance metrics with valuations and cashflows.

Logging & Observability

  • Logs enriched with correlation ID:

    2025-08-01 12:34:56 [INFO] [corr_id=PERF:123e4567-e89b-12d3-a456-426614174000] PerformanceCalculator - Performance calculated for Portfolio 1001
    
  • Enables tracing from transaction ingestion through cost, cashflow, position, valuation, and performance.


Testing

  • Unit tests for performance calculation logic (TWR, MWR, contribution).

  • Integration tests to verify:

    • Idempotency prevents duplicate calculations.
    • Correlation ID is propagated.
    • Results are accurate across different performance windows.