ProSnippets StreamLayers - Esri/arcgis-pro-sdk GitHub Wiki

Language:              C#  
Subject:               StreamLayers  
Contributor:           ArcGIS Pro SDK Team <[email protected]>  
Organization:          Esri, http://www.esri.com  
Date:                  11/7/2025  
ArcGIS Pro:            3.6  
Visual Studio:         2022  

Create Stream Layer

Create Stream Layer with URI

// Note: call within QueuedTask.Run()
  {
    var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
    var createParam = new FeatureLayerCreationParams(new Uri(url))
    {
      IsVisible = false //turned off by default
    };
    streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(createParam, map);

    //or use "original" create layer (will be visible by default)
    Uri uri = new Uri(url);
    streamLayer = LayerFactory.Instance.CreateLayer(uri, map) as StreamLayer;
    streamLayer.SetVisibility(false);//turn off
  }

Create a stream layer with a definition query

// Note: call within QueuedTask.Run()
  {
    //Must be on the QueuedTask
    var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
    var lyrCreateParam = new FeatureLayerCreationParams(new Uri(url))
    {
      IsVisible = true,
      DefinitionQuery = new DefinitionQuery(whereClause: "RWY = '29L'", name: "Runway")
    };

    streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(lyrCreateParam, map);
  }

Create a stream layer with a simple renderer

// Note: call within QueuedTask.Run()
  {
    var url = @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/LABus/StreamServer";
    var uri = new Uri(url, UriKind.Absolute);
    var createParams = new FeatureLayerCreationParams(uri)
    {
      RendererDefinition = new SimpleRendererDefinition()
      {
        SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
                            ColorFactory.Instance.BlueRGB,
                            12,
                     SimpleMarkerStyle.Pushpin).MakeSymbolReference()
      }
    };
    streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
                        createParams, map);
  }

Setting a unique value renderer for latest observations

// Note: call within QueuedTask.Run()
  {
    var url = @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
    var uri = new Uri(url, UriKind.Absolute);

    var createParams = new FeatureLayerCreationParams(uri)
    {
      IsVisible = false
    };
    streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
                        createParams, map);
    //Define the unique values by hand
    var uvr = new CIMUniqueValueRenderer()
    {
      Fields = new string[] { "ACTYPE" },
      UseDefaultSymbol = true,
      DefaultLabel = "Others",
      DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
                  CIMColor.CreateRGBColor(185, 185, 185), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
    };

    var classes = new List<CIMUniqueValueClass>
    {
      //add in classes - one for ACTYPE of 727, one for DC 9
      new CIMUniqueValueClass()
      {
        Values = new CIMUniqueValue[] {
                new CIMUniqueValue() { FieldValues = new string[] { "B727" } } },
        Visible = true,
        Label = "Boeing 727",
        Symbol = SymbolFactory.Instance.ConstructPointSymbol(
                  ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
      },
      new CIMUniqueValueClass()
      {
        Values = new CIMUniqueValue[] {
                new CIMUniqueValue() { FieldValues = new string[] { "DC9" } } },
        Visible = true,
        Label = "DC 9",
        Symbol = SymbolFactory.Instance.ConstructPointSymbol(
                  ColorFactory.Instance.GreenRGB, 10, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
      }
    };
    //add the classes to a group
    var groups = new List<CIMUniqueValueGroup>()
  {
    new CIMUniqueValueGroup() {
       Classes = classes.ToArray()
    }
  };
    //add the groups to the renderer
    uvr.Groups = groups.ToArray();
    //Apply the renderer (for current observations)
    streamLayer.SetRenderer(uvr);
    streamLayer.SetVisibility(true);//turn on the layer        
  }

Stream Layer settings and properties

Find all Stream Layers that are Track Aware

var trackAwareLayers = MapView.Active.Map.GetLayersAsFlattenedList()
                             .OfType<StreamLayer>().Where(sl => sl.IsTrackAware)?.ToList();

Determine the Stream Layer type

//spatial or non-spatial?
  if (streamLayer.TrackType == TrackType.AttributeOnly)
  {
    //this is a non-spatial stream layer
  }
  else
  {
    //this must be a spatial stream layer
  }

Check the Stream Layer connection state

if (!streamLayer.IsStreamingConnectionOpen)
  // Note: call within QueuedTask.Run()
  {
    streamLayer.StartStreaming();
  }

Start and stop streaming

// Note: call within QueuedTask.Run()
  {
    //Start...
    streamLayer.StartStreaming();
    //Stop...
    streamLayer.StopStreaming();
  }

Delete all current and previous observations

// Note: call within QueuedTask.Run()
  {
    //Must be called on the feature class
    using var rfc = streamLayer.GetFeatureClass();
    rfc.Truncate();
  }

Get the Track Id Field

if (streamLayer.IsTrackAware)
  {
    var trackField = streamLayer.TrackIdFieldName;
    //TODO use the field name
  }

Get The Track Type

var trackType = streamLayer.TrackType;
  switch (trackType)
  {
    //TODO deal with tracktype
    case TrackType.None:
    case TrackType.AttributeOnly:
    case TrackType.Spatial:
      break;
  }

Set the Maximum Count of Previous Observations to be Stored in Memory

// Note: call within QueuedTask.Run()
  {
    //Set Expiration Method and Max Expiration Count
    if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureCount)
      streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureCount);
    streamLayer.SetExpirationMaxCount(15);
    //FYI
    if (streamLayer.IsTrackAware)
    {
      //MaxCount is per track! otherwise for the entire layer
    }
  }

