System UnrealPlugin TCP MultiTcpConnection - kcccr123/ue-reinforcement-learning GitHub Wiki

MultiTcpConnection

UMultiTcpConnection is a subclass of UBaseTcpConnection specifically designed for multi-environment training scenarios. It accepts one admin socket and multiple environment sockets, enabling batched or parallel reinforcement learning via MultiEnvBridge.


Overview

This connection type supports a control (admin) connection plus NumEnvironments parallel environment connections. Each outbound message to an environment is suffixed with a \n delimiter. Incoming messages are parsed per-environment, tagged with ;ENV=<id>, and combined using || for the bridge to process.


Connection Lifecycle

bool StartListening(const FString& IPAddress, int32 Port)

Creates or resets sockets, allocates EnvSockets and PartialData arrays to NumEnvironments, and binds a listening socket with a backlog of NumEnvironments + 1. On success, it spawns the accept thread by calling StartAcceptThread() and logs the outcome.

virtual bool AcceptEnvConnection(FSocket* InNewSocket) override

Handles environment socket acceptance under EnvSocketMutex lock. Finds the first free index in EnvSockets to assign InNewSocket. If full, rejects the socket. Stops the accept thread once all environment slots are filled.

virtual void CloseConnection() override

Stops the accept thread (FAcceptRunnable::Stop()), kills and deletes the thread, then closes and destroys the listening, admin, and all environment sockets. Clears EnvSockets and PartialData arrays while holding the mutex.

bool IsConnected() const override

Returns true only if the admin socket is assigned and all EnvSockets entries are non-null, indicating full connectivity.


Messaging API

bool SendMessageEnv(const FString& Data)

Parses ENV=<id> from Data to select the correct EnvSocket[id]. Appends \n, converts to UTF-8, and sends the payload. Returns true on success; logs and returns false on failure or invalid socket.

FString ReceiveMessageEnv(int32 BufSize = 1024)

Under lock, iterates each EnvSocket[i] and calls ReadFromSocket(i, EnvSocket[i], BufSize). Each resulting line is suffixed with ;ENV=i, and all lines are joined with ||. Returns an empty string if no messages are available.

bool SendMessageAdmin(const FString& Data)

Inherited from UBaseTcpConnection. Sends a UTF-8 message to the admin socket.

FString ReceiveMessageAdmin(int32 BufSize = 1024)

Inherited from UBaseTcpConnection. Receives a UTF-8 string from the admin socket.


Acceptance Thread

UMultiTcpConnection uses FAcceptRunnable to accept incoming connections asynchronously.

void StartAcceptThread()

Instantiates AcceptRunnableRef with new FAcceptRunnable(this) and creates AcceptThreadRef via FRunnableThread::Create. Thread name: "MultiEnvAcceptThread". Logs success or failure.

Accept Thread (FAcceptRunnable)

Implements FRunnable:

Constructor

FAcceptRunnable::FAcceptRunnable(UMultiTcpConnection* InOwner);

Stores the owning connection pointer.

uint32 Run()

Main loop:

  • While not stopped and Owner->GetListeningSocket() is valid:
    • Poll HasPendingConnection();
    • If true, call Owner->AcceptConnection();
    • Sleep for 0.1 seconds to avoid busy-wait.

void Stop()

Sets the stop flag (bStop) to exit the loop cleanly. Called when all envs are assigned or during shutdown.


Internal State & Members

  • int32 NumEnvironments — Number of environment connections to accept.
  • TArray<FSocket*> EnvSockets — Array of environment sockets.
  • TArray<FString> PartialData — Buffers incomplete incoming messages per environment.
  • FCriticalSection EnvSocketMutex — Protects EnvSockets and PartialData.
  • FRunnableThread* AcceptThreadRef — Thread executing FAcceptRunnable.
  • TSharedPtr<FAcceptRunnable> AcceptRunnableRef — Runnable logic for acceptance.
  • bool bStopAcceptThreadRef — Flag to signal thread termination.

Helpers

FString ReadFromSocket(int32 EnvId, FSocket* Socket, int32 BufSize)

Reads up to BufSize bytes into PartialData[EnvId], splits on \n, returns the next complete line, and retains leftover data.

int32 ExtractEnvIdFromData(const FString& Message) const

Parses the integer following ENV= in Message. Returns -1 if not found or invalid.

bool AreAllEnvsAssigned() const

Checks that no element in EnvSockets is null, indicating all slots are filled.

⚠️ **GitHub.com Fallback** ⚠️