Moniker streaming using gRPC streams - ni/grpc-device GitHub Wiki
This page guides you through writing a client application that uses moniker-based streaming, which streams data over gRPC streams. Any API that supports moniker-based streaming can be used together with one or more of the following APIs from data_moniker.proto, depending on your application's needs.
- StreamRead: Used for reading data from the grpc-device server.
- StreamWrite: Supports writing data to the grpc-device server.
- StreamReadWrite: Supports both read and write as part of a single API call.
Example
For this example, we will look at how to write a streaming application that reads data using the DAQmx ReadAnalogF64 API. Refer to the full example for more details.
- Initialize the NiDAQmx stub along with the DataMoniker stub. The NiDAQmx stub will be used for invoking driver APIs, whereas the DataMoniker stub will be used for streaming the actual data.
NiDAQmx::Stub client(channel);
ni::data_monikers::DataMoniker::Stub moniker_service(channel);
- Set up the read stream and acquire the moniker that will be used for streaming the data.
::grpc::ClientContext begin_read_context;
auto begin_read_request = BeginReadAnalogF64Request{};
begin_read_request.mutable_task()->CopyFrom(daqmx_read_task);
begin_read_request.set_num_samps_per_chan(10);
begin_read_request.set_timeout(10.0);
begin_read_request.set_fill_mode(GroupBy::GROUP_BY_GROUP_BY_CHANNEL);
begin_read_request.set_array_size_in_samps(10);
auto begin_read_response = BeginReadAnalogF64Response{};
client.BeginReadAnalogF64(&begin_read_context, begin_read_request, &begin_read_response);
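// Heap-allocate a copy of the moniker. Ownership passes to the request when AddAllocated() is called in the next step.
auto daqmx_read_moniker = new ni::data_monikers::Moniker(begin_read_response.moniker());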
Note that we invoke the BeginReadAnalogF64 API, which is similar to the ReadAnalogF64 API from DAQmx. The BeginReadAnalogF64 call lets us configure the read stream. Instead of returning data immediately, it returns a "moniker" that contains information related to this specific Begin* call. This moniker will be used later for streaming data in a loop.
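The snippet above does not inspect the grpc::Status that BeginReadAnalogF64 returns. A minimal sketch of such a check follows. The name throw_if_failed is a hypothetical helper, not part of grpc-device, and it is written as a template so that it relies only on the ok() and error_message() members that grpc::Status provides.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of grpc-device).
// Throws std::runtime_error if an RPC status is not OK. Works with any type
// that exposes ok() and error_message(), as grpc::Status does.
template <typename Status>
void throw_if_failed(const Status& status, const std::string& what) {
  if (!status.ok()) {
    throw std::runtime_error(what + " failed: " + status.error_message());
  }
}
```

Under these assumptions, the call above could be wrapped as throw_if_failed(client.BeginReadAnalogF64(&begin_read_context, begin_read_request, &begin_read_response), "BeginReadAnalogF64"); so that a failed setup is reported before any streaming starts.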
- Acquire the read stream.
::grpc::ClientContext stream_read_context;
ni::data_monikers::MonikerList read_request;
read_request.mutable_read_monikers()->AddAllocated(daqmx_read_moniker);
auto read_stream = moniker_service.StreamRead(&stream_read_context, read_request);
This code passes the moniker to the moniker_service.StreamRead() API and obtains read_stream. Note that read_request is of type MonikerList, which means you can add multiple monikers acquired from different Begin* invocations to the same read_request and stream data from multiple devices using a single read_stream.
[!NOTE]
The order in which monikers are added to read_request is the order in which they will be executed when calling read_stream->Read() later.
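When several monikers share one stream, it can help to record the position at which each one was added. The sketch below is a hypothetical bookkeeping helper, not part of grpc-device. It assumes that the i-th value in each read response corresponds to the i-th moniker added to the request, which is an inference from the ordering note above rather than something this page states explicitly. The labels are arbitrary strings chosen by the caller.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: remembers the order in which monikers were added
// to a MonikerList so that a response position can be looked up by label.
class MonikerOrder {
 public:
  // Call right after adding a moniker to the request. Returns its position.
  std::size_t add(const std::string& label) {
    labels_.push_back(label);
    return labels_.size() - 1;
  }

  // Returns the position of a previously added label.
  // Throws std::out_of_range if the label was never added.
  std::size_t index_of(const std::string& label) const {
    for (std::size_t i = 0; i < labels_.size(); ++i) {
      if (labels_[i] == label) return i;
    }
    throw std::out_of_range("unknown moniker label: " + label);
  }

  std::size_t size() const { return labels_.size(); }

 private:
  std::vector<std::string> labels_;
};
```

With this in place, a caller would invoke add() once per moniker, in the same order as the AddAllocated() calls, and later use index_of() to choose which entry of the response to unpack.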
[!NOTE]
Make sure the monikers added to MonikerList match the API invoked on moniker_service. A StreamRead request should contain only read monikers (acquired from BeginRead* APIs), a StreamWrite request should contain only write monikers, and a StreamReadWrite request can contain both read and write monikers, which will be executed in the order in which they are added to MonikerList.
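The rule in this note can be expressed as a small client-side check that runs before the request is sent. The following is a sketch only: StreamApi and request_matches_api are hypothetical names introduced for illustration, and the function takes plain counts so that it does not depend on the generated protobuf types.

```cpp
// Hypothetical enum naming the three streaming APIs from data_moniker.proto.
enum class StreamApi { kStreamRead, kStreamWrite, kStreamReadWrite };

// Returns true when the mix of monikers is acceptable for the chosen API:
// StreamRead takes only read monikers, StreamWrite takes only write monikers,
// and StreamReadWrite accepts both kinds.
inline bool request_matches_api(StreamApi api, int read_count, int write_count) {
  switch (api) {
    case StreamApi::kStreamRead:
      return write_count == 0;
    case StreamApi::kStreamWrite:
      return read_count == 0;
    case StreamApi::kStreamReadWrite:
      return true;
  }
  return false;  // Unreachable for valid enum values.
}
```

For the read example on this page, read_count could come from read_request.read_monikers_size(). A matching accessor for write monikers is assumed to exist on MonikerList but is not shown on this page.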
- Read the data.
for (int i = 0; i < NUM_ITERATIONS; i++) {
  ni::data_monikers::MonikerReadResponse read_data_result;
  if (read_stream->Read(&read_data_result)) {
    // Unpack the first value in the response into the DAQmx-specific message type.
    google::protobuf::Any read_message = read_data_result.data().values(0);
    MonikerReadAnalogF64Response read_analog_f64_response;
    read_message.UnpackTo(&read_analog_f64_response);
    print_array(read_analog_f64_response);
  }
  else {
    std::cout << "No data available." << std::endl;
  }
}
// Cancel the stream once all iterations are done.
stream_read_context.TryCancel();
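The loop above keeps calling Read() for the remaining iterations even after it has returned false. In gRPC C++, Read() on a client reader returns false once the stream has ended or failed, so later calls will not yield more data. A possible variant that stops at the first false is sketched below. The name read_up_to is a hypothetical helper, and it is templated so that it only assumes a bool Read(Response*) member, which grpc::ClientReader provides.

```cpp
// Hypothetical helper (not part of grpc-device).
// Reads up to max_reads messages from stream and passes each one to handle.
// Stops early when Read() returns false. Returns the number of messages handled.
template <typename Response, typename Stream, typename Handler>
int read_up_to(Stream& stream, int max_reads, Handler handle) {
  int handled = 0;
  for (int i = 0; i < max_reads; ++i) {
    Response response;
    if (!stream.Read(&response)) {
      break;  // Stream ended or failed; no further messages will arrive.
    }
    handle(response);
    ++handled;
  }
  return handled;
}
```

In the example on this page, this might be called as read_up_to<ni::data_monikers::MonikerReadResponse>(*read_stream, NUM_ITERATIONS, handler), where handler is a callable that unpacks and prints each response. Comparing the returned count with NUM_ITERATIONS shows whether the stream closed early.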