API Reference
Daemon emitter
The canonical way to record receipts: send tool-call events to a running
obsigna-daemon, which holds the signing key, builds the receipt, signs
it, and chains it. Your process never touches the key. See
Daemon Setup to start the daemon and
Quick Start for the full emit → list → verify loop.
    from obsigna import DaemonEmitter, EmitTransportError, default_socket_path

DaemonEmitter

    class DaemonEmitter:
        def __init__(
            self,
            *,
            socket_path: str = "",
            session_id: str = "",
            log: logging.Logger | None = None,
            best_effort: bool = False,
        ) -> None: ...

Connect to the daemon socket. With socket_path="" the emitter resolves
default_socket_path() (the per-OS default, or AGENTRECEIPTS_SOCKET if set).
If neither yields a path — for example on Windows, or on macOS/Linux when $HOME
is unset and AGENTRECEIPTS_SOCKET is not configured — the constructor raises
ValueError; pass an explicit socket_path in those cases. Supply session_id
to propagate the host’s session id across frames — a UUID v4 is generated
otherwise. By default a transport failure raises EmitTransportError (the emit
failure contract, ADR-0025); pass best_effort=True to drop unreachable-daemon
events silently instead. Use it as a context manager, or call close() when done.

    def emit(
        self,
        *,
        channel: str,
        tool_name: str,
        decision: str,
        tool_server: str = "",
        input: bytes | str | None = None,
        output: bytes | str | None = None,
        error: str = "",
    ) -> None

Send one tool-call event. channel is your integration namespace and becomes the
prefix of the stored action type (e.g. my-app.filesystem.file.read). decision
is one of "allowed", "denied", or "pending". input / output accept raw
JSON (passed verbatim; the daemon hashes them). Returns None.
Raises ValueError for caller bugs (empty channel / tool_name, invalid
decision, invalid JSON, oversized frame), RuntimeError if the emitter is
closed, and EmitTransportError when the daemon is unreachable — unless the
emitter was constructed with best_effort=True.
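
The caller-side checks listed above can be sketched without the SDK. The helper below is a hypothetical stand-in, not the obsigna implementation: the name check_emit_args and its return value are assumptions, and joining channel and tool_name with a dot is inferred from the my-app.filesystem.file.read example rather than confirmed.

```python
from __future__ import annotations

import json

# Decisions accepted by emit(), per the reference text above.
VALID_DECISIONS = {"allowed", "denied", "pending"}


def check_emit_args(
    channel: str,
    tool_name: str,
    decision: str,
    input: bytes | str | None = None,
) -> str:
    """Hypothetical pre-flight check mirroring the documented ValueError cases.

    Returns the action type the daemon is assumed to store, i.e.
    "<channel>.<tool_name>".
    """
    if not channel or not tool_name:
        raise ValueError("channel and tool_name must be non-empty")
    if decision not in VALID_DECISIONS:
        raise ValueError(f"invalid decision: {decision!r}")
    if input is not None:
        try:
            json.loads(input)  # json.loads accepts both str and bytes
        except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
            raise ValueError("input is not valid JSON") from exc
    return f"{channel}.{tool_name}"


action_type = check_emit_args("my-app", "filesystem.file.read", "allowed")
```

Running a check like this before emit() only surfaces caller bugs earlier; the emitter still performs its own validation, including the frame-size limit that this sketch does not model.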
    from obsigna import DaemonEmitter

    with DaemonEmitter() as e:  # uses AGENTRECEIPTS_SOCKET or the per-OS default
        e.emit(
            channel="my-app",
            tool_name="filesystem.file.read",
            decision="allowed",
        )

    def close(self) -> None

Release the underlying socket connection. Safe to call multiple times. After
close(), any subsequent emit() call raises RuntimeError. The context
manager calls close() automatically on exit.
EmitTransportError
Raised by emit() when the daemon socket cannot be dialled or the write fails,
unless the emitter was constructed with best_effort=True.
default_socket_path
    def default_socket_path() -> str

The per-OS default daemon socket path (an empty string on platforms without one).
Honours AGENTRECEIPTS_SOCKET when set.
Receipts
    from obsigna import (
        create_receipt,
        sign_receipt,
        verify_receipt,
        generate_key_pair,
        hash_receipt,
        verify_chain,
    )

Functions

create_receipt

    def create_receipt(input: CreateReceiptInput) -> UnsignedAgentReceipt

Build an unsigned receipt. Auto-generates an ID (urn:uuid:...), action ID, issuance date, and action timestamp.
sign_receipt
    def sign_receipt(
        unsigned: UnsignedAgentReceipt,
        private_key: str,
        verification_method: str,
    ) -> AgentReceipt

Sign an unsigned receipt with an Ed25519 private key (PEM-encoded). Returns a signed AgentReceipt with an Ed25519Signature2020 proof.
verify_receipt
    def verify_receipt(receipt: AgentReceipt, public_key: str) -> bool

Verify the Ed25519 signature on a signed receipt.
generate_key_pair
    def generate_key_pair() -> KeyPair

Generate an Ed25519 key pair in PEM format (SPKI public, PKCS8 private).
hash_receipt
    def hash_receipt(receipt: AgentReceipt | dict) -> str

Compute the SHA-256 hash of a receipt (excluding proof) using canonical JSON. Accepts a Pydantic model or plain dict. Returns sha256:<hex>.
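
To make the "excluding proof" rule concrete, here is a minimal standard-library sketch of the same idea. It is not the SDK's implementation: sketch_hash_receipt is a hypothetical name, and json.dumps with sorted keys only approximates RFC 8785 (it matches for simple string and integer values, but does not reproduce JCS number formatting or UTF-16 key ordering), so its output should not be expected to equal hash_receipt.

```python
import hashlib
import json
from typing import Any


def sketch_hash_receipt(receipt: dict[str, Any]) -> str:
    """Hash a receipt dict without its top-level proof; returns "sha256:<hex>"."""
    body = {key: value for key, value in receipt.items() if key != "proof"}
    # Approximate canonical form: sorted keys, no insignificant whitespace,
    # UTF-8 encoded. Real RFC 8785 canonicalization is stricter than this.
    canonical = json.dumps(
        body, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Placeholder receipts: the same body, once with a proof and once without.
signed = {
    "id": "urn:uuid:example",
    "version": "0.5.0",
    "proof": {"proofValue": "placeholder"},
}
unsigned = {"version": "0.5.0", "id": "urn:uuid:example"}

# Key order and the presence of a proof do not change the hash.
same_hash = sketch_hash_receipt(signed) == sketch_hash_receipt(unsigned)
```

Hashing the proof-free body is what lets the hash be computed before signing and checked again afterwards.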
verify_chain
    def verify_chain(
        receipts: list[AgentReceipt],
        public_key: str,
    ) -> ChainVerification

Verify an entire receipt chain: signatures, hash linkage, and sequence numbers. Receipts must be provided in chain order.
canonicalize
    def canonicalize(value: Any) -> str

RFC 8785 canonical JSON serialization.
sha256
    def sha256(data: str) -> str

Compute a SHA-256 hash. Returns sha256:<hex>.
All receipt types are Pydantic BaseModel subclasses.
CreateReceiptInput
    class CreateReceiptInput(BaseModel):
        issuer: Issuer
        principal: Principal
        action: ActionInput
        outcome: Outcome
        chain: Chain
        intent: Intent | None = None
        authorization: Authorization | None = None
        action_timestamp: str | None = None

    class ActionInput(BaseModel):
        type: str
        risk_level: str
        target: Any = None
        parameters_hash: str | None = None
        trusted_timestamp: str | None = None

AgentReceipt
    class UnsignedAgentReceipt(BaseModel):
        context: list[str]  # serialized as @context
        id: str
        type: list[str]
        version: str
        issuer: Issuer
        issuanceDate: str
        credentialSubject: CredentialSubject

    class AgentReceipt(UnsignedAgentReceipt):
        proof: Proof

KeyPair

    @dataclass
    class KeyPair:
        public_key: str
        private_key: str

Issuer
    class Issuer(BaseModel):
        id: str
        type: str | None = None
        name: str | None = None
        operator: Operator | None = None
        model: str | None = None
        session_id: str | None = None
        runtime: Runtime | None = None  # open metadata container (v0.5.0, ADR-0026)

    class Runtime(BaseModel):
        # Open container for runtime/observability metadata. extra="allow" keeps it
        # extensible, so unknown runtime keys survive round-trips.
        model_config = ConfigDict(extra="allow")
        agent_id: str | None = None    # the sub-agent that issued the receipt
        agent_type: str | None = None  # e.g. "general-purpose"

    class Operator(BaseModel):
        id: str
        name: str

Principal
    class Principal(BaseModel):
        id: str
        type: str | None = None

Action
    class Action(BaseModel):
        id: str
        type: str
        risk_level: RiskLevel
        target: ActionTarget | None = None
        parameters_hash: str | None = None
        timestamp: str
        trusted_timestamp: str | None = None

    class ActionTarget(BaseModel):
        system: str
        resource: str | None = None

Outcome
    class Outcome(BaseModel):
        status: OutcomeStatus
        error: str | None = None
        reversible: bool | None = None
        reversal_method: str | None = None
        reversal_window_seconds: int | None = None
        state_change: StateChange | None = None

    class StateChange(BaseModel):
        before_hash: str
        after_hash: str

    class Chain(BaseModel):
        sequence: int
        previous_receipt_hash: str | None
        chain_id: str

ChainVerification
    @dataclass
    class ChainVerification:
        valid: bool
        length: int
        receipts: list[ReceiptVerification] = field(default_factory=list)
        broken_at: int = -1

    @dataclass
    class ReceiptVerification:
        index: int
        receipt_id: str
        signature_valid: bool
        hash_link_valid: bool
        sequence_valid: bool

Intent / Authorization / Proof
    class Intent(BaseModel):
        conversation_hash: str | None = None
        prompt_preview: str | None = None
        prompt_preview_truncated: bool | None = None
        reasoning_hash: str | None = None

    class Authorization(BaseModel):
        scopes: list[str]
        granted_at: str
        expires_at: str | None = None
        grant_ref: str | None = None

    class Proof(BaseModel):
        type: str
        created: str | None = None
        verificationMethod: str | None = None
        proofPurpose: str | None = None
        proofValue: str

Constants
    RiskLevel = Literal["low", "medium", "high", "critical"]
    OutcomeStatus = Literal["success", "failure", "pending"]

    CONTEXT: list[str]          # ["https://www.w3.org/ns/credentials/v2", "https://agentreceipts.ai/context/v2"]
    CREDENTIAL_TYPE: list[str]  # ["VerifiableCredential", "AgentReceipt"]
    RECEIPT_VERSION: str        # "0.5.0" (receipt schema)
    VERSION: str                # current package version

Parameter disclosure (HPKE)
    from obsigna import (
        generate_forensic_key_pair,
        encrypt_disclosure,
        decrypt_disclosure,
        ForensicKeyPair,
        DisclosureEnvelope,
        DisclosureRecipient,
    )

HPKE-based envelope for encrypting tool-call parameters into a receipt’s
parameters_disclosure field (ADR-0012, ciphersuite
hpke-x25519-hkdf-sha256-aes-256-gcm). The emitter holds only the forensic
public key and never sees the plaintext again after encryption; the private key
stays offline. For the threat model and operator configuration see the
Parameter Disclosure specification.
For the envelope JSON shape see the schema reference.
This is the SDK-direct path. When emitting through the daemon the operator provides the public key in daemon config; the daemon calls these helpers automatically.
Functions
generate_forensic_key_pair

    def generate_forensic_key_pair() -> ForensicKeyPair

Generate an X25519 key pair for forensic disclosure. public_key (32 raw
bytes) is shared with emitters. private_key (32 raw bytes) must be kept
offline, separate from the Ed25519 signing key (ADR-0001 / ADR-0012).
encrypt_disclosure
    def encrypt_disclosure(
        params: dict[str, Any],
        recipient_public_key: bytes,
        kid: str,
    ) -> DisclosureEnvelope

Encrypt params as a v1 HPKE disclosure envelope. params is RFC 8785
JCS-canonicalized before encryption so all SDKs encrypt the same plaintext bytes
for the same parameters object (the ciphertext itself differs between calls,
because HPKE encapsulates to a fresh ephemeral key each time). params must be a
plain dict (Mapping subclasses
are rejected; convert Pydantic models with .model_dump() first).
recipient_public_key must be 32 bytes. kid is the recipient key identifier
(sha256:<hex> fingerprint or did:key DID URL). Raises TypeError for
non-dict params, ValueError for other invalid arguments.
decrypt_disclosure
    def decrypt_disclosure(
        env: DisclosureEnvelope,
        recipient_private_key: bytes,
    ) -> dict[str, Any]

Recover the plaintext parameters from a v1 HPKE disclosure envelope.
recipient_private_key must be 32 bytes. env is re-validated at runtime
(callers commonly pass dicts from json.loads). Raises ValueError if the
envelope version or algorithm is unsupported, if the key or ciphertext is
malformed, or if AEAD authentication fails.
Usage example
    from obsigna import (
        generate_forensic_key_pair,
        encrypt_disclosure,
        decrypt_disclosure,
    )
    import hashlib

    # Key management (run once offline, store private key securely)
    kp = generate_forensic_key_pair()
    # Share kp.public_key with emitters; keep kp.private_key offline.
    digest = hashlib.sha256(kp.public_key).hexdigest()
    kid = f"sha256:{digest}"  # use as kid and in daemon config

    # Emitter side (holds only the public key)
    env = encrypt_disclosure(
        {"path": "/etc/passwd", "mode": "r"},
        kp.public_key,
        kid,
    )
    # embed env in action.parameters_disclosure, then sign the receipt

    # Forensic / audit side (holds the offline private key)
    params = decrypt_disclosure(env, kp.private_key)
    # params == {"mode": "r", "path": "/etc/passwd"}

Recovering a stored disclosure
In practice the envelope comes from a stored receipt and the private key from the
file obsigna-daemon --init-forensic-key wrote — a raw 32-byte X25519 key,
not PEM, so read it as bytes and pass it straight in. Pull the envelope out of the
receipt JSON at credentialSubject.action.parameters_disclosure:
    import json
    from obsigna import decrypt_disclosure

    # raw 32-byte private key written by --init-forensic-key (kept offline)
    with open("/path/to/forensic.key", "rb") as f:
        priv = f.read()

    # from: obsigna receipt show <seq> --json > receipt.json
    with open("receipt.json") as f:
        receipt = json.load(f)
    envelope = receipt["credentialSubject"]["action"]["parameters_disclosure"]

    params = decrypt_disclosure(envelope, priv)
    print(params)  # the original tool-call parameters

ForensicKeyPair
    @dataclass(frozen=True)
    class ForensicKeyPair:
        public_key: bytes   # 32-byte X25519 public key; share with emitters
        private_key: bytes  # 32-byte X25519 private key; keep offline

Raw X25519 key bytes. Unlike KeyPair (Ed25519, PEM-encoded), these are raw
bytes because X25519 has no widespread PKCS8 PEM convention.
DisclosureEnvelope / DisclosureRecipient
    class DisclosureEnvelope(TypedDict):
        v: Literal["1"]
        alg: Literal["hpke-x25519-hkdf-sha256-aes-256-gcm"]
        recipients: list[DisclosureRecipient]  # length 1 in v1
        ct: str  # AEAD ciphertext; unpadded base64url

    class DisclosureRecipient(TypedDict):
        kid: str
        enc: str  # HPKE encapsulated key; unpadded base64url, 43 chars for X25519

v1 HPKE envelope stored in action.parameters_disclosure. Field names follow
RFC 9180 §4.1 (enc, not encap).
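
The field constraints above (version "1", a single recipient, unpadded base64url, a 43-character enc for a 32-byte X25519 key) can be checked structurally with the standard library. The sketch below is illustrative only: check_envelope_shape and b64url_decode_unpadded are hypothetical helpers, not SDK functions, and passing this check says nothing about whether the envelope decrypts.

```python
import base64
from typing import Any

ALG = "hpke-x25519-hkdf-sha256-aes-256-gcm"


def b64url_decode_unpadded(text: str) -> bytes:
    """Decode base64url text that must not carry '=' padding."""
    if "=" in text:
        raise ValueError("padding is not allowed")
    # Restore the padding that the decoder expects.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def check_envelope_shape(env: dict[str, Any]) -> None:
    """Raise ValueError if env does not look like a v1 disclosure envelope."""
    if env.get("v") != "1":
        raise ValueError("unsupported envelope version")
    if env.get("alg") != ALG:
        raise ValueError("unsupported algorithm")
    recipients = env.get("recipients")
    if not isinstance(recipients, list) or len(recipients) != 1:
        raise ValueError("v1 envelopes carry exactly one recipient")
    enc_text = recipients[0].get("enc", "")
    if len(enc_text) != 43 or len(b64url_decode_unpadded(enc_text)) != 32:
        raise ValueError("enc must be a 32-byte key in unpadded base64url")
    b64url_decode_unpadded(env.get("ct", ""))  # ct must at least decode


# Placeholder envelope: zero bytes stand in for a real encapsulated key.
enc = base64.urlsafe_b64encode(bytes(32)).rstrip(b"=").decode("ascii")
ct = base64.urlsafe_b64encode(b"ciphertext-placeholder").rstrip(b"=").decode("ascii")
envelope = {
    "v": "1",
    "alg": ALG,
    "recipients": [{"kid": "sha256:placeholder", "enc": enc}],
    "ct": ct,
}
check_envelope_shape(envelope)  # returns None when the shape is acceptable
```

The 43-character figure follows from the encoding: 32 bytes become 44 base64 characters with one trailing pad, and dropping the pad leaves 43.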
Note: ForensicKeyFingerprint and ForensicPublicFromPrivate are available
in the Go SDK only (added in PR #722). Use hashlib.sha256(public_key).hexdigest()
to compute the sha256:<hex> fingerprint string in Python.
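
A small wrapper keeps the fingerprint computation in one place. The function name forensic_key_fingerprint is hypothetical (the Python SDK does not export it), and the 32-byte length check is an added assumption based on the key size documented for ForensicKeyPair.

```python
import hashlib


def forensic_key_fingerprint(public_key: bytes) -> str:
    """Return the "sha256:<hex>" fingerprint of a raw X25519 public key."""
    if len(public_key) != 32:
        raise ValueError("expected a 32-byte raw X25519 public key")
    return "sha256:" + hashlib.sha256(public_key).hexdigest()


# Placeholder key bytes; in practice pass ForensicKeyPair.public_key.
kid = forensic_key_fingerprint(bytes(range(32)))
```

The resulting string is the form used as kid in the usage example above.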
    from obsigna import ReceiptStore, open_store, verify_stored_chain

SQLite-backed receipt persistence and querying.
open_store
    def open_store(db_path: str) -> ReceiptStore

Open or create a SQLite receipt store. Pass ":memory:" for an in-memory store.
verify_stored_chain
    def verify_stored_chain(
        store: ReceiptStore,
        chain_id: str,
        public_key: str,
    ) -> ChainVerification

Load a chain from the store and verify its integrity.
ReceiptStore
    class ReceiptStore:
        def __init__(self, db_path: str) -> None: ...
        def insert(self, receipt: AgentReceipt, receipt_hash: str) -> None: ...
        def get_by_id(self, receipt_id: str) -> AgentReceipt | None: ...
        def get_chain(self, chain_id: str) -> list[AgentReceipt]: ...
        def query(self, filters: ReceiptQuery) -> list[AgentReceipt]: ...
        def stats(self) -> StoreStats: ...
        def close(self) -> None: ...

ReceiptQuery

    @dataclass
    class ReceiptQuery:
        chain_id: str | None = None
        action_type: str | None = None
        risk_level: str | None = None
        status: str | None = None
        after: str | None = None
        before: str | None = None
        # When None, all matching rows are returned (no default cap).
        limit: int | None = None
        # When True, returns newest receipts first; ties broken by sequence descending.
        newest_first: bool = False

StoreStats

    @dataclass
    class StoreStats:
        total: int
        chains: int
        by_risk: list[dict[str, str | int]]
        by_status: list[dict[str, str | int]]
        by_action: list[dict[str, str | int]]

Taxonomy
    from obsigna import (
        classify_tool_call,
        get_action_type,
        resolve_action_type,
        load_taxonomy_config,
        ALL_ACTIONS,
    )

Action type registry and tool call classification.
classify_tool_call
    def classify_tool_call(
        tool_name: str,
        mappings: list[TaxonomyMapping] | None = None,
    ) -> ClassificationResult

Classify a tool call to an action type and risk level using the provided mappings.
get_action_type
    def get_action_type(action_type: str) -> ActionTypeEntry | None

Look up an action type by name. Returns None if not found.
resolve_action_type
    def resolve_action_type(action_type: str) -> ActionTypeEntry

Like get_action_type but returns an “unknown” fallback instead of None.
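
The difference between the two lookups is easiest to see side by side. The sketch below uses a stand-in registry: the Entry class, the registry contents, the risk levels, and the shape of the unknown entry are all made up for illustration and are not the SDK's built-in data.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    """Stand-in for ActionTypeEntry with the same three fields."""
    type: str
    description: str
    risk_level: str


# Hypothetical registry contents; the real registries ship with the SDK.
REGISTRY = {
    entry.type: entry
    for entry in [
        Entry("filesystem.file.read", "Read a file", "low"),
        Entry("filesystem.file.delete", "Delete a file", "high"),
    ]
}
# Assumed shape of the fallback entry.
UNKNOWN = Entry("unknown", "Unclassified action", "medium")


def get_entry(action_type: str) -> Entry | None:
    """Strict lookup: None signals that the type is not registered."""
    return REGISTRY.get(action_type)


def resolve_entry(action_type: str) -> Entry:
    """Lenient lookup: always returns an entry, falling back to UNKNOWN."""
    return REGISTRY.get(action_type, UNKNOWN)


known = resolve_entry("filesystem.file.read")
missing_strict = get_entry("not.a.real.type")
missing_lenient = resolve_entry("not.a.real.type")
```

The strict form suits validation paths where an unregistered type is an error; the fallback form suits recording paths where a receipt should still be written for tools the taxonomy does not yet cover.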
load_taxonomy_config
    def load_taxonomy_config(file_path: str) -> list[TaxonomyMapping]

Load taxonomy mappings from a JSON file.
    @dataclass(frozen=True)
    class ActionTypeEntry:
        type: str
        description: str
        risk_level: RiskLevel

    @dataclass(frozen=True)
    class TaxonomyMapping:
        tool_name: str
        action_type: str

    @dataclass(frozen=True)
    class ClassificationResult:
        action_type: str
        risk_level: RiskLevel

Built-in action registries
    FILESYSTEM_ACTIONS: list[ActionTypeEntry]  # 7 types
    SYSTEM_ACTIONS: list[ActionTypeEntry]      # 7 types
    ALL_ACTIONS: list[ActionTypeEntry]         # all + unknown
    UNKNOWN_ACTION: ActionTypeEntry

DATA_ACTIONS (3 data/API types) is exported from the taxonomy submodule, not the package root:
    from obsigna.taxonomy import DATA_ACTIONS

Backwards compatibility aliases

The SDK also exports camelCase aliases for all public functions (e.g. createReceipt, signReceipt, verifyChain) for consistency with the TypeScript SDK.