New Workflow API - IRC-SPHERE/HyperStream GitHub Wiki
In the SPHERE-HyperStream repository, there is an example of the old- and new-style workflow APIs, which are both given below for comparison.
## Old style API
From here
workflow_id = "lda_localisation_model_predict:
S = hyperstream.channel_manager.sphere
D = hyperstream.channel_manager.mongo
M = hyperstream.channel_manager.memory
A = hyperstream.channel_manager.assets
with hyperstream.create_workflow(
workflow_id=workflow_id,
name="Live Predictions",
owner="TD",
description="Deploy the LDA localisation model for live predictions",
online=True,
safe=safe) as w:
nodes = (
("rss_raw", S, ["H"]),
("location_prediction", D, ["H", "LocalisationModels"]),
("location_prediction_lda", M, ["H"]),
("every_2s", M, ["H.W"]),
("rss_per_uid", M, ["H.W"]),
("rss_per_uid_2s", M, ["H.W"]),
("location_prediction_models_broadcasted", M, ["H.W"]),
("predicted_locations_broadcasted", D, ["H.W"]),
("wearables_by_house", A, ["H"]),
("access_points_by_house", A, ["H"])
)
# Create all of the nodes
N = dict((stream_name, w.create_node(stream_name, channel, plate_ids))
for stream_name, channel, plate_ids in nodes)
w.create_multi_output_factor(
tool=hyperstream.channel_manager.get_tool(
name="sphere",
parameters=dict(modality="wearable", elements={"rss"})
),
source=None,
splitting_node=None,
sink=N["rss_raw"])
w.create_multi_output_factor(
tool=hyperstream.channel_manager.get_tool(
name="splitter_from_stream",
parameters=dict(
element="uid"
)
),
source=N["rss_raw"],
splitting_node=N["wearables_by_house"],
sink=N["rss_per_uid"])
w.create_factor(
tool=hyperstream.channel_manager.get_tool(
name="sliding_window",
parameters=dict(lower=-2.0, upper=0.0, increment=2.0)
),
sources=None,
sink=N["every_2s"])
def component_wise_max(init_value=None, id_field='aid', value_field='wearable-rss'):
if init_value is None:
init_value = {}
def func(data):
result = init_value.copy()
for (time, value) in data:
if value[id_field] in result:
result[value[id_field]] = max(result[value[id_field]], value[value_field])
else:
result[value[id_field]] = value[value_field]
return result
return func
w.create_factor(
tool=hyperstream.channel_manager.get_tool(
name="sliding_apply",
parameters=dict(func=component_wise_max())
),
sources=[N["every_2s"], N["rss_per_uid"]],
sink=N["rss_per_uid_2s"])
w.create_factor(
tool=hyperstream.channel_manager.get_tool(
name="index_of",
parameters=dict(selector_meta_data="localisation_model", index="lda")
),
sources=[N['location_prediction']],
sink=N["location_prediction_lda"])
w.create_multi_output_factor(
tool=hyperstream.channel_manager.get_tool(
name="stream_broadcaster_from_stream",
parameters=dict(func=lambda x: x.last())
),
source=N["location_prediction_lda"],
splitting_node=N["wearables_by_house"],
sink=N["location_prediction_models_broadcasted"])
w.create_factor(
tool=hyperstream.channel_manager.get_tool(
name="localisation_model_predict",
parameters=dict()
),
sources=[N['location_prediction_models_broadcasted'], N["rss_per_uid_2s"]],
sink=N["predicted_locations_broadcasted"])
return w
Note that in the old style, factors are created with `w.create_factor()` (and related functions), and tools are loaded with `get_tool()`. Using the new API, the same workflow looks like this:
## New style API
From here
workflow_id = "lda_localisation_model_predict"
sp = hs.plugins.sphere
S = hs.channel_manager.sphere
D = hs.channel_manager.mongo
M = hs.channel_manager.memory
A = hs.channel_manager.assets
houses = hs.plate_manager.plates["H"]
wearables = hs.plate_manager.plates["H.W"]
models = hs.plate_manager.plates["LocalisationModels"]
with hs.create_workflow(
workflow_id=workflow_id,
name="Live Predictions",
owner="TD",
description="Deploy the LDA localisation model for live predictions",
online=True,
safe=safe) as w:
nodes = (
("rss_raw", S, ["H"]),
("location_prediction", D, ["H", "LocalisationModels"]),
("location_prediction_lda", M, ["H"]),
("every_2s", M, ["H.W"]),
("rss_per_uid", M, ["H.W"]),
("rss_per_uid_2s", M, ["H.W"]),
("location_prediction_models_broadcasted", M, ["H.W"]),
("predicted_locations_broadcasted", D, ["H.W"]),
("wearables_by_house", A, ["H"]),
("access_points_by_house", A, ["H"])
)
# Create all of the nodes
N = dict((stream_name, w.create_node(stream_name, channel, plate_ids))
for stream_name, channel, plate_ids in nodes)
def component_wise_max(init_value=None, id_field='aid', value_field='wearable-rss'):
if init_value is None:
init_value = {}
def func(data):
result = init_value.copy()
for (time, value) in data:
if value[id_field] in result:
result[value[id_field]] = max(result[value[id_field]], value[value_field])
else:
result[value[id_field]] = value[value_field]
return result
return func
for house in houses:
N["rss_raw"][house] = sp.factors.sphere(source=None, modality="wearable", elements={"rss"})
for wearable in wearables[house]:
N["rss_per_uid"][house][wearable] = hs.factors.splitter_from_stream(
source=N["rss_raw"], splitting_node=N["wearables_by_house"], element="uid")
N["every_2s"][house][wearable] = hs.factors.sliding_window(
sources=None, lower=-2.0, upper=0.0, increment=2.0)
N["rss_per_uid_2s"][house][wearable] = hs.factors.sliding_apply(
sources=[N["every_2s"], N["rss_per_uid"]], func=component_wise_max())
N["location_prediction_lda"][house] = hs.factors.index_of(
sources=[N["location_prediction"]], selector_meta_data="localisation_model", index="lda")
for model in models:
N["location_prediction_models_broadcasted"][house, model] = hs.factors.stream_broadcaster_from_stream(
source=N["location_prediction_lda"], func=lambda x: x.last())
for wearable in wearables:
N["predicted_locations_broadcasted"][house][wearable] = sp.factors.localisation_model_predict(
sources=[N['location_prediction_models_broadcasted'], N["rss_per_uid_2s"]])
return w