How We Learn Step-Level Rewards from Preferences to Solve Sparse-Reward Environments Using Online Process Reward Learning

Mastering Online Process Reward Learning (OPRL) for Sparse-Reward Reinforcement Tasks

This tutorial walks through Online Process Reward Learning (OPRL), an approach that learns dense, step-level reward signals from preferences over whole trajectories. It targets reinforcement learning tasks where environment rewards are sparse. We cover the environment setup, the reward and policy network architectures, the generation of preference data, and the iterative training loop. Along the way, we look at how preference-driven reward shaping changes the agent’s behavior, with the aim of better credit assignment, faster learning, and more stable policy optimization.

Constructing the Maze Environment and Neural Architectures

We start by defining a grid-based maze in which the agent begins at the top-left corner and must reach the goal at the bottom-right. A vertical wall of obstacles in the middle column blocks direct routes. The reward is sparse: the agent receives 10 on reaching the goal and 0 on every other step, and an episode ends at the goal or after 60 steps. States are encoded as one-hot vectors representing the agent’s position within the grid.
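The one-hot encoding described above can be sketched in plain Python. This is a minimal illustration rather than the listing's NumPy version; `one_hot_state` and `decode_state` are hypothetical helper names introduced here, and the row-major index mirrors `_get_state` in the code at the end of the article.

```python
def one_hot_state(pos, size):
    """Encode a (row, col) position as a flat one-hot list of length size * size."""
    state = [0.0] * (size * size)
    state[pos[0] * size + pos[1]] = 1.0  # row-major index
    return state


def decode_state(state, size):
    """Recover (row, col) from a one-hot state (hypothetical helper, for illustration)."""
    return divmod(state.index(1.0), size)


state = one_hot_state((2, 3), size=8)
print(len(state))              # 64
print(state.index(1.0))        # 19, i.e. 2 * 8 + 3
print(decode_state(state, 8))  # (2, 3)
```

Because every cell maps to its own input dimension, the networks cannot generalize across neighboring cells through the encoding alone; that is acceptable for an 8x8 grid but is one reason larger mazes may call for a different state representation.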

To model the reward and policy, we implement two neural networks. The Process Reward Model maps a state to a scalar step-level reward using a multi-layer perceptron with layer normalization and ReLU activations; a final tanh bounds each predicted reward to [-1, 1]. The Policy Network consists of a shared backbone feeding into separate actor and critic heads, so the agent learns action logits and state-value estimates from the same features.

Agent Design: Action Selection and Trajectory Collection

The OPRL agent employs an ε-greedy policy to balance exploration and exploitation, randomly selecting actions with probability ε and otherwise sampling from the learned policy distribution. As the agent interacts with the maze, it records full trajectories comprising states, actions, and received rewards. These trajectories form the basis for generating preference pairs, which are essential for training the reward model.
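The action-selection rule can be sketched without any deep learning library. The snippet below is an illustration under stated assumptions: the logits are passed in directly instead of coming from a policy network, and `softmax` and the `rng` parameter are names introduced here, not part of the article's listing.

```python
import math
import random


def softmax(logits):
    """Numerically stable softmax over a list of logits."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def select_action(logits, epsilon, rng):
    """With probability epsilon pick a uniformly random action,
    otherwise sample from the softmax distribution over the logits."""
    if rng.random() < epsilon:
        return rng.randrange(len(logits))
    probs = softmax(logits)
    return rng.choices(range(len(logits)), weights=probs, k=1)[0]


rng = random.Random(0)
logits = [0.0, 0.0, 50.0, 0.0]
greedy_like = [select_action(logits, epsilon=0.0, rng=rng) for _ in range(100)]
exploring = [select_action(logits, epsilon=1.0, rng=rng) for _ in range(100)]
print(set(greedy_like))  # {2}: nearly all probability mass is on action 2
print(sorted(set(exploring)))
```

Note that even with ε = 0 the action is sampled rather than taken as the argmax, so some exploration remains whenever the policy distribution is not sharply peaked.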

Preference Generation and Reward Model Training

To learn a meaningful reward function, the agent compares pairs of trajectories, labeling which one is preferable based on cumulative returns. Using the Bradley-Terry model, the reward network is trained to predict the probability that one trajectory is better than another by comparing their aggregated process rewards. This approach transforms sparse terminal rewards into rich, differentiable feedback signals that guide the agent’s learning more effectively.
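To make the Bradley-Terry step concrete, here is a small worked sketch using plain floats in place of reward-model outputs. `preference_prob` and `preference_loss` are hypothetical names, and the arguments stand for the summed process rewards of each trajectory. Treating a tie as a label of 0.5 is an assumption of this sketch, not something the text above prescribes.

```python
import math


def preference_prob(r1_sum, r2_sum):
    """Bradley-Terry probability that trajectory 1 is preferred,
    given the summed step-level rewards of both trajectories."""
    return 1.0 / (1.0 + math.exp(-(r1_sum - r2_sum)))


def preference_loss(r1_sum, r2_sum, label, eps=1e-8):
    """Binary cross-entropy between the predicted preference and the label.
    label = 1.0 means trajectory 1 is preferred, 0.0 means trajectory 2;
    0.5 (an assumption in this sketch) encodes a tie."""
    p = preference_prob(r1_sum, r2_sum)
    return -(label * math.log(p + eps) + (1 - label) * math.log(1 - p + eps))


print(preference_prob(1.0, 1.0))       # 0.5: equal summed rewards, no preference
print(preference_loss(2.0, 0.0, 1.0))  # small loss: prediction agrees with the label
print(preference_loss(2.0, 0.0, 0.0))  # larger loss: prediction contradicts the label
```

Only the difference between the two sums enters the probability, so the learned rewards are identified up to an additive constant per step; what matters for shaping is their relative ordering across states.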

Policy Optimization with Shaped Rewards

The policy is updated using shaped rewards: the environment’s sparse reward plus the learned process reward scaled by a small weight (0.1 in the code). We compute discounted returns from the shaped rewards, normalize them for stability, and subtract the critic’s value estimates to obtain advantages. The loss combines a policy gradient term, a value function term, and an entropy bonus that encourages exploration. Gradient clipping keeps update magnitudes bounded.
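The reward shaping and return computation can be illustrated with plain lists. This is a sketch under assumptions: `shaped_returns` and `normalize` are hypothetical helpers, the process rewards are supplied by hand instead of by a trained model, and the default `gamma` and `shaping_weight` follow the values used in the listing at the end of the article.

```python
import statistics


def shaped_returns(env_rewards, process_rewards, gamma=0.98, shaping_weight=0.1):
    """Add weighted process rewards to environment rewards, then compute
    the discounted return from every step to the end of the trajectory."""
    shaped = [r + shaping_weight * p for r, p in zip(env_rewards, process_rewards)]
    returns = [0.0] * len(shaped)
    running = 0.0
    for t in reversed(range(len(shaped))):
        running = shaped[t] + gamma * running
        returns[t] = running
    return returns


def normalize(values, eps=1e-8):
    """Standardize to zero mean and unit sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample std, needs at least two values
    return [(v - mean) / (std + eps) for v in values]


# Sparse reward only at the last step, no shaping signal.
print(shaped_returns([0.0, 0.0, 10.0], [0.0, 0.0, 0.0], gamma=0.5))
# [2.5, 5.0, 10.0]

# The same trajectory with a constant process reward of 1 per step.
print(shaped_returns([0.0, 0.0, 10.0], [1.0, 1.0, 1.0], gamma=0.5, shaping_weight=0.5))
# [3.375, 5.75, 10.5]
```

The shaping weight controls how much the learned signal can dominate the true objective; keeping it small relative to the goal reward limits the damage an inaccurate reward model can do early in training.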

End-to-End Training Loop and Performance Monitoring

The training loop orchestrates the entire learning process over multiple episodes. Exploration rate ε decays gradually to favor exploitation as the agent improves. Preference pairs accumulate over time, enabling continuous refinement of the reward model. Both the reward model and policy network are updated iteratively, allowing the agent to adapt dynamically to the evolving reward landscape.
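The exploration schedule can be written as a one-line function. The sketch below assumes a linear decay with a floor, using the constants from the training routine in the listing (start 0.5, floor 0.05, decrease of 1/1000 per episode); `epsilon_at` is a hypothetical name.

```python
def epsilon_at(episode, start=0.5, floor=0.05, decay_per_episode=1 / 1000):
    """Linearly decayed exploration rate, clipped from below at `floor`."""
    return max(floor, start - episode * decay_per_episode)


for ep in (0, 100, 250, 499):
    print(ep, epsilon_at(ep))
```

With these constants the floor is reached around episode 450, so in a 500-episode run the agent explores at a relatively high rate for most of training.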

Periodically, the agent runs an evaluation episode with ε set to 0, so no uniformly random actions are taken (actions are still sampled from the policy distribution), and the final maze state is rendered as text. Episode returns, success rate, reward model loss, and policy loss are tracked and plotted to show the learning dynamics.
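Per-episode metrics such as returns and success flags are noisy, so they are easier to read after smoothing. The helper below is a plain-Python sketch of a sliding-window average; `moving_average` is a hypothetical name, and returning an empty list for inputs shorter than the window is a choice made for this sketch.

```python
def moving_average(values, window=20):
    """Mean over each full sliding window; empty if there are too few values."""
    if len(values) < window:
        return []
    running = sum(values[:window])
    out = [running / window]
    for i in range(window, len(values)):
        running += values[i] - values[i - window]  # slide the window by one
        out.append(running / window)
    return out


successes = [0, 0, 1, 1]
print(moving_average(successes, window=2))  # [0.0, 0.5, 1.0]
```

The smoothed series is shorter than the input by `window - 1` points, which is worth remembering when aligning it with the raw curve on the same episode axis.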

Visualizing Learning Progress

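The plots show episode returns (raw and smoothed with a 20-episode moving average), the smoothed success rate, the reward model loss, and the policy loss. In a successful run, returns and success rate rise while the reward model loss falls. The policy loss is a noisier signal because it mixes policy gradient, value, and entropy terms, so it need not decrease monotonically. Together, these trends indicate whether OPRL is shaping the reward signal well enough for the agent to solve the sparse-reward maze reliably.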

Summary and Future Directions

OPRL offers a powerful framework for converting sparse terminal rewards into dense, informative feedback by leveraging trajectory preferences. This method enhances credit assignment, accelerates policy learning, and stabilizes optimization in challenging environments. The approach is highly adaptable, allowing extensions to larger, more complex mazes, varying reward shaping intensities, or integration with human preference data for real-world applications.

By following this tutorial, you gain hands-on experience with OPRL’s core components and training methodology, laying the groundwork for exploring advanced reinforcement learning scenarios where traditional sparse rewards hinder progress.


Code Snippets Overview

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Maze Environment Definition
class MazeEnv:
    def __init__(self, size=8):
        self.size = size
        self.start = (0, 0)
        self.goal = (size - 1, size - 1)
        self.obstacles = {(i, size // 2) for i in range(1, size - 2)}
        self.reset()

    def reset(self):
        self.pos = self.start
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        state = np.zeros(self.size * self.size)
        state[self.pos[0] * self.size + self.pos[1]] = 1
        return state

    def step(self, action):
        moves = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        new_pos = (self.pos[0] + moves[action][0], self.pos[1] + moves[action][1])
        if 0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size and new_pos not in self.obstacles:
            self.pos = new_pos
        self.steps += 1
        done = self.pos == self.goal or self.steps >= 60
        reward = 10.0 if self.pos == self.goal else 0.0
        return self._get_state(), reward, done

    def render(self):
        grid = [['.' for _ in range(self.size)] for _ in range(self.size)]
        for obs in self.obstacles:
            grid[obs[0]][obs[1]] = '█'
        grid[self.goal[0]][self.goal[1]] = 'G'
        grid[self.pos[0]][self.pos[1]] = 'A'
        return '\n'.join([''.join(row) for row in grid])

# Process Reward Model: Predicts dense rewards from states
class ProcessRewardModel(nn.Module):
    def __init__(self, state_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
            nn.Tanh()
        )

    def forward(self, states):
        return self.net(states)

    def trajectory_reward(self, states):
        return self.forward(states).sum()

# Policy Network: Outputs action logits and state value estimates
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden=128):
        super().__init__()
        self.backbone = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU()
        )
        self.actor = nn.Linear(hidden, action_dim)
        self.critic = nn.Linear(hidden, 1)

    def forward(self, state):
        features = self.backbone(state)
        return self.actor(features), self.critic(features)

# OPRL Agent: Combines policy and reward model with training routines
class OPRLAgent:
    def __init__(self, state_dim, action_dim, lr=3e-4):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.reward_model = ProcessRewardModel(state_dim)
        self.policy_opt = Adam(self.policy.parameters(), lr=lr)
        self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
        self.trajectories = deque(maxlen=200)
        self.preferences = deque(maxlen=500)
        self.action_dim = action_dim

    def select_action(self, state, epsilon=0.1):
        if random.random() < epsilon:
            return random.randint(0, self.action_dim - 1)
        state_t = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            logits, _ = self.policy(state_t)
            probs = F.softmax(logits, dim=-1)
            return torch.multinomial(probs, 1).item()

    def collect_trajectory(self, env, epsilon=0.1):
        states, actions, rewards = [], [], []
        state = env.reset()
        done = False
        while not done:
            action = self.select_action(state, epsilon)
            next_state, reward, done = env.step(action)
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            state = next_state
        traj = {
            'states': torch.FloatTensor(np.array(states)),
            'actions': torch.LongTensor(actions),
            'rewards': torch.FloatTensor(rewards),
            'return': float(sum(rewards))
        }
        self.trajectories.append(traj)
        return traj

    def generate_preference(self):
        if len(self.trajectories) < 2:
            return
        t1, t2 = random.sample(list(self.trajectories), 2)
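        # Ties (common when both returns are 0) get 0.5 so neither trajectory is favored
        diff = t1['return'] - t2['return']
        label = 1.0 if diff > 0 else (0.0 if diff < 0 else 0.5)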
        self.preferences.append({'t1': t1, 't2': t2, 'label': label})

    def train_reward_model(self, n_updates=5):
        if len(self.preferences) < 32:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            batch = random.sample(list(self.preferences), 32)
            loss = 0.0
            for item in batch:
                r1 = self.reward_model.trajectory_reward(item['t1']['states'])
                r2 = self.reward_model.trajectory_reward(item['t2']['states'])
                logit = r1 - r2
                pred_prob = torch.sigmoid(logit)
                label = item['label']
                loss += -(label * torch.log(pred_prob + 1e-8) +
                          (1 - label) * torch.log(1 - pred_prob + 1e-8))
            loss = loss / len(batch)
            self.reward_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
            self.reward_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates

    def train_policy(self, n_updates=3, gamma=0.98):
        if len(self.trajectories) < 5:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            traj = random.choice(list(self.trajectories))
            with torch.no_grad():
                process_rewards = self.reward_model(traj['states']).squeeze()
            shaped_rewards = traj['rewards'] + 0.1 * process_rewards
            returns = []
            G = 0
            for r in reversed(shaped_rewards.tolist()):
                G = r + gamma * G
                returns.insert(0, G)
            returns = torch.FloatTensor(returns)
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            logits, values = self.policy(traj['states'])
            log_probs = F.log_softmax(logits, dim=-1)
            action_log_probs = log_probs.gather(1, traj['actions'].unsqueeze(1))
            advantages = returns - values.squeeze().detach()
            policy_loss = -(action_log_probs.squeeze() * advantages).mean()
            value_loss = F.mse_loss(values.squeeze(), returns)
            entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).mean()
            loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
            self.policy_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
            self.policy_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates

# Main training routine
def train_oprl(episodes=500, render_interval=100):
    env = MazeEnv(size=8)
    agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
    returns, reward_losses, policy_losses = [], [], []
    success_rate = []
    for ep in range(episodes):
        epsilon = max(0.05, 0.5 - ep / 1000)
        traj = agent.collect_trajectory(env, epsilon)
        returns.append(traj['return'])
        if ep % 2 == 0 and ep > 10:
            agent.generate_preference()
        if ep > 20 and ep % 2 == 0:
            rew_loss = agent.train_reward_model(n_updates=3)
            reward_losses.append(rew_loss)
        if ep > 10:
            pol_loss = agent.train_policy(n_updates=2)
            policy_losses.append(pol_loss)
        success = 1 if traj['return'] > 5 else 0
        success_rate.append(success)
        if ep % render_interval == 0 and ep > 0:
            test_env = MazeEnv(size=8)
            agent.collect_trajectory(test_env, epsilon=0)
            print(test_env.render())
    return returns, reward_losses, policy_losses, success_rate

# Execute training and plot results
print("Starting OPRL Agent Training on Sparse Reward Maze...\n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

axes[0, 0].plot(returns, alpha=0.3)
axes[0, 0].plot(np.convolve(returns, np.ones(20) / 20, mode='valid'), linewidth=2)
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Return')
axes[0, 0].set_title('Agent Performance Over Time')
axes[0, 0].grid(alpha=0.3)

success_smooth = np.convolve(success, np.ones(20) / 20, mode='valid')
axes[0, 1].plot(success_smooth, linewidth=2, color='green')
axes[0, 1].set_xlabel('Episode')
axes[0, 1].set_ylabel('Success Rate')
axes[0, 1].set_title('Goal Achievement Rate')
axes[0, 1].grid(alpha=0.3)

axes[1, 0].plot(rew_losses, linewidth=2, color='orange')
axes[1, 0].set_xlabel('Update Step')
axes[1, 0].set_ylabel('Loss')
axes[1, 0].set_title('Reward Model Training Loss')
axes[1, 0].grid(alpha=0.3)

axes[1, 1].plot(pol_losses, linewidth=2, color='red')
axes[1, 1].set_xlabel('Update Step')
axes[1, 1].set_ylabel('Loss')
axes[1, 1].set_title('Policy Network Loss')
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("OPRL Training Completed Successfully!")
print("Demonstrated key concepts: process reward learning, preference-based shaping, and online policy updates.")
