Building a Multi-Agent Reinforcement Learning Framework for Grid Navigation
This guide walks you through creating a compact reinforcement learning environment where multiple agents collaboratively learn to traverse a grid-based world. By integrating three distinct agent roles — an Action Agent, a Tool Agent, and a Supervisor — we demonstrate how layered decision-making, heuristic evaluation, and oversight combine to foster intelligent navigation. Throughout the process, the agents adapt their strategies, cooperate, and progressively master reaching the target while handling obstacles and uncertainty.
Designing the Grid Environment
We begin by constructing the GridWorld environment, defining the agent’s starting point, the goal location, and randomly placed obstacles. The environment maintains a record of visited cells and enforces movement constraints to ensure valid navigation. The state representation includes the agent’s current position, distance to the goal, visited cell count, and available moves, enabling dynamic interaction and feedback.
# Standard library
import time
from collections import defaultdict

# Third-party
import numpy as np
import matplotlib.pyplot as plt
# The IPython helper is named `clear_output`; `clearoutput` does not exist.
from IPython.display import clear_output
class GridWorld:
    """Square grid with a fixed start (top-left), goal (bottom-right), and
    randomly placed obstacles.

    Tracks visited cells and the current step count so the environment can
    report an observation dict and enforce a per-episode step budget.
    """

    def __init__(self, size=8):
        self.size = size
        self.agentpos = [0, 0]                   # agent starts in the top-left corner
        self.goalpos = [size - 1, size - 1]      # goal fixed in the bottom-right corner
        self.obstacles = self.createobstacles()
        self.visited = set()                     # cells the agent has occupied this episode
        self.stepcount = 0
        self.maxsteps = size * size * 2          # episode step budget (2x the cell count)

    def createobstacles(self):
        """Sample `size` random obstacle cells, keeping start and goal free.

        NOTE(review): the original loop body was lost to extraction garbling;
        this reconstruction samples uniform random cells and rejects the start
        and goal cells — confirm against the original tutorial if available.
        """
        obstacles = set()
        count = self.size
        while len(obstacles) < count:
            cell = (np.random.randint(0, self.size),
                    np.random.randint(0, self.size))
            if cell != (0, 0) and cell != tuple(self.goalpos):
                obstacles.add(cell)
        return obstacles

    def reset(self):
        """Return the agent to the start cell, clear per-episode bookkeeping,
        and return the initial observation."""
        self.agentpos = [0, 0]
        self.visited = {tuple(self.agentpos)}
        self.stepcount = 0
        return self.getstate()

    def getstate(self):
        """Return an observation dict describing the current situation."""
        return {
            'position': tuple(self.agentpos),
            'goal': self.goalpos,
            # Manhattan distance from the agent to the goal.
            'distancetogoal': abs(self.agentpos[0] - self.goalpos[0]) +
                              abs(self.agentpos[1] - self.goalpos[1]),
            'visitedcount': len(self.visited),
            'steps': self.stepcount,
            'canmove': self.validmoves(),
        }

    def validmoves(self):
        """List the actions that stay on the grid and avoid obstacles."""
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
        valid = []
        for action, delta in moves.items():
            newpos = [self.agentpos[0] + delta[0], self.agentpos[1] + delta[1]]
            if (0 <= newpos[0] < self.size and 0 <= newpos[1] < self.size
                    and tuple(newpos) not in self.obstacles):
                valid.append(action)
        return valid
