
Signed SSE Event Bus — deferred-wiring roadmap

Status: scaffold landed; production wiring deferred.

This document tracks every load-bearing piece of production wiring that the Signed SSE Event Bus PR ships as scaffolding. Each item names the file(s) that hold the placeholder, the security or operational guarantee that is currently NOT enforced, and the follow-up task that must close the gap before the surface can serve real traffic.

The handler at internal/transport/http/v1/handlers/events.go refuses to mount when any of the load-bearing security ports is nil; the events_dispatch.go gate returns 501 with code signed_event_bus_not_provisioned so the deferred state is loud, log-scrapable, and fail-closed.

Security ports — REQUIRED before the surface goes hot

SignatureVerifier

  • Port: internal/transport/http/v1/handlers/events_deps.go, type SignatureVerifier.
  • Backed by: per-Domain key resolver at internal/mesh/sse/signing_key_resolver.go.
  • Today: the handler refuses to mount without it; events_dispatch.go returns 501 + signed_event_bus_not_provisioned.
  • Wiring task: composition root (cmd/plexsphere) constructs a *sse.SigningKeyResolver-backed verifier and threads it onto handlers.Deps.SignatureVerifier.

RelationChecker

  • Port: internal/transport/http/v1/handlers/events_deps.go, type RelationChecker.
  • Backed by: internal/authz once the node-agent relation seam ships.
  • Today: the handler refuses to mount without it; events_dispatch.go returns 501 + signed_event_bus_not_provisioned.
  • Wiring task: build a thin adapter around *authz.Authorizer that exposes the Check(ctx, principal, relation, object) shape the transport port requires; wire it onto handlers.Deps.RelationChecker.

NodeRepo

  • Port: internal/transport/http/v1/handlers/events_deps.go, type NodeRepo.
  • Backed by: tenancy-tx Node aggregate.
  • Today: the handler refuses to mount without it; events_dispatch.go returns 501 + signed_event_bus_not_provisioned.
  • Wiring task: build a thin adapter around the tenancy repo's Node lookup; wire it onto handlers.Deps.NodeRepo.

AuditSink

  • Port: internal/transport/http/v1/handlers/events_deps.go, type AuditSink.
  • Optional today: a nil sink suppresses audit emission but the security gates still fire.
  • Wiring task: build an adapter around internal/audit.Sink that translates AuditEntry into the platform's audit.Entry shape; wire it onto handlers.Deps.AuditSink.

Production wiring — relay loop and stream provisioning

Relay loop

  • Module: internal/mesh/sse/relay_loop.go.
  • Today: RunRelay(ctx, ticker, cfg) exists but has no production caller. BuildProductionSSEFactory does not construct a Relay, does not call RunRelay, and does not call EnsureNodeEventsStream. The 0009_sse_relay.sql migration creates a table that production does not yet read or write.
  • Wiring task: extend BuildProductionSSEFactory to:
    1. Construct a RelayStore adapter wrapping *db.Queries (sqlcgen) so the per-stream cursor row is read/written against a real Postgres pool.
    2. Construct a Relay instance with the wrapped store, publisher, and clock.
    3. Call EnsureNodeEventsStream(ctx, ensurer, replicas, maxBytes) once at startup.
    4. Spawn the RunRelay goroutine on the long-lived ticker.

sqlc-backed RelayStore

  • Module: internal/platform/db/gen/80_sse_relay_cursor.sql.go (generated by sqlc).
  • Today: no production code instantiates a *db.Queries-backed sse.RelayStore adapter. Tests use inline SQL.
  • Wiring task: build cmd/plexsphere/sse_relay_store.go (or similar) that wraps *db.Queries and satisfies sse.RelayStore.

Per-subject FilterSubject

  • Module: internal/mesh/sse/event_stream.go.
  • Today: the JetStream replay is keyed off streamName only; the stream-level filter plexsphere.node.events.> admits every per-(Domain, Node) subject, so a consumer receives ALL events on the stream regardless of the addressed Node.
  • Wiring task: extend messaging.Client.Replay (or MessageReader.Replay) to accept a FilterSubject argument; bind the JetStream consumer with FilterSubjects.
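
To see why the stream-level filter is not enough, the sketch below applies the NATS subject-wildcard rules locally: "*" matches exactly one token, and ">" matches one or more trailing tokens. The per-(Domain, Node) subject layout is a hypothetical placeholder. In production, JetStream would apply the filter server-side once the consumer is bound with a per-Node FilterSubjects entry.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches applies NATS subject-wildcard rules: "*" matches exactly
// one token and ">" (last token only) matches one or more remaining tokens.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

// nodeSubject builds a HYPOTHETICAL per-(Domain, Node) subject; the real
// token layout under plexsphere.node.events. is not spelled out here.
func nodeSubject(domainID, nodeID string) string {
	return "plexsphere.node.events." + domainID + "." + nodeID
}

func main() {
	streamFilter := "plexsphere.node.events.>"
	consumerFilter := nodeSubject("d1", "n1") // what a per-Node FilterSubject would pin
	for _, subj := range []string{nodeSubject("d1", "n1"), nodeSubject("d1", "n2")} {
		fmt.Printf("%s stream=%v consumer=%v\n", subj, subjectMatches(streamFilter, subj), subjectMatches(consumerFilter, subj))
	}
}
```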

StartFromTail in production messaging

  • Module: internal/platform/messaging/replay.go.
  • Today: rejects startSeq == 0; does not yet support DeliverPolicy=DeliverNew.
  • Wiring task: extend Replay to translate sse.StartFromTail (or a literal "tail" sentinel parameter) into JetStream's DeliverNew policy.

EventStream Bounds seam

  • Module: internal/transport/http/v1/handlers/events.go, which calls sse.ParseLastEventID(header, 0, ^uint64(0)), i.e. with an unbounded replay window.
  • Today: the OpenAPI 410 response (last_event_id_outside_replay_window) is unreachable because no real bounds are wired; the 410 description has been removed from the spec until they are.
  • Wiring task: thread a Bounds(ctx, nodeID) (low, high uint64, err error) method through the EventStream port and pass real (low, high) to ParseLastEventID.

Storage layer — the deferred 0009 migration

internal/platform/db/migrations/0009_sse_relay.sql creates a sse_relay_cursor table that production does not yet read or write. Until the relay-loop wiring lands, the table is reserved state: a downgrade that drops it before the wiring catches up risks losing any cursor that an early-adopter pre-production cluster has already materialised. The migration is intentionally kept in this PR to lock the schema shape early; the operator-facing note is in the migration file's header comment.

Operational hardening — relay HOL blocking

internal/mesh/sse/relay.go returns successes=0 on ErrSigningKeyNotProvisioned, so a Domain whose signing key is never provisioned blocks every subsequent event for that Domain forever. Today the sse_relay_skip_total{reason=no_signing_key} counter and the WarnContext line are the operator signal; the only remediation is to provision the missing key.

A future "skip and advance" fix requires either a sse_relay_skipped table or an outbox_events.status column — both are schema changes that sit outside this PR's scope. Track the schema choice and implementation in a follow-up story under the Signed Event Bus epic.

NonceStore split / dual-store invariant

SSEWiring exposes ONE NonceStore; the production composition root MUST wire it onto exactly one consumer layer. Today (scaffold posture, single-replica) the NonceStore is wired onto the handler. When the production wiring lands and replicas > 1 becomes the supported posture, the fix is:

  1. Replace the in-memory NonceStore with a distributed store (e.g. Redis-backed).
  2. Split SSEWiring into AdapterNonceStore and HandlerNonceStore fields (or document that the same distributed store is wired onto both layers).
  3. Make BuildProductionSSEFactory enforce pointer non-equality when the in-memory store is in use.

Test harness symmetry

internal/platform/testutil/sse/capture.go now computes envelope.CanonicalBytes(env_with_sig_cleared) to verify production envelopes; legacy producers that signed the raw bytes of the SSE data: field still verify because signedPayload() falls back to the raw payload when parsing fails.

Verification on every PR

  • The build gate refuses to ship if any of the security ports ends up wired on a production code path without a paired test.
  • Run go list -m -u all and govulncheck ./... against the new code under internal/mesh, internal/transport/http, and cmd/sse-stub-plexd whenever this roadmap is touched. At the time of writing, the locally installed govulncheck@v1.1.4 and golangci-lint@v2.11.4 binaries were built with a pre-1.26 Go toolchain and refuse to scan a tree that requires Go 1.26 (file requires newer Go version go1.26). CI instead installs both tools with GOTOOLCHAIN=local go install on a Go 1.26 toolchain (see .github/workflows/ci.yaml), so the dependency audit runs in CI rather than as a developer-side pre-commit step. Once upstream releases are built with Go 1.26, the audit can move to pre-commit.

Producer-side closures already landed

Reachability evaluator outbox emitter

The per-Domain reachability evaluator's writer-side closure is live: every Healthy → Stale → Unreachable transition writes one tenancy.NodeReachabilityChanged row to plexsphere.outbox_events inside the same transaction as the UpdateReachabilityState SQL write (see internal/identity/tenancy/repo/node_heartbeat_repo.go::TransitionAndEmit and the writer adapter in cmd/plexsphere/evaluator_factory_prod.go). Two cross-cutting invariants are gated by integration tests at tests/integration/reachability_evaluator_emits_outbox_test.go:

  • One outbox row per state transition; idempotent re-runs do NOT duplicate the row (the SQL-side reachability_state <> $2 predicate is the load-bearing guard).
  • The outbox row and the SQL UPDATE share one transaction_id — the same-tx atomicity invariant a crash between the two writes would otherwise violate.
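
The two invariants can be illustrated with an in-memory model. The store type, the state strings, and the counter-based transaction id are hypothetical. In the real repository, the database transaction makes the two writes atomic, not this code.

```go
package main

import "fmt"

type reachability string

const (
	Healthy     reachability = "healthy"
	Stale       reachability = "stale"
	Unreachable reachability = "unreachable"
)

type outboxRow struct {
	NodeID   string
	From, To reachability
	TxID     int
}

// store is an in-memory, HYPOTHETICAL model of the two writes made by
// TransitionAndEmit. TxID stands in for the shared transaction_id.
type store struct {
	state       map[string]reachability
	updatedInTx map[string]int
	outbox      []outboxRow
	nextTx      int
}

// transitionAndEmit models the guarded UPDATE (the reachability_state <> $2
// predicate) and writes an outbox row only when that UPDATE touched a row.
func (s *store) transitionAndEmit(nodeID string, to reachability) bool {
	s.nextTx++
	tx := s.nextTx
	from, ok := s.state[nodeID]
	if !ok || from == to {
		return false // UPDATE matched 0 rows: no outbox row, re-runs are no-ops
	}
	s.state[nodeID] = to
	s.updatedInTx[nodeID] = tx
	s.outbox = append(s.outbox, outboxRow{NodeID: nodeID, From: from, To: to, TxID: tx})
	return true
}

func main() {
	s := &store{state: map[string]reachability{"n1": Healthy}, updatedInTx: map[string]int{}}
	for _, to := range []reachability{Stale, Stale, Unreachable} {
		fmt.Println(to, s.transitionAndEmit("n1", to))
	}
	fmt.Println("outbox rows:", len(s.outbox))
}
```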

The companion chain-bound AuditSink adapter (*chainedReachabilityAuditSink) wraps the canonical hash-chained audit.Sink so every transition also lands one row on the per-Domain audit chain. The chain row's wire shape is pinned by tests/integration/reachability_evaluator_chained_audit_test.go: Relation = "node_reachability.transition", Object = "domain:<hex16>" (matching the audit factory's defaultDomainResolver), Reason = audit.ReasonGranted, RelationPath = [from_state, to_state, reason]; the per-Domain audit_log_chain_head.next_seq advances by exactly the number of transitions and chain.VerifyChain succeeds end-to-end.

The SSE drain that surfaces the outbox row on the wire as a node_state_updated envelope is the relay-loop closure tracked under "Production wiring" above; once that lands, the tests/e2e/mesh/reachability-transitions/ chainsaw fixture flips from skip: true to live and asserts the cross-process producer→consumer convergence.

Policy compile-arm wire fan-out (policy_updated)

The policy compile arm's producer-side closure is live alongside the peer_* and node_state_updated per-source wire-type expansions. The publisher's wireTypeFor dispatch table at internal/mesh/sse/publisher.go maps the closed two-event policy outbox set (policy_revision_created and policy_deleted) onto the single wire literal policy_updated; the compile service emits one publish per matched Node on revision-created via the policy.WirePublisher port (production binding cmd/plexsphere/policy_wire_publisher_adapter.go) and one per previously compiled Node on delete via lookup-before-cascade through CompiledRulesetRepo.ListNodesByPolicy. Three cross-cutting invariants are gated by integration tests at tests/integration/policy_updated_wire_fanout_test.go:

  • Per-Node arity: one wire publish per (Node, Policy) row on revision-created; one wire publish per previously compiled Node on delete.
  • Payload byte-equality with the persisted rule_payload column on revision-created; canonical empty-rules payload []byte("[]") on delete.
  • JetStream Nats-Msg-Id dedup suppresses a re-driven outbox row; the duplicate publish is acknowledged with ack.Duplicate=true.
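
A sketch of the dispatch table and the per-Node arity and dedup invariants follows. Two pieces are hypothetical stand-ins. The Nats-Msg-Id derivation (outbox id plus Node id) replaces whatever the production adapter uses. The unbounded map replaces JetStream's time-bounded, server-side duplicate window.

```go
package main

import "fmt"

// wireTypes sketches the policy arm of a wireTypeFor-style table: both
// closed outbox literals collapse onto one wire literal.
var wireTypes = map[string]string{
	"policy_revision_created": "policy_updated",
	"policy_deleted":          "policy_updated",
}

// wireTypeFor returns ok=false for literals outside the closed set, so an
// unmapped outbox row is never put on the wire.
func wireTypeFor(outboxType string) (string, bool) {
	t, ok := wireTypes[outboxType]
	return t, ok
}

// msgID derives a HYPOTHETICAL Nats-Msg-Id from the outbox row and target
// Node, so re-driving the same row reproduces the same ids.
func msgID(outboxID, nodeID string) string {
	return outboxID + ":" + nodeID
}

// dedupWindow models JetStream's duplicate detection: a repeated
// Nats-Msg-Id is acknowledged as a duplicate instead of being stored again.
type dedupWindow map[string]bool

func (d dedupWindow) publish(id string) (duplicate bool) {
	if d[id] {
		return true
	}
	d[id] = true
	return false
}

// fanOut publishes one message per matched Node and reports how many were
// newly stored rather than deduplicated.
func fanOut(d dedupWindow, outboxID, outboxType string, nodes []string) (int, error) {
	if _, ok := wireTypeFor(outboxType); !ok {
		return 0, fmt.Errorf("no wire type for outbox literal %q", outboxType)
	}
	stored := 0
	for _, n := range nodes {
		if !d.publish(msgID(outboxID, n)) {
			stored++
		}
	}
	return stored, nil
}

func main() {
	d := dedupWindow{}
	first, _ := fanOut(d, "ob-1", "policy_revision_created", []string{"n1", "n2"})
	again, _ := fanOut(d, "ob-1", "policy_revision_created", []string{"n1", "n2"})
	fmt.Println(first, again)
}
```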

The wire envelope's signature chain is pinned by tests/integration/policy_updated_signature_chain_test.go against the production per-Domain SigningKeyResolver. The remaining work is the relay-loop side (the RelayStore adapter, the EnsureNodeEventsStream call, and the RunRelay goroutine) tracked under "Production wiring — relay loop and stream provisioning" above; once that lands, the tests/e2e/policy/policy-updated/chainsaw-test.yaml chainsaw fixture's existing stub-binary producer/consumer pair extends to the real plexsphere binary without a schema change.

Bridge wire fan-out (bridge_config_updated)

The bridge orchestrator's producer-side closure is live alongside the policy_updated and node_state_updated per-source wire-type expansions. The publisher's wireTypeFor dispatch table at internal/mesh/sse/publisher.go collapses the closed seven-member set of bridge.* outbox literals (bridge.RelayConfigured, the user-access / public-ingress / site-to-site *Configured and *Removed literals) onto the single wire literal bridge_config_updated.

The four bridge application services each call the bridge.WirePublisher port (internal/bridge/ports.go::PublishBridgeConfigUpdated) after their RunInTx closure commits. The production adapter enumerates the Nodes that the changed bridge Resource hosts via the bridge.BridgeNodeLister port, builds the effective config once through the effective.EffectiveConfigBuilder, and emits one signed envelope per hosted Node (the 1-bridge-row → N-Node fan-out). The wire effective_config bytes equal the reconciliation-pull bridge block for the same Node and bridge Resource; the parity is pinned by tests/integration/bridge_config_updated_pull_parity_test.go.

A nil WirePublisher keeps a service on its single-arm posture (aggregate write + outbox row + audit row, no wire emission), and a publish error is logged and swallowed because the command has already committed and the pull fallback guarantees eventual convergence.

The remaining work is the consumer-side /v1/nodes/{id}/events HTTP plumbing — the relay-loop side (the RelayStore adapter, the EnsureNodeEventsStream call, and the RunRelay goroutine) tracked under "Production wiring — relay loop and stream provisioning" above, plus the signature-verification and ReBAC node-agent gates tracked under "Security ports". The publisher-side dispatch table, the per-Node fan-out algorithm, the BridgeConfigUpdatedPayload schema, and the depguard posture are documented at docs/contexts/bridge/events.md.

Downstream blocked items

Reconciliation-pull chainsaw equivalence E2E

tests/e2e/mesh/reconciliation-pull/chainsaw-test.yaml carries skip: true until the deferred wiring above lands. The fixture's terminal convergence gate registers a Domain + N Nodes through the production POST /v1/domains/{id}/nodes surface, captures the resulting SSE peer_registered envelope from the running plexsphere binary, issues GET /v1/nodes/{id}/state against the same binary, and asserts that the (node_id, mesh_ip, public_key) tuple is byte-equal between the SSE projection and the pull peers array. That gate depends on the SSE producer surface being live end-to-end — concretely on cmd/plexsphere/sse_factory_prod.go having a non-nil EventStream and EventPublisher so that the peer_registered envelope actually lands on the JetStream stream the chainsaw fixture subscribes to. Until the production wiring above closes, the fixture stays skipped so a chainsaw run does NOT post a green signal that misrepresents end-to-end coverage of the SSE↔pull equivalence story.
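
The gate's comparison can be sketched as an order-insensitive, byte-exact match on the three-field tuple. The peerTuple type is hypothetical, and this is not the fixture's implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// peerTuple is the (node_id, mesh_ip, public_key) projection the gate compares.
type peerTuple struct {
	NodeID    string
	MeshIP    string
	PublicKey []byte
}

// sortedByNodeID returns a copy ordered by NodeID, leaving the input intact.
func sortedByNodeID(ts []peerTuple) []peerTuple {
	c := append([]peerTuple(nil), ts...)
	sort.Slice(c, func(i, j int) bool { return c[i].NodeID < c[j].NodeID })
	return c
}

// equivalent reports whether the SSE projection and the pull peers array
// carry byte-equal tuples, ignoring arrival order.
func equivalent(sse, pull []peerTuple) error {
	if len(sse) != len(pull) {
		return fmt.Errorf("peer count differs: sse=%d pull=%d", len(sse), len(pull))
	}
	a, b := sortedByNodeID(sse), sortedByNodeID(pull)
	for i := range a {
		if a[i].NodeID != b[i].NodeID || a[i].MeshIP != b[i].MeshIP || !bytes.Equal(a[i].PublicKey, b[i].PublicKey) {
			return fmt.Errorf("tuple mismatch: sse=%+v pull=%+v", a[i], b[i])
		}
	}
	return nil
}

func main() {
	sse := []peerTuple{{"n2", "10.0.0.2", []byte{2}}, {"n1", "10.0.0.1", []byte{1}}}
	pull := []peerTuple{{"n1", "10.0.0.1", []byte{1}}, {"n2", "10.0.0.2", []byte{2}}}
	fmt.Println(equivalent(sse, pull))
}
```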

The integration-tier equivalent at tests/integration/state_pull_sse_equivalence_test.go exercises the same convergence story via in-process fakes and is GREEN today; the chainsaw fixture is the cross-process kind-cluster mirror that adds the production-binary regression layer once unblocked. The reconciliation-pull surface itself, its security-gate ordering, the four-block convergence with the SSE event taxonomy, the OpenAPI contract, and the wiring-receipt regression test against BuildProductionStateFactory are documented at docs/contexts/mesh/reconciliation-pull.md.

Cross-references

  • internal/mesh/sse/doc.go — package overview and the production wire-format contract.
  • cmd/plexsphere/sse_factory_prod.go — composition-root validation gates and the in-memory NonceStore wiring.
  • internal/transport/http/v1/handlers/events.go and events_dispatch.go — handler body and the fail-closed mount gate.
  • docs/contexts/mesh/reconciliation-pull.md — the pull-channel sibling of the SSE bus; pinned here so a reader chasing the deferred-wiring trail finds the reconciliation-pull surface that shares the signed_event_bus_not_provisioned problem code and unblocks together with the SSE peer endpoint when the deferred wiring above lands.
  • docs/contexts/mesh/peers.md — Key and Peer Manager bounded sub-context reference; pinned here because the three peer event_type strings (peer_registered, peer_psk_assigned, peer_deregistered) ride the SSE bus once the deferred wiring above lands.