How to Design a Mini Reinforcement Learning Environment-Acting Agent with Intelligent Local Feedback, Adaptive Decision-Making, and Multi-Agent Coordination

Building a Multi-Agent Reinforcement Learning Framework for Grid Navigation

This guide walks you through creating a compact reinforcement learning environment where multiple agents collaboratively learn to traverse a grid-based world. By integrating three distinct agent roles (an Action Agent, a Tool Agent, and a Supervisor), we demonstrate how layered decision-making, heuristic evaluation, and oversight combine to foster intelligent navigation. Throughout the process, the agents adapt their strategies, cooperate, and progressively master reaching the target while handling obstacles and uncertainty.

Designing the Grid Environment

We begin by constructing the GridWorld environment, defining the agent’s starting point, the goal location, and randomly placed obstacles. The environment maintains a record of visited cells and enforces movement constraints to ensure valid navigation. The state representation includes the agent’s current position, distance to the goal, visited cell count, and available moves, enabling dynamic interaction and feedback.

import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
import time
from collections import defaultdict

class GridWorld:
    def __init__(self, size=8):
        self.size = size
        self.agent_pos = [0, 0]
        self.goal_pos = [size - 1, size - 1]
        self.obstacles = self.create_obstacles()
        self.visited = set()
        self.step_count = 0
        self.max_steps = size * size * 2

    def create_obstacles(self):
        obstacles = set()
        count = self.size
        # Scatter obstacles at random, keeping the start and goal cells free
        while len(obstacles) < count:
            pos = (np.random.randint(0, self.size), np.random.randint(0, self.size))
            if pos != (0, 0) and pos != (self.size - 1, self.size - 1):
                obstacles.add(pos)
        return obstacles

    def reset(self):
        self.agent_pos = [0, 0]
        self.visited = {tuple(self.agent_pos)}
        self.step_count = 0
        return self.get_state()

    def get_state(self):
        return {
            'position': tuple(self.agent_pos),
            'goal': self.goal_pos,
            'distance_to_goal': abs(self.agent_pos[0] - self.goal_pos[0]) +
                                abs(self.agent_pos[1] - self.goal_pos[1]),
            'visited_count': len(self.visited),
            'steps': self.step_count,
            'can_move': self.valid_moves()
        }

    def valid_moves(self):
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
        valid = []
        for action, delta in moves.items():
            new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
            if (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size
                    and tuple(new_pos) not in self.obstacles):
                valid.append(action)
        return valid
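
Before wiring in the agents, it helps to sanity-check the environment by instantiating it and inspecting the initial state. A minimal sketch (the seed call is only for reproducible obstacle placement and is not part of the class above):

np.random.seed(42)  # optional: fix obstacle placement for reproducibility
env = GridWorld(size=8)
state = env.reset()
print(state['position'])          # (0, 0)
print(state['distance_to_goal'])  # 14: Manhattan distance from (0, 0) to (7, 7)
print(state['can_move'])          # e.g. ['down', 'right'], depending on obstacles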

Simulating Agent Movement and Visual Feedback

Next, we define the mechanics of each step within the grid, including movement validation, collision detection, and reward assignment. The agent receives penalties for invalid moves or hitting obstacles and bonuses for exploring new cells or reaching the goal. A visualization function renders the grid, highlighting visited cells, obstacles, the goal, and the agent’s current position, providing intuitive real-time feedback on the agent’s progress.

class GridWorld(GridWorld):
    def step(self, action):
        self.step_count += 1
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}

        if action not in moves:
            return self.get_state(), -1, False, "Invalid action"

        delta = moves[action]
        new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]

        if not (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size):
            return self.get_state(), -1, False, "Hit boundary"

        if tuple(new_pos) in self.obstacles:
            return self.get_state(), -1, False, "Hit obstacle"

        self.agent_pos = new_pos
        pos_tuple = tuple(self.agent_pos)
        reward = -0.1  # Small step penalty
        if pos_tuple not in self.visited:
            reward += 0.5  # Reward for exploring a new cell
            self.visited.add(pos_tuple)

        done = False
        info = "Moved successfully"
        if self.agent_pos == self.goal_pos:
            reward += 10  # Goal reached reward
            done = True
            info = "Goal achieved!"
        elif self.step_count >= self.max_steps:
            done = True
            info = "Step limit reached"

        return self.get_state(), reward, done, info

    def render(self, agent_feedback=None):
        grid = np.zeros((self.size, self.size, 3))
        for cell in self.visited:
            grid[cell[0], cell[1]] = [0.6, 0.85, 1.0]  # Light blue for visited
        for obs in self.obstacles:
            grid[obs[0], obs[1]] = [0.15, 0.15, 0.15]  # Dark gray for obstacles
        grid[self.goal_pos[0], self.goal_pos[1]] = [0, 0.9, 0]  # Green for goal
        grid[self.agent_pos[0], self.agent_pos[1]] = [0.9, 0, 0]  # Red for agent

        plt.figure(figsize=(8, 8))
        plt.imshow(grid, interpolation='nearest')
        plt.title(f"Step {self.step_count} | Visited: {len(self.visited)}/{self.size ** 2}")
        for i in range(self.size + 1):
            plt.axhline(i - 0.5, color='gray', linewidth=0.5)
            plt.axvline(i - 0.5, color='gray', linewidth=0.5)
        if agent_feedback:
            plt.text(0.5, -1.5, agent_feedback, ha='center', fontsize=10,
                     bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8),
                     wrap=True, transform=plt.gca().transData)
        plt.axis('off')
        plt.tight_layout()
        plt.show()
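
Before layering learning on top, a purely random rollout is a quick way to confirm that movement, rewards, and termination behave as described. A minimal sketch, not part of the original tutorial code:

# Random rollout to exercise step(), rewards, and termination
env = GridWorld(size=8)
state = env.reset()
done, total = False, 0.0
while not done:
    if not state['can_move']:
        break  # agent is boxed in by obstacles
    action = np.random.choice(state['can_move'])
    state, reward, done, info = env.step(action)
    total += reward
print(f"Finished after {env.step_count} steps: {info} (return {total:.2f})")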

Action Agent: Learning Through Q-Learning

The Action Agent employs Q-learning to decide its next move, balancing exploration and exploitation. It maintains a Q-table mapping states to action values, updating these values based on received rewards and future expected returns. This agent’s policy evolves over time, gradually favoring actions that maximize cumulative rewards.

class ActionAgent:
    def __init__(self):
        self.q_table = defaultdict(lambda: defaultdict(float))
        self.epsilon = 0.3  # Exploration rate
        self.alpha = 0.1    # Learning rate
        self.gamma = 0.95   # Discount factor

    def choose_action(self, state):
        valid_actions = state['can_move']
        if not valid_actions:
            return None
        position = state['position']
        if np.random.random() < self.epsilon:
            action = np.random.choice(valid_actions)
            rationale = f"Exploring: randomly selected '{action}'"
        else:
            q_values = {a: self.q_table[position][a] for a in valid_actions}
            action = max(q_values, key=q_values.get)
            rationale = f"Exploiting: selected '{action}' with Q-value {self.q_table[position][action]:.2f}"
        return action, rationale

    def learn(self, state, action, reward, next_state):
        pos = state['position']
        next_pos = next_state['position']
        current_q = self.q_table[pos][action]
        next_max_q = max([self.q_table[next_pos][a] for a in next_state['can_move']], default=0)
        updated_q = current_q + self.alpha * (reward + self.gamma * next_max_q - current_q)
        self.q_table[pos][action] = updated_q
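
The update in learn() is the standard tabular Q-learning rule: Q(s, a) is nudged toward reward + gamma * max Q(s', a') by a fraction alpha. A hand-checkable example with made-up states (the positions and rewards below are purely illustrative):

# Hand-checkable Q-update on hypothetical states
agent = ActionAgent()
s1 = {'position': (0, 0), 'can_move': ['down', 'right']}
s2 = {'position': (0, 1), 'can_move': ['down', 'right']}
agent.q_table[(0, 1)]['down'] = 2.0  # pretend the next state already has value
agent.learn(s1, 'right', 0.5, s2)
# Expected: 0 + 0.1 * (0.5 + 0.95 * 2.0 - 0) = 0.24
print(agent.q_table[(0, 0)]['right'])  # 0.24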

Tool Agent: Performance Analysis and Strategic Advice

The Tool Agent functions as an analytical layer, reviewing the agent’s recent performance and environmental context to offer strategic recommendations. It monitors proximity to the goal, exploration rates, reward trends, and movement constraints, providing actionable insights to guide the Action Agent’s behavior.

class ToolAgent:
    def analyze(self, state, action_taken, cumulative_reward, history):
        advice = []
        dist = state['distance_to_goal']
        exploration_ratio = state['visited_count'] / (state['steps'] + 1)

        if dist <= 3:
            advice.append("🎯 Very close to goal! Prioritize moves toward it.")
        if exploration_ratio < 0.5 and state['steps'] > 5:
            advice.append("🔍 Exploration is low. Consider trying new paths.")
        if len(history) >= 5:
            recent_rewards = [h[2] for h in history[-5:]]
            avg_reward = np.mean(recent_rewards)
            if avg_reward < 0:
                advice.append("⚠️ Recent rewards are negative. Adjust the strategy.")
            elif avg_reward > 0.3:
                advice.append("✅ Positive progress! Keep current approach.")
        if len(state['can_move']) <= 2:
            advice.append("🚧 Few valid moves available. Navigate carefully.")
        return advice
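
Because the Tool Agent keeps no internal state, it can be probed in isolation with a handcrafted state dictionary. A small sketch with arbitrary illustrative values:

# Probing the Tool Agent with a handcrafted state
tool = ToolAgent()
fake_state = {'distance_to_goal': 2, 'visited_count': 3, 'steps': 10,
              'can_move': ['up', 'down']}
for tip in tool.analyze(fake_state, 'down', cumulative_reward=1.2, history=[]):
    print(tip)
# Emits the goal-proximity, low-exploration, and limited-moves advisories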

Supervisor Agent: Final Arbiter of Actions

The Supervisor Agent oversees the decision-making process, integrating the Action Agent's proposals with the Tool Agent's suggestions. It can override actions to prioritize goal-directed moves or avoid risky choices, ensuring the system's behavior remains aligned with overall objectives.

class SupervisorAgent:
    def decide(self, state, proposed_action, tool_advice):
        if not proposed_action:
            return None, "No valid actions to execute"

        final_decision = proposed_action
        explanation = f"Action '{proposed_action}' approved"

        for tip in tool_advice:
            if "goal" in tip.lower() and "close" in tip.lower():
                direction = self.determine_goal_direction(state)
                if direction in state['can_move']:
                    final_decision = direction
                    explanation = f"Overriding to move '{direction}' towards goal"
                    break

        return final_decision, explanation

    def determine_goal_direction(self, state):
        pos = state['position']
        goal = state['goal']
        if goal[0] > pos[0]:
            return 'down'
        elif goal[0] < pos[0]:
            return 'up'
        elif goal[1] > pos[1]:
            return 'right'
        else:
            return 'left'
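
The override path can be exercised directly: whenever a tip mentions being "close" to the "goal", the supervisor substitutes a goal-directed move if one is valid. Handcrafted illustrative inputs:

# Exercising the supervisor override
sup = SupervisorAgent()
state = {'position': (6, 6), 'goal': [7, 7], 'can_move': ['down', 'right']}
advice = ["🎯 Very close to goal! Prioritize moves toward it."]
action, why = sup.decide(state, 'right', advice)
print(action, '-', why)  # down - Overriding to move 'down' towards goal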

Training Loop: Collaborative Learning in Action

We orchestrate the interaction of all agents within the environment over multiple episodes. Each episode resets the grid, and the agents iteratively select, analyze, and approve actions until the goal is reached or the step limit is exceeded. The system tracks cumulative rewards and steps, visualizing progress and learning trends to illustrate the evolving navigation capabilities.

def train_agents(episodes=5, visualize=True):
    env = GridWorld(size=8)
    action_agent = ActionAgent()
    tool_agent = ToolAgent()
    supervisor = SupervisorAgent()

    rewards_per_episode = []
    steps_per_episode = []

    for ep in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False
        history = []

        print(f"\n{'=' * 60}")
        print(f"Starting Episode {ep + 1} of {episodes}")
        print(f"{'=' * 60}")

        while not done:
            action_result = action_agent.choose_action(state)
            if action_result is None:
                break
            proposed_action, action_reason = action_result

            suggestions = tool_agent.analyze(state, proposed_action, total_reward, history)
            final_action, supervisor_reason = supervisor.decide(state, proposed_action, suggestions)

            if final_action is None:
                break

            next_state, reward, done, info = env.step(final_action)
            total_reward += reward
            action_agent.learn(state, final_action, reward, next_state)
            history.append((state, final_action, reward, next_state))

            if visualize:
                clear_output(wait=True)
                feedback = (f"Action Agent: {action_reason}\n"
                            f"Supervisor: {supervisor_reason}\n"
                            f"Tool Agent: {', '.join(suggestions[:2]) if suggestions else 'No advice'}\n"
                            f"Reward this step: {reward:.2f} | Total reward: {total_reward:.2f}")
                env.render(feedback)
                time.sleep(0.3)

            state = next_state

        rewards_per_episode.append(total_reward)
        steps_per_episode.append(env.step_count)

        print(f"\nEpisode {ep + 1} completed!")
        print(f"Total Reward: {total_reward:.2f}")
        print(f"Steps Taken: {env.step_count}")
        print(f"Unique Cells Visited: {len(env.visited)}/{env.size ** 2}")

    plt.figure(figsize=(14, 5))
    plt.subplot(1, 2, 1)
    plt.plot(rewards_per_episode, marker='o', linestyle='-', color='blue')
    plt.title('Total Rewards per Episode')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.grid(alpha=0.3)

    plt.subplot(1, 2, 2)
    plt.plot(steps_per_episode, marker='s', linestyle='-', color='orange')
    plt.title('Steps Taken per Episode')
    plt.xlabel('Episode')
    plt.ylabel('Number of Steps')
    plt.grid(alpha=0.3)

    plt.tight_layout()
    plt.show()

    return action_agent, tool_agent, supervisor


if __name__ == "__main__":
    print("🤖 Multi-Agent Reinforcement Learning: Grid Navigation System")
    print("=" * 60)
    print("System Components:")
    print(" • Action Agent: Learns optimal moves via Q-learning")
    print(" • Tool Agent: Provides performance insights and suggestions")
    print(" • Supervisor Agent: Oversees and finalizes action decisions")
    print("=" * 60)

    trained_agents = train_agents(episodes=5, visualize=True)
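
Once training finishes, you can evaluate what the Action Agent has learned by disabling exploration and rolling out the greedy policy. A minimal sketch, assuming trained_agents holds the tuple returned above:

# Greedy evaluation of the trained policy (no exploration)
action_agent, tool_agent, supervisor = trained_agents
action_agent.epsilon = 0.0  # always exploit learned Q-values
env = GridWorld(size=8)
state, done = env.reset(), False
while not done:
    result = action_agent.choose_action(state)
    if result is None:
        break
    action, _ = result
    state, reward, done, info = env.step(action)
print(info)  # ideally "Goal achieved!"; otherwise the step limit was hit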

Summary

This project illustrates how a multi-agent reinforcement learning system can be constructed from modular components to solve a navigation task. The Action Agent incrementally improves its policy through Q-learning, the Tool Agent offers analytical feedback to refine strategies, and the Supervisor Agent ensures decisions remain goal-focused and safe. The grid world environment serves as a clear, interactive platform to observe learning, exploration, and decision-making processes in real time.