Simulating Agent Movement and Visual Feedback
Next, we define the mechanics of each step within the grid, including movement validation, collision detection, and reward assignment. The agent receives penalties for invalid moves or hitting obstacles and bonuses for exploring new cells or reaching the goal. A visualization function renders the grid, highlighting visited cells, obstacles, the goal, and the agent’s current position, providing intuitive real-time feedback on the agent’s progress.
class GridWorld(GridWorld):
    """Extends the base environment with step mechanics and rendering."""

    def step(self, action):
        """Apply one action.

        Returns (state, reward, done, info). Invalid actions, boundary hits,
        and obstacle hits leave the agent in place and cost -1; moving costs
        -0.1, first visits to a cell earn +0.5, and reaching the goal +10.
        The episode ends on the goal or when the step budget is exhausted.
        """
        self.stepcount += 1
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
        if action not in moves:
            return self.getstate(), -1, False, "Invalid action"
        delta = moves[action]
        newpos = [self.agentpos[0] + delta[0], self.agentpos[1] + delta[1]]
        if not (0 <= newpos[0] < self.size and 0 <= newpos[1] < self.size):
            return self.getstate(), -1, False, "Hit boundary"
        if tuple(newpos) in self.obstacles:
            return self.getstate(), -1, False, "Hit obstacle"
        self.agentpos = newpos
        postuple = tuple(self.agentpos)
        reward = -0.1  # Small step penalty
        if postuple not in self.visited:
            reward += 0.5  # Reward for exploring a new cell
            self.visited.add(postuple)
        done = False
        info = "Moved successfully"
        if self.agentpos == self.goalpos:
            reward += 10  # Goal reached reward
            done = True
            info = "Goal achieved!"
        elif self.stepcount >= self.maxsteps:
            done = True
            info = "Step limit reached"
        return self.getstate(), reward, done, info

    def render(self, agentfeedback=None):
        """Draw the grid: visited cells, obstacles, goal, agent, and an
        optional feedback caption below the plot."""
        grid = np.zeros((self.size, self.size, 3))
        for cell in self.visited:
            grid[cell[0], cell[1]] = [0.6, 0.85, 1.0]  # Light blue for visited
        for obs in self.obstacles:
            grid[obs[0], obs[1]] = [0.15, 0.15, 0.15]  # Dark gray for obstacles
        grid[self.goalpos[0], self.goalpos[1]] = [0, 0.9, 0]    # Green for goal
        grid[self.agentpos[0], self.agentpos[1]] = [0.9, 0, 0]  # Red for agent
        plt.figure(figsize=(8, 8))
        plt.imshow(grid, interpolation='nearest')
        plt.title(f"Step {self.stepcount} | Visited: {len(self.visited)}/{self.size ** 2}")
        # Thin grid lines between cells.
        for i in range(self.size + 1):
            plt.axhline(i - 0.5, color='gray', linewidth=0.5)
            plt.axvline(i - 0.5, color='gray', linewidth=0.5)
        if agentfeedback:
            plt.text(0.5, -1.5, agentfeedback, ha='center', fontsize=10,
                     bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8),
                     wrap=True, transform=plt.gca().transData)
        plt.axis('off')
        plt.tight_layout()
        plt.show()
Action Agent: Learning Through Q-Learning
The Action Agent employs Q-learning to decide its next move, balancing exploration and exploitation. It maintains a Q-table mapping states to action values, updating these values based on received rewards and future expected returns. This agent’s policy evolves over time, gradually favoring actions that maximize cumulative rewards.
class ActionAgent:
    """Tabular Q-learning agent with an epsilon-greedy policy."""

    def __init__(self):
        # Nested mapping: state position -> action -> Q-value (defaults to 0.0).
        self.qtable = defaultdict(lambda: defaultdict(float))
        self.epsilon = 0.3  # Exploration rate
        self.alpha = 0.1    # Learning rate
        self.gamma = 0.95   # Discount factor

    def chooseaction(self, state):
        """Pick a valid action epsilon-greedily.

        Returns (action, rationale) or None when no move is legal.
        """
        validactions = state['canmove']
        if not validactions:
            return None
        position = state['position']
        if np.random.random() < self.epsilon:
            # Explore: uniform random among the legal moves.
            action = np.random.choice(validactions)
            rationale = f"Exploring: randomly selected '{action}'"
        else:
            # Exploit: pick the legal move with the highest Q-value.
            qvalues = {a: self.qtable[position][a] for a in validactions}
            action = max(qvalues, key=qvalues.get)
            rationale = f"Exploiting: selected '{action}' with Q-value {self.qtable[position][action]:.2f}"
        return action, rationale

    def learn(self, state, action, reward, nextstate):
        """One-step Q-learning update:
        Q(s,a) += alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))."""
        pos = state['position']
        nextpos = nextstate['position']
        currentq = self.qtable[pos][action]
        nextmaxq = max([self.qtable[nextpos][a] for a in nextstate['canmove']], default=0)
        updatedq = currentq + self.alpha * (reward + self.gamma * nextmaxq - currentq)
        self.qtable[pos][action] = updatedq
