Moniker streaming using sideband mechanism - ni/grpc-device GitHub Wiki
This page shows how to use the sideband mechanism with moniker-based streaming to stream data to and from the grpc-device server.
The sideband mechanism uses raw sockets instead of gRPC streams to move data to and from the server. In most cases gRPC streaming is sufficient. When large amounts of data (>1-2 GB/s) need to be streamed, or communication must happen with very low latency (<30 µs), the sideband mechanism can achieve higher throughput than gRPC streams.
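As a rough way to judge which side of that throughput threshold an acquisition falls on, the sketch below estimates the raw data rate of a float64 stream. The helper names and the choice of the lower 1 GB/s bound as the cutoff are illustrative assumptions, not part of grpc-device; measure real throughput before choosing a transport.

```cpp
#include <cstdint>

// Hypothetical helper: raw payload rate, in bytes per second, of a stream of
// 64-bit floating-point samples, ignoring any protocol overhead.
constexpr std::uint64_t f64_stream_bytes_per_second(
    std::uint64_t channels, std::uint64_t samples_per_second_per_channel) {
  return channels * samples_per_second_per_channel * sizeof(double);
}

// Assumption: treat the lower bound quoted above (1 GB/s, taken as 1e9 bytes/s)
// as the point where sideband streaming becomes worth evaluating.
constexpr std::uint64_t kSidebandThresholdBytesPerSecond = 1000000000ULL;

constexpr bool worth_evaluating_sideband(std::uint64_t bytes_per_second) {
  return bytes_per_second > kSidebandThresholdBytesPerSecond;
}
```

For example, 8 channels at 2 MS/s each come to 128 MB/s, well within what gRPC streaming is expected to handle, while 32 channels at 5 MS/s each come to 1.28 GB/s.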
Setup
To stream using the sideband mechanism, client applications need an additional library, ni_grpc_sideband, which is available from the ni/grpc-sideband repo on GitHub. You can either build the ni_grpc_sideband library separately and load it dynamically in your client application, or link statically against the library generated from the grpc-sideband repo.
Adding Client Code for Sideband Streaming
For this example, we will write client code that streams data through the DAQmx WriteAnalogF64 and ReadAnalogF64 operations, using the BeginWriteAnalogF64 and BeginReadAnalogF64 calls to obtain the monikers for each.
These APIs use sideband streaming to transfer data between the client and the server. For an example that uses gRPC streaming instead, refer to the Setup a Client for Streaming page.
Below is the client code for initiating a sideband stream using these APIs:
// Set up the DAQmx task and request parameters.
auto request_write = BeginWriteAnalogF64Request{};
auto request_read = BeginReadAnalogF64Request{};
// Create a gRPC stub to access DAQmx API functions over gRPC.
StubPtr stub = std::make_unique<NiDAQmx::Stub>(grpc::CreateChannel(target_str, creds));
// Create a Data Moniker stub to interact with the moniker API for streaming data.
ni::data_monikers::DataMoniker::Stub moniker_service(grpc::CreateChannel(target_str, creds));
std::vector<pb::float64> write_data_float64 = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};
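// A grpc::ClientContext must not be reused across RPCs, so each call gets its own.
grpc::ClientContext write_context;
auto response_write = BeginWriteAnalogF64Response{};
stub->BeginWriteAnalogF64(&write_context, request_write, &response_write);
grpc::ClientContext read_context;
auto response_read = BeginReadAnalogF64Response{};
stub->BeginReadAnalogF64(&read_context, request_read, &response_read);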
auto write_moniker_f64 = new ni::data_monikers::Moniker(response_write.moniker());
auto read_moniker_f64 = new ni::data_monikers::Moniker(response_read.moniker());
grpc::ClientContext moniker_context;
ni::data_monikers::BeginMonikerSidebandStreamRequest sideband_request;
ni::data_monikers::BeginMonikerSidebandStreamResponse sideband_response;
sideband_request.set_strategy(ni::data_monikers::SidebandStrategy::SOCKETS);
sideband_request.mutable_monikers()->mutable_read_monikers()->AddAllocated(read_moniker_f64 );
sideband_request.mutable_monikers()->mutable_write_monikers()->AddAllocated(write_moniker_f64 );
auto write_stream = moniker_service.BeginSidebandStream(&moniker_context, sideband_request, &sideband_response);
auto sideband_token = InitClientSidebandData(sideband_response);
for (int i = 0; i < 5; i++) {
  nifpga_grpc::MonikerWriteAnalogF64Request write_values_array_f64;
  write_values_array_f64.mutable_array()->Add(write_data_float64.begin(), write_data_float64.end());
  ni::data_monikers::SidebandWriteRequest write_data_request;
  write_data_request.mutable_values()->add_values()->PackFrom(write_values_array_f64);
  WriteSidebandMessage(sideband_token, write_data_request);
  nifpga_grpc::MonikerReadAnalogF64Response read_values_array_f64;
  ni::data_monikers::SidebandReadResponse read_result;
  ReadSidebandMessage(sideband_token, &read_result);
  read_result.values().values(0).UnpackTo(&read_values_array_f64);
}
ni::data_monikers::SidebandWriteRequest cancel_request;
cancel_request.set_cancel(true);
WriteSidebandMessage(sideband_token, cancel_request);
CloseSidebandData(sideband_token);
Code Explanation
We will now break down the client code into smaller parts, explaining each component:
Prepare Write Request
auto request_write = BeginWriteAnalogF64Request{};
This creates an object of BeginWriteAnalogF64Request, which is defined in the grpc.pb.h file (generated from the .proto file).
Call Write API
grpc::ClientContext write_context;
auto response_write = BeginWriteAnalogF64Response{};
stub->BeginWriteAnalogF64(&write_context, request_write, &response_write);
- This sends the write request to the server using the BeginWriteAnalogF64 API. The call uses its own grpc::ClientContext, because a context must not be reused across RPCs.
- The response is stored in the response_write object. The stub represents the gRPC client connection to the server, which processes the request.
Similarly, we will prepare the read request, call BeginReadAnalogF64 and store the response.
Create Monikers for Write and Read
auto write_moniker_f64 = new ni::data_monikers::Moniker(response_write.moniker());
auto read_moniker_f64 = new ni::data_monikers::Moniker(response_read.moniker());
Here, we create two Moniker objects: one for the write data and one for the read data. These monikers are created from the responses received from the write and read operations.
Prepare Sideband Stream Request
grpc::ClientContext moniker_context;
ni::data_monikers::BeginMonikerSidebandStreamRequest sideband_request;
ni::data_monikers::BeginMonikerSidebandStreamResponse sideband_response;
sideband_request.set_strategy(ni::data_monikers::SidebandStrategy::SOCKETS);
sideband_request.mutable_monikers()->mutable_read_monikers()->AddAllocated(read_moniker_f64);
sideband_request.mutable_monikers()->mutable_write_monikers()->AddAllocated(write_moniker_f64);
- A BeginMonikerSidebandStreamRequest is created to initiate a sideband stream. This request includes the monikers for both the read and write data, which were created earlier. The strategy is set to SOCKETS, indicating the use of sockets for the sideband stream.
- The mutable_monikers() method accesses the monikers field of the request.
- The mutable_read_monikers() and mutable_write_monikers() methods return the repeated fields for the read and write streams, respectively, and AddAllocated() adds the previously created monikers to them. AddAllocated() also transfers ownership of each heap-allocated moniker to the request, so the client must not delete them. These monikers uniquely identify the data streams to be associated with the sideband connection.
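Because the monikers are allocated with new and handed over with AddAllocated(), the request becomes their only owner. The sketch below illustrates that ownership contract with stand-in types; FakeMoniker, OwningList, and build_read_monikers are hypothetical names used only for illustration and are not part of protobuf or grpc-device.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Stand-in for ni::data_monikers::Moniker (hypothetical, for illustration only).
struct FakeMoniker {
  std::string data_source;
};

// Stand-in for a repeated message field that owns its elements.
class OwningList {
 public:
  // Mirrors the AddAllocated() contract: the list takes ownership of `item`
  // and deletes it when the list itself is destroyed.
  void AddAllocated(FakeMoniker* item) { items_.emplace_back(item); }
  std::size_t size() const { return items_.size(); }
  const FakeMoniker& at(std::size_t i) const { return *items_.at(i); }

 private:
  std::vector<std::unique_ptr<FakeMoniker>> items_;
};

// Builds a list the way the client builds its request: allocate, then hand over.
OwningList build_read_monikers() {
  OwningList read_monikers;
  auto moniker = std::make_unique<FakeMoniker>();
  moniker->data_source = "read_f64";
  // release() gives up ownership so that exactly one owner remains: the list.
  read_monikers.AddAllocated(moniker.release());
  return read_monikers;
}
```

The point of the sketch is that nothing deletes the moniker explicitly: once it has been added, its lifetime is tied to the container that holds it.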
Start the Sideband Stream
auto write_stream = moniker_service.BeginSidebandStream(&moniker_context, sideband_request, &sideband_response);
auto sideband_token = InitClientSidebandData(sideband_response);
The sideband stream is initiated by calling BeginSidebandStream. A sideband token is generated using InitClientSidebandData, which will be used to manage the data during streaming.
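The token returned by InitClientSidebandData has to be released with CloseSidebandData once streaming ends. One way to guarantee that on every exit path is a small RAII guard. The sketch below is a generic, hypothetical helper (ScopedSidebandToken is not part of ni_grpc_sideband); it is templated on the token and closer types so that it makes no assumption about the library's actual signatures.

```cpp
#include <utility>

// Hypothetical RAII guard: runs `closer(token)` exactly once when the guard
// goes out of scope, so the connection is released on every exit path.
template <typename Token, typename Closer>
class ScopedSidebandToken {
 public:
  ScopedSidebandToken(Token token, Closer closer)
      : token_(std::move(token)), closer_(std::move(closer)) {}
  ~ScopedSidebandToken() { closer_(token_); }

  // Non-copyable (and therefore non-movable): exactly one guard owns the token.
  ScopedSidebandToken(const ScopedSidebandToken&) = delete;
  ScopedSidebandToken& operator=(const ScopedSidebandToken&) = delete;

  const Token& get() const { return token_; }

 private:
  Token token_;
  Closer closer_;
};
```

In the client code this could wrap the value returned by InitClientSidebandData, with a closer that calls CloseSidebandData, provided the token type is copyable or movable. The cancel message should still be sent before the guard goes out of scope.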
Streaming in a Loop
for (int i = 0; i < 5; i++) {
  nifpga_grpc::MonikerWriteAnalogF64Request write_values_array_f64;
  write_values_array_f64.mutable_array()->Add(write_data_float64.begin(),
                                              write_data_float64.end());
  ni::data_monikers::SidebandWriteRequest write_data_request;
  write_data_request.mutable_values()->add_values()->PackFrom(write_values_array_f64);
  WriteSidebandMessage(sideband_token, write_data_request);
  nifpga_grpc::MonikerReadAnalogF64Response read_values_array_f64;
  ni::data_monikers::SidebandReadResponse read_result;
  ReadSidebandMessage(sideband_token, &read_result);
  read_result.values().values(0).UnpackTo(&read_values_array_f64);
}
- Prepare the data to be written
  - A MonikerWriteAnalogF64Request object is created to hold the write data for the current iteration.
  - The mutable_array() method provides access to the data array, and the Add() function appends the contents of write_data_float64 (a pre-defined vector) to this array. This prepares the data to be streamed to the server.
- Pack the write data into a sideband write request
  - A SidebandWriteRequest object is created to encapsulate the write data for the sideband stream.
  - The mutable_values() method provides access to the values field, which is used to store the packed data.
  - PackFrom() serializes the write_values_array_f64 data.
- Send the packed write data to the sideband stream
  - The WriteSidebandMessage() function sends the write_data_request to the server using the sideband stream identified by sideband_token.
- Prepare a container for the read response
  - read_values_array_f64: This will store the unpacked read data.
  - read_result: This will store the raw response data from the server's sideband stream.
- Read the response data from the sideband stream
  - The ReadSidebandMessage() function fetches data from the sideband stream, using the sideband_token to identify the stream.
  - The server sends the data it has read, which is stored in read_result in its raw packed form.
- Unpack the read response
  - The values() method of read_result accesses the packed data from the sideband stream.
  - The UnpackTo() method deserializes the packed data from read_result and stores it in read_values_array_f64.
  - At this point, the client has successfully read and unpacked the data sent by the server.
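The pack, write, read, unpack sequence described above can be tried without a server using the self-contained sketch below. Everything in it is a stand-in: pack_f64 and unpack_f64 copy raw bytes rather than using the protobuf wire format, and LoopbackChannel is a hypothetical in-memory queue that simply echoes what was written, which a real DAQmx read would not necessarily do.

```cpp
#include <cstring>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Stand-in for PackFrom(): copy the samples into a byte string.
std::string pack_f64(const std::vector<double>& samples) {
  std::string bytes(samples.size() * sizeof(double), '\0');
  if (!samples.empty()) std::memcpy(&bytes[0], samples.data(), bytes.size());
  return bytes;
}

// Stand-in for UnpackTo(): rebuild the samples; returns false on a malformed payload.
bool unpack_f64(const std::string& bytes, std::vector<double>* samples) {
  if (bytes.size() % sizeof(double) != 0) return false;
  samples->resize(bytes.size() / sizeof(double));
  if (!bytes.empty()) std::memcpy(samples->data(), bytes.data(), bytes.size());
  return true;
}

// Hypothetical in-memory channel that hands every written message back to the
// reader, standing in for the server end of the sideband connection.
class LoopbackChannel {
 public:
  void Write(std::string message) { queue_.push_back(std::move(message)); }
  bool Read(std::string* message) {
    if (queue_.empty()) return false;
    *message = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::deque<std::string> queue_;
};

// Same shape as the loop above: pack, write, read, unpack, once per iteration.
// Returns the number of iterations whose read data matched what was written.
int run_loopback(const std::vector<double>& write_data, int iterations) {
  LoopbackChannel channel;
  int matches = 0;
  for (int i = 0; i < iterations; ++i) {
    channel.Write(pack_f64(write_data));
    std::string raw;
    std::vector<double> read_data;
    if (channel.Read(&raw) && unpack_f64(raw, &read_data) && read_data == write_data) {
      ++matches;
    }
  }
  return matches;
}
```

Unlike the client code, this sketch checks the result of every read and unpack step; the same checks are worth adding around ReadSidebandMessage and UnpackTo in a real client.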