Set the Maximum Age of Previous Observations to be Stored in Memory

// Note: call within QueuedTask.Run()
  {
    //Set Expiration Method and Max Expiration Age
    if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureAge)
      streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureAge);
    //set to 12 hours (max is 24 hours)
    streamLayer.SetExpirationMaxAge(new TimeSpan(12, 0, 0));

    //FYI
    if (streamLayer.IsTrackAware)
    {
      //MaxAge is per track! otherwise for the entire layer
    }
  }

Set Stream Layer properties via the CIM

//The layer must be track aware and spatial
  if (streamLayer.TrackType != TrackType.Spatial)
  {
    // not track aware and spatial
  }
  // Note: call within QueuedTask.Run()
  {
    //get the CIM Definition
    var def = streamLayer.GetDefinition() as CIMFeatureLayer;
    //set the number of previous observations, 
    def.PreviousObservationsCount = (int)streamLayer.GetExpirationMaxCount() - 1;
    //set show previous observations and track lines to true
    def.ShowPreviousObservations = true;
    def.ShowTracks = true;
    //commit the changes
    streamLayer.SetDefinition(def);
  }

Rendering

Defining a unique value renderer definition

// Note: call within QueuedTask.Run()
  {
    streamLayer = null;
    //https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer

    var uvrDef = new UniqueValueRendererDefinition()
    {
      ValueFields = new List<string> { "ACTYPE" },
      SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
        ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon)
          .MakeSymbolReference(),
      ValuesLimit = 5
    };
    //Note: CreateRenderer can only create value classes based on
    //the current events it has received
    streamLayer.SetRenderer(streamLayer.CreateRenderer(uvrDef));
  }

Setting a unique value renderer for latest observations 2

