Signed SSE Event Bus

This document is the authoritative bounded-context reference for the Signed SSE Event Bus that ships under internal/mesh/sse and exposes through GET /v1/nodes/{id}/events. It covers the ubiquitous language, the envelope schema and JetStream subject layout, the publish pipeline, the Last-Event-ID semantics, the in-memory nonce set, the threat model, the multi-replica handoff, and the operator runbook.

The bus is the plexsphere → plexd push channel: per-node long-lived SSE streams that carry signed envelopes with nonce/timestamp replay protection and a Last-Event-ID replay path on reconnect. Anything outside that surface — the underlying signer (internal/signing), the Postgres outbox, the JetStream broker, the audit sink — is a collaborator the bus orchestrates, not a concern of this document. This document is the implementation-side reference for the README's Signed Event Bus specification; the README pins the wire-level event-type catalogue and this document pins the in-process pipeline that emits it.

Status — partial delivery

This story ships the foundational primitives of the bus and the transport surface at GET /v1/nodes/{id}/events. Five control-plane gates that the README and this document name as part of the bus's contract are deferred to follow-up work and are NOT live in the production binary today:

  • Production wiring of EventStream / EventPublisher / Relay. BuildProductionSSEFactory in cmd/plexsphere/sse_factory_prod.go returns a bundle that constructs only the NonceStore. With EventStream and EventPublisher nil, the handler falls through to events_dispatch.go and returns 501 Not Implemented. Even when a future commit installs the factory in cmd/plexsphere/main.go, the factory itself must grow to build the JetStream-backed adapter, the relay loop, and the publisher pipeline before the endpoint serves 200.
  • ReBAC node-agent relation enforcement — the handler currently treats every authenticated principal as authorised. The RelationChecker port is not yet plumbed through handlers.Deps; the ReBAC layer's node-agent relation seam is the upstream dependency. Until both land, the 403 path documented in the OpenAPI spec is unreachable.
  • NodeRepo existence check (404 path) — the handler does not verify that the addressed Node id maps to a real Node row before opening the subscription. Non-existent ids quietly tail an empty subject. The NodeRepo port is owned by the reconciliation pull endpoint or a dedicated follow-up.
  • Audit emission on connect-open and verification-failure — the handler emits structured slog lines as observability stand-ins, not durable audit rows. The audit.Sink port is not yet plumbed through the transport boundary; once it is, the existing slog call sites become audit.Sink.Record invocations without changes to the security-control flow.
  • Per-Domain ed25519.Verify against the publisher's public key — the EventStreamAdapter's SignatureVerifier port is optional today (Verify == nil). The structural sanity gates (canonicalise and signature length) remain load-bearing; a per-Domain key resolver must thread through EventStreamAdapter.Verify before consumer-side tamper-evidence is enforced rather than approximated.

The chainsaw E2E fixture at ../../../tests/e2e/mesh/sse-signed-envelopes/chainsaw-test.yaml deploys cmd/sse-stub-plexd in serve mode as the producer (a test-image binary that emits the same wire format) and another sse-stub-plexd Pod in consume mode as the verifier. The real plexsphere binary is NOT exercised end-to-end by the chainsaw fixture today; the fixture is a wire-format compatibility test until the production wiring above lands. NATS JetStream is staged but unused in this fixture (a DECISION block at the top of chainsaw-test.yaml records this).

The handler-tier and integration-tier tests for the deferred gates (TestGetNodeEvents_403WithoutRelation, TestGetNodeEvents_404ForMissingNode, TestGetNodeEvents_410ForOutOfWindow, TestGetNodeEvents_AuditOnOpen, TestGetNodeEvents_AuditOnVerifyFailure, TestSSE_Authz_WithoutRelation_403_Deferred) are present as t.Skip blocks with TODO(security, PX-0017): markers so that landing a port flips them green without rediscovery.

What does ship and is load-bearing today:

  • The publish pipeline (sign-before-publish with Nats-Msg-Id dedup), the bounded-TTL nonce store with capacity-LRU eviction and Prometheus gauge, the outbox-relay with cursor advance and ErrSigningKeyNotProvisioned skip-and-log, the Last-Event-ID parser with all four arms (empty / numeric / non-numeric / out-of-window), the canonical stream-name + subject helpers, the plexsphere.sse_relay_cursor migration, the depguard allow-list, the OpenAPI surface byte-equality drift gate, the in-process EventStreamAdapter, the multi-replica boot guard refusing replicas > 1 with the in-memory NonceStore, and the consumer-side capture harness contract.
  • The node_state_updated trial event-type derived from a tenancy.NodeRegistered outbox row — sufficient to drive the publish pipeline end-to-end through tests.
  • The policy_updated wire event-type — a second live wire literal alongside node_state_updated. The publisher's wireTypeFor dispatch table maps the closed policy outbox literals (policy_revision_created and policy_deleted) onto the wire literal policy_updated; every other outbox literal keeps the default node_state_updated mapping. The producer-side fan-out runs from the compile-service arm — one publish per matched Node on revision-created, one per previously compiled Node on delete. See ../policy/events.md for the full publisher-side dispatch table, fan-out algorithm, payload schema, and the relay-loop deferral that remains open per the roadmap.
  • The bridge_config_updated wire event-type — a third live wire literal. The publisher's wireTypeFor dispatch table collapses the closed seven-member set of bridge.* outbox literals onto the single wire literal bridge_config_updated. The producer-side fan-out runs from the four bridge application services: one signed envelope per Node hosted by the changed bridge Resource, carrying the whole-object effective bridge config. The consumer-side /v1/nodes/{id}/events HTTP plumbing stays deferred per the partial-delivery status above, so a bridge-mode Node converges through the reconciliation-pull bridge block today. See ../bridge/events.md for the full publisher-side dispatch table, the per-Node fan-out algorithm, the BridgeConfigUpdatedPayload schema, and the byte-equality contract with the pull snapshot.
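
The wireTypeFor dispatch described in the bullets above can be pictured with a minimal sketch. It is not the code in internal/mesh/sse/publisher.go: the lower-case names are hypothetical, and the bridge arm uses a prefix match as a stand-in because this document does not spell out all seven bridge outbox literals.

```go
package main

import "strings"

// Wire-level Type literals named in this document.
const (
	eventTypeNodeStateUpdated    = "node_state_updated"
	eventTypePolicyUpdated       = "policy_updated"
	eventTypeBridgeConfigUpdated = "bridge_config_updated"
)

// wireTypeFor maps an outbox event_type onto the wire literal stamped on the
// envelope. Hypothetical sketch; the real table is a closed set per aggregate.
func wireTypeFor(outboxType string) string {
	switch {
	case outboxType == "policy_revision_created", outboxType == "policy_deleted":
		// The closed policy outbox set collapses onto one wire literal.
		return eventTypePolicyUpdated
	case strings.HasPrefix(outboxType, "bridge."):
		// ASSUMPTION: a prefix match stands in for the closed seven-member
		// bridge set, whose exact literals live in ../bridge/events.md.
		return eventTypeBridgeConfigUpdated
	default:
		// Every other outbox literal keeps the default mapping.
		return eventTypeNodeStateUpdated
	}
}
```

A closed table with a default arm means a new outbox literal is delivered as node_state_updated until someone deliberately maps it elsewhere.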

Downstream consumers that depend on the SSE bus MUST treat the production endpoint as 501 Not Implemented until the deferred items above land.

Cross-references

  • ../../../README.md#signed-event-bus — top-level specification of the bus, the per-node SSE surface, the event-type catalogue, and the Last-Event-ID replay contract.
  • ./peers.md — Key and Peer Manager bounded sub-context reference; the six peer event_type strings (peer_registered, peer_psk_assigned, peer_deregistered, peer_endpoint_changed, rotate_keys, peer_key_rotated) the relay routes verbatim.
  • ./key-rotation.md — the key-rotation workflow that emits the rotate_keys command and the peer_key_rotated notification onto this bus.
  • ../signing-rotation.md — the Signing Service key-rotation workflow that emits the signing_key_rotated event onto this bus.
  • ../policy/events.md — the policy events surface that maps the closed policy outbox set onto the policy_updated wire literal.
  • ../bridge/events.md — the bridge events surface that collapses the closed seven-member bridge outbox set onto the bridge_config_updated wire literal and fans it out one envelope per hosted Node.
  • ../../architecture/storage-topology.md — the Postgres node that backs plexsphere.sse_relay_cursor and the per-Domain outbox rows the relay drains.
  • ../../contributing/layout.md — bounded-context map row enumerating internal/mesh/sse and the depguard allow-list that keeps the bus boundary intact.
  • ../../how-to/mesh/inspect-the-event-bus.md — operator how-to for inspecting the live JetStream stream, the cursor row, and the publisher metrics.
  • ../../../api/openapi/plexsphere-v1.yaml — OpenAPI spec for GET /v1/nodes/{id}/events, including the Last-Event-ID header and the registered Problem-Details codes.
  • ../../../tests/workspace/mesh_sse_depguard_test.go — workspace alignment test pinning the depguard allow-list for the internal/mesh/sse boundary.

Ubiquitous language

The terms below travel together across the Go code, the OpenAPI contract, the audit log, the Prometheus metric label values, and operator-facing tooling. Names are preserved verbatim in error messages and structured-log attributes.

| Term | Definition |
| --- | --- |
| Envelope | The signed-event value type (internal/signing/envelope) carried over the bus: {ID, Type, Scope, KeyID, IssuedAt, Payload, Signature}. The Signature is an Ed25519 detached signature over CanonicalBytes of the envelope with Signature excluded. |
| Domain Event | The business event a payload encodes (e.g. NodeRegistered from internal/identity/tenancy/events). The bus does not interpret payloads; the EventType discriminator on the outbox row is reserved for forensic audit emission, while the wire envelope's Type defaults to node_state_updated unless the wireTypeFor dispatch table maps the row onto policy_updated or bridge_config_updated. |
| Subscriber Stream | A single authenticated plexd Node's SSE subscription on GET /v1/nodes/{id}/events. One stream per node; the HTTP handler increments sse_active_streams on Subscribe and decrements on the deferred close. |
| Signing Key Resolver | The SigningKeyResolver seam in internal/mesh/sse/signing_key_resolver.go that returns the (KeyID, Scope) for a Domain. Returns ErrSigningKeyNotProvisioned when the per-Domain key has not been minted yet; the relay logs and skips on this sentinel. |
| Nonce Set | The bounded LRU + TTL set in internal/mesh/sse/nonce.go that admits each envelope ID exactly once per freshness window. Add(nonce) → bool: true admits, false is a duplicate the consumer must drop. |
| Last-Event-ID | The Last-Event-ID HTTP header on a reconnect. Resolved by ParseLastEventID in internal/mesh/sse/replay_math.go to the JetStream OptStartSeq the consumer should resume from. |
| Replay Window | The JetStream stream's [currentLow, currentHigh] retention bounds (24h MaxAge by default). A Last-Event-ID below currentLow has aged out; the handler returns 410 Gone with last_event_id_outside_replay_window. |
| Outbox Cursor | The per-stream row in plexsphere.sse_relay_cursor advanced under SELECT … FOR UPDATE SKIP LOCKED by the relay. Carries (stream_name, last_outbox_id, last_xid, updated_at). |
| Stream Replica Factor | The JetStream stream replicas count read from PLEXSPHERE_SSE_STREAM_REPLICAS at boot. Composition refuses to start with replicas > 1 while the in-memory NonceStore is wired. |
| NodeRegistered | A representative Domain Event (the Tenancy bounded context emits it when a node joins). Reaches the bus as an outbox row whose payload the relay forwards into a single envelope. |
| NodeReachabilityChanged | A Domain Event emitted by the per-Domain reachability evaluator (internal/mesh/reachability/evaluator.go) when a Node transitions between healthy, stale, and unreachable. Source: TypeNodeReachabilityChanged in internal/identity/tenancy/events/events.go. Wire-level Type discriminator is node_state_updated (the default wire type, as for every Domain Event row outside the policy and bridge sets). Payload contract: {from_state, to_state, node_id, domain_id, occurred_at}. See ./reachability.md for the state-machine semantics. |
| rotate_keys | A peer-aggregate outbox event_type the Key and Peer Manager appends when an operator requests a Node mesh-key rotation. Unlike the past-tense peer notifications it is an imperative COMMAND addressed to a single Node, telling it to generate a fresh Curve25519 keypair and call POST /v1/keys/rotate. Source: TypeRotateKeys in internal/identity/tenancy/events/events.go. Wire-level Type discriminator is node_state_updated (the default wire type; the per-event discriminator travels on the outbox event_type). See ./key-rotation.md for the rotation workflow. |
| peer_key_rotated | A peer-aggregate outbox event_type the Key and Peer Manager appends when a Node completes a mesh-key rotation; every peer of the rotated Node observes it. The payload carries the rotated Node's new 32-byte Curve25519 public key plus the (kid, wrap_key_version) PSK reference, never PSK plaintext or ciphertext. Source: TypePeerKeyRotated in internal/identity/tenancy/events/events.go. Wire-level Type discriminator is node_state_updated. See ./key-rotation.md for the rotation workflow. |
| signing_key_rotated | A signing-aggregate outbox event_type the plexsphere-signer binary appends when a signing-key rotation opens. One outbox row per Node in the scope is written by OutboxEventPublisher.PublishKeyRotated in cmd/plexsphere-signer/event_publisher_outbox.go inside a dedicated publisher-owned pgx.Tx (NOT the rotation transaction: the rotation row commits first, and the publisher's tx is independent). The per-Node rows are all-or-nothing within the publisher tx, but a crash between the rotation commit and the publisher tx leaves the rotation persisted with no fan-out (see the two-phase trade-off in ../signing-rotation.md). Source: TypeSigningKeyRotated in internal/identity/tenancy/events/events.go. Payload contract: {event_id, occurred_at, domain_id, scope_kind, scope_id, old_key_id, new_key_id, new_public_key, valid_from, opened_at, closes_at} (the 32-byte new_public_key is base64-encoded on the wire). Wire-level Type discriminator is node_state_updated. The consumer side is DomainSigningKeyResolver.Subscribe in internal/mesh/sse/signing_key_resolver.go, which calls Reset(DomainID) per event to invalidate the per-Domain public-key cache. See ../signing-rotation.md for the rotation workflow. |
| node_state_updated | The default wire-level Type discriminator the publisher stamps onto every envelope that does NOT map onto a more specific wire literal. Pinned by EventTypeNodeStateUpdated in internal/mesh/sse/publisher.go; the per-row mapping is owned by the wireTypeFor dispatch table in the same file. |
| policy_updated | A second live wire-level Type discriminator the publisher stamps onto envelopes whose outbox-side event_type belongs to the closed policy aggregate set (policy_revision_created or policy_deleted). Pinned by EventTypePolicyUpdated in internal/mesh/sse/publisher.go; the publisher-side dispatch table and the producer-side fan-out are documented in ../policy/events.md. |
| bridge_config_updated | A third live wire-level Type discriminator the publisher stamps onto envelopes whose outbox-side event_type belongs to the closed seven-member bridge aggregate set (bridge.RelayConfigured, the user-access / public-ingress / site-to-site *Configured and *Removed literals). Pinned by EventTypeBridgeConfigUpdated in internal/mesh/sse/publisher.go; the producer-side per-Node fan-out is wired while the consumer-side stays deferred. The publisher-side dispatch table, the fan-out algorithm, and the BridgeConfigUpdatedPayload schema are documented in ../bridge/events.md. |
| PLEXSPHERE_NODE_EVENTS | The JetStream stream name. Pinned by StreamName in internal/mesh/sse/streamname.go; a single source of truth for the publisher's EnsureStream call and the consumer's Replay call. |
| plexsphere.node.events.<domain>.<node> | The per-node JetStream subject template returned by SubjectFor(domainID, nodeID). The wire-format invariants (no ., *, >, no whitespace, no non-printables) are enforced by validateSubjectToken in the same file. |

Envelope schema

The wire envelope is the envelope.Envelope value type defined in internal/signing/envelope/canonical.go. Field semantics:

| Field | Type | Meaning |
| --- | --- | --- |
| ID | uuid.UUID (v7) | Unique per envelope. The publisher allocates a fresh UUIDv7 per call so IDs sort naturally by issuance time across replicas without a separate sequence column. The ID also doubles as the nonce key on the consumer side (see Nonce set design). |
| Type | string | The wire-level discriminator. node_state_updated by default, pinned by the constant EventTypeNodeStateUpdated in internal/mesh/sse/publisher.go; the wireTypeFor dispatch table in the same file selects policy_updated or bridge_config_updated for the closed policy and bridge outbox sets. |
| Scope | signing.Scope | The signing scope the envelope was signed under: domain:<uuid> for every per-Domain envelope on this bus. |
| KeyID | signing.KeyID | The key that produced Signature. The Resolver returns this verbatim; the publisher never lets the signer pick its own active key. |
| IssuedAt | time.Time (UTC) | The publisher's wall-clock instant at envelope construction. Canonicalised to nine fractional digits via CanonicalTimeLayout. |
| Payload | json.RawMessage | The Domain Event payload, taken verbatim from the outbox row. CanonicalBytes re-parses and re-emits the payload with sorted keys so a caller cannot leak its own key order into the signed pre-image. |
| Signature | []byte | Ed25519 detached signature over CanonicalBytes(env) with Signature excluded. The canonicaliser excludes the field unconditionally, so a verifier re-runs CanonicalBytes on the received envelope without clearing the field manually. |

The README's wire-level description of the envelope at README §Signed Event Bus lists a notional separate nonce field; the in-tree implementation uses the envelope ID (UUIDv7) as the nonce key on both publish and consume paths because the ID is already unique-per-envelope within the freshness window. A future dedicated Nonce field would slot into this same call site without touching the security control flow — see the DECISION block in internal/transport/http/v1/handlers/events.go around the nonceKey := ev.Envelope.ID.String() site.

CanonicalBytes is the byte-stable JSON pre-image the signer signs. Encoding rules — sorted object keys, no whitespace, HTML-safe escapes disabled, raw UTF-8, fixed-precision time layout — are pinned in the canonicaliser file's package doc; downstream re-canonicalisation by a verifier is therefore deterministic across Go versions, platforms, and clock resolutions.

JetStream subject layout

Two strings pin the wire contract between the publisher's EnsureStream call and the consumer's Replay call:

  • Stream name — the constant StreamName = "PLEXSPHERE_NODE_EVENTS" in internal/mesh/sse/streamname.go. This is the SINGLE SOURCE OF TRUTH; both the relay's EnsureStream and the SSE handler's Replay source the literal from this file.
  • Per-node subject template: plexsphere.node.events.<domainID>.<nodeID>, returned by SubjectFor(domainID, nodeID). The fixed prefix subjectPrefix = "plexsphere.node.events." is unexported; callers always go through SubjectFor.

validateSubjectToken (same file) rejects each token that:

  • is empty (domain id must not be empty),
  • contains the NATS subject delimiter . (collides with the hierarchy),
  • contains the wildcard * or the multi-wildcard > (would expand the publish target),
  • contains whitespace (caught via unicode.IsSpace so tab, LF, CR, and U+00A0 all surface a single error class),
  • contains a non-printable byte (caught via unicode.IsPrint so the C0/C1 control planes and DEL all hit one branch).

A rejected token surfaces ErrInvalidSubjectToken wrapped with the offending kind and value; callers branch with errors.Is. The publisher reacts by skipping the row before any bus call.

Publish pipeline diagram

The publish pipeline runs strictly in the order shown — the "sign before publish" invariant forbids storing or transmitting an unsigned envelope, so any failure in the resolve, canonicalise, or sign step short-circuits before the JetStream publish.

   ┌────────────────────────────────────────────────────────────┐
   │   Postgres   plexsphere.outbox_event                       │
   │              + plexsphere.sse_relay_cursor (per-stream)    │
   └────┬───────────────────────────────────────────────────────┘
        │ 1. RunTick: SELECT … FOR UPDATE SKIP LOCKED on cursor
        │            ReadOutboxBatchAfter(cursor, batch=64)
        v
   ┌────────────────────┐
   │  Relay (relay.go)  │  for each row:
   └────┬───────────────┘
        │ 2. resolve(domainID) ──────────────► SignerClient.PublicKey
        │                                       (gRPC mTLS)
        │      KeyID, Scope (or ErrSigningKeyNotProvisioned: skip)
        v
   ┌────────────────────────────┐
   │  Publisher.Publish         │
   │  (publisher.go)            │
   └────┬───────────────────────┘
        │ 3. allocate envelope ID = uuid.NewV7()
        │ 4. build envelope { ID, Type=node_state_updated,
        │                     Scope, KeyID, IssuedAt, Payload }
        │ 5. canonicalBytes = envelope.CanonicalBytes(env)
        │ 6. signResult = Signer.Sign(scope, keyID, canonicalBytes)
        │                              ──────► SignerClient.Sign
        │                                       (gRPC mTLS)
        │ 7. env.Signature = signResult.Signature
        │ 8. wireBody = json.Marshal(env)
        │ 9. subject = SubjectFor(domainID, nodeID)
        │       ────────────────► JetStream subject
        │                          plexsphere.node.events.<dom>.<node>
        v
   ┌────────────────────────────┐
   │  Bus.Publish(subject,      │
   │              wireBody,     │
   │              WithMsgID(ID))│ ──► JetStream stream
   └────┬───────────────────────┘     PLEXSPHERE_NODE_EVENTS
        │ 10. ack.Sequence (uint64)
        v
   ┌────────────────────────────┐
   │  Relay (relay.go)          │ 11. AdvanceSseRelayCursor on
   │                            │     successful prefix; on any
   │                            │     publish/sign error: log,
   │                            │     bump counter, halt batch
   │                            │     without partial advance.
   └────────────────────────────┘

Failure modes are typed sentinels. ErrSigningKeyNotProvisioned short-circuits at step 2 — the publisher never reaches the sign or publish call. signing.ErrProviderUnavailable and any other Signer.Sign error short-circuit at step 6. Either way the cursor does not advance; the next Tick re-publishes the failed row, and the JetStream Nats-Msg-Id dedup window (keyed on the envelope ID) suppresses any duplicate that might briefly land on the wire.

Last-Event-ID semantics

ParseLastEventID(header, currentLow, currentHigh) in internal/mesh/sse/replay_math.go resolves the SSE handler's Last-Event-ID header into a JetStream OptStartSeq. The four arms are non-overlapping and exhaustive:

| Arm | Header | Bounds | Returned startSeq | Returned error | HTTP status |
| --- | --- | --- | --- | --- | --- |
| 1 | empty | any | currentHigh + 1 | nil | 200 (tail mode; consumer blocks until next publish) |
| 2 | numeric n | currentLow ≤ n | n + 1 | nil | 200 (resume; folds tail-position case n > currentHigh into the resume arm so a future-sequence reconnect blocks at n+1) |
| 3 | numeric n | n < currentLow | 0 | ErrLastEventIDOutsideWindow | 410 Gone, code last_event_id_outside_replay_window |
| 4 | malformed | any | 0 | ErrBadLastEventID | 400 Bad Request |

Both sentinels live in replay_math.go next to their producer:

  • ErrBadLastEventID — wraps strconv.ParseUint's error so the HTTP handler can surface a stable error code while the operator-side log carries the underlying parse detail. Trips on alphabetic input, signed integers (-7, +42), decimal/scientific notation (12.5, 1e10), hex literals, surrounding whitespace, and any base-10 integer that overflows uint64.
  • ErrLastEventIDOutsideWindow — surfaced when the parsed n falls below currentLow. The client falls back to the reconciliation pull at GET /v1/nodes/{id}/state to rebuild state without lossy replay.

Edge cases the function pins explicitly:

  • n == 0 against currentLow ≥ 1 is arm 3 (410). JetStream sequences are 1-based; a 0 bookmark cannot come from a real delivery.
  • currentLow == 0 (only when the stream is empty) with an empty header returns startSeq = 1 — the consumer blocks until the first publish.
  • A future-sequence header (n > currentHigh) with currentLow ≤ n is arm 2 — the consumer blocks at n+1 until a matching new event is published. The user-story criterion is "Reconnect with Last-Event-ID: <head+1> blocks until a matching new event is published"; the resume arm is the correct home for it.

Nonce set design

The NonceStore in internal/mesh/sse/nonce.go is an in-memory, bounded LRU + TTL set that admits each envelope ID exactly once per freshness window. Its design properties:

  • Insertion-time ordering, not access-time. The freshness window is a property of WHEN the nonce was first seen; an access-time LRU would let a steady stream of Seen() probes keep an aging nonce alive past TTL.
  • Capacity bound. Construction takes maxSize >= 1; Add evicts the oldest insertion-time entry when the post-expiry size is at the cap. A non-positive maxSize produces ErrInvalidNonceStoreConfig at constructor time.
  • TTL bound. Construction takes ttl >= 1ns; Add runs expire-on-touch before any duplicate-detection branch, so a TTL-aged nonce re-Add returns true.
  • Concurrency. All exported methods are safe for concurrent use across goroutines under a single sync.Mutex.
  • Container choice. Backed by container/list (doubly-linked list) plus a map[string]*list.Element for O(1) Add, O(1) Seen, and O(1) capacity eviction.
  • Prometheus gauge. sse_nonce_live_size (the constant NonceLiveSizeMetricName in internal/mesh/sse/metrics.go) is registered by New via prometheus.NewGaugeFunc. The callback reads len(map) under the store's mutex at scrape time only, so the steady-state hot path never touches Prometheus state. A nil registerer disables the gauge.

The in-memory store is single-replica-only. The composition root in cmd/plexsphere/sse_factory_prod.go refuses to boot when PLEXSPHERE_SSE_STREAM_REPLICAS > 1 is set with the in-memory NonceStore wired.

Operator knobs:

| Env var | Default | Effect |
| --- | --- | --- |
| PLEXSPHERE_SSE_NONCE_TTL | feature default | Freshness window for the nonce set. Parsed as a Go time.Duration; non-positive values are rejected at boot. |
| PLEXSPHERE_SSE_MAX_LIVE_NONCES | feature default | Hard upper bound on live nonces. Parsed as a positive integer; non-positive values are rejected at boot. |

Threat model

The bus mitigates four classes of attack along the publish-to-consume path. Each mitigation is implemented in a single, named place so a reader chasing a security claim does not have to assemble it from multiple files.

  1. Signature forgery. Every envelope is signed with the per-Domain Ed25519 key resolved via the SigningKeyResolver. The consumer re-runs CanonicalBytes over the received envelope and verifies the detached signature against the per-Domain public key served by SignerClient.PublicKey. A failed verification is logged with the outcome=signature_failure audit attribute and bumps sse_envelope_verification_failures_total{outcome=bad_signature}. See EventStreamAdapter in internal/mesh/sse/event_stream.go for the audit emission path and the AuditOutcomeSignatureFailure constant.
  2. Replay of a previously-valid envelope. The symmetric nonce set rejects a duplicate envelope ID inside the freshness window; the publisher's IssuedAt field anchors freshness on the signed pre-image so an attacker cannot mint a far-future timestamp under a captured signature. A duplicate increments sse_nonce_replay_total and is dropped with a structured-log line. Both sides (publish-side relay and consume-side handler) share one NonceStore at composition time so the counter aggregates the symmetric check.
  3. Tampering in transit. JetStream guarantees the wire bytes reach the consumer unmodified at the broker level; the end-to-end Ed25519 signature over CanonicalBytes makes any in-transit byte mutation observable at verification time regardless of the broker's transport. A mutated payload fails re-canonicalisation or signature verification before the consumer accepts the envelope.
  4. Signer outage. The publisher refuses to write unsigned envelopes: ErrSigningKeyNotProvisioned short-circuits the pipeline at the resolver step, BEFORE any bus call. The relay logs a structured WARN line with the Domain ID and bumps sse_relay_skip_total{reason=no_signing_key}. A generic signer error (e.g. signing.ErrProviderUnavailable) short-circuits at the sign step and bumps sse_publisher_sign_failures_total{reason=publish_error}. In either case the cursor does not advance; the next Tick replays the row once the signer recovers.

Multi-replica handoff

The relay supports horizontal scale via the SELECT … FOR UPDATE SKIP LOCKED cursor pattern in internal/mesh/sse/relay.go. Each Tick:

  1. Begins a Postgres transaction.
  2. Locks the per-stream cursor row (stream_name = 'PLEXSPHERE_NODE_EVENTS') in plexsphere.sse_relay_cursor under FOR UPDATE SKIP LOCKED. If another replica already owns the row, the query yields zero rows and the relay returns ErrCursorBusy — translated to a nil Tick error so the relay-loop driver does not back off on a healthy contention pattern.
  3. Reads up to batchSize outbox rows strictly after the cursor's (transaction_id, occurred_at, outbox_id) triple.
  4. Calls the publish pipeline for each row; inside the same transaction, advances the cursor over the longest contiguous prefix of rows that published successfully.
  5. Commits.

Failure suppresses the cursor advance for any row that follows the last success, so the next Tick (on this or a sibling replica) starts exactly where this one halted. Combined with the JetStream Nats-Msg-Id dedup window (keyed on the envelope ID), the at-least-once boundary is invisible to the SSE consumer: a row that two replicas briefly attempt to publish lands on JetStream once.

A reconnecting subscriber lands on a different replica without gap or duplicate because:

  • The replay payload is owned by JetStream, not by the relay. The consumer's Replay(streamName, startSeq) call (with startSeq resolved by ParseLastEventID) reads the bus directly; the receiving plexsphere replica is a transparent passthrough.
  • The cursor row's at-least-once boundary applies only to the publish side. A reconnect's Last-Event-ID does not touch the cursor; it touches the JetStream stream's (currentLow, currentHigh) view.

The in-memory NonceStore is single-replica-only: each replica maintains its own bounded set, so a duplicate publish that two replicas race on cannot be defended against in-process across replicas. The composition root therefore refuses to boot when PLEXSPHERE_SSE_STREAM_REPLICAS > 1 is set with the in-memory store wired — operators who scale beyond one replica must wire a distributed nonce backend before scaling. The exact refusal message is pinned in cmd/plexsphere/sse_factory_prod.go and rejects the boot with a "PLEXSPHERE_SSE_STREAM_REPLICAS > 1 is incompatible with the in-memory NonceStore" error directing operators to pin replicas=1 or wire a distributed nonce store before scaling.

Operator runbook

For full step-by-step procedures see ../../how-to/mesh/inspect-the-event-bus.md. The pointers below are the entry points an operator chases when a subscriber complains about gaps, replays, or a stalled reconnect.

  • Inspect the JetStream stream. Use the NATS CLI inside the cluster: nats stream info PLEXSPHERE_NODE_EVENTS reports the current (first, last, age, max_age) values. The stream name comes from the StreamName constant in internal/mesh/sse/streamname.go; per-node subjects follow plexsphere.node.events.<domainID>.<nodeID> and are listable via nats stream subjects PLEXSPHERE_NODE_EVENTS.
  • Drain the cursor row. The per-stream cursor lives in plexsphere.sse_relay_cursor. Operators inspect the row with SELECT stream_name, last_outbox_id, last_xid, updated_at FROM plexsphere.sse_relay_cursor WHERE stream_name = 'PLEXSPHERE_NODE_EVENTS';. The updated_at column is the last successful Tick instant; a stale value combined with a non-empty outbox is the signal that a publish error is halting batches.
  • Force a re-emit. Rewind the cursor to an earlier position: UPDATE plexsphere.sse_relay_cursor SET last_outbox_id = '<earlier-id>', last_xid = <earlier-xid> WHERE stream_name = 'PLEXSPHERE_NODE_EVENTS';. The next Tick re-publishes everything after the new cursor; the JetStream Nats-Msg-Id dedup window (24h) suppresses any duplicate that the original publish already landed.
  • Watch the Prometheus surface.
    • sse_envelope_verification_failures_total{outcome} — consumer-side rejections labelled bad_signature, bad_nonce, or decode_error. A sustained rate is a malformed publisher, a key-rotation gap, or a replay attack.
    • sse_nonce_replay_total: incremented each time NonceStore.Add returns false. A spike correlates with a replay storm; a slow drift correlates with clock skew letting old envelopes fall back into the freshness window.
    • sse_relay_skip_total{reason} — relay rows skipped, labelled no_signing_key (rotation pipeline has not minted the per-Domain key yet).
    • sse_publisher_sign_failures_total{reason} — publish-pipeline errors labelled publish_error (today the relay's only label), with provider_unavailable and key_not_provisioned reserved for future direct-publisher call sites.
    • sse_active_streams — count of currently-open SSE handler subscriptions. Watch against per-Node connection caps to detect leaked subscriptions or a runaway reconnect loop.
    • sse_nonce_live_size — current nonce-set size. Watch against PLEXSPHERE_SSE_MAX_LIVE_NONCES to detect approaching the bounded-set limit before Add latency degrades.