Ingestor Architecture
The CEI-InOE Ingestor is a modular, connector-based data ingestion system designed to collect, validate, transform, and load data from multiple sources (files, APIs) into a PostgreSQL data warehouse. It uses a worker pool pattern with a message queue for concurrent processing and APScheduler for periodic data discovery.
```
┌────────────────────────────────────────────────────────────────────────────┐
│                                IngestorApp                                 │
│                                                                            │
│   ┌───────────────┐     ┌───────────────┐     ┌─────────────────────┐      │
│   │  APScheduler  │────▶│  Thread-Safe  │────▶│      WorkerPool     │      │
│   │ discover jobs │     │     Queue     │     │ (n worker threads)  │      │
│   └───────┬───────┘     └───────────────┘     └──────────┬──────────┘      │
│           │                                              │                 │
│           ▼                                              ▼                 │
│   ┌──────────────────────────────────────┐    ┌──────────────────────────┐ │
│   │              Connectors              │    │      PipelineRunner      │ │
│   │ ┌──────┐ ┌──────┐ ┌───────┐ ┌──────┐ │    │ ┌──────────────────────┐ │ │
│   │ │ File │ │ Tago │ │Airbeld│ │Fusion│ │    │ │     DataPipeline     │ │ │
│   │ │ Conn │ │ Conn │ │ Conn  │ │Solar │ │    │ │ validate→transform→  │ │ │
│   │ └──┬───┘ └──┬───┘ └───┬───┘ └──┬───┘ │    │ │      stage→load      │ │ │
│   │    ▼        ▼         ▼        ▼     │    │ └──────────┬───────────┘ │ │
│   │   InputEnvelope (standardized)       │    │            ▼             │ │
│   └──────────────────────────────────────┘    │ ┌──────────────────────┐ │ │
│                                               │ │      DAOFactory      │ │ │
│                                               │ │  (Database Access)   │ │ │
│                                               │ └──────────┬───────────┘ │ │
│                                               └────────────┼─────────────┘ │
└────────────────────────────────────────────────────────────┼───────────────┘
                                                             │
                                                             ▼
                                                    ┌────────────────┐
                                                    │   PostgreSQL   │
                                                    │ Data Warehouse │
                                                    └────────────────┘
```
IngestorApp is the main application orchestrator that:
- Initializes all connectors from configuration
- Manages the APScheduler for periodic discovery jobs
- Creates and manages the worker pool
- Handles graceful shutdown on SIGTERM/SIGINT
```python
from queue import Queue
from typing import Dict

from apscheduler.schedulers.background import BackgroundScheduler

class IngestorApp:
    connectors: Dict[str, BaseConnector]   # Registry of active connectors
    queue: Queue                           # Thread-safe work queue
    scheduler: BackgroundScheduler         # APScheduler instance
    runner: PipelineRunner                 # Pipeline execution handler
    worker_pool: WorkerPool                # Worker thread manager
```

WorkerPool is a pool of daemon worker threads that process InputEnvelope messages from the queue:
- Concurrency: configurable via the `NUM_WORKERS` environment variable (default: 2)
- Processing: each worker runs a loop that:
  - Gets an envelope from the queue (blocking, with a timeout)
  - Resolves the connector from the envelope's `connector_id`
  - Executes the pipeline via `PipelineRunner.run()`
  - Calls `connector.ack()` on success or `connector.fail()` on error
- Graceful shutdown: uses a `threading.Event` for coordinated shutdown
Worker Thread Lifecycle:

```
while not shutdown_event.is_set():
  ├─▶ queue.get(timeout=1)  →  InputEnvelope
  ├─▶ connector = connectors[envelope.connector_id]
  ├─▶ metrics = runner.run(envelope)
  ├─▶ connector.ack(envelope)   # or .fail()
  └─▶ queue.task_done()
```
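Expressed as code, the loop looks roughly like this (a sketch of the logic above; the actual implementation lives in `app/main.py` and may differ in detail):

```python
import queue
import threading

def worker_loop(work_queue: queue.Queue,
                connectors: dict,
                runner: "PipelineRunner",
                shutdown_event: threading.Event) -> None:
    """Daemon worker: drain InputEnvelopes until shutdown is signaled."""
    while not shutdown_event.is_set():
        try:
            envelope = work_queue.get(timeout=1)   # blocking get with timeout
        except queue.Empty:
            continue                               # loop back to re-check shutdown
        connector = connectors[envelope.connector_id]
        try:
            runner.run(envelope)                   # validate/transform/stage/load
            connector.ack(envelope)                # source-specific success handling
        except Exception as exc:
            connector.fail(envelope, str(exc))     # source-specific failure handling
        finally:
            work_queue.task_done()
```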
Connectors are responsible for discovering, fetching, and wrapping data into standardized InputEnvelope objects. All connectors implement the BaseConnector interface.
```python
from abc import ABC
from typing import List, Optional

class BaseConnector(ABC):
    def start(self) -> None: ...            # Initialize resources
    def stop(self) -> None: ...             # Cleanup resources
    def discover(self) -> List[str]: ...    # Find available work items
    def fetch(self, item_id: str) -> Optional[InputEnvelope]: ...       # Fetch and wrap data
    def ack(self, envelope: InputEnvelope) -> None: ...                 # Mark success
    def fail(self, envelope: InputEnvelope, error: str) -> None: ...    # Mark failure
    def health(self) -> dict: ...           # Health status
```

InputEnvelope is a lightweight, standardized data carrier:
```python
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel

class InputEnvelope(BaseModel):
    connector_id: str                # Source connector ID
    input_id: str                    # Idempotency key (e.g., "file.csv:{sha256}")
    source_uri: str                  # Origin (file path, API URL)
    received_at: datetime            # Timestamp
    content: Any                     # Raw data: List[Dict], bytes
    content_type: str                # "csv", "excel", "json"
    # Pipeline hints
    hint_mapping: Optional[str]      # YAML mapping file path
    hint_device_id: Optional[str]    # Device identifier
    hint_granularity: Optional[str]  # "hourly", "daily"
    # Provenance tracking
    metadata: Dict[str, Any]         # sha256, file_size, date ranges, etc.
```
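For illustration, an envelope as the FileConnector might build it (all field values here are made up):

```python
from datetime import datetime, timezone

envelope = InputEnvelope(
    connector_id="file",
    input_id="readings.csv:3fa1...9be",   # filename + sha256 as idempotency key
    source_uri="/data/incoming/readings.csv",
    received_at=datetime.now(timezone.utc),
    content=[{"Date and Time": "01/02/2024 1:00 PM", "Hourly": "1.234,56"}],
    content_type="csv",
    hint_mapping="/app/mappings/energy_hourly.yaml",
    hint_device_id=None,
    hint_granularity="hourly",
    metadata={"sha256": "3fa1...9be", "file_size": 2048},
)
```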
The available connectors:

| Connector | Type | Description | Authentication | Schedule |
|---|---|---|---|---|
| `FileConnector` | `file` | Watches `/data/incoming` for CSV/Excel files | N/A | Every 5 s (`FILE_POLL_INTERVAL`) |
| `TagoConnector` | `tago` | Tago.io energy API (hourly/daily) | Per-device token header | Every 1 hr (`TAGO_POLL_INTERVAL`) |
| `AirbeldConnector` | `airbeld` | Airbeld environmental/weather sensors | OAuth email/password | Every 12 hr (`AIRBELD_POLL_INTERVAL`) |
| `FusionSolarConnector` | `fusionsolar` | Huawei FusionSolar PV station API | Username/system code | Every 1 hr (`FUSIONSOLAR_POLL_INTERVAL`) |
| `HttpConnector` | `http` | Generic REST API (base class for API connectors) | Bearer, API key, OAuth | N/A (base class) |
The PipelineRunner orchestrates the complete ingestion workflow for a single InputEnvelope:
```
PipelineRunner.run()
  1. Check Duplicates
       └─▶ BatchDAO.exists_by_sha256(envelope.metadata.sha256)
  2. Load YAML Mapping
       └─▶ Parse hint_mapping file
  3. Resolve Datasource
       └─▶ DatasourceDAO.get_by_external_id(hint_device_id)
  4. Register Batch
       └─▶ BatchDAO.register()  →  batch_id UUID
  5. Execute Pipeline
       └─▶ DataPipeline(conn, mapping, context).execute(records)
  6. Update Metrics
       └─▶ BatchDAO.update_metrics(quality_score, etc.)
```
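The same sequence as straight-line Python (a sketch only; DAO signatures are simplified and `load_mapping` stands in for the YAML parsing step):

```python
def run(self, envelope: InputEnvelope) -> PipelineMetrics:
    # 1. Duplicate check against the content hash recorded by the connector
    if self.dao.batch.exists_by_sha256(envelope.metadata.get("sha256")):
        raise DuplicateInputError(envelope.input_id)

    mapping = load_mapping(envelope.hint_mapping)          # 2. YAML mapping
    datasource = self.dao.datasource.get_by_external_id(   # 3. datasource row
        envelope.hint_device_id)

    batch_id = self.dao.batch.register(envelope)           # 4. new batch UUID

    pipeline = DataPipeline(self.conn, mapping,            # 5. run the ETL stages
                            context={"batch_id": batch_id,
                                     "datasource": datasource})
    metrics = pipeline.execute(envelope.content)

    self.dao.batch.update_metrics(batch_id, metrics)       # 6. persist quality stats
    return metrics
```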
DataPipeline is the core ETL engine that processes records through staging.
Pipeline Stages:

```
┌───────────┬────────────────────────────────────────────────────┐
│  EXTRACT  │ Records already parsed by connector (CSV rows/API) │
├───────────┼────────────────────────────────────────────────────┤
│ VALIDATE  │ PydanticTransformer validates + transforms         │
│     +     │  - Column mapping (CSV → DB columns)               │
│ TRANSFORM │  - Type coercion (European decimals, AM/PM dates)  │
│     +     │  - Constraint validation (min/max ranges)          │
│   STAGE   │  - Insert to staging table (raw + transformed)     │
├───────────┼────────────────────────────────────────────────────┤
│   LOAD    │ Move valid records from staging → final table      │
│           │  - Conflict resolution (update/ignore/fail)        │
│           │  - datasource_id injection for fact tables         │
└───────────┴────────────────────────────────────────────────────┘
```
PydanticTransformer performs unified validation and transformation using Pydantic models (see the sketch after this list):

- Column Mapping: maps source columns (CSV headers) to database columns via YAML config
- Type Coercion: handles European decimals (`1.234,56`), AM/PM dates, integer-from-float
- Constraint Validation: uses Pydantic `Field(ge=..., le=...)` for range validation
- Error Collection: converts Pydantic errors to `ValidationResult` for staging
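As a sketch of how those rules combine, a model for the `energy_hourly` dataset could look like this (not the actual `models.py` code; `parse_european_decimal` is an assumed helper):

```python
from datetime import datetime

from pydantic import BaseModel, Field, field_validator

def parse_european_decimal(value: str) -> float:
    """'1.234,56' -> 1234.56 (thousands '.', decimal ',')."""
    return float(value.replace(".", "").replace(",", "."))

class EnergyHourlyRecord(BaseModel):
    ts: datetime                                # coerced from source date strings
    energy_kwh: float = Field(ge=0, le=10000)   # constraint validation

    @field_validator("energy_kwh", mode="before")
    @classmethod
    def coerce_decimal(cls, value):
        return parse_european_decimal(value) if isinstance(value, str) else value

# Invalid rows raise pydantic.ValidationError, which the transformer collects
# into a ValidationResult instead of aborting the whole batch.
record = EnergyHourlyRecord(ts="2024-02-01T13:00:00", energy_kwh="1.234,56")
```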
Centralized database access through the DAOFactory:
```python
dao = DAOFactory(connection)
dao.datasource                # DatasourceDAO - datasource lookup
dao.batch                     # BatchDAO - ingest batch registration/dedup
dao.staging('energy_hourly')  # StagingDAO - staging operations
dao.data(conflict_config)     # DataDAO - final table inserts
dao.pipeline                  # PipelineDAO - execution logging
dao.cursor                    # CursorDAO - API cursor tracking
```

File ingestion data flow:
```
┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│    /data/     │     │ FileConnector │     │     Queue     │
│   incoming/   │────▶│  .discover()  │────▶│               │
│   file.csv    │     │   .fetch()    │     │ InputEnvelope │
└───────────────┘     └───────────────┘     └───────┬───────┘
                                                    │
                       ┌────────────────────────────┘
                       ▼
               ┌───────────────┐
               │ Worker Thread │
               │  runner.run() │
               └───────┬───────┘
                       │
       ┌───────────────┼───────────────┐
       ▼               ▼               ▼
  ┌──────────┐    ┌──────────┐    ┌──────────┐
  │ staging_ │    │ staging_ │    │ staging_ │
  │ energy_  │    │ environ_ │    │  dairy_  │
  │  hourly  │    │ metrics  │    │   prod   │
  └────┬─────┘    └────┬─────┘    └────┬─────┘
       │               │               │
       ▼               ▼               ▼
  ┌──────────┐    ┌──────────┐    ┌──────────┐
  │  fact_   │    │ environ  │    │  dairy_  │
  │ energy_  │    │ mental_  │    │ produc   │
  │  hourly  │    │ metrics  │    │   tion   │
  └────┬─────┘    └──────────┘    └──────────┘
       │
       ▼
┌───────────────┐
│    /data/     │
│  processed/   │
│   file.csv    │
└───────────────┘
```
API ingestion data flow (Tago.io example):

```
┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│  APScheduler  │     │ TagoConnector │     │     Queue     │
│  every 1 hr   │────▶│  .discover()  │────▶│               │
│               │     │  per device   │     │ InputEnvelope │
└───────────────┘     └───────┬───────┘     └───────────────┘
                              │
                              │  For each device:
                              │    1. Get cursor timestamp
                              │    2. Query API with range
                              │    3. Transform response
                              │    4. Create InputEnvelope
                              │    5. Update cursor
                              ▼
                     ┌────────────────┐
                     │  Tago.io API   │
                     │ /data?start=.. │
                     │  device-token  │
                     └────────────────┘
```
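A sketch of the per-device cursor pattern the API connectors follow (the `CursorDAO` and HTTP-client method names here are illustrative):

```python
from datetime import datetime, timedelta, timezone

def fetch_device(self, device_id: str) -> InputEnvelope:
    # 1. Resume from the stored cursor, or fall back to the lookback window
    cursor = self.dao.cursor.get(self.connector_id, device_id)
    end = datetime.now(timezone.utc)
    start = cursor or end - timedelta(days=self.lookback_days)

    rows = self.client.get_data(device_id, start=start, end=end)  # 2. query the API
    envelope = self.make_envelope(device_id, rows, start, end)    # 3-4. wrap records

    # 5. Advance the cursor only after the envelope is handed off, so a crash
    #    re-fetches the window instead of silently skipping it
    self.dao.cursor.set(self.connector_id, device_id, end)
    return envelope
```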
Connectors encapsulate all source-specific logic (authentication, data format, ack/fail semantics). The pipeline only sees standardized InputEnvelope objects.
All records pass through staging tables before reaching the final tables (see the sketch after this list):
- Raw data preserved for debugging
- Validation errors stored for inspection
- Atomic loads with rollback capability
- Audit trail via the `loaded_to_final` flag
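A sketch of the staging-to-final promotion for the `energy_hourly` dataset, assuming a psycopg2-style connection (the `is_valid` column and the exact SQL are illustrative, not the project's actual statements):

```python
def load_to_final(conn, batch_id: str) -> int:
    """Promote valid staged rows in one transaction; roll back on any error."""
    with conn:  # psycopg2: commits on success, rolls back on exception
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO fact_energy_hourly
                    (datasource_id, source_batch_id, ts, energy_kwh)
                SELECT datasource_id, source_batch_id, ts, energy_kwh
                FROM staging_energy_hourly
                WHERE source_batch_id = %s AND is_valid
                ON CONFLICT (source_batch_id, ts)
                    DO UPDATE SET energy_kwh = EXCLUDED.energy_kwh
                """,
                (batch_id,),
            )
            loaded = cur.rowcount
            cur.execute(
                "UPDATE staging_energy_hourly SET loaded_to_final = true"
                " WHERE source_batch_id = %s AND is_valid",
                (batch_id,),
            )
    return loaded
```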
Duplicate inputs are detected at several levels (key derivation for files is sketched below):
- Files: SHA256 hash stored in `ingest_batch.file_sha256`
- APIs: cursor tracking in the `api_fetch_cursor` table
- Pipeline: `DuplicateInputError` raised for known inputs
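For files, the idempotency key can be derived as in this sketch (the helper name is hypothetical; the `input_id` format follows the `InputEnvelope` example above):

```python
import hashlib
from pathlib import Path

def file_input_id(path: Path) -> tuple[str, str]:
    """Return (input_id, sha256) for a candidate file, hashing in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(65536), b""):
            digest.update(chunk)
    sha256 = digest.hexdigest()
    return f"{path.name}:{sha256}", sha256
```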
YAML mappings define:
- Column/field mappings
- Type coercions
- Validation constraints
- Conflict resolution strategy
- Target/staging table names
For observability:
- The `pipeline_execution` table logs stage start/end times
- `data_quality_checks` stores validation error samples
- The `PipelineMetrics` dataclass tracks all execution stats
Error handling is isolated at several levels:
- Per-record error handling (invalid records don't block valid ones)
- Connector-level isolation (one failing connector doesn't affect others)
- Automatic retries with exponential backoff for HTTP connectors (sketched below)
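A minimal backoff sketch, assuming the HTTP connectors sit on top of `requests` (the attempt count and base delay are illustrative, not the configured values):

```python
import random
import time

import requests

def get_with_backoff(url: str, attempts: int = 5, base: float = 1.0) -> requests.Response:
    """Retry transient HTTP failures with exponential backoff plus jitter."""
    for attempt in range(attempts):
        try:
            resp = requests.get(url, timeout=30)
            if resp.status_code < 500:        # retry only server-side errors
                return resp
        except requests.RequestException:
            pass                              # network error: fall through and retry
        time.sleep(base * 2 ** attempt + random.uniform(0, 0.5))
    raise RuntimeError(f"{url}: still failing after {attempts} attempts")
```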
```
ingestor/
├── Dockerfile                     # Container image definition
├── requirements.txt               # Python dependencies
└── app/
    ├── main.py                    # Application entry point, WorkerPool
    ├── config.py                  # Environment-based configuration
    ├── pipeline.py                # DataPipeline orchestrator
    ├── pipeline_runner.py         # InputEnvelope → Pipeline execution
    ├── models.py                  # Pydantic data models (BaseRecord, EnergyHourlyRecord, etc.)
    ├── pydantic_transformer.py    # Unified validation + transformation
    ├── validation.py              # ValidationResult, SchemaValidator, TypeValidator
    │
    ├── connectors/                # Data source connectors
    │   ├── base.py                # BaseConnector, InputEnvelope, ConnectorStatus
    │   ├── registry.py            # create_connector() factory
    │   ├── file_connector.py      # CSV/Excel file watcher
    │   ├── http_connector.py      # Generic REST API client (base for API connectors)
    │   ├── tago_connector.py      # Tago.io energy API
    │   ├── airbeld_connector.py   # Airbeld environmental/weather API
    │   └── fusionsolar_connector.py  # Huawei FusionSolar PV API
    │
    ├── dao/                       # Data Access Objects
    │   ├── factory.py             # DAOFactory for centralized access
    │   ├── datasource_dao.py      # datasource lookup
    │   ├── batch_dao.py           # ingest_batch registration + deduplication
    │   ├── staging_dao.py         # Staging table operations
    │   ├── data_dao.py            # Final table inserts with conflict resolution
    │   ├── pipeline_dao.py        # Execution + quality logging
    │   └── cursor_dao.py          # API cursor tracking
    │
    ├── conf/
    │   ├── datasources.yaml       # 27 datasource definitions (Tago, Airbeld, FusionSolar, File)
    │   └── site_config.yaml       # Site metadata
    │
    ├── mappings/                  # YAML dataset configurations
    │   ├── energy_hourly.yaml
    │   ├── energy_daily.yaml
    │   ├── environmental_metrics.yaml
    │   ├── dairy_production.yaml
    │   ├── api_energy_hourly.yaml
    │   ├── api_energy_daily.yaml
    │   ├── api_environmental_metrics.yaml
    │   ├── api_solar_hourly.yaml
    │   ├── api_solar_daily.yaml
    │   └── api_solar_monthly.yaml
    │
    └── tests/                     # Unit tests
        ├── test_models.py
        ├── test_pipeline.py
        └── test_transformer.py
```
| Variable | Default | Description |
|---|---|---|
| `DB_DSN` | — | PostgreSQL connection string (required) |
| `NUM_WORKERS` | `2` | Number of worker threads |
| `QUEUE_MAX_SIZE` | `100` | Maximum queue capacity |
| `LOG_LEVEL` | `INFO` | Logging verbosity |
| `WATCH_DIR` | `/data/incoming` | File connector watch directory |
| `PROCESSED_DIR` | `/data/processed` | Processed files directory |
| `REJECTED_DIR` | `/data/rejected` | Failed files directory |
| `MAPPINGS_DIR` | `/app/mappings` | YAML mappings directory |
| `FILE_POLL_INTERVAL` | `5` | File polling interval (seconds) |
| `FILE_STABLE_SECONDS` | `3` | File stability wait before reading (seconds) |
| `TAGO_ENABLED` | `true` | Enable Tago.io connector |
| `TAGO_API_URL` | `https://api.tago.io` | Tago.io base URL |
| `TAGO_POLL_INTERVAL` | `3600` | Tago polling interval (seconds) |
| `TAGO_LOOKBACK_DAYS` | `7` | Tago lookback if no cursor |
| `AIRBELD_API_URL` | — | Airbeld API base URL |
| `AIRBELD_EMAIL` | — | Airbeld account email |
| `AIRBELD_PASSWORD` | — | Airbeld account password |
| `AIRBELD_POLL_INTERVAL` | `43200` | Airbeld polling interval (seconds) |
| `AIRBELD_LOOKBACK_DAYS` | `7` | Airbeld lookback if no cursor |
| `FUSIONSOLAR_ENABLED` | `true` | Enable FusionSolar connector |
| `FUSIONSOLAR_API_URL` | `https://intl.fusionsolar.huawei.com/thirdData` | FusionSolar API URL |
| `FUSIONSOLAR_USER` | — | FusionSolar username |
| `FUSIONSOLAR_SYSTEM_CODE` | — | FusionSolar system code |
| `FUSIONSOLAR_POLL_INTERVAL` | `3600` | FusionSolar polling interval (seconds) |
| `FUSIONSOLAR_LOOKBACK_DAYS` | `30` | FusionSolar lookback if no cursor |
| `CONF_DIR` | `/app/conf` | Configuration files directory |
Example mapping (`mappings/energy_hourly.yaml`):

```yaml
dataset: energy_hourly
granularity: hourly

# Column mapping: source → database
columns:
  "Date and Time": ts
  "Hourly": energy_kwh

# Type coercions
coercions:
  ts: datetime
  energy_kwh: float

# Validation rules
validation_rules:
  schema:
    required:
      - ts
      - energy_kwh
  constraints:
    energy_kwh:
      min: 0
      max: 10000

# Conflict resolution
conflict_resolution:
  strategy: update          # update | ignore | fail | append
  on_columns:
    - source_batch_id
    - ts
  update_columns:
    - energy_kwh

# Target tables
target_table: fact_energy_hourly
staging_table: staging_energy_hourly
```

To add a new connector (a skeleton is sketched after this list):
- Create a connector class extending `BaseConnector` or `HttpConnector`
- Implement the required methods: `start()`, `stop()`, `discover()`, `fetch()`, `ack()`, `fail()`, `health()`
- Register it in `connectors/registry.py`: `CONNECTOR_TYPES['myapi'] = MyApiConnector`
- Add an entry in `ingestor/conf/datasources.yaml` for each datasource it manages
- Add environment variables in `config.py` for credentials and scheduling
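A connector skeleton covering the first three steps (the `MyApiConnector` name, the `_get` helper, and the `make_envelope` call are placeholders, not real project APIs):

```python
from typing import List, Optional

from connectors.base import InputEnvelope
from connectors.http_connector import HttpConnector

class MyApiConnector(HttpConnector):
    """Placeholder connector: fill in the source-specific pieces."""

    def discover(self) -> List[str]:
        # Return one work-item ID per device that may have new data;
        # _get is assumed to wrap the authenticated HTTP client.
        return [device["id"] for device in self._get("/devices")]

    def fetch(self, item_id: str) -> Optional[InputEnvelope]:
        rows = self._get(f"/devices/{item_id}/readings")
        return self.make_envelope(item_id, rows) if rows else None

# then in connectors/registry.py:
CONNECTOR_TYPES["myapi"] = MyApiConnector
```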
To add a new dataset (a model sketch follows the list):
- Create a Pydantic model in `models.py` extending `BaseRecord`
- Register it in `MODEL_REGISTRY`
- Create a YAML mapping in `mappings/`
- Add a staging table mapping in `StagingDAO.STAGING_TABLES`
- Add a SQLAlchemy model to `shared/src/shared/models.py`
- Run `alembic revision --autogenerate -m "add_<dataset>_tables"`
- Add a Pydantic schema to `shared/src/shared/schemas.py`
- Add an API router + query module if queryable via the API