// Note: call within QueuedTask.Run()
  {
    //Define the classes by hand to avoid using CreateRenderer(...)
    CIMUniqueValueClass uvcB727 = new CIMUniqueValueClass()
    {
      Values = new CIMUniqueValue[] { new CIMUniqueValue() { FieldValues = new string[] { "B727" } } },
      Visible = true,
      Label = "Boeing 727",
      Symbol = SymbolFactory.Instance.ConstructPointSymbol(CIMColor.CreateRGBColor(255, 0, 0), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
    };

    CIMUniqueValueClass uvcD9 = new CIMUniqueValueClass()
    {
      Values = new CIMUniqueValue[] { new CIMUniqueValue() { FieldValues = new string[] { "DC9" } } },
      Visible = true,
      Label = "DC 9",
      Symbol = SymbolFactory.Instance.ConstructPointSymbol(CIMColor.CreateRGBColor(0, 255, 0), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
    };
    //Assign the classes to a group
    CIMUniqueValueGroup uvGrp = new CIMUniqueValueGroup()
    {
      Classes = new CIMUniqueValueClass[] { uvcB727, uvcD9 }
    };
    //assign the group to the renderer
    var UVrndr = new CIMUniqueValueRenderer()
    {
      Fields = new string[] { "ACTYPE" },
      Groups = new CIMUniqueValueGroup[] { uvGrp },
      UseDefaultSymbol = true,
      DefaultLabel = "Others",
      DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
        CIMColor.CreateRGBColor(185, 185, 185), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
    };
    //set the renderer. Depending on the current events received, the
    //layer may or may not have events for each of the specified
    //unique value classes
    streamLayer.SetRenderer(UVrndr);
  }

Setting a unique value renderer for previous observations

//The layer must be track aware and spatial
  if (streamLayer.TrackType != TrackType.Spatial)
  {
    // not track aware and spatial
  }
  // Note: call within QueuedTask.Run()
  {
    //Define unique value classes same as we do for current observations
    //or use "CreateRenderer(...)" to assign them automatically
    CIMUniqueValueClass uvcB727Prev = new CIMUniqueValueClass()
    {
      Values = new CIMUniqueValue[] { new CIMUniqueValue() {
      FieldValues = new string[] { "B727" } } },
      Visible = true,
      Label = "Boeing 727",
      Symbol = SymbolFactory.Instance.ConstructPointSymbol(
      CIMColor.CreateRGBColor(255, 0, 0), 4, SimpleMarkerStyle.Hexagon)
      .MakeSymbolReference()
    };

    CIMUniqueValueClass uvcD9Prev = new CIMUniqueValueClass()
    {
      Values = new CIMUniqueValue[] { new CIMUniqueValue() {
      FieldValues = new string[] { "DC9" } } },
      Visible = true,
      Label = "DC 9",
      Symbol = SymbolFactory.Instance.ConstructPointSymbol(
        CIMColor.CreateRGBColor(0, 255, 0), 4, SimpleMarkerStyle.Hexagon)
        .MakeSymbolReference()
    };

    CIMUniqueValueGroup uvGrpPrev = new CIMUniqueValueGroup()
    {
      Classes = new CIMUniqueValueClass[] { uvcB727Prev, uvcD9Prev }
    };

    var UVrndr = new CIMUniqueValueRenderer()
    {
      Fields = new string[] { "ACTYPE" },
      Groups = new CIMUniqueValueGroup[] { uvGrpPrev },
      UseDefaultSymbol = true,
      DefaultLabel = "Others",
      DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
        CIMColor.CreateRGBColor(185, 185, 185), 4, SimpleMarkerStyle.Hexagon)
        .MakeSymbolReference()
    };

    streamLayer.SetRenderer(UVrndr, FeatureRendererTarget.PreviousObservations);
  }

Setting a simple renderer to draw track lines

// Note: call within QueuedTask.Run()
  {
    //The layer must be track aware and spatial
    if (streamLayer.TrackType != TrackType.Spatial)
    {
      // not track aware and spatial
    }

    //Note: only a simple renderer with solid line symbol is supported for track 
    //line renderer
    var trackRenderer = new SimpleRendererDefinition()
    {
      SymbolTemplate = SymbolFactory.Instance.ConstructLineSymbol(
          ColorFactory.Instance.BlueRGB, 2, SimpleLineStyle.Solid)
            .MakeSymbolReference()
    };
    streamLayer.SetRenderer(
         streamLayer.CreateRenderer(trackRenderer),
           FeatureRendererTarget.TrackLines);
  }

Check Previous Observation and Track Line Visibility

//The layer must be track aware and spatial for these settings
  //to have an effect
  if (streamLayer.TrackType != TrackType.Spatial)
  {
    // not track aware and spatial
  }
  // Note: call within QueuedTask.Run()
  {
    if (!streamLayer.AreTrackLinesVisible)
      streamLayer.SetTrackLinesVisibility(true);
    if (!streamLayer.ArePreviousObservationsVisible)
      streamLayer.SetPreviousObservationsVisibility(true);
  }

Make Track Lines and Previous Observations Visible

//The layer must be track aware and spatial for these settings
  //to have an effect

  if (streamLayer.TrackType != TrackType.Spatial)
  {
    // not track aware and spatial
  }

  // Note: call within QueuedTask.Run()
  {
    //Note: Setting PreviousObservationsCount larger than the 
    //"SetExpirationMaxCount()" has no effect
    streamLayer.SetPreviousObservationsCount(6);
    if (!streamLayer.AreTrackLinesVisible)
      streamLayer.SetTrackLinesVisibility(true);
    if (!streamLayer.ArePreviousObservationsVisible)
      streamLayer.SetPreviousObservationsVisibility(true);
  }

Retrieve the current observation renderer

// Note: call within QueuedTask.Run()
  {
    var renderer = streamLayer.GetRenderer();
  }

Retrieve the previous observation renderer

//The layer must be track aware and spatial
  if (streamLayer.TrackType != TrackType.Spatial)
  {
    // not track aware and spatial
  }
  // Note: call within QueuedTask.Run()
  {
    var prev_renderer = streamLayer.GetRenderer(
    FeatureRendererTarget.PreviousObservations);
  }

Retrieve the track lines renderer

//The layer must be track aware and spatial
  if (streamLayer.TrackType != TrackType.Spatial)
  {

  }

  // Note: call within QueuedTask.Run()
  {
    var track_renderer = streamLayer.GetRenderer(
      FeatureRendererTarget.TrackLines);
  }

Subscribe and SearchAndSubscribe

Search And Subscribe for Streaming Data

// Note: call within QueuedTask.Run()
  {
    //query filter can be null to search and retrieve all rows
    //true means recycling cursor
    using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (rc.WaitForRowsAsync().Result)
      {
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            //determine the origin of the row event
            switch (row.GetRowSource())
            {
              case RealtimeRowSource.PreExisting:
                //pre-existing row at the time of subscribe
                continue;
              case RealtimeRowSource.EventInsert:
                //row was inserted after subscribe
                continue;
              case RealtimeRowSource.EventDelete:
                //row was deleted after subscribe
                continue;
            }
          }
        }
      }
    }//row cursor is disposed. row cursor is unsubscribed

    //....or....
    //Use the feature class instead of the layer
    using var rfc = streamLayer.GetFeatureClass();
    //non-recycling cursor - 2nd param "false"
    using (RealtimeCursor rc = rfc.SearchAndSubscribe(qfilter, false))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (rc.WaitForRowsAsync().Result)
      {
        //etc
      }
    }
  }

