MVP-2 AI Predictive (PMMetrics) Service

Overview

**Metrics Module Overview:** This module provides a comprehensive framework for computing key performance indicators (KPIs) on time-series sensor data to support monitoring, analytics, and predictive modeling.


It operates in three main stages:

  1. Data Retrieval: Efficiently queries historical and recent sensor data (e.g., last 5 matching weekdays for the target hour) from a PostgreSQL database, ensuring enough context for accurate metric calculations.
  2. Metrics Calculation: Calculates a rich set of KPIs grouped into Base, Behavior/Quality, Distribution Shape, and Trend categories. These metrics capture data completeness, signal quality, statistical properties, and temporal patterns necessary for fault detection, anomaly analysis, drift detection, and system performance tracking.
  3. Data Storage: Inserts or updates the computed KPIs back into PostgreSQL for easy retrieval by dashboards, alerting systems, or machine learning pipelines.

The design leverages Python with pandas and numpy for data processing, providing flexibility for future enhancements and advanced analytics, while relying on PostgreSQL for scalable storage and efficient querying.
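
The flow of these three stages can be sketched in a few lines of Python. The helper names `get_data` and `put_metrics` below are illustrative placeholders for the PostgreSQL accessors shown in the PlantUML diagram at the end of this page; `compute_metrics_from_dataset` is the function defined under "Python Component Code":

```python
from datetime import datetime
import pandas as pd

# Placeholder accessors; in the real service these wrap pgsql_get_dataset()
# and pgsql_put_metrics() (see the PlantUML diagram below).
def get_data(tid: str, sid: int, sdt: datetime, edt: datetime) -> pd.DataFrame:
    return pd.DataFrame(columns=['timeofsample', 'samplevalue', 'expectedcount'])

def put_metrics(metrics_df: pd.DataFrame) -> None:
    print(f"upserting {len(metrics_df)} metric rows")

def run_pmmetrics(tid: str, sid: int, sdt: datetime, edt: datetime) -> None:
    df = get_data(tid, sid, sdt, edt)             # 1. Data Retrieval
    metrics_df = compute_metrics_from_dataset(    # 2. Metrics Calculation
        tid, sid, df, sdt, edt, res='hourly')
    put_metrics(metrics_df)                       # 3. Data Storage
```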



🧩 Base KPIs

These KPIs provide the fundamental statistical summary of the sensor data, including counts, central tendencies, and ranges. They establish a baseline understanding of data completeness and basic characteristics, forming the foundation for more advanced analytics.

| KPI Name | Purpose | Formula / Logic | Notes |
|---|---|---|---|
| Actual_Count | Actual received samples | len(samplevalue) | Measures data completeness |
| Expected_Config_Count | Design-configured expected samples | Parameter from config (e.g. 3600/hr) | Useful for data SLA comparisons |
| Expected_Hist_Count | Historical average samples | mean(expectedcount) | Useful for dynamic expectations |
| Coverage_Pct_Config | Coverage vs config | Actual_Count / Config_Count * 100 | |
| Coverage_Pct_Hist | Coverage vs historical mean | Actual_Count / Hist_Count * 100 | |
| Missing_Pct_Config | % missing wrt config | 100 - Coverage_Pct_Config | |
| Missing_Pct_Hist | % missing wrt history | 100 - Coverage_Pct_Hist | |
| Actual_Mean | Central tendency | mean(samplevalue) | |
| Actual_Median | Robust central point | median(samplevalue) | |
| Actual_Mode | Most common value | mode(samplevalue) | |
| Actual_Min | Lowest value | min(samplevalue) | |
| Actual_Max | Highest value | max(samplevalue) | |
| Actual_Range | Spread | max - min | |
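
As a quick illustration of the count and coverage formulas above, a minimal pandas sketch (the sample values and the expected count of 60 samples per hour are assumed for the example):

```python
import pandas as pd

# One assumed hour of 1-minute samples with two readings missing (58 of 60).
sv = pd.Series([21.5, 21.6, 21.6, 21.7] * 14 + [21.8, 21.8])
expected_config_count = 60                      # assumed design value (1 sample/min)

actual_count      = len(sv)                                       # Actual_Count
coverage_pct_conf = actual_count * 100.0 / expected_config_count  # Coverage_Pct_Config
missing_pct_conf  = 100.0 - coverage_pct_conf                     # Missing_Pct_Config
actual_range      = sv.max() - sv.min()                           # Actual_Range
print(actual_count, round(coverage_pct_conf, 1), round(sv.mean(), 2), round(actual_range, 2))
```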

⚙️ Behavior / Quality KPIs

These KPIs measure the quality and reliability of the sensor data by detecting anomalies such as stuck values, constant readings, or outliers. They help identify sensor faults or data issues that could degrade the accuracy of analytics and predictive models.

| KPI Name | Purpose | Formula / Logic | Analytics / Prediction Notes |
|---|---|---|---|
| Flatline_Pct | Percent of readings that didn't change | (DIFF == 0).count() / total * 100 | Detects stuck or frozen sensors |
| Outlier_Count | Number of 3-sigma outliers | COUNT(abs(x - mean) > 3*std) | Indicates system instability or faulty data |
| Value_Constant | Checks if value never changed | 1 if MIN == MAX else 0 | Total flatline means dead signal or fixed override |
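
A short pandas sketch of the same flatline, outlier, and constant-value logic, using a synthetic signal that is mostly stuck with one spike:

```python
import pandas as pd

# Synthetic signal: mostly frozen readings plus a single spike.
sv = pd.Series([10.0] * 50 + [10.1, 10.2, 25.0, 10.1] + [10.0] * 6)

flatline_pct   = (sv.diff().fillna(1) == 0).sum() * 100.0 / len(sv)   # Flatline_Pct
outlier_count  = int(((sv - sv.mean()).abs() > 3 * sv.std()).sum())   # Outlier_Count (3-sigma)
value_constant = 1 if sv.min() == sv.max() else 0                     # Value_Constant
print(flatline_pct, outlier_count, value_constant)                    # 90.0 1 0
```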

📊 Distribution Shape KPIs

These KPIs characterize the statistical distribution of the sensor data. Understanding the shape, spread, and symmetry helps in choosing appropriate models, detecting shifts in data behavior, and identifying unusual events.

| KPI Name | Purpose | Formula / Logic | Analytics / Prediction Notes |
|---|---|---|---|
| Stddev | Measure of variation | std(samplevalue) | Indicates how spread the readings are; useful for trend window sizing |
| Variance | Total spread | var(samplevalue) | Alternate view to std; used in statistical forecasting |
| IQR | Spread between quartiles | Q3 - Q1 | Good for outlier-resistant volatility measurement |
| Skewness | Data symmetry | skew() | Skewed data might need transformation before training |
| Kurtosis | Data peak shape | kurtosis() | Helps assess presence of frequent extreme events |
| Percentile_95 | High percentile boundary | quantile(0.95) | Used for setting control limits and anomaly thresholds |
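
These shape statistics map directly onto pandas built-ins; a brief sketch on synthetic data:

```python
import numpy as np
import pandas as pd

# Synthetic hour of samples drawn from a normal distribution.
sv = pd.Series(np.random.default_rng(0).normal(loc=50.0, scale=2.0, size=360))

stddev        = sv.std()                               # Stddev
variance      = sv.var()                               # Variance
iqr           = sv.quantile(0.75) - sv.quantile(0.25)  # IQR (Q3 - Q1)
skewness      = sv.skew()                              # Skewness
kurtosis      = sv.kurtosis()                          # Kurtosis (pandas returns excess kurtosis)
percentile_95 = sv.quantile(0.95)                      # Percentile_95
```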

📈 Trend & Change KPIs

These KPIs capture the temporal dynamics and patterns in the data, such as trends, drifts, and sudden changes. They are vital for early detection of system degradation, concept drift in models, or operational shifts.

| KPI Name | Purpose | Formula / Logic | Analytics / Prediction Notes |
|---|---|---|---|
| Slope_Trend | Slope of value over time | Linear regression of time vs value | Detects increasing or decreasing trend |
| Drift_Score | Deviation from historical mean | (Current mean - Previous mean) / std | Used in ML for concept drift or state transition detection |
| Momentum_Trend | Raw delta between current and previous mean | Current mean - Previous mean | Highlights sharp shifts in behavior |
| Rate_Change | Change rate per second | (max - min) / duration_in_sec | Useful in process monitoring (e.g. flow rates) |
| Zscore_Max | Largest z-score across values | max(abs(zscore)) | Used for detecting extreme outliers |
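
A sketch of the trend and drift formulas on timestamped samples; the upward ramp and the previous_mean value are assumed for illustration:

```python
import numpy as np
import pandas as pd

# Synthetic hour of 1-minute samples drifting slowly upwards.
ts = pd.date_range('2025-06-25 14:00', periods=60, freq='min')
sv = pd.Series(np.linspace(20.0, 23.0, 60), index=ts)

x = ts.astype('int64') // 10**9                          # sample times in epoch seconds
slope_trend = np.polyfit(x, sv.values, 1)[0]             # Slope_Trend (value units per second)

previous_mean  = 20.8                                    # assumed mean of the previous window
drift_score    = (sv.mean() - previous_mean) / sv.std()  # Drift_Score
momentum_trend = sv.mean() - previous_mean               # Momentum_Trend
rate_change    = (sv.max() - sv.min()) / (ts[-1] - ts[0]).total_seconds()  # Rate_Change
zscore_max     = ((sv - sv.mean()).abs() / sv.std()).max()                 # Zscore_Max
```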

📊 Hourly KPI Metrics

| Metric Name | Purpose / Why This Metric? | Use for Prediction or Analysis | Python (calls backend) |
|---|---|---|---|
| Actual_Mean | Average value over the hour | Baseline for behavior tracking, forecast input | pgmetrics_mean(tid, sid, sdt, edt) |
| Actual_Median | Robust central tendency | More reliable in skewed/noisy data | pgmetrics_median(...) |
| Actual_Min | Lowest value detected | Used in floor-limit detection | pgmetrics_min(...) |
| Actual_Max | Highest value detected | Detect spikes, ceiling condition | pgmetrics_max(...) |
| Actual_Range | Difference between max and min | Simple volatility indicator | pgmetrics_range(...) |
| Actual_Count | Number of samples received | Checks if data completeness meets expectations | pgmetrics_count(...) |
| Expected_Daily_Count | Static config value for daily sampling | Used to estimate hourly expected count | pgmetrics_expected_daily_count(tid, sid) |
| Expected_Hourly_Count | Derived from daily config / 24 | Used to calculate coverage | Expected_Daily_Count / 24 |
| Expected_Historical_Count | Rolling 30-day average for hour | Data-driven threshold base | pgmetrics_expected_historical_count(...) |
| Normalized_Count_Hist | Actual_Count / Historical_Expected_Count | Used in clustering or threshold logic | Python calculation |
| Sample_Coverage_Pct | Actual_Count / Expected_Config_Count | Percentage of ideal samples seen | Python calculation |
| KMeans_Cluster_ID | Cluster ID from k-means model | Used to assign behavior category | kmeans.predict([[normalized_count]]) |
| Anomaly_Label | Derived from frequency of cluster | Used for alert logic and visual tagging | "Expected" / "Anomalous" based on cluster frequency |
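
The last two rows rely on a k-means model over normalized hourly counts. A hedged sketch, assuming scikit-learn and a small illustrative history; the rule that a rarely-seen cluster is labelled "Anomalous" follows the cluster-frequency idea in the table, with the 20% threshold being an assumption:

```python
import numpy as np
from sklearn.cluster import KMeans

# Hypothetical history of Normalized_Count_Hist values for this sensor/hour.
history = np.array([[1.00], [0.98], [1.02], [0.97], [1.01], [0.15], [0.99], [1.03]])
kmeans = KMeans(n_clusters=2, n_init=10, random_state=0).fit(history)

# Score the current hour.
normalized_count = 0.12                                    # Actual_Count / Expected_Historical_Count
cluster_id = int(kmeans.predict([[normalized_count]])[0])  # KMeans_Cluster_ID

# Anomaly_Label: tag the hour "Anomalous" if its cluster is rare in the history
# (assumed rule: the cluster covers 20% or less of the historical hours).
labels, counts = np.unique(kmeans.labels_, return_counts=True)
cluster_freq = dict(zip(labels, counts))
anomaly_label = 'Anomalous' if cluster_freq[cluster_id] <= 0.2 * len(history) else 'Expected'
```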

Implementing PMMetrics

A Python script that queries ADS for the dataset of a point (sid) and produces a metrics result set that is stored back in ADS.

Parameters

| Parameter | Example | Notes |
|---|---|---|
| tid | '32dlkfjge399u' | Tenant identifier |
| sid | 3535 | Sensor / point ID |
| year | 2025 | Extracted from sdt |
| month | 6 | Month number (1–12) |
| dayofmonth | 25 | Specific day of month |
| hourofday | 14 | Hour of day (0–23); taken from sdt |
| metric_name | 'Actual_Mean' | e.g. 'Actual_Median', 'Actual_Count' |
| metric_value | 12.4 | Float result |
| sdt | | Start datetime for an absolute time range |
| edt | | End datetime for an absolute time range |
| res | 'hourly' | Resolution for metrics |
| dur | 'yesterday' | Used for relative time ranges |
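
Putting the parameters together, one row of the metrics result set would look roughly like this (the metric_value and the exact datetimes are illustrative, matching the example hour above):

```python
from datetime import datetime

example_metric_row = {
    'tid': '32dlkfjge399u', 'sid': 3535,
    'year': 2025, 'month': 6, 'dayofmonth': 25, 'hourofday': 14,
    'metric_name': 'Actual_Mean', 'metric_value': 12.4,
    'sdt': datetime(2025, 6, 25, 14, 0), 'edt': datetime(2025, 6, 25, 15, 0),
    'res': 'hourly', 'dur': 'yesterday',
}
```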

Python Component Code

| Layer | Role |
|---|---|
| Data Ingestion | DataFrame is assumed to come from a PostgreSQL function call. |
| Computation | Handles 25+ KPIs grouped across Base, Behavior, Distribution, and Trend. |
| Output Format | Output is a DataFrame ready to be inserted back into PostgreSQL. |

  • Example code shared as per the design above:

```python
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Optional, List, Dict

# === Main Compute Function ===

def compute_metrics_from_dataset(tid: str, sid: int, df: pd.DataFrame,
                                 sdt: datetime, edt: datetime,
                                 res: str = 'hourly',
                                 dur: Optional[str] = None,
                                 expected_config_count: Optional[int] = None,
                                 previous_mean: Optional[float] = None) -> pd.DataFrame:
    """
    Compute all KPIs from 1-minute sample data grouped to an hourly resolution.
    """

    if df.empty:
        return pd.DataFrame()

    metrics: List[Dict] = []

    def add_metric(name: str, value: Optional[float]):
        metrics.append({
            'tid': tid,
            'sid': sid,
            'year': sdt.year,
            'month': sdt.month,
            'dayofmonth': sdt.day,
            'hourofday': sdt.hour,
            'metric_name': name,
            'metric_value': round(value, 4) if pd.notnull(value) else None,
            'sdt': sdt,
            'edt': edt,
            'res': res,
            'dur': dur or ''
        })

    # Drop missing sample values; bail out if nothing usable remains
    sv = df['samplevalue'].dropna()
    if sv.empty:
        return pd.DataFrame()

    ec_hist = df['expectedcount'].mean() if 'expectedcount' in df.columns else None
    ec_conf = expected_config_count

    # === 📌 Base Metrics ===
    add_metric('Actual_Count', len(sv))
    add_metric('Expected_Config_Count', ec_conf)
    add_metric('Expected_Hist_Count', ec_hist)

    if ec_conf:
        add_metric('Coverage_Pct_Config', (len(sv) * 100.0 / ec_conf))
        add_metric('Missing_Pct_Config', 100.0 - (len(sv) * 100.0 / ec_conf))
    else:
        add_metric('Coverage_Pct_Config', None)
        add_metric('Missing_Pct_Config', None)

    if ec_hist:
        add_metric('Coverage_Pct_Hist', (len(sv) * 100.0 / ec_hist))
        add_metric('Missing_Pct_Hist', 100.0 - (len(sv) * 100.0 / ec_hist))
    else:
        add_metric('Coverage_Pct_Hist', None)
        add_metric('Missing_Pct_Hist', None)

    add_metric('Actual_Mean', sv.mean())
    add_metric('Actual_Median', sv.median())
    add_metric('Actual_Mode', sv.mode().iloc[0] if not sv.mode().empty else None)
    add_metric('Actual_Min', sv.min())
    add_metric('Actual_Max', sv.max())
    add_metric('Actual_Range', sv.max() - sv.min())

    # === ⚙️ Behavior / Quality KPIs ===
    # diff() leaves the first element NaN; fillna(1) counts it as a change, not a flatline
    flat_pct = (sv.diff().fillna(1) == 0).sum() * 100.0 / len(sv)
    add_metric('Flatline_Pct', flat_pct)
    outliers = ((sv - sv.mean()).abs() > (3 * sv.std())).sum()
    add_metric('Outlier_Count', outliers)
    add_metric('Value_Constant', 1 if sv.min() == sv.max() else 0)

    # === 📊 Distribution Shape KPIs ===
    add_metric('Stddev', sv.std())
    add_metric('Variance', sv.var())
    add_metric('IQR', sv.quantile(0.75) - sv.quantile(0.25))
    add_metric('Skewness', sv.skew())
    add_metric('Kurtosis', sv.kurtosis())
    add_metric('Percentile_95', sv.quantile(0.95))

    # === 📈 Trend & Change KPIs ===
    if 'timeofsample' in df.columns:
        # Sort by time and drop missing values so polyfit gets clean x/y pairs
        df_sorted = df.dropna(subset=['samplevalue']).sort_values('timeofsample')
        x = pd.to_datetime(df_sorted['timeofsample']).astype(np.int64) // 10**9
        y = df_sorted['samplevalue']
        if len(x) > 1:
            slope = np.polyfit(x, y, 1)[0]
            add_metric('Slope_Trend', slope)
        else:
            add_metric('Slope_Trend', None)
    else:
        add_metric('Slope_Trend', None)

    # Drift-related KPIs
    if previous_mean is not None and sv.std() > 0:
        drift = (sv.mean() - previous_mean) / sv.std()
        add_metric('Drift_Score', drift)
        add_metric('Momentum_Trend', sv.mean() - previous_mean)
    else:
        add_metric('Drift_Score', None)
        add_metric('Momentum_Trend', None)

    # Rate of change
    duration_seconds = (edt - sdt).total_seconds()
    add_metric('Rate_Change', (sv.max() - sv.min()) / duration_seconds if duration_seconds > 0 else None)

    # Z-score
    zscore_max = ((sv - sv.mean()).abs() / sv.std()).max() if sv.std() > 0 else None
    add_metric('Zscore_Max', zscore_max)

    return pd.DataFrame(metrics)
```
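
A hedged usage sketch: build a synthetic hour of samples, run compute_metrics_from_dataset, and upsert the resulting rows into the metrics_result table mentioned in the diagram below. The connection string, column list, and conflict key are assumptions; the real service stores metrics via pgsql_put_metrics.

```python
import numpy as np
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime

# Synthetic hour of 1-minute samples for the example point.
sdt, edt = datetime(2025, 6, 25, 14), datetime(2025, 6, 25, 15)
df = pd.DataFrame({
    'timeofsample': pd.date_range(sdt, periods=60, freq='min'),
    'samplevalue': np.random.default_rng(0).normal(12.4, 0.5, 60),
    'expectedcount': 60,
})

metrics_df = compute_metrics_from_dataset(
    tid='32dlkfjge399u', sid=3535, df=df, sdt=sdt, edt=edt,
    res='hourly', dur='yesterday', expected_config_count=60, previous_mean=12.1)

# Upsert into PostgreSQL (illustrative; assumes metrics_result has a unique
# constraint on tid, sid, metric_name, sdt, res).
upsert_sql = """
    INSERT INTO metrics_result
        (tid, sid, year, month, dayofmonth, hourofday,
         metric_name, metric_value, sdt, edt, res, dur)
    VALUES %s
    ON CONFLICT (tid, sid, metric_name, sdt, res)
    DO UPDATE SET metric_value = EXCLUDED.metric_value, edt = EXCLUDED.edt
"""

def to_py(v):
    # Convert numpy scalars / NaN to plain Python values psycopg2 can adapt.
    if v is None or (isinstance(v, float) and np.isnan(v)):
        return None
    return v.item() if isinstance(v, np.generic) else v

rows = [tuple(to_py(v) for v in row)
        for row in metrics_df.itertuples(index=False, name=None)]

with psycopg2.connect('dbname=ads') as conn, conn.cursor() as cur:
    execute_values(cur, upsert_sql, rows)
```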




PlantUML Reference

```plantuml
@startuml
!theme vibrant
skinparam defaultFontName Consolas
skinparam defaultFontSize 22

title Metrics Module – Layered Architecture with Notes

' Define layers
rectangle "🧑‍💼 User Layer" {
  actor User
}

package "🐍 Application Layer (Python)" as app {
  control "[def get_data()]" as PMGet #lightblue
  component "[def compute_metrics()]" as Compute #lightgreen
  control "[def put_metrics()]" as PMPut #lightyellow

}

  note as NoteCompute #lightgreen
    Calculates KPIs on DataFrame including:
    - Base (count, mean, median, etc.)
    - Behavior / Quality (flatline, outliers)
    - Distribution shape (skewness, kurtosis)
    - Trend / drift metrics (slope, momentum)
  end note

package "🗄️ Data Layer (PostgreSQL)" as dbs {

  control "[function pgsql_get_dataset()]" as PGGet #lightblue
  note left of PMGet  #lightblue
    Retrieves raw sensor data from PostgreSQL
    - Calls pgsql_get_dataset()
    - Returns DataFrame with 5 days historical data
  end note
  control "[pgsql_put_metrics()]" as PGPut #lightyellow
  note right of PMPut #lightyellow
    Serializes and stores computed KPIs
    - Calls pgsql_put_metrics()
    - Upserts metrics into metrics_result table
  end note
}
  database "ADS" as ads #lightgrey
app .d[hidden]. dbs

User -d-> PMGet : <b>[1.]</b> Run(tid,sid...) 
PMGet <.d. PGGet : <b>[2.]</b>  Get_dataset(sid...)
PGGet <.d. ads : <b>[3.]</b>  SELECT raw data for last 5 similar weekdays
PMGet .r.> Compute : [4.] compute_metrics(df)
Compute .r.> PMPut : [5.] put_metrics(metrics_df)
PMPut ..> PGPut : <b>[6.]</b> Store metrics in DB
PGPut ..> ads : <b>[7.]</b> INSERT/UPDATE metrics_result
User <-- PMPut : <b>[8.]</b> OK


NoteCompute =d[#lightgreen]=> app

@enduml
```
