How to Build a Fully Offline Multi-Tool Reasoning Agent with Dynamic Planning, Error Recovery, and Intelligent Function Routing

In this guide, we demonstrate how to develop a fully offline, multi-step reasoning agent that leverages the instructor library to produce structured outputs and efficiently manage intricate tool invocations. Our design enables the agent to intelligently select appropriate tools, validate inputs, orchestrate multi-phase workflows, and recover from errors seamlessly. By integrating Instructor, Transformers, and meticulously designed Pydantic schemas, we construct a sophisticated, adaptive system that emulates the behavior of real-world agentic AI.

Setting Up the Environment and Dependencies

We begin by preparing the environment, ensuring all necessary packages are installed and imported for offline operation. This includes core libraries such as Instructor and Transformers, which form the foundation for our agent’s capabilities. The setup dynamically detects GPU availability to optimize performance, installing quantization support when possible, or defaulting to CPU mode otherwise.

import subprocess
import sys

def install_dependencies():
    import torch
    packages = [
        "instructor",
        "transformers>=4.35.0",
        "torch",
        "accelerate",
        "pydantic>=2.0.0",
        "numpy",
        "pandas"
    ]
    if torch.cuda.is_available():
        packages.append("bitsandbytes")
        print("✅ GPU detected - installing quantization support")
    else:
        print("⚠️ No GPU detected - using CPU (slower but functional)")
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

try:
    import instructor
except ImportError:
    print("📦 Installing required dependencies...")
    install_dependencies()
    print("✅ Installation completed!")

Defining Robust Data Models for Complex Reasoning

To empower our agent with precise understanding and validation, we define comprehensive Pydantic schemas. These models encapsulate the structure for SQL queries, data transformation pipelines, API orchestration, code generation, and multi-step execution plans. Each schema enforces strict validation rules, ensuring safety and clarity in interpreting complex instructions, which forms the backbone of the agent’s reasoning process.

from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, validator

class SQLQuery(BaseModel):
    """Schema for constructing validated SQL queries with support for joins and aggregations."""
    table: str
    columns: List[str]
    where_conditions: Optional[Dict[str, Any]] = None
    joins: Optional[List[Dict[str, str]]] = None
    aggregations: Optional[Dict[str, str]] = None
    order_by: Optional[List[str]] = None

    @validator('columns')
    def must_have_columns(cls, v):
        if not v:
            raise ValueError("At least one column must be specified")
        return v

class DataTransformation(BaseModel):
    """Defines operations for complex data processing pipelines."""
    operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
    source_data: str = Field(description="Reference identifier for the data source")
    parameters: Dict[str, Any]
    output_format: Literal["json", "csv", "dataframe"]

class APIRequest(BaseModel):
    """Model for orchestrating multi-endpoint API calls with error handling."""
    endpoints: List[Dict[str, str]] = Field(description="Sequence of API endpoints to invoke")
    authentication: Dict[str, str]
    request_order: Literal["sequential", "parallel", "conditional"]
    error_handling: Literal["stop", "continue", "retry"]
    max_retries: int = Field(default=3, ge=0, le=10)

class CodeGeneration(BaseModel):
    """Schema for generating and validating code snippets safely."""
    language: Literal["python", "javascript", "sql", "bash"]
    purpose: str
    code: str = Field(description="Generated source code")
    dependencies: List[str] = Field(default_factory=list)
    test_cases: List[Dict[str, Any]] = Field(default_factory=list)

    @validator('code')
    def check_for_unsafe_code(cls, v, values):
        unsafe_patterns = ['eval(', 'exec(', '__import__', 'os.system']
        if values.get('language') == 'python':
            if any(pattern in v for pattern in unsafe_patterns):
                raise ValueError("Code contains potentially unsafe operations")
        return v

class MultiToolPlan(BaseModel):
    """Blueprint for orchestrating multi-step tool executions with dependencies."""
    goal: str
    steps: List[Dict[str, Any]] = Field(description="Ordered list of tool invocations")
    dependencies: Dict[str, List[str]] = Field(description="Mapping of step dependencies")
    fallback_strategy: Optional[str] = None
    estimated_duration: float = Field(description="Estimated execution time in seconds")

class ToolCall(BaseModel):
    """Encapsulates tool selection logic with contextual reasoning."""
    reasoning: str
    confidence: float = Field(ge=0.0, le=1.0)
    tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator", "code_generator", "planner", "none"]
    tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest, CodeGeneration, MultiToolPlan]] = None
    requires_human_approval: bool = False

class ExecutionResult(BaseModel):
    """Detailed execution outcome including metadata and warnings."""
    success: bool
    data: Any
    execution_time: float
    warnings: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
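The code-safety validator in the CodeGeneration schema relies on simple substring screening rather than real static analysis. Here is a minimal, dependency-free sketch of that screening logic, pulled out of Pydantic so it can run standalone (the function name `is_unsafe` is ours, for illustration):

```python
# Patterns screened for in Python code, mirroring the validator above
UNSAFE_PATTERNS = ['eval(', 'exec(', '__import__', 'os.system']

def is_unsafe(code: str, language: str = "python") -> bool:
    """Return True when Python code contains a screened pattern (substring check only)."""
    if language != "python":
        # The validator only screens Python snippets
        return False
    return any(pattern in code for pattern in UNSAFE_PATTERNS)

print(is_unsafe("def add(a, b):\n    return a + b"))   # benign code passes
print(is_unsafe("result = eval(user_input)"))           # eval() is flagged
```

Note that substring matching is deliberately conservative and coarse: it will also flag harmless strings that merely contain `eval(`, which is an acceptable trade-off for a guardrail of this kind.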

Implementing Core Tools with Simulated Real-World Behavior

We develop the core tool functions that simulate realistic workflows such as SQL query execution, data transformation, API orchestration, code generation, and planning. Each tool returns structured results with execution metadata and handles errors gracefully, enabling the agent to test decision-making in a controlled yet representative environment.

def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
    import time
    start = time.time()
    mock_db = {
        "users": [
            {"id": 1, "name": "Alice", "age": 30, "country": "USA"},
            {"id": 2, "name": "Bob", "age": 25, "country": "UK"},
            {"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
        ],
        "orders": [
            {"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
            {"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
            {"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
        ]
    }
    data = mock_db.get(params.table, [])
    if params.where_conditions:
        data = [row for row in data if all(row.get(k) == v for k, v in params.where_conditions.items())]
    data = [{col: row.get(col) for col in params.columns} for row in data]
    warnings = []
    if params.aggregations:
        warnings.append("Aggregation logic simplified in mock environment")
    return ExecutionResult(
        success=True,
        data=data,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"rows_returned": len(data), "query_type": "SELECT"}
    )

def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
    import time
    start = time.time()
    operations = {
        "filter": lambda d, p: [item for item in d if item.get(p['field']) == p['value']],
        "aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
        "normalize": lambda d, p: [{k: v / p.get('factor', 1) for k, v in item.items()} for item in d]
    }
    sample_data = [{"value": i, "category": "A" if i % 2 == 0 else "B"} for i in range(10)]
    operation_func = operations.get(params.operation)
    result_data = operation_func(sample_data, params.parameters) if operation_func else sample_data
    return ExecutionResult(
        success=True,
        data=result_data,
        execution_time=time.time() - start,
        warnings=[],
        metadata={"operation": params.operation, "input_size": len(sample_data)}
    )

def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
    import time
    start = time.time()
    responses = []
    warnings = []
    for idx, endpoint in enumerate(params.endpoints):
        if params.error_handling == "retry" and idx == 1:
            warnings.append(f"Retrying failed call to {endpoint.get('url')}")
        responses.append({
            "endpoint": endpoint.get('url'),
            "status": 200,
            "response": f"Simulated response from {endpoint.get('url')}"
        })
    return ExecutionResult(
        success=True,
        data=responses,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_endpoints": len(params.endpoints), "request_order": params.request_order}
    )

def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.code) > 1000:
        warnings.append("Generated code length exceeds recommended size; consider refactoring")
    if not params.test_cases:
        warnings.append("No test cases provided for the generated code")
    return ExecutionResult(
        success=True,
        data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"lines_of_code": len(params.code.split('\n'))}
    )

def planner_tool(params: MultiToolPlan) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.steps) > 10:
        warnings.append("Plan contains many steps; consider subdividing into smaller plans")
    for step_id, deps in params.dependencies.items():
        if step_id in deps:
            warnings.append(f"Circular dependency detected in step {step_id}")
    return ExecutionResult(
        success=True,
        data={"plan_steps": params.steps, "estimated_duration": params.estimated_duration},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_steps": len(params.steps)}
    )

TOOLS = {
    "sql_engine": sql_engine_tool,
    "data_transformer": data_transformer_tool,
    "api_orchestrator": api_orchestrator_tool,
    "code_generator": code_generator_tool,
    "planner": planner_tool
}
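The TOOLS registry enables simple name-based dispatch: the agent looks a tool up by name and calls it, falling back gracefully when the name is unknown. Here is a minimal, self-contained sketch of that routing pattern, using a hypothetical stand-in tool (`echo_tool`, `REGISTRY`, and `dispatch` are ours, for illustration):

```python
from typing import Callable, Dict

def echo_tool(payload: str) -> str:
    """Stand-in tool that simply wraps its input."""
    return f"handled: {payload}"

# Name-to-function registry, analogous to the TOOLS dict above
REGISTRY: Dict[str, Callable[[str], str]] = {"echo": echo_tool}

def dispatch(tool_name: str, payload: str) -> str:
    # Look up the tool by name; report unknown names instead of raising
    tool = REGISTRY.get(tool_name)
    if tool is None:
        return f"error: unknown tool '{tool_name}'"
    return tool(payload)

print(dispatch("echo", "ping"))   # routed to echo_tool
print(dispatch("nope", "ping"))   # unknown name handled gracefully
```

Keeping dispatch behind a dict (rather than an if/elif chain) means adding a new tool only requires registering one entry, which is exactly how the agent below consumes TOOLS.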

Building the Intelligent Agent with Dynamic Tool Routing and Error Handling

Our agent class encapsulates the model loading, query analysis, tool routing, and execution with robust error recovery. It dynamically selects the best tool based on query complexity, executes the corresponding function, and retries on failure up to a configurable limit. This design ensures resilience and adaptability in handling diverse, complex tasks.

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from datetime import datetime
from typing import Optional, Dict, Any

class AdvancedToolAgent:
    """An intelligent agent capable of multi-step reasoning, tool selection, and error recovery."""

    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        import torch
        print(f"🤖 Loading model: {model_name}")
        model_kwargs = {"device_map": "auto"}
        if torch.cuda.is_available():
            print("💪 GPU detected - enabling 8-bit quantization")
            from transformers import BitsAndBytesConfig
            quant_config = BitsAndBytesConfig(load_in_8bit=True, llm_int8_threshold=6.0)
            model_kwargs["quantization_config"] = quant_config
        else:
            print("🖥️ CPU mode - switching to smaller model for efficiency")
            model_name = "google/flan-t5-base"
            model_kwargs["torch_dtype"] = "auto"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
        self.pipe = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer,
                             max_new_tokens=768, temperature=0.7, do_sample=True)
        self.client = instructor.from_pipe(self.pipe)
        self.execution_history = []
        print("✅ Agent successfully initialized!")

    def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
        tool_descriptions = """
Available Tools:
  • sql_engine: Execute advanced SQL queries with joins and aggregations
  • data_transformer: Build multi-step data pipelines including filtering and normalization
  • api_orchestrator: Manage multiple API calls with retries and error handling
  • code_generator: Produce safe, validated code snippets with test cases
  • planner: Develop multi-step execution plans with dependency tracking
  • none: Provide direct answers through reasoning
"""
        prompt = f"""{tool_descriptions}
User Query: {user_query}
{f'Context: {context}' if context else ''}
Evaluate the query complexity and select the most suitable tool. For tasks requiring multiple steps, utilize the planner."""
        return self.client(prompt, response_model=ToolCall)

    def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
        for attempt in range(max_retries + 1):
            try:
                if tool_call.tool_name == "none":
                    return ExecutionResult(success=True, data="Direct response generated",
                                           execution_time=0.0, warnings=[], metadata={})
                tool_function = TOOLS.get(tool_call.tool_name)
                if not tool_function:
                    return ExecutionResult(success=False, data=None, execution_time=0.0,
                                           warnings=[f"Tool '{tool_call.tool_name}' not found"],
                                           metadata={})
                result = tool_function(tool_call.tool_input)
                self.execution_history.append({
                    "tool": tool_call.tool_name,
                    "success": result.success,
                    "timestamp": datetime.now().isoformat()
                })
                return result
            except Exception as error:
                if attempt < max_retries:
                    print(f"⚠️ Attempt {attempt + 1} failed, retrying...")
                    continue
                return ExecutionResult(success=False, data=None, execution_time=0.0,
                                       warnings=[f"Failed after {max_retries + 1} attempts: {str(error)}"],
                                       metadata={"error": str(error)})

Executing Complex Queries: Demonstration and Results

The run method orchestrates the entire process: analyzing the query, selecting the tool, executing with error handling, and reporting detailed results. The main function showcases the agent tackling a series of challenging queries, illustrating its ability to plan, reason, and execute with precision.

    def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
        if verbose:
            print(f"\n{'=' * 70}")
            print(f"🎯 Processing Query: {user_query}")
            print(f"{'=' * 70}")
            print("\n🧠 Step 1: Analyzing query and selecting tool...")
        tool_call = self.route_to_tool(user_query)
        if verbose:
            print(f"   → Selected Tool: {tool_call.tool_name}")
            print(f"   → Confidence: {tool_call.confidence:.2%}")
            print(f"   → Reasoning: {tool_call.reasoning}")
            if tool_call.requires_human_approval:
                print("   ⚠️ Human approval required!")
        if verbose:
            print("\n⚙️ Step 2: Executing tool with error recovery...")
        result = self.execute_with_recovery(tool_call)
        if verbose:
            print(f"   → Success: {result.success}")
            print(f"   → Execution Time: {result.execution_time:.3f} seconds")
            if result.warnings:
                print(f"   → Warnings: {', '.join(result.warnings)}")
            print(f"   → Data Preview: {str(result.data)[:200]}...")
        if verbose and result.metadata:
            print("\n📊 Metadata:")
            for key, value in result.metadata.items():
                print(f"   • {key}: {value}")
        if verbose:
            print(f"\n{'=' * 70}\n")
        return {
            "query": user_query,
            "tool_used": tool_call.tool_name,
            "result": result,
            "history_length": len(self.execution_history)
        }
    
def main():
    agent = AdvancedToolAgent()
    complex_queries = [
        "Construct a SQL query to retrieve all users from the USA with completed orders exceeding $150, including order details.",
        "Design a data pipeline that filters records where category='A', aggregates by count, and normalizes results by a factor of 100.",
        "Execute three API calls sequentially: authenticate at /auth, fetch user data at /users/{id}, and update preferences at /preferences, retrying failed steps up to 3 times.",
        "Generate a Python function to validate email addresses using regex, with error handling and at least two test cases, avoiding unsafe operations.",
        "Formulate a multi-step plan to extract data from a database, transform it using pandas, generate a report, and send it via email, showing dependencies."
    ]
    print("\n" + "🔥 COMPLEX QUERY EXECUTION DEMO ".center(70, "=") + "\n")
    for idx, query in enumerate(complex_queries, 1):
        print(f"\n{'#' * 70}")
        print(f"# TASK {idx} of {len(complex_queries)}")
        print(f"{'#' * 70}")
        try:
            agent.run(query, verbose=True)
        except Exception as e:
            print(f"❌ Critical error encountered: {e}\n")
    success_rate = sum(1 for h in agent.execution_history if h['success']) / len(agent.execution_history) * 100 if agent.execution_history else 0
    print(f"\n✅ COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "="))
    print(f"📈 Success Rate: {success_rate:.1f}%\n")

if __name__ == "__main__":
    main()

Summary

We have successfully engineered a versatile offline agent capable of interpreting complex instructions, intelligently routing tasks across multiple specialized tools, and recovering from errors with resilience. Through modular schema definitions, validated tool calls, and layered execution logic, this architecture demonstrates how to build reliable AI agents that perform sophisticated reasoning and planning in real-world scenarios.
