Clock Synchronization
In a distributed \psi system it is important to synchronize the pipeline clocks between \psi processes running on separate machines. Stream fusion, for example, depends heavily on accurate originating time stamps. A network time server may be enough in some scenarios. Another way to ensure some level of synchronization is to use the RemoteClockExporter and RemoteClockImporter components (in Microsoft.Psi.Remoting).
Note: The RemoteClockExporter and RemoteClockImporter components below rely on TCP sockets, and all communication happens in the clear. These communication channels are not secure, and the user must ensure the security of the network as appropriate.
Exporting Clock Information
The RemoteClockExporter listens on a TCP port (11511 by default) for importers interested in information about the machine clock. It is technically not a \psi component running within a pipeline, but it is typically constructed alongside the pipeline being run.
var remoteClock = new RemoteClockExporter(port: 1234);
Importing Clock Information
The RemoteClockImporter is responsible for synchronizing a pipeline clock with a remote clock exporter. It gathers remote clock information and offsets the local pipeline clock before the pipeline is run.
var remoteClockImporter = new RemoteClockImporter(pipeline, "123.123.123.123", 1234);
remoteClockImporter.Connected.WaitOne();
Notice that the importer is given the address and port of the RemoteClockExporter and that Connected.WaitOne() must be called prior to running the pipeline to ensure that the handshake completes.
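The text above says the importer offsets the local pipeline clock but does not say how the offset is derived. As a minimal sketch of one common approach (halving the round-trip time of a single request, in the style of Cristian's algorithm), the following shows the arithmetic involved. This is an illustration under the assumption of symmetric network delay, not a description of how RemoteClockImporter is implemented; ClockOffsetSketch, EstimateOffset, and ToRemoteTime are hypothetical names.

```csharp
using System;

// Hypothetical helper for illustration only; not part of Microsoft.Psi.
public static class ClockOffsetSketch
{
    // Estimates (remote clock - local clock) from one request/response exchange.
    // Assumption: the network delay is the same in both directions, so the remote
    // reading was taken at the midpoint between the local send and receive times.
    public static TimeSpan EstimateOffset(DateTime localSend, DateTime remoteTime, DateTime localReceive)
    {
        TimeSpan roundTrip = localReceive - localSend;
        DateTime localMidpoint = localSend + TimeSpan.FromTicks(roundTrip.Ticks / 2);
        return remoteTime - localMidpoint;
    }

    // Translates a local time stamp into the remote clock's frame of reference.
    public static DateTime ToRemoteTime(DateTime localTime, TimeSpan offset)
    {
        return localTime + offset;
    }
}
```

For example, if a request takes 40 ms round trip and the remote clock reports a time 5.020 s after the local send time, EstimateOffset returns 5 s, and adding that offset to any local time stamp expresses it in the remote clock's frame.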
Single Clock Reference
The above implies that there is a single clock exporter in a given distributed system. Other machines involved all refer to the same clock authority. Starting multiple clock importers in a single process, each referring to different exporters, will throw an exception explaining that this is invalid. It should also be noted that the clock being exported cannot also be synchronized. That is, running a clock importer (i.e., synchronizing the pipeline clock with a remote exporter), while at the same time running a local exporter is invalid and will throw an exception.
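The two rules above (one clock authority per process, and no importing while exporting) can be pictured as a small bookkeeping check. The sketch below is a hypothetical illustration of those rules, not the \psi implementation; the class name ClockAuthorityGuard, its methods, the exception type, and the messages are all assumptions.

```csharp
using System;

// Hypothetical illustration only; not part of Microsoft.Psi.
public sealed class ClockAuthorityGuard
{
    private string importedFrom = string.Empty; // "host:port" of the exporter this process syncs to, if any
    private bool exporting;                      // true once this process exports its own clock

    // Records that an importer in this process synchronizes with host:port.
    public void RegisterImporter(string host, int port)
    {
        string authority = host + ":" + port;
        if (exporting)
        {
            throw new InvalidOperationException("A clock that is being exported cannot also be synchronized.");
        }

        if (importedFrom.Length > 0 && importedFrom != authority)
        {
            throw new InvalidOperationException("A process may synchronize with only one clock exporter.");
        }

        importedFrom = authority;
    }

    // Records that this process exports its own clock.
    public void RegisterExporter()
    {
        if (importedFrom.Length > 0)
        {
            throw new InvalidOperationException("A clock that is being synchronized cannot also be exported.");
        }

        exporting = true;
    }
}
```

With this guard, registering a second importer for a different address, or registering an exporter after an importer (or the reverse), throws, which mirrors the invalid configurations described above.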
Rendezvous
To avoid hard coding the address and port of the clock exporter, the Rendezvous system may be used to advertise and discover this information dynamically.
// advertise MyApp endpoints (e.g. remoteClock)
var remoteClockEndpoint = remoteClock.ToRendezvousEndpoint(host: "123.123.123.123");
var process = new Rendezvous.Process("MyApp", new[] { remoteClockEndpoint });
var server = new RendezvousServer();
server.Rendezvous.TryAddProcess(process);
server.Start();
This starts a Rendezvous server and advertises a "MyApp" process with the remoteClock endpoint information. To discover this in another process on another machine, we can use a Rendezvous client.
var client = new RendezvousClient(host: "123.123.123.123");
client.Rendezvous.ProcessAdded += (_, p) =>
{
    if (p.Name == "MyApp")
    {
        foreach (var e in p.Endpoints)
        {
            if (e is Rendezvous.RemoteClockExporterEndpoint remoteClock)
            {
                // create remote clock importer to sync pipeline clock with remote MyApp
                var remoteClockImporter = remoteClock.ToRemoteClockImporter(pipeline);
            }
        }
    }
};
client.Connected.WaitOne();