To infinity and beyond
It's simple, we kill the Pac-Man
In order to create a new ghost agent, we need to declare a new class in `agents.py` that implements both the `learn` and `act` methods. The latter, `act`, is called at every simulation step to send an action to the simulator. `learn` is called before `act` during the learning phase to make the agent improve from its experience.

For instance, let's create an agent that always walks North (assuming that `'North'` is a valid representation of an action that moves it upwards in a particular simulator). This class must go inside the simulation's `agents.py` file.
from multiagentrl import core


class NorthAgent(core.BaseControllerAgent):
    def learn(self, state, action, reward):
        pass

    def act(self, state, legal_actions, explore):
        return 'North'
That's it! This agent chooses the action `'North'` whenever requested.

We also modify the simulation's `adapter.py` to send the agent class to the controller, informing it that one `NorthAgent` instance must be launched when the simulation starts.
# The simulation's adapter.py
from multiagentrl import core
from multiagentrl import messages

from agents import NorthAgent  # NorthAgent is defined in the simulation's agents.py


class ExampleExperiment(core.BaseExperiment):
    def __init__(self, learn_games, test_games):
        super(ExampleExperiment, self).__init__(
            learn_games=learn_games,
            test_games=test_games)
        self.simulator = ExampleSimulator()
        self.agents = [ExampleAgent()]

    def execute_game(self):
        # Send the first state before learning starts
        [agent.send_state() for agent in self.agents]

        while not self.simulator.is_finished():
            # Receive an action for the current state
            actions = [agent.receive_action() for agent in self.agents]

            # Simulate one step
            self.simulator.step(actions)

            # Update the state to learn from the received reward
            [agent.send_state() for agent in self.agents]

            # Send the reward obtained by executing the action and reaching the new state
            [agent.send_reward() for agent in self.agents]


class ExampleAgent(core.BaseAdapterAgent):
    def __init__(self, agent_type, map_width, map_height):
        super(ExampleAgent, self).__init__()
        self.agent_id = 0
        self.agent_type = agent_type
        self.map_width = map_width
        self.map_height = map_height

    def start_experiment(self):
        message = messages.StartExperimentMessage(
            agent_id=self.agent_id,
            agent_team=self.agent_type,
            agent_class=NorthAgent,
            map_width=self.map_width,
            map_height=self.map_height)
        self.communicate(message)

    def finish_experiment(self):
        message = messages.FinishExperimentMessage(agent_id=self.agent_id)
        self.communicate(message)

    def start_game(self):
        message = messages.StartGameMessage(agent_id=self.agent_id)
        self.communicate(message)

    def finish_game(self):
        message = messages.FinishGameMessage(agent_id=self.agent_id)
        self.communicate(message)

    def send_state(self):
        message = messages.StateMessage(
            agent_id=self.agent_id,
            state=self.state,
            legal_actions=self.actions,
            explore=self.is_learning)
        return self.communicate(message)

    def receive_action(self):
        action_message = self.send_state()
        self.action = action_message.action
        return self.action

    def send_reward(self):
        if self.is_learning:
            message = messages.RewardMessage(
                agent_id=self.agent_id, state=self.state,
                action=self.action, reward=self.reward)
            self.communicate(message)
Now, our `NorthAgent` is running in the controller and will select the `'North'` action at every simulation step!
I Know Kung-Fu
In It's simple, we kill the Pac-Man, we saw how to define a new ghost agent. In fact, it was pretty useless, since the only action it ever takes is going North. Now, let's create a more complex agent, one that uses a learning algorithm to select the best possible action for the given state.
As in the previous tutorial, we define a class in the simulation's `agents.py` file that implements `learn` and `act`. However, instead of blindly selecting an action, say we want to use the SARSA learning algorithm, which is not yet implemented in `learning.py`. The first step is implementing it:
# multiagentrl/learning.py


class SARSA(BaseLearningAlgorithm):
    def __init__(self):
        self.previous_state = None
        self.previous_action = None

    def learn(self, state, action, reward):
        # Incorporate learning from the received state, action and reward
        pass

    def act(self, state):
        # Select an action from the current policy and return it
        pass
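To make the idea concrete, here is a minimal tabular SARSA sketch. It is illustrative only, not the framework's implementation: it assumes discrete, hashable states, a fixed action set passed to the constructor, and hypothetical names (`q_values`, `learning_rate`, `discount_factor`) that are not part of the Multiagent RL API.

# Illustrative tabular SARSA sketch (not the framework's implementation)
from collections import defaultdict


class TabularSARSA(object):
    def __init__(self, actions, learning_rate=0.1, discount_factor=0.9):
        self.actions = actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.q_values = defaultdict(float)  # maps (state, action) to a value
        self.previous_state = None
        self.previous_action = None

    def learn(self, state, action, reward):
        # SARSA update: Q(s, a) += alpha * (r + gamma * Q(s', a') - Q(s, a)),
        # where (s, a) is the previously stored pair, (s', a') is the current one,
        # and `reward` is assumed to be the reward observed on that transition.
        if self.previous_state is not None:
            previous = (self.previous_state, self.previous_action)
            target = reward + self.discount_factor * self.q_values[(state, action)]
            self.q_values[previous] += self.learning_rate * (target - self.q_values[previous])
        self.previous_state = state
        self.previous_action = action

    def act(self, state):
        # Greedy action with respect to the current Q-value estimates
        return max(self.actions, key=lambda a: self.q_values[(state, a)])

Note that exploration is kept outside the algorithm, which matches the agent below: it combines the greedy policy with an e-greedy exploration strategy.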
Now, we need to create a learning agent in the simulation's `agents.py`:
# examples/mymodule/agents.py
from multiagentrl import core
from multiagentrl import exploration
from multiagentrl import learning


class SARSAAgent(core.BaseControllerAgent):
    def __init__(self, agent_id, ally_ids, enemy_ids):
        super(SARSAAgent, self).__init__(agent_id)
        self.learning = learning.SARSA()
        self.exploration = exploration.EGreedy(exploration_rate=0.1)

    def start_game(self):
        pass

    def finish_game(self):
        pass

    def learn(self, state, action, reward):
        self.learning.learn(state, action, reward)

    def act(self, state, legal_actions, explore):
        action = self.learning.act(state)
        if explore:
            action = self.exploration.explore(action, legal_actions)
        return action
Bang! You just created an agent that learns using SARSA. Now, you only have to update `adapter.py`, as in It's simple, we kill the Pac-Man, to make sure it sends the `SARSAAgent` class when starting the simulation.
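Concretely, assuming the same `ExampleAgent` adapter shown in the previous tutorial (and with `SARSAAgent` imported from the simulation's `agents.py`), the only change is the `agent_class` sent in `start_experiment`:

# examples/mymodule/adapter.py (only the changed method)
def start_experiment(self):
    message = messages.StartExperimentMessage(
        agent_id=self.agent_id,
        agent_team=self.agent_type,
        agent_class=SARSAAgent,  # previously NorthAgent
        map_width=self.map_width,
        map_height=self.map_height)
    self.communicate(message)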
A Brave New World
It's possible to use Multiagent RL with different simulators by creating a new package inside the `examples/` directory. It must have the following modules:

- `adapter.py`: class inheriting from `BaseExperiment`, implementing simulation-specific logic to connect the simulation and Multiagent RL.
- `agents.py`: agents that will be instantiated by the simulation controller when running this simulation.
- `plot.py`: logic to plot the simulation results after running this simulation.
Adapter
The `adapter` module contains an adapter class, inheriting from `BaseExperiment`, which controls the flow of the simulation. It must also contain the `build_parser()` function, to parse command-line arguments, and `build_adapter_with_args(args)`, to build an adapter instance from the parsed arguments.
import argparse

from multiagentrl import core
from multiagentrl import messages


class ExampleExperiment(core.BaseExperiment):
    def __init__(self, learn_games, test_games):
        super(ExampleExperiment, self).__init__(
            learn_games=learn_games,
            test_games=test_games)

    def execute_game(self):
        # Implement here the logic to run a simulation step
        pass


def build_parser():
    parser = argparse.ArgumentParser(description='Run example simulation.')
    parser.add_argument(
        '-l', '--learn-games', dest='learn_games', type=int, default=1,
        help='number of games to learn from')
    parser.add_argument(
        '-t', '--test-games', dest='test_games', type=int, default=1,
        help='number of games to test learned policy')
    return parser


def build_adapter_with_args(args):
    return ExampleExperiment(
        learn_games=args.learn_games,
        test_games=args.test_games)
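How these two functions are invoked depends on the Multiagent RL runner; purely as an illustration (the entry point below is hypothetical, not part of the framework), they could be wired together like this:

# Hypothetical entry point; the actual runner may differ
if __name__ == '__main__':
    args = build_parser().parse_args()
    experiment = build_adapter_with_args(args)
    # The framework is then responsible for running the configured
    # learn and test games with this experiment instance.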
Agents
The `agents` module contains agent classes to be used for action selection when the simulation is running. They must inherit from `BaseControllerAgent` and implement its virtual methods, namely `start_game`, `finish_game`, `learn` and `act`.
The following agent selects random actions for every simulation step.
import random

from multiagentrl import core


class RandomAgent(core.BaseControllerAgent):
    """Agent that randomly selects an action."""

    def __init__(self, agent_id, ally_ids, enemy_ids):
        super(RandomAgent, self).__init__(agent_id)

    def start_game(self):
        pass

    def finish_game(self):
        pass

    def learn(self, state, action, reward):
        pass

    def act(self, state, legal_actions, explore):
        if legal_actions:
            return random.choice(legal_actions)
Plot
The `plot` module generates graphs from simulation results. It must contain the `build_parser()` function, to parse command-line arguments, and `plot(args)`, to plot graphs from the parsed arguments.
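A minimal sketch of such a module is shown below. It is only illustrative: it assumes the experiment saved one numeric score per line to a results file, that matplotlib is available, and the file name and argument names are hypothetical.

# examples/mymodule/plot.py (illustrative sketch only)
import argparse

import matplotlib.pyplot as plt


def build_parser():
    parser = argparse.ArgumentParser(description='Plot example simulation results.')
    parser.add_argument(
        '-i', '--input', dest='input', type=str, default='results.txt',
        help='file with one score per line, as saved by the experiment')
    return parser


def plot(args):
    # Read one numeric score per line and plot scores over games
    with open(args.input) as results_file:
        scores = [float(line) for line in results_file if line.strip()]
    plt.plot(scores)
    plt.xlabel('Game')
    plt.ylabel('Score')
    plt.show()


if __name__ == '__main__':
    plot(build_parser().parse_args())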
Hasta La Vista
Your use case may involve higher stakes, such as using real robots instead of simulated ones. In this case, using ROS is a nice idea, since several drivers, communication protocols, and middleware packages are already developed and ready to use.

Referring to A Brave New World, ROS could nicely fit the example simulation loop by communicating with the real robots instead of simulating an environment. We only need to get or estimate the state somehow and send the selected actions to the robots through ROS.
from multiagentrl import core
from multiagentrl import messages


class ExampleExperiment(core.BaseExperiment):
    def __init__(self, learn_games, test_games):
        super(ExampleExperiment, self).__init__(
            learn_games=learn_games,
            test_games=test_games)
        self.ros = ROSCommunicationChannel()

    def execute_game(self):
        # Send the first state before learning starts
        [agent.send_state() for agent in self.agents]

        while True:
            # Receive an action for the current state
            actions = [agent.receive_action() for agent in self.agents]

            # Communicate with ROS
            self.ros.send_actions(actions)
            self.ros.receive_states()

            # Update the state to learn from the received reward
            [agent.send_state() for agent in self.agents]

            # Send the reward obtained by executing the action and reaching the new state
            [agent.send_reward() for agent in self.agents]
Now, it would be possible to see the action in real-time in the real world!
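`ROSCommunicationChannel` is not part of Multiagent RL; it stands for whatever bridge your robots expose. As a rough sketch of what it might look like with rospy, assuming actions and states are exchanged as plain strings on hypothetical topics:

# Hypothetical ROS bridge, assuming string messages on illustrative topics
import rospy
from std_msgs.msg import String


class ROSCommunicationChannel(object):
    def __init__(self):
        rospy.init_node('multiagentrl_adapter')
        self.action_publisher = rospy.Publisher(
            '/robots/actions', String, queue_size=10)
        self.latest_state = None
        rospy.Subscriber('/robots/states', String, self._state_callback)

    def _state_callback(self, message):
        # Store the most recent state estimate published by the robots
        self.latest_state = message.data

    def send_actions(self, actions):
        # Publish one action message per agent
        for action in actions:
            self.action_publisher.publish(String(data=str(action)))

    def receive_states(self):
        # Return the latest state received from the robots, if any
        return self.latest_state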