How to Build a Neuro-Symbolic Hybrid Agent that Combines Logical Planning with Neural Perception for Robust Autonomous Decision-Making


This guide explores the fusion of symbolic logic and neural learning to create an advanced hybrid agent. We develop a neuro-symbolic framework where classical planning provides structured, rule-based, goal-oriented behavior, while neural networks enhance perception and fine-tune actions. By examining the code, we observe the dynamic interplay between these components, enabling the agent to navigate complex environments, manage uncertainty, and adapt intelligently. Ultimately, this approach combines interpretability, resilience, and flexibility within a unified agent architecture.

Defining the Symbolic Planning Framework

We start by establishing the symbolic reasoning core, defining states, actions, and transitions. Using A* search, we implement classical planning to generate clear, goal-driven action sequences. This rule-based system forms the backbone of the agent’s high-level decision-making process.

import numpy as np
from dataclasses import dataclass, field
from typing import Tuple, Set, Optional, List, Dict

@dataclass
class State:
    robot_pos: Tuple[int, int]
    holding: Optional[str] = None
    visited: Set[Tuple[int, int]] = field(default_factory=set)
    objects_collected: Set[str] = field(default_factory=set)

    def __hash__(self):
        # Hash on all fields that distinguish planning states, including task progress.
        return hash((self.robot_pos, self.holding, frozenset(self.objects_collected)))

class SymbolicPlanner:
    def __init__(self, grid_size: int = 8):
        self.grid_size = grid_size
        self.actions = ['up', 'down', 'left', 'right', 'pickup', 'drop']

    def get_successors(self, state: State, obstacles: Set[Tuple[int, int]], objects: Dict[str, Tuple[int, int]]) -> List[Tuple[str, State]]:
        successors = []
        x, y = state.robot_pos
        moves = {'up': (x, y-1), 'down': (x, y+1), 'left': (x-1, y), 'right': (x+1, y)}

        for action, (nx, ny) in moves.items():
            if 0 <= nx < self.grid_size and 0 <= ny < self.grid_size and (nx, ny) not in obstacles:
                new_state = State((nx, ny), state.holding, state.visited | {(nx, ny)}, state.objects_collected.copy())
                successors.append((action, new_state))

        if state.holding is None:
            for obj_name, obj_pos in objects.items():
                if state.robot_pos == obj_pos and obj_name not in state.objects_collected:
                    new_state = State(state.robot_pos, obj_name, state.visited.copy(), state.objects_collected.copy())
                    successors.append(('pickup', new_state))

        if state.holding is not None:
            new_state = State(state.robot_pos, None, state.visited.copy(), state.objects_collected | {state.holding})
            successors.append(('drop', new_state))

        return successors

    def heuristic(self, state: State, goal: Tuple[int, int]) -> float:
        return abs(state.robot_pos[0] - goal[0]) + abs(state.robot_pos[1] - goal[1])

    def a_star_plan(self, start_state: State, goal: Tuple[int, int], obstacles: Set[Tuple[int, int]], objects: Dict[str, Tuple[int, int]]) -> List[str]:
        frontier = [(self.heuristic(start_state, goal), 0, 0, start_state, [])]
        visited = set()
        counter = 0

        while frontier:
            frontier.sort()
            _, _, cost, current_state, plan = frontier.pop(0)
            counter += 1

            if current_state.robot_pos == goal and len(current_state.objects_collected) >= len(objects):
                return plan

            # Include collected objects in the key so states that differ only in task progress aren't pruned.
            state_key = (current_state.robot_pos, current_state.holding, frozenset(current_state.objects_collected))
            if state_key in visited:
                continue
            visited.add(state_key)

            for action, next_state in self.get_successors(current_state, obstacles, objects):
                new_cost = cost + 1
                new_plan = plan + [action]
                priority = new_cost + self.heuristic(next_state, goal)
                frontier.append((priority, counter, new_cost, next_state, new_plan))
                counter += 1

        return []
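Because the frontier list above is re-sorted on every iteration, the same best-first pattern is often written with a binary heap instead. A minimal standalone sketch of that search loop using `heapq` (positions only, with no pickup/drop bookkeeping — an illustration of the pattern, not a drop-in replacement for the planner above):

```python
import heapq

def astar(start, goal, obstacles, size):
    """Minimal A* on a size x size grid; returns a list of move actions."""
    def h(p):  # Manhattan-distance heuristic, as in SymbolicPlanner
        return abs(p[0] - goal[0]) + abs(p[1] - goal[1])

    frontier = [(h(start), 0, start, [])]  # (priority, cost, pos, plan)
    seen = set()
    while frontier:
        _, cost, pos, plan = heapq.heappop(frontier)
        if pos == goal:
            return plan
        if pos in seen:
            continue
        seen.add(pos)
        x, y = pos
        for action, (nx, ny) in [('up', (x, y - 1)), ('down', (x, y + 1)),
                                 ('left', (x - 1, y)), ('right', (x + 1, y))]:
            if 0 <= nx < size and 0 <= ny < size and (nx, ny) not in obstacles:
                heapq.heappush(frontier, (cost + 1 + h((nx, ny)), cost + 1,
                                          (nx, ny), plan + [action]))
    return []

plan = astar((0, 0), (3, 3), {(1, 1), (2, 2)}, 4)
print(len(plan))  # → 6, the Manhattan-optimal path length around the obstacles
```

Since the heuristic is admissible, the heap-based version returns the same optimal path lengths as the sorted-list version; it simply avoids the repeated O(n log n) sort.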

Incorporating Neural Networks for Perception and Policy

Next, we introduce neural modules that let the agent interpret noisy sensory data and adapt its actions. A compact feed-forward network (randomly initialized here, standing in for a trained denoiser) processes noisy environmental observations, while a policy network refines the agent's movements by blending learned behavior with the planner's symbolic instructions. This design equips the agent to handle uncertainty and adjust its responses dynamically.

class NeuralPerception:
    def __init__(self, grid_size: int = 8):
        self.grid_size = grid_size
        self.W1 = np.random.randn(grid_size * grid_size, 64) * 0.1
        self.b1 = np.zeros(64)
        self.W2 = np.random.randn(64, 32) * 0.1
        self.b2 = np.zeros(32)
        self.W3 = np.random.randn(32, grid_size * grid_size) * 0.1
        self.b3 = np.zeros(grid_size * grid_size)

    def relu(self, x):
        return np.maximum(0, x)

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

    def perceive(self, noisy_grid: np.ndarray) -> np.ndarray:
        x = noisy_grid.flatten()
        h1 = self.relu(x @ self.W1 + self.b1)
        h2 = self.relu(h1 @ self.W2 + self.b2)
        out = self.sigmoid(h2 @ self.W3 + self.b3)
        return out.reshape(self.grid_size, self.grid_size)
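The perception module is a plain three-layer MLP written in raw NumPy. Its forward pass can be sanity-checked in isolation; a minimal sketch with the same layer shapes and freshly drawn random weights (the final sigmoid guarantees every output cell stays in (0, 1)):

```python
import numpy as np

rng = np.random.default_rng(0)
g = 8  # grid size, matching the class default
# Same layer shapes as NeuralPerception: 64 -> 64 -> 32 -> 64
W1, b1 = rng.normal(scale=0.1, size=(g * g, 64)), np.zeros(64)
W2, b2 = rng.normal(scale=0.1, size=(64, 32)), np.zeros(32)
W3, b3 = rng.normal(scale=0.1, size=(32, g * g)), np.zeros(g * g)

x = rng.random(g * g)                       # flattened noisy grid in [0, 1]
h1 = np.maximum(0, x @ W1 + b1)             # ReLU
h2 = np.maximum(0, h1 @ W2 + b2)            # ReLU
out = 1 / (1 + np.exp(-(h2 @ W3 + b3)))     # sigmoid keeps outputs in (0, 1)
out = out.reshape(g, g)
print(out.shape)  # → (8, 8)
```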

class NeuralPolicy:
    def __init__(self, state_dim: int = 4, action_dim: int = 4):
        self.W = np.random.randn(state_dim, action_dim) * 0.1
        self.b = np.zeros(action_dim)
        self.action_map = ['up', 'down', 'left', 'right']

    def softmax(self, x):
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def get_action_probs(self, state_features: np.ndarray) -> np.ndarray:
        logits = state_features @ self.W + self.b
        return self.softmax(logits)

    def select_action(self, state_features: np.ndarray, symbolic_action: str) -> str:
        probs = self.get_action_probs(state_features)
        if symbolic_action in self.action_map:
            idx = self.action_map.index(symbolic_action)
            probs[idx] += 0.7
            probs /= probs.sum()
        return np.random.choice(self.action_map, p=probs)
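The key integration step in `select_action` — biasing a softmax distribution toward the planner's suggestion and renormalizing — can be isolated in a few lines. A minimal standalone sketch reusing the same 0.7 bias weight:

```python
import numpy as np

def blend(policy_logits, symbolic_idx, bias=0.7):
    """Bias a softmax distribution toward the planner's action, then renormalize."""
    exp = np.exp(policy_logits - policy_logits.max())  # stable softmax
    probs = exp / exp.sum()
    probs[symbolic_idx] += bias
    return probs / probs.sum()

probs = blend(np.zeros(4), symbolic_idx=1)
print(probs.argmax())  # → 1: the symbolic action dominates after blending
```

With uniform logits, the biased action ends up with probability 0.95 / 1.7 ≈ 0.56, so the symbolic suggestion dominates while the learned distribution still contributes the remaining mass.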

Integrating Symbolic and Neural Components into a Unified Agent

We combine the symbolic planner and neural networks into a cohesive agent. The symbolic planner generates a high-level plan, the neural perception module interprets the environment, and the neural policy fine-tunes each action. This synergy enables the agent to execute tasks effectively, even in uncertain or noisy settings.

class NeuroSymbolicAgent:
    def __init__(self, grid_size: int = 8):
        self.grid_size = grid_size
        self.planner = SymbolicPlanner(grid_size)
        self.perception = NeuralPerception(grid_size)
        self.policy = NeuralPolicy()
        self.obstacles = {(3, 3), (3, 4), (4, 3), (5, 5), (6, 2)}
        self.objects = {'key': (2, 6), 'gem': (6, 6)}
        self.goal = (7, 7)

    def create_noisy_observation(self, true_grid: np.ndarray) -> np.ndarray:
        noise = np.random.randn(*true_grid.shape) * 0.2
        return np.clip(true_grid + noise, 0, 1)

    def extract_state_features(self, pos: Tuple[int, int], goal: Tuple[int, int]) -> np.ndarray:
        return np.array([pos[0] / self.grid_size, pos[1] / self.grid_size, goal[0] / self.grid_size, goal[1] / self.grid_size])

    def execute_mission(self, verbose: bool = True) -> Tuple[List[Tuple[int, int]], List[str]]:
        start_state = State(robot_pos=(0, 0), visited={(0, 0)})
        symbolic_plan = self.planner.a_star_plan(start_state, self.goal, self.obstacles, self.objects)

        if verbose:
            print(f"🧠 Symbolic Plan Created: {len(symbolic_plan)} steps")
            print(f"   Plan preview: {symbolic_plan[:10]}{'...' if len(symbolic_plan) > 10 else ''}\n")

        true_grid = np.zeros((self.grid_size, self.grid_size))
        for obs in self.obstacles:
            true_grid[obs[1], obs[0]] = 1.0

        noisy_obs = self.create_noisy_observation(true_grid)
        perceived_grid = self.perception.perceive(noisy_obs)

        if verbose:
            accuracy = np.mean((perceived_grid > 0.5) == true_grid)
            print("👁️ Neural Perception: Cleaned obstacle map")
            print(f"   Perception accuracy: {accuracy:.2%}\n")

        trajectory = [(0, 0)]
        current_pos = (0, 0)
        actions_taken = []

        for i, sym_action in enumerate(symbolic_plan[:30]):
            features = self.extract_state_features(current_pos, self.goal)
            refined_action = self.policy.select_action(features, sym_action) if sym_action in ['up', 'down', 'left', 'right'] else sym_action
            actions_taken.append(refined_action)

            if refined_action == 'up':
                next_pos = (current_pos[0], max(0, current_pos[1] - 1))
            elif refined_action == 'down':
                next_pos = (current_pos[0], min(self.grid_size - 1, current_pos[1] + 1))
            elif refined_action == 'left':
                next_pos = (max(0, current_pos[0] - 1), current_pos[1])
            elif refined_action == 'right':
                next_pos = (min(self.grid_size - 1, current_pos[0] + 1), current_pos[1])
            else:
                next_pos = current_pos

            if next_pos not in self.obstacles:
                current_pos = next_pos
                trajectory.append(current_pos)

        return trajectory, actions_taken
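The observation-noise model and the accuracy metric used in `execute_mission` can be exercised on their own. A minimal sketch that substitutes simple 0.5 thresholding for the (here untrained) network, using the same Gaussian-noise-plus-clip model and the same obstacle layout:

```python
import numpy as np

rng = np.random.default_rng(42)
true_grid = np.zeros((8, 8))
for ox, oy in [(3, 3), (3, 4), (4, 3), (5, 5), (6, 2)]:
    true_grid[oy, ox] = 1.0  # note (row, col) = (y, x), matching the agent code

# Same noise model as create_noisy_observation: Gaussian noise, clipped to [0, 1].
noisy = np.clip(true_grid + rng.normal(scale=0.2, size=true_grid.shape), 0, 1)

# At this noise level, plain thresholding already recovers the map well.
recovered = (noisy > 0.5).astype(float)
accuracy = np.mean(recovered == true_grid)
print(f"{accuracy:.2%}")
```

This gives a useful baseline: a trained perception network should at least beat naive thresholding before it earns its place in the pipeline.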

Visualizing Agent Behavior and Architecture

To better understand the agent’s operation, we visualize its path through the environment alongside the architectural layers. This includes plotting obstacles, collectible objects, the goal location, and the agent’s trajectory. The diagram also highlights the flow from symbolic planning through neural perception and policy to execution.

import matplotlib.pyplot as plt
from typing import List, Tuple

def visualize_execution(agent: NeuroSymbolicAgent, trajectory: List[Tuple[int, int]], title: str = "Neuro-Symbolic Agent Execution"):
    fig, axes = plt.subplots(1, 2, figsize=(14, 6))

    # Environment visualization
    ax = axes[0]
    grid = np.zeros((agent.grid_size, agent.grid_size, 3))

    for obs in agent.obstacles:
        grid[obs[1], obs[0]] = [0.3, 0.3, 0.3]  # Dark gray for obstacles

    for obj_pos in agent.objects.values():
        grid[obj_pos[1], obj_pos[0]] = [1.0, 0.8, 0.0]  # Gold for objects

    grid[agent.goal[1], agent.goal[0]] = [0.0, 1.0, 0.0]  # Green for goal

    for i, pos in enumerate(trajectory):
        intensity = 0.3 + 0.7 * (i / len(trajectory))
        grid[pos[1], pos[0]] = [intensity, 0.0, 1.0]  # Blue gradient for path

    if trajectory:
        grid[trajectory[0][1], trajectory[0][0]] = [1.0, 0.0, 0.0]  # Red start point

    ax.imshow(grid)
    ax.set_title("Agent Trajectory in Environment", fontsize=14, fontweight='bold')
    ax.set_xlabel("X Coordinate")
    ax.set_ylabel("Y Coordinate")
    ax.grid(True, alpha=0.3)

    # Architecture visualization
    ax = axes[1]
    ax.axis('off')
    ax.text(0.5, 0.95, "Neuro-Symbolic Architecture", ha='center', fontsize=16, fontweight='bold', transform=ax.transAxes)

    layers = [
        ("SYMBOLIC LAYER", 0.75, "Planning · State Logic · Rules"),
        ("↔ INTEGRATION ↔", 0.60, "Feature Extraction · Action Blending"),
        ("NEURAL LAYER", 0.45, "Perception · Policy Learning"),
        ("↔ EXECUTION ↔", 0.30, "Action Refinement · Feedback"),
        ("ENVIRONMENT", 0.15, "State Transitions · Observations")
    ]

    colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7']

    for i, (name, y, desc) in enumerate(layers):
        ax.add_patch(plt.Rectangle((0.1, y - 0.05), 0.8, 0.08, facecolor=colors[i], alpha=0.7, transform=ax.transAxes))
        ax.text(0.5, y, f"{name}\n{desc}", ha='center', va='center', fontsize=10, fontweight='bold', transform=ax.transAxes)

    plt.tight_layout()
    plt.savefig('neuro_symbolic_agent_visualization.png', dpi=150, bbox_inches='tight')
    plt.show()

    print(f"\n✅ Execution finished! Total steps: {len(trajectory)}")

Running the Complete Neuro-Symbolic Pipeline

Finally, we execute the entire system, from planning through perception and action refinement to visualization. This end-to-end run demonstrates the hybrid agent’s capabilities and highlights the complementary strengths of symbolic and neural components.

if __name__ == "__main__":
    print("=" * 70)
    print("NEURO-SYMBOLIC HYBRID AGENT DEMONSTRATION")
    print("Integrating Classical Planning with Neural Adaptation")
    print("=" * 70)
    print()

    agent = NeuroSymbolicAgent(grid_size=8)
    trajectory, actions = agent.execute_mission(verbose=True)
    visualize_execution(agent, trajectory)

    print("\n" + "=" * 70)
    print("KEY TAKEAWAYS:")
    print("=" * 70)
    print("✦ Symbolic Layer: Delivers transparent, verifiable plans")
    print("✦ Neural Layer: Manages noisy inputs and adapts dynamically")
    print("✦ Integration: Harnesses the best of both AI paradigms")
    print("✦ Advantages: Enhanced explainability, flexibility, and robustness")
    print("=" * 70)

Summary

This tutorial illustrates how combining symbolic planning with neural networks creates a more capable and dependable agent. The symbolic planner offers clear, interpretable strategies, while the neural modules provide perceptual grounding and adaptability that pure logic lacks. This neuro-symbolic synergy paves the way for intelligent agents that reason, perceive, and act with both transparency and resilience, advancing practical AI applications.
