Signed SSE Event Bus — deferred-wiring roadmap
Status: scaffold landed; production wiring deferred.
This document tracks every load-bearing piece of production wiring that the Signed SSE Event Bus PR ships as scaffolding. Each item names the file(s) that hold the placeholder, the security or operational guarantee that is currently NOT enforced, and the follow-up task that must close the gap before the surface can serve real traffic.
The handler at internal/transport/http/v1/handlers/events.go refuses to mount when any of the load-bearing security ports is nil; the events_dispatch.go gate returns 501 with code signed_event_bus_not_provisioned so the deferred state is loud, log-scrapable, and fail-closed.
Security ports — REQUIRED before the surface goes hot
SignatureVerifier
- Port: internal/transport/http/v1/handlers/events_deps.go, type SignatureVerifier.
- Backed by: per-Domain key resolver at internal/mesh/sse/signing_key_resolver.go.
- Today: the handler refuses to mount without it; events_dispatch.go returns 501 + signed_event_bus_not_provisioned.
- Wiring task: the composition root (cmd/plexsphere) constructs a *sse.SigningKeyResolver-backed verifier and threads it onto handlers.Deps.SignatureVerifier.
RelationChecker
- Port: internal/transport/http/v1/handlers/events_deps.go, type RelationChecker.
- Backed by: internal/authz once the node-agent relation seam ships.
- Today: the handler refuses to mount without it; events_dispatch.go returns 501 + signed_event_bus_not_provisioned.
- Wiring task: build a thin adapter around *authz.Authorizer that exposes the Check(ctx, principal, relation, object) shape the transport port requires; wire it onto handlers.Deps.RelationChecker.
NodeRepo
- Port: internal/transport/http/v1/handlers/events_deps.go, type NodeRepo.
- Backed by: tenancy-tx Node aggregate.
- Today: the handler refuses to mount without it; events_dispatch.go returns 501 + signed_event_bus_not_provisioned.
- Wiring task: build a thin adapter around the tenancy repo's Node lookup; wire it onto handlers.Deps.NodeRepo.
AuditSink
- Port: internal/transport/http/v1/handlers/events_deps.go, type AuditSink.
- Optional today: a nil sink suppresses audit emission, but the security gates still fire.
- Wiring task: build an adapter around internal/audit.Sink that translates AuditEntry into the platform's audit.Entry shape; wire it onto handlers.Deps.AuditSink.
Production wiring — relay loop and stream provisioning
Relay loop
- Module: internal/mesh/sse/relay_loop.go.
- Today: RunRelay(ctx, ticker, cfg) exists but has no production caller. BuildProductionSSEFactory does not construct a Relay, does not call RunRelay, and does not call EnsureNodeEventsStream. The 0009_sse_relay.sql migration creates a table that production never reads or writes.
- Wiring task: extend BuildProductionSSEFactory to:
  - Construct a RelayStore adapter wrapping *db.Queries (sqlcgen) so the per-stream cursor row is read from and written to a real Postgres pool.
  - Construct a Relay instance with the wrapped store, publisher, and clock.
  - Call EnsureNodeEventsStream(ctx, ensurer, replicas, maxBytes) once at startup.
  - Spawn the RunRelay goroutine on the long-lived ticker.
sqlc-backed RelayStore
- Module: internal/platform/db/gen/80_sse_relay_cursor.sql.go (generated by sqlc).
- Today: no production code instantiates a *db.Queries-backed sse.RelayStore adapter; tests use inline SQL.
- Wiring task: build cmd/plexsphere/sse_relay_store.go (or similar) that wraps *db.Queries and satisfies sse.RelayStore.
Per-subject FilterSubject
- Module: internal/mesh/sse/event_stream.go.
- Today: the JetStream replay is keyed off streamName only. The stream-level filter plexsphere.node.events.> admits every per-(Domain, Node) subject, so a consumer receives ALL events on the stream regardless of the addressed Node.
- Wiring task: extend messaging.Client.Replay (or MessageReader.Replay) to accept a FilterSubject argument, and bind the JetStream consumer with FilterSubjects.
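The stream-level filter leaks every Node's events because of NATS subject-matching rules: * matches exactly one token, and > matches one or more trailing tokens. The pure-Go matcher below reproduces those rules to illustrate the problem. The real fix is the server-side FilterSubjects binding named in the wiring task; this matcher is not part of it. The plexsphere.node.events.<domain>.<node> layout is an assumed subject scheme.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style filter:
// "*" matches exactly one token, ">" (last token only) matches one or more.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

// nodeSubject builds a per-(Domain, Node) subject; the layout is hypothetical.
func nodeSubject(domainID, nodeID string) string {
	return "plexsphere.node.events." + domainID + "." + nodeID
}

func main() {
	events := []string{nodeSubject("d1", "n1"), nodeSubject("d1", "n2"), nodeSubject("d2", "n1")}
	for _, filter := range []string{"plexsphere.node.events.>", nodeSubject("d1", "n1")} {
		var got []string
		for _, subj := range events {
			if subjectMatches(filter, subj) {
				got = append(got, subj)
			}
		}
		fmt.Println(filter, "->", got)
	}
}
```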
StartFromTail in production messaging
- Module: internal/platform/messaging/replay.go.
- Today: rejects startSeq == 0 and does not yet support DeliverPolicy=DeliverNew.
- Wiring task: extend Replay to translate sse.StartFromTail (or a literal "tail" sentinel parameter) into JetStream's DeliverNew policy.
EventStream Bounds seam
- Module: internal/transport/http/v1/handlers/events.go calls sse.ParseLastEventID(header, 0, ^uint64(0)).
- Today: the OpenAPI 410 path (last_event_id_outside_replay_window) is unreachable because the bounds path is not wired; the 410 description is removed from the spec until then.
- Wiring task: thread a Bounds(ctx, nodeID) (low, high uint64, err error) method through the EventStream port and pass the real (low, high) to ParseLastEventID.
Storage layer — the deferred 0009 migration
internal/platform/db/migrations/0009_sse_relay.sql creates an sse_relay_cursor table that production does not yet read or write. Until the relay-loop wiring lands, the table is reserved state: a downgrade that drops it before the wiring catches up risks losing any cursor an early-adopter pre-production cluster has materialised. The migration is intentionally kept in this PR to lock the schema shape early; the operator-facing note is in the migration file's header comment.
Operational hardening — relay HOL blocking
internal/mesh/sse/relay.go returns successes=0 on ErrSigningKeyNotProvisioned, so a Domain whose signing key is never provisioned blocks every subsequent event for that Domain forever. Today the sse_relay_skip_total{reason=no_signing_key} counter and the WarnContext line are the operator signal; the only remediation is to provision the missing key.
A future "skip and advance" fix requires either a sse_relay_skipped table or an outbox_events.status column — both are schema changes that sit outside this PR's scope. Track the schema choice and implementation in a follow-up story under the Signed Event Bus epic.
NonceStore split / dual-store invariant
SSEWiring exposes ONE NonceStore; the production composition root MUST wire it onto exactly one consumer layer. Today (scaffold posture, single-replica) the NonceStore is wired onto the handler. When the production wiring lands and replicas > 1 becomes the supported posture, the fix is:
- Replace the in-memory NonceStore with a distributed store (e.g. Redis-backed).
- Split SSEWiring into AdapterNonceStore and HandlerNonceStore fields (or document that the same distributed store is wired onto both layers).
- Make BuildProductionSSEFactory enforce pointer non-equality when the in-memory store is in use.
Test harness symmetry
internal/platform/testutil/sse/capture.go now computes envelope.CanonicalBytes(env_with_sig_cleared) to verify production envelopes; legacy producers that signed raw data: bytes still verify because signedPayload() falls back to the raw payload on parse failure.
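The sketch below shows the verification-side fallback, assuming a JSON envelope with a sig field. The envelope fields are hypothetical. json.Marshal over a struct stands in for envelope.CanonicalBytes; it emits fields in declaration order. The actual canonicalisation rules of envelope.CanonicalBytes are not described here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical wire shape; the real fields live in the envelope package.
type envelope struct {
	Type      string          `json:"type"`
	NodeID    string          `json:"node_id"`
	Payload   json.RawMessage `json:"payload"`
	Signature string          `json:"sig,omitempty"`
}

// canonicalBytes stands in for envelope.CanonicalBytes: encoding/json writes
// struct fields in declaration order, so the output is deterministic here.
func canonicalBytes(e envelope) ([]byte, error) {
	return json.Marshal(e)
}

// signedPayload returns the bytes a verifier should check: the canonical
// envelope with its signature cleared, or the raw data bytes when they do not
// parse as an envelope (legacy producers).
func signedPayload(data []byte) []byte {
	var e envelope
	if err := json.Unmarshal(data, &e); err != nil {
		return data
	}
	e.Signature = ""
	b, err := canonicalBytes(e)
	if err != nil {
		return data
	}
	return b
}

func main() {
	signed := []byte(`{"type":"node_state_updated","node_id":"n1","payload":{"state":"stale"},"sig":"c2ln"}`)
	fmt.Println(string(signedPayload(signed)))
	fmt.Println(string(signedPayload([]byte("legacy raw bytes"))))
}
```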
Verification on every PR
- The build gate refuses to ship if any of the security ports ends up wired on a production code path without a paired test.
go list -m -u all and govulncheck ./... should run on the new modules under internal/mesh, internal/transport/http, and cmd/sse-stub-plexd whenever this roadmap is touched. At the time of writing, the locally installed govulncheck@v1.1.4 and golangci-lint@v2.11.4 binaries are built with a pre-1.26 Go toolchain and refuse to scan a tree that requires Go 1.26 (file requires newer Go version go1.26). CI installs both tools via GOTOOLCHAIN=local go install against a Go 1.26 toolchain (see .github/workflows/ci.yaml), so the dependency audit runs in CI rather than as a developer-side pre-commit check. Once the upstream releases catch up to Go 1.26, the audit can move to pre-commit.
Producer-side closures already landed
Reachability evaluator outbox emitter
The per-Domain reachability evaluator's writer-side closure is live: every Healthy → Stale → Unreachable transition writes one tenancy.NodeReachabilityChanged row to plexsphere.outbox_events inside the same transaction as the UpdateReachabilityState SQL write (see internal/identity/tenancy/repo/node_heartbeat_repo.go::TransitionAndEmit and the writer adapter in cmd/plexsphere/evaluator_factory_prod.go). Two cross-cutting invariants are gated by integration tests at tests/integration/reachability_evaluator_emits_outbox_test.go:
- One outbox row per state transition; idempotent re-runs do NOT duplicate the row (the SQL-side reachability_state <> $2 predicate is the load-bearing guard).
- The outbox row and the SQL UPDATE share one transaction_id: this is the same-tx atomicity invariant that a crash between the two writes would otherwise violate.
The companion chain-bound AuditSink adapter (*chainedReachabilityAuditSink) wraps the canonical hash-chained audit.Sink so every transition also lands one row on the per-Domain audit chain. The chain row's wire shape is pinned by tests/integration/reachability_evaluator_chained_audit_test.go: Relation = "node_reachability.transition", Object = "domain:<hex16>" (matching the audit factory's defaultDomainResolver), Reason = audit.ReasonGranted, RelationPath = [from_state, to_state, reason]; the per-Domain audit_log_chain_head.next_seq advances by exactly the number of transitions and chain.VerifyChain succeeds end-to-end.
The SSE drain that surfaces the outbox row on the wire as a node_state_updated envelope is the relay-loop closure tracked under "Production wiring" above; once that lands, the tests/e2e/mesh/reachability-transitions/ chainsaw fixture flips from skip: true to live and asserts the cross-process producer→consumer convergence.
Policy compile-arm wire fan-out (policy_updated)
The policy compile arm's producer-side closure is live alongside the peer_* and node_state_updated per-source wire-type expansions. The publisher's wireTypeFor dispatch table at internal/mesh/sse/publisher.go maps the closed two-event policy outbox set (policy_revision_created and policy_deleted) onto the single wire literal policy_updated; the compile service emits one publish per matched Node on revision-created via the policy.WirePublisher port (production binding cmd/plexsphere/policy_wire_publisher_adapter.go) and one per previously compiled Node on delete via lookup-before-cascade through CompiledRulesetRepo.ListNodesByPolicy. Three cross-cutting invariants are gated by integration tests at tests/integration/policy_updated_wire_fanout_test.go:
- Per-Node arity: one wire publish per (Node, Policy) row on revision-created; one wire publish per previously compiled Node on delete.
- Payload byte-equality with the persisted rule_payload column on revision-created; canonical empty-rules payload []byte("[]") on delete.
- JetStream Nats-Msg-Id dedup suppresses a re-driven outbox row as ack.Duplicate=true.
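The sketch below shows the dispatch table and the per-Node fan-out for the two policy outbox types. Two choices are assumptions: representing wireTypeFor as a map, and passing compiled payloads in as a per-Node map. Three details come from the roadmap: the policy_updated literal, the per-Node arity, and the empty-rules []byte("[]") payload on delete.

```go
package main

import "fmt"

// wireTypes sketches the wireTypeFor dispatch for the two policy outbox types;
// the real table also covers peer_*, node_state_updated and bridge.* rows.
var wireTypes = map[string]string{
	"policy_revision_created": "policy_updated",
	"policy_deleted":          "policy_updated",
}

func wireTypeFor(outboxType string) (string, bool) {
	w, ok := wireTypes[outboxType]
	return w, ok
}

type wireMsg struct {
	NodeID, Type string
	Payload      []byte
}

// fanOut emits one message per Node: the compiled rule_payload on
// revision-created, the canonical empty ruleset "[]" on delete.
func fanOut(outboxType string, nodes []string, compiled map[string][]byte) ([]wireMsg, error) {
	wt, ok := wireTypeFor(outboxType)
	if !ok {
		return nil, fmt.Errorf("no wire type for outbox type %q", outboxType)
	}
	msgs := make([]wireMsg, 0, len(nodes))
	for _, n := range nodes {
		payload := []byte("[]")
		if outboxType == "policy_revision_created" {
			p, found := compiled[n]
			if !found {
				return nil, fmt.Errorf("no compiled rule_payload for node %s", n)
			}
			payload = p
		}
		msgs = append(msgs, wireMsg{NodeID: n, Type: wt, Payload: payload})
	}
	return msgs, nil
}

func main() {
	msgs, err := fanOut("policy_deleted", []string{"n1", "n2"}, nil)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(m.NodeID, m.Type, string(m.Payload))
	}
}
```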
The wire envelope's signature chain is pinned by tests/integration/policy_updated_signature_chain_test.go against the production per-Domain SigningKeyResolver. The remaining work is the relay-loop side (the RelayStore adapter, the EnsureNodeEventsStream call, and the RunRelay goroutine) tracked under "Production wiring — relay loop and stream provisioning" above; once that lands, the tests/e2e/policy/policy-updated/chainsaw-test.yaml chainsaw fixture's existing stub-binary producer/consumer pair extends to the real plexsphere binary without a schema change.
Bridge wire fan-out (bridge_config_updated)
The bridge orchestrator's producer-side closure is live alongside the policy_updated and node_state_updated per-source wire-type expansions. The publisher's wireTypeFor dispatch table at internal/mesh/sse/publisher.go collapses the closed seven-member set of bridge.* outbox literals (bridge.RelayConfigured, the user-access / public-ingress / site-to-site *Configured and *Removed literals) onto the single wire literal bridge_config_updated. The four bridge application services each call the bridge.WirePublisher port (internal/bridge/ports.go::PublishBridgeConfigUpdated) after their RunInTx closure commits; the production adapter enumerates the Nodes the changed bridge Resource hosts via the bridge.BridgeNodeLister port, builds the effective config once through the effective.EffectiveConfigBuilder, and emits one signed envelope per hosted Node — the 1-bridge-row → N-Node fan-out. The wire effective_config bytes equal the reconciliation-pull bridge block for the same Node and bridge Resource; the parity is pinned by tests/integration/bridge_config_updated_pull_parity_test.go. A nil WirePublisher keeps a service on its single-arm posture (aggregate write + outbox row + audit row, no wire emission), and a publish error is logged and swallowed because the command has already committed and the pull fallback guarantees eventual convergence.
The remaining work is the consumer-side /v1/nodes/{id}/events HTTP plumbing — the relay-loop side (the RelayStore adapter, the EnsureNodeEventsStream call, and the RunRelay goroutine) tracked under "Production wiring — relay loop and stream provisioning" above, plus the signature-verification and ReBAC node-agent gates tracked under "Security ports". The publisher-side dispatch table, the per-Node fan-out algorithm, the BridgeConfigUpdatedPayload schema, and the depguard posture are documented at docs/contexts/bridge/events.md.
Downstream blocked items
Reconciliation-pull chainsaw equivalence E2E
tests/e2e/mesh/reconciliation-pull/chainsaw-test.yaml carries skip: true until the deferred wiring above lands. The fixture's terminal convergence gate registers a Domain + N Nodes through the production POST /v1/domains/{id}/nodes surface, captures the resulting SSE peer_registered envelope from the running plexsphere binary, issues GET /v1/nodes/{id}/state against the same binary, and asserts that the (node_id, mesh_ip, public_key) tuple is byte-equal between the SSE projection and the pull peers array. That gate depends on the SSE producer surface being live end-to-end — concretely on cmd/plexsphere/sse_factory_prod.go having a non-nil EventStream and EventPublisher so that the peer_registered envelope actually lands on the JetStream stream the chainsaw fixture subscribes to. Until the production wiring above closes, the fixture stays skipped so a chainsaw run does NOT post a green signal that misrepresents end-to-end coverage of the SSE↔pull equivalence story.
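The terminal convergence gate reduces to an order-insensitive comparison of (node_id, mesh_ip, public_key) tuples. The fixture itself is a chainsaw assertion, not Go code. The Go below only illustrates the check, and peerTuple and equivalent are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// peerTuple is the triple the convergence gate compares byte for byte.
type peerTuple struct{ NodeID, MeshIP, PublicKey string }

// sortTuples orders tuples by every field so duplicates sort deterministically.
func sortTuples(s []peerTuple) {
	sort.Slice(s, func(i, j int) bool {
		if s[i].NodeID != s[j].NodeID {
			return s[i].NodeID < s[j].NodeID
		}
		if s[i].MeshIP != s[j].MeshIP {
			return s[i].MeshIP < s[j].MeshIP
		}
		return s[i].PublicKey < s[j].PublicKey
	})
}

// equivalent compares the SSE projection with the pull peers array,
// ignoring order, and describes the first mismatch it finds.
func equivalent(sse, pull []peerTuple) (bool, string) {
	if len(sse) != len(pull) {
		return false, fmt.Sprintf("count mismatch: sse=%d pull=%d", len(sse), len(pull))
	}
	a := append([]peerTuple(nil), sse...)
	b := append([]peerTuple(nil), pull...)
	sortTuples(a)
	sortTuples(b)
	for i := range a {
		if a[i] != b[i] { // string fields compare byte for byte
			return false, fmt.Sprintf("mismatch: sse=%+v pull=%+v", a[i], b[i])
		}
	}
	return true, ""
}

func main() {
	sse := []peerTuple{{"n1", "100.64.0.1", "pk1"}, {"n2", "100.64.0.2", "pk2"}}
	pull := []peerTuple{{"n2", "100.64.0.2", "pk2"}, {"n1", "100.64.0.1", "pk1"}}
	fmt.Println(equivalent(sse, pull))
}
```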
The integration-tier equivalent at tests/integration/state_pull_sse_equivalence_test.go exercises the same convergence story via in-process fakes and is GREEN today; the chainsaw fixture is the cross-process kind-cluster mirror that adds the production-binary regression layer once unblocked. The reconciliation-pull surface itself, its security-gate ordering, the four-block convergence with the SSE event taxonomy, the OpenAPI contract, and the wiring-receipt regression test against BuildProductionStateFactory are documented at docs/contexts/mesh/reconciliation-pull.md.
Cross-references
- internal/mesh/sse/doc.go: package overview and the production wire-format contract.
- cmd/plexsphere/sse_factory_prod.go: composition-root validation gates and the in-memory NonceStore wiring.
- internal/transport/http/v1/handlers/events.go and events_dispatch.go: handler body and the fail-closed mount gate.
- docs/contexts/mesh/reconciliation-pull.md: the pull-channel sibling of the SSE bus. It is pinned here so a reader chasing the deferred-wiring trail finds the reconciliation-pull surface, which shares the signed_event_bus_not_provisioned problem code and unblocks together with the SSE peer endpoint when the deferred wiring above lands.
- docs/contexts/mesh/peers.md: Key and Peer Manager bounded sub-context reference. It is pinned here because the three peer event_type strings (peer_registered, peer_psk_assigned, peer_deregistered) ride the SSE bus once the deferred wiring above lands.