A Coding Guide to Build an AI-Powered Cryptographic Agent System with Hybrid Encryption, Digital Signatures, and Adaptive Security Intelligence

In this tutorial, we build an AI-powered cryptographic agent system that combines the strength of classical encryption with adaptive intelligence. We design agents capable of performing hybrid encryption with RSA and AES, generating digital signatures, detecting anomalies in message patterns, and intelligently recommending key rotations. As we progress, we witness these autonomous agents securely establish communication channels, exchange encrypted data, and continuously assess security risks in real time, all within a compact, efficient implementation.

import hashlib, hmac, json, time, secrets, numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend


@dataclass
class SecurityEvent:
   timestamp: float
   event_type: str
   risk_score: float
   details: Dict

We begin by importing all necessary libraries for cryptography, AI-based analysis, and data handling. We also define a SecurityEvent dataclass to record and analyze all important events in the cryptographic system.
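To make the role of SecurityEvent concrete, the following minimal sketch builds a few events and summarizes them the way the later security report does. The sample events are invented for illustration; only the dataclass fields and the 0.7 high-risk cutoff come from the tutorial code.

```python
import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SecurityEvent:
    timestamp: float
    event_type: str
    risk_score: float
    details: Dict


# Hypothetical sample events, made up for this illustration only.
events: List[SecurityEvent] = [
    SecurityEvent(time.time(), "SESSION_ESTABLISHED", 0.1, {"partner": "Bob"}),
    SecurityEvent(time.time(), "ENCRYPTION_ANALYSIS", 0.25, {"msg_len": 36}),
    SecurityEvent(time.time(), "HIGH_RISK_MESSAGE", 0.8, {"sender": "Alice"}),
]

# Events above the 0.7 cutoff count as high risk.
high_risk = [e for e in events if e.risk_score > 0.7]

# Tally how often each event type occurs.
counts: Dict[str, int] = {}
for e in events:
    counts[e.event_type] = counts.get(e.event_type, 0) + 1

print(len(high_risk), counts)
```

Keeping every event in one flat record type is what lets the agent later compute averages and breakdowns with simple list comprehensions.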

class CryptoAgent:
   def __init__(self, agent_id: str):
       self.agent_id = agent_id
       self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
       self.public_key = self.private_key.public_key()
       self.session_keys = {}
       self.security_events = []
       self.encryption_count = 0
       self.key_rotation_threshold = 100


   def get_public_key_bytes(self) -> bytes:
       return self.public_key.public_bytes(
           encoding=serialization.Encoding.PEM,
           format=serialization.PublicFormat.SubjectPublicKeyInfo
       )


   def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
       session_key = secrets.token_bytes(32)
       self.session_keys[partner_id] = session_key
       partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
       encrypted_session_key = partner_public_key.encrypt(
           session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.log_security_event("SESSION_ESTABLISHED", 0.1, {"partner": partner_id})
       return encrypted_session_key


   def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
       session_key = self.private_key.decrypt(
           encrypted_session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.session_keys[partner_id] = session_key

We define the CryptoAgent class and initialize its keys, session storage, and security tracking system. We then implement methods that let agents generate and exchange RSA public keys and establish secure hybrid-encrypted sessions.

   def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
       if partner_id not in self.session_keys:
           raise ValueError(f"No session established with {partner_id}")
       self.encryption_count += 1
       if self.encryption_count >= self.key_rotation_threshold:
           self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"count": self.encryption_count})
       iv = secrets.token_bytes(12)
       cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
       encryptor = cipher.encryptor()
       ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
       message_data = iv + ciphertext + encryptor.tag
       signature = self.sign_data(message_data)
       risk_score = self.analyze_encryption_pattern(len(plaintext))
       return {
           "sender": self.agent_id,
           "recipient": partner_id,
           "iv": iv.hex(),
           "ciphertext": ciphertext.hex(),
           "tag": encryptor.tag.hex(),
           "signature": signature.hex(),
           "timestamp": time.time(),
           "risk_score": risk_score
       }


   def decrypt_message(self, encrypted_msg: Dict) -> str:
       sender_id = encrypted_msg["sender"]
       if sender_id not in self.session_keys:
           raise ValueError(f"No session established with {sender_id}")
       iv = bytes.fromhex(encrypted_msg["iv"])
       ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
       tag = bytes.fromhex(encrypted_msg["tag"])
       cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
       decryptor = cipher.decryptor()
       plaintext = decryptor.update(ciphertext) + decryptor.finalize()
       if encrypted_msg.get("risk_score", 0) > 0.7:
           self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
       return plaintext.decode()

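We implement the encryption and decryption logic that allows agents to communicate securely using AES-GCM. Each message is encrypted under the shared session key with a fresh 12-byte IV, signed over the concatenation of IV, ciphertext, and tag, and risk-scored before it is sent. On the receiving side, the GCM authentication tag ensures that a modified ciphertext is rejected during decryption, preserving integrity and confidentiality. The signature travels with the message, but decrypt_message as written does not verify it.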
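Because the recipient never checks the signature in the code above, here is a minimal sketch of how that check could look. It assumes the recipient already holds the sender's RSA public key and that the signed bytes are iv + ciphertext + tag, as in encrypt_message. The helper name verify_signature and the throwaway key pair are our own illustration and not part of the tutorial's CryptoAgent class.

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa


def pss_padding() -> padding.PSS:
    # Same PSS parameters the tutorial uses when signing.
    return padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)


def verify_signature(public_key, data: bytes, signature: bytes) -> bool:
    """Hypothetical helper: True if `signature` is valid for `data`, else False."""
    try:
        public_key.verify(signature, data, pss_padding(), hashes.SHA256())
        return True
    except InvalidSignature:
        return False


# Stand-alone check with a throwaway key pair (illustration only).
sender_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
message_data = b"example-iv" + b"example-ciphertext" + b"example-tag"
signature = sender_key.sign(message_data, pss_padding(), hashes.SHA256())

ok = verify_signature(sender_key.public_key(), message_data, signature)
tampered = verify_signature(sender_key.public_key(), message_data + b"x", signature)
print(ok, tampered)
```

In the agent itself, a check like this would run before decryption, which in turn means each agent would need to store its partner's public key when the session is established.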

   def sign_data(self, data: bytes) -> bytes:
       # Sign with RSA-PSS over SHA-256 using the agent's private key
       return self.private_key.sign(
           data,
           padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
           hashes.SHA256()
       )


   def analyze_encryption_pattern(self, message_length: int) -> float:
       recent_events = self.security_events[-10:] if len(self.security_events) >= 10 else self.security_events
       avg_risk = np.mean([e.risk_score for e in recent_events]) if recent_events else 0.1
       risk_score = 0.1
       if message_length > 10000:
           risk_score += 0.3
       if self.encryption_count % 50 == 0 and self.encryption_count > 0:
           risk_score += 0.2
       risk_score = (risk_score + avg_risk) / 2
       self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
       return min(risk_score, 1.0)


   def log_security_event(self, event_type: str, risk_score: float, details: Dict):
       event = SecurityEvent(timestamp=time.time(), event_type=event_type, risk_score=risk_score, details=details)
       self.security_events.append(event)


   def generate_security_report(self) -> Dict:
       if not self.security_events:
           return {"status": "No events recorded"}
       total_events = len(self.security_events)
       high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
       avg_risk = np.mean([e.risk_score for e in self.security_events])
       event_types = {}
       for event in self.security_events:
           event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
       return {
           "agent_id": self.agent_id,
           "total_events": total_events,
           "high_risk_events": len(high_risk_events),
           "average_risk_score": round(avg_risk, 3),
           "encryption_count": self.encryption_count,
           "key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
           "event_breakdown": event_types,
           "security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
       }

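We add the adaptive analysis components that score encryption behavior, flag anomalies, and log risk events. The sign_data method signs outgoing data with RSA-PSS. The analyze_encryption_pattern method starts from a base risk of 0.1, adds 0.3 for messages longer than 10,000 characters and 0.2 on every 50th encryption, and then averages the result with the mean risk of the ten most recent events. Finally, generate_security_report summarizes the event log into counts, an average risk score, a key-rotation flag, and an overall status.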
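To see how the scoring arithmetic plays out, the sketch below restates the heuristic from analyze_encryption_pattern as a pure function without logging. The function name score_message and its parameters are our own; the constants are taken from the tutorial code, and statistics.mean stands in for np.mean to keep the example dependency-free.

```python
from statistics import mean
from typing import List


def score_message(message_length: int, encryption_count: int, recent_risks: List[float]) -> float:
    """Hypothetical pure-function restatement of the tutorial's risk heuristic."""
    # Average over at most the ten most recent risk scores; default to 0.1.
    avg_risk = mean(recent_risks[-10:]) if recent_risks else 0.1
    risk = 0.1                       # base risk
    if message_length > 10000:       # unusually long message
        risk += 0.3
    if encryption_count > 0 and encryption_count % 50 == 0:
        risk += 0.2                  # periodic bump every 50th encryption
    # Blend the current score with recent history and cap at 1.0.
    return min((risk + avg_risk) / 2, 1.0)


short_score = score_message(36, 1, [0.1])
long_score = score_message(12000, 2, [0.1, 0.1])
bump_score = score_message(36, 50, [0.1])
print(short_score, long_score, bump_score)
```

A long message against a quiet history therefore lands near 0.25, well under the 0.7 high-risk cutoff, because the averaging step halves the effect of any single signal. Note also that the longest message in the demo is 2,800 characters, so the length rule never fires there.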

def demo_crypto_agent_system():
   print("🔐 Advanced Cryptographic Agent System Demo\n")
   print("=" * 60)
   alice = CryptoAgent("Alice")
   bob = CryptoAgent("Bob")
   print("\n1. Agents Created")
   print(f"   Alice ID: {alice.agent_id}")
   print(f"   Bob ID: {bob.agent_id}")
   print("\n2. Establishing Secure Session (Hybrid Encryption)")
   alice_public_key = alice.get_public_key_bytes()
   bob_public_key = bob.get_public_key_bytes()
   encrypted_session_key = alice.establish_session("Bob", bob_public_key)
   bob.receive_session_key("Alice", encrypted_session_key)
   print(f"   ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
   print("\n3. Encrypting and Transmitting Messages")
   messages = [
       "Hello Bob! This is a secure message.",
       "The launch codes are: Alpha-7-Charlie-9",
       "Meeting at 3 PM tomorrow.",
       "This is a very long message " * 100
   ]
   for i, msg in enumerate(messages, 1):
       encrypted = alice.encrypt_message("Bob", msg)
       print(f"\n   Message {i}:")
       print(f"   - Plaintext length: {len(msg)} chars")
       print(f"   - Ciphertext: {encrypted['ciphertext'][:60]}...")
       print(f"   - Risk Score: {encrypted['risk_score']:.3f}")
       print(f"   - Signature: {encrypted['signature'][:40]}...")
       decrypted = bob.decrypt_message(encrypted)
       print(f"   - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
       print(f"   - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
   print("\n4. AI-Powered Security Analysis")
   print("\n   Alice's Security Report:")
   alice_report = alice.generate_security_report()
   for k, v in alice_report.items(): print(f"   - {k}: {v}")
   print("\n   Bob's Security Report:")
   bob_report = bob.generate_security_report()
   for k, v in bob_report.items(): print(f"   - {k}: {v}")
   print("\n" + "=" * 60)
   print("Demo Complete! Key Features Demonstrated:")
   print("✓ Hybrid encryption (RSA + AES-GCM)")
   print("✓ Digital signatures for authentication")
   print("✓ AI-powered anomaly detection")
   print("✓ Intelligent key rotation recommendations")
   print("✓ Real-time security monitoring")


if __name__ == "__main__":
   demo_crypto_agent_system()

We demonstrate the full cryptographic workflow in which two agents securely exchange messages, detect anomalies, and review detailed security reports. We conclude the demo with an overview of all the intelligent features the system performs.
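One case the demo does not exercise is a message altered in transit. As a minimal sketch, assuming the same AES-GCM construction that encrypt_message and decrypt_message use, we can flip one bit of a ciphertext and confirm that decryption fails. The key, IV, and the try_decrypt helper below are stand-ins for illustration and are not part of CryptoAgent.

```python
import secrets

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

# Stand-in session key and IV (illustration only).
key = secrets.token_bytes(32)
iv = secrets.token_bytes(12)

# Encrypt one message and keep its authentication tag.
encryptor = Cipher(algorithms.AES(key), modes.GCM(iv), backend=default_backend()).encryptor()
ciphertext = encryptor.update(b"Meeting at 3 PM tomorrow.") + encryptor.finalize()
tag = encryptor.tag

# Simulate tampering by flipping the lowest bit of the first ciphertext byte.
tampered = bytes([ciphertext[0] ^ 0x01]) + ciphertext[1:]


def try_decrypt(ct: bytes) -> bool:
    """Hypothetical helper: True if `ct` authenticates under the stored tag."""
    decryptor = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=default_backend()).decryptor()
    try:
        decryptor.update(ct) + decryptor.finalize()  # finalize() checks the tag
        return True
    except InvalidTag:
        return False


intact_ok = try_decrypt(ciphertext)
tampered_ok = try_decrypt(tampered)
print(intact_ok, tampered_ok)
```

In the agent, an InvalidTag raised inside decrypt_message would be a natural place to log a high-risk security event, although the tutorial code does not do so.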

In conclusion, we demonstrate how adaptive, AI-style analytics can enhance traditional cryptography by adding context awareness. We not only encrypt and sign messages securely but also enable our agents to track communication behavior, score each message for risk, and flag when security measures such as key rotation are due. By the end, we see how combining this monitoring layer with hybrid encryption lays the groundwork for intelligent, self-monitoring cryptographic systems.