Tool Agent: Performance Analysis and Strategic Advice
The Tool Agent functions as an analytical layer, reviewing the agent’s recent performance and environmental context to offer strategic recommendations. It monitors proximity to the goal, exploration rates, reward trends, and movement constraints, providing actionable insights to guide the Action Agent’s behavior.
class ToolAgent:
    """Analytical layer: inspects the state and recent history and returns a
    list of human-readable strategy tips for the Supervisor."""

    def analyze(self, state, actiontaken, cumulativereward, history):
        """Return a list of advice strings (possibly empty).

        NOTE(review): the threshold comparisons below were lost to extraction
        garbling in the source; the constants (dist <= 3, ratio < 0.3,
        steps > 5, avg < 0 / > 0.3, moves <= 1) are plausible reconstructions
        consistent with the surviving fragments — confirm against the
        original tutorial if available.
        """
        advice = []
        dist = state['distancetogoal']
        # Unique cells per step taken (+1 avoids division by zero at step 0).
        explorationratio = state['visitedcount'] / (state['steps'] + 1)
        if dist <= 3:
            # Must contain the words "close" and "goal": the SupervisorAgent
            # keys its goal-override on those two words.
            advice.append("🎯 Close to goal! Prioritize direct moves.")
        if explorationratio < 0.3 and state['steps'] > 5:
            advice.append("🔍 Exploration is low. Consider trying new paths.")
        if len(history) >= 5:
            recentrewards = [h[2] for h in history[-5:]]
            avgreward = np.mean(recentrewards)
            if avgreward < 0:
                advice.append("⚠️ Rewards trending negative. Reassess strategy.")
            elif avgreward > 0.3:
                advice.append("✅ Positive progress! Keep current approach.")
        if len(state['canmove']) <= 1:
            advice.append("🚧 Few legal moves available; beware of dead ends.")
        return advice
Supervisor Agent: Final Arbiter of Actions
The Supervisor Agent oversees the decision-making process, integrating the Action Agent's proposals with the Tool Agent's suggestions. It can override actions to prioritize goal-directed moves or avoid risky choices, ensuring the system's behavior remains aligned with overall objectives.
class SupervisorAgent:
    """Final arbiter: approves the Action Agent's proposal, or overrides it
    with a goal-directed move when the Tool Agent reports the goal is close."""

    def decide(self, state, proposedaction, tooladvice):
        """Return (final_action, explanation); final_action is None when
        there is no proposal to act on."""
        if not proposedaction:
            return None, "No valid actions to execute"
        finaldecision = proposedaction
        explanation = f"Action '{proposedaction}' approved"
        for tip in tooladvice:
            # Override only on a "close to goal" tip, and only if the
            # goal-directed move is actually legal.
            if "goal" in tip.lower() and "close" in tip.lower():
                direction = self.determinegoaldirection(state)
                if direction in state['canmove']:
                    finaldecision = direction
                    explanation = f"Overriding to move '{direction}' towards goal"
                break
        return finaldecision, explanation

    def determinegoaldirection(self, state):
        """Greedy direction toward the goal: resolve the row gap first,
        then the column gap."""
        pos = state['position']
        goal = state['goal']
        if goal[0] > pos[0]:
            return 'down'
        elif goal[0] < pos[0]:
            return 'up'
        elif goal[1] > pos[1]:
            return 'right'
        else:
            return 'left'
