Valuation Calculator - sgajbi/portfolio-analytics-system GitHub Wiki
Overview
The Valuation Calculator service performs mark-to-market valuations for portfolio positions. It consumes position_history_persisted
and market_price_persisted
events, calculates market value and unrealized PnL, persists results, and emits valuation_calculated
events for downstream analytics.
Recent enhancements:
- Idempotent event processing to prevent duplicate valuations.
- Correlation ID propagation for full traceability.
- Standardized logging to enable consistent operational debugging across the valuation workflow.
Key Responsibilities
-
Consume
position_history_persisted
andmarket_price_persisted
events from Kafka topics. -
Ensure idempotent processing:
- Check
processed_events
table for(event_id, service_name)
. - Skip if already processed.
- Record
correlation_id
inprocessed_events
.
- Check
-
Retrieve latest market prices for relevant securities.
-
Calculate:
market_value = quantity × market_price
unrealized_gain_loss = market_value - cost_basis
-
Persist valuation results to
position_history
table. -
Emit
valuation_calculated
events with correlation ID. -
Log all operations with
[corr_id=<svc-shortname>:uuid]
for traceability.
Technology Stack
- Python Kafka consumer (
confluent-kafka-python
) - SQLAlchemy ORM
- PostgreSQL for persistence
- Standard logging utility (
portfolio_common.logging_utils
) for correlation ID logging
Implementation Details
Kafka Consumer
-
Subscribes to:
position_history_persisted
for new positions.market_price_persisted
for revaluing positions on price updates.
-
Reads
correlation_id
from Kafka headers (generates if missing). -
Logs receipt, valuation calculation, and persistence with correlation ID.
-
Idempotency check before performing calculations.
Calculation Logic
-
Stateless calculation using latest available price on or before valuation date.
-
Applies consistent formula for:
market_value
unrealized_gain_loss
Persistence
- Updates
market_price
,market_value
, andunrealized_gain_loss
inposition_history
. - Records
(event_id, service_name, correlation_id)
inprocessed_events
.
Event Emission
- Emits
valuation_calculated
event with correlation ID in headers. - Downstream services (Performance Calculator) continue using same correlation ID for traceability.
Logging & Observability
-
Logs enriched with correlation ID:
2025-08-01 12:34:56 [INFO] [corr_id=VAL:123e4567-e89b-12d3-a456-426614174000] ValuationCalculator - Valuation updated for Portfolio 1001, AAPL
-
Enables tracing from position updates through valuation in Splunk/ELK.
Testing
-
Unit tests for valuation formulas (including edge cases).
-
Integration tests to verify:
- Idempotency prevents double processing.
- Correlation ID propagation.
- Correct valuations for daily price updates.