Observability Routing — egress consumer that drains the buffer to Mimir / Loki / SIEM
This document is the authoritative bounded-context reference for the Observability Routing context, the egress half of the observability pipeline. Where the ingest front door admits, validates, and buffers every batch onto a per-Domain JetStream stream, this context drains those streams back out: it consumes one buffered batch at a time, transforms it into the wire shape each downstream backend expects, and exports it to Grafana Mimir (metrics), Grafana Loki (logs and audit), and the audit SIEM (audit and logs). The domain root that pins the ubiquitous language is ../../../internal/observability/routing.
Routing is a pure backend consumer. Unlike the ingest front door, it mounts no /v1 HTTP route and installs no NSK middleware: a batch enters this pipeline not off an inbound request but off a JetStream durable consumer. The context owns no aggregate the way ingest owns Batch; instead it owns the three Sink-specific transforms, the outbound Exporter port and its three net/http adapters, the export-outcome classification, and the application service that drives one Sink-bound consume loop from buffered delivery to ack or nak. Its only HTTP surfaces are outbound: the POSTs each Exporter makes to its downstream backend.
Ubiquitous language
The terms below travel verbatim across the domain root, the three transforms, the Exporter port and its adapters, and the application service. Documentation, message headers, and metric labels adopt the exact spelling.
| Term | Definition | Code anchor |
|---|---|---|
| Route | The act of consuming one buffered batch and exporting it to a Sink: read the batch off a consumer, transform it into the wire shape its destination expects, hand the payload to an Exporter, then ack or nak the delivery on the outcome. | ../../../internal/observability/routing/doc.go |
| Sink | One of the three downstream destinations a Route delivers to — Grafana Mimir for metrics, Grafana Loki for logs and audit, or the audit SIEM. The Sink doubles as the metric label that scopes the export-outcome counters per destination. The closed three-value set has string values mimir, loki, siem. | ../../../internal/observability/routing/sink.go (Sink) |
| Exporter | The outbound port — Export(ctx, ExportRequest) error — that ships one already-transformed payload to a single Sink and classifies the outcome. A nil error means the export was accepted; a non-nil error MUST wrap either ErrExportRetryable (the consumer redelivers) or ErrExportRejected (the consumer drops). | ../../../internal/observability/routing/export.go (Exporter) |
| ExportRequest | The unit of work an Exporter ships: the transformed, Sink-specific wire body (Payload) plus the routing metadata that addresses and labels the destination — Domain, Project, Node, Signal, and the optional per-request Headers a transform produced (the SIEM envelope). | ../../../internal/observability/routing/export.go (ExportRequest) |
| Router | The application service that drives one Sink-bound consume loop: it parses the subject and the buffer-stream headers, parses the batch body, runs the Sink-specific transform, hands the transformed payload to the Sink's Exporter, and then acks or naks the delivery according to the export outcome. | ../../../internal/observability/routing/services/router.go (Router) |
| Sink-bound consumer | A durable JetStream consumer that binds exactly one Sink to one ingest stream. Its name is a wire identifier operators see in nats consumer ls; the five names are the single source of truth the reconcile provisions against. | ../../../internal/observability/routing/consumers.go (EnsureRoutingConsumers) |
| Export-outcome sentinels | The two sentinels every export attempt is classified into — ErrExportRetryable (a transport error, a 429, or any 5xx: the consumer redelivers) and ErrExportRejected (any other non-2xx, i.e. a non-429 4xx: terminal, the consumer drops). Callers detect them with errors.Is. | ../../../internal/observability/routing/errors.go (ErrExportRetryable, ErrExportRejected) |
The three routes
Each Sink reads one or more ingest streams, transforms the buffered batch into a different wire format, and POSTs it to a different endpoint with a different tenant / auth header set. The transform is not interchangeable: a metrics batch becomes remote-write protobuf, a logs / audit batch becomes a Loki JSON push body, and the same logs / audit batch becomes verbatim NDJSON for the SIEM.
| Sink | Source stream(s) | Wire format | Endpoint (POST) | Tenant / auth headers |
|---|---|---|---|---|
| Mimir (mimir) | metrics | snappy-compressed Prometheus remote-write protobuf | <MimirURL>/api/v1/push | Content-Encoding: snappy, Content-Type: application/x-protobuf, X-Prometheus-Remote-Write-Version: 0.1.0, X-Scope-OrgID: <domain> |
| Loki (loki) | logs, audit | JSON push body | <LokiURL>/loki/api/v1/push | Content-Type: application/json, X-Scope-OrgID: <domain> |
| SIEM (siem) | logs, audit | verbatim NDJSON | <SIEMURL> (the full endpoint; no path suffix is appended) | the SIEM envelope (see The SIEM forwarder) |
The remote-write specification Mimir consumes is the public Prometheus remote-write protocol; the Loki push body matches the public Grafana Loki push API. X-Scope-OrgID carries the originating Domain as the multitenancy key for both Mimir and Loki, so each tenant's series and streams land in that tenant's isolated store rather than collapsing into one shared namespace.
The endpoint construction differs across the three: Mimir and Loki append a fixed path (/api/v1/push and /loki/api/v1/push) to the configured base URL, while the SIEM URL is treated as the full endpoint and POSTed to verbatim — the SIEM endpoint a customer configures is the exact ingest URL, not a base to which a path is added.
Consumer topology
The pipeline drains the three ingest streams through five durable consumers, one per (sink, stream) pair. These names are the wire identifiers an operator sees in nats consumer ls:
| Durable consumer | Sink | Source stream |
|---|---|---|
| obs-route-mimir | Mimir | metrics |
| obs-route-loki-logs | Loki | logs |
| obs-route-loki-audit | Loki | audit |
| obs-route-siem-logs | SIEM | logs |
| obs-route-siem-audit | SIEM | audit |
Why one consumer per (sink, stream), not one shared consumer per stream. JetStream tracks a single ack cursor per durable consumer. If Loki and SIEM both read the logs stream through one shared consumer, a slow or down sink (the SIEM during an outage, say) holding un-acked messages would head-of-line block the healthy sink (Loki) reading the same stream — stalling healthy egress behind a sick one. A dedicated consumer per (sink, stream) gives each sink its own ack cursor, so a sink that falls behind only delays itself, and a sink enabled later starts from DeliverAll over whatever the stream still retains.
Each consumer is configured AckExplicit + DeliverAll with no MaxDeliver. A poison batch is therefore dropped by the Router's terminal-ack logic, never by a JetStream MaxDeliver dead-letter: the Router acks on a successful export and on a terminal drop, and naks (with a delay) only on a retryable outcome, so a batch the downstream will never accept is acked-and-dropped exactly once rather than circulating until a delivery ceiling expires it.
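The topology and per-consumer settings above can be sketched as plain data. This is a minimal illustration, not the reconcile itself: consumerSpec, topology, and routingConsumers are hypothetical names (the real entry point is EnsureRoutingConsumers, which uses the JetStream client's own config types). It shows one durable per (sink, stream) pair, AckExplicit plus DeliverAll with no MaxDeliver, and that only enabled sinks get a consumer.

```go
package main

import "fmt"

// Sink mirrors the closed three-value set (mimir, loki, siem).
type Sink string

const (
	SinkMimir Sink = "mimir"
	SinkLoki  Sink = "loki"
	SinkSIEM  Sink = "siem"
)

// consumerSpec is a hypothetical stand-in for the durable-consumer settings
// the reconcile provisions.
type consumerSpec struct {
	Durable     string // wire identifier shown by `nats consumer ls`
	Sink        Sink
	Stream      string // source ingest stream
	AckExplicit bool   // every delivery is acked or nak'd individually
	DeliverAll  bool   // a new consumer replays whatever the stream retains
	MaxDeliver  int    // left at zero in this sketch: no delivery ceiling
}

// topology holds one durable consumer per (sink, stream) pair, so each
// sink gets its own ack cursor.
var topology = []struct {
	durable string
	sink    Sink
	stream  string
}{
	{"obs-route-mimir", SinkMimir, "metrics"},
	{"obs-route-loki-logs", SinkLoki, "logs"},
	{"obs-route-loki-audit", SinkLoki, "audit"},
	{"obs-route-siem-logs", SinkSIEM, "logs"},
	{"obs-route-siem-audit", SinkSIEM, "audit"},
}

// routingConsumers returns specs for the enabled sinks only.
func routingConsumers(enabled map[Sink]bool) []consumerSpec {
	var out []consumerSpec
	for _, t := range topology {
		if !enabled[t.sink] {
			continue // a disabled sink never holds an ack cursor
		}
		out = append(out, consumerSpec{
			Durable:     t.durable,
			Sink:        t.sink,
			Stream:      t.stream,
			AckExplicit: true,
			DeliverAll:  true,
		})
	}
	return out
}

func main() {
	for _, c := range routingConsumers(map[Sink]bool{SinkLoki: true, SinkSIEM: true}) {
		fmt.Println(c.Durable, c.Stream)
	}
}
```

Because Loki and SIEM each get their own durable on the logs and audit streams, an outage on one never holds back the other's cursor.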
Deterministic capped backoff. A retryable outcome schedules the redelivery with a deterministic delay of min(5s · 2^(n-1), 60s), where n is the server-tracked delivery count. The sequence is 5s, 10s, 20s, 40s, 60s, 60s, … — exponential until it clamps at the one-minute ceiling — so a transient downstream outage backs off rather than hot-looping, without ever exceeding a minute between attempts.
The 24h replay horizon. The ingest streams' MaxAge is fixed at 24h. That window bounds how far back a freshly-enabled or recovered consumer can replay: a sink wired up (or brought back) after an outage catches up over only the last 24h of retained telemetry, and anything older has already aged out of the stream. The horizon is the same one the ingest doc calls out as deliberately fixed, not an operator knob.
At-least-once delivery and the SIEM dedup note. Because there is no MaxDeliver and a nak redelivers, delivery is at-least-once: a batch may be exported more than once (a successful export whose ack is lost, or a downstream that accepted a batch but returned a retryable status). A downstream SIEM should therefore treat ingestion idempotently and de-duplicate on the (node, sent_at, record) tuple — the originating Node id, the batch send time, and the verbatim record — so a redelivery does not double-count an audit event.
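On the receiving side, idempotent ingestion can be as simple as hashing the tuple into a key. This is a hypothetical SIEM-side sketch: dedupKey and seen are illustrative names, the in-memory set stands in for whatever store a real SIEM uses, and where the SIEM obtains sent_at is not specified here, so the sketch just takes it as a string. Each field is length-prefixed so that two different tuples cannot concatenate to the same bytes.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// dedupKey derives a stable identity for one forwarded record from the
// (node, sent_at, record) tuple.
func dedupKey(node, sentAt string, record []byte) string {
	h := sha256.New()
	for _, part := range [][]byte{[]byte(node), []byte(sentAt), record} {
		var n [8]byte
		binary.BigEndian.PutUint64(n[:], uint64(len(part)))
		h.Write(n[:]) // length prefix keeps field boundaries unambiguous
		h.Write(part)
	}
	return hex.EncodeToString(h.Sum(nil))
}

// seen is a toy in-memory dedup set; a real SIEM would need an expiry that
// covers the redelivery window (bounded by the 24h stream retention).
type seen map[string]struct{}

// admit reports whether the record is new, remembering it if so.
func (s seen) admit(node, sentAt string, record []byte) bool {
	k := dedupKey(node, sentAt, record)
	if _, dup := s[k]; dup {
		return false
	}
	s[k] = struct{}{}
	return true
}

func main() {
	s := seen{}
	rec := []byte(`{"event":"login"}`)
	fmt.Println(s.admit("node-1", "2024-01-01T00:00:00Z", rec)) // first delivery
	fmt.Println(s.admit("node-1", "2024-01-01T00:00:00Z", rec)) // redelivery
}
```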
Mimir mapping
The Mimir transform decodes each buffered metric record to {group, name, value, timestamp, labels} and turns each surviving record into one remote-write TimeSeries.
The label set. Each surviving record carries five reserved labels the transform sets itself, plus the record's sanitized user labels:
| Reserved label | Value |
|---|---|
__name__ | the sanitized metric name |
group | the record's metric group |
domain | the originating tenant |
project | the originating Project id |
node | the originating Node id |
A user label whose sanitized name collides with one of the five reserved names is dropped and counted in a reserved-collision tally kept alongside (but separate from) record_drops_total, so a caller can never spoof the tenant-scoping labels Mimir keys on.
Sanitization. Label and metric names are coerced into the Prometheus identifier grammar: a metric name to [a-zA-Z_:][a-zA-Z0-9_:]* and a label name to [a-zA-Z_][a-zA-Z0-9_]*. Note that the colon (:) is permitted in a metric name but not in a label name; a result that is empty or begins with a digit is _-prefixed. Label values are not sanitized: they are opaque UTF-8 that Mimir accepts verbatim, and coercing a value would silently corrupt a legitimate URL or path.
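The coercion rules can be sketched as one helper with a colon switch. The grammar and the _-prefix rule come from the text above; replacing each disallowed rune with _ is an assumption of this sketch, since the document fixes only the target grammar, and the function names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitize maps every rune outside [a-zA-Z0-9_] (plus ':' when allowColon)
// to '_' (an assumed replacement), then _-prefixes an empty result or one
// that starts with a digit.
func sanitize(s string, allowColon bool) string {
	var b strings.Builder
	for _, r := range s {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
			b.WriteRune(r)
		case r == ':' && allowColon:
			b.WriteRune(r)
		default:
			b.WriteByte('_')
		}
	}
	out := b.String()
	if out == "" || (out[0] >= '0' && out[0] <= '9') {
		out = "_" + out
	}
	return out
}

// sanitizeMetricName targets [a-zA-Z_:][a-zA-Z0-9_:]*.
func sanitizeMetricName(s string) string { return sanitize(s, true) }

// sanitizeLabelName targets [a-zA-Z_][a-zA-Z0-9_]* (no ':').
func sanitizeLabelName(s string) string { return sanitize(s, false) }

func main() {
	fmt.Println(sanitizeMetricName("http.requests:total"))
	fmt.Println(sanitizeLabelName("k8s:pod"))
	fmt.Println(sanitizeLabelName("9lives"))
}
```

Label values never pass through this helper, which is what keeps a value like /api/v1?x=1 intact.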
Sorting and timestamps. A series' labels are sorted lexicographically by name before encoding, and each record's timestamp is emitted in milliseconds (UnixMilli).
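Putting the label rules together, a minimal sketch of assembling one series' label set: the five reserved labels are set first, colliding user labels are dropped and counted, and the result is sorted by name. buildLabels and label are hypothetical names, and the user labels are assumed to be name-sanitized already.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type label struct{ Name, Value string }

// reservedLabels are set by the transform and can never be supplied by a caller.
var reservedLabels = map[string]bool{
	"__name__": true, "group": true, "domain": true, "project": true, "node": true,
}

// buildLabels returns the sorted label set and the number of user labels
// dropped for colliding with a reserved name.
func buildLabels(metric, group, domain, project, node string, userLabels map[string]string) ([]label, int) {
	ls := []label{
		{"__name__", metric},
		{"group", group},
		{"domain", domain},
		{"project", project},
		{"node", node},
	}
	collisions := 0
	for name, value := range userLabels {
		if reservedLabels[name] {
			collisions++ // never let a caller override tenant scoping
			continue
		}
		ls = append(ls, label{name, value})
	}
	// Lexicographic order by name before encoding.
	sort.Slice(ls, func(i, j int) bool { return ls[i].Name < ls[j].Name })
	return ls, collisions
}

func main() {
	ls, dropped := buildLabels("http_requests_total", "web", "acme", "p1", "n1",
		map[string]string{"path": "/a?b=c", "domain": "spoofed"})
	// Samples carry the record timestamp in milliseconds.
	ts := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
	fmt.Println(ls, dropped, ts)
}
```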
The closed drop-reason set. A record that fails one of the parse steps becomes a drop, in record order, carrying one of three reasons:
| Drop reason | Meaning |
|---|---|
malformed_value | the value field is present but not a JSON number |
malformed_timestamp | the timestamp field is not a JSON string, or does not parse as RFC3339Nano |
undecodable | the record bytes are not a JSON object at all |
Surviving records still ship even when some siblings drop. Only when every record is malformed does the transform yield a nil payload; the Router then reaches a terminal disposition with nothing to export, and the batch is acked and counted under the dropped outcome, never POSTed.
Why the wire shape is hand-encoded. The WriteRequest / TimeSeries / Label / Sample four-message subset is hand-encoded with google.golang.org/protobuf/encoding/protowire and snappy-compressed (in the block format the remote-write protocol requires) with github.com/klauspost/compress/s2, rather than vendoring github.com/prometheus/prometheus/prompb. Those four messages are field-stable and have not changed across remote-write versions, and the transform only ever serialises them (it never reflects over them), so pulling in a large, transitively heavy generated proto package and its descriptor machinery for four fixed messages was rejected in favour of a std-lib-plus-protowire/s2 core.
Loki mapping
The Loki transform turns one buffered logs or audit batch into a single Loki stream whose labels are exactly {signal, domain, project, node}. Each record contributes one [<unix-ns-string>, <line>] value entry, where:
- the line is the raw record bytes verbatim — the transform does not re-serialize the record, so the line Loki stores is byte-for-byte what the Node produced; and
- the nanosecond timestamp is the record's own RFC3339Nano timestamp field, or the batch sentAt when that field is missing, not a JSON string, or unparseable; in that case the record is still emitted and a timestamp fallback is counted.

Records are never dropped on the Loki path; the fallback count is a quality signal, not an error count.
X-Scope-OrgID carries the Domain as the Loki tenant.
Why the label set is fixed at four bounded keys. Promoting a log's severity (or an audit event's source) to a fifth stream label was rejected: Loki indexes one stream per distinct label-value combination, so a per-record-varying label multiplies stream cardinality within a batch and across the tenant — the failure mode Loki operators most need to avoid. severity, source, and every other field stay queryable inside the JSON line through LogQL's json parser, so nothing is lost by keeping them out of the index.
The SIEM forwarder
The SIEM forwarder ships the buffered records verbatim: it joins the raw record byte slices with \n and a trailing \n into one NDJSON body, with no re-serialization of any record. This preserves byte-exact audit provenance and leaves all parsing and normalization to the SIEM, which owns its own schema — re-encoding here would both lose provenance and risk a lossy round-trip through Go's JSON encoder.
The body is POSTed to the configured SIEM endpoint under the SIEM ingest envelope:
| Header | Carries |
|---|---|
| Content-Type | application/x-ndjson |
| X-Plexsphere-Signal | the source Signal (logs / audit) |
| X-Plexsphere-Domain-Id | the originating tenant |
| X-Plexsphere-Project-Id | the originating Project id |
| X-Plexsphere-Node-Id | the originating Node id |
| Authorization | Bearer <token>, only when a SIEM token is configured |
The bearer is attached by the transform, not the exporter, so the token policy lives in exactly one place and the exporter stays a pure request shipper.
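A sketch of the SIEM transform's output: the NDJSON body and the envelope headers from the table above. siemRequest is a hypothetical name; the header names and the conditional bearer come from the document. The records are written out byte-for-byte, each followed by a newline, with no re-encoding.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// siemRequest builds the verbatim NDJSON body and the SIEM ingest envelope.
func siemRequest(signal, domain, project, node, token string, records [][]byte) ([]byte, http.Header) {
	var body bytes.Buffer
	for _, r := range records {
		body.Write(r) // raw record bytes, never re-serialized
		body.WriteByte('\n')
	}
	h := http.Header{}
	h.Set("Content-Type", "application/x-ndjson")
	h.Set("X-Plexsphere-Signal", signal)
	h.Set("X-Plexsphere-Domain-Id", domain)
	h.Set("X-Plexsphere-Project-Id", project)
	h.Set("X-Plexsphere-Node-Id", node)
	if token != "" {
		// The bearer policy lives here, so the exporter just ships the request.
		h.Set("Authorization", "Bearer "+token)
	}
	return body.Bytes(), h
}

func main() {
	body, h := siemRequest("audit", "acme", "proj-1", "node-1", "", [][]byte{[]byte(`{"event":"login"}`)})
	fmt.Printf("%q %v\n", body, h)
}
```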
Production-TLS expectation. TLS on the outbound SIEM connection comes from configuring an https:// SIEM endpoint URL at the composition root. Neither the pure transform nor the exporter holds a TLS policy of its own: the shared platform HTTP client carries the transport configuration, and choosing an https scheme for the endpoint is what puts the SIEM POST on TLS.
Error classification
Every export attempt is classified by classifyResponse into exactly one outcome:
| Outcome | Condition | Consumer action |
|---|---|---|
| success | a 2xx | ack |
| ErrExportRetryable | a transport error, a 429, or any 5xx | nak with the backoff delay; the batch stays on the stream |
| ErrExportRejected | any other non-2xx (a non-429 4xx) | ack-and-drop (terminal) |
Why a non-429 4xx is terminal. Per the Prometheus remote-write spec a 4xx means the receiver will never accept this payload, so a 4xx that is not 429 is permanent. Retrying it forever would wedge the consumer behind a poison batch it can never drain; treating every error as retryable was rejected for exactly that wedge — a single malformed batch would stall the whole consumer indefinitely. A 429 and a 5xx (rate limit, backend restart) are transient, and a transport error is by nature retryable, so all three are safe to redeliver.
The Router's terminal and retryable paths follow from that classification:
- an unparseable subject or an unparseable batch → terminal ack-and-drop (a wire-contract violation never parses on retry);
- an all-records-malformed Mimir transform (nil payload) → terminal ack-and-drop;
- ErrExportRejected → terminal ack-and-drop;
- ErrExportRetryable → NakWithDelay(backoff); the batch stays on the stream for a later delivery.
Observability and metric semantics
The application service emits six Prometheus collectors in the plexsphere namespace, subsystem observability_routing:
| Metric | Type | Labels | Buckets | Meaning |
|---|---|---|---|---|
| plexsphere_observability_routing_batches_total | CounterVec | sink, signal, outcome | — | Batches driven to a terminal disposition. outcome is the closed two-value set exported / dropped. |
| plexsphere_observability_routing_records_total | CounterVec | sink, signal | — | Records exported. |
| plexsphere_observability_routing_record_drops_total | CounterVec | sink, signal, reason | — | Records dropped during transformation. reason is the closed Mimir drop vocabulary (malformed_value / malformed_timestamp / undecodable). |
| plexsphere_observability_routing_timestamp_fallbacks_total | CounterVec | sink, signal | — | Records that fell back to the batch send time for their timestamp (the Loki path). |
| plexsphere_observability_routing_retries_total | CounterVec | sink, signal | — | Retryable export outcomes that scheduled a redelivery. |
| plexsphere_observability_routing_lag_seconds | HistogramVec | sink, signal | 0.25, 1, 5, 15, 60, 300, 900, 3600 | Age in seconds of telemetry at export time, clamped to ≥ 0. |
The lag metric. Lag is the gap between the consumer clock at export time and the batch's sentAt. It is clamped to ≥ 0 so a producer clock running ahead of the consumer yields a 0 sample rather than a nonsensical negative age.
A retryable outcome does not increment batches_total. A retryable export is not a terminal disposition: it bumps retries_total and schedules a redelivery, leaving batches_total untouched. The batch only reaches a terminal batches_total outcome (exported or dropped) on a later delivery — so batches_total counts each batch's final disposition exactly once while retries_total counts the redeliveries along the way.
The no-node_id-label cardinality rule. No collector carries a node_id label. A Node id is unbounded — every agent that ever reports mints a new series — so a node_id label would multiply each collector's cardinality by the fleet size and eventually overwhelm the metrics backend. The surviving labels are bounded: sink and signal are three-value closed sets, outcome is a two-value closed set, and reason is a closed drop-cause vocabulary. Per-Node attribution lives where high-cardinality attribution belongs — on the structured slog line and the audit trail — not on the metric labels. This mirrors the same rule the ingest bundle enforces.
Composition root
The production wiring is assembled in ../../../cmd/plexsphere/observability_routing_factory_prod.go. It is a pure backend consumer: it mounts no /v1 route and installs no NSK middleware, so its wiring carries only a reconcile probe (gating /readyz on durable-consumer provisioning) and a Consume-loop launcher run under the same goroutine supervisor as the other reconcile sweeps.
Env knobs. Each read is strings.TrimSpace-d; an empty sink URL disables that Sink.
| Env var | What it tunes | Default |
|---|---|---|
| PLEXSPHERE_OBS_MIMIR_URL | Grafana Mimir remote-write endpoint; empty disables the Mimir sink | (none) |
| PLEXSPHERE_OBS_LOKI_URL | Grafana Loki push endpoint; empty disables the Loki sink | (none) |
| PLEXSPHERE_OBS_SIEM_URL | audit SIEM ingest endpoint; empty disables the SIEM sink | (none) |
| PLEXSPHERE_OBS_SIEM_TOKEN | bearer the SIEM exporter attaches; only meaningful alongside the SIEM URL | (none) |
| PLEXSPHERE_OBS_NATS_URL | the shared JetStream buffer the routing consumers drain; threaded from the same source as the ingest front door by main.go (one source of truth for the buffer cluster); required when any sink URL is set | (none) |
The per-export HTTP round-trip timeout defaults to 10s — a code default applied via WithDefaults, not an env knob.
Boot-error behaviour. The opt-in gate is "at least one sink URL is set" — deliberately not the NATS URL. A deployment can run the ingest front door (which does gate on NATS) without yet wiring any downstream backend, and in that state the routing pipeline must stay fully inert rather than dial NATS to do nothing. The cases:
- no sink URL set → the factory returns a nil factory and the routing pipeline boots fully inert: no consumers provisioned, no consume loops spawned;
- a sink URL set but PLEXSPHERE_OBS_NATS_URL empty → boot error ErrObservabilityRoutingNATSRequired (the operator asked to route somewhere but gave the pipeline no buffer to drain);
- a malformed sink URL, or a SIEM token set without a SIEM URL → boot error wrapping ErrInvalidConfig, naming the offending knob.
Validation runs at build time, so a misconfigured operator sees the failure before /readyz lights green rather than as a runtime export failure on the hot path.
The reconcile probe and the consume launcher. The reconcile probe is registered under the name observability-routing; it re-ensures the enabled sinks' durable consumers once at boot and on every /readyz tick (boot-once-then-readyz), so a NATS outage that prevents the consumers from being reconciled flips readiness to 503 rather than failing silently. The ConsumeStart launcher runs one JetStream Consume loop per enabled sink+stream under the goroutine supervisor, dispatching each delivery to the Router, and returns ctx.Err() on cancellation after stopping every loop.
Cross-references
- ./ingest.md: the ingest front door whose per-Domain JetStream buffer this context drains; it owns the Batch model, the three-stream topology, the subject / header wire contract this pipeline reads back off the stream, and the 24h retention horizon.
- ../index.md: the bounded-contexts landing page.
- ../../../internal/observability/routing: the bounded-context root that pins the ubiquitous language.
- ../../../cmd/plexsphere/observability_routing_factory_prod.go: the composition root that validates the egress config, provisions the durable consumers, builds the exporters, and launches the consume loops.
- ../../reference/api/observability.md and ../../../api/openapi/plexsphere-v1.yaml: the upstream producer's HTTP surface, i.e. the ingest operations that feed the buffer this context drains. Routing itself exposes no /v1 HTTP API; it is a backend consumer with no inbound HTTP surface.