In this guide, we demonstrate how to develop a fully offline, multi-step reasoning agent that leverages the Instructor library to produce structured outputs and manage intricate tool invocations efficiently. Our design enables the agent to select appropriate tools intelligently, validate inputs, orchestrate multi-phase workflows, and recover from errors gracefully. By integrating Instructor, Transformers, and carefully designed Pydantic schemas, we construct a sophisticated, adaptive system that emulates the behavior of real-world agentic AI.
Setting Up the Environment and Dependencies
We begin by preparing the environment, ensuring all necessary packages are installed and imported for offline operation. This includes core libraries such as Instructor and Transformers, which form the foundation for our agent’s capabilities. The setup dynamically detects GPU availability to optimize performance, installing quantization support when possible, or defaulting to CPU mode otherwise.
import subprocess
import sys

def install_dependencies():
    import torch
    packages = [
        "instructor",
        "transformers>=4.35.0",
        "torch",
        "accelerate",
        "pydantic>=2.0.0",
        "numpy",
        "pandas"
    ]
    if torch.cuda.is_available():
        packages.append("bitsandbytes")
        print("✅ GPU detected - installing quantization support")
    else:
        print("⚠️ No GPU detected - using CPU (slower but functional)")
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

try:
    import instructor
except ImportError:
    print("📦 Installing required dependencies...")
    install_dependencies()
    print("✅ Installation completed!")
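After installation, a quick sanity check can confirm that the key libraries actually import before the agent tries to use them. The helper below is a small optional sketch, not part of the tutorial's pipeline; `check_versions` is a hypothetical name.

```python
import importlib
from typing import Dict, Iterable

def check_versions(names: Iterable[str]) -> Dict[str, str]:
    """Map each module name to its version string, or 'missing' if it cannot be imported."""
    found = {}
    for name in names:
        try:
            mod = importlib.import_module(name)
            # Most packages expose __version__; fall back to 'unknown' if not.
            found[name] = getattr(mod, "__version__", "unknown")
        except ImportError:
            found[name] = "missing"
    return found

print(check_versions(["instructor", "transformers", "pydantic"]))
```

Running this before model loading gives a fast, offline-friendly failure signal if a dependency did not install cleanly.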
Defining Robust Data Models for Complex Reasoning
To empower our agent with precise understanding and validation, we define comprehensive Pydantic schemas. These models encapsulate the structure for SQL queries, data transformation pipelines, API orchestration, code generation, and multi-step execution plans. Each schema enforces strict validation rules, ensuring safety and clarity in interpreting complex instructions, which forms the backbone of the agent’s reasoning process.
from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, field_validator, ValidationInfo

class SQLQuery(BaseModel):
    """Schema for constructing validated SQL queries with support for joins and aggregations."""
    table: str
    columns: List[str]
    where_conditions: Optional[Dict[str, Any]] = None
    joins: Optional[List[Dict[str, str]]] = None
    aggregations: Optional[Dict[str, str]] = None
    order_by: Optional[List[str]] = None

    @field_validator('columns')
    @classmethod
    def must_have_columns(cls, v):
        if not v:
            raise ValueError("At least one column must be specified")
        return v

class DataTransformation(BaseModel):
    """Defines operations for complex data processing pipelines."""
    operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
    source_data: str = Field(description="Reference identifier for the data source")
    parameters: Dict[str, Any]
    output_format: Literal["json", "csv", "dataframe"]

class APIRequest(BaseModel):
    """Model for orchestrating multi-endpoint API calls with error handling."""
    endpoints: List[Dict[str, str]] = Field(description="Sequence of API endpoints to invoke")
    authentication: Dict[str, str]
    request_order: Literal["sequential", "parallel", "conditional"]
    error_handling: Literal["stop", "continue", "retry"]
    max_retries: int = Field(default=3, ge=0, le=10)

class CodeGeneration(BaseModel):
    """Schema for generating and validating code snippets safely."""
    language: Literal["python", "javascript", "sql", "bash"]
    purpose: str
    code: str = Field(description="Generated source code")
    dependencies: List[str] = Field(default_factory=list)
    test_cases: List[Dict[str, Any]] = Field(default_factory=list)

    @field_validator('code')
    @classmethod
    def check_for_unsafe_code(cls, v, info: ValidationInfo):
        unsafe_patterns = ['eval(', 'exec(', '__import__', 'os.system']
        if info.data.get('language') == 'python':
            if any(pattern in v for pattern in unsafe_patterns):
                raise ValueError("Code contains potentially unsafe operations")
        return v

class MultiToolPlan(BaseModel):
    """Blueprint for orchestrating multi-step tool executions with dependencies."""
    goal: str
    steps: List[Dict[str, Any]] = Field(description="Ordered list of tool invocations")
    dependencies: Dict[str, List[str]] = Field(description="Mapping of step dependencies")
    fallback_strategy: Optional[str] = None
    estimated_duration: float = Field(description="Estimated execution time in seconds")

class ToolCall(BaseModel):
    """Encapsulates tool selection logic with contextual reasoning."""
    reasoning: str
    confidence: float = Field(ge=0.0, le=1.0)
    tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator", "code_generator", "planner", "none"]
    tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest, CodeGeneration, MultiToolPlan]] = None
    requires_human_approval: bool = False

class ExecutionResult(BaseModel):
    """Detailed execution outcome including metadata and warnings."""
    success: bool
    data: Any
    execution_time: float
    warnings: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
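As a quick sanity check on these schemas, here is a minimal sketch of how Pydantic enforces such rules at construction time, using a trimmed-down stand-in for SQLQuery (the `MiniQuery` name is illustrative, not part of the agent):

```python
from typing import List
from pydantic import BaseModel, ValidationError, field_validator

class MiniQuery(BaseModel):
    table: str
    columns: List[str]

    @field_validator('columns')
    @classmethod
    def must_have_columns(cls, v):
        # Mirrors the rule in SQLQuery: reject an empty column list.
        if not v:
            raise ValueError("At least one column must be specified")
        return v

ok = MiniQuery(table="users", columns=["id", "name"])
print(ok.columns)

try:
    MiniQuery(table="users", columns=[])
except ValidationError:
    print("empty column list rejected")
```

Because validation happens when the model is instantiated, malformed tool inputs are caught before any tool runs, which is exactly what makes these schemas the backbone of the agent's safety story.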
Implementing Core Tools with Simulated Real-World Behavior
We develop the core tool functions that simulate realistic workflows such as SQL query execution, data transformation, API orchestration, code generation, and planning. Each tool returns structured results with execution metadata and handles errors gracefully, enabling the agent to test decision-making in a controlled yet representative environment.
def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
    import time
    start = time.time()
    mock_db = {
        "users": [
            {"id": 1, "name": "Alice", "age": 30, "country": "USA"},
            {"id": 2, "name": "Bob", "age": 25, "country": "UK"},
            {"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
        ],
        "orders": [
            {"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
            {"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
            {"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
        ]
    }
    data = mock_db.get(params.table, [])
    if params.where_conditions:
        data = [row for row in data if all(row.get(k) == v for k, v in params.where_conditions.items())]
    data = [{col: row.get(col) for col in params.columns} for row in data]
    warnings = []
    if params.aggregations:
        warnings.append("Aggregation logic simplified in mock environment")
    return ExecutionResult(
        success=True,
        data=data,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"rows_returned": len(data), "query_type": "SELECT"}
    )

def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
    import time
    start = time.time()
    operations = {
        "filter": lambda d, p: [item for item in d if item.get(p['field']) == p['value']],
        "aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
        "normalize": lambda d, p: [{k: v / p.get('factor', 1) for k, v in item.items()} for item in d]
    }
    sample_data = [{"value": i, "category": "A" if i % 2 == 0 else "B"} for i in range(10)]
    operation_func = operations.get(params.operation)
    result_data = operation_func(sample_data, params.parameters) if operation_func else sample_data
    return ExecutionResult(
        success=True,
        data=result_data,
        execution_time=time.time() - start,
        warnings=[],
        metadata={"operation": params.operation, "input_size": len(sample_data)}
    )

def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
    import time
    start = time.time()
    responses = []
    warnings = []
    for idx, endpoint in enumerate(params.endpoints):
        if params.error_handling == "retry" and idx == 1:
            warnings.append(f"Retrying failed call to {endpoint.get('url')}")
        responses.append({
            "endpoint": endpoint.get('url'),
            "status": 200,
            "response": f"Simulated response from {endpoint.get('url')}"
        })
    return ExecutionResult(
        success=True,
        data=responses,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_endpoints": len(params.endpoints), "request_order": params.request_order}
    )

def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.code) > 1000:
        warnings.append("Generated code length exceeds recommended size; consider refactoring")
    if not params.test_cases:
        warnings.append("No test cases provided for the generated code")
    return ExecutionResult(
        success=True,
        data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"lines_of_code": len(params.code.split('\n'))}
    )

def planner_tool(params: MultiToolPlan) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.steps) > 10:
        warnings.append("Plan contains many steps; consider subdividing into smaller plans")
    for step_id, deps in params.dependencies.items():
        if step_id in deps:
            warnings.append(f"Circular dependency detected in step {step_id}")
    return ExecutionResult(
        success=True,
        data={"plan_steps": params.steps, "estimated_duration": params.estimated_duration},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_steps": len(params.steps)}
    )

TOOLS = {
    "sql_engine": sql_engine_tool,
    "data_transformer": data_transformer_tool,
    "api_orchestrator": api_orchestrator_tool,
    "code_generator": code_generator_tool,
    "planner": planner_tool
}
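Note that the planner tool only flags steps that depend directly on themselves; a fuller treatment would derive an actual execution order from the dependencies mapping and catch indirect cycles too. One way to sketch that with Kahn's algorithm is shown below; the `resolve_step_order` helper is an assumption for illustration, not part of the tutorial's toolset.

```python
from collections import deque
from typing import Dict, List

def resolve_step_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Topologically sort steps given a step -> prerequisites mapping (hypothetical helper)."""
    # in-degree = number of unmet prerequisites per step
    indegree = {step: len(deps) for step, deps in dependencies.items()}
    dependents: Dict[str, List[str]] = {step: [] for step in dependencies}
    for step, deps in dependencies.items():
        for dep in deps:
            dependents.setdefault(dep, []).append(step)
            indegree.setdefault(dep, 0)
    queue = deque(s for s, d in indegree.items() if d == 0)
    order = []
    while queue:
        step = queue.popleft()
        order.append(step)
        for nxt in dependents.get(step, []):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(order) != len(indegree):
        raise ValueError("Circular dependency detected")
    return order

print(resolve_step_order({"extract": [], "transform": ["extract"], "report": ["transform"]}))
# → ['extract', 'transform', 'report']
```

If the sort cannot place every step, some subset of steps forms a cycle, which is precisely the condition the planner's warning is trying to approximate.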
Building the Intelligent Agent with Dynamic Tool Routing and Error Handling
Our agent class encapsulates the model loading, query analysis, tool routing, and execution with robust error recovery. It dynamically selects the best tool based on query complexity, executes the corresponding function, and retries on failure up to a configurable limit. This design ensures resilience and adaptability in handling diverse, complex tasks.
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from datetime import datetime
from typing import Optional, Dict, Any

class AdvancedToolAgent:
    """An intelligent agent capable of multi-step reasoning, tool selection, and error recovery."""

    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        import torch
        print(f"🤖 Loading model: {model_name}")
        model_kwargs = {"device_map": "auto"}
        if torch.cuda.is_available():
            print("💪 GPU detected - enabling 8-bit quantization")
            from transformers import BitsAndBytesConfig
            quant_config = BitsAndBytesConfig(load_in_8bit=True, llm_int8_threshold=6.0)
            model_kwargs["quantization_config"] = quant_config
        else:
            print("🖥️ CPU mode - switching to smaller model for efficiency")
            model_name = "google/flan-t5-base"
            model_kwargs["torch_dtype"] = "auto"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
        self.pipe = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer,
                             max_new_tokens=768, temperature=0.7, do_sample=True)
        self.client = instructor.from_pipe(self.pipe)
        self.execution_history = []
        print("✅ Agent successfully initialized!")

    def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
        tool_descriptions = """
        Available Tools:
        - sql_engine: Execute advanced SQL queries with joins and aggregations
        - data_transformer: Build multi-step data pipelines including filtering and normalization
        - api_orchestrator: Manage multiple API calls with retries and error handling
        - code_generator: Produce safe, validated code snippets with test cases
        - planner: Develop multi-step execution plans with dependency tracking
        - none: Provide direct answers through reasoning
        """
        prompt = f"""{tool_descriptions}

User Query: {user_query}
{f'Context: {context}' if context else ''}

Evaluate the query complexity and select the most suitable tool. For tasks requiring multiple steps, utilize the planner."""
        return self.client(prompt, response_model=ToolCall)

    def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
        for attempt in range(max_retries + 1):
            try:
                if tool_call.tool_name == "none":
                    return ExecutionResult(success=True, data="Direct response generated", execution_time=0.0,
                                           warnings=[], metadata={})
                tool_function = TOOLS.get(tool_call.tool_name)
                if not tool_function:
                    return ExecutionResult(success=False, data=None, execution_time=0.0,
                                           warnings=[f"Tool '{tool_call.tool_name}' not found"], metadata={})
                result = tool_function(tool_call.tool_input)
                self.execution_history.append({
                    "tool": tool_call.tool_name,
                    "success": result.success,
                    "timestamp": datetime.now().isoformat()
                })
                return result
            except Exception as error:
                if attempt < max_retries:
                    print(f"⚠️ Attempt {attempt + 1} failed, retrying...")
                    continue
                return ExecutionResult(success=False, data=None, execution_time=0.0,
                                       warnings=[f"Failed after {max_retries + 1} attempts: {str(error)}"],
                                       metadata={"error": str(error)})
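The recovery loop retries immediately on failure; in production you would typically add a delay between attempts so transient faults have time to clear. A minimal standalone sketch of retry-with-exponential-backoff follows; the `with_retries` helper and the `flaky` function are hypothetical illustrations, not part of the agent class.

```python
import time

def with_retries(fn, max_retries: int = 2, base_delay: float = 0.01):
    """Call fn(); on exception, sleep base_delay * 2**attempt and retry, up to max_retries extra attempts."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt < max_retries:
                time.sleep(base_delay * (2 ** attempt))
                continue
            raise  # out of attempts: propagate the last error

calls = {"count": 0}

def flaky():
    # Fails twice, then succeeds - mimics a transient tool error.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(with_retries(flaky))  # → ok (succeeds on the third attempt)
```

Swapping the agent's bare `continue` for a backoff like this is a small change that makes retries far gentler on rate-limited APIs and overloaded local models alike.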
Executing Complex Queries: Demonstration and Results
The run method orchestrates the entire process: analyzing the query, selecting the tool, executing with error handling, and reporting detailed results. The main function showcases the agent tackling a series of challenging queries, illustrating its ability to plan, reason, and execute with precision.
    def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
        if verbose:
            print(f"\n{'=' * 70}")
            print(f"🎯 Processing Query: {user_query}")
            print(f"{'=' * 70}")
            print("\n🧠 Step 1: Analyzing query and selecting tool...")
        tool_call = self.route_to_tool(user_query)
        if verbose:
            print(f"   → Selected Tool: {tool_call.tool_name}")
            print(f"   → Confidence: {tool_call.confidence:.2%}")
            print(f"   → Reasoning: {tool_call.reasoning}")
            if tool_call.requires_human_approval:
                print("   ⚠️ Human approval required!")
        if verbose:
            print("\n⚙️ Step 2: Executing tool with error recovery...")
        result = self.execute_with_recovery(tool_call)
        if verbose:
            print(f"   → Success: {result.success}")
            print(f"   → Execution Time: {result.execution_time:.3f} seconds")
            if result.warnings:
                print(f"   → Warnings: {', '.join(result.warnings)}")
            print(f"   → Data Preview: {str(result.data)[:200]}...")
        if verbose and result.metadata:
            print("\n📊 Metadata:")
            for key, value in result.metadata.items():
                print(f"   • {key}: {value}")
        if verbose:
            print(f"\n{'=' * 70}\n")
        return {
            "query": user_query,
            "tool_used": tool_call.tool_name,
            "result": result,
            "history_length": len(self.execution_history)
        }
def main():
    agent = AdvancedToolAgent()
    complex_queries = [
        "Construct a SQL query to retrieve all users from the USA with completed orders exceeding $150, including order details.",
        "Design a data pipeline that filters records where category='A', aggregates by count, and normalizes results by a factor of 100.",
        "Execute three API calls sequentially: authenticate at /auth, fetch user data at /users/{id}, and update preferences at /preferences, retrying failed steps up to 3 times.",
        "Generate a Python function to validate email addresses using regex, with error handling and at least two test cases, avoiding unsafe operations.",
        "Formulate a multi-step plan to extract data from a database, transform it using pandas, generate a report, and send it via email, showing dependencies."
    ]
    print("\n" + " 🔥 COMPLEX QUERY EXECUTION DEMO ".center(70, "=") + "\n")
    for idx, query in enumerate(complex_queries, 1):
        print(f"\n{'#' * 70}")
        print(f"# TASK {idx} of {len(complex_queries)}")
        print(f"{'#' * 70}")
        try:
            agent.run(query, verbose=True)
        except Exception as e:
            print(f"❌ Critical error encountered: {e}\n")
    success_rate = sum(1 for h in agent.execution_history if h['success']) / len(agent.execution_history) * 100 if agent.execution_history else 0
    print(f" ✅ COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "="))
    print(f"📈 Success Rate: {success_rate:.1f}%\n")

if __name__ == "__main__":
    main()
Summary
We have successfully engineered a versatile offline agent capable of interpreting complex instructions, intelligently routing tasks across multiple specialized tools, and recovering from errors with resilience. Through modular schema definitions, validated tool calls, and layered execution logic, this architecture demonstrates how to build reliable AI agents that perform sophisticated reasoning and planning in real-world scenarios.