Search And Subscribe With Cancellation

// Note: call within QueuedTask.Run()
  {
    //Recycling cursor - 2nd param "true"
    //or streamLayer.Subscribe(qfilter, true) to just subscribe
    using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
    {
      //auto-cancel after 20 seconds
      var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
      //catch TaskCanceledException
      try
      {
        while (rc.WaitForRowsAsync(cancel.Token).Result)
        {
          //check for row events
          while (rc.MoveNext())
          {
            using var row = rc.Current;
            //etc
          }
        }
      }
      catch (TaskCanceledException)
      {
        //Handle cancellation as needed
      }
      cancel.Dispose();
    }
  }

Explicitly Cancel WaitForRowsAsync

RealtimeCursor rc = null;
  bool SomeConditionForCancel = false;

  //somewhere in our code we create a CancellationTokenSource
  var cancel = new CancellationTokenSource();
  //...

  //call cancel on the CancellationTokenSource anywhere in
  //the add-in, assuming the CancellationTokenSource is in scope
  if (SomeConditionForCancel)
    cancel.Cancel();//<-- will cancel the token

  //Within QueuedTask we are subscribed! streamLayer.Subscribe() or SearchAndSubscribe()
  try
  {
    //TaskCanceledException will be thrown when the token is cancelled
    while (rc.WaitForRowsAsync(cancel.Token).Result)
    {
      //check for row events
      while (rc.MoveNext())
      {
        using var row = rc.Current;
        //etc
      }
    }
  }
  catch (TaskCanceledException)
  {
    //Handle cancellation as needed
  }
  cancel.Dispose();

Realtime FeatureClass

Connect to a real-time feature class from a real-time datastore

