The Missing Primitives for Trustworthy AI Agents

This is another installment of our ongoing series on building trustworthy AI Agents:

Verifiable Audit Logs (Part 5)

By this point in the series we know who an agent is, what it is allowed to do, and how to enforce guardrails at runtime. The next primitive is about proving what happened.

Modern AI agents act autonomously. They read data, write files, call APIs, and coordinate workflows at machine speed. A traditional log file is not enough. A compromised host can delete or alter log entries. A developer can accidentally drop fields. An attacker can erase their tracks.

A trustworthy system requires tamper-proof, independently verifiable audit logs. Without this primitive you cannot meet serious compliance requirements or perform accurate forensic analysis.

Why Audit Logging Is Different For Agents

Agent actions are often:

  • autonomous
  • chained across multiple services
  • operating on sensitive data
  • governed by complex policies

During an incident you need a reliable answer to questions such as:

  • Which agent took this action?
  • Which identity did it use?
  • What data did it access or move?
  • What policy allowed the action?
  • Is the audit trail itself trustworthy?

Plain text logs cannot provide these guarantees. A verifiable audit log can.

Properties Of A Verifiable Audit Log

A production-ready audit log for AI agents should be:

  1. Append-only. New entries can be appended but never removed or changed.
  2. Tamper-evident. Altering any past entry must be detectable.
  3. Bound to identity. Entries must be cryptographically tied to the agent that produced them.
  4. Time-ordered. Entries form a sequential chain suitable for replay.
  5. Independently verifiable. An external auditor should be able to check integrity without trusting the runtime.

This does not require a blockchain. Two primitives deliver most of what we need:

  • hash chains or Merkle trees
  • digital signatures

We start with hash chaining, then add Merkle trees, then signatures, then anchoring.

A Hash-Chained Log In Python

A hash chain links each entry to the previous one. A single modification breaks the entire chain.

import hashlib
import json
import time
from dataclasses import dataclass
from typing import List

@dataclass
class AuditEntry:
    index: int
    timestamp: float
    agent_id: str
    action: str
    payload: dict
    prev_hash: str
    hash: str

def compute_hash(data: dict) -> str:
    encoded = json.dumps(data, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

class HashChainedLog:
    def __init__(self):
        self.entries: List[AuditEntry] = []

    def append(self, agent_id: str, action: str, payload: dict) -> AuditEntry:
        index = len(self.entries)
        ts = time.time()
        prev_hash = self.entries[-1].hash if self.entries else "GENESIS"

        body = {
            "index": index,
            "timestamp": ts,
            "agent_id": agent_id,
            "action": action,
            "payload": payload,
            "prev_hash": prev_hash,
        }

        entry_hash = compute_hash(body)
        entry = AuditEntry(**body, hash=entry_hash)
        self.entries.append(entry)
        return entry

    def verify_chain(self) -> bool:
        prev_hash = "GENESIS"
        for i, entry in enumerate(self.entries):
            body = {
                "index": entry.index,
                "timestamp": entry.timestamp,
                "agent_id": entry.agent_id,
                "action": entry.action,
                "payload": entry.payload,
                "prev_hash": prev_hash,
            }
            if compute_hash(body) != entry.hash:
                print(f"Integrity failure at index {i}")
                return False
            prev_hash = entry.hash
        return True

Example

log = HashChainedLog()

log.append("spiffe://trust.local/agent/data-query", "db.query", {"dataset": "customers"})
log.append("spiffe://trust.local/agent/exporter", "s3.put_object", {"bucket": "exports"})

print("Valid:", log.verify_chain())  # True
log.entries[0].payload["dataset"] = "tampered"
print("Valid after tampering:", log.verify_chain())  # False

Merkle Tree Variant

Hash chains prove full log integrity. Merkle trees also allow efficient inclusion proofs.

A Merkle tree aggregates many entries into a single root hash. An auditor can verify that a specific entry was included by checking only a small set of sibling hashes.

This enables:

  • efficient proofs for large logs
  • multi-tenant transparency logs
  • external publication of only the Merkle root

Merkle Tree Construction

from typing import List

def merkle_leaf_hash(entry_hash: str) -> str:
    return compute_hash({"leaf": entry_hash})

def merkle_parent_hash(left: str, right: str) -> str:
    return compute_hash({"left": left, "right": right})

def build_merkle_tree(entry_hashes: List[str]) -> List[List[str]]:
    if not entry_hashes:
        return [["EMPTY"]]

    level = [merkle_leaf_hash(h) for h in entry_hashes]
    tree = [level]

    while len(level) > 1:
        if len(level) % 2 == 1:
            # Pad a copy so the levels already stored in `tree` are not mutated.
            level = level + [level[-1]]

        next_level = []
        for i in range(0, len(level), 2):
            parent = merkle_parent_hash(level[i], level[i + 1])
            next_level.append(parent)

        tree.append(next_level)
        level = next_level

    return tree

def merkle_root(entry_hashes: List[str]) -> str:
    return build_merkle_tree(entry_hashes)[-1][0]
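
For example, feeding the hash-chained log's entry hashes into the tree yields a single root that summarizes the whole log (reusing the log object from the earlier example):

hashes = [entry.hash for entry in log.entries]
root = merkle_root(hashes)
print("Merkle root:", root)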

Merkle Proofs (Conceptual)

After the tree is built, you can generate a Merkle proof for any entry. A proof is a list of sibling hashes up the tree that allows an auditor to recompute the root.

You provide the auditor with:

  • the log entry
  • the entry hash
  • the Merkle proof (sibling hashes)
  • the published Merkle root

The auditor recomputes the root. If it matches, the entry is proven to be part of the log.

This enables verification without revealing any other entries.
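
Here is a minimal sketch of proof generation and verification against build_merkle_tree above. The helper names merkle_proof and verify_merkle_proof are illustrative, and the index bookkeeping assumes the simplified layout used here, where the last node of an odd level is paired with itself:

from typing import List, Tuple

def merkle_proof(tree: List[List[str]], leaf_index: int) -> List[Tuple[str, bool]]:
    """Collect (sibling_hash, sibling_is_left) pairs from leaf to root."""
    proof = []
    index = leaf_index
    for level in tree[:-1]:  # every level below the root
        sibling = index ^ 1
        if sibling >= len(level):
            sibling = index  # odd level: the last node pairs with itself
        proof.append((level[sibling], sibling < index))
        index //= 2
    return proof

def verify_merkle_proof(entry_hash: str, proof: List[Tuple[str, bool]], root: str) -> bool:
    node = merkle_leaf_hash(entry_hash)
    for sibling, sibling_is_left in proof:
        if sibling_is_left:
            node = merkle_parent_hash(sibling, node)
        else:
            node = merkle_parent_hash(node, sibling)
    return node == root

# Usage: prove that the second log entry is included in the published root.
tree = build_merkle_tree(hashes)
proof = merkle_proof(tree, 1)
print(verify_merkle_proof(hashes[1], proof, tree[-1][0]))  # True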

Digital Signatures

Hashing protects integrity, but we must also prove who produced each entry.

Here we use Ed25519 as an example. In a real deployment, the agent would obtain its signing key from the SPIFFE Workload API, exactly as described in Part 3. The agent signs each entry with its private key, and the public key is recorded in the log alongside the SPIFFE ID.

Signing helper

from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
import base64

class SigningKey:
    def __init__(self):
        self._private_key = ed25519.Ed25519PrivateKey.generate()
        self.public_key = self._private_key.public_key()

    def sign(self, message: bytes) -> bytes:
        return self._private_key.sign(message)

    def public_bytes(self) -> bytes:
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        )

def verify_signature(public_key_bytes: bytes, message: bytes, signature: bytes) -> bool:
    try:
        key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)
        key.verify(signature, message)
        return True
    except (InvalidSignature, ValueError):  # ValueError: malformed key bytes
        return False

Signed Audit Entry

from dataclasses import dataclass

@dataclass
class SignedAuditEntry:
    index: int
    timestamp: float
    agent_id: str
    action: str
    payload: dict
    prev_hash: str
    hash: str
    public_key: str
    signature: str

Creating a signed entry

def create_signed_entry(
    index: int,
    timestamp: float,
    agent_id: str,
    action: str,
    payload: dict,
    prev_hash: str,
    signing_key: SigningKey,
) -> SignedAuditEntry:

    body = {
        "index": index,
        "timestamp": timestamp,
        "agent_id": agent_id,
        "action": action,
        "payload": payload,
        "prev_hash": prev_hash,
    }

    entry_hash = compute_hash(body)
    message = entry_hash.encode("utf-8")
    signature = signing_key.sign(message)

    return SignedAuditEntry(
        index=index,
        timestamp=timestamp,
        agent_id=agent_id,
        action=action,
        payload=payload,
        prev_hash=prev_hash,
        hash=entry_hash,
        public_key=base64.b64encode(signing_key.public_bytes()).decode("ascii"),
        signature=base64.b64encode(signature).decode("ascii"),
    )

Verification

def verify_signed_chain(entries: List[SignedAuditEntry]) -> bool:
    prev_hash = "GENESIS"

    for entry in entries:
        body = {
            "index": entry.index,
            "timestamp": entry.timestamp,
            "agent_id": entry.agent_id,
            "action": entry.action,
            "payload": entry.payload,
            "prev_hash": prev_hash,
        }

        expected_hash = compute_hash(body)
        if expected_hash != entry.hash:
            print("Hash mismatch at", entry.index)
            return False

        message = entry.hash.encode("utf-8")
        pub = base64.b64decode(entry.public_key)
        sig = base64.b64decode(entry.signature)

        if not verify_signature(pub, message, sig):
            print("Signature invalid at", entry.index)
            return False

        prev_hash = entry.hash

    return True

Now entries are integrity-protected and identity-bound.
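
As a quick end-to-end check, here is a minimal sketch that builds and verifies a short signed chain. The SigningKey is generated locally purely for illustration; in production it would come from the SPIFFE Workload API as noted above:

key = SigningKey()
entries: List[SignedAuditEntry] = []
prev_hash = "GENESIS"

for i, (action, payload) in enumerate([
    ("db.query", {"dataset": "customers"}),
    ("s3.put_object", {"bucket": "exports"}),
]):
    entry = create_signed_entry(
        index=i,
        timestamp=time.time(),
        agent_id="spiffe://trust.local/agent/data-query",
        action=action,
        payload=payload,
        prev_hash=prev_hash,
        signing_key=key,
    )
    entries.append(entry)
    prev_hash = entry.hash

print("Signed chain valid:", verify_signed_chain(entries))  # True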

Sidecar-Style Audit Emission

Agents should not write directly to their audit logs. A compromised agent could delete its own incriminating events. The correct approach is to send events to an isolated audit sidecar.

Agent Emitting Events

import json
import requests
import time

AUDIT_URL = "http://localhost:8089/audit"

def emit_audit_event(agent_id: str, action: str, payload: dict):
    event = {
        "timestamp": time.time(),
        "agent_id": agent_id,
        "action": action,
        "payload": payload,
    }
    response = requests.post(AUDIT_URL, json=event, timeout=5)
    if response.status_code != 200:
        raise RuntimeError("Audit emission failed: " + response.text)

Audit Sidecar Receiver

Below is a complete, runnable minimal example. It repeats HashChainedLog so the file is self-contained:

# sidecar.py

import hashlib
import json
import time
from dataclasses import dataclass
from typing import List

from flask import Flask, request, jsonify

@dataclass
class AuditEntry:
    index: int
    timestamp: float
    agent_id: str
    action: str
    payload: dict
    prev_hash: str
    hash: str

def compute_hash(data: dict) -> str:
    encoded = json.dumps(data, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

class HashChainedLog:
    def __init__(self):
        self.entries: List[AuditEntry] = []

    def append(self, agent_id: str, action: str, payload: dict) -> AuditEntry:
        index = len(self.entries)
        ts = time.time()
        prev_hash = self.entries[-1].hash if self.entries else "GENESIS"

        body = {
            "index": index,
            "timestamp": ts,
            "agent_id": agent_id,
            "action": action,
            "payload": payload,
            "prev_hash": prev_hash,
        }

        entry_hash = compute_hash(body)
        entry = AuditEntry(**body, hash=entry_hash)
        self.entries.append(entry)
        return entry

    def verify_chain(self) -> bool:
        prev_hash = "GENESIS"
        for entry in self.entries:
            body = {
                "index": entry.index,
                "timestamp": entry.timestamp,
                "agent_id": entry.agent_id,
                "action": entry.action,
                "payload": entry.payload,
                "prev_hash": prev_hash,
            }
            if compute_hash(body) != entry.hash:
                return False
            prev_hash = entry.hash
        return True

app = Flask(__name__)
log = HashChainedLog()

@app.post("/audit")
def receive():
    event = request.json

    entry = log.append(
        agent_id=event["agent_id"],
        action=event["action"],
        payload=event.get("payload", {}),
    )

    return jsonify({
        "status": "ok",
        "index": entry.index,
        "hash": entry.hash,
        "timestamp": entry.timestamp,
    })

@app.get("/verify")
def verify():
    return jsonify({"valid": log.verify_chain(), "entries": len(log.entries)})

if __name__ == "__main__":
    app.run(port=8089)

This is the simplest working prototype.
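
To smoke-test it, run sidecar.py, post an event, then ask the sidecar to verify its own chain:

import requests

resp = requests.post(
    "http://localhost:8089/audit",
    json={
        "agent_id": "spiffe://trust.local/agent/data-query",
        "action": "db.query",
        "payload": {"dataset": "customers"},
    },
    timeout=5,
)
print(resp.json())  # {"status": "ok", "index": 0, ...}

print(requests.get("http://localhost:8089/verify", timeout=5).json())
# {"entries": 1, "valid": true}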

Anchoring The Log Head (AWS S3 and DynamoDB)

Hash chains and signatures protect integrity inside the sidecar. Anchoring protects against the sidecar itself being compromised. The idea is simple:

  1. Periodically compute the head hash or Merkle root.
  2. Sign it with a trusted key.
  3. Write it to an immutable external system.
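
Tying the three steps together, a minimal sketch of an anchoring loop. The bucket name and key prefix are hypothetical, and anchor_head_to_s3 is defined in the next section:

import time

def current_head(log: HashChainedLog) -> str:
    # Step 1: for a hash chain the head is simply the last entry's hash;
    # a Merkle-based log would use merkle_root instead.
    return log.entries[-1].hash if log.entries else "GENESIS"

def anchor_loop(log: HashChainedLog, signing_key: SigningKey, interval: int = 300):
    # Steps 2 and 3 are handled by the anchor helper below.
    while True:
        anchor_head_to_s3("audit-anchors-bucket", "heads", current_head(log), signing_key)
        time.sleep(interval)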

AWS S3 Anchoring with Object Lock

Amazon S3 supports Object Lock in Compliance mode. When enabled:

  • objects become WORM (write once read many)
  • no one, not even the root user, can delete or alter them until the retention period expires

This is ideal for anchoring log heads. Note that Object Lock must be enabled on the bucket at creation time.

import base64
import json
import time
from datetime import datetime, timedelta, timezone

import boto3

s3 = boto3.client("s3")

def anchor_head_to_s3(bucket: str, key_prefix: str, head_hash: str, signing_key: SigningKey):
    timestamp = int(time.time())

    record = {
        "head_hash": head_hash,
        "timestamp": timestamp,
    }
    payload = json.dumps(record, sort_keys=True).encode("utf-8")
    signature = signing_key.sign(payload)

    obj = {
        "record": record,
        "signature": base64.b64encode(signature).decode("ascii"),
        "public_key": base64.b64encode(signing_key.public_bytes()).decode("ascii"),
    }

    key = f"{key_prefix}/{timestamp}.json"

    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(obj).encode("utf-8"),
        ContentType="application/json",
        ObjectLockMode="COMPLIANCE",
        ObjectLockRetainUntilDate=datetime.now(timezone.utc) + timedelta(days=365),  # example retention
    )

    return key
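
An external auditor can later fetch the anchored object and check its signature without trusting the runtime. A minimal sketch, with a hypothetical verify_s3_anchor helper that reuses verify_signature from earlier:

def verify_s3_anchor(bucket: str, key: str) -> bool:
    obj = s3.get_object(Bucket=bucket, Key=key)
    anchored = json.loads(obj["Body"].read())

    payload = json.dumps(anchored["record"], sort_keys=True).encode("utf-8")
    pub = base64.b64decode(anchored["public_key"])
    sig = base64.b64decode(anchored["signature"])

    # The auditor should also compare anchored["record"]["head_hash"]
    # against a head hash recomputed independently from the log.
    return verify_signature(pub, payload, sig)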

DynamoDB Anchoring with Streams

DynamoDB Streams capture every item-level change in an append-only, time-ordered stream. Records are retained for 24 hours, so an independent verifier must consume them within that window.

You write the head hash into a table:

import time

import boto3

ddb = boto3.client("dynamodb")

def anchor_to_ddb(table: str, head_hash: str, signing_key: SigningKey):
    payload = head_hash.encode("utf-8")
    signature = signing_key.sign(payload)

    ddb.put_item(
        TableName=table,
        Item={
            "head_hash": {"S": head_hash},
            "signature": {"B": signature},
            "public_key": {"B": signing_key.public_bytes()},
            "ts": {"N": str(time.time())},
        }
    )

An external verifier can read the DynamoDB Stream to reconstruct a historical record of anchoring events.
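
A hedged sketch of such a verifier, assuming the table's stream is enabled and you know its ARN (read_anchor_events is hypothetical, and shard iteration is simplified to a single pass per shard):

streams = boto3.client("dynamodbstreams")

def read_anchor_events(stream_arn: str):
    """Walk each shard from the start and yield anchored head hashes."""
    desc = streams.describe_stream(StreamArn=stream_arn)
    for shard in desc["StreamDescription"]["Shards"]:
        iterator = streams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard["ShardId"],
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]
        for record in streams.get_records(ShardIterator=iterator)["Records"]:
            image = record["dynamodb"].get("NewImage", {})
            if "head_hash" in image:
                yield image["head_hash"]["S"]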

Security teams often anchor the head hash in a separate AWS account with restrictive IAM policies. This protects the audit trail from a compromised runtime account.

Why This Matters

With verifiable audit logs you can:

  • reconstruct incidents with confidence
  • detect attempts to rewrite history
  • prove which agent took which action
  • provide auditors with cryptographically sound evidence

Combined with earlier primitives:

  1. Encryption protects what agents say.
  2. Injection protection guards how they think.
  3. Identity and attestation prove who they are.
  4. Policy-as-code controls what they are allowed to do.
  5. Verifiable audit logs record what they actually did in a way that can be proven later.

This closes a major trust gap in production agent systems.

Practical Next Steps

  • Implement this pattern for one internal workflow.
  • Introduce signing keys bound to SPIFFE identities.
  • Start anchoring Merkle roots in S3 Object Lock or DynamoDB Streams.
  • Consider Merkle proofs and transparency logs for multi-tenant systems.

Part 6 will cover kill switches and circuit breakers for agentic workloads, which depend directly on having reliable, verifiable telemetry.
