Tutorial Implementing Communication - liuli-neko/NekoProtoTools GitHub Wiki

This tutorial demonstrates how to use NekoProtoTools' communication layer to easily send and receive your custom protocol messages over a network connection (specifically TCP in this example).

The communication layer builds upon the protocol messages you defined using NEKO_DECLARE_PROTOCOL (see Tutorial: Defining Protocol Messages) and handles the low-level details of:

  1. Framing: Automatically adding message type information and length headers to the data stream.
  2. Serialization: Converting your IProto objects into bytes using their default serializer before sending.
  3. Deserialization: Reading framed data from the network, identifying the message type using the ProtoFactory, and deserializing the bytes back into the correct IProto object.

This is achieved primarily through the ProtoStreamClient class, which wraps an underlying Ilias stream (like TcpClient).

Prerequisites

  • You understand how to define protocol messages with NEKO_SERIALIZER and NEKO_DECLARE_PROTOCOL.
  • Basic familiarity with asynchronous programming using C++ coroutines (co_await, ilias::Task).
  • NekoProtoTools installed with the communication feature enabled in your xmake.lua:
    add_requires("neko-proto-tools", {
        configs = {
            -- Must be true
            enable_communication = true,
            -- Need a JSON backend (or Binary, if protocols use BinarySerializer)
            enable_rapidjson = true, -- Or enable_simdjson = true
            -- Optional, but helpful for Ilias/NekoProtoTools logging
            enable_fmt = true
        }
    })
  • The Ilias library is required (xmake will handle downloading it if enable_communication = true).

Includes

You'll need headers for NekoProtoTools protocols, communication, the chosen serializer, Ilias networking and tasks, plus standard library types.

// NekoProtoTools Core & Types
#include <nekoproto/proto/proto_base.hpp>
#include <nekoproto/proto/json_serializer.hpp> // Or your chosen default serializer
#include <nekoproto/proto/types/string.hpp>    // Include for types used in messages
#include <nekoproto/proto/types/vector.hpp>    // If applicable
#include <nekoproto/communication/communication_base.hpp> // Core communication header

// Ilias Library (Dependency)
#include <ilias/net.hpp>         // For TcpListener, TcpClient, IPEndpoint
#include <ilias/platform.hpp>  // For PlatformContext (event loop)
#include <ilias/task.hpp>        // For ilias::Task, co_await, ilias_go

// Standard Library
#include <iostream>
#include <vector>
#include <string>
#include <chrono> // For timestamps in the example
#include <cstdint> // For uint64_t

Step 1: Define the Protocol Message

Let's create a simple ChatMessage protocol that we want to send between a client and server.

NEKO_USE_NAMESPACE // Use NekoProto namespace (optional, but convenient)

// Define the data structure for our chat message
class ChatMessage {
public:
    uint64_t    timestamp;
    std::string sender;
    std::string content;

    // Mark members for serialization
    NEKO_SERIALIZER(timestamp, sender, content);
    // Declare as a protocol using JsonSerializer by default
    NEKO_DECLARE_PROTOCOL(ChatMessage, JsonSerializer);
};

Step 2: Implement the Server

The server will listen for incoming connections, receive a ChatMessage, print its contents, and send a reply back. We'll implement this logic within an Ilias coroutine (ilias::Task<>).

using namespace ILIAS_NAMESPACE; // Use ilias namespace

