How to Build an Agentic Deep Reinforcement Learning System with Curriculum Progression, Adaptive Exploration, and Meta-Level UCB Planning

Building a Self-Directed Deep Reinforcement Learning Agent

This guide demonstrates how to develop a sophisticated Deep Reinforcement Learning (DRL) framework where an agent not only learns to act within an environment but also autonomously determines its own training strategies. We implement a Dueling Double Deep Q-Network (Dueling Double DQN), introduce a progressively challenging curriculum, and incorporate adaptive exploration techniques that evolve alongside training. Central to this system is a meta-agent that orchestrates, assesses, and fine-tunes the entire learning process, transforming reinforcement learning into a strategic, self-governing workflow.

Establishing the Core Components: Environment, Network, and Replay Buffer

We begin by setting up the fundamental elements of our DRL system. This includes initializing the environment, constructing the dueling Q-network architecture, and creating a replay buffer to efficiently store experience tuples. These components lay the groundwork for the agent’s learning journey.

!pip install -q "gymnasium[classic-control]" torch matplotlib

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, defaultdict
import math
import random
import matplotlib.pyplot as plt

# Set seeds for reproducibility
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

class DuelingQNetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_units = 128
        self.shared_layers = nn.Sequential(
            nn.Linear(input_dim, hidden_units),
            nn.ReLU(),
        )
        self.value_stream = nn.Sequential(
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, 1),
        )
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, output_dim),
        )

    def forward(self, x):
        features = self.shared_layers(x)
        value = self.value_stream(features)
        advantage = self.advantage_stream(features)
        return value + (advantage - advantage.mean(dim=1, keepdim=True))
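As a quick sanity check of the dueling head (assuming CartPole's 4-dimensional observation and 2 actions), the network should emit one Q-value per action for each state in a batch; subtracting the mean advantage keeps the value and advantage streams identifiable:

```python
import torch
import torch.nn as nn

class DuelingQNetwork(nn.Module):
    # Same architecture as above, repeated so this sketch runs standalone.
    def __init__(self, input_dim, output_dim):
        super().__init__()
        h = 128
        self.shared_layers = nn.Sequential(nn.Linear(input_dim, h), nn.ReLU())
        self.value_stream = nn.Sequential(nn.Linear(h, h), nn.ReLU(), nn.Linear(h, 1))
        self.advantage_stream = nn.Sequential(nn.Linear(h, h), nn.ReLU(), nn.Linear(h, output_dim))

    def forward(self, x):
        f = self.shared_layers(x)
        v, a = self.value_stream(f), self.advantage_stream(f)
        # Mean-subtracted aggregation: Q(s, a) = V(s) + A(s, a) - mean_a A(s, a)
        return v + (a - a.mean(dim=1, keepdim=True))

net = DuelingQNetwork(4, 2)
q = net(torch.randn(8, 4))   # batch of 8 fake CartPole observations
print(q.shape)               # torch.Size([8, 2]) -- one Q-value per action
```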

class ExperienceReplay:
    def __init__(self, max_capacity=100000):
        self.memory = deque(maxlen=max_capacity)

    def add(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        def to_tensor(data, dtype):
            # Stack into a single ndarray first; building a tensor from a
            # Python list of arrays is slow and warns in recent PyTorch.
            return torch.tensor(np.asarray(data), dtype=dtype, device=device)
        return (to_tensor(states, torch.float32),
                to_tensor(actions, torch.long),
                to_tensor(rewards, torch.float32),
                to_tensor(next_states, torch.float32),
                to_tensor(dones, torch.float32))

    def __len__(self):
        return len(self.memory)
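A minimal usage sketch of the buffer (with `device` set locally, since `sample` converts batches to tensors on it): fill it with fake transitions, then draw a batch and inspect the shapes.

```python
import random
from collections import deque
import numpy as np
import torch

device = torch.device("cpu")  # defined here so the sketch is self-contained

class ExperienceReplay:
    # Same buffer as above, repeated so this sketch runs standalone.
    def __init__(self, max_capacity=100000):
        self.memory = deque(maxlen=max_capacity)

    def add(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        t = lambda d, dt: torch.tensor(np.asarray(d), dtype=dt, device=device)
        return (t(states, torch.float32), t(actions, torch.long),
                t(rewards, torch.float32), t(next_states, torch.float32),
                t(dones, torch.float32))

    def __len__(self):
        return len(self.memory)

buffer = ExperienceReplay(max_capacity=1000)
for _ in range(100):
    s, s2 = np.random.randn(4), np.random.randn(4)   # fake 4-dim observations
    buffer.add(s, random.randrange(2), 1.0, s2, False)

states, actions, rewards, next_states, dones = buffer.sample(32)
print(states.shape, actions.shape)   # torch.Size([32, 4]) torch.Size([32])
```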

Designing the DQN Agent: Action Selection and Learning Updates

Next, we define the agent’s behavior: how it perceives the environment, selects actions, and updates its neural network. We implement Double DQN to reduce overestimation bias, apply gradient clipping for stable training, and incorporate multiple exploration strategies such as epsilon-greedy and softmax action selection. This equips the agent with robust learning and exploration capabilities.
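Concretely, the Double DQN target implemented below decouples action selection (done by the online network, parameters $\theta$) from action evaluation (done by the target network, $\theta^-$), which curbs the overestimation that taking a single max over one network produces:

```latex
y_t = r_t + \gamma \,(1 - d_t)\, Q_{\theta^-}\!\left(s_{t+1},\; \operatorname*{arg\,max}_{a'} Q_{\theta}(s_{t+1}, a')\right)
```

where $d_t$ is the done flag, matching the `(1 - dones)` mask in `perform_training_step`.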

class DQNAgent:
    def __init__(self, input_dim, output_dim, gamma=0.99, learning_rate=1e-3, batch_size=64):
        self.q_network = DuelingQNetwork(input_dim, output_dim).to(device)
        self.target_network = DuelingQNetwork(input_dim, output_dim).to(device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.replay_buffer = ExperienceReplay()
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.gamma = gamma
        self.batch_size = batch_size
        self.step_count = 0

    def _epsilon_decay(self, step, start=1.0, end=0.05, decay_rate=8000):
        return end + (start - end) * math.exp(-step / decay_rate)

    def choose_action(self, state, mode, exploration_strategy, softmax_temperature=1.0):
        state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        with torch.no_grad():
            q_values = self.q_network(state_tensor).cpu().numpy()[0]

        if mode == "eval":
            return int(np.argmax(q_values)), None

        if exploration_strategy == "epsilon":
            epsilon = self._epsilon_decay(self.step_count)
            if random.random() < epsilon:
                return random.randrange(len(q_values)), epsilon
            return int(np.argmax(q_values)), epsilon

        if exploration_strategy == "softmax":
            scaled_logits = q_values / softmax_temperature
            probabilities = np.exp(scaled_logits - np.max(scaled_logits))
            probabilities /= probabilities.sum()
            return int(np.random.choice(len(q_values), p=probabilities)), None

        return int(np.argmax(q_values)), None

    def perform_training_step(self):
        if len(self.replay_buffer) < self.batch_size:
            return None

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        with torch.no_grad():
            next_q_values_online = self.q_network(next_states)
            best_next_actions = next_q_values_online.argmax(dim=1, keepdim=True)
            next_q_values_target = self.target_network(next_states).gather(1, best_next_actions).squeeze(1)
            targets = rewards + self.gamma * next_q_values_target * (1 - dones)

        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        loss = nn.MSELoss()(current_q_values, targets)

        self.optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(self.q_network.parameters(), max_norm=1.0)
        self.optimizer.step()

        return float(loss.item())

    def sync_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

    def execute_episodes(self, environment, num_episodes, mode, exploration_strategy):
        episode_returns = []
        for _ in range(num_episodes):
            observation, _ = environment.reset()
            done = False
            total_reward = 0.0
            while not done:
                self.step_count += 1
                action, _ = self.choose_action(observation, mode, exploration_strategy)
                next_observation, reward, terminated, truncated, _ = environment.step(action)
                done = terminated or truncated

                if mode == "train":
                    self.replay_buffer.add(observation, action, reward, next_observation, float(done))
                    self.perform_training_step()

                observation = next_observation
                total_reward += reward

            episode_returns.append(total_reward)
        return float(np.mean(episode_returns))

    def assess_performance_across_difficulties(self, difficulty_levels, episodes_per_level=5):
        performance_scores = {}
        for level_name, max_steps in difficulty_levels.items():
            env = gym.make("CartPole-v1", max_episode_steps=max_steps)
            average_return = self.execute_episodes(env, episodes_per_level, mode="eval", exploration_strategy="epsilon")
            env.close()
            performance_scores[level_name] = average_return
        return performance_scores
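To make the exploration schedule concrete, here are the values `_epsilon_decay` produces at a few step counts (the formula is reimplemented standalone here); the agent starts fully random and anneals toward mostly greedy behavior with a time constant of 8000 steps:

```python
import math

def epsilon_decay(step, start=1.0, end=0.05, decay_rate=8000):
    # Exponential interpolation from `start` down toward `end`.
    return end + (start - end) * math.exp(-step / decay_rate)

for step in (0, 4000, 8000, 16000, 32000):
    print(step, round(epsilon_decay(step), 3))
# 0     -> 1.0   (fully random)
# 8000  -> ~0.40 (one decay constant elapsed)
# 32000 -> ~0.07 (mostly greedy, approaching the 0.05 floor)
```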

Introducing the Meta-Agent: Strategic Oversight of Training

To elevate the learning process, we implement a meta-agent that governs the training regimen. This meta-agent employs an Upper Confidence Bound (UCB) algorithm to dynamically select difficulty levels, training modes, and exploration methods based on historical outcomes. Through iterative meta-rounds, the meta-agent strategically guides the base agent's learning trajectory, optimizing for long-term performance gains.
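The UCB rule itself is simple. A toy sketch (with hypothetical plan names, not the exact tuples used below) shows its two key behaviors: untried plans score infinity, so each gets sampled at least once, and thereafter a plan's score is its mean reward plus an uncertainty bonus that shrinks as it is tried more often:

```python
import math

def ucb_score(mean, count, total, c=2.0):
    # Untried arms get infinity, forcing at least one trial of each.
    if count == 0:
        return float("inf")
    return mean + c * math.sqrt(math.log(total + 1) / count)

# (mean reward, times tried) for three hypothetical plans
plans = {"EASY/train": (50.0, 10), "HARD/train": (40.0, 2), "HARD/eval": (0.0, 0)}
total = sum(count for _, count in plans.values())
scores = {p: ucb_score(m, c, total) for p, (m, c) in plans.items()}
best = max(scores, key=scores.get)
print(best)   # "HARD/eval" -- never tried, so its bonus is infinite
```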

class MetaAgent:
    def __init__(self, base_agent):
        self.base_agent = base_agent
        self.difficulty_settings = {
            "EASY": 100,
            "MEDIUM": 300,
            "HARD": 500,
        }
        self.training_plans = []
        for difficulty in self.difficulty_settings.keys():
            for mode in ["train", "eval"]:
                for exploration in ["epsilon", "softmax"]:
                    self.training_plans.append((difficulty, mode, exploration))

        self.plan_counts = defaultdict(int)
        self.plan_values = defaultdict(float)
        self.total_iterations = 0
        self.training_history = []

    def _ucb_value(self, plan, exploration_param=2.0):
        count = self.plan_counts[plan]
        if count == 0:
            return float("inf")
        average_value = self.plan_values[plan]
        return average_value + exploration_param * math.sqrt(math.log(self.total_iterations + 1) / count)

    def select_training_plan(self):
        self.total_iterations += 1
        ucb_scores = [self._ucb_value(plan) for plan in self.training_plans]
        best_plan_index = int(np.argmax(ucb_scores))
        return self.training_plans[best_plan_index]

    def create_environment(self, difficulty):
        max_episode_steps = self.difficulty_settings[difficulty]
        return gym.make("CartPole-v1", max_episode_steps=max_episode_steps)

    def compute_meta_reward(self, difficulty, mode, average_return):
        reward = average_return
        if difficulty == "MEDIUM":
            reward += 20
        elif difficulty == "HARD":
            reward += 50
        if mode == "eval" and difficulty == "HARD":
            reward += 50
        return reward

    def update_plan_statistics(self, plan, meta_reward):
        self.plan_counts[plan] += 1
        n = self.plan_counts[plan]
        current_mean = self.plan_values[plan]
        self.plan_values[plan] = current_mean + (meta_reward - current_mean) / n

    def conduct_meta_training(self, total_meta_rounds=30):
        evaluation_log = {"EASY": [], "MEDIUM": [], "HARD": []}
        for round_number in range(1, total_meta_rounds + 1):
            difficulty, mode, exploration = self.select_training_plan()
            env = self.create_environment(difficulty)
            episodes_to_run = 5 if mode == "train" else 3
            avg_return = self.base_agent.execute_episodes(env, episodes_to_run, mode, exploration if mode == "train" else "epsilon")
            env.close()

            if round_number % 3 == 0:
                self.base_agent.sync_target_network()

            meta_reward = self.compute_meta_reward(difficulty, mode, avg_return)
            self.update_plan_statistics((difficulty, mode, exploration), meta_reward)
            self.training_history.append((round_number, difficulty, mode, exploration, avg_return, meta_reward))

            if mode == "eval":
                evaluation_log[difficulty].append((round_number, avg_return))

            print(f"Round {round_number}: Difficulty={difficulty}, Mode={mode}, Exploration={exploration}, AvgReturn={avg_return:.2f}, MetaReward={meta_reward:.2f}")

        return evaluation_log
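`update_plan_statistics` maintains each plan's value as an incrementally updated running mean rather than storing every reward. A quick standalone check confirms the update rule reproduces the batch mean:

```python
def running_mean_update(mean, n, x):
    # Incremental mean with n the count *including* the new sample x:
    # new_mean = mean + (x - mean) / n
    return mean + (x - mean) / n

rewards = [10.0, 70.0, 40.0]
mean, count = 0.0, 0
for r in rewards:
    count += 1
    mean = running_mean_update(mean, count, r)
print(mean)   # 40.0, identical to sum(rewards) / len(rewards)
```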

Executing the Training Regimen and Evaluating Performance

We integrate all components by initiating meta-training rounds where the meta-agent selects training configurations and the DQN agent carries them out. This process tracks performance evolution and the agent's adaptation to progressively complex challenges, showcasing the emergence of autonomous, long-term learning strategies.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize environment to get observation and action dimensions
temp_env = gym.make("CartPole-v1", max_episode_steps=100)
observation_dim = temp_env.observation_space.shape[0]
action_dim = temp_env.action_space.n
temp_env.close()

# Instantiate the DQN agent and meta-agent
dqn_agent = DQNAgent(observation_dim, action_dim)
meta_controller = MetaAgent(dqn_agent)

# Run meta-training rounds
evaluation_results = meta_controller.conduct_meta_training(total_meta_rounds=36)

# Final evaluation across difficulty levels
final_performance = dqn_agent.assess_performance_across_difficulties(meta_controller.difficulty_settings, episodes_per_level=10)
print("Final Performance Summary:")
for difficulty_level, score in final_performance.items():
    print(f"{difficulty_level}: {score:.2f}")

Visualizing Learning Progress Across Difficulty Levels

To better understand the agent's development, we plot average returns over meta-training rounds for each difficulty tier. These visualizations reveal learning trends, the impact of strategic meta-control, and the agent's growing proficiency in handling more challenging environments.

plt.figure(figsize=(10, 5))
colors = {"EASY": "tab:blue", "MEDIUM": "tab:orange", "HARD": "tab:red"}

for difficulty, color in colors.items():
    if evaluation_results[difficulty]:
        rounds, returns = zip(*evaluation_results[difficulty])
        plt.plot(rounds, returns, marker='o', color=color, label=difficulty)

plt.xlabel("Meta-Training Round")
plt.ylabel("Average Return")
plt.title("Performance Trends Under Meta-Agent Control")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

Summary: Empowering Agents with Strategic Self-Learning

This project illustrates the transformation of a reinforcement learning agent into a multi-level learner capable of refining its policies, modulating exploration, and autonomously selecting training strategies. The meta-agent leverages UCB-based decision-making to steer the base learner toward increasingly difficult tasks, enhancing stability and performance. This layered approach exemplifies how embedding agency within reinforcement learning frameworks fosters adaptive, self-optimizing systems that evolve intelligently over time.
