Features - sgajbi/portfolio-analytics-system GitHub Wiki
The Portfolio Analytics System provides a production-grade analytics platform designed for accuracy, scalability, and resilience in wealth management.
Recent enhancements have added:
- Idempotent event processing
- Correlation ID propagation
- Standardized logging
- End-to-end observability
The system's entry points are designed for robust and flexible integration.
- Unified Write & Read APIs (CQRS): Separate ingestion (`ingestion-service`) and query (`query-service`) APIs ensure calculation-heavy queries do not impact write performance.
- Bulk Data Ingestion: All ingestion endpoints support batch operations for high-volume loads.
- Multi-Asset Support: Transactions, instruments, market prices, and FX rates are all supported.
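As a minimal illustration of the bulk ingestion described above, a single request can carry a whole batch of transactions. The endpoint path and field names below are assumptions for illustration, not the service's actual contract:

```python
import json

# Hypothetical bulk endpoint of the ingestion-service (illustrative only).
INGEST_URL = "http://localhost:8000/transactions/bulk"

# One batch payload instead of one request per transaction.
batch = {
    "transactions": [
        {"portfolio_id": "P-100", "instrument_id": "AAPL", "quantity": 10,
         "price": 150.0, "trade_date": "2024-01-02"},
        {"portfolio_id": "P-100", "instrument_id": "MSFT", "quantity": 5,
         "price": 390.0, "trade_date": "2024-01-02"},
    ]
}

body = json.dumps(batch)  # the whole batch travels in a single POST body
```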
The core of the system is a resilient, asynchronous Kafka-based pipeline.
- Decoupled Microservices Architecture: Independent services for each calculation stage (Cost, Cashflow, Position, Valuation, Performance).
- Asynchronous High-Throughput Processing: Kafka-based message flow ensures ingestion stays fast and responsive.
- Guaranteed Processing: Dead-Letter Queue (DLQ) for failed messages ensures no event loss.
- Idempotent Processing: All calculators track processed events in the `processed_events` table, preventing duplicate calculations.
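The DLQ and idempotency guarantees above can be sketched together. This is a simplified stand-in: a Python set replaces the `processed_events` table and a list replaces the dead-letter topic, and the event shape and handler are illustrative:

```python
# In-memory stand-ins: the real system uses Kafka topics and a
# processed_events database table.
processed_events = set()   # event IDs already handled
dead_letter_queue = []     # failed messages preserved for replay

def consume(event, calculate):
    """Process one event at most once; route failures to the DLQ."""
    if event["id"] in processed_events:
        return "skipped"            # duplicate delivery: no recalculation
    try:
        calculate(event)
    except Exception as exc:
        dead_letter_queue.append({"event": event, "error": str(exc)})
        return "dead-lettered"      # event is kept, never silently lost
    processed_events.add(event["id"])
    return "processed"

results = [
    consume({"id": "evt-1"}, lambda e: None),   # first delivery
    consume({"id": "evt-1"}, lambda e: None),   # duplicate delivery
    consume({"id": "evt-2"}, lambda e: 1 / 0),  # failing calculation
]
# results == ["processed", "skipped", "dead-lettered"]
```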
These services produce accurate portfolio analytics in near real-time.
- Advanced Cost Basis Calculation: Calculates net cost and realized gains/losses with configurable accounting methods (FIFO, Average Cost).
- Automated Cashflow Calculation: Configurable classification (income, outflow, fees) and timing (BOD/EOD).
- Historical Position Keeping: Maintains full date-accurate position history.
- Mark-to-Market Valuation: Calculates daily valuations and unrealized PnL for each position.
- Portfolio Performance Metrics: Calculates Time-Weighted Return (TWR), Money-Weighted Return (MWR), and contribution analytics.
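As a worked illustration of the TWR metric above: sub-period returns are geometrically linked, which is the standard definition (how the system segments periods around cashflows is not shown here):

```python
def time_weighted_return(subperiod_returns):
    """Geometrically link sub-period returns: TWR = prod(1 + r_i) - 1.

    Each r_i is the return of a period between external cashflows, so the
    result is unaffected by the size or timing of client flows.
    """
    linked = 1.0
    for r in subperiod_returns:
        linked *= 1.0 + r
    return linked - 1.0

# Three sub-periods returning +2%, -1%, +3%:
twr = time_weighted_return([0.02, -0.01, 0.03])
# twr ≈ 0.040094 (about 4.01%)
```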
All data is stored securely and queried efficiently.
- Relational & Auditable Data Store: PostgreSQL schema managed by Alembic migrations for consistent deployments.
- Correlation ID Traceability in DB: The `processed_events` table stores correlation IDs for operational debugging.
- Flexible Query API: Query endpoints return latest positions, full position histories, transactions with cashflows, valuations, and performance metrics.
- Correlation ID in API Responses: All API responses include the `X-Correlation-ID` header for log tracing.
The system is built for production supportability.
- Correlation ID Propagation:
  - Generated in the ingestion service (`<svc-shortname>:<uuid>` format).
  - Propagated across Kafka topics and services.
  - Returned in API responses.
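A minimal sketch of generating and propagating such an ID, assuming `"ing"` as the ingestion service's shortname and the kafka-python convention of `(key, bytes)` header pairs (both are illustrative assumptions):

```python
import uuid

def new_correlation_id(svc_shortname: str) -> str:
    """Build a correlation ID in the <svc-shortname>:<uuid> format."""
    return f"{svc_shortname}:{uuid.uuid4()}"

# "ing" as the ingestion service's shortname is an assumption for illustration.
corr_id = new_correlation_id("ing")

# Attach as a Kafka message header (kafka-python style: list of (key, bytes)
# pairs) so downstream services can read and reuse the same ID.
kafka_headers = [("X-Correlation-ID", corr_id.encode("utf-8"))]
```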
- Standard Logging:
  - Shared logging utility in all services.
  - Consistent log format: `[LEVEL] [corr_id=PREFIX:uuid] Service - Message`
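The log format above can be reproduced with the standard library; this is a sketch, not the shared utility itself, and the service name and correlation ID values are illustrative:

```python
import logging

# Formatter matching: [LEVEL] [corr_id=PREFIX:uuid] Service - Message
formatter = logging.Formatter(
    "[%(levelname)s] [corr_id=%(corr_id)s] %(name)s - %(message)s"
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger("position-service")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# A LoggerAdapter injects the correlation ID into every record.
log = logging.LoggerAdapter(
    logger, {"corr_id": "ing:123e4567-e89b-12d3-a456-426614174000"}
)
log.info("Position recalculated")
# prints a line like: [INFO] [corr_id=ing:...] position-service - Position recalculated
```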
- Centralized Logging & Monitoring:
  - Logs aggregated in Splunk/ELK.
  - Metrics captured in Prometheus/Grafana.
- Operational Debugging:
  - Events traceable via correlation ID in logs and database.
  - Database queries link event status across all services.
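The cross-service lookup described above can be sketched in miniature. The rows and column names below are hypothetical stand-ins for a `processed_events`-style table, not the actual schema:

```python
# Hypothetical rows from a processed_events-style table (illustrative schema).
rows = [
    {"service": "cost-calculator",    "correlation_id": "ing:abc", "status": "processed"},
    {"service": "position-service",   "correlation_id": "ing:abc", "status": "processed"},
    {"service": "valuation-service",  "correlation_id": "ing:abc", "status": "failed"},
    {"service": "cost-calculator",    "correlation_id": "ing:def", "status": "processed"},
]

def trace(correlation_id, rows):
    """Return per-service status for one event, mimicking the debugging query."""
    return {r["service"]: r["status"] for r in rows
            if r["correlation_id"] == correlation_id}

status = trace("ing:abc", rows)
# status == {"cost-calculator": "processed",
#            "position-service": "processed",
#            "valuation-service": "failed"}
```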