Persistence Service - sgajbi/portfolio-analytics-system GitHub Wiki
Persistence Service
Overview
The Persistence Service is the backbone of the Portfolio Analytics System's data pipeline. It consumes validated raw events from Kafka, persists them to PostgreSQL, tracks event processing for idempotency, and republishes events for downstream calculators. It is the system’s source of record for all ingested financial data.
Key Responsibilities
- Consume raw events (transactions, instruments, prices, FX rates) from Kafka topics after initial validation by the Ingestion Service.
- Persist all incoming data to PostgreSQL, maintaining strict schema and referential integrity.
- Track event processing using the `processed_events` table, ensuring idempotent processing and enabling operational traceability.
- Emit completion events (e.g., `raw_transactions_completed`) to Kafka for downstream calculator services (Cost, Cashflow, etc.).
- Log every processing step, enriched with correlation IDs for end-to-end observability.
Technology Stack
- Python (core service implementation)
- SQLAlchemy (ORM for DB interaction)
- PostgreSQL (relational data persistence)
- confluent-kafka-python (Kafka consumer/producer)
- Standard logging utility (`portfolio_common.logging_utils`) for correlation ID logging and propagation
Implementation Details
Kafka Consumer
- Subscribes to primary topics: `raw_transactions`, `instruments`, `market_prices`, `fx_rates`
- Reads and propagates the `correlation_id` from Kafka headers (generates one if missing)
- Performs an idempotency check by querying the `processed_events` table using `(event_id, service_name)`
- Skips processing if the event has already been handled; otherwise, continues and records the event as processed
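The consumer-side steps above can be sketched roughly as follows. This is a minimal illustration, not the service's actual code: it uses the standard-library `sqlite3` module as a stand-in for PostgreSQL and SQLAlchemy, and the `SERVICE_NAME` value, the event ID format, and the choice of a UUID for generated correlation IDs are all assumptions. The `headers` argument mirrors the list of `(key, bytes)` tuples that confluent-kafka's `Message.headers()` returns.

```python
import sqlite3
import uuid

SERVICE_NAME = "persistence-service"  # hypothetical service identifier


def extract_correlation_id(headers):
    """Return the correlation_id from Kafka-style headers, or generate one.

    `headers` is a list of (key, bytes_value) tuples, or None when the
    message carries no headers.
    """
    for key, value in headers or []:
        if key == "correlation_id" and value:
            return value.decode("utf-8")
    return str(uuid.uuid4())  # assumed fallback when no ID was supplied


def already_processed(conn, event_id, service_name=SERVICE_NAME):
    """True if (event_id, service_name) is already in processed_events."""
    row = conn.execute(
        "SELECT 1 FROM processed_events WHERE event_id = ? AND service_name = ?",
        (event_id, service_name),
    ).fetchone()
    return row is not None


# Demo against an in-memory SQLite stand-in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE processed_events ("
    " event_id TEXT, service_name TEXT, correlation_id TEXT, processed_at TEXT,"
    " PRIMARY KEY (event_id, service_name))"
)
conn.execute(
    "INSERT INTO processed_events VALUES (?, ?, ?, datetime('now'))",
    ("raw_transactions-0-41", SERVICE_NAME, "abc-123"),  # hypothetical event ID
)

print(already_processed(conn, "raw_transactions-0-41"))  # seen before: skip
print(already_processed(conn, "raw_transactions-0-42"))  # new: process it
```

Keying the check on the pair `(event_id, service_name)` rather than on `event_id` alone lets several services share one `processed_events` table without masking each other's progress.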
Database Persistence
- Writes to core tables: `transactions`, `instruments`, `market_prices`, `fx_rates`
- Uses atomic transactions to ensure data consistency
- Updates the `processed_events` table with `(event_id, service_name, correlation_id, processed_at)` for every processed message
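One way to make the business row and the `processed_events` row succeed or fail together is to write both inside a single database transaction. The sketch below illustrates the idea with the standard-library `sqlite3` module standing in for PostgreSQL and SQLAlchemy. The `transactions` columns, the `persist_transaction` helper, and the default service name are hypothetical; only the `processed_events` columns come from this page.

```python
import sqlite3
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE transactions (
    transaction_id TEXT PRIMARY KEY, portfolio_id TEXT, amount REAL
);
CREATE TABLE processed_events (
    event_id TEXT, service_name TEXT, correlation_id TEXT, processed_at TEXT,
    PRIMARY KEY (event_id, service_name)
);
"""


def persist_transaction(conn, event_id, correlation_id, txn,
                        service_name="persistence-service"):
    """Write both rows in one transaction.

    Returns True if the event was persisted, False if a uniqueness
    constraint rejected it (for example a redelivered event whose
    (event_id, service_name) pair is already recorded).
    """
    try:
        with conn:  # commits on success, rolls back if any statement raises
            conn.execute(
                "INSERT INTO processed_events VALUES (?, ?, ?, ?)",
                (event_id, service_name, correlation_id,
                 datetime.now(timezone.utc).isoformat()),
            )
            conn.execute(
                "INSERT INTO transactions VALUES (?, ?, ?)",
                (txn["transaction_id"], txn["portfolio_id"], txn["amount"]),
            )
        return True
    except sqlite3.IntegrityError:
        return False


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
txn = {"transaction_id": "TX123", "portfolio_id": "P1", "amount": 100.0}

first = persist_transaction(conn, "evt-1", "corr-1", txn)
second = persist_transaction(conn, "evt-1", "corr-1", txn)  # simulated redelivery
print(first, second)
```

Because the primary key on `(event_id, service_name)` is enforced by the database, the duplicate is rejected even if two consumers race past an earlier read-only idempotency check.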
Event Emission
- Publishes completion events for each data type: `raw_transactions_completed`, `instruments_persisted`, `market_prices_persisted`, `fx_rates_persisted`
- Events include the same correlation ID for downstream traceability
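A completion event could be assembled along these lines. The pairing of each source topic with a completion topic is inferred from the order of the two lists on this page and is an assumption, as are the `build_completion_event` helper, the JSON encoding, and the payload fields. The returned dictionary uses the keyword names accepted by confluent-kafka's `Producer.produce()` (`topic`, `key`, `value`, `headers`).

```python
import json

# Assumed source-topic -> completion-topic pairing.
COMPLETION_TOPICS = {
    "raw_transactions": "raw_transactions_completed",
    "instruments": "instruments_persisted",
    "market_prices": "market_prices_persisted",
    "fx_rates": "fx_rates_persisted",
}


def build_completion_event(source_topic, key, payload, correlation_id):
    """Return keyword arguments describing the completion message."""
    return {
        "topic": COMPLETION_TOPICS[source_topic],
        "key": key.encode("utf-8"),
        "value": json.dumps(payload).encode("utf-8"),
        # Carry the original correlation ID forward in the message headers.
        "headers": [("correlation_id", correlation_id.encode("utf-8"))],
    }


event = build_completion_event(
    "raw_transactions",
    key="TX123",
    payload={"transaction_id": "TX123", "portfolio_id": "P1"},  # hypothetical fields
    correlation_id="PST:123e4567-e89b-12d3-a456-426614174000",
)
print(event["topic"])

# With a configured confluent_kafka.Producer, this would be sent as:
#     producer.produce(**event)
#     producer.flush()
```

Keeping message construction separate from the producer call makes the topic mapping and header propagation easy to unit test without a running broker.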
Logging & Observability
- All log entries include the correlation ID:
2025-08-01 12:34:56 [INFO] [corr_id=PST:123e4567-e89b-12d3-a456-426614174000] PersistenceService - Transaction TX123 persisted and published
- Enables end-to-end traceability from API ingestion through to final analytics
- The `processed_events` table acts as both an idempotency control and an audit log
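The log format shown above can be reproduced with the standard `logging` module. The sketch below is not the `portfolio_common.logging_utils` implementation, whose API is not documented on this page; it is a stand-in that assumes the correlation ID is held in a `contextvars.ContextVar` and attached to each record by a filter. The names `correlation_id_var`, `CorrelationIdFilter`, and `make_logger` are hypothetical.

```python
import contextvars
import io
import logging

# Holds the correlation ID of the message currently being processed.
correlation_id_var = contextvars.ContextVar("correlation_id", default="-")


class CorrelationIdFilter(logging.Filter):
    """Attach the current correlation ID to every log record."""

    def filter(self, record):
        record.correlation_id = correlation_id_var.get()
        return True  # never drop the record, only enrich it


def make_logger(stream, name="PersistenceService"):
    handler = logging.StreamHandler(stream)
    handler.addFilter(CorrelationIdFilter())
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] [corr_id=%(correlation_id)s] "
        "%(name)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


buffer = io.StringIO()
log = make_logger(buffer)

# Set once per consumed message, before any processing step logs.
correlation_id_var.set("PST:123e4567-e89b-12d3-a456-426614174000")
log.info("Transaction TX123 persisted and published")

line = buffer.getvalue()
print(line, end="")
```

Setting the ID once per message means individual log calls do not need to pass it explicitly.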
Testing
- Unit tests for DB models and core logic
- Integration tests verify:
  - Kafka message consumption and DB persistence
  - Idempotency (duplicate event detection and handling)
  - Correlation ID propagation and logging
  - Correct event emission for all data types
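As an illustration of the idempotency and correlation ID checks listed above, a test might look like the following. This sketch runs against an in-memory fake, not against Kafka or PostgreSQL, so it shows the shape of the assertions and not the project's real integration tests. `FakeStore` and `handle_event` are hypothetical names.

```python
import io
import unittest


class FakeStore:
    """In-memory stand-in for the database, used only in this sketch."""

    def __init__(self):
        self.rows = []
        self.processed = set()


def handle_event(store, published, event_id, payload, correlation_id,
                 service_name="persistence-service"):
    """Persist and publish once per (event_id, service_name)."""
    key = (event_id, service_name)
    if key in store.processed:
        return False  # duplicate delivery: skip
    store.rows.append(payload)
    store.processed.add(key)
    published.append({"payload": payload, "correlation_id": correlation_id})
    return True


class PersistenceIdempotencyTest(unittest.TestCase):
    def setUp(self):
        self.store = FakeStore()
        self.published = []

    def test_duplicate_is_skipped(self):
        payload = {"transaction_id": "TX123"}
        self.assertTrue(handle_event(self.store, self.published, "evt-1", payload, "c-1"))
        self.assertFalse(handle_event(self.store, self.published, "evt-1", payload, "c-1"))
        self.assertEqual(len(self.store.rows), 1)
        self.assertEqual(len(self.published), 1)

    def test_correlation_id_is_propagated(self):
        handle_event(self.store, self.published, "evt-2", {"transaction_id": "TX124"}, "c-2")
        self.assertEqual(self.published[0]["correlation_id"], "c-2")


# Run the suite programmatically so the sketch works as a plain script.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(PersistenceIdempotencyTest)
result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)
print(result.wasSuccessful())
```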
Related Pages
- Ingestion Service
- Cost Calculator Service
- Cashflow Calculator Service
- Position Calculator Service
- API Service