The Missing Primitives for Trustworthy AI Agents
This is another installment of our ongoing series on building trustworthy AI Agents:
- Part 0 - Introduction
- Part 1 - End-to-End Encryption
- Part 2 - Prompt Injection Protection
- Part 3 - Agent Identity and Attestation
- Part 4 - Policy-as-Code Enforcement
- Part 5 - Verifiable Audit Logs
Verifiable Audit Logs (Part 5)
By this point in the series we know who an agent is, what it is allowed to do, and how to enforce guardrails at runtime. The next primitive is about proving what happened.
Modern AI agents act autonomously. They read data, write files, call APIs, and coordinate workflows at machine speed. A traditional log file is not enough. A compromised host can delete or alter log entries. A developer can accidentally drop fields. An attacker can erase their tracks.
A trustworthy system requires tamper proof, independently verifiable audit logs. Without this primitive you cannot meet serious compliance requirements or perform accurate forensic analysis.
Why Audit Logging Is Different For Agents
Agent actions are often:
- autonomous
- chained across multiple services
- operating on sensitive data
- governed by complex policies
During an incident you need a reliable answer to questions such as:
- Which agent took this action
- Which identity it used
- What data it accessed or moved
- What policy allowed the action
- Whether the audit trail itself is trustworthy
Plain text logs cannot provide these guarantees. A verifiable audit log can.
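Concretely, a single entry can carry a field for each of these questions. The field names below are illustrative and match the schema used in the code later in this post:

```python
# An illustrative audit entry: each field answers one of the
# incident questions above. Names mirror the schema used in the
# code samples later in this post.
example_entry = {
    "index": 0,                                            # position in the chain
    "timestamp": 1700000000.0,                             # when the action happened
    "agent_id": "spiffe://trust.local/agent/data-query",   # which agent / identity
    "action": "db.query",                                  # what it did
    "payload": {"dataset": "customers"},                   # what data it touched
    "prev_hash": "GENESIS",                                # link that makes the trail verifiable
}
```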
Properties Of A Verifiable Audit Log
A production-ready audit log for AI agents should be:
- Append only. New entries can be appended but never removed or changed.
- Tamper evident. Altering any past entry must be detectable.
- Bound to identity. Entries must be cryptographically tied to the agent that produced them.
- Time ordered. Entries form a sequential chain suitable for replay.
- Independently verifiable. An external auditor should be able to check integrity without trusting the runtime.
This does not require a blockchain. Two primitives deliver most of what we need:
- hash chains or Merkle trees
- digital signatures
We start with hash chaining, then add Merkle trees, then signatures, then anchoring.
Hash Chained Log In Python
A hash chain links each entry to the previous one. A single modification breaks the entire chain.
import hashlib
import json
import time
from dataclasses import dataclass
from typing import List
@dataclass
class AuditEntry:
index: int
timestamp: float
agent_id: str
action: str
payload: dict
prev_hash: str
hash: str
def compute_hash(data: dict) -> str:
encoded = json.dumps(data, sort_keys=True).encode("utf-8")
return hashlib.sha256(encoded).hexdigest()
class HashChainedLog:
def __init__(self):
self.entries: List[AuditEntry] = []
def append(self, agent_id: str, action: str, payload: dict) -> AuditEntry:
index = len(self.entries)
ts = time.time()
prev_hash = self.entries[-1].hash if self.entries else "GENESIS"
body = {
"index": index,
"timestamp": ts,
"agent_id": agent_id,
"action": action,
"payload": payload,
"prev_hash": prev_hash,
}
entry_hash = compute_hash(body)
        entry = AuditEntry(**body, hash=entry_hash)
self.entries.append(entry)
return entry
def verify_chain(self) -> bool:
prev_hash = "GENESIS"
for i, entry in enumerate(self.entries):
body = {
"index": entry.index,
"timestamp": entry.timestamp,
"agent_id": entry.agent_id,
"action": entry.action,
"payload": entry.payload,
"prev_hash": prev_hash,
}
if compute_hash(body) != entry.hash:
print(f"Integrity failure at index {i}")
return False
prev_hash = entry.hash
return True
Example
log = HashChainedLog()
log.append("spiffe://trust.local/agent/data-query", "db.query", {"dataset": "customers"})
log.append("spiffe://trust.local/agent/exporter", "s3.put_object", {"bucket": "exports"})
print("Valid:", log.verify_chain()) # True
log.entries[0].payload["dataset"] = "tampered"
print("Valid after tampering:", log.verify_chain()) # False
Merkle Tree Variant
Hash chains prove full log integrity. Merkle trees also allow efficient inclusion proofs.
A Merkle tree aggregates many entries into a single root hash. An auditor can verify that a specific entry was included by checking only a small set of sibling hashes.
This enables:
- efficient proofs for large logs
- multi tenant transparency logs
- external publication of only the Merkle root
Merkle Tree Construction
from typing import List
def merkle_leaf_hash(entry_hash: str) -> str:
return compute_hash({"leaf": entry_hash})
def merkle_parent_hash(left: str, right: str) -> str:
return compute_hash({"left": left, "right": right})
def build_merkle_tree(entry_hashes: List[str]) -> List[List[str]]:
if not entry_hashes:
return [["EMPTY"]]
level = [merkle_leaf_hash(h) for h in entry_hashes]
tree = [level]
while len(level) > 1:
if len(level) % 2 == 1:
level.append(level[-1])
next_level = []
for i in range(0, len(level), 2):
parent = merkle_parent_hash(level[i], level[i + 1])
next_level.append(parent)
tree.append(next_level)
level = next_level
return tree
def merkle_root(entry_hashes: List[str]) -> str:
return build_merkle_tree(entry_hashes)[-1][0]
Merkle Proofs (Conceptual)
After the tree is built, you can generate a Merkle proof for any entry. A proof is a list of sibling hashes up the tree that allows an auditor to recompute the root.
You provide the auditor with:
- the log entry
- the entry hash
- the Merkle proof (sibling hashes)
- the published Merkle root
The auditor recomputes the root. If it matches, the entry is proven to be part of the log.
This enables verification without revealing any other entries.
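The proof generation and verification steps can be sketched directly. Below is a minimal, self-contained implementation; the hashing helpers from the earlier sections are repeated so the snippet runs on its own:

```python
import hashlib
import json
from typing import List, Tuple

# Helpers repeated from the sections above so this snippet is standalone.
def compute_hash(data: dict) -> str:
    encoded = json.dumps(data, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

def merkle_leaf_hash(entry_hash: str) -> str:
    return compute_hash({"leaf": entry_hash})

def merkle_parent_hash(left: str, right: str) -> str:
    return compute_hash({"left": left, "right": right})

def build_merkle_tree(entry_hashes: List[str]) -> List[List[str]]:
    if not entry_hashes:
        return [["EMPTY"]]
    level = [merkle_leaf_hash(h) for h in entry_hashes]
    tree = [level]
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])  # pad in place; the stored level keeps the pad
        next_level = [merkle_parent_hash(level[i], level[i + 1])
                      for i in range(0, len(level), 2)]
        tree.append(next_level)
        level = next_level
    return tree

def merkle_proof(entry_hashes: List[str], index: int) -> List[Tuple[str, str]]:
    """Sibling hashes from leaf to root, each tagged with its side."""
    tree = build_merkle_tree(entry_hashes)
    proof = []
    for level in tree[:-1]:      # every non-root level is even-length after padding
        sibling = index ^ 1      # the pair partner of the current node
        side = "left" if sibling < index else "right"
        proof.append((side, level[sibling]))
        index //= 2              # move up to the parent's position
    return proof

def verify_merkle_proof(entry_hash: str, proof: List[Tuple[str, str]], root: str) -> bool:
    """Recompute the root from one entry hash plus its sibling path."""
    node = merkle_leaf_hash(entry_hash)
    for side, sibling in proof:
        node = (merkle_parent_hash(sibling, node) if side == "left"
                else merkle_parent_hash(node, sibling))
    return node == root

entry_hashes = [hashlib.sha256(f"entry-{i}".encode()).hexdigest() for i in range(5)]
root = build_merkle_tree(entry_hashes)[-1][0]
proof = merkle_proof(entry_hashes, 3)
assert verify_merkle_proof(entry_hashes[3], proof, root)
assert not verify_merkle_proof(entry_hashes[0], proof, root)
```

An auditor holding only the entry hash, its proof, and the published root can confirm inclusion without seeing any other entry.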
Digital Signatures
Hashing protects integrity, but we must also prove who produced each entry.
Here we use Ed25519 as an example. In a real deployment, the agent would obtain its signing key from the SPIFFE Workload API, exactly as described in Part 3. The agent signs each entry with its private key, and the public key is recorded in the log alongside the SPIFFE ID.
Signing helper
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
import base64
class SigningKey:
def __init__(self):
self._private_key = ed25519.Ed25519PrivateKey.generate()
self.public_key = self._private_key.public_key()
def sign(self, message: bytes) -> bytes:
return self._private_key.sign(message)
def public_bytes(self) -> bytes:
return self.public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
def verify_signature(public_key_bytes: bytes, message: bytes, signature: bytes) -> bool:
try:
key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)
key.verify(signature, message)
return True
except InvalidSignature:
return False
Signed Audit Entry
from dataclasses import dataclass
@dataclass
class SignedAuditEntry:
index: int
timestamp: float
agent_id: str
action: str
payload: dict
prev_hash: str
hash: str
public_key: str
signature: str
Creating a signed entry
def create_signed_entry(
index: int,
timestamp: float,
agent_id: str,
action: str,
payload: dict,
prev_hash: str,
signing_key: SigningKey,
) -> SignedAuditEntry:
body = {
"index": index,
"timestamp": timestamp,
"agent_id": agent_id,
"action": action,
"payload": payload,
"prev_hash": prev_hash,
}
entry_hash = compute_hash(body)
message = entry_hash.encode("utf-8")
signature = signing_key.sign(message)
return SignedAuditEntry(
index=index,
timestamp=timestamp,
agent_id=agent_id,
action=action,
payload=payload,
prev_hash=prev_hash,
hash=entry_hash,
public_key=base64.b64encode(signing_key.public_bytes()).decode("ascii"),
signature=base64.b64encode(signature).decode("ascii"),
)
Verification
def verify_signed_chain(entries: List[SignedAuditEntry]) -> bool:
prev_hash = "GENESIS"
for entry in entries:
body = {
"index": entry.index,
"timestamp": entry.timestamp,
"agent_id": entry.agent_id,
"action": entry.action,
"payload": entry.payload,
"prev_hash": prev_hash,
}
expected_hash = compute_hash(body)
if expected_hash != entry.hash:
print("Hash mismatch at", entry.index)
return False
message = entry.hash.encode("utf-8")
pub = base64.b64decode(entry.public_key)
sig = base64.b64decode(entry.signature)
if not verify_signature(pub, message, sig):
print("Signature invalid at", entry.index)
return False
prev_hash = entry.hash
return True
Now entries are integrity protected and identity bound.
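To make the flow concrete, here is a condensed, self-contained roundtrip of the sign-then-verify path (same Ed25519 API as above; requires the `cryptography` package; `entry_is_authentic` is a helper name introduced here for illustration):

```python
import hashlib
import json
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.exceptions import InvalidSignature

def compute_hash(data: dict) -> str:
    encoded = json.dumps(data, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

# Agent side: hash the entry body, then sign the hash.
key = ed25519.Ed25519PrivateKey.generate()
body = {
    "index": 0,
    "agent_id": "spiffe://trust.local/agent/demo",
    "action": "db.query",
    "payload": {"dataset": "customers"},
    "prev_hash": "GENESIS",
}
entry_hash = compute_hash(body)
signature = key.sign(entry_hash.encode("utf-8"))

# Auditor side: recompute the hash, then check the signature over it.
def entry_is_authentic(body: dict, entry_hash: str, signature: bytes,
                       public_key: ed25519.Ed25519PublicKey) -> bool:
    if compute_hash(body) != entry_hash:
        return False
    try:
        public_key.verify(signature, entry_hash.encode("utf-8"))
        return True
    except InvalidSignature:
        return False

assert entry_is_authentic(body, entry_hash, signature, key.public_key())

# Any tampering changes the hash, so the old signature no longer verifies.
tampered = dict(body, payload={"dataset": "exfil"})
assert not entry_is_authentic(tampered, entry_hash, signature, key.public_key())
```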
Sidecar Style Audit Emission
Agents should not write directly to their audit logs. A compromised agent could delete its own incriminating events. The correct approach is to send events to an isolated audit sidecar.
Agent Emitting Events
import json
import requests
import time
AUDIT_URL = "http://localhost:8089/audit"
def emit_audit_event(agent_id: str, action: str, payload: dict):
event = {
"timestamp": time.time(),
"agent_id": agent_id,
"action": action,
"payload": payload,
}
    response = requests.post(AUDIT_URL, json=event, timeout=5)
if response.status_code != 200:
raise RuntimeError("Audit emission failed: " + response.text)
Audit Sidecar Receiver
Below is a complete, runnable minimal example:
# sidecar.py
import hashlib
import json
import time
from dataclasses import dataclass
from typing import List
from flask import Flask, request, jsonify
@dataclass
class AuditEntry:
index: int
timestamp: float
agent_id: str
action: str
payload: dict
prev_hash: str
hash: str
def compute_hash(data: dict) -> str:
encoded = json.dumps(data, sort_keys=True).encode("utf-8")
return hashlib.sha256(encoded).hexdigest()
class HashChainedLog:
def __init__(self):
self.entries: List[AuditEntry] = []
def append(self, agent_id: str, action: str, payload: dict) -> AuditEntry:
index = len(self.entries)
ts = time.time()
prev_hash = self.entries[-1].hash if self.entries else "GENESIS"
body = {
"index": index,
"timestamp": ts,
"agent_id": agent_id,
"action": action,
"payload": payload,
"prev_hash": prev_hash,
}
entry_hash = compute_hash(body)
        entry = AuditEntry(**body, hash=entry_hash)
self.entries.append(entry)
return entry
def verify_chain(self) -> bool:
prev_hash = "GENESIS"
for entry in self.entries:
body = {
"index": entry.index,
"timestamp": entry.timestamp,
"agent_id": entry.agent_id,
"action": entry.action,
"payload": entry.payload,
"prev_hash": prev_hash,
}
if compute_hash(body) != entry.hash:
return False
prev_hash = entry.hash
return True
app = Flask(__name__)
log = HashChainedLog()
@app.post("/audit")
def receive():
event = request.json
entry = log.append(
agent_id=event["agent_id"],
action=event["action"],
payload=event.get("payload", {}),
)
return jsonify({
"status": "ok",
"index": entry.index,
"hash": entry.hash,
"timestamp": entry.timestamp,
})
@app.get("/verify")
def verify():
return jsonify({"valid": log.verify_chain(), "entries": len(log.entries)})
if __name__ == "__main__":
app.run(port=8089)
This is the simplest working prototype.
Anchoring The Log Head (AWS S3 and DynamoDB)
Hash chains and signatures protect integrity inside the sidecar. Anchoring protects against the sidecar itself being compromised. The idea is simple:
- Periodically compute the head hash or Merkle root.
- Sign it with a trusted key.
- Write it to an immutable external system.
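The anchor record itself can be built and checked in pure Python before wiring in AWS. The sketch below uses stdlib HMAC-SHA256 as a stand-in for the Ed25519 signature used elsewhere in this post; `ANCHOR_SECRET` is a placeholder:

```python
import hashlib
import hmac
import json
import time

# Placeholder key material; in production, sign with the Ed25519 key instead.
ANCHOR_SECRET = b"replace-with-real-key-material"

def build_anchor(head_hash: str, now: int) -> dict:
    """Build a signed anchor record for the current chain head."""
    record = {"head_hash": head_hash, "timestamp": now}
    payload = json.dumps(record, sort_keys=True).encode("utf-8")
    sig = hmac.new(ANCHOR_SECRET, payload, hashlib.sha256).hexdigest()
    return {"record": record, "signature": sig}

def verify_anchor(anchor: dict) -> bool:
    """Recompute the MAC over the record and compare in constant time."""
    payload = json.dumps(anchor["record"], sort_keys=True).encode("utf-8")
    expected = hmac.new(ANCHOR_SECRET, payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, anchor["signature"])

anchor = build_anchor("deadbeef" * 8, int(time.time()))
assert verify_anchor(anchor)

anchor["record"]["head_hash"] = "0" * 64  # tampering breaks verification
assert not verify_anchor(anchor)
```

The record layout mirrors what the S3 example below writes out; only the signing primitive differs.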
AWS S3 Anchoring with Object Lock
Amazon S3 supports Object Lock in Compliance mode. When enabled:
- objects become WORM (write once read many)
- no one, not even the root user, can delete or alter them until the retention period expires
This is ideal for anchoring log heads.
import boto3
import json
import time
import base64
from datetime import datetime, timedelta, timezone
s3 = boto3.client("s3")
def anchor_head_to_s3(bucket: str, key_prefix: str, head_hash: str, signing_key: SigningKey):
    timestamp = int(time.time())
    record = {
        "head_hash": head_hash,
        "timestamp": timestamp,
    }
    payload = json.dumps(record, sort_keys=True).encode("utf-8")
    signature = signing_key.sign(payload)
    obj = {
        "record": record,
        "signature": base64.b64encode(signature).decode("ascii"),
        "public_key": base64.b64encode(signing_key.public_bytes()).decode("ascii"),
    }
    key = f"{key_prefix}/{timestamp}.json"
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(obj).encode("utf-8"),
        ContentType="application/json",
        ObjectLockMode="COMPLIANCE",
        # boto3 expects a datetime for the retention deadline
        ObjectLockRetainUntilDate=datetime.now(timezone.utc) + timedelta(days=365),
    )
    return key
DynamoDB Anchoring with Streams
DynamoDB Streams captures every change to a table in an ordered, append-only stream whose records are retained for 24 hours. A verifier can consume this stream independently.
You write the head hash into a table:
import boto3
import time
ddb = boto3.client("dynamodb")
def anchor_to_ddb(table: str, head_hash: str, signing_key: SigningKey):
payload = head_hash.encode("utf-8")
signature = signing_key.sign(payload)
ddb.put_item(
TableName=table,
Item={
"head_hash": {"S": head_hash},
"signature": {"B": signature},
"public_key": {"B": signing_key.public_bytes()},
"ts": {"N": str(time.time())},
}
)
An external verifier can read the DynamoDB Stream to reconstruct a historical record of anchoring events.
Multi Account Anchoring (Recommended)
Security teams often anchor the head hash in a separate AWS account with restrictive IAM policies. This protects the audit trail from a compromised runtime account.
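As a sketch of what "restrictive" means here, the anchoring bucket's policy can allow only the sidecar's role to write and deny deletes outright. The account ID, role, and bucket name below are hypothetical:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowAnchorWritesFromSidecarRole",
      "Effect": "Allow",
      "Principal": {"AWS": "arn:aws:iam::111111111111:role/agent-audit-sidecar"},
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::audit-anchors/*"
    },
    {
      "Sid": "DenyAllDeletes",
      "Effect": "Deny",
      "Principal": "*",
      "Action": ["s3:DeleteObject", "s3:DeleteObjectVersion"],
      "Resource": "arn:aws:s3:::audit-anchors/*"
    }
  ]
}
```

Object Lock in Compliance mode already prevents deletion during the retention window; the explicit deny adds defense in depth across accounts.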
Why This Matters
With verifiable audit logs you can:
- reconstruct incidents with confidence
- detect attempts to rewrite history
- prove which agent took which action
- provide auditors with cryptographically sound evidence
Combined with earlier primitives:
- Encryption protects what agents say.
- Injection protection guards how they think.
- Identity and attestation prove who they are.
- Policy-as-code controls what they are allowed to do.
- Verifiable audit logs record what they actually did in a way that can be proven later.
This closes a major trust gap in production agent systems.
Practical Next Steps
- Implement this pattern for one internal workflow.
- Introduce SPIFFE identity based signing keys.
- Start anchoring Merkle roots in S3 Object Lock or DynamoDB Streams.
- Consider Merkle proofs and transparency logs for multi tenant systems.
Part 6 will cover kill switches and circuit breakers for agentic workloads, which depend directly on having reliable, verifiable telemetry.




