Position Calculator - sgajbi/portfolio-analytics-system GitHub Wiki

Overview

The Position Calculator service aggregates processed transactions into portfolio positions, such as quantity held and book cost per instrument. It consumes processed_transactions_completed events, performs aggregation, persists results, and emits position_history_persisted events for downstream services.

Recent enhancements:

  • Idempotent event processing to prevent duplicate position updates.
  • Correlation ID propagation for end-to-end tracing.
  • Standardized logging to enable consistent operational debugging.

Key Responsibilities

  • Consume processed_transactions_completed events from Kafka topic.

  • Ensure idempotent processing:

    • Check processed_events for (event_id, service_name).
    • Skip if already processed.
    • Persist correlation_id when recording processed event.
  • Aggregate transactions into positions at portfolio-instrument level.

  • Maintain historical position changes in position_history table.

  • Publish position_history_persisted event with correlation ID.

  • Log all operations with [corr_id=<svc-shortname>:uuid] for traceability.


Technology Stack

  • Python Kafka consumer (confluent-kafka-python)
  • SQLAlchemy ORM
  • PostgreSQL for persistence
  • Standard logging utility (portfolio_common.logging_utils) for correlation ID logging

Implementation Details

Kafka Consumer

  • Subscribes to processed_transactions_completed.
  • Reads correlation_id from Kafka headers (generates if missing).
  • Logs receipt and processing steps with correlation ID.
  • Performs idempotency check before calculation.

Position Aggregation Logic

  • Aggregates by:

    • portfolio_id
    • instrument_id
  • Updates:

    • quantity
    • book_cost
    • Derived values from cost calculator.
  • Maintains full position history in position_history table for auditability.

Persistence

  • Inserts/updates positions in positions table.
  • Inserts historical record in position_history table.
  • Records (event_id, service_name, correlation_id) in processed_events.

Event Emission

  • Emits position_history_persisted event with correlation ID in Kafka headers.
  • Enables downstream services (Valuation Calculator) to maintain traceability.

Logging & Observability

  • Logs enriched with correlation ID:

    2025-08-01 12:34:56 [INFO] [corr_id=POS:123e4567-e89b-12d3-a456-426614174000] PositionCalculator - Position updated for Portfolio 1001, AAPL
    
  • Ops can follow correlation ID from transaction ingestion through cost, cashflow, and position updates.


Testing

  • Unit tests for aggregation logic across various transaction types.

  • Integration tests to confirm:

    • Idempotency skips duplicate events.
    • Correlation ID is persisted and propagated.
    • Correct position updates after cost basis changes.