Aplicação python ‐ Vaisala (sugestao) - tecnologiadB/MonitoramentoRuidoWiki GitHub Wiki
Revisão Arquitetural e Clean Code — Service do Vaisala (Python)
Contexto: o pacote VaisalaConfigure contém um
config.inicom parâmetros de sonômetro e estação Vaisala (porta serial, taxas de coleta, períodos/limites, etc.). Você informou que o serviço é em Python. Abaixo segue uma revisão especializada voltada para Python, cobrindo arquitetura, padrões, boas práticas, resiliência, observabilidade, segurança e testes.
1) Diagnóstico rápido (a partir do INI e cenário típico)
- Configuração frágil:
config.inicom vírgula decimal (pt-BR), chaves e segredos em texto plano (user/senha). - Integrações: porta serial (Vaisala) e TCP/IP (sonômetro), propensas a timeouts e reconexões.
- Regras: limites por período (diurno/vespertino/noturno), tonal/impulsivo, vento/chuva influenciam o processamento.
- Riscos: parsing sensível a localidade, acoplamento entre leitura de dispositivos e regras de negócio, pouca observabilidade.
2) Arquitetura alvo (Pythonic, assíncrona e testável)
vaisala_service/
├─ app/
│ ├─ __init__.py
│ ├─ main.py # bootstrap (uvicorn/worker CLI)
│ ├─ settings.py # Pydantic Settings (carrega INI/env)
│ ├─ orchestrators/
│ │ ├─ scheduler.py # Agenda coletas (vento/PTU/chuva)
│ │ └─ pipeline.py # Ingestão → validação → enrich → persistir/publicar
│ ├─ domain/
│ │ ├─ models.py # dataclasses/pydantic (Vento, PTU, Chuva, NivelRuido, Periodo)
│ │ ├─ policies.py # limites, tonal/impulsivo (Strategy)
│ │ └─ periods.py # mapa de períodos (diurno/vespertino/noturno)
│ ├─ infra/
│ │ ├─ serial_gateway.py # Adapter Vaisala (pyserial-asyncio)
│ │ ├─ sonometro_gateway.py # Adapter (TCP/IP)
│ │ ├─ storage.py # Persistência (CSV/DB/Blob)
│ │ ├─ streaming.py # MQTT/HTTP/gRPC (opcional)
│ │ └─ observability.py # logging estruturado + metrics
│ └─ web/
│ └─ api.py # FastAPI (health/metrics/opcional)
├─ tests/ # pytest (mocks, loopback serial)
├─ pyproject.toml # ruff/black/mypy/uvicorn deps
└─ README.md
Padrões: Adapter (gateways), Strategy (cálculos/limites), Pipeline (processamento), State (períodos), Factory (seleção de leitores/políticas).
Concorrência: asyncio com asyncio.Queue para backpressure + workers dedicados.
3) Configuração tipada (Pydantic Settings) + parsing pt-BR
- Carregue
config.inicomconfigparsere hidrate Settings (Pydantic) — ou migre para TOML/YAML. - Converta vírgula decimal de forma centralizada na borda.
# app/settings.py
from pydantic import BaseModel, Field
from datetime import timedelta
class SonometroSettings(BaseModel):
conectar: bool = Field(default=True)
ip: str
user: str
senha: str
sn: str
cic: int = 0
class VaisalaSettings(BaseModel):
comunicacao: bool = Field(default=True)
canal: str = "COM3"
taxa_vento: timedelta = timedelta(seconds=60)
taxa_ptu: timedelta = timedelta(seconds=12)
taxa_chuva: timedelta = timedelta(seconds=12)
class AppSettings(BaseModel):
sonometro: SonometroSettings
vaisala: VaisalaSettings
# ... ConfigPadrao, PeriodoLimites etc.
Conversão pt-BR: para valores
"60,000000", façafloat(val.replace(',', '.'))apenas no carregamento; internamente, usetimedelta/Decimal/floatcom padrão invariante.
Segredos: remova user/senha do INI → use variáveis de ambiente (SONOMETRO_USER, SONOMETRO_PASS) com fallback seguro.
4) Adapter de porta serial com pyserial-asyncio + resiliência
# app/infra/serial_gateway.py
import asyncio, serial, serial_asyncio
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
class VaisalaFrame: ... # dataclass c/ campos parseados
class SerialVaisalaGateway:
def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
self._port = port
self._baud = baudrate
self._timeout = timeout
self._reader = self._writer = None
@retry(stop=stop_after_attempt(5), wait=wait_exponential_jitter(0.5, 3))
async def open(self) -> None:
self._reader, self._writer = await serial_asyncio.open_serial_connection(
url=self._port, baudrate=self._baud
)
async def frames(self):
while True:
line = await asyncio.wait_for(self._reader.readline(), timeout=2.0)
if not line:
continue
frame = self._parse(line)
if frame:
yield frame
def _parse(self, raw: bytes) -> VaisalaFrame | None:
# validar checksum/CRC, normalizar locale, retornar dataclass
...
- Tenacity para retry com backoff/jitter ao abrir/reabrir a porta.
- Timeout por leitura (evita hang).
- Validação do frame (checksum) e contadores de descartes.
5) Orquestração com asyncio.Queue e workers
# app/orchestrators/scheduler.py
import asyncio
from datetime import timedelta
class Scheduler:
def __init__(self, q_vento: asyncio.Queue, q_ptu: asyncio.Queue, q_chuva: asyncio.Queue, s):
self.qv, self.qp, self.qc, self.s = q_vento, q_ptu, q_chuva, s
async def _tick(self, queue, period: timedelta, label: str):
try:
while True:
await queue.put({"cmd": f"coletar_{label}"})
await asyncio.sleep(period.total_seconds())
except asyncio.CancelledError:
return
async def run(self):
async with asyncio.TaskGroup() as tg:
tg.create_task(self._tick(self.qv, self.s.vaisala.taxa_vento, "vento"))
tg.create_task(self._tick(self.qp, self.s.vaisala.taxa_ptu, "ptu"))
tg.create_task(self._tick(self.qc, self.s.vaisala.taxa_chuva, "chuva"))
- Backpressure: limite
maxsizedasQueues. - Workers dedicados para ler gateway, validar, enriquecer e persistir/publicar.
6) Domínio: modelos, períodos e Strategy para limites
# app/domain/models.py
from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime
@dataclass(frozen=True)
class Vento:
ts: datetime
velocidade: Decimal # m/s
direcao: int # graus
@dataclass(frozen=True)
class PTU:
ts: datetime
temp_c: Decimal
umid_rel: Decimal
pressao_hpa: Decimal
- Períodos (diurno/vespertino/noturno) em
periods.pycom janela [hora_inicio, hora_fim). - Strategy para limites/condições (vento/chuva/tonal/impulsivo), selecionada por configuração.
7) Observabilidade (logging JSON + métricas + health)
- logging estruturado (ex.:
structlogou JSONFormatter dologging) com campos:station,serial_port,frame_type,period,elapsed_ms. - Prometheus:
prometheus_clientcom métricas:vaisala_frames_total,invalid_frames_total,read_latency_seconds,reconnect_total,queue_size. - Health:
FastAPI(opcional) em/healthze/metrics(expor o registry do Prometheus).
8) Persistência/streaming
- CSV/Parquet (pandas/pyarrow) ou DB (SQLite/Postgres) via
asyncpg. - Idempotência: nomes determinísticos (
YYYY/MM/DD/HH/vento_{ts}_{hash}.csv) e UPSERT por(ts, tipo, station). - Publicação (opcional): MQTT (
asyncio-mqtt) ou HTTP (httpx) — com timeouts/retries.
9) Segurança
- Segredos apenas via ambiente/cofre; nunca em INI.
- Mascarar credenciais em logs (filters).
- Executar como usuário não raiz; limitar permissões do INI e pastas de dados.
10) Qualidade de código
- Tipagem: type hints completos +
mypy --strict(ou near-strict). - Lint/format:
ruff+black+isortvia pre-commit. - Exceções: trate somente o que agrega; wrap com contexto e re-raise.
- Clock: sempre UTC (
datetime.now(tz=UTC)). - Unidades: considere
pintpara segurança de unidades (m/s, °C, hPa).
11) Testes (pytest)
- Unit: parsing de frames, períodos, políticas de limites.
- Integração: loopback serial (
pyserialvirtual), gateway falso de sonômetro, teste de reconexão. - Carga: replay de frames (arquivo) para medir p95/p99 de latência e perda.
# tests/test_periods.py
def test_period_mapping():
assert period_of(time(7,0)) == "diurno"
12) CI/CD e operação
- Pyproject com versões fixas; build em container slim (Python 3.12).
- Pipelines: lint → type-check → test → build → scan (SAST/SBOM) → deploy.
- Docker:
USER app,readOnlyRootFilesystem,HEALTHCHECK. - Logs fluem para Loki/ELK; métricas para Prometheus/Grafana; alertas (reconnect alto, invalid frames > X%).
13) Roadmap de refatoração
- Base: Settings tipadas (Pydantic), logging estruturado, tenacity + httpx/serial.
- Fluxo:
asyncio.Queuecom workers e backpressure; parsers/validators isolados. - Observabilidade: métricas + health + dashboards; testes de integração com loopback.
- Hardening: segredos via ambiente/cofre; limites de recursos; políticas de retry finas.
14) Checklist rápido
- Pydantic Settings + carregador do INI (pt-BR → tipos fortes)
- Gateways (serial/TCP) como Adapters assíncronos com retry/timeout
- Orquestração com
asyncio.Queuee workers por tipo de leitura - Domain models tipados (dataclasses/pydantic) + Strategy de limites
- Logging JSON + Prometheus + health endpoints
- Persistência idempotente e/ou publicação MQTT/HTTP com retry
- Testes unit/integration + pre-commit (ruff/black/mypy)
- Segurança: segredos fora do INI, permissões mínimas, UTC em tudo
Resultado esperado
- Serviço robusto e observável, preparado para 24/7 com reconexão transparente.
- Código limpo e modular, fácil de manter e evoluir (novos sensores/formatos).
- Confiabilidade e segurança elevadas (tipagem, métricas, segredos protegidos).