Building a Sophisticated Multi-Agent AI System for Collaborative Reasoning
This guide walks you through constructing an Agentic AI framework in which multiple intelligent agents collaboratively reason, communicate, reflect, and learn from their experiences. We develop each component of the system in turn, showing how agents handle tasks through planning, memory management, inter-agent messaging, and semantic inference. By the conclusion, the architecture evolves into a dynamic multi-agent ecosystem capable of entity extraction, contextual interpretation, reasoning-chain formation, and knowledge graph construction, all while continuously refining its performance via episodic learning and self-reflection.
Setting Up Core Components: Messaging, Task Structures, and Memory Modules
We begin by importing essential libraries and defining the foundational data structures that underpin our agentic system. This includes message formats for inter-agent communication, task representations, and two types of memory modules: working memory for short-term information retention and episodic memory for storing past observations and learning from them. These elements establish the backbone for reasoning, data storage, and collaborative interaction within the system.
!pip install spacy networkx matplotlib -q
!python -m spacy download en_core_web_sm -q
import spacy
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from collections import defaultdict, deque
from enum import Enum
import json
import hashlib
from datetime import datetime

class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    QUERY = "query"

@dataclass
class Message:
    sender: str
    receiver: str
    msg_type: MessageType
    content: Dict[str, Any]
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
    priority: int = 1

    def get_id(self) -> str:
        unique_str = f"{self.sender}{self.timestamp}"
        return hashlib.md5(unique_str.encode()).hexdigest()[:8]

@dataclass
class AgentTask:
    task_id: str
    task_type: str
    data: Any
    priority: int = 1
    dependencies: List[str] = field(default_factory=list)
    metadata: Dict = field(default_factory=dict)

@dataclass
class Observation:
    state: str
    action: str
    result: Any
    confidence: float
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())

class WorkingMemory:
    def __init__(self, capacity: int = 10):
        self.capacity = capacity
        self.items = deque(maxlen=capacity)
        self.attention_scores = {}

    def add(self, key: str, value: Any, attention: float = 1.0):
        self.items.append((key, value))
        self.attention_scores[key] = attention

    def recall(self, n: int = 5) -> List[Tuple[str, Any]]:
        # Return the n items with the highest attention scores.
        sorted_items = sorted(self.items, key=lambda x: self.attention_scores.get(x[0], 0), reverse=True)
        return sorted_items[:n]

    def get(self, key: str) -> Optional[Any]:
        for k, v in self.items:
            if k == key:
                return v
        return None

class EpisodicMemory:
    def __init__(self):
        self.episodes = []
        self.success_patterns = defaultdict(int)

    def store(self, observation: Observation):
        self.episodes.append(observation)
        if observation.confidence > 0.7:
            pattern = f"{observation.state}→{observation.action}"
            self.success_patterns[pattern] += 1

    def query_similar(self, state: str, top_k: int = 3) -> List[Observation]:
        recent_episodes = self.episodes[-50:]
        scored = [(obs, self._similarity(state, obs.state)) for obs in recent_episodes]
        scored.sort(key=lambda x: x[1], reverse=True)
        return [obs for obs, _ in scored[:top_k]]

    def _similarity(self, state1: str, state2: str) -> float:
        # Jaccard similarity over word sets.
        words1, words2 = set(state1.split()), set(state2.split())
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)
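Before wiring these into agents, the two memory behaviors are easy to check in isolation. The sketch below mirrors (rather than imports) the logic above: attention-weighted recall over a bounded deque, and the Jaccard word-overlap score behind `query_similar`.

```python
from collections import deque

def jaccard(a: str, b: str) -> float:
    # Word-overlap similarity, as in EpisodicMemory._similarity above.
    w1, w2 = set(a.split()), set(b.split())
    if not w1 or not w2:
        return 0.0
    return len(w1 & w2) / len(w1 | w2)

# Attention-weighted recall: highest-attention keys come back first.
items = deque(maxlen=3)
scores = {}
for key, attention in [("entities_ORG", 0.6), ("entities_GPE", 0.3), ("entities_DATE", 0.1)]:
    items.append((key, []))
    scores[key] = attention

top = sorted(items, key=lambda kv: scores.get(kv[0], 0), reverse=True)[:2]
print([k for k, _ in top])  # ['entities_ORG', 'entities_GPE']
print(jaccard("entity extraction task", "entity extraction run"))  # 0.5
```

Note that the deque evicts the oldest item once capacity is reached, while the attention dictionary keeps growing; `recall` tolerates this because it only scores keys still present in the deque.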
Reflection Engine and Foundational Agent Class: Enabling Adaptive Reasoning
Next, we develop a reflection module that tracks agent performance over time, providing insights and recommendations based on recent task outcomes. This module helps agents adapt their strategies dynamically. We also define a base agent class that equips each agent with planning, memory, and communication capabilities. Building on this, we implement a Cognitive Entity Agent that specializes in extracting entities from text along with their surrounding context, storing observations to improve future performance.
class ReflectionModule:
    def __init__(self):
        self.performance_log = []

    def reflect(self, task_type: str, confidence: float, result: Any) -> Dict[str, Any]:
        self.performance_log.append({
            'task': task_type,
            'confidence': confidence,
            'timestamp': datetime.now().timestamp()
        })
        recent = [p for p in self.performance_log if p['task'] == task_type][-5:]
        avg_conf = sum(p['confidence'] for p in recent) / len(recent) if recent else 0.5
        insights = {
            'performance_trend': 'improving' if confidence > avg_conf else 'declining',
            'avg_confidence': avg_conf,
            'recommendation': self._get_recommendation(confidence, avg_conf)
        }
        return insights

    def _get_recommendation(self, current: float, average: float) -> str:
        if current < 0.4:
            return "Seek support from a specialized agent"
        elif current < average:
            return "Analyze similar past cases for patterns"
        else:
            return "Maintain current approach"

class AdvancedAgent:
    def __init__(self, name: str, specialty: str, nlp):
        self.name = name
        self.specialty = specialty
        self.nlp = nlp
        self.working_memory = WorkingMemory()
        self.episodic_memory = EpisodicMemory()
        self.reflector = ReflectionModule()
        self.message_queue = deque()
        self.collaboration_graph = defaultdict(int)

    def plan(self, task: AgentTask) -> List[str]:
        # Reuse a past action if a similar episode succeeded with high confidence.
        similar = self.episodic_memory.query_similar(str(task.data))
        if similar and similar[0].confidence > 0.7:
            return [similar[0].action]
        return self._default_plan(task)

    def _default_plan(self, task: AgentTask) -> List[str]:
        return ['analyze', 'extract', 'validate']

    def send_message(self, receiver: str, msg_type: MessageType, content: Dict):
        msg = Message(self.name, receiver, msg_type, content)
        self.message_queue.append(msg)
        return msg

    def receive_message(self, message: Message):
        self.message_queue.append(message)
        self.collaboration_graph[message.sender] += 1

    def process(self, task: AgentTask) -> Dict[str, Any]:
        raise NotImplementedError

class CognitiveEntityAgent(AdvancedAgent):
    def process(self, task: AgentTask) -> Dict[str, Any]:
        doc = self.nlp(task.data)
        entities = defaultdict(list)
        entity_contexts = []
        for ent in doc.ents:
            # Capture a five-token window on each side of the entity.
            context_start = max(0, ent.start - 5)
            context_end = min(len(doc), ent.end + 5)
            context = doc[context_start:context_end].text
            entities[ent.label_].append(ent.text)
            entity_contexts.append({
                'entity': ent.text,
                'type': ent.label_,
                'context': context,
                'position': (ent.start_char, ent.end_char)
            })
        for ent_type, ents in entities.items():
            attention = len(ents) / len(doc.ents) if doc.ents else 0
            self.working_memory.add(f"entities_{ent_type}", ents, attention)
        confidence = min(len(entities) / 4, 1.0) if entities else 0.3
        obs = Observation(
            state=f"entity_extraction_{len(doc)}tokens",
            action="extract_with_context",
            result=len(entity_contexts),
            confidence=confidence
        )
        self.episodic_memory.store(obs)
        reflection = self.reflector.reflect('entity_extraction', confidence, entities)
        return {
            'entities': dict(entities),
            'contexts': entity_contexts,
            'confidence': confidence,
            'reflection': reflection,
            # These names must match the MetaController's agent keys, or the
            # follow-up tasks are silently dropped during dispatch.
            'next_actions': ['semantic_reasoning', 'knowledge_graph'] if confidence > 0.5 else []
        }
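The reflection logic can be sanity-checked on its own. The standalone sketch below mirrors `_get_recommendation` and the rolling five-sample trend; one subtlety of `reflect` as written is that it appends the current score before averaging, so the trend comparison includes the new observation, whereas this sketch averages prior scores only.

```python
def recommend(current: float, average: float) -> str:
    # Mirrors ReflectionModule._get_recommendation above.
    if current < 0.4:
        return "Seek support from a specialized agent"
    elif current < average:
        return "Analyze similar past cases for patterns"
    return "Maintain current approach"

def trend(log, task, current):
    # Rolling average over the last five logged scores for this task type.
    recent = [c for t, c in log if t == task][-5:]
    avg = sum(recent) / len(recent) if recent else 0.5
    return ("improving" if current > avg else "declining"), avg

log = [("entity_extraction", 0.5), ("entity_extraction", 0.6)]
print(trend(log, "entity_extraction", 0.75))  # improving vs. the 0.55 average
print(recommend(0.35, 0.55))  # below 0.4 triggers delegation
```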
Semantic Reasoning Agent: Deepening Understanding Through Inference and Clustering
We introduce the Semantic Reasoning Agent, which delves into sentence structures to extract logical reasoning chains and groups related concepts based on semantic similarity. This agent leverages working memory to enrich its contextual awareness, transitioning the system from basic extraction to more profound inference and conceptual clustering.
class SemanticReasoningAgent(AdvancedAgent):
    def process(self, task: AgentTask) -> Dict[str, Any]:
        doc = self.nlp(task.data)
        reasoning_chains = []
        for sent in doc.sents:
            chain = self._extract_reasoning_chain(sent)
            if chain:
                reasoning_chains.append(chain)
        entity_memory = self.working_memory.recall(3)
        semantic_clusters = self._cluster_by_semantics(doc)
        confidence = min(len(reasoning_chains) / 3, 1.0) if reasoning_chains else 0.4
        obs = Observation(
            state=f"semantic_analysis_{len(list(doc.sents))}sents",
            action="reason_and_cluster",
            result=len(reasoning_chains),
            confidence=confidence
        )
        self.episodic_memory.store(obs)
        return {
            'reasoning_chains': reasoning_chains,
            'semantic_clusters': semantic_clusters,
            'memory_context': entity_memory,
            'confidence': confidence,
            # Must match the MetaController's agent key for the graph agent.
            'next_actions': ['knowledge_graph']
        }

    def _extract_reasoning_chain(self, sent) -> Optional[Dict]:
        # Pick out a subject-verb-object triple from the dependency parse.
        subj, verb, obj = None, None, None
        for token in sent:
            if token.dep_ == 'nsubj':
                subj = token
            elif token.pos_ == 'VERB':
                verb = token
            elif token.dep_ in ['dobj', 'attr', 'pobj']:
                obj = token
        if subj and verb and obj:
            return {
                'subject': subj.text,
                'predicate': verb.lemma_,
                'object': obj.text,
                'confidence': 0.8
            }
        return None

    def _cluster_by_semantics(self, doc) -> List[Dict]:
        # Note: en_core_web_sm ships no word vectors, so token.similarity()
        # falls back to a context-tensor approximation and emits a warning;
        # use en_core_web_md or _lg for true vector similarity.
        clusters = []
        nouns = [token for token in doc if token.pos_ in ['NOUN', 'PROPN']]
        visited = set()
        for noun in nouns:
            if noun.i in visited:
                continue
            cluster = [noun.text]
            visited.add(noun.i)
            for other in nouns:
                if other.i != noun.i and other.i not in visited:
                    if noun.similarity(other) > 0.5:
                        cluster.append(other.text)
                        visited.add(other.i)
            if len(cluster) > 1:
                clusters.append({'concepts': cluster, 'size': len(cluster)})
        return clusters
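Stripped of spaCy, `_cluster_by_semantics` is a greedy single-pass grouping: each unvisited item seeds a cluster and absorbs every later item whose similarity clears a threshold. The sketch below isolates that pass with a stand-in character-overlap metric (`char_sim` is purely illustrative, not spaCy's similarity).

```python
def greedy_clusters(items, sim, threshold=0.5):
    # Greedy single-pass grouping, as in _cluster_by_semantics above:
    # each unvisited item seeds a cluster and absorbs later items
    # whose similarity exceeds the threshold.
    visited, clusters = set(), []
    for i, a in enumerate(items):
        if i in visited:
            continue
        cluster = [a]
        visited.add(i)
        for j in range(i + 1, len(items)):
            if j not in visited and sim(a, items[j]) > threshold:
                cluster.append(items[j])
                visited.add(j)
        if len(cluster) > 1:
            clusters.append({"concepts": cluster, "size": len(cluster)})
    return clusters

def char_sim(a, b):
    # Stand-in similarity: shared-character ratio (illustrative only).
    sa, sb = set(a.lower()), set(b.lower())
    return len(sa & sb) / len(sa | sb)

print(greedy_clusters(["network", "networks", "graph"], char_sim))
# [{'concepts': ['network', 'networks'], 'size': 2}]
```

Because the pass is greedy, results depend on item order; two items can land in different clusters even when they are mutually similar, which is acceptable here since the clusters only feed the summary report.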
Knowledge Graph Agent and Meta-Controller: Orchestrating Multi-Agent Collaboration
The Knowledge Graph Agent constructs relational graphs by linking entities through verbs extracted from sentences, enabling the system to visualize and reason about connections within the text. The Meta-Controller oversees all agents, managing task planning, execution flow, and memory integration. This controller ensures smooth multi-step processing, dynamically scheduling tasks based on agent outputs and system state.
class KnowledgeGraphAgent(AdvancedAgent):
    def process(self, task: AgentTask) -> Dict[str, Any]:
        doc = self.nlp(task.data)
        graph = {'nodes': set(), 'edges': []}
        for sent in doc.sents:
            entities = list(sent.ents)
            if len(entities) >= 2:
                for ent in entities:
                    graph['nodes'].add((ent.text, ent.label_))
                root = sent.root
                if root.pos_ == 'VERB':
                    # Link consecutive entities through the sentence's root verb.
                    for i in range(len(entities) - 1):
                        graph['edges'].append({
                            'from': entities[i].text,
                            'relation': root.lemma_,
                            'to': entities[i+1].text,
                            'sentence': sent.text[:100]
                        })
        graph['nodes'] = list(graph['nodes'])
        confidence = min(len(graph['edges']) / 5, 1.0) if graph['edges'] else 0.3
        obs = Observation(
            state=f"knowledge_graph_{len(graph['nodes'])}nodes",
            action="construct_graph",
            result=len(graph['edges']),
            confidence=confidence
        )
        self.episodic_memory.store(obs)
        return {
            'graph': graph,
            'node_count': len(graph['nodes']),
            'edge_count': len(graph['edges']),
            'confidence': confidence,
            'next_actions': []
        }

class MetaController:
    def __init__(self):
        self.nlp = spacy.load('en_core_web_sm')
        self.agents = {
            'cognitive_entity': CognitiveEntityAgent('CognitiveEntity', 'entity_analysis', self.nlp),
            'semantic_reasoning': SemanticReasoningAgent('SemanticReasoner', 'reasoning', self.nlp),
            'knowledge_graph': KnowledgeGraphAgent('KnowledgeBuilder', 'graph_construction', self.nlp)
        }
        self.task_history = []
        self.global_memory = WorkingMemory(capacity=20)

    def execute_with_planning(self, text: str) -> Dict[str, Any]:
        initial_task = AgentTask(task_id="task_001", task_type="cognitive_entity", data=text, metadata={'source': 'user_input'})
        results = {}
        task_queue = [initial_task]
        iterations = 0
        max_iterations = 10
        while task_queue and iterations < max_iterations:
            task = task_queue.pop(0)
            agent = self.agents.get(task.task_type)
            if not agent or task.task_type in results:
                continue
            result = agent.process(task)
            results[task.task_type] = result
            self.global_memory.add(task.task_type, result, result['confidence'])
            # Schedule follow-up tasks proposed by the agent.
            for next_action in result.get('next_actions', []):
                if next_action in self.agents and next_action not in results:
                    next_task = AgentTask(
                        task_id=f"task_{iterations+1:03d}",
                        task_type=next_action,
                        data=text,
                        dependencies=[task.task_id]
                    )
                    task_queue.append(next_task)
            iterations += 1
        self.task_history.append({
            'results': results,
            'iterations': iterations,
            'timestamp': datetime.now().isoformat()
        })
        return results

    def generate_insights(self, results: Dict[str, Any]) -> str:
        report = "=" * 70 + "\n"
        report += " ADVANCED AGENTIC AI SYSTEM - ANALYSIS REPORT\n"
        report += "=" * 70 + "\n\n"
        for agent_type, result in results.items():
            agent = self.agents[agent_type]
            report += f"🤖 {agent.name}\n"
            report += f"   Specialty: {agent.specialty}\n"
            report += f"   Confidence: {result['confidence']:.2%}\n"
            if 'reflection' in result:
                report += f"   Performance Trend: {result['reflection'].get('performance_trend', 'N/A')}\n"
            report += "   Key Findings:\n"
            filtered_result = {k: v for k, v in result.items() if k not in ['reflection', 'next_actions']}
            report += json.dumps(filtered_result, indent=6, default=str) + "\n\n"
        report += "📊 System-Level Insights:\n"
        report += f"   Total iterations: {len(self.task_history)}\n"
        report += f"   Active agents: {len(results)}\n"
        report += f"   Global memory size: {len(self.global_memory.items)}\n"
        return report
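Stripped of the NLP, `execute_with_planning` is a plain dispatch loop: pop a task, run its handler, enqueue whatever `next_actions` it proposes (skipping anything already completed), and stop at an iteration cap. The standalone sketch below isolates that loop, with hypothetical lambda handlers standing in for the three agents.

```python
def run_pipeline(handlers, start, max_iterations=10):
    # Mirrors MetaController.execute_with_planning: pop a task, run its
    # handler, then enqueue any next_actions not yet completed.
    results, queue, iterations = {}, [start], 0
    while queue and iterations < max_iterations:
        task = queue.pop(0)
        if task not in handlers or task in results:
            continue  # unknown or already-done tasks are skipped
        results[task] = handlers[task]()
        for nxt in results[task].get("next_actions", []):
            if nxt in handlers and nxt not in results:
                queue.append(nxt)
        iterations += 1
    return results

# Hypothetical stand-ins for the three agents above.
handlers = {
    "cognitive_entity": lambda: {"confidence": 0.8,
                                 "next_actions": ["semantic_reasoning", "knowledge_graph"]},
    "semantic_reasoning": lambda: {"confidence": 0.7,
                                   "next_actions": ["knowledge_graph"]},
    "knowledge_graph": lambda: {"confidence": 0.6, "next_actions": []},
}
print(list(run_pipeline(handlers, "cognitive_entity")))
# ['cognitive_entity', 'semantic_reasoning', 'knowledge_graph']
```

The `task in results` guard is what deduplicates work: `knowledge_graph` is enqueued twice (once by each upstream handler) but executes only once.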
Executing the Multi-Agent Pipeline: A Practical Demonstration
To showcase the system’s capabilities, we run the entire multi-agent pipeline on a sample text describing recent advances in artificial intelligence research. The Meta-Controller orchestrates the agents, executing planned tasks sequentially and generating a detailed report summarizing the analysis. This demonstration highlights the system’s ability to integrate multiple layers of understanding and adapt through reflection and learning.
if __name__ == "__main__":
    sample_text = """
    Leading AI research institutions such as OpenAI and DeepMind are pioneering
    state-of-the-art language models. OpenAI, headquartered in San Francisco under
    Sam Altman's leadership, collaborates with DeepMind, based in London and led by
    Demis Hassabis. These organizations partner with top universities including MIT
    and Stanford. Their research emphasizes machine learning, neural networks, and
    reinforcement learning. A major breakthrough occurred in 2017 with the advent
    of transformer architectures, revolutionizing natural language processing.
    """
    controller = MetaController()
    results = controller.execute_with_planning(sample_text)
    print(controller.generate_insights(results))
    print("Multi-agent AI analysis completed successfully with adaptive learning!")
Summary: A Versatile Framework for Multi-Agent Reasoning and Learning
In summary, we have developed a robust multi-agent reasoning framework that processes real-world text using spaCy, integrating planning, episodic learning, and memory into a unified workflow. Each agent contributes a distinct analytical perspective, from entity extraction to semantic reasoning and knowledge graph construction, while the Meta-Controller coordinates their efforts to produce comprehensive, interpretable insights. This modular and extensible design offers a strong foundation for scaling to more complex datasets, incorporating advanced language models, and tackling diverse AI challenges.

