Signed SSE Event Bus
This document is the authoritative bounded-context reference for the Signed SSE Event Bus that ships under internal/mesh/sse and exposes through GET /v1/nodes/{id}/events. It covers the ubiquitous language, the envelope schema and JetStream subject layout, the publish pipeline, the Last-Event-ID semantics, the in-memory nonce set, the threat model, the multi-replica handoff, and the operator runbook.
The bus is the plexsphere → plexd push channel: per-node long-lived SSE streams that carry signed envelopes with nonce/timestamp replay protection and a Last-Event-ID replay path on reconnect. Anything outside that surface — the underlying signer (internal/signing), the Postgres outbox, the JetStream broker, the audit sink — is a collaborator the bus orchestrates, not a concern of this document. This document is the implementation-side reference for the README's Signed Event Bus specification; the README pins the wire-level event-type catalogue and this document pins the in-process pipeline that emits it.
Status — partial delivery
This story ships the foundational primitives of the bus and the transport surface at GET /v1/nodes/{id}/events. Five control-plane gates that the README and this document name as part of the bus's contract are deferred to follow-up work and are NOT live in the production binary today:
- Production wiring of `EventStream` / `EventPublisher` / `Relay`: `cmd/plexsphere/sse_factory_prod.go` `BuildProductionSSEFactory` returns a bundle that constructs only the `NonceStore`. With `EventStream` and `EventPublisher` nil, the handler falls through to `events_dispatch.go` and returns `501 Not Implemented`. Even when a future commit installs the factory in `cmd/plexsphere/main.go`, the factory itself must grow to build the JetStream-backed adapter, the relay loop, and the publisher pipeline before the endpoint serves 200.
- ReBAC `node-agent` relation enforcement: the handler currently treats every authenticated principal as authorised. The `RelationChecker` port is not yet plumbed through `handlers.Deps`; the ReBAC layer's `node-agent` relation seam is the upstream dependency. Until both land, the 403 path documented in the OpenAPI spec is unreachable.
- `NodeRepo` existence check (404 path): the handler does not verify that the addressed Node id maps to a real Node row before opening the subscription. Non-existent ids quietly tail an empty subject. The `NodeRepo` port is owned by the reconciliation pull endpoint or a dedicated follow-up.
- Audit emission on connect-open and verification-failure: the handler emits structured `slog` lines as observability stand-ins, not durable audit rows. The `audit.Sink` port is not yet plumbed through the transport boundary; once it is, the existing `slog` call sites become `audit.Sink.Record` invocations without changes to the security-control flow.
- Per-Domain `ed25519.Verify` against the publisher's public key: the `EventStreamAdapter`'s `SignatureVerifier` port is optional today (`Verify == nil`). The structural sanity gates (canonicalise and signature length) remain load-bearing; a per-Domain key resolver must thread through `EventStreamAdapter.Verify` before consumer-side tamper-evidence is enforced rather than approximated.
The chainsaw E2E fixture at ../../../tests/e2e/mesh/sse-signed-envelopes/chainsaw-test.yaml deploys cmd/sse-stub-plexd in serve mode as the producer (a test-image binary that emits the same wire format) and another sse-stub-plexd Pod in consume mode as the verifier. The real plexsphere binary is NOT exercised end-to-end by the chainsaw fixture today; the fixture is a wire-format compatibility test until the production wiring above lands. NATS JetStream is staged but unused in this fixture (a DECISION block at the top of chainsaw-test.yaml records this).
The handler-tier and integration-tier tests for the deferred gates (TestGetNodeEvents_403WithoutRelation, TestGetNodeEvents_404ForMissingNode, TestGetNodeEvents_410ForOutOfWindow, TestGetNodeEvents_AuditOnOpen, TestGetNodeEvents_AuditOnVerifyFailure, TestSSE_Authz_WithoutRelation_403_Deferred) are present as t.Skip blocks with TODO(security, PX-0017): markers so that landing a port flips them green without rediscovery.
What does ship and is load-bearing today:
- The publish pipeline (sign-before-publish with `Nats-Msg-Id` dedup), the bounded-TTL nonce store with capacity-LRU eviction and Prometheus gauge, the outbox-relay with cursor advance and `ErrSigningKeyNotProvisioned` skip-and-log, the `Last-Event-ID` parser with all four arms (empty / numeric / non-numeric / out-of-window), the canonical stream-name + subject helpers, the `plexsphere.sse_relay_cursor` migration, the depguard allow-list, the OpenAPI surface byte-equality drift gate, the in-process `EventStreamAdapter`, the multi-replica boot guard refusing `replicas > 1` with the in-memory NonceStore, and the consumer-side capture harness contract.
- The `node_state_updated` trial event-type derived from a `tenancy.NodeRegistered` outbox row, which is sufficient to drive the publish pipeline end-to-end through tests.
- The `policy_updated` wire event-type: a second live wire literal alongside `node_state_updated`. The publisher's `wireTypeFor` dispatch table maps the closed policy outbox literals (`policy_revision_created` and `policy_deleted`) onto the wire literal `policy_updated`; every other outbox literal keeps the default `node_state_updated` mapping. The producer-side fan-out runs from the compile-service arm: one publish per matched Node on revision-created, one per previously compiled Node on delete. See `../policy/events.md` for the full publisher-side dispatch table, fan-out algorithm, payload schema, and the relay-loop deferral that remains open per the roadmap.
- The `bridge_config_updated` wire event-type: a third live wire literal. The publisher's `wireTypeFor` dispatch table collapses the closed seven-member set of `bridge.*` outbox literals onto the single wire literal `bridge_config_updated`. The producer-side fan-out runs from the four bridge application services: one signed envelope per Node hosted by the changed bridge Resource, carrying the whole-object effective bridge config. The consumer-side `/v1/nodes/{id}/events` HTTP plumbing stays deferred per the partial-delivery status above, so a bridge-mode Node converges through the reconciliation-pull `bridge` block today. See `../bridge/events.md` for the full publisher-side dispatch table, the per-Node fan-out algorithm, the `BridgeConfigUpdatedPayload` schema, and the byte-equality contract with the pull snapshot.
Downstream consumers that depend on the SSE bus MUST treat the production endpoint as 501 Not Implemented until the deferred items above land.
Cross-references
- `../../../README.md#signed-event-bus`: top-level specification of the bus, the per-node SSE surface, the event-type catalogue, and the `Last-Event-ID` replay contract.
- `./peers.md`: Key and Peer Manager bounded sub-context reference; the six peer event_type strings (`peer_registered`, `peer_psk_assigned`, `peer_deregistered`, `peer_endpoint_changed`, `rotate_keys`, `peer_key_rotated`) the relay routes verbatim.
- `./key-rotation.md`: the key-rotation workflow that emits the `rotate_keys` command and the `peer_key_rotated` notification onto this bus.
- `../signing-rotation.md`: the Signing Service key-rotation workflow that emits the `signing_key_rotated` event onto this bus.
- `../policy/events.md`: the policy events surface that maps the closed policy outbox set onto the `policy_updated` wire literal.
- `../bridge/events.md`: the bridge events surface that collapses the closed seven-member bridge outbox set onto the `bridge_config_updated` wire literal and fans it out one envelope per hosted Node.
- `../../architecture/storage-topology.md`: the Postgres node that backs `plexsphere.sse_relay_cursor` and the per-Domain outbox rows the relay drains.
- `../../contributing/layout.md`: bounded-context map row enumerating `internal/mesh/sse` and the depguard allow-list that keeps the bus boundary intact.
- `../../how-to/mesh/inspect-the-event-bus.md`: operator how-to for inspecting the live JetStream stream, the cursor row, and the publisher metrics.
- `../../../api/openapi/plexsphere-v1.yaml`: OpenAPI spec for `GET /v1/nodes/{id}/events`, including the `Last-Event-ID` header and the registered Problem-Details codes.
- `../../../tests/workspace/mesh_sse_depguard_test.go`: workspace alignment test pinning the depguard allow-list for the `internal/mesh/sse` boundary.
Ubiquitous language
The terms below travel together across the Go code, the OpenAPI contract, the audit log, the Prometheus metric label values, and operator-facing tooling. Names are preserved verbatim in error messages and structured-log attributes.
| Term | Definition |
|---|---|
| Envelope | The signed-event value type (internal/signing/envelope) carried over the bus: {ID, Type, Scope, KeyID, IssuedAt, Payload, Signature}. The Signature is an Ed25519 detached signature over CanonicalBytes of the envelope with Signature excluded. |
| Domain Event | The business event a payload encodes (e.g. NodeRegistered from internal/identity/tenancy/events). The bus does not interpret payloads. The event_type discriminator on the outbox row selects the wire envelope's Type through the wireTypeFor dispatch table (node_state_updated unless the row belongs to the closed policy or bridge sets) and is otherwise reserved for forensic audit emission. |
| Subscriber Stream | A single authenticated plexd Node's SSE subscription on GET /v1/nodes/{id}/events. One stream per node; the HTTP handler increments sse_active_streams on Subscribe and decrements on the deferred close. |
| Signing Key Resolver | The SigningKeyResolver seam in internal/mesh/sse/signing_key_resolver.go that returns the (KeyID, Scope) for a Domain. Returns ErrSigningKeyNotProvisioned when the per-Domain key has not been minted yet; the relay logs and skips on this sentinel. |
| Nonce Set | The bounded LRU + TTL set in internal/mesh/sse/nonce.go that admits each envelope ID exactly once per freshness window. Add(nonce) → bool: true admits, false is a duplicate the consumer must drop. |
| Last-Event-ID | The Last-Event-ID HTTP header on a reconnect. Resolved by ParseLastEventID in internal/mesh/sse/replay_math.go to the JetStream OptStartSeq the consumer should resume from. |
| Replay Window | The JetStream stream's [currentLow, currentHigh] retention bounds (24h MaxAge by default). A Last-Event-ID below currentLow has aged out; the handler returns 410 Gone with last_event_id_outside_replay_window. |
| Outbox Cursor | The per-stream row in plexsphere.sse_relay_cursor advanced under SELECT … FOR UPDATE SKIP LOCKED by the relay. Carries (stream_name, last_outbox_id, last_xid, updated_at). |
| Stream Replica Factor | The JetStream stream replicas count read from PLEXSPHERE_SSE_STREAM_REPLICAS at boot. Composition refuses to start with replicas > 1 while the in-memory NonceStore is wired. |
| NodeRegistered | A representative Domain Event (the Tenancy bounded context emits it when a node joins). Reaches the bus as an outbox row whose payload the relay forwards into a single envelope. |
| NodeReachabilityChanged | A Domain Event emitted by the per-Domain reachability evaluator (internal/mesh/reachability/evaluator.go) when a Node transitions between healthy, stale, and unreachable. Source: TypeNodeReachabilityChanged in internal/identity/tenancy/events/events.go. Wire-level Type discriminator is node_state_updated (the bus pins one wire type — same as every other Domain Event row). Payload contract: {from_state, to_state, node_id, domain_id, occurred_at}. See ./reachability.md for the state-machine semantics. |
| rotate_keys | A peer-aggregate outbox event_type the Key and Peer Manager appends when an operator requests a Node mesh-key rotation. Unlike the past-tense peer notifications it is an imperative COMMAND addressed to a single Node, telling it to generate a fresh Curve25519 keypair and call POST /v1/keys/rotate. Source: TypeRotateKeys in internal/identity/tenancy/events/events.go. Wire-level Type discriminator is node_state_updated (the bus pins one wire type — the per-event discriminator travels on the outbox event_type). See ./key-rotation.md for the rotation workflow. |
| peer_key_rotated | A peer-aggregate outbox event_type the Key and Peer Manager appends when a Node completes a mesh-key rotation; every peer of the rotated Node observes it. The payload carries the rotated Node's new 32-byte Curve25519 public key plus the (kid, wrap_key_version) PSK reference — never PSK plaintext or ciphertext. Source: TypePeerKeyRotated in internal/identity/tenancy/events/events.go. Wire-level Type discriminator is node_state_updated. See ./key-rotation.md for the rotation workflow. |
| signing_key_rotated | A signing-aggregate outbox event_type the plexsphere-signer binary appends when a signing-key rotation opens. One outbox row per Node in the scope is written by OutboxEventPublisher.PublishKeyRotated in cmd/plexsphere-signer/event_publisher_outbox.go inside a dedicated publisher-owned pgx.Tx (NOT the rotation transaction — the rotation row commits first; the publisher's tx is independent). The per-Node rows are all-or-nothing within the publisher tx, but a crash between the rotation commit and the publisher tx leaves the rotation persisted with no fan-out (see signing-rotation.md two-phase trade-off). Source: TypeSigningKeyRotated in internal/identity/tenancy/events/events.go. Payload contract: {event_id, occurred_at, domain_id, scope_kind, scope_id, old_key_id, new_key_id, new_public_key, valid_from, opened_at, closes_at} (the 32-byte new_public_key is base64-encoded on the wire). Wire-level Type discriminator is node_state_updated. The consumer side is DomainSigningKeyResolver.Subscribe in internal/mesh/sse/signing_key_resolver.go, which calls Reset(DomainID) per event to invalidate the per-Domain public-key cache. See ../signing-rotation.md for the rotation workflow. |
| node_state_updated | The default wire-level Type discriminator the publisher stamps onto every envelope that does NOT map onto a more specific wire literal. Pinned by EventTypeNodeStateUpdated in internal/mesh/sse/publisher.go; the per-row mapping is owned by the wireTypeFor dispatch table in the same file. |
| policy_updated | A second live wire-level Type discriminator the publisher stamps onto envelopes whose outbox-side event_type belongs to the closed policy aggregate set (policy_revision_created or policy_deleted). Pinned by EventTypePolicyUpdated in internal/mesh/sse/publisher.go; the publisher-side dispatch table and the producer-side fan-out are documented in ../policy/events.md. |
| bridge_config_updated | A third live wire-level Type discriminator the publisher stamps onto envelopes whose outbox-side event_type belongs to the closed seven-member bridge aggregate set (bridge.RelayConfigured, the user-access / public-ingress / site-to-site *Configured and *Removed literals). Pinned by EventTypeBridgeConfigUpdated in internal/mesh/sse/publisher.go; the producer-side per-Node fan-out is wired while the consumer-side stays deferred. The publisher-side dispatch table, the fan-out algorithm, and the BridgeConfigUpdatedPayload schema are documented in ../bridge/events.md. |
| PLEXSPHERE_NODE_EVENTS | The JetStream stream name. Pinned by StreamName in internal/mesh/sse/streamname.go; a single source of truth for the publisher's EnsureStream call and the consumer's Replay call. |
| plexsphere.node.events.<domain>.<node> | The per-node JetStream subject template returned by SubjectFor(domainID, nodeID). The wire-format invariants (no ., *, >, no whitespace, no non-printables) are enforced by validateSubjectToken in the same file. |
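
The three wire-level Type rows above all hang off one dispatch step, which can be pictured with the following minimal sketch. It is not the code in internal/mesh/sse/publisher.go: the function body is an illustration, and the `bridge.` prefix check is an assumption standing in for the closed seven-member bridge set, whose literals are enumerated in ../bridge/events.md rather than here.

```go
package main

import (
	"fmt"
	"strings"
)

// Wire-level Type literals named in the glossary.
const (
	EventTypeNodeStateUpdated    = "node_state_updated"
	EventTypePolicyUpdated       = "policy_updated"
	EventTypeBridgeConfigUpdated = "bridge_config_updated"
)

// wireTypeFor maps an outbox event_type onto the wire-level Type.
// ASSUMPTION: the real table lists the seven bridge literals explicitly;
// this sketch approximates that closed set with a "bridge." prefix check.
func wireTypeFor(outboxType string) string {
	switch {
	case outboxType == "policy_revision_created", outboxType == "policy_deleted":
		return EventTypePolicyUpdated
	case strings.HasPrefix(outboxType, "bridge."):
		return EventTypeBridgeConfigUpdated
	default:
		// Every other outbox literal keeps the default mapping.
		return EventTypeNodeStateUpdated
	}
}

func main() {
	for _, t := range []string{"policy_deleted", "bridge.RelayConfigured", "rotate_keys"} {
		fmt.Printf("%s -> %s\n", t, wireTypeFor(t))
	}
}
```

A closed switch with a default arm keeps unknown outbox literals on the `node_state_updated` path, which matches the "every other outbox literal" wording above.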
Envelope schema
The wire envelope is the envelope.Envelope value type defined in internal/signing/envelope/canonical.go. Field semantics:
| Field | Type | Meaning |
|---|---|---|
| ID | uuid.UUID (v7) | Unique per envelope. The publisher allocates a fresh UUIDv7 per call so IDs sort naturally by issuance time across replicas without a separate sequence column. The ID also doubles as the nonce key on the consumer side (see Nonce set design). |
| Type | string | The wire-level discriminator. Defaults to node_state_updated, pinned by the constant EventTypeNodeStateUpdated in internal/mesh/sse/publisher.go; the wireTypeFor dispatch table in the same file stamps policy_updated or bridge_config_updated for rows in the closed policy and bridge outbox sets. |
| Scope | signing.Scope | The signing scope the envelope was signed under: domain:<uuid> for every per-Domain envelope on this bus. |
| KeyID | signing.KeyID | The key that produced Signature. The Resolver returns this verbatim; the publisher never lets the signer pick its own active key. |
| IssuedAt | time.Time (UTC) | The publisher's wall-clock instant at envelope construction. Canonicalised to nine fractional digits via CanonicalTimeLayout. |
| Payload | json.RawMessage | The Domain Event payload, taken verbatim from the outbox row. CanonicalBytes re-parses and re-emits the payload with sorted keys so a caller cannot leak its own key order into the signed pre-image. |
| Signature | []byte | Ed25519 detached signature over CanonicalBytes(env) with Signature excluded. The canonicaliser excludes the field unconditionally, so a verifier re-runs CanonicalBytes on the received envelope without clearing the field manually. |
The README's wire-level description of the envelope at README §Signed Event Bus lists a notional separate nonce field; the in-tree implementation uses the envelope ID (UUIDv7) as the nonce key on both publish and consume paths because the ID is already unique-per-envelope within the freshness window. A future dedicated Nonce field would slot into this same call site without touching the security control flow — see the DECISION block in internal/transport/http/v1/handlers/events.go around the nonceKey := ev.Envelope.ID.String() site.
CanonicalBytes is the byte-stable JSON pre-image the signer signs. Encoding rules — sorted object keys, no whitespace, HTML-safe escapes disabled, raw UTF-8, fixed-precision time layout — are pinned in the canonicaliser file's package doc; downstream re-canonicalisation by a verifier is therefore deterministic across Go versions, platforms, and clock resolutions.
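To make the pre-image rules concrete, here is a minimal sketch of canonicalise, sign, and verify using only the standard library. It is not the canonicaliser in internal/signing/envelope: the simplified string field types, the lower-case JSON key names, the `canonicalTimeLayout` value, and the helpers `signEnvelope`, `verifyEnvelope`, and `demoKeyPair` are all assumptions made for illustration.

```go
package main

import (
	"bytes"
	"crypto/ed25519"
	"encoding/json"
	"fmt"
	"time"
)

// canonicalTimeLayout stands in for CanonicalTimeLayout: UTC, nine fractional digits.
const canonicalTimeLayout = "2006-01-02T15:04:05.000000000Z"

// Envelope mirrors the documented fields with simplified (assumed) Go types.
type Envelope struct {
	ID, Type, Scope, KeyID string
	IssuedAt               time.Time
	Payload                json.RawMessage
	Signature              []byte
}

// canonicalBytes builds the signed pre-image; Signature is never included.
func canonicalBytes(env Envelope) ([]byte, error) {
	dec := json.NewDecoder(bytes.NewReader(env.Payload))
	dec.UseNumber() // keep numeric literals exact instead of passing through float64
	var payload any
	if err := dec.Decode(&payload); err != nil {
		return nil, fmt.Errorf("payload is not valid JSON: %w", err)
	}
	pre := map[string]any{ // key names are assumptions of this sketch
		"id": env.ID, "type": env.Type, "scope": env.Scope, "key_id": env.KeyID,
		"issued_at": env.IssuedAt.UTC().Format(canonicalTimeLayout),
		"payload":   payload,
	}
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)                // emit <, >, & as raw bytes
	if err := enc.Encode(pre); err != nil { // encoding/json sorts map keys
		return nil, err
	}
	return bytes.TrimSuffix(buf.Bytes(), []byte("\n")), nil
}

// signEnvelope returns a copy of env carrying a detached Ed25519 signature.
func signEnvelope(priv ed25519.PrivateKey, env Envelope) (Envelope, error) {
	pre, err := canonicalBytes(env)
	if err != nil {
		return Envelope{}, err
	}
	env.Signature = ed25519.Sign(priv, pre)
	return env, nil
}

// verifyEnvelope re-canonicalises the received envelope and checks the signature.
func verifyEnvelope(pub ed25519.PublicKey, env Envelope) bool {
	pre, err := canonicalBytes(env)
	if err != nil || len(env.Signature) != ed25519.SignatureSize {
		return false
	}
	return ed25519.Verify(pub, pre, env.Signature)
}

// demoKeyPair generates a throwaway key; real keys come from the Signing Service.
func demoKeyPair() (ed25519.PublicKey, ed25519.PrivateKey) {
	pub, priv, err := ed25519.GenerateKey(nil) // nil selects crypto/rand.Reader
	if err != nil {
		panic(err)
	}
	return pub, priv
}

func main() {
	pub, priv := demoKeyPair()
	env, err := signEnvelope(priv, Envelope{
		ID: "example-id", Type: "node_state_updated", Scope: "domain:example",
		KeyID: "key-1", IssuedAt: time.Now(), Payload: []byte(`{"b":1,"a":2}`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("valid:", verifyEnvelope(pub, env))
	env.Payload = []byte(`{"b":1,"a":3}`)
	fmt.Println("after tampering:", verifyEnvelope(pub, env))
}
```

Because the verifier rebuilds the pre-image from the received fields, a payload whose keys arrive in a different order still verifies, while any change to a value does not.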
JetStream subject layout
Two strings pin the wire contract between the publisher's EnsureStream call and the consumer's Replay call:
- Stream name: the constant `StreamName = "PLEXSPHERE_NODE_EVENTS"` in `internal/mesh/sse/streamname.go`. This is the SINGLE SOURCE OF TRUTH; both the relay's `EnsureStream` and the SSE handler's `Replay` source the literal from this file.
- Per-node subject template: `plexsphere.node.events.<domainID>.<nodeID>`, returned by `SubjectFor(domainID, nodeID)`. The fixed prefix `subjectPrefix = "plexsphere.node.events."` is unexported; callers always go through `SubjectFor`.
validateSubjectToken (same file) rejects each token that:
- is empty (`domain id must not be empty`),
- contains the NATS subject delimiter `.` (collides with the hierarchy),
- contains the wildcard `*` or the multi-wildcard `>` (would expand the publish target),
- contains whitespace (caught via `unicode.IsSpace` so tab, LF, CR, and U+00A0 all surface a single error class),
- contains a non-printable byte (caught via `unicode.IsPrint` so the C0/C1 control planes and DEL all hit one branch).
A rejected token surfaces ErrInvalidSubjectToken wrapped with the offending kind and value; callers branch with errors.Is. The publisher reacts by skipping the row before any bus call.
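The token rules can be sketched as follows. This is an illustration rather than the code in streamname.go: the exact error wording, the `kind` parameter, and the rune-by-rune loop are assumptions, while the sentinel name, the prefix, and the `unicode` checks come from the description above.

```go
package main

import (
	"errors"
	"fmt"
	"unicode"
)

// ErrInvalidSubjectToken is the sentinel callers branch on with errors.Is.
var ErrInvalidSubjectToken = errors.New("invalid subject token")

const subjectPrefix = "plexsphere.node.events."

// validateSubjectToken rejects tokens that would break the subject hierarchy.
// kind names the token ("domain id" or "node id") for the error message.
func validateSubjectToken(kind, tok string) error {
	if tok == "" {
		return fmt.Errorf("%w: %s must not be empty", ErrInvalidSubjectToken, kind)
	}
	for _, r := range tok {
		switch {
		case r == '.':
			return fmt.Errorf("%w: %s %q contains the subject delimiter", ErrInvalidSubjectToken, kind, tok)
		case r == '*' || r == '>':
			return fmt.Errorf("%w: %s %q contains a wildcard", ErrInvalidSubjectToken, kind, tok)
		case unicode.IsSpace(r): // tab, LF, CR, U+00A0, plain space
			return fmt.Errorf("%w: %s %q contains whitespace", ErrInvalidSubjectToken, kind, tok)
		case !unicode.IsPrint(r): // control characters and DEL
			return fmt.Errorf("%w: %s %q contains a non-printable character", ErrInvalidSubjectToken, kind, tok)
		}
	}
	return nil
}

// SubjectFor builds the per-node subject after validating both tokens.
func SubjectFor(domainID, nodeID string) (string, error) {
	if err := validateSubjectToken("domain id", domainID); err != nil {
		return "", err
	}
	if err := validateSubjectToken("node id", nodeID); err != nil {
		return "", err
	}
	return subjectPrefix + domainID + "." + nodeID, nil
}

func main() {
	subject, err := SubjectFor("dom-1", "node-7")
	fmt.Println(subject, err)

	_, err = SubjectFor("dom.1", "node-7")
	fmt.Println(errors.Is(err, ErrInvalidSubjectToken), err)
}
```

Checking whitespace before printability matters for characters such as tab, which fail both tests; the ordering keeps them in the single whitespace error class.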
Publish pipeline diagram
The publish pipeline runs strictly in the order shown — the "sign before publish" invariant forbids storing or transmitting an unsigned envelope, so any failure in the resolve, canonicalise, or sign step short-circuits before the JetStream publish.
```text
  ┌────────────────────────────────────────────────────────────┐
  │  Postgres plexsphere.outbox_event                          │
  │  + plexsphere.sse_relay_cursor (per-stream)                │
  └────┬───────────────────────────────────────────────────────┘
       │ 1. RunTick: SELECT … FOR UPDATE SKIP LOCKED on cursor
       │    ReadOutboxBatchAfter(cursor, batch=64)
       v
  ┌────────────────────┐
  │  Relay (relay.go)  │  for each row:
  └────┬───────────────┘
       │ 2. resolve(domainID) ──────────────► SignerClient.PublicKey
       │                                       (gRPC mTLS)
       │    KeyID, Scope (or ErrSigningKeyNotProvisioned: skip)
       v
  ┌────────────────────────────┐
  │  Publisher.Publish         │
  │  (publisher.go)            │
  └────┬───────────────────────┘
       │ 3. allocate envelope ID = uuid.NewV7()
       │ 4. build envelope { ID, Type=node_state_updated,
       │                     Scope, KeyID, IssuedAt, Payload }
       │ 5. canonicalBytes = envelope.CanonicalBytes(env)
       │ 6. signResult = Signer.Sign(scope, keyID, canonicalBytes)
       │      ──────► SignerClient.Sign
       │              (gRPC mTLS)
       │ 7. env.Signature = signResult.Signature
       │ 8. wireBody = json.Marshal(env)
       │ 9. subject = SubjectFor(domainID, nodeID)
       │      ────────────────► JetStream subject
       │                         plexsphere.node.events.<dom>.<node>
       v
  ┌────────────────────────────┐
  │  Bus.Publish(subject,      │
  │              wireBody,     │
  │              WithMsgID(ID))│ ──► JetStream stream
  └────┬───────────────────────┘     PLEXSPHERE_NODE_EVENTS
       │ 10. ack.Sequence (uint64)
       v
  ┌────────────────────────────┐
  │  Relay (relay.go)          │ 11. AdvanceSseRelayCursor on
  │                            │     successful suffix; on any
  │                            │     publish/sign error: log,
  │                            │     bump counter, halt batch
  │                            │     without partial advance.
  └────────────────────────────┘
```

Failure modes are typed sentinels. ErrSigningKeyNotProvisioned short-circuits at step 2, so the publisher never reaches the sign or publish call. signing.ErrProviderUnavailable and any other Signer.Sign error short-circuit at step 6. Either way the cursor does not advance; the next Tick re-publishes the failed row, and the JetStream Nats-Msg-Id dedup window (keyed on the envelope ID) suppresses any duplicate that might briefly land on the wire.
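The short-circuit behaviour can be illustrated with a reduced sketch of the per-row publish step. It is not the code in publisher.go: the function-typed ports `Resolver`, `Signer`, and `Bus`, the `Row` struct, the `errProviderUnavailable` sentinel, and the string-based stand-ins for the UUIDv7 ID, CanonicalBytes, and the JSON wire body are all assumptions chosen to keep the example dependency-free.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	ErrSigningKeyNotProvisioned = errors.New("signing key not provisioned")
	// errProviderUnavailable stands in for signing.ErrProviderUnavailable.
	errProviderUnavailable = errors.New("signing provider unavailable")
)

// Hypothetical function-typed ports for the three collaborators.
type (
	Resolver func(domainID string) (keyID string, err error)
	Signer   func(keyID string, preimage []byte) (sig []byte, err error)
	Bus      func(subject string, body []byte, msgID string) (seq uint64, err error)
)

// Row is a simplified outbox row.
type Row struct {
	OutboxID                  int64
	DomainID, NodeID, Payload string
}

// publish enforces sign-before-publish: the bus is only reached after the
// resolve and sign steps have both succeeded.
func publish(row Row, resolve Resolver, sign Signer, bus Bus) (uint64, error) {
	keyID, err := resolve(row.DomainID) // step 2
	if err != nil {
		return 0, fmt.Errorf("resolve: %w", err)
	}
	envID := fmt.Sprintf("env-%d", row.OutboxID)                // stand-in for uuid.NewV7()
	preimage := []byte(envID + "|" + keyID + "|" + row.Payload) // stand-in for CanonicalBytes
	sig, err := sign(keyID, preimage)                           // step 6
	if err != nil {
		return 0, fmt.Errorf("sign: %w", err)
	}
	body := fmt.Sprintf("%s|sig=%x", preimage, sig) // stand-in for json.Marshal(env)
	subject := "plexsphere.node.events." + row.DomainID + "." + row.NodeID
	return bus(subject, []byte(body), envID) // steps 9 and 10
}

func main() {
	bus := func(subject string, body []byte, msgID string) (uint64, error) {
		fmt.Println("published", msgID, "to", subject)
		return 1, nil
	}
	sign := func(string, []byte) ([]byte, error) { return []byte{0xAB}, nil }
	noKey := func(string) (string, error) { return "", ErrSigningKeyNotProvisioned }
	hasKey := func(string) (string, error) { return "key-1", nil }
	row := Row{OutboxID: 42, DomainID: "dom-1", NodeID: "node-7", Payload: `{"k":"v"}`}

	_, err := publish(row, noKey, sign, bus)
	fmt.Println("skipped before bus call:", errors.Is(err, ErrSigningKeyNotProvisioned))

	seq, err := publish(row, hasKey, sign, bus)
	fmt.Println(seq, err)
}
```

Wrapping with `%w` keeps the sentinel reachable through `errors.Is`, so a caller such as the relay can tell a missing key apart from a generic signer failure.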
Last-Event-ID semantics
ParseLastEventID(header, currentLow, currentHigh) in internal/mesh/sse/replay_math.go resolves the SSE handler's Last-Event-ID header into a JetStream OptStartSeq. The four arms are non-overlapping and exhaustive:
| Arm | Header | Bounds | Returned startSeq | Returned error | HTTP status |
|---|---|---|---|---|---|
| 1 | empty | any | currentHigh + 1 | nil | 200 (tail mode; consumer blocks until next publish) |
| 2 | numeric n | currentLow ≤ n | n + 1 | nil | 200 (resume; folds tail-position case n > currentHigh into the resume arm so a future-sequence reconnect blocks at n+1) |
| 3 | numeric n | n < currentLow | 0 | ErrLastEventIDOutsideWindow | 410 Gone, code last_event_id_outside_replay_window |
| 4 | malformed | any | 0 | ErrBadLastEventID | 400 Bad Request |
Both sentinels live in replay_math.go next to their producer:
- `ErrBadLastEventID`: wraps `strconv.ParseUint`'s error so the HTTP handler can surface a stable error code while the operator-side log carries the underlying parse detail. Trips on alphabetic input, signed integers (`-7`, `+42`), decimal/scientific notation (`12.5`, `1e10`), hex literals, surrounding whitespace, and any base-10 integer that overflows `uint64`.
- `ErrLastEventIDOutsideWindow`: surfaced when the parsed `n` falls below `currentLow`. The client falls back to the reconciliation pull at `GET /v1/nodes/{id}/state` to rebuild state without lossy replay.
Edge cases the function pins explicitly:
- `n == 0` against `currentLow ≥ 1` is arm 3 (410). JetStream sequences are 1-based; a `0` bookmark cannot come from a real delivery.
- `currentLow == 0` (only when the stream is empty) with an empty header returns `startSeq = 1`; the consumer blocks until the first publish.
- A future-sequence header (`n > currentHigh`) with `currentLow ≤ n` is arm 2; the consumer blocks at `n+1` until a matching new event is published. The user-story criterion is "Reconnect with Last-Event-ID: <head+1> blocks until a matching new event is published"; the resume arm is the correct home for it.
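
The four arms and the edge cases above fit in a few lines. The sketch below follows the documented signature and sentinel names, but it is an illustration, not the contents of replay_math.go; the error message strings and the `statusFor` helper that maps sentinels to HTTP status codes are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"strconv"
)

var (
	ErrBadLastEventID           = errors.New("bad Last-Event-ID")
	ErrLastEventIDOutsideWindow = errors.New("last_event_id_outside_replay_window")
)

// ParseLastEventID resolves the header into the sequence to resume from.
// The sketch does not guard n == math.MaxUint64, where n + 1 would wrap.
func ParseLastEventID(header string, currentLow, currentHigh uint64) (uint64, error) {
	if header == "" {
		return currentHigh + 1, nil // arm 1: tail mode
	}
	// Base 10 only: signs, whitespace, hex, and decimals all fail here.
	n, err := strconv.ParseUint(header, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("%w: %v", ErrBadLastEventID, err) // arm 4
	}
	if n < currentLow {
		return 0, ErrLastEventIDOutsideWindow // arm 3: aged out
	}
	return n + 1, nil // arm 2: resume, including n > currentHigh
}

// statusFor is a hypothetical helper mapping the sentinels to HTTP statuses.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrLastEventIDOutsideWindow):
		return http.StatusGone
	case errors.Is(err, ErrBadLastEventID):
		return http.StatusBadRequest
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	// Replay window [5, 9] for every probe.
	for _, h := range []string{"", "7", "12", "3", "+42", " 8"} {
		seq, err := ParseLastEventID(h, 5, 9)
		fmt.Printf("%q -> startSeq=%d status=%d\n", h, seq, statusFor(err))
	}
}
```

Relying on `strconv.ParseUint` with base 10 gives the strict rejection list for free, since it accepts neither a sign prefix nor surrounding whitespace.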
Nonce set design
The NonceStore in internal/mesh/sse/nonce.go is an in-memory, bounded LRU + TTL set that admits each envelope ID exactly once per freshness window. Its design properties:
- Insertion-time ordering, not access-time. The freshness window is a property of WHEN the nonce was first seen; an access-time LRU would let a steady stream of `Seen()` probes keep an aging nonce alive past TTL.
- Capacity bound. Construction takes `maxSize >= 1`; `Add` evicts the oldest insertion-time entry when the post-expiry size is at the cap. A non-positive `maxSize` produces `ErrInvalidNonceStoreConfig` at constructor time.
- TTL bound. Construction takes `ttl >= 1ns`; `Add` runs expire-on-touch before any duplicate-detection branch, so a TTL-aged nonce re-`Add` returns `true`.
- Concurrency. All exported methods are safe for concurrent use across goroutines under a single `sync.Mutex`.
- Container choice. Backed by `container/list` (doubly-linked list) plus a `map[string]*list.Element` for O(1) `Add`, O(1) `Seen`, and O(1) capacity eviction.
- Prometheus gauge. `sse_nonce_live_size` (the constant `NonceLiveSizeMetricName` in `internal/mesh/sse/metrics.go`) is registered by `New` via `prometheus.NewGaugeFunc`. The callback reads `len(map)` under the store's mutex at scrape time only, so the steady-state hot path never touches Prometheus state. A nil registerer disables the gauge.
The in-memory store is single-replica-only. The composition root in cmd/plexsphere/sse_factory_prod.go refuses to boot when PLEXSPHERE_SSE_STREAM_REPLICAS > 1 is set with the in-memory NonceStore wired.
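A minimal sketch of the insertion-ordered, TTL-bounded set described above follows. It is not the code in nonce.go: the constructor name `NewNonceStore`, the injected `now` clock, the `entry` struct, and the `fakeClock` helper are assumptions, and the Prometheus gauge is left out to keep the example standard-library only.

```go
package main

import (
	"container/list"
	"errors"
	"fmt"
	"sync"
	"time"
)

var ErrInvalidNonceStoreConfig = errors.New("invalid nonce store config")

type entry struct {
	nonce  string
	seenAt time.Time
}

// NonceStore is a bounded, insertion-ordered set with a TTL.
type NonceStore struct {
	mu      sync.Mutex
	maxSize int
	ttl     time.Duration
	now     func() time.Time         // injected clock (assumption, eases testing)
	order   *list.List               // front = oldest insertion
	index   map[string]*list.Element // nonce -> list node
}

func NewNonceStore(maxSize int, ttl time.Duration, now func() time.Time) (*NonceStore, error) {
	if maxSize < 1 || ttl < time.Nanosecond {
		return nil, ErrInvalidNonceStoreConfig
	}
	return &NonceStore{maxSize: maxSize, ttl: ttl, now: now,
		order: list.New(), index: make(map[string]*list.Element)}, nil
}

// Add reports true when the nonce is admitted and false for a live duplicate.
func (s *NonceStore) Add(nonce string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	now := s.now()
	// Expire-on-touch: drop aged entries before the duplicate check.
	for e := s.order.Front(); e != nil; e = s.order.Front() {
		ent := e.Value.(entry)
		if now.Sub(ent.seenAt) < s.ttl {
			break
		}
		s.order.Remove(e)
		delete(s.index, ent.nonce)
	}
	if _, dup := s.index[nonce]; dup {
		return false
	}
	if s.order.Len() >= s.maxSize { // at capacity: evict the oldest insertion
		oldest := s.order.Front()
		delete(s.index, oldest.Value.(entry).nonce)
		s.order.Remove(oldest)
	}
	s.index[nonce] = s.order.PushBack(entry{nonce: nonce, seenAt: now})
	return true
}

// Seen is a read-only probe; it never refreshes an entry's position.
func (s *NonceStore) Seen(nonce string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.index[nonce]
	return ok && s.now().Sub(e.Value.(entry).seenAt) < s.ttl
}

// fakeClock is a hand-advanced clock for the demonstration.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clock := &fakeClock{t: time.Now()}
	store, err := NewNonceStore(2, time.Minute, clock.Now)
	if err != nil {
		panic(err)
	}
	fmt.Println(store.Add("a"), store.Add("a")) // first admit, then duplicate
	clock.Advance(time.Minute)
	fmt.Println(store.Add("a")) // aged out, admitted again
}
```

Because entries are appended in insertion order and `Seen` never moves them, the list front is always the oldest entry, so both expiry and capacity eviction only ever touch the front.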
Operator knobs:
| Env var | Default | Effect |
|---|---|---|
| PLEXSPHERE_SSE_NONCE_TTL | feature default | Freshness window for the nonce set. Parsed as a Go time.Duration; non-positive values are rejected at boot. |
| PLEXSPHERE_SSE_MAX_LIVE_NONCES | feature default | Hard upper bound on live nonces. Parsed as a positive integer; non-positive values are rejected at boot. |
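
The boot-time validation of the two knobs might look like the sketch below. The function name `parseNonceKnobs` and the error wording are assumptions; the sketch also leaves the handling of unset variables to the caller, since the feature defaults are not stated in this document.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseNonceKnobs validates the raw values of PLEXSPHERE_SSE_NONCE_TTL and
// PLEXSPHERE_SSE_MAX_LIVE_NONCES. Falling back to a default for an unset
// variable is the caller's job and is not modelled here.
func parseNonceKnobs(ttlRaw, maxRaw string) (time.Duration, int, error) {
	ttl, err := time.ParseDuration(ttlRaw)
	if err != nil {
		return 0, 0, fmt.Errorf("PLEXSPHERE_SSE_NONCE_TTL: %w", err)
	}
	if ttl <= 0 {
		return 0, 0, fmt.Errorf("PLEXSPHERE_SSE_NONCE_TTL: %v is not positive", ttl)
	}
	maxLive, err := strconv.Atoi(maxRaw)
	if err != nil {
		return 0, 0, fmt.Errorf("PLEXSPHERE_SSE_MAX_LIVE_NONCES: %w", err)
	}
	if maxLive <= 0 {
		return 0, 0, fmt.Errorf("PLEXSPHERE_SSE_MAX_LIVE_NONCES: %d is not positive", maxLive)
	}
	return ttl, maxLive, nil
}

func main() {
	ttl, maxLive, err := parseNonceKnobs("5m", "10000")
	fmt.Println(ttl, maxLive, err)

	_, _, err = parseNonceKnobs("0s", "10000")
	fmt.Println(err)
}
```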
Threat model
The bus mitigates four classes of attack along the publish-to-consume path. Each mitigation is implemented in a single, named place so a reader chasing a security claim does not have to assemble it from multiple files.
- Signature forgery. Every envelope is signed with the per-Domain Ed25519 key resolved via the `SigningKeyResolver`. The consumer re-runs `CanonicalBytes` over the received envelope and verifies the detached signature against the per-Domain public key served by `SignerClient.PublicKey`. A failed verification is logged with the `outcome=signature_failure` audit attribute and bumps `sse_envelope_verification_failures_total{outcome=bad_signature}`. See `EventStreamAdapter` in `internal/mesh/sse/event_stream.go` for the audit emission path and the `AuditOutcomeSignatureFailure` constant.
- Replay of a previously-valid envelope. The symmetric nonce set rejects a duplicate envelope ID inside the freshness window; the publisher's `IssuedAt` field anchors freshness on the signed pre-image so an attacker cannot mint a far-future timestamp under a captured signature. A duplicate increments `sse_nonce_replay_total` and is dropped with a structured-log line. Both sides (publish-side relay and consume-side handler) share one NonceStore at composition time so the counter aggregates the symmetric check.
- Tampering in transit. JetStream guarantees the wire bytes reach the consumer unmodified at the broker level; the end-to-end Ed25519 signature over `CanonicalBytes` makes any in-transit byte mutation observable at verification time regardless of the broker's transport. A mutated payload fails re-canonicalisation or signature verification before the consumer accepts the envelope.
- Signer outage. The publisher refuses to write unsigned envelopes: `ErrSigningKeyNotProvisioned` short-circuits the pipeline at the resolver step, BEFORE any bus call. The relay logs a structured WARN line with the Domain ID and bumps `sse_relay_skip_total{reason=no_signing_key}`. A generic signer error (e.g. `signing.ErrProviderUnavailable`) short-circuits at the sign step and bumps `sse_publisher_sign_failures_total{reason=publish_error}`. In either case the cursor does not advance; the next Tick replays the row once the signer recovers.
Multi-replica handoff
The relay supports horizontal scale via the SELECT … FOR UPDATE SKIP LOCKED cursor pattern in internal/mesh/sse/relay.go. Each Tick:
- Begins a Postgres transaction.
- Locks the per-stream cursor row (`stream_name = 'PLEXSPHERE_NODE_EVENTS'`) in `plexsphere.sse_relay_cursor` under `FOR UPDATE SKIP LOCKED`. If another replica already owns the row, the query yields zero rows and the relay returns `ErrCursorBusy`, which is translated to a nil Tick error so the relay-loop driver does not back off on a healthy contention pattern.
- Reads up to `batchSize` outbox rows strictly after the cursor's `(transaction_id, occurred_at, outbox_id)` triple.
- Calls the publish pipeline for each row; on the longest contiguous success suffix, advances the cursor inside the same transaction.
- Commits.
Failure suppresses the cursor advance for any row that follows the last success, so the next Tick (on this or a sibling replica) starts exactly where this one halted. Combined with the JetStream Nats-Msg-Id dedup window (keyed on the envelope ID), the at-least-once boundary is invisible to the SSE consumer: a row that two replicas briefly attempt to publish lands on JetStream once.
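The halt-and-resume behaviour can be sketched without a database. The code below is an illustration under stated assumptions, not the contents of relay.go: the callbacks `lockCursor`, `readAfter`, `publish`, and `save` stand in for the SQL and bus calls, the cursor is reduced to a single `int64`, and the sketch reads the text above as "advance through the last row published before the first failure". Whether the real Tick returns the publish error or only logs it is not stated here; the sketch returns it.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	ErrCursorBusy = errors.New("cursor busy")
	// errPublish is a placeholder for any sign or publish failure.
	errPublish = errors.New("publish failed")
)

// Row is a simplified outbox row identified by a monotonically increasing ID.
type Row struct{ OutboxID int64 }

// runBatch publishes rows in order and returns the cursor to persist.
// It halts at the first failure so no later row is skipped over.
func runBatch(cursor int64, rows []Row, publish func(Row) error) (int64, error) {
	for _, row := range rows {
		if err := publish(row); err != nil {
			return cursor, fmt.Errorf("outbox row %d: %w", row.OutboxID, err)
		}
		cursor = row.OutboxID
	}
	return cursor, nil
}

// tick models one relay iteration. lockCursor stands in for the
// SELECT ... FOR UPDATE SKIP LOCKED query and returns ErrCursorBusy when a
// sibling replica already holds the row.
func tick(
	lockCursor func() (int64, error),
	readAfter func(cursor int64) []Row,
	publish func(Row) error,
	save func(cursor int64),
) error {
	cursor, err := lockCursor()
	if errors.Is(err, ErrCursorBusy) {
		return nil // healthy contention: not an error for the loop driver
	}
	if err != nil {
		return err
	}
	next, pubErr := runBatch(cursor, readAfter(cursor), publish)
	if next != cursor {
		save(next) // persisted in the same transaction in the real relay
	}
	return pubErr
}

func main() {
	rows := []Row{{OutboxID: 1}, {OutboxID: 2}, {OutboxID: 3}}
	failOnThree := func(r Row) error {
		if r.OutboxID == 3 {
			return errPublish
		}
		return nil
	}
	err := tick(
		func() (int64, error) { return 0, nil },
		func(int64) []Row { return rows },
		failOnThree,
		func(c int64) { fmt.Println("cursor saved at", c) },
	)
	fmt.Println("tick error:", err)
}
```

The next Tick would read rows after the saved cursor and so retry row 3 first, which is the "starts exactly where this one halted" property.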
A reconnecting subscriber lands on a different replica without gap or duplicate because:
- The replay payload is owned by JetStream, not by the relay. The consumer's `Replay(streamName, startSeq)` call (with `startSeq` resolved by `ParseLastEventID`) reads the bus directly; the receiving plexsphere replica is a transparent passthrough.
- The cursor row's at-least-once boundary applies only to the publish side. A reconnect's `Last-Event-ID` does not touch the cursor; it touches the JetStream stream's `(currentLow, currentHigh)` view.
The in-memory NonceStore is single-replica-only: each replica maintains its own bounded set, so a duplicate publish that two replicas race on cannot be defended against in-process across replicas. The composition root therefore refuses to boot when PLEXSPHERE_SSE_STREAM_REPLICAS > 1 is set with the in-memory store wired — operators who scale beyond one replica must wire a distributed nonce backend before scaling. The exact refusal message is pinned in cmd/plexsphere/sse_factory_prod.go and rejects the boot with a "PLEXSPHERE_SSE_STREAM_REPLICAS > 1 is incompatible with the in-memory NonceStore" error directing operators to pin replicas=1 or wire a distributed nonce store before scaling.
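The boot guard reduces to a small check at composition time. The sketch below is hypothetical: the function name `checkReplicaGuard`, the treatment of an unset variable as a single replica, and the wording after the pinned message fragment are assumptions; only the quoted "incompatible" message comes from the text above.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// errReplicasIncompatible carries the message fragment quoted above.
var errReplicasIncompatible = errors.New(
	"PLEXSPHERE_SSE_STREAM_REPLICAS > 1 is incompatible with the in-memory NonceStore")

// checkReplicaGuard refuses a multi-replica configuration while the in-memory
// nonce store is wired. replicasRaw is the raw environment value.
func checkReplicaGuard(replicasRaw string, inMemoryNonceStore bool) error {
	replicas := 1 // ASSUMPTION: an unset variable means a single replica
	if replicasRaw != "" {
		n, err := strconv.Atoi(replicasRaw)
		if err != nil || n < 1 {
			return fmt.Errorf("PLEXSPHERE_SSE_STREAM_REPLICAS=%q: want a positive integer", replicasRaw)
		}
		replicas = n
	}
	if replicas > 1 && inMemoryNonceStore {
		return fmt.Errorf("%w: pin replicas=1 or wire a distributed nonce store", errReplicasIncompatible)
	}
	return nil
}

func main() {
	fmt.Println(checkReplicaGuard("1", true))
	fmt.Println(checkReplicaGuard("3", true))
}
```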
Operator runbook
For full step-by-step procedures see ../../how-to/mesh/inspect-the-event-bus.md. The pointers below are the entry points an operator chases when a subscriber complains about gaps, replays, or a stalled reconnect.
- Inspect the JetStream stream. Use the NATS CLI inside the cluster: `nats stream info PLEXSPHERE_NODE_EVENTS` reports the current `(first, last, age, max_age)` tuple. The stream name comes from the `StreamName` constant in `internal/mesh/sse/streamname.go`; per-node subjects follow `plexsphere.node.events.<domainID>.<nodeID>` and are listable via `nats stream subjects PLEXSPHERE_NODE_EVENTS`.
- Drain the cursor row. The per-stream cursor lives in `plexsphere.sse_relay_cursor`. Operators inspect the row with `SELECT stream_name, last_outbox_id, last_xid, updated_at FROM plexsphere.sse_relay_cursor WHERE stream_name = 'PLEXSPHERE_NODE_EVENTS';`. The `updated_at` column is the last successful Tick instant; a stale value combined with a non-empty outbox is the signal that a publish error is halting batches.
- Force a re-emit. Rewind the cursor to an earlier position: `UPDATE plexsphere.sse_relay_cursor SET last_outbox_id = '<earlier-id>', last_xid = <earlier-xid> WHERE stream_name = 'PLEXSPHERE_NODE_EVENTS';`. The next Tick re-publishes everything after the new cursor; the JetStream `Nats-Msg-Id` dedup window (24h) suppresses any duplicate that the original publish already landed.
- Watch the Prometheus surface.
  - `sse_envelope_verification_failures_total{outcome}`: consumer-side rejections labelled `bad_signature`, `bad_nonce`, or `decode_error`. A sustained rate points to a malformed publisher, a key-rotation gap, or a replay attack.
  - `sse_nonce_replay_total`: `NonceStore.Add` returned `false`. A spike correlates with a replay storm; a slow drift correlates with a clock skew letting old envelopes fall back into the freshness window.
  - `sse_relay_skip_total{reason}`: relay rows skipped, labelled `no_signing_key` (rotation pipeline has not minted the per-Domain key yet).
  - `sse_publisher_sign_failures_total{reason}`: publish-pipeline errors labelled `publish_error` (today the relay's only label), with `provider_unavailable` and `key_not_provisioned` reserved for future direct-publisher call sites.
  - `sse_active_streams`: count of currently-open SSE handler subscriptions. Watch against per-Node connection caps to detect leaked subscriptions or a runaway reconnect loop.
  - `sse_nonce_live_size`: current nonce-set size. Watch against `PLEXSPHERE_SSE_MAX_LIVE_NONCES` to detect approaching the bounded-set limit before `Add` latency degrades.