var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";

  // Note: call within QueuedTask.Run()
  {
    var realtimeServiceConProp = new RealtimeServiceConnectionProperties(
                                   new Uri(url),
                                   RealtimeDatastoreType.StreamService
                                );
    using var realtimeDatastore = new RealtimeDatastore(realtimeServiceConProp);
    //A Realtime data store only contains **one** Realtime feature class (or table)
    var name = realtimeDatastore.GetTableNames().First();
    using (RealtimeFeatureClass realtimeFeatureClass = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass)
    {
      //feature class, by default, is not streaming (opposite of the stream layer)
      realtimeFeatureClass.StartStreaming();
      //TODO use the feature class
      //...
    }
  }

Check the Realtime Feature Class is Track Aware

// Note: call within QueuedTask.Run()
  {
    using var rfc = streamLayer.GetFeatureClass();
    using var rfc_def = rfc.GetDefinition();
    if (rfc_def.HasTrackIDField())
    {
      //Track aware
    }
  }

Get the Track Id Field from the Realtime Feature class

// Note: call within QueuedTask.Run()
  {
    using (var rfc = streamLayer.GetFeatureClass())
    using (var rfc_def = rfc.GetDefinition())
    {
      if (rfc_def.HasTrackIDField())
      {
        var fld_name = rfc_def.GetTrackIDField();

      }
    }
  }

Subscribe to Streaming Data

//Note: with feature class we can also use a System Task to subscribe and
  //process rows
  // Note: call within QueuedTask.Run()
  {
    // or var rfc = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass
    using RealtimeFeatureClass rfc = streamLayer.GetFeatureClass();
    //non-recycling cursor - 2nd param "false"
    //subscribe, pre-existing rows are not searched
    using RealtimeCursor rc = rfc.Subscribe(qfilter, false);
    SpatialQueryFilter spatialFilter = new SpatialQueryFilter();
    //waiting for new features to be streamed
    //default is no cancellation
    while (rc.WaitForRowsAsync().Result)
    {
      while (rc.MoveNext())
      {
        using (var row = rc.Current)
        {
          switch (row.GetRowSource())
          {
            case RealtimeRowSource.EventInsert:
              //getting geometry from new events as they arrive
              Polygon poly = ((RealtimeFeature)row).GetShape() as Polygon;

              //using the geometry to select features from another feature layer
              spatialFilter.FilterGeometry = poly;//project poly if needed...
              featureLayer.Select(spatialFilter);
              continue;
            default:
              continue;
          }
        }
      }
    }
    //row cursor is disposed. row cursor is unsubscribed
  }

Search Existing Data and Subscribe for Streaming Data

//Note we can use System Task with the Realtime feature class
  //for subscribe

  // Note: call within QueuedTask.Run()
  {
    using RealtimeFeatureClass rfc = streamLayer.GetFeatureClass();
    //non-recycling cursor - 2nd param "false"
    using RealtimeCursor rc = rfc.SearchAndSubscribe(qfilter, false);

    //waiting for new features to be streamed
    //default is no cancellation - use await in async method
    while (rc.WaitForRowsAsync().Result)
    {
      //pre-existing rows will be retrieved that were searched
      while (rc.MoveNext())
      {
        using RealtimeRow row = rc.Current;
        var row_source = row.GetRowSource();
        switch (row_source)
        {
          case RealtimeRowSource.EventDelete:
            //TODO - handle deletes
            break;
          case RealtimeRowSource.EventInsert:
            //TODO handle inserts
            break;
          case RealtimeRowSource.PreExisting:
            //TODO handle pre-existing rows
            break;
        }
      }
    }
    //row cursor is disposed. row cursor is unsubscribed
  }

Search And Subscribe With Cancellation 2

// Note: call within QueuedTask.Run()
  {
    using RealtimeFeatureClass rfc = streamLayer.GetFeatureClass();
    //Recycling cursor - 2nd param "true"
    using RealtimeCursor rc = rfc.SearchAndSubscribe(qfilter, true);
    //auto-cancel after 20 seconds
    var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
    //catch TaskCanceledException
    try
    {
      // Use await in async method
      while (rc.WaitForRowsAsync(cancel.Token).Result)
      {
        //check for row events
        while (rc.MoveNext())
        {
          using RealtimeRow record = rc.Current;
          // Process the record
        }
      }
    }
    catch (TaskCanceledException)
    {
      //Handle cancellation as needed
    }
    cancel.Dispose();
  }
⚠️ **GitHub.com Fallback** ⚠️