ProConcepts Stream Layers - Esri/arcgis-pro-sdk GitHub Wiki
Stream layers are feature layers with a stream service as their data source. They reference real-time datasets where the observations are live. Observations can include changes to location, attributes, or both. Stream layers can contain point, polyline, or polygon features. Even though the title of this ProConcepts is "Stream Layers", Stream layers and real-time datasets usage relevant to the API is discussed. The terms "events" and "observations" as relates to "events" streamed from a stream server are used interchangeably. They mean the same thing.
Language: C#
Subject: Map Authoring
Contributor: ArcGIS Pro SDK Team <[email protected]>
Organization: Esri, http://www.esri.com
Date: 10/06/2024
ArcGIS Pro: 3.4
Visual Studio: 2022
- Background
- Stream Layer Types
- Connecting to a Stream Service
- Streaming
- Service Capabilities
- Track Aware
- Expiring Events
- Definition Queries
- Editing
- Rendering
- Row Events
Stream layers are a special kind of feature-based layer designed to work with stream services. Stream services, similar to other services, are hosted on portal or online and require the GeoEvent Server extension. GeoEvent Server consumes incoming "raw" event data (using a "connector") and maps it to json for the stream service to consume (and also publishes the stream services). The stream service broadcasts the json data to clients such as ArcGIS web maps and ArcGIS Pro stream layers. Unlike feature services, stream services do not need to persist the incoming data before it can be visualized but broadcast it to clients directly.
Stream layers are connected to stream services using LayerFactory
and the stream service URI. When the stream layer is created, it subscribes to the stream service and can immediately begin receiving (real time) data. Whereas a feature layer "pulls" features from the feature service, a stream layer receives features from the stream service (without polling). Stream services can also have one or both of the following associated feature services present:
- A feature service which retains the latest observations (if snapshot archiving was enabled during publishing of the stream service)
- feature service to store related attribute information (feature location, vehicle or aircraft characteristics, etc) if a related features service URL was specified during publishing of the stream service).
Consult the Service capabilities section of this ProConcepts for more information. Note: Stream services are published from GeoEvent Server, not from Pro.
Both stream layer and Realtime feature class expose similar capabilities though they do have certain differences. For instance, the feature class provides details on the schema and service capabilities not available on the layer and likewise, the layer controls rendering and event display filtering options not relevant for the feature class. However, in most cases the same options can be manipulated on both.
One important consideration to keep in mind when choosing whether to code against the stream layer or its feature class is that settings changed via the layer are persisted in the project (if it is saved) whereas the same changes made via the feature class are not. Feature class setting changes are retained in-memory only for the lifetime of the session (or until they are overwritten by another change). Additionally, changes made via the layer will get reflected in the layer properties, ribbon, and symbology UI. Changes made via the feature class will not. Therefore, if maintaining consistency between the Pro UI and settings made via the API is a requirement, the stream layer should be used, and not its feature class, to manipulate equivalent settings.
For these reasons it is assumed add-in developers will be working mostly with the layer though, for completeness, when there is an equivalent setting, the ProConcepts document will cover both. Manipulating the Realtime feature class is required, for example, when operating in a headless configuration such as a console program using the "CoreHost" pattern.
Stream layers and their underlying Realtime feature class can be one of two types (consult Stream layers):
- Spatial
- Non-spatial (often referred to as "attribute only")
Spatial stream layers receive broadcast events that do include a location (eg x, y). The location can either vary with each event as is the case when a vehicle or airplane is being tracked, or the location can be fixed as is the case when an event, in the literal sense, "happens" at a particular location such as an accident, crime, meeting event, or tweet.
Non-spatial stream layers receive broadcast events that do not include a location. The associated location must come from a related feature service. In many cases, non-spatial stream layers are used to receive broadcast events related to stationary sensors such as water gauges, weather stations, or air quality sensors. From the standpoint of the API the "type" of the layer (or its feature class) - spatial or non-spatial - is mostly irrelevant. They both support the same set of properties and functions albeit the intended use cases for each may be different.
If it is required that you need to identify the stream layer type (or are just curious), it can be inferred from the tracking information on the layer (something we haven't discussed yet). Essentially, spatial stream layers must have either a spatial track or no track. Non spatial layers must have an "attribute only" track. Therefore we can write some logic as follows:
//spatial or non-spatial?
if (streamLayer.TrackType == TrackType.AttributeOnly)
{
//this is a non-spatial stream layer
}
else
{
//this must be a spatial stream layer
}
For a RealtimeFeatureClass access its RealtimeFeatureClassDefinition
and GetTrackType()
using the same logic pattern.
To connect to a stream service (to create a stream layer) use any of the LayerFactory
overloads that consume a URI. The most basic form of a request is either via LayerFactory.Instance.Createlayer(Uri dataUri,....)
or LayerFactory.Instance.FeatureCreatelayer(Uri dataUri,....)
casting the result to a StreamLayer
.
var air_traffic_url =
@"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
await QueuedTask.Run(() => {
var uri = new Uri(air_traffic_url, UriKind.Absolute);
var streamLayer = LayerFactory.Instance.CreateLayer(
uri, MapView.Active.Map) as StreamLayer;
//TODO work with the stream layer
});
Uris can be either service connections (as shown above) or portal item Uris. If a portal item uri is provided then that portal must be the currently active portal or the connection request will fail.
As a derived type of FeatureLayer
, stream layers can also be created with additional options specified upfront using the T CreateLayer<T>(LayerCreationParams layerDef, ...)
overload and a FeatureLayerCreationParams
instance as the LayerCreationParams argument. In this example, a stream layer is created with a definition query and renderer specified upfront:
var air_traffic_url =
@"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
await QueuedTask.Run(() => {
var uri = new Uri(air_traffic_url, UriKind.Absolute);
var symbol = SymbolFactory.Instance.ConstructPointSymbol(
ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon)
.MakeSymbolReference();
var sl_params = new FeatureLayerCreationParams(new Uri(air_traffic_url)) {
IsVisible = false,
DefinitionQuery = new DefinitionQuery("RWY = '29L'", "Runway"),
RendererDefinition = new SimpleRendererDefinition(symbol)
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
sl_params, MapView.Active.Map);
//TODO work with the stream layer
});
When a stream layer is created it is automatically streaming so streaming does not need to be explicitly started. Refer to the Streaming section for more details.
ArcGIS.Core.Data.Realtime
data stores follow the same pattern for establishing a connection with a stream service as with other data stores:
- Create the relevant (Realtime) connection property class using the uri of the service
- Pass the connection properties as the argument to the Realtime datastore constructor
- Access a feature class or table from the datastore (dispose and clean up when it is no longer needed)
When establishing a connection to a feature class via the data store, the feature class will not be streaming by default (opposite of the layer). Instead, call "StartStreaming" on the feature class to explicitly open the connection.
//Within a Queued Task
var props= new RealtimeServiceConnectionProperties(new Uri(@"--stream_service_url--"),
RealtimeDatastoreType.StreamService
);
using (var rt_datastore = new RealtimeDatastore(realtimeServiceConProp))
{
var name = rt_datastore.GetTableNames().First();
using (var rfc = rt_datastore.OpenTable(name) as RealtimeFeatureClass)
{
//feature class, by default, is not streaming (opposite of the stream layer)
rfc.StartStreaming();
//TODO use the feature class
//...
}
}
Note: A Realtime data store of type RealtimeDatastoreType.StreamService
only contains one Realtime feature class (or table).
Stream layers operate in two modes: Streaming vs not streaming (or streaming started vs streaming stopped). This is reflected in the StreamLayer streamLayer.IsStreamingConnectionOpen
property. If streamLayer.IsStreamingConnectionOpen == true;
then the stream layer is streaming and is receiving events from the stream service. If streamLayer.IsStreamingConnectionOpen == false;
then the stream layer is not streaming and the connection is closed (technically the connection could also be transitioning between open and closed or vice versa). When the connection is closed, the stream layer is not receiving events. Switching modes between streaming on and off is accomplished via StartStreaming()
and StopStreaming()
methods on both the layer and feature class.
To start streaming, call streamLayer.StartStreaming()
(or realtimeFc.StartStreaming()
). When a stream layer is created, it is automatically placed into streaming mode so there is no need to explicitly call StartStreaming
on a newly created layer. When a Realtime datastore connection is created it is not automatically streaming. The Realtime feature class must be retrieved and streaming explicitly started. When starting streaming on the layer, its streamLayer.IsStreamingConnectionOpen
property is set to true once streaming has started. There can be some latency involved in establishing the layer connection to the stream service so streamLayer.IsStreamingConnectionOpen
state can return false after a StartStreaming
call (or layer creation) until the connection is established. For a finer-grained granularity of the connection state, the RealtimeFeatureClass
exposes a public StreamingConnectionState GetStreamingConnectionState()
method. The StreamingConnectionState
includes Unknown, Ready, Connecting, Open, Failed, and Closed. "Open" is equivalent to IsStreamingConnectionOpen == true
. All other states are equivalent to IsStreamingConnectionOpen == false
.
When streaming is started on the layer (or feature class if calling realTimeFc.StartStreaming()
), all existing observations in the Realtime feature class are deleted before new events are retrieved. Existing observations retrieved from a previously open connection are never persisted beyond the next start streaming call (or between Pro sessions). If the stream service has an associated feature service (for snapshot archiving or location information) then those records are retrieved from the service first to provide an initial display of data, otherwise nothing will draw until new events are received.
To stop streaming on the layer, call streamLayer.StopStreaming()
(or realtimeFc.StopStreaming()
if using the feature class). The stream layer connection will be closed and streamLayer.IsStreamingConnectionOpen
will be set to false. While streaming is stopped, event rows in the feature class are not expired and no new events will be received until streaming is re-started. Therefore, turning streaming off can be advantageous when you wish to access the set of events currently stored, in-memory, in the feature class, on the client (eg to perform a lengthy analysis) without being impacted by updates to the events. Search(...)
and Select(...)
methods can also safely be executed against the stream layer and client add-in logic can be guaranteed to process all returned rows (refer to the next section for details).
When the stream layer (or feature class) is streaming, Search(...)
and Select(...)
behavior can be affected if the stream service has a particularly high velocity. Events selected via the Search(...)
orSelect(...)
can be expired before the client add-in logic can process them (via the traditional RowCursor.MoveNext()
). For example, a selection may return 100 rows but only 10 of them get processed by the add-in logic before the other 90 are expired. In other words, following this same example, querying GetCount()
on the selection returns a count of "100" but executing MoveNext()
on the cursor only iterates 10 times (not 100) because by the time the 10th MoveNext call is made the other 90 rows have been expired. For this reason, it is generally recommended that streaming be stopped before any Search(...)
or Select(...)
operation.
//assume we ~are streaming~
var select = streamLayer.Select(....);
var count = select.GetCount();//eg, 100 rows are selected
int num_processed = 0;
var rc = select.Search();
while(rc.MoveNext()) {
//add-in logic to process the result
num_processed++;
}
if (num_processed != count) {
//stream velocity ended up expiring previously selected rows
...
To process rows during streaming, use Subscribe()
and/or SearchAndSubscribe()
in conjunction with a RealtimeCursor
and rtCursor.WaitForRowsAsync(...)
. Consult Row Events for more details.
Service capabilities refers to the presence of:
- An "archive" feature service used to record the latest observations (on the server).
- A "related" feature service used to capture the spatial location of stationary events and/or static or non-changing event attributes such as vehicle make, manufacturer, plane type, passenger capacity, and so on.
Consult stream service json response keepLatestArchive and relatedFeatures properties.
Stream layers with an associated feature service (archive and/or related) will automatically display event locations when a connection is established. Stream layers without a service capability will only display events when they are received via a broadcast from the stream service. This can be a little disconcerting in cases where the stream service velocity is slow or infrequent. It is possible that streaming can be started but no data initially draws on the layer because no new events have been broadcast (since the connection was established) and so the layer remains "blank".
Although service capability does not affect the functionality of the stream layer, if add-ins wish to determine the service capabilities of a given stream layer they can interrogate the layer's RealtimeFeatureClass
for its StreamServiceFeatureClassDefinition
(derived from RealtimeFeatureClassDefinition
) for an archive and/or related feature service url.
//Define an enum to represent stream layer service capabilities
internal enum StreamLayerServiceCapabilities {
None = 0,
Archive,
Related,
ArchiveAndRelated
}
//Extension method to determine the capabilities
internal static class StreamLayerExtensions {
public static StreamLayerServiceCapabilities GetServiceCapabilities(
this StreamLayer streamLayer) {
//must be on the QueuedTask
using (var fc = streamLayer.GetFeatureClass())
using (var sfcdef = fc.GetDefinition() as StreamServiceFeatureClassDefinition)
{
var url = sfcdef?.GetArchiveFeatureServiceLayerURL()?.ToString() ?? "";
var url2 = sfcdef?.GetRelatedFeatureServiceLayerURL()?.ToString() ?? "";
var has_archive = !string.IsNullOrEmpty(url);
var has_related = !string.IsNullOrEmpty(url2);
if (has_archive && has_related)
return StreamLayerServiceCapabilities.ArchiveAndRelated;
if (has_archive)
return StreamLayerServiceCapabilities.Archive;
if (has_related)
return StreamLayerServiceCapabilities.Related;
}
return StreamLayerServiceCapabilities.None;
}
...
//usage elsewhere
//On QueuedTask...
var streamLayer ... ;
var capabilities = streamLayer.GetServiceCapabilities()
Service capabilities are also visible on the "Source" tab of the layer properties within the Pro UI.
Track Aware or "tracking awareness" refers to the presence of a common identifier, a "track id" used to relate events together with the same id into "tracks". A track can represent the changing location of a vehicle (spatial track) or the changing value of a sensor (non-spatial track) depending on the nature of the stream service and its connected layer(s). In any case, the track id groups together all related observations distinguishing them from other observations for different objects. A stream layer (or Realtime feature class) cannot be "set" track aware, it is a characteristic it inherits from its source stream service. For spatial stream layers track-awareness is optional - some services are track aware, some are not - whereas for non-spatial stream layers it is required - all non-spatial stream services are track aware. To determine if a layer is track aware, query its IsTrackAware
property. To retrieve the track id field name, query streamLayer.TrackIdFieldName
.
var streamLayer = ... ;
if (streamLayer.IsTrackAware)
{
//layer is track aware
var trackField = streamLayer.TrackIdFieldName;
}
If dealing with the Realtime feature class, access the track information off the RealtimeFeatureClassDefinition
via HasTrackIDField()
and GetTrackIDField()
.
//Must be on QueuedTask
using (var rfc = streamLayer.GetFeatureClass())
using (var rfc_def = rfc.GetDefinition())
{
if (rfc_def.HasTrackIDField()) {
//Feature class is track aware
var trackField = rfc_def.GetTrackIDField();
}
}
For spatial layers (and feature classes) that are track aware, the tracking type is always TrackType.Spatial
as the track represents the changing position of an object being "tracked". For non-spatial layers (and their feature class), they must be track aware and the tracking type is always TrackType.AttributeOnly
. AttributeOnly tracks relate changing observation values for a fixed location (eg pressure, temperature, water level, or some other telemetry) rather than a changing location. The type of tracking the stream layer is using can be queried off either the stream layer via TrackType
or via the Realtime feature class RealtimeFeatureClassDefinition
and GetTrackType()
.
//Track type from the layer
var streamLayer = ...;//else where
var trackType = streamLayer.TrackType;
//TrackType.None, TrackType.AttributeOnly, TrackType.Spatial
...
//Track type from the feature class
//On the QueuedTask...
using (var fc = streamLayer.GetFeatureClass())
using (var fcdef = fc.GetDefinition() as RealtimeFeatureClassDefinition) {
var trackType = fcdef.GetTrackType();
//etc.
During streaming, new events are inserted into the in-memory Realtime feature class whilst previous event rows "get" expired and removed. The rate at which previous observations are expired and flushed from the table can be controlled by one of two settings:
- Either set a maximum count above which events are deleted
- Or set a maximum age (i.e. duration) after which events are deleted
If the layer is track aware, the maximum count or age is set per track otherwise it (count or age) applies to all the events combined. Maximum count is set via SetExpirationMaxCount(count)
. Maximum age is set via SetExpirationMaxAge(timespan)
. These methods exist on both the layer and feature class. As per Using Stream Layer vs Realtime FeatureClass remember that changing these settings via the feature class will not get persisted.
Which setting to apply is controlled via SetExpirationMethod(...)
, again available on both the layer and feature class. Call SetExpirationMethod with a value of RowExpirationMethod.MaxCount
to apply the maximum count limit whereas a value of RowExpirationMethod.MaxAge
applies the maximum age.
Maximum count:
//Must be on QueuedTask
if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureCount)
streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureCount);
streamLayer.SetExpirationMaxCount(15);
//FYI
if (streamLayer.IsTrackAware)
{
//MaxCount is per track! otherwise for the entire layer
}
A similar pattern is followed for max age:
//Must be on QueuedTask
if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureAge)
streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureAge);
//set to 12 hours (max is 24 hours)
streamLayer.SetExpirationMaxAge(new TimeSpan(12,0,0));
//FYI
if (streamLayer.IsTrackAware)
{
//MaxAge is per track! otherwise for the entire layer
}
Definition query expressions can be applied in three different ways:
- Use
LayerFactory
and theFeatureLayerCreationParams.DefinitionFilter
parameter to apply the filter expression at the same time the stream layer is created. - Use the (inherited)
SetDefinitionQuery(query)
method on the stream layer to apply the query on the layer. - Use the
SetFilterWhereClause(query)
method on theRealtimeFeatureClass
to apply the query on the feature class.
Definition queries (whether set on the layer or feature class) are applied on the connection to the stream service. If the stream layer is currently streaming its underlying connection is closed and a new connection is established with the query string from the definition query applied. In other words, if a layer or feature class is streaming when the query is applied, streaming is automatically stopped and restarted. When streaming is (re)started all previously retrieved observations are always automatically deleted from the feature class (see Start Streaming). Therefore, applying a definition query to a streaming layer (or feature class) truncates all existing rows. This is why, in the UI, the definition query tab shows a warning to the user if the layer is streaming.
If a definition query is applied to the feature class, via realTimeFC.SetFilterWhereClause(query)
, the query is not persisted in the layer and is not reflected in the layer UI (same as changing any setting on the feature class). Assuming the feature class was retrieved via a GetFeatureClass()
call off a stream layer, and a query applied: when the Pro project is closed and reopened, the layer connection will be reestablished either with the layer's definition query, if it had one (persisted in the project) or with no definition query. It will not be reestablished with the query that was set on its feature class.
Stream layer and Realtime feature class definition queries follow the same rules as do GeoEvent Server attribute filters. For a list of the supported operators consult the GeoEvent Server attribute filter documentation.
Stream layers and Realtime feature class are read-only. They do not support editing.
Stream layers can have up to three different renderers, one each to display current observations, previous observations and track lines (note: only spatial track aware stream layers can specifiy renderers for previous observations and track lines). When selecting renderers to apply, stream layers can use the same set of renderer choices as do "regular" feature layers with the exception of the track line renderer. The track line renderer can only use a simple renderer in the current release. Renderers are retrieved and set via GetRenderer(...)
and SetRenderer(...)
calls on the layer respectively.
The current observation renderer is the default renderer for the layer. For spatial stream layers, the current observations can be considered the latest event or observation received from the service. For non-spatial stream layers, the current observation location does not change and so the same location (for a given observation) is always drawn. Creating a renderer for the current observations follows the same basic pattern as for any feature layer:
- The renderer can be specified upfront as part of the LayerFactory CreateLayer
FeatureLayerCreationParams
. - The renderer can be applied after the layer has been created via a
SetRenderer(renderer)
call.
(The renderer can also be set on the CIM directly using the underlyingCIMFeatureLayer
layer definition and theCIMFeatureLayer.Renderer
property.)
For example, here a connection is established to a point stream layer and a simple renderer is applied for the current observations. The layer visibility is set to false:
var labus =
@"https://geoeventsample1.esri.com:6443/arcgis/rest/services/LABus/StreamServer";
var uri = new Uri(labus, UriKind.Absolute);
QueuedTask.Run(() => {
var createParams = new FeatureLayerCreationParams(uri){
IsVisible = false,
RendererDefinition = new SimpleRendererDefinition() {
SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
ColorFactory.Instance.BlueRGB,
12,
SimpleMarkerStyle.Pushpin).MakeSymbolReference()
}
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
createParams, MapView.Active.Map);
...
});
Certain of the more complex renderers, such as unique value or class breaks renderers, depend on statistics derived from the underlying layer (via streamLayer.CreateRenderer(...)
) to determine unique value classes, class breaks, and so forth. In the case of a stream layer, depending on its velocity and/or associated service capabilities, the derived unique values or class breaks computed by CreateRenderer(...)
can vary depending on when a CreateRenderer
call is made - the layer may have received only a few records vs having received many records which can affect the range of values present in the table. Add-in developers should keep this in mind if they are using CreateRenderer(...)
to "auto" assign unique value or class break ranges from the layer statistics vs. defining them by hand.
In this example, a unique value renderer is defined for a stream layer and the value classes are assigned by hand using the CIM rather than being derived from layer statistics with a CreateRenderer(...)
call:
var airTraffic =
@"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
var uri = new Uri(airTraffic, UriKind.Absolute);
//Must be on QueuedTask!
var createParams = new FeatureLayerCreationParams(uri) {
IsVisible = false
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
createParams, map);
//Define the unique values by hand
var uvr = new CIMUniqueValueRenderer() {
Fields = new string[] { "ACTYPE" },
UseDefaultSymbol = true,
DefaultLabel = "Others",
DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
CIMColor.CreateRGBColor(185, 185, 185),
8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};
var classes = new List<CIMUniqueValueClass>();
//add in classes - one for ACTYPE of 727, one for DC 9. We know the data can contain
//these values even if records containing those values may not have been broadcast yet
classes.Add(
new CIMUniqueValueClass() {
Values = new CIMUniqueValue[] {
new CIMUniqueValue() { FieldValues = new string[] { "B727" } } },
Visible = true,
Label = "Boeing 727",
Symbol = SymbolFactory.Instance.ConstructPointSymbol(
ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon)
.MakeSymbolReference()
});
classes.Add(
new CIMUniqueValueClass() {
Values = new CIMUniqueValue[] {
new CIMUniqueValue() { FieldValues = new string[] { "DC9" } } },
Visible = true,
Label = "DC 9",
Symbol = SymbolFactory.Instance.ConstructPointSymbol(
ColorFactory.Instance.GreenRGB, 10, SimpleMarkerStyle.Hexagon)
.MakeSymbolReference()
});
//add the classes to a group
var groups = new List<CIMUniqueValueGroup>() {
new CIMUniqueValueGroup() {
Classes = classes.ToArray()
}
};
//add the groups to the renderer
uvr.Groups = groups.ToArray();
//Apply the renderer
streamLayer.SetRenderer(uvr);
streamLayer.SetVisibility(true);//turn on the layer
Note: Previous observation renderer applies to spatial track aware stream layers only.
Previous observations can be rendered along with the current observations. When a track aware stream layer is created it is always assigned a default previous observation (simple) renderer. To assign a different previous observations renderer use the stream layer CreateRenderer(RendererDefinition, FeatureRendererTarget)
and SetRenderer(CIMRenderer, FeatureRendererTarget)
overloads. Specify a target value of FeatureRendererTarget.PreviousObservations
. The existing previous observation renderer can be retrieved with streamLayer.GetRenderer(FeatureRendererTarget.PreviousObservations)
.
var streamLayer = .... ;
//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
return;
var prevRenderer = new SimpleRendererDefinition() {
SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
ColorFactory.Instance.GreenRGB, 10, SimpleMarkerStyle.Hexagon)
.MakeSymbolReference()
};
streamLayer.SetRenderer(
streamLayer.CreateRenderer(prevRenderer),
FeatureRendererTarget.PreviousObservations);
Previous observations visibility can be controlled via SetPreviousObservationsVisibility(true -or- false)
. The maximum number of previous observations to display can be controlled via SetPreviousObservationsCount(count)
. The count should be set to a value less than the current "GetExpirationMaxCount" setting. Setting the previous observation count equal to or greater than the current "GetExpirationMaxCount" will have no effect. It will always be capped at the max expiration count, whatever that is. Refer to Expiring Events for more information on GetExpirationMaxCount.
//The layer must be track aware and spatial for these setting to have an effect
if (streamLayer.TrackType != TrackType.Spatial)
return;//these setting will be ignored
//On the QueuedTask
var max = streamLayer.GetExpirationMaxCount();
//assume "max" is greater than 1
streamLayer.SetPreviousObservationsCount(max -1);
if (!streamLayer.ArePreviousObservationsVisible)
streamLayer.SetPreviousObservationsVisibility(true);
Note: Track lines renderer applies to spatial track aware stream layers only.
Track lines can be drawn to connect previous observations together. This can be useful for showing the path that a moving feature has traveled. Track lines can only be rendered using a simple renderer and the line style must be solid, otherwise the renderer symbology is ignored. To change the default track lines renderer use the stream layer CreateRenderer(RendererDefinition, FeatureRendererTarget)
and SetRenderer(CIMRenderer, FeatureRendererTarget)
overloads same as can be used for the other renderers. Specify a target value of FeatureRendererTarget.TrackLines
. Retrieve the existing track lines renderer with streamLayer.GetRenderer(FeatureRendererTarget.TrackLines)
.
//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
return;//these setting will be ignored
//Must be on QueuedTask!
//Note: only a simple renderer with solid line symbol is supported for track
//line renderering
var trackRenderer = new SimpleRendererDefinition() {
SymbolTemplate = SymbolFactory.Instance.ConstructLineSymbol(
ColorFactory.Instance.BlueRGB, 2, SimpleLineStyle.Solid)
.MakeSymbolReference()
};
streamLayer.SetRenderer(
streamLayer.CreateRenderer(trackRenderer),
FeatureRendererTarget.TrackLines);
Track line visibility is controlled via SetTrackLinesVisibility(true -or- false)
.
//The layer must be track aware and spatial for this setting to have an effect
if (streamLayer.TrackType != TrackType.Spatial)
return;//these setting will be ignored
//On the QueuedTask
if (!streamLayer.AreTrackLinesVisible)
streamLayer.SetTrackLinesVisibility(true);
Previous observation count, visibility, and track line visibility can also be set via the CIM. This may be useful if you are already manipulating the CIM definition to apply or modify the layer renderers and want to set everything as a single transaction:
//we are within a queued task...
//acquire a reference to the relevant stream layer
var streamLayer = ...;
//get the CIM Definition
var def = streamLayer.GetDefinition() as CIMFeatureLayer;
//... other CIM changes here ...
//set the number of previous observations,
//set show previous observations and track lines to true
def.PreviousObservationsCount = (int)streamLayer.GetExpirationMaxCount() - 1;
def.ShowPreviousObservations = true;
def.ShowTracks = true;
//apply the changes back
streamLayer.SetDefinition(def);
Row events occur whenever rows are inserted or deleted to or from the Realtime feature class/table. Row events can have 3 possible sources identified by the RealtimeRowSource
enum:
- RealtimeRowSource.PreExisting: Previously retrieved rows were already in the table when the subscription was made. We will explain "subscription" shortly.
- RealtimeRowSource.EventInsert: New events were broadcast from the service during streaming.
- RealtimeRowSource.EventDelete: Events were deleted from the table. This could be because rows were truncated when streaming was started, rows were explicitly truncated via the API (whether streaming is started or stopped), or previous observations expired during streaming.
To process row events, clients should follow a three step process:
- Clients subscribe for events on the feature class via the layer or feature class. Subscription returns a
RealtimeCursor
. - Clients retrieve the row events via
WaitForRowsAsync(...)
called on theRealtimeCursor
. - Clients unsubscribe from row events either explicitly via
RealtimeCursor
Unsubscribe()
or implicitly viaRealtimeCursor
Dispose()
.
Clients should also be prepared to handle cancellation of row retrieval.
To subscribe for row events, add-ins call either Subscribe(query_filter)
or SearchAndSubscribe(query_filter)
on the stream layer or Realtime feature class. An optional query filter parameter can be provided to filter row events otherwise all row events are returned. Use SearchAndSubscribe
in lieu of Subscribe(...)
to query all pre-existing rows in the Realtime feature class first before subscribing*. Subscription is independent of streaming connection state. It does not matter if the layer/feature class is streaming or not. On subscription, a RealtimeCursor
is returned to the client which is used to retrieve incoming row events.
Note: multiple subscriptions can be made to a stream layer or Realtime feature class. Each subscription results in its own RealtimeCursor
that is monitored for rows. If callers choose to use the RealtimeFeatureClass
to subscribe they have the additional flexibility of using a System.Threading.Tasks.Task
to perform the subscription, in addition to the QueuedTask
, which may be useful in various scenarios.
*Preexisting rows that were searched via SearchAndSubscribe
will be returned immediately in the initial WaitForRowsAsync
call on the Realtime row cursor.
//retrieve the stream layer and its feature class
//subscribe.
var stream_layer = ...;
var filter = new SpatialQueryFilter() { .... };
//Feature class must always be retrieved on the QueuedTask
var rfc = await QueuedTask.Run(() => stream_layer .GetFeatureClass());
//Note: we can use a System Task -not- just a QueuedTask to Subscribe
//with the feature class
await System.Threading.Tasks.Task.Run(async () => {
//Search and subscribe..
using(var rc = rfc.SearchAndSubscribe(filter, true)) {
//or... rfc.Subscribe(filter, true) //No search...
//We must still call await WaitForRowsAsync to actually ~get~ any rows.
In this snippet, because we are subscribing on the stream layer, we ~must~ use the QueuedTask
.
var stream_layer = ...;
var filter = new SpatialQueryFilter() { .... };
await QueuedTask.Run(async () => {
using(var rc = stream_layer.SearchAndSubscribe(filter, true)) {
//or... stream_layer.Subscribe(filter, true) //No search...
//etc
Once we are subscribed, we use the returned RealtimeCursor
and its WaitForRowsAsync
to (asynchronously) retrieve row events as they happen. The awaited task will complete when rows become ready in the cursor. The RealtimeCursor
follows the same semantic as a "regular" row cursor once rows are ready - namely, call MoveNext()
to iterate over the returned rows in a forward-only fashion. When the rows provided in the cursor are exhausted rowCursor.MoveNext()
returns false and the next row event(s) can be awaited by renewing the WaitForRowsAsync()
call. Clients repeat this pattern of making a WaitForRowsAsync
call to listen for; and MoveNext
to process; incoming events. Client code can process all rows retrieved in the cursor without any interference from other row events occurring back on the table. Further row events are handled via additional WaitForRowsAsync
calls.
RealtimeCursor
can either be recycling or non-recycling. A recycling cursor will overwrite the rowCursor.Current
member with each retrieved row whereas a non-recycling will maintain a unique row for each row retrieved. This is the same recycling behavior as for the ArcGIS.Core.Data.RowCursor
. Add-ins can stop listening for additional row events by not renewing the WaitForRowsAsync
call after they have exhausted the current cursor rows (via MoveNext) at which point they should unsubscribe, the topic of the next section.
//As before, retrieve the stream layer and/or its feature class
//and subscribe. We are use a System Task with the feature class...
await System.Threading.Tasks.Task.Run(async () => {
//Search and subscribe..
using(var rc = rfc.SearchAndSubscribe(filter, true)) {
//get the rows for incoming events now that we are subscribed
while (await rc.WaitForRowsAsync(CancellationToken.None)) {
//process in a forward-only fashion
while (rc.MoveNext()) {
//check RealtimeRowSource if we want to know the origin of the event
RealtimeRowSource source = rc.Current.GetRowSource();
...
//TODO, process the rows as needed
var some_val = rc.Current[field_index];//access row values as normal
...
}
}
...
As long as a RealtimeCursor is subscribed to a Realtime feature class, it will continue to receive row events until it is unsubscribed (whether the events are being processed via WaitForRowsAsync
calls or not). RealtimeCursors can either be explicitly unsubscribed by calling realtimeCursor.Unsubscribe()
(unusual) or implicitly unsubscribed by allowing the cursor to be disposed when it goes out of scope of an enclosing using(...) { }
statement (most common) [though Dispose can also be explicitly called].
If the cursor is currently awaiting rows in a WaitForRowsAsync
call and the Unsubscribe is explicitly called, the task will complete and calling MoveNext will return false. No special logic is required to handle unsubscription if it should occur while the row cursor is listening for events though typically it would occur after WaitForRowsAsync
calls have completed and the row cursor has gone out of scope (and is being disposed). Not unsubscribing the cursor can lead to the cursor's internal queue becoming full. Remember, even though the add-in may no longer be processing rows, it the cursor is still subscribed it is still receiving rows. If the cursor's queue becomes full, the RealtimeCursor will be automatically unsubscribed to prevent an internal buffer overflow condition.
Once a cursor has been unsubscribed it cannot be re-subscribed and calling WaitForRowsAsync
will result in undefined behavior - probably an exception. The same is true if WaitForRowsAsync
is called and the cursor has already been disposed.
Note: The connection state of the cursor can be tested with rc.GetState()
. This returns a RealtimeCursorState
indicating if the cursor is subscribed, unsubscribed, or was automatically unsubscribed because of an overrun.
//implicit unsubscribe (preferred)
//Search and subscribe..or subscribe
using(var rc = rfc.SearchAndSubscribe(filter, true)) {
//get the rows for incoming events now that we are subscribed
while (await rc.WaitForRowsAsync(CancellationToken.None)) {
...
//process rows here
break;//some exit condition causes us to conclude processing
}
}//<-- cursor goes out of scope. It is disposed and unsubscribed
//explicit unsubscribe and/or disposal
var rc = rfc.SearchAndSubscribe(filter, true)) {
//TODO listen for rows
}
//unsubscribe
rc.Unsubscribe();
//do something before dispose - eg check state, etc
...
rc.Dispose();
//-- or simply --
rc.Dispose();//also unsubscribes if still subscribed
An ongoing WaitForRowsAsync call can be cancelled via its CancellationToken
parameter. Use of a CancellationToken is a standard Microsoft pattern for cancelling Tasks. The CancellationToken can be cancelled explicitly by calling its parent CancellationTokenSource's Cancel()
method or by specifying a timeout in the constructor of the CancellationTokenSource which cancels the token once the duration has expired. When a Task is cancelled, it completes immediately and a TaskCanceledException is thrown indicating that the awaited method is cancelled. The task.IsCanceled
property can also be checked to determine if the task was canceled or completed successfully. Add-ins are responsible for catching the TaskCanceledException
.
//handle cancellation
using(var rc = rfc.Subscribe(filter, true)) {
//auto-cancel after 20 seconds
var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
//Handle TaskCanceledException
try
{
//pass in the cancellation token to WaitForRowsAsync
while (await rc.WaitForRowsAsync(cancel.Token)) {
//Process rows
}
}
//we must catch TaskCanceledException
catch(TaskCanceledException tce)
{
//TODO - we were cancelled
}
cancel.Dispose();
...
//to explicitly cancel....
var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
//else where...
if (_cancelWaitingForRows)
cancel.Cancel();//explicit cancellation