MCCIServerOperation - GobySoft/goby GitHub Wiki
MCCI: Server Operation
The MCCI server is the broker of all inter-client communication, including cases of inter-node communication.
Bootup
TBD: the order of these steps
- The server configuration is loaded
  - Node Address
  - Soft limit on clients' request timeouts (must be communicated on client-server handshake)
  - Retention timeout for foreign data (how long the server caches incoming foreign data -- may not be necessary, and 10 seconds would be on the high side of this value)
  - Distribution Schema to use
  - Distribution Policy to use
- The Distribution Schema is loaded
  - the SHA-1 of the distribution schema is calculated
- The Distribution Policy is loaded
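The boot-up steps above could be sketched roughly as follows. This is only an illustration, not the server's actual loader: the `ServerConfig` class, its field names, the example values, and the assumption that the schema is available as raw bytes are all hypothetical. Only the SHA-1 over the distribution schema comes from the list above.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class ServerConfig:
    # All field names are hypothetical; they mirror the configuration list above.
    node_address: int
    request_timeout_soft_limit_s: float  # communicated to clients on handshake
    foreign_retention_s: float           # how long incoming foreign data is cached
    schema_bytes: bytes                  # raw distribution schema (assumed form)
    policy_name: str                     # which distribution policy to use


def schema_digest(schema_bytes: bytes) -> str:
    """Return the SHA-1 of the distribution schema as a hex string."""
    return hashlib.sha1(schema_bytes).hexdigest()


# Example values are made up for illustration.
config = ServerConfig(
    node_address=1,
    request_timeout_soft_limit_s=5.0,
    foreign_retention_s=2.0,
    schema_bytes=b"example schema",
    policy_name="default",
)
digest = schema_digest(config.schema_bytes)
```

Computing the digest once at boot gives the server a compact value it could compare against a peer's digest to check that both sides loaded the same schema; whether MCCI uses it that way is not stated here.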
Client Connection
TBD
Accepting and Processing Requests
Without outstanding requests, the server does nothing; it would ignore every data packet sent to it.
Note: the following algorithm might be better implemented with threads and scheduling, but it is presented here as iteration in Python for simple if/then/else clarity.
def init(self):
    # self.network_address is the network address
    self.unfulfilled_requests = []  # array of request packets
    self.already_forwarded = set()  # set of request packets that have been forwarded
    self.latest = {0: {}}  # 2-dimensional map of revisions, keyed on node_address and variable_id
    self.memory = {}  # data packets, keyed on variable_id
    # self.deliver_matching_packets() is really handled by 0mq publish-subscribe
    # self.forward_request_to_clients() is handled by 0mq request-response(?)
    # self.respond_to_production() is handled by 0mq request-response

def accept_production_packet(self, production_packet):
    # convert into data packet, store in memory, and forward
    # a variable that has never been published starts from revision 0
    new_revision = self.latest[0].get(production_packet.variable_id, 0) + 1
    if 0 != production_packet.response_id:
        self.respond_to_production(production_packet.response_id, new_revision)
    data_packet = DataPacket()
    data_packet.network_address = self.network_address
    data_packet.variable_id = production_packet.variable_id
    data_packet.revision = new_revision
    data_packet.payload = production_packet.payload
    self.memory[data_packet.variable_id] = data_packet
    self.latest[0][data_packet.variable_id] = new_revision
    self.process_requests()

def accept_data_packet(self, data_packet):
    # just record the latest revision and pass it along
    revisions = self.latest.setdefault(data_packet.network_address, {})
    local_rev = revisions.get(data_packet.variable_id, 0)
    new_rev = max(local_rev, data_packet.revision)
    revisions[data_packet.variable_id] = new_rev
    self.process_requests()  # with the new packet, somehow

def accept_request_packet(self, request_packet):
    # TBD: make sure request_packet.timeout is within allowable limits
    self.unfulfilled_requests.append(request_packet)

def process_requests(self):
    # iterate over a copy, because finished requests are removed inside the loop
    for request_packet in list(self.unfulfilled_requests):
        if is_elapsed(request_packet.timeout):
            # done with this request
            self.unfulfilled_requests.remove(request_packet)
            self.already_forwarded.discard(request_packet)  # it may never have been forwarded
        elif request_packet.network_address not in self.latest:
            pass  # do nothing -- keep waiting until the timeout just in case it shows up
        elif request_packet.variable_id not in self.latest[request_packet.network_address]:
            pass  # do nothing -- keep waiting until the timeout just in case it shows up
        elif -1 == request_packet.node_address or 0 == request_packet.variable_id:
            # wildcard: return any matching packets to the client that sent this request
            self.deliver_matching_packets(request_packet)
            if request_packet in self.already_forwarded:
                raise Exception
        elif 0 == request_packet.node_address and \
                (0 == request_packet.revision or
                 request_packet.revision == self.latest[0].get(request_packet.variable_id)):
            # this means they requested the latest revision of locally-published data
            # the server has this data in memory. return to client that sent this request
            self.deliver_matching_packets(request_packet)
            self.unfulfilled_requests.remove(request_packet)  # done
            if request_packet in self.already_forwarded:
                raise Exception
        elif data_available:  # placeholder: the data from another host has been delivered to this server
            self.deliver_matching_packets(request_packet)  # send to the client that requested
            self.unfulfilled_requests.remove(request_packet)  # done
            self.already_forwarded.discard(request_packet)
        elif request_packet not in self.already_forwarded:
            self.forward_request_to_clients(request_packet)
            self.already_forwarded.add(request_packet)
        else:
            pass  # the data was requested but it has not arrived yet, so wait
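
The algorithm above relies on an `is_elapsed()` helper and leaves the timeout check in `accept_request_packet()` as TBD. One possible shape for both is sketched below. It assumes that a client sends its timeout as a duration in seconds, that the server converts it to an absolute deadline on arrival, and that an over-long timeout is clamped rather than rejected; none of this is specified on this page. `SOFT_LIMIT_S`, `clamp_timeout()` and `deadline_for()` are hypothetical names.

```python
import time

SOFT_LIMIT_S = 10.0  # hypothetical stand-in for the configured soft limit


def clamp_timeout(requested_s, soft_limit_s=SOFT_LIMIT_S):
    """Keep a client's requested timeout within [0, soft_limit_s]."""
    return max(0.0, min(requested_s, soft_limit_s))


def deadline_for(requested_s, now=None):
    """Convert a relative timeout into an absolute monotonic deadline."""
    now = time.monotonic() if now is None else now
    return now + clamp_timeout(requested_s)


def is_elapsed(deadline, now=None):
    """Return True once the deadline has been reached."""
    now = time.monotonic() if now is None else now
    return now >= deadline
```

Under these assumptions, `accept_request_packet()` would store `deadline_for(...)` in `request_packet.timeout` before appending the request. `time.monotonic()` is used instead of wall-clock time so that clock adjustments cannot expire or extend a request.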
Server-published Variables
The server publishes several variables that describe its own operation. Each of these might be expanded into a set of time-windowed values (e.g. last minute, last 5 minutes, last 10 minutes, all time).
- The list of clients connected
- The list of local variables published
- Map of clients to the variables they publish
- Map of clients to the variables they request
- Map of variables to the clients that have published them
- Map of variables to the clients that have requested them
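
The bookkeeping behind the variables listed above could look something like the following sketch. The `ServerStats` class, its method names, and the example client names are hypothetical, and the time-windowed variants are not covered; it only shows that the four maps can be kept in step with two small update methods.

```python
from collections import defaultdict


class ServerStats:
    """Tracks which clients publish and request which variables."""

    def __init__(self):
        self.clients = set()                         # connected clients
        self.published_by_client = defaultdict(set)  # client -> variable ids
        self.requested_by_client = defaultdict(set)  # client -> variable ids
        self.publishers_of = defaultdict(set)        # variable id -> clients
        self.requesters_of = defaultdict(set)        # variable id -> clients

    def connect(self, client_id):
        self.clients.add(client_id)

    def record_publish(self, client_id, variable_id):
        # update both directions of the publish mapping together
        self.published_by_client[client_id].add(variable_id)
        self.publishers_of[variable_id].add(client_id)

    def record_request(self, client_id, variable_id):
        # update both directions of the request mapping together
        self.requested_by_client[client_id].add(variable_id)
        self.requesters_of[variable_id].add(client_id)

    def local_variables(self):
        """The local variables published so far, in sorted order."""
        return sorted(self.publishers_of)


# Example usage with made-up client names and variable ids.
stats = ServerStats()
stats.connect("nav")
stats.connect("logger")
stats.record_publish("nav", 7)
stats.record_request("logger", 7)
```

Each of these structures could then be serialized and published by the server like any other local variable.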