// Server logic as an Ilias coroutine Task
ilias::Task<> server_task(PlatformContext& ioContext, ProtoFactory& protoFactory) {
    try {
        TcpListener listener(ioContext, AF_INET); // Create a TCP listener (IPv4)
        listener.bind(IPEndpoint("127.0.0.1", 12345)); // Bind to localhost port 12345
        std::cout << "Server listening on 127.0.0.1:12345" << std::endl;

        // Wait for a client to connect
        auto accept_result = co_await listener.accept();
        if (!accept_result) {
            std::cerr << "Server failed to accept connection: " << accept_result.error().message() << std::endl;
            co_return;
        }
        TcpClient client_conn = std::move(accept_result.value().first); // Get the client connection socket
        std::cout << "Server: Client connected." << std::endl;

        // *** Wrap the TCP connection with ProtoStreamClient ***
        // This handles framing, serialization/deserialization for us.
        ProtoStreamClient<TcpClient> proto_stream(protoFactory, ioContext, std::move(client_conn));

        // *** Receive a protocol message ***
        std::cout << "Server: Waiting for message..." << std::endl;
        auto recv_result = co_await proto_stream.recv(); // Asynchronously wait for a framed message

        if (recv_result) {
            IProto received_proto = std::move(recv_result.value()); // Get the received IProto object
            std::cout << "Server received protocol: " << received_proto.name() << std::endl;

            // Check if it's the type we expect and process it
            auto msg = received_proto.cast<ChatMessage>();
            if (msg) {
                std::cout << "  Timestamp: " << msg->timestamp << std::endl;
                std::cout << "  Sender: " << msg->sender << std::endl;
                std::cout << "  Content: '" << msg->content << "'" << std::endl;

                // *** Send a reply back ***
                ChatMessage reply_msg;
                reply_msg.timestamp = std::chrono::system_clock::now().time_since_epoch().count();
                reply_msg.sender = "Server";
                reply_msg.content = "Message received loud and clear!";

                std::cout << "Server: Sending reply..." << std::endl;
                // Send the reply (makeProto creates the IProto wrapper)
                auto send_reply_result = co_await proto_stream.send(reply_msg.makeProto());
                 if (!send_reply_result) {
                     std::cerr << "Server failed to send reply: " << send_reply_result.error().message() << std::endl;
                 } else {
                    std::cout << "Server: Reply sent." << std::endl;
                 }

            } else {
                 std::cerr << "Server: Received unexpected protocol type: " << received_proto.name() << std::endl;
            }
        } else {
            std::cerr << "Server failed to receive message: " << recv_result.error().message() << std::endl;
        }

        // Close the connection
        co_await proto_stream.close();
        listener.close();
        std::cout << "Server task finished." << std::endl;

    } catch (const std::exception& e) {
        std::cerr << "Server error: " << e.what() << std::endl;
    }
}

Key Points (Server):

  • TcpListener waits for connections.
  • ProtoStreamClient<TcpClient> wraps the accepted TcpClient. The protoFactory is passed so ProtoStreamClient knows how to create protocol instances during deserialization.
  • co_await proto_stream.recv() reads data, identifies the protocol type header, reads the correct number of bytes, uses the protoFactory to create an empty instance of the correct type (ChatMessage in this case), and deserializes the data into it, returning an IProto.
  • co_await proto_stream.send(reply_msg.makeProto()) takes an IProto, serializes it using its default serializer, adds the framing headers (type ID, length), and sends the bytes over the TcpClient.

Step 3: Implement the Client

The client will connect to the server, send a ChatMessage, and wait for the reply.

using namespace ILIAS_NAMESPACE; // Use ilias namespace

