Skip to main content

Python — zeq-observer-client.py

The Python form factor of the Observer Agent is a single-file ZeqObserver class that drops into any long-lived Python process — AI/ML training loops, robotics controllers, scientific pipelines, server-side workers. It speaks the same wire contract as the Web JS, Embedded C, and Network Tap form factors.

It is 17 KB, has one dependency (requests), and one optional dependency (cryptography) only when encrypt=True.

Install

curl -O https://zeqsdk.com/zeq-observer-client.py

The file is the entire agent — no package, no sub-modules. Drop it in your project root or somewhere on PYTHONPATH.

Five-line drop-in

from zeq_observer_client import ZeqObserver

obs = ZeqObserver(
slug = "zeq07792026349",
key = "zsm_live_2c5e12b5b3f1ad99…",
encrypt = True,
types = ["epoch", "checkpoint", "anomaly"],
)

obs.observe(type="epoch", state_hash=epoch_state_sha, meta={"loss": 0.041})

That's the full surface area for a typical training loop. Heartbeats fire automatically every 8 zeqonds; the outbox queues to disk if the network blips; HF6/HF8/HF12/HF14/HF18 are computed on every emission.

Public API

class ZeqObserver:
def __init__(self, slug, key, *, encrypt=False, types=None,
endpoint="https://zeqsdk.com", heartbeat_zeqonds=8,
outbox_path="~/.zeq-observer-outbox.json", outbox_cap=200): ...

def observe(self, *, type, state_hash, meta=None): ...
def flush(self) -> int: ...
def status(self) -> dict: ...
def shutdown(self, *, drain=True) -> None: ...

status() returns the same shape as the Web JS form factor — phase, outbox depth, last zeqond, session id, agent string — so a unified dashboard can render every form factor with the same template.

shutdown(drain=True) blocks until the outbox is empty or 30 zeqonds elapse, whichever comes first. Use it in atexit:

import atexit; atexit.register(obs.shutdown)

Outbox — file-based, atomic-write

The Python form factor persists its outbox to ~/.zeq-observer-outbox.json by default. Every flush rewrites the file via the standard tempfile + os.replace atomic-rename pattern, so a process kill mid-flush cannot corrupt the file.

Cap defaults to 200 events; overflow drops oldest first and the next successful flush carries a synthetic outbox.overflow row with the dropped count in meta.dropped.

The same path can be shared across multiple processes on the same host — the file is flock-protected on Unix, msvcrt.locking on Windows. A multi-worker training loop emitting from each rank produces one ordered queue per host without coordination.

Background threads

The agent spins up two daemon threads in __init__:

  1. Flush thread — wakes every Zeqond (0.777 s), checks if the outbox is non-empty or the heartbeat is due, batches up to 50 events into one POST.
  2. Heartbeat thread — wakes every 8 zeqonds, enqueues a type=heartbeat row carrying outbox depth, last user-event zeqond, and the five HF signals.

Both threads are daemon=True so a sys.exit() will not hang on them. For graceful shutdown, call obs.shutdown() first.

Phase-locked encryption

When encrypt=True, the agent imports cryptography.hazmat.primitives.ciphers lazily and switches every emission to:

# 12-byte nonce from a keyed HMAC-SHA256, domain-separated by the label.
mac = hmac.new(key_bytes + b"ZeqField/PhaseLockedNonce/v1", digestmod=sha256)
mac.update(f"{zeqond}|{slug}".encode())
nonce = mac.digest()[:12]
ct = AESGCM(key_bytes).encrypt(nonce, plaintext_json.encode(), aad=None)

The agent sends {"_enc": "phase-locked-v1", "ct": <hex>} in place of the plaintext payload. The nonce is phase-locked to (zeqond, slug), so changing either changes the nonce and the GCM auth tag fails — the seal is tamper-evident and bound to its Zeqond (see R3 — Cipher + HZC).

The HMAC key domain-separates the nonce with the canonical label "ZeqField/PhaseLockedNonce/v1", so a phase-locked ciphertext can never decrypt against the random-IV cipher. key_bytes is derived once at startup from the state-machine key plus the slug, so the encryption key never appears on the wire.

Use cases — the three big ones

AI/ML training loops

Drop in next to your for epoch in range(N) loop and emit per-epoch state. The entangled state becomes a tamper-evident training journal — every loss, every checkpoint, every anomaly is hashlinked into a row whose zeqond proves when it happened.

for epoch, batch in enumerate(loader):
loss = train_step(batch)
obs.observe(
type = "epoch",
state_hash = sha256(f"{epoch}|{loss:.6f}".encode()).hexdigest(),
meta = {"epoch": epoch, "loss": float(loss)},
)

Robotics controllers

A 100 Hz control loop emits a type=control-tick row every n iterations. The cadence-drift signal HF8 becomes the SCADA-grade liveness proof: if the controller's wall-clock drifts past 0.1% of τ_z, the chain row carries the drift explicitly.

Scientific pipelines

A multi-stage pipeline emits type=stage-complete rows. Replaying the chain forward reconstructs the exact ordering of stages — even across machines — because each row's zeqond is monotonic on the canonical chain. R1's UNIQUE INDEX makes that guarantee load-bearing.

File map

  • apps/zeq-dev/public/zeq-observer-client.py — the agent
  • shared/api-core/src/routes/chain.ts — receiver
  • shared/api-core/src/lib/zeqField.ts — phase-locked cipher