Aggregation Pipelines - open-inc/openware GitHub Wiki

If you want to query aggregated data from the system, e.g. the hourly mean value of a given sensor, you can use the aggregation pipeline of open.WARE.

The aggregation pipeline service is available via the API. To access it, you need to be authenticated: provide either a valid JWT as a Bearer token or a valid session token in the OD-SESSION request header.

Endpoint

The endpoint is available at {{BASE_URL_OF_OPENWARE}}/api/transform/pipe via a POST request.
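A minimal sketch of building such a request with Python's standard library, assuming the server accepts a JSON body sent with `Content-Type: application/json`. The helper name `build_pipeline_request`, the base URL and the token are hypothetical; only the path, the POST method and the two authentication options come from this page.

```python
import json
import urllib.request


def build_pipeline_request(base_url, stages, jwt=None, session_token=None):
    """Build (but do not send) a POST request for the pipeline endpoint.

    Hypothetical helper: pass exactly one of `jwt` or `session_token`.
    """
    if (jwt is None) == (session_token is None):
        raise ValueError("provide either a JWT or a session token")
    headers = {"Content-Type": "application/json"}  # assumed body format
    if jwt is not None:
        headers["Authorization"] = f"Bearer {jwt}"
    else:
        headers["OD-SESSION"] = session_token
    body = json.dumps({"stages": stages}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/api/transform/pipe",
        data=body,
        headers=headers,
        method="POST",
    )


req = build_pipeline_request(
    "https://openware.example.com",  # hypothetical base URL
    [{"action": "name_of_action", "params": {}}],
    jwt="<your-jwt>",  # placeholder token
)
print(req.get_method(), req.full_url)
```

Passing the request to `urllib.request.urlopen(req)` sends it; on success, the response body should contain the JSON DataItem.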

Payload

You need to provide a payload of the following form:

{
  "stages": [
    {"action": "name_of_action", "params": {}},
    {"action": "last_action_of_Pipeline", "params": {"foo": "bar"}}
  ]
}

The example above shows the structure of a payload. When the service receives a pipeline, it processes the stages sequentially and passes the resulting data of each action to its successor. This makes it possible to chain multiple transformation steps.

Note that after each stage, the system checks whether the authenticated user (JWT or session) has access to the result of that stage. If not, pipeline processing is canceled and an error is thrown.
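The stage-by-stage processing and the per-stage access check described above can be modelled locally as follows. This is a simplified sketch, not open.WARE's implementation: `run_pipeline`, the handler table and `has_access` are hypothetical stand-ins for the server's stage registry and permission check.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[Any, Dict[str, Any]], Any]


def run_pipeline(stages: List[Dict[str, Any]],
                 handlers: Dict[str, Handler],
                 has_access: Callable[[Any], bool]) -> Any:
    """Run stages in order, passing each result to the next stage."""
    data: Any = None  # the first stage has no predecessor
    for index, stage in enumerate(stages):
        handler = handlers[stage["action"]]
        data = handler(data, stage.get("params", {}))
        # The access check runs after every stage, not only at the end.
        if not has_access(data):
            raise PermissionError(f"no access to the result of stage {index}")
    return data


# Hypothetical handlers: a data source and a transformation.
handlers = {
    "source": lambda _previous, params: list(params["values"]),
    "double": lambda previous, _params: [v * 2 for v in previous],
}
stages = [
    {"action": "source", "params": {"values": [1, 2, 3]}},
    {"action": "double", "params": {}},
]
print(run_pipeline(stages, handlers, has_access=lambda result: True))  # [2, 4, 6]
```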

If everything is processed correctly, you receive a JSON response representing a DataItem.

Stages

The following stage actions are typically available. Some of them are only available with the open.WARE EE addon. Depending on the configuration of your open.WARE server, shorthands may exist; the default shorthands are given in parentheses after the fully qualified name. Both forms can be used.

DiffTransformer

Can be used to compute differences between values. It takes the result of the previous stage and subtracts consecutive values, so it relies on a previous stage to provide the data.

action: de.openinc.opentransformations.DiffTransformer (difference)
| Parameter | Description | Type |
| --- | --- | --- |
| initialValue | Optional number used to compute the difference for the first value of the previous stage's results | Number |
| laterValuesFirst | Switches the ordering of the provided data | Boolean |
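A local model of what the difference stage computes, useful for checking expected results. Two points are assumptions rather than documented behaviour: `laterValuesFirst` is interpreted as "the input is ordered newest first" and reversed before differencing, and values are plain numbers rather than DataItem entries. The function name is hypothetical.

```python
def diff_transform(values, initial_value=None, later_values_first=False):
    """Differences between consecutive values (local model of the stage)."""
    # Assumption: later_values_first=True means the input is ordered newest
    # first, so it is reversed into chronological order before differencing.
    ordered = list(reversed(values)) if later_values_first else list(values)
    result = []
    if initial_value is not None and ordered:
        # The first value is differenced against the supplied initial value.
        result.append(ordered[0] - initial_value)
    for previous, current in zip(ordered, ordered[1:]):
        result.append(current - previous)
    return result


print(diff_transform([10, 12, 15]))                   # [2, 3]
print(diff_transform([10, 12, 15], initial_value=8))  # [2, 2, 3]
```

In a pipeline, the corresponding stage would be written as `{"action": "difference", "params": {"initialValue": 8}}` after a stage that provides the data.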

Dimension Reduce Transformer

Can be used to reduce a multidimensional data set (sensors with more than one value, e.g. a gyroscope with x, y, z) to a single dimension for further processing.

action: de.openinc.opentransformations.DimensionReduceTransformer (dimension_reduce)
| Parameter | Description | Type |
| --- | --- | --- |
| dimension | Required parameter indicating the dimension to keep | Number |
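A sketch of the reduction applied to a single DataItem. The item layout (`values` entries with a `date` and a `value` array) and the zero-based `dimension` index are assumptions made for illustration; check them against the DataItems your server returns. The function name and sample data are hypothetical.

```python
def reduce_dimension(item, dimension):
    """Keep only `dimension` (assumed zero-based) of every value array."""
    reduced = dict(item)  # shallow copy so the input item is not modified
    reduced["values"] = [
        {"date": entry["date"], "value": [entry["value"][dimension]]}
        for entry in item["values"]
    ]
    return reduced


# Hypothetical gyroscope item with x, y, z per sample.
gyro = {
    "id": "gyro-1",
    "values": [
        {"date": 1000, "value": [0.1, 0.2, 0.3]},
        {"date": 2000, "value": [0.4, 0.5, 0.6]},
    ],
}
print(reduce_dimension(gyro, 2)["values"])
```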

Geo Transformer (experimental)

Provides utility functions for working with geo data:

  • extractCoords: extracts coordinates from GeoJSON into an array of 2-tuples
  • clusterKMeans: clusters geolocations using k-means
  • clusterDBScan: clusters geolocations using DBSCAN

All of these operations require a previous stage that provides the data.
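For orientation, the extractCoords operation can be approximated locally: walk a GeoJSON object and collect (longitude, latitude) tuples, GeoJSON's own coordinate order. Which geometry types the service supports, and whether it keeps altitude values, is not documented here; this sketch handles all standard GeoJSON types and drops anything beyond the first two coordinates. The function names are hypothetical.

```python
def _positions(coords):
    """Flatten nested GeoJSON coordinate arrays into a list of positions."""
    if coords and isinstance(coords[0], (int, float)):
        return [coords]  # a single position such as [lon, lat]
    return [p for part in coords for p in _positions(part)]


def extract_coords(obj):
    """Collect (lon, lat) 2-tuples from any GeoJSON object."""
    kind = obj.get("type")
    if kind == "FeatureCollection":
        return [c for f in obj["features"] for c in extract_coords(f)]
    if kind == "Feature":
        geometry = obj.get("geometry")
        return extract_coords(geometry) if geometry else []
    if kind == "GeometryCollection":
        return [c for g in obj["geometries"] for c in extract_coords(g)]
    # Point, LineString, Polygon and their Multi* variants.
    return [tuple(p[:2]) for p in _positions(obj.get("coordinates", []))]


point = {"type": "Feature", "geometry": {"type": "Point", "coordinates": [8.68, 50.11]}}
print(extract_coords(point))  # [(8.68, 50.11)]
```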

action: de.openinc.opentransformations.GeoTransformer
| Parameter | Description | Type |
| --- | --- | --- |
| operation | Utility function to apply: 'extractcoord', 'clusterkmeans' or 'clusterdbscan' | String |

KMeans Additional Parameters

| Parameter | Description | Type |
| --- | --- | --- |
| clusters | Number of clusters to produce | Number |

DBSCAN Additional Parameters

| Parameter | Description | Type |
| --- | --- | --- |
| epsilon | Density criterion in meters: the distance within which points are grouped | Number |
| minpoints | Minimum number of points within 'epsilon' distance required to establish a cluster | Number |
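Because `epsilon` is given in meters, distances between points have to be measured on the Earth's surface rather than in raw degrees. The minimal DBSCAN sketch below uses the haversine great-circle distance to illustrate how `epsilon` and `minpoints` interact. The distance formula, the Earth radius, and counting a point as its own neighbour for `minpoints` are assumptions of this sketch, not confirmed details of the GeoTransformer; the coordinates are made up.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius (assumption of this sketch)


def haversine_m(a, b):
    """Great-circle distance in meters between two (lon, lat) pairs."""
    lon1, lat1, lon2, lat2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(h))


def dbscan(points, epsilon, minpoints):
    """Return one cluster label per point; -1 marks noise."""
    labels = [None] * len(points)

    def neighbours(i):
        return [j for j in range(len(points))
                if haversine_m(points[i], points[j]) <= epsilon]

    cluster = -1
    for i in range(len(points)):
        if labels[i] is not None:
            continue
        seeds = neighbours(i)
        if len(seeds) < minpoints:  # the count includes the point itself
            labels[i] = -1
            continue
        cluster += 1
        labels[i] = cluster
        queue = [j for j in seeds if j != i]
        while queue:
            j = queue.pop()
            if labels[j] == -1:
                labels[j] = cluster  # former noise becomes a border point
            if labels[j] is not None:
                continue
            labels[j] = cluster
            more = neighbours(j)
            if len(more) >= minpoints:  # j is a core point: keep expanding
                queue.extend(more)
    return labels


points = [(8.6821, 50.1109), (8.6822, 50.1109), (8.6821, 50.1110), (8.70, 50.12)]
print(dbscan(points, epsilon=20, minpoints=3))  # [0, 0, 0, -1]
```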

Returns a DataItem with a Geo-Polygon value containing the hulls of the found clusters (computed using the Graham scan) as a FeatureCollection.
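The hull of each cluster can be reproduced with a standard Graham scan: sort the points by polar angle around the lowest point, then drop every point that does not make a left turn. This sketch treats (longitude, latitude) as planar coordinates, an approximation that is reasonable for small clusters. `hull_polygon` is a hypothetical helper that closes the ring, since a GeoJSON linear ring must end with its first position.

```python
import math


def cross(o, a, b):
    """Z component of (a - o) x (b - o); positive for a left (CCW) turn."""
    return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])


def graham_scan(points):
    """Convex hull of (x, y) tuples, counter-clockwise from the lowest point."""
    pts = sorted(set(points), key=lambda p: (p[1], p[0]))
    if len(pts) < 3:
        return pts
    pivot = pts[0]
    # Sort by polar angle around the pivot; nearer points first on ties.
    rest = sorted(
        pts[1:],
        key=lambda p: (math.atan2(p[1] - pivot[1], p[0] - pivot[0]),
                       (p[0] - pivot[0]) ** 2 + (p[1] - pivot[1]) ** 2),
    )
    hull = [pivot]
    for p in rest:
        # Pop points that would create a right turn or a straight line.
        while len(hull) >= 2 and cross(hull[-2], hull[-1], p) <= 0:
            hull.pop()
        hull.append(p)
    return hull


def hull_polygon(points):
    """GeoJSON Polygon whose single ring is the closed convex hull."""
    hull = graham_scan(points)
    return {"type": "Polygon", "coordinates": [[list(p) for p in hull + hull[:1]]]}


square = [(0, 0), (2, 0), (2, 2), (0, 2), (1, 1)]
print(graham_scan(square))  # [(0, 0), (2, 0), (2, 2), (0, 2)]
```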

In Memory Aggregation

Can be used to aggregate the provided data using one of the following operations:

  • mean
  • min
  • max
  • sum
  • stdd
  • variance
  • count
action: de.openinc.opentransformations.InMemAggregation (transform_aggregation)
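The operation names map onto standard statistics. The following sketch uses Python's `statistics` module; whether open.WARE computes `stdd` and `variance` as population or sample statistics is not stated here, so the population versions are an assumption. `aggregate` is a hypothetical name.

```python
import math
import statistics

# Assumption: "stdd" and "variance" are population statistics here;
# the service may use the sample versions instead.
OPERATIONS = {
    "mean": statistics.fmean,
    "min": min,
    "max": max,
    "sum": math.fsum,
    "stdd": statistics.pstdev,
    "variance": statistics.pvariance,
    "count": len,
}


def aggregate(values, operation):
    """Apply one of the documented operation names to a list of numbers."""
    if operation not in OPERATIONS:
        raise ValueError(f"unknown operation: {operation!r}")
    return OPERATIONS[operation](values)


data = [2, 4, 4, 4, 5, 5, 7, 9]
print(aggregate(data, "mean"))  # 5.0
```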

Option 1: Define the number of splits. Splits the data into buckets and performs the operation for each bucket.

| Parameter | Description | Type |
| --- | --- | --- |
| operation | One of the operations listed above | String |
| dimension | Required parameter indicating the dimension to aggregate | Number |
| start | Unix timestamp (milliseconds) indicating the beginning of the timeframe to aggregate | Number |
| end | Unix timestamp (milliseconds) indicating the end of the timeframe to aggregate | Number |
| splits | Number of splits; determines how many buckets are created and therefore how many values are returned | Number |
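Option 1 can be modelled as dividing the timeframe into `splits` equally long buckets. In this sketch, samples are (timestamp in ms, value) pairs for the chosen dimension, `end` is treated as exclusive, and empty buckets yield `None`; all three are assumptions. `split_aggregate` is a hypothetical name.

```python
def split_aggregate(samples, start, end, splits, op):
    """Aggregate (timestamp_ms, value) samples in `splits` equal buckets.

    Assumptions: `end` is exclusive and empty buckets produce None.
    """
    width = (end - start) / splits
    buckets = [[] for _ in range(splits)]
    for ts, value in samples:
        if start <= ts < end:
            # min() guards against float rounding just below `end`.
            index = min(int((ts - start) // width), splits - 1)
            buckets[index].append(value)
    return [op(bucket) if bucket else None for bucket in buckets]


samples = [(0, 1), (10, 3), (50, 5), (60, 7)]
print(split_aggregate(samples, start=0, end=100, splits=2, op=sum))  # [4, 12]
```

For example, `start` and `end` one day apart (86,400,000 ms) with `"splits": 24` would give hourly buckets, matching the hourly mean use case from the introduction.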

Option 2: Create splits based on changes of the first dimension. Creates intervals based on the value of the first dimension: every time it changes, a new data bucket is created and the operation is performed on it, e.g. summing up all values until the next change.

| Parameter | Description | Type |
| --- | --- | --- |
| operations | One of the operations listed above for each dimension (excluding the first dimension) | String[] |
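Option 2 can be sketched as grouping consecutive rows by their first value. Each row here is a plain value array whose first entry is the splitting dimension, and operation names are resolved through a small hypothetical lookup table covering only some of the operations. A value that reappears later starts a new bucket, because a bucket is created every time the value changes.

```python
# Hypothetical lookup covering a few of the documented operation names.
OPS = {
    "sum": sum,
    "min": min,
    "max": max,
    "count": len,
    "mean": lambda values: sum(values) / len(values),
}


def change_split_aggregate(rows, operations):
    """Start a new bucket whenever the first dimension changes.

    rows: value arrays whose first entry is the splitting dimension.
    operations: one operation name per remaining dimension.
    """
    results, bucket, key = [], [], None
    for row in rows:
        if bucket and row[0] != key:
            results.append(_close_bucket(key, bucket, operations))
            bucket = []
        key = row[0]
        bucket.append(row[1:])
    if bucket:
        results.append(_close_bucket(key, bucket, operations))
    return results


def _close_bucket(key, bucket, operations):
    columns = zip(*bucket)  # one column per dimension after the first
    return [key] + [OPS[name](list(col)) for name, col in zip(operations, columns)]


rows = [[1, 10, 20.5], [1, 20, 21.5], [2, 5, 19.0], [1, 1, 18.0]]
print(change_split_aggregate(rows, ["sum", "max"]))
# [[1, 30, 21.5], [2, 5, 19.0], [1, 1, 18.0]]
```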

Option 3: Create splits based on a fixed interval in milliseconds. Creates intervals based on the provided interval value. For example, for one minute of data, an interval of 1000 results in 60 buckets.

| Parameter | Description | Type |
| --- | --- | --- |
| operation | One of the operations listed above | String |
| dimension | Required parameter indicating the dimension to aggregate | Number |
| interval | Required parameter indicating the length of the interval in milliseconds | Number |
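Option 3 can be sketched as integer division of each timestamp offset by the interval. Aligning buckets to the first sample's timestamp is an assumption; the service may align them differently. `interval_aggregate` is a hypothetical name, and the data reproduces the example above: one minute of samples every 500 ms, split with an interval of 1000 ms.

```python
def interval_aggregate(samples, interval, op):
    """Aggregate (timestamp_ms, value) samples in fixed-length intervals.

    Assumption: buckets are aligned to the first sample's timestamp.
    Returns (bucket_start_ms, aggregated_value) pairs in time order.
    """
    if not samples:
        return []
    origin = min(ts for ts, _ in samples)
    buckets = {}
    for ts, value in samples:
        buckets.setdefault((ts - origin) // interval, []).append(value)
    return [(origin + k * interval, op(values))
            for k, values in sorted(buckets.items())]


# One minute of data, one sample every 500 ms.
minute = [(ts, 1) for ts in range(0, 60_000, 500)]
result = interval_aggregate(minute, interval=1000, op=sum)
print(len(result))  # 60
```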

Live Data Source

Can be used to get live values (the last received values) for a certain point in time. It adds the values of the previous stage to its results, so it can be chained to retrieve several data sets.

action: de.openinc.opentransformations.LiveDataSource (live)
| Parameter | Description | Type |
| --- | --- | --- |
| id | ID of the sensor to retrieve values for | String |
| source | Source of the sensor to retrieve values for | String |
| at | Timestamp in milliseconds indicating the point in time for which live values should be retrieved | Number |
| amount | Number of values to retrieve before 'at' | Number |
| reference | Optional; if set, only values with the given reference are retrieved | String |
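A sketch of a payload that chains two live stages, built from the parameters listed above. The helper `live_stage`, the sensor ids and the source are hypothetical, and the `live` shorthand only works if your server uses the default shorthands; otherwise, use the fully qualified action name.

```python
import json
import time


def live_stage(sensor_id, source, at, amount, reference=None):
    """Build one LiveDataSource stage from the documented parameters."""
    params = {"id": sensor_id, "source": source, "at": at, "amount": amount}
    if reference is not None:
        params["reference"] = reference  # optional filter
    return {"action": "live", "params": params}


now_ms = int(time.time() * 1000)
payload = {"stages": [
    # Each live stage adds its values to the previous stage's results,
    # so this chain should return the latest values of both sensors.
    live_stage("temperature", "plant-a", now_ms, 1),  # hypothetical ids
    live_stage("humidity", "plant-a", now_ms, 1),
]}
print(json.dumps(payload, indent=2))
```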