// Client logic as an Ilias coroutine Task
ilias::Task<> client_task(PlatformContext& ioContext, ProtoFactory& protoFactory) {
    try {
        TcpClient tcpClient(ioContext, AF_INET); // Create a TCP client (IPv4)

        // Connect to the server
        std::cout << "Client: Connecting to 127.0.0.1:12345..." << std::endl;
        auto conn_result = co_await tcpClient.connect(IPEndpoint("127.0.0.1", 12345));
        if (!conn_result) {
            std::cerr << "Client failed to connect: " << conn_result.error().message() << std::endl;
            co_return;
        }
        std::cout << "Client: Connected to server." << std::endl;

        // *** Wrap the TCP connection with ProtoStreamClient ***
        ProtoStreamClient<TcpClient> proto_stream(protoFactory, ioContext, std::move(tcpClient));

        // Prepare the message to send
        ChatMessage msg_to_send;
        msg_to_send.timestamp = std::chrono::system_clock::now().time_since_epoch().count();
        msg_to_send.sender = "Client Alpha";
        msg_to_send.content = "Hello from the client!";

        // *** Send the protocol message ***
        std::cout << "Client: Sending message..." << std::endl;
        // Use makeProto() to get the IProto wrapper
        // 'SerializerInThread' is an optional flag to potentially perform serialization
        // in a background thread (managed by Ilias/NekoProtoTools), useful for large messages.
        auto send_result = co_await proto_stream.send(msg_to_send.makeProto(), ProtoStreamClient<TcpClient>::SerializerInThread);
        if (!send_result) {
             std::cerr << "Client failed to send message: " << send_result.error().message() << std::endl;
             co_await proto_stream.close();
             co_return;
        }
        std::cout << "Client: Message sent." << std::endl;

        // *** Receive the reply from the server ***
        std::cout << "Client: Waiting for reply..." << std::endl;
        auto recv_result = co_await proto_stream.recv();
        if (recv_result) {
            IProto received_proto = std::move(recv_result.value());
            std::cout << "Client received protocol: " << received_proto.name() << std::endl;

            // Process the reply
            auto reply_msg = received_proto.cast<ChatMessage>();
            if (reply_msg) {
                 std::cout << "  Reply Content: '" << reply_msg->content << "'" << std::endl;
            } else {
                 std::cerr << "Client: Received unexpected protocol type: " << received_proto.name() << std::endl;
            }
        } else {
             std::cerr << "Client failed to receive reply: " << recv_result.error().message() << std::endl;
        }

        // Close the connection
        co_await proto_stream.close();
        std::cout << "Client task finished." << std::endl;

    } catch (const std::exception& e) {
        std::cerr << "Client error: " << e.what() << std::endl;
    }
}

Key Points (Client):

  • TcpClient connects to the server address.
  • ProtoStreamClient<TcpClient> wraps the connected TcpClient.
  • msg_to_send.makeProto() creates the IProto instance.
  • co_await proto_stream.send(...) serializes and sends the framed message.
  • co_await proto_stream.recv() waits for, receives, and deserializes the server's reply.

Step 4: Run the Client and Server

The main function sets up the Ilias PlatformContext (which manages the event loop and background threads for async operations) and the ProtoFactory. It then launches the server and client tasks using ilias_go and runs the event loop.

int main() {
    PlatformContext ioContext;    // Ilias event loop and execution context
    ProtoFactory protoFactory(1, 0, 0); // Protocol factory (must know about ChatMessage)

    std::cout << "Starting server and client tasks..." << std::endl;

    // Launch the server coroutine task
    ilias_go server_task(ioContext, protoFactory);

    // Launch the client coroutine task
    ilias_go client_task(ioContext, protoFactory);

    // Run the Ilias event loop until all tasks are complete
    ioContext.run();

    std::cout << "All tasks finished." << std::endl;
    return 0;
}

Compile and Run

  1. Save the code (e.g., communication_tutorial.cpp).
  2. Make sure your xmake.lua includes the target and dependencies as shown in the Prerequisites.
  3. Compile: xmake build communication_tutorial
  4. Run: xmake run communication_tutorial

You should see output similar to this (order might vary slightly due to concurrency):

Starting server and client tasks...
Server listening on 127.0.0.1:12345
Client: Connecting to 127.0.0.1:12345...
Server: Client connected.
Server: Waiting for message...
Client: Connected to server.
Client: Sending message...
Client: Message sent.
Client: Waiting for reply...
Server received protocol: ChatMessage
  Timestamp: 1678886400123456789 // Example timestamp
  Sender: Client Alpha
  Content: 'Hello from the client!'
Server: Sending reply...
Server: Reply sent.
Server task finished.
Client received protocol: ChatMessage
  Reply Content: 'Message received loud and clear!'
Client task finished.
All tasks finished.

Summary

The ProtoStreamClient (and its UDP counterpart ProtoDatagramClient, not shown here) significantly simplifies network communication by handling the repetitive tasks of message framing, serialization, and deserialization based on the protocols you define with NEKO_DECLARE_PROTOCOL. It integrates seamlessly with the Ilias asynchronous framework for efficient network I/O.

Next Steps

  • Explore UDP communication: (See ProtoDatagramClient - documentation pending)
  • Learn more about Ilias: Ilias GitHub Repository
⚠️ **GitHub.com Fallback** ⚠️