A Coding Guide to Building Scalable Multi-Agent Communication Systems Using Agent Communication Protocol (ACP)

In this tutorial, we implement the Agent Communication Protocol (ACP) by building a flexible, ACP-compliant messaging system in Python that uses Google’s Gemini API for natural language processing. After installing and configuring the google-generativeai library, we introduce the core abstractions: message types, performatives, and the ACPMessage data class, which standardizes inter-agent communication. The ACPAgent and ACPMessageBroker classes then show how to create, send, route, and process structured messages among multiple autonomous agents. The code examples cover querying, requesting actions, and broadcasting information while maintaining conversation threads, acknowledgments, and error handling.

import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict


GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)

We import the essential Python modules, covering JSON handling, timing, unique identifier generation, and type annotations, to support a structured ACP implementation. We then set a placeholder for the user’s Gemini API key and configure the google-generativeai client for subsequent calls to the Gemini language model.
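Hardcoding the key in the script is fine for a quick demo, but it is easy to leak when the notebook is shared. As a minimal sketch, the key could instead be read from an environment variable. The helper name load_api_key and the variable name GEMINI_API_KEY are assumptions for illustration, not part of the tutorial's code.

```python
import os


def load_api_key(env_var: str = "GEMINI_API_KEY") -> str:
    """Return the API key stored in `env_var`, or raise if it is missing or blank."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return key


# Hypothetical usage with the tutorial's configuration call:
# genai.configure(api_key=load_api_key())
```

Failing early with a clear error avoids the less obvious authentication failure that would otherwise surface on the first model call.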

class ACPMessageType(Enum):
    """Standard ACP message types"""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"

The ACPMessageType enumeration defines the core message categories of the Agent Communication Protocol: requests, responses, informational messages, queries, and control actions such as subscription management, error signaling, and acknowledgments. Centralizing these values in one place keeps the labels consistent across the system. Note that the handlers later in this tutorial dispatch on performatives rather than on these message types.
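Because each enum member wraps a plain string, a value received over the wire can be converted back into a member by calling the enum with that string. The sketch below repeats the enum so it runs on its own. The helper parse_message_type, and its choice to map unknown strings to ERROR, are assumptions made for illustration.

```python
from enum import Enum


class ACPMessageType(Enum):
    """Standard ACP message types (copied from the tutorial)."""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"


def parse_message_type(raw: str) -> ACPMessageType:
    """Map a wire string to an enum member; unknown values become ERROR (an assumption)."""
    try:
        return ACPMessageType(raw.strip().lower())
    except ValueError:
        return ACPMessageType.ERROR


print(parse_message_type("Query "))   # normalised before lookup
print(parse_message_type("gossip"))   # not a known type
```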

class ACPPerformative(Enum):
    """ACP speech acts (performatives)"""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"

The ACPPerformative enumeration captures the variety of speech acts agents can use when interacting under the ACP framework, mapping high-level intentions, such as making requests, posing questions, giving commands, or negotiating agreements, onto standardized labels. This clear taxonomy enables agents to interpret and respond to messages in contextually appropriate ways, ensuring robust and semantically rich communication.
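One way to make "contextually appropriate" concrete is a lookup table that pairs each opening speech act with the replies that may follow it. The table below is a hypothetical convention, not something the tutorial's code enforces, and the names EXPECTED_REPLIES and is_valid_reply are invented for this sketch.

```python
from enum import Enum


class ACPPerformative(Enum):
    """ACP speech acts (copied from the tutorial)."""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"


# Hypothetical pairing of an opening performative with its acceptable replies.
EXPECTED_REPLIES = {
    ACPPerformative.ASK: {ACPPerformative.REPLY},
    ACPPerformative.REQUEST_ACTION: {
        ACPPerformative.AGREE, ACPPerformative.REFUSE, ACPPerformative.REPLY,
    },
    ACPPerformative.PROPOSE: {ACPPerformative.ACCEPT, ACPPerformative.REJECT},
}


def is_valid_reply(opening: str, reply: str) -> bool:
    """Check two wire-format strings, as stored in ACPMessage.performative."""
    try:
        allowed = EXPECTED_REPLIES.get(ACPPerformative(opening), set())
        return ACPPerformative(reply) in allowed
    except ValueError:
        # Either string is not a known performative.
        return False
```

A broker or agent could call such a check before accepting a response, rejecting, for example, a "reply" sent in answer to a "propose".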

@dataclass
class ACPMessage:
    """Agent Communication Protocol Message Structure"""
    message_id: str
    sender: str
    receiver: str
    performative: str  
    content: Dict[str, Any]
    protocol: str = "ACP-1.0"
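    conversation_id: Optional[str] = None  # filled in by __post_init__ when omitted
    reply_to: Optional[str] = None
    language: str = "english"
    encoding: str = "json"
    timestamp: Optional[float] = None  # filled in by __post_init__ when omitted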
   
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())
   
    def to_acp_format(self) -> str:
        """Convert to standard ACP message format"""
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)
   
    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        """Parse ACP message from string format"""
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )

The ACPMessage data class encapsulates all the fields required for a structured ACP exchange, including identifiers, participants, performative, payload, and metadata such as protocol version, language, and timestamps. Its __post_init__ method fills in missing timestamp and conversation_id values, so every message is timestamped and belongs to a conversation thread. The utility methods to_acp_format and from_acp_format handle serialization to and from the JSON representation for transmission and parsing.
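In from_acp_format, five keys are read with direct indexing ("message-id", "sender", "receiver", "performative", "content"), so a payload missing any of them raises a KeyError. A small pre-check can report what is missing before parsing. The helper below is a sketch under that reading of the code, and its name missing_required_fields is hypothetical.

```python
import json
from typing import List

# Keys that from_acp_format accesses without a default value.
REQUIRED_FIELDS = ("message-id", "sender", "receiver", "performative", "content")


def missing_required_fields(acp_string: str) -> List[str]:
    """Return the required ACP keys absent from a JSON payload, in declaration order."""
    data = json.loads(acp_string)
    if not isinstance(data, dict):
        raise ValueError("ACP payload must be a JSON object")
    return [field for field in REQUIRED_FIELDS if field not in data]


payload = json.dumps({
    "message-id": "m-1",
    "sender": "agent-001",
    "receiver": "agent-002",
    "performative": "tell",
})
print(missing_required_fields(payload))  # the payload above has no "content" key
```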

class ACPAgent:
    """Agent implementing Agent Communication Protocol"""
   
    def __init__(self, agent_id: str, name: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List[ACPMessage] = []
        self.subscriptions: Dict[str, List[str]] = {}  
        self.conversations: Dict[str, List[ACPMessage]] = {}
   
    def create_message(self, receiver: str, performative: str,
                      content: Dict[str, Any], conversation_id: str = None,
                      reply_to: str = None) -> ACPMessage:
        """Create a new ACP-compliant message"""
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )
   
    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        """Send an INFORM message (telling someone a fact)"""
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)
   
    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        """Send a QUERY message (asking for information)"""
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)
   
    def send_request(self, receiver: str, action: str, parameters: Dict = None) -> ACPMessage:
        """Send a REQUEST message (asking someone to perform an action)"""
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
   
    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        """Send a REPLY message in response to another message"""
        content = {"response": response_data, "original-question": original_msg.content}
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )
   
    def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Process incoming ACP message and generate appropriate response"""
        self.message_queue.append(message)
       
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations[conv_id] = []
        self.conversations[conv_id].append(message)
       
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)
       
        return None
   
    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming query messages"""
        question = message.content.get("question", "")
       
        prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except Exception:
            answer = "Unable to process query at this time"
       
        return self.send_reply(message, {"answer": answer, "confidence": 0.8})
   
    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming action requests"""
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})
       
        if any(capability in action.lower() for capability in self.capabilities):
            result = f"Executing {action} with parameters {parameters}"
            status = "agreed"
        else:
            result = f"Cannot perform {action} - not in my capabilities"
            status = "refused"
       
        return self.send_reply(message, {"status": status, "result": result})
   
    def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Handle incoming information messages"""
        fact = message.content.get("fact", "")
        print(f"[{self.name}] Received information: {fact}")
       
        ack_content = {"status": "received", "fact": fact}
        return self.create_message(message.sender, ACPMessageType.ACK.value, ack_content,
                                 conversation_id=message.conversation_id)

The ACPAgent class encapsulates an autonomous entity capable of sending, receiving, and processing ACP-compliant messages using Gemini’s language model. It keeps its own message queue and conversation history, declares a subscriptions map that this demo leaves unused, and provides helper methods (send_inform, send_query, send_request, send_reply) to construct correctly formatted ACPMessage instances. Incoming messages are routed through process_message, which delegates to specialized handlers for queries, action requests, and informational messages.
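One detail of _handle_request is worth spelling out: it agrees only when a capability string appears verbatim inside the action string. An action named "calculate" therefore does not match a capability named "calculation". The sketch below reproduces that check and contrasts it with a looser shared-prefix comparison. The function shared_prefix_match and its min_prefix parameter are hypothetical, shown only as one possible alternative.

```python
from typing import List


def substring_match(action: str, capabilities: List[str]) -> bool:
    """The check used in _handle_request: a capability must occur inside the action."""
    return any(cap in action.lower() for cap in capabilities)


def shared_prefix_match(action: str, capabilities: List[str], min_prefix: int = 5) -> bool:
    """Hypothetical looser check: action starts with the first `min_prefix` chars of a capability."""
    action = action.lower()
    return any(
        len(cap) >= min_prefix and action.startswith(cap[:min_prefix])
        for cap in capabilities
    )


caps = ["calculation", "mathematics", "computation"]
print(substring_match("calculate", caps))            # False: "calculation" is not inside "calculate"
print(substring_match("run a calculation", caps))    # True
print(shared_prefix_match("calculate", caps))        # True: both start with "calcu"
```

Prefix matching is still a heuristic. An explicit mapping from action names to capabilities would be more predictable in a larger system.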

class ACPMessageBroker:
    """Message broker implementing ACP routing and delivery"""
   
    def __init__(self):
        self.agents: Dict[str, ACPAgent] = {}
        self.message_log: List[ACPMessage] = []
        self.routing_table: Dict[str, str] = {}  
   
    def register_agent(self, agent: ACPAgent):
        """Register an agent with the message broker"""
        self.agents[agent.agent_id] = agent
        self.routing_table[agent.agent_id] = "local"
        print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")
   
    def route_message(self, message: ACPMessage) -> bool:
        """Route ACP message to appropriate recipient"""
        if message.receiver not in self.agents:
            print(f"✗ Receiver {message.receiver} not found")
            return False
       
        print("\nACP MESSAGE ROUTING:")
        print(f"From: {message.sender} → To: {message.receiver}")
        print(f"Performative: {message.performative}")
        print(f"Content: {json.dumps(message.content, indent=2)}")
       
        receiver_agent = self.agents[message.receiver]
        response = receiver_agent.process_message(message)
       
        self.message_log.append(message)
       
        if response:
            print("\nGENERATED RESPONSE:")
            print(f"From: {response.sender} → To: {response.receiver}")
            print(f"Content: {json.dumps(response.content, indent=2)}")
           
            if response.receiver in self.agents:
                self.agents[response.receiver].process_message(response)
                self.message_log.append(response)
       
        return True
   
    def broadcast_message(self, message: ACPMessage, recipients: List[str]):
        """Broadcast message to multiple recipients"""
        for recipient in recipients:
            msg_copy = ACPMessage(
                message_id=str(uuid.uuid4()),
                sender=message.sender,
                receiver=recipient,
                performative=message.performative,
                content=message.content.copy(),
                conversation_id=message.conversation_id
            )
            self.route_message(msg_copy)

The ACPMessageBroker serves as the central router for ACP messages, maintaining a registry of agents and a message log. It provides methods to register agents, deliver individual messages via route_message, which handles lookup, logging, and response chaining, and to send the same message to multiple recipients with broadcast_message.
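Since every logged message carries a conversation_id, the broker's message log can be regrouped into threads after the fact. The sketch below uses a stand-in dataclass with only the fields it needs, so it runs without the Gemini client. LoggedMessage and group_by_conversation are hypothetical names, not part of the tutorial.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class LoggedMessage:
    """Stand-in carrying only the ACPMessage fields this sketch reads."""
    message_id: str
    sender: str
    receiver: str
    performative: str
    conversation_id: str


def group_by_conversation(log: List[LoggedMessage]) -> Dict[str, List[LoggedMessage]]:
    """Group messages into threads, preserving their order in the log."""
    threads: Dict[str, List[LoggedMessage]] = defaultdict(list)
    for msg in log:
        threads[msg.conversation_id].append(msg)
    return dict(threads)


log = [
    LoggedMessage("m1", "agent-002", "agent-001", "ask", "conv-a"),
    LoggedMessage("m2", "agent-001", "agent-002", "reply", "conv-a"),
    LoggedMessage("m3", "agent-001", "agent-003", "request-action", "conv-b"),
]
for conv_id, msgs in group_by_conversation(log).items():
    print(conv_id, [m.performative for m in msgs])
```

The same function would accept real ACPMessage objects, since it only reads the conversation_id attribute.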

def demonstrate_acp():
    """Comprehensive demonstration of Agent Communication Protocol"""
   
    print(" AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
    print("=" * 60)
   
    broker = ACPMessageBroker()
   
    researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
    assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
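    # "calculate" is listed explicitly: _handle_request looks for a capability inside the action string
    calculator = ACPAgent("agent-003", "MathBot", ["calculate", "calculation", "mathematics", "computation"])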
   
    broker.register_agent(researcher)
    broker.register_agent(assistant)
    broker.register_agent(calculator)
   
    print("\nREGISTERED AGENTS:")
    for agent_id, agent in broker.agents.items():
        print(f"  • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")

    print("\nSCENARIO 1: Information Query (ASK performative)")
    query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
    broker.route_message(query_msg)

    print("\nSCENARIO 2: Action Request (REQUEST-ACTION performative)")
    calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
    broker.route_message(calc_request)

    print("\nSCENARIO 3: Information Sharing (TELL performative)")
    info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
    broker.route_message(info_msg)

    print("\nPROTOCOL STATISTICS:")
    print(f"  • Total messages processed: {len(broker.message_log)}")
    print(f"  • Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
    print(f"  • Message types used: {len(set(msg.performative for msg in broker.message_log))}")

    print("\nSAMPLE ACP MESSAGE FORMAT:")
    sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
    print(sample_msg.to_acp_format())

The demonstrate_acp function walks through the entire ACP framework: it initializes a broker and three agents (Dr. Research, AI Assistant, and MathBot), registers them, and runs three interaction scenarios (querying for information, requesting a computation, and sharing an update). After routing each message and handling the responses, it prints summary statistics on the message flow. Finally, it prints a sample message in ACP format, giving users an end-to-end example of how agents communicate under the protocol.

def setup_guide():
    print("""
     GOOGLE COLAB SETUP GUIDE:
   
    1. Get Gemini API Key: https://makersuite.google.com/app/apikey
    2. Replace: GEMINI_API_KEY = "YOUR_ACTUAL_API_KEY"
    3. Run: demonstrate_acp()
   
     ACP PROTOCOL FEATURES:
   
    • Standardized message format with required fields
    • Speech act performatives (TELL, ASK, REQUEST-ACTION, etc.)
    • Conversation tracking and message threading
    • Error handling and acknowledgments
    • Message routing and delivery confirmation
   
     EXTEND THE PROTOCOL:
    ```python
    # Create custom agent
    my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
    broker.register_agent(my_agent)
   
    # Send custom message
    msg = my_agent.send_query("agent-001", "Your question here")
    broker.route_message(msg)
    ```
    """)


if __name__ == "__main__":
    setup_guide()
    demonstrate_acp() 

Finally, the setup_guide function provides a quick-start reference for running the ACP demo in Google Colab: how to obtain and configure your Gemini API key and invoke the demonstrate_acp routine. It also summarizes key protocol features, such as standardized message formats, performatives, and message routing, and includes a short code snippet showing how to register custom agents and send tailored messages.

In conclusion, this tutorial implements an ACP-based multi-agent system in which agents handle research, computation, and collaboration tasks. The sample scenarios illustrate three common use cases (information queries, computational requests, and fact sharing), while the broker routes each message to its recipient and logs it. Readers are encouraged to extend the framework by adding new agent capabilities, integrating domain-specific actions, or incorporating more sophisticated subscription and notification mechanisms.