Training Loop: Collaborative Learning in Action
We orchestrate the interaction of all agents within the environment over multiple episodes. Each episode resets the grid, and the agents iteratively select, analyze, and approve actions until the goal is reached or the step limit is exceeded. The system tracks cumulative rewards and steps, visualizing progress and learning trends to illustrate the evolving navigation capabilities.
def trainagents(episodes=5, visualize=True):
    """Run the full multi-agent training loop.

    Each episode: the ActionAgent proposes a move, the ToolAgent analyzes the
    situation, the SupervisorAgent finalizes the action, the environment
    steps, and the ActionAgent learns from the result. Plots per-episode
    reward and step curves at the end.

    Returns (actionagent, toolagent, supervisor).
    """
    env = GridWorld(size=8)
    actionagent = ActionAgent()
    toolagent = ToolAgent()
    supervisor = SupervisorAgent()
    rewardsperepisode = []
    stepsperepisode = []
    for ep in range(episodes):
        state = env.reset()
        totalreward = 0
        done = False
        history = []  # (state, action, reward, next_state) tuples this episode
        print(f"\n{'=' * 60}")
        print(f"Starting Episode {ep + 1} of {episodes}")
        print(f"{'=' * 60}")
        while not done:
            actionresult = actionagent.chooseaction(state)
            if actionresult is None:
                break  # no legal moves: abandon the episode
            proposedaction, actionreason = actionresult
            suggestions = toolagent.analyze(state, proposedaction, totalreward, history)
            finalaction, supervisorreason = supervisor.decide(state, proposedaction, suggestions)
            if finalaction is None:
                break
            nextstate, reward, done, info = env.step(finalaction)
            totalreward += reward
            actionagent.learn(state, finalaction, reward, nextstate)
            history.append((state, finalaction, reward, nextstate))
            if visualize:
                clear_output(wait=True)
                feedback = (f"Action Agent: {actionreason}\n"
                            f"Supervisor: {supervisorreason}\n"
                            f"Tool Agent: {', '.join(suggestions[:2]) if suggestions else 'No advice'}\n"
                            f"Reward this step: {reward:.2f} | Total reward: {totalreward:.2f}")
                env.render(feedback)
                time.sleep(0.3)  # slow the animation down for readability
            state = nextstate
        rewardsperepisode.append(totalreward)
        stepsperepisode.append(env.stepcount)
        print(f"\nEpisode {ep + 1} completed!")
        print(f"Total Reward: {totalreward:.2f}")
        print(f"Steps Taken: {env.stepcount}")
        print(f"Unique Cells Visited: {len(env.visited)}/{env.size ** 2}")
    # Summary plots: learning curves over all episodes.
    plt.figure(figsize=(14, 5))
    plt.subplot(1, 2, 1)
    plt.plot(rewardsperepisode, marker='o', linestyle='-', color='blue')
    plt.title('Total Rewards per Episode')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.grid(alpha=0.3)
    plt.subplot(1, 2, 2)
    plt.plot(stepsperepisode, marker='s', linestyle='-', color='orange')
    plt.title('Steps Taken per Episode')
    plt.xlabel('Episode')
    plt.ylabel('Number of Steps')
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()
    return actionagent, toolagent, supervisor
if __name__ == "__main__":
    print("🤖 Multi-Agent Reinforcement Learning: Grid Navigation System")
    print("=" * 60)
    print("System Components:")
    print("  • Action Agent: Learns optimal moves via Q-learning")
    print("  • Tool Agent: Provides performance insights and suggestions")
    print("  • Supervisor Agent: Oversees and finalizes action decisions")
    print("=" * 60)
    trainedagents = trainagents(episodes=5, visualize=True)
Summary
This project illustrates how a multi-agent reinforcement learning system can be constructed from modular components to solve a navigation task. The Action Agent incrementally improves its policy through Q-learning, the Tool Agent offers analytical feedback to refine strategies, and the Supervisor Agent ensures decisions remain goal-focused and safe. The grid world environment serves as a clear, interactive platform to observe learning, exploration, and decision-making processes in real time.
