Observability Ingest — batched metrics / logs / audit front door for Nodes
This document is the authoritative bounded-context reference for the Observability Ingest context — the node-facing front door plexd pushes its batched telemetry through. plexd batches three independent classes of telemetry on the Node and pushes each to its own endpoint: metric samples, structured log lines, and normalised audit events. The platform admits, validates, and buffers every batch, fanning each accepted batch onto a per-signal JetStream stream from which the downstream routing consumers (Grafana Mimir for metrics, Grafana Loki for logs and audit) read it back out — that egress half lives in the Observability Routing context, which drains these buffer streams and exports each batch to its sink. The context owns the Batch model and its parser, the byte-weighted per-Node and per-Domain quota limiter, the three-stream buffer topology, and the lag / bytes / records / rejects telemetry the front door emits. The domain root that pins the ubiquitous language is ../../../internal/observability/ingest.
The Observability Ingest front door is a write-only, fire-and-buffer surface: it does not query, aggregate, or serve telemetry back. It takes a batch off the wire, proves the caller, weighs the bytes against the quota, decompresses and structurally validates the payload, and hands it to a durable buffer. The reading, aggregation, retention, and alerting all live downstream in Grafana Mimir and Grafana Loki; the platform's only job here is to be a cheap, safe, throttled admission gate that never loses an accepted batch and always throttles a noisy agent before it can hurt its tenant.
Ubiquitous language
The terms below travel verbatim across the domain root, the quota limiter, the batch parser, the buffer port, the application service, and the transport surface. Documentation, JSON fields, message headers, and metric labels adopt the exact spelling.
| Term | Definition | Code anchor |
|---|---|---|
| Signal | One of the three independent ingestion streams — metrics, logs, or audit. The Signal doubles as the JetStream subject suffix and the metric label that scopes the lag / bytes / records / rejects counters per stream. The closed three-value set is the spine of the whole context. | ../../../internal/observability/ingest/signal.go (Signal) |
| Batch | A parsed, structurally-validated set of records for one Signal, ready to be handed to the buffer. Its Records field holds the raw JSON bytes of each record — the array-element bytes for a metrics batch, the trimmed line bytes for an NDJSON logs / audit batch — so the buffer can forward each record without re-marshalling. | ../../../internal/observability/ingest/batch.go (Batch) |
| Record | One element of a Batch: a single metric sample, log line, or audit event. The parser validates each record's required fields and closed enums but does not fully parse every value (timestamps and metric values are presence-checked, not range-checked). | ../../../internal/observability/ingest/batch.go (ParseBatch) |
| MetricSample | A metrics-signal record: a group (closed enum), a name, a value, and a timestamp, with optional labels. The wire body is a JSON array of these. | ../../../internal/observability/ingest/batch.go (validateMetricSample) |
| LogLine | A logs-signal record: a severity (closed syslog-keyword enum), a message, and a timestamp, with optional unit and hostname. The wire body is NDJSON, one LogLine per row. | ../../../internal/observability/ingest/batch.go (validateLogLine) |
| AuditEvent | An audit-signal record: a source (closed enum), an action, an outcome, and a timestamp. The wire body is NDJSON, one AuditEvent per row. | ../../../internal/observability/ingest/batch.go (validateAuditEvent) |
| QuotaLimiter | The clock-injected, byte-weighted, per-Node + per-Domain token-bucket admit gate every batch runs through before decompression. It holds two bucket maps — one keyed by Node, one by Domain — and admits a batch only when both have enough tokens for its wire byte count. | ../../../internal/observability/ingest/quota.go (QuotaLimiter) |
| BufferPublisher / BufferMessage | The outbound port the application service publishes every accepted batch through, and the value it hands across. BufferMessage carries the Signal, the Domain / Project / Node ids, the decompressed body, the record count, and the send time. | ../../../internal/observability/ingest/buffer.go (BufferPublisher, BufferMessage) |
| Receipt / IngestReceipt | The value the service returns for an accepted batch — the instant it accepted the batch (AcceptedAt) and the record count the parser recovered. The transport renders it as the 202 acceptance body. | ../../../internal/observability/ingest/services/service.go (Receipt) |
| wire bytes vs decompressed bytes | The wire bytes are the raw transport body as it crossed the network (gzip-compressed when the caller set Content-Encoding: gzip); the decompressed bytes are the inflated body. The quota gate weighs wire bytes; the byte-count metric and the decompress-limit cap measure decompressed bytes. The distinction is load-bearing for the amplification defence below. | ../../../internal/observability/ingest/services/service.go (IngestCmd, decompress) |
| ingest front door | The node-facing transport surface and the application service taken together — the admission path a batch crosses from the wire to the buffer. | ../../../internal/transport/http/v1/observability |
| ingest buffer | The per-signal JetStream stream an accepted batch lands on. It is the durable hand-off between the front door and the downstream routing consumer; the front door's job ends when the buffer accepts the publish. | ../../../internal/observability/ingest/streams.go |
The three signals
The context exposes three independent endpoints, one per Signal. Each carries a different wire body format and a different record schema and routes to a different downstream sink. The Signal is not interchangeable: a metrics body is a JSON array, a logs or audit body is NDJSON, and the parser refuses a body that is not the shape its Signal requires.
| Signal | Wire body format | Record schema | Downstream sink |
|---|---|---|---|
| metrics | application/json, a JSON array of MetricSample | MetricSample | Grafana Mimir (Prometheus remote-write) |
| logs | application/x-ndjson, NDJSON with one LogLine per row | LogLine | Grafana Loki |
| audit | application/x-ndjson, NDJSON with one AuditEvent per row | AuditEvent | Grafana Loki (distinct retention class) |
The per-record contract each parser enforces — the required fields and the closed enums — is the validation surface a malformed batch fails against:
- MetricSample. Required: group, name, value, timestamp. Optional: labels. group is the closed enum node_resources|tunnel_health|peer_latency|agent_stats. value and timestamp are checked for presence and non-null only; name must be a non-empty string.
- LogLine. Required: severity, message, timestamp. Optional: unit, hostname. severity is the closed syslog-keyword enum emerg|alert|crit|err|warning|notice|info|debug. message must be a non-empty string; timestamp must be present and non-null.
- AuditEvent. Required: source, action, outcome, timestamp. source is the closed enum auditd|k8s. action and outcome must be non-empty strings; timestamp must be present and non-null.
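The LogLine contract above can be sketched as a small NDJSON validator. This is a minimal illustration, not the package's actual ParseBatch: the helper names (`parseLogs`, `present`, `nonEmptyString`), the `errBatchMalformed` sentinel, and the choice to skip blank lines are assumptions made for the example.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// errBatchMalformed is a hypothetical stand-in for the sentinel behind ingest_batch_malformed.
var errBatchMalformed = errors.New("ingest_batch_malformed")

// severities is the closed syslog-keyword enum from the LogLine contract.
var severities = map[string]bool{
	"emerg": true, "alert": true, "crit": true, "err": true,
	"warning": true, "notice": true, "info": true, "debug": true,
}

// present reports whether a field exists and is not JSON null.
func present(rec map[string]json.RawMessage, key string) bool {
	v, ok := rec[key]
	return ok && string(bytes.TrimSpace(v)) != "null"
}

// nonEmptyString decodes a field that must be a non-empty JSON string.
func nonEmptyString(rec map[string]json.RawMessage, key string) (string, bool) {
	var s string
	if !present(rec, key) || json.Unmarshal(rec[key], &s) != nil || s == "" {
		return "", false
	}
	return s, true
}

// validateLogLine checks the required fields and the closed severity enum.
// The timestamp is presence-checked only, mirroring the contract above.
func validateLogLine(line []byte) error {
	var rec map[string]json.RawMessage
	if err := json.Unmarshal(line, &rec); err != nil || rec == nil {
		return errBatchMalformed // not a JSON object
	}
	sev, ok := nonEmptyString(rec, "severity")
	if !ok || !severities[sev] {
		return errBatchMalformed
	}
	if _, ok := nonEmptyString(rec, "message"); !ok {
		return errBatchMalformed
	}
	if !present(rec, "timestamp") {
		return errBatchMalformed
	}
	return nil
}

// parseLogs splits an NDJSON body into raw record bytes, validating each line.
func parseLogs(body []byte) ([][]byte, error) {
	var records [][]byte
	sc := bufio.NewScanner(bytes.NewReader(body))
	for sc.Scan() {
		line := bytes.TrimSpace(sc.Bytes())
		if len(line) == 0 {
			continue // assumption: blank lines are skipped rather than rejected
		}
		if err := validateLogLine(line); err != nil {
			return nil, err
		}
		// Copy the bytes: the scanner reuses its buffer between lines.
		records = append(records, append([]byte(nil), line...))
	}
	if err := sc.Err(); err != nil || len(records) == 0 {
		return nil, errBatchMalformed // unreadable or empty batch
	}
	return records, nil
}

func main() {
	body := []byte(`{"severity":"info","message":"unit started","timestamp":"2024-01-01T00:00:00Z"}
{"severity":"err","message":"dial failed","timestamp":"2024-01-01T00:00:01Z"}
`)
	recs, err := parseLogs(body)
	fmt.Println(len(recs), err)
}
```

Keeping each record as raw bytes, rather than a decoded struct, matches the Batch description earlier: the buffer can forward records without re-marshalling them.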
The successful response is 202 with an IngestReceipt body — accepted_at (a date-time) and records (the integer count the parser recovered) — and a Cache-Control: no-store header so no intermediary or client cache retains the acceptance receipt.
The gate order
Every request traverses a fixed, ordered chain of gates: first the transport-side gates (provisioning, authentication, path-id check, content-encoding, the send-at header, and the wire-body cap), then the service-pipeline stages (quota, decompress, parse, publish, metrics). The order is the contract. Each gate is annotated with the HTTP status and the Problem.code it fails with.
- Provisioning. The application service must be wired. An unwired surface returns 501 observability_ingest_not_provisioned, the deferred-wiring posture when the opt-in env knob is unset.
- Authentication. The resolved NSK Node is read off the request context (the NSK middleware ran upstream and attached it). A missing Node is a defensive 401 unauthorized.
- Path-id double-check. The resolved NSK Node id must equal the URL {id}. A mismatch (a Node attempting to push on behalf of another Node) is 403 node_id_mismatch (a PermissionDenied body) and emits one audit row (see Audit contract).
- Content-Encoding. Only gzip or identity / empty is accepted; anything else is 415 ingest_encoding_unsupported.
- Send-At header. X-Plexsphere-Sent-At is required by the handler. The generated wrapper declares it optional, so a missing value reaches the handler as nil; a missing or unparseable value is 400 ingest_sent_at_invalid. This timestamp drives the lag metric.
- Wire-body cap. The body is read under http.MaxBytesReader with a defensive ceiling (default 4 MiB). A body over the cap is 413 ingest_body_too_large.
- Service pipeline (Service.Ingest, fixed order):
  - Quota. quota.Admit(node, domain, len(wireBytes)) weighs the wire (compressed) bytes BEFORE decompression. The per-Node bucket is checked first: a per-Node refusal is 429 per_node_rate_limited (Retry-After ~1s) and does not touch the Domain bucket. Only if the per-Node bucket admits is the per-Domain bucket consulted; a per-Domain refusal is 429 capacity_exceeded (Retry-After ~5s).
  - Decompress. A bomb-safe gunzip under a 32 MiB decompress limit. A gzip stream that fails to inflate (or an unsupported encoding that reached the service) is 400 ingest_encoding_invalid; an inflated body over the limit is 413 ingest_body_too_large.
  - Parse. ParseBatch validates the body's shape against its Signal under a per-batch record cap of 10000. A shape violation, a missing required field, a closed-enum violation, or an empty batch is 400 ingest_batch_malformed; a batch over the cap is 413 ingest_batch_too_many_records.
  - Publish. buffer.Publish fans the batch onto its per-Domain JetStream subject. A publish failure (backend unreachable or saturated) is 503 ingest_buffer_unavailable (Retry-After ~5s).
  - Metrics. On success the service records the decompressed bytes_total, the records_total, and the lag observation (clamped to ≥ 0).
- Accept. 202 IngestReceipt with Cache-Control: no-store.
A sentinel the transport does not recognise falls through to 500internal with a generic body — the underlying error is never interpolated onto the wire.
Why the quota gate weighs wire bytes before decompression. Admitting on the decompressed size would force the server to inflate a body the limiter is about to refuse, handing a noisy agent a cheap amplification lever: a tiny gzip wire inflating to a huge body the server must fully decompress before refusing it. Weighing the wire bytes keeps the refusal cheap and bounds the work an over-quota agent can force. And checking the per-Node bucket before the per-Domain bucket means one noisy Node is throttled before it can consume its Domain's shared aggregate budget.
The quota model
The front door is guarded by two hand-rolled, clock-injected, byte-weighted token-bucket limiters: one keyed per Node, one keyed per Domain. The clock is injected — never the wall clock read directly — so the rate-limit branch is testable without sleeping. The design has four load-bearing properties:
- Per-Node throttles a single noisy agent. A compromised or runaway plexd agent exhausts its own Node bucket and is refused before it can drive its Domain's shared budget down.
- Per-Domain caps a tenant's aggregate rate. The much larger Domain bucket bounds the total ingest rate one Domain can drive so it cannot exhaust the control plane on behalf of the others.
- Buckets start full. Each bucket is minted full (tokens == burst) so the configured burst of back-to-back bytes is admitted before the sustained rate applies, then refills by elapsed × per-second up to the burst ceiling on each take.
- A request larger than burst is always refused. Because the check is tokens >= n, a batch whose wire size exceeds the burst ceiling can never be admitted (a full bucket still holds only burst tokens), and the refusal never drives the token count negative.
Seven PLEXSPHERE_OBS_* environment knobs tune the surface. They are read once at the composition root; an unset knob falls back to the package default, and a present-but-non-positive or unparseable value is a boot error (the surface refuses to start rather than admitting a misconfigured budget):
| Env var | What it tunes | Default |
|---|---|---|
| PLEXSPHERE_OBS_NATS_URL | The opt-in gate / the NATS URL the buffer publishes through. Unset disables the whole surface (every request returns the 501 stub). | (no default; empty disables) |
| PLEXSPHERE_OBS_INGEST_NODE_BYTES_PER_SEC | Per-Node sustained ingest rate (bytes/sec) | 524288 (512 KiB/s) |
| PLEXSPHERE_OBS_INGEST_NODE_BURST_BYTES | Per-Node burst ceiling (bytes) | 2097152 (2 MiB) |
| PLEXSPHERE_OBS_INGEST_DOMAIN_BYTES_PER_SEC | Per-Domain aggregate sustained rate (bytes/sec) | 5242880 (5 MiB/s) |
| PLEXSPHERE_OBS_INGEST_DOMAIN_BURST_BYTES | Per-Domain burst ceiling (bytes) | 10485760 (10 MiB) |
| PLEXSPHERE_OBS_STREAM_REPLICAS | JetStream RAFT replica count per stream | 1 |
| PLEXSPHERE_OBS_STREAM_MAX_BYTES | Per-stream on-disk size cap (bytes) | 1073741824 (1 GiB) |
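The numeric knobs in the table follow one rule: unset means default, present-but-invalid means refuse to boot. A minimal sketch of that rule, assuming a hypothetical `positiveInt64` helper; how the real composition root treats a variable that is set to the empty string is not stated in the source, and this sketch treats it as invalid.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// positiveInt64 resolves one knob. An unset knob falls back to def; a value
// that is present but unparseable or non-positive is returned as an error.
func positiveInt64(lookup func(string) (string, bool), name string, def int64) (int64, error) {
	raw, ok := lookup(name)
	if !ok {
		return def, nil
	}
	v, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("%s: unparseable value %q: %w", name, raw, err)
	}
	if v <= 0 {
		return 0, fmt.Errorf("%s: must be positive, got %d", name, v)
	}
	return v, nil
}

func main() {
	v, err := positiveInt64(os.LookupEnv, "PLEXSPHERE_OBS_INGEST_NODE_BYTES_PER_SEC", 524288)
	if err != nil {
		fmt.Fprintln(os.Stderr, "boot error:", err)
		os.Exit(1)
	}
	fmt.Println("per-Node bytes/sec:", v)
}
```

Passing the lookup function in, rather than calling os.LookupEnv inside the helper, keeps the parsing rule testable without mutating the process environment.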
The opt-in gate. PLEXSPHERE_OBS_NATS_URL is the master switch: while it is unset the whole surface stays on its 501 stub. Setting it also requires PLEXSPHERE_DSN (the NSK resolver opens a Postgres pool) and a correctly-sized shared NSK wrap key; if either is missing the boot fails fast rather than serving a half-wired surface.
Caps that are not env knobs. Four more ceilings shape the pipeline but are not operator-tunable through the environment, and a reader should not mistake them for knobs:
- The decompress limit (32 MiB) and the per-batch record cap (10000) are service options (WithDecompressLimit / WithMaxRecords) set in code, not env vars.
- The transport wire-body cap (4 MiB) is the defensive MaxBytesReader ceiling on the raw request body.
- The stream retention (MaxAge) is pinned at 24h: it matches the replay ceiling downstream consumers rely on and is deliberately fixed, not exposed.
Error-code table
Every failure surface carries a stable Problem.code. The throttle / unavailable codes also advertise a Retry-After header in whole seconds; every other code does not.
| HTTP | Problem.code | Meaning | Retry-After |
|---|---|---|---|
| 400 | ingest_sent_at_invalid | Missing or unparseable X-Plexsphere-Sent-At header | no |
| 400 | ingest_encoding_invalid | gzip stream failed to inflate | no |
| 400 | ingest_batch_malformed | Body is not the shape its Signal requires (non-array metrics body, non-object NDJSON line, empty batch, missing required field, or closed-enum violation) | no |
| 401 | unauthorized | NSK missing or malformed | no |
| 401 | nsk_revoked | NSK revoked (emitted by the NSK middleware upstream) | no |
| 403 | node_id_mismatch | Resolved NSK Node ≠ path id (PermissionDenied body; audited) | no |
| 413 | ingest_body_too_large | Compressed wire body, or inflated batch, over its byte cap | no |
| 413 | ingest_batch_too_many_records | Batch carries more than the per-batch record cap (10000) | no |
| 415 | ingest_encoding_unsupported | Content-Encoding is not gzip or identity | no |
| 429 | per_node_rate_limited | Per-Node byte budget exhausted | yes (~1s) |
| 429 | capacity_exceeded | Per-Domain aggregate byte budget exhausted | yes (~5s) |
| 501 | observability_ingest_not_provisioned | Surface not wired (PLEXSPHERE_OBS_NATS_URL unset) | no |
| 503 | ingest_buffer_unavailable | JetStream buffer unreachable or saturated | yes (~5s) |
| 500 | internal | Unexpected server-side fallback (generic body) | no |
Stream topology
The buffer is three independent JetStream streams, one per Signal, not one shared stream:
| Stream name | Subject filter | Downstream |
|---|---|---|
| PLEXSPHERE_OBS_METRICS | obs.metrics.> | Grafana Mimir |
| PLEXSPHERE_OBS_LOGS | obs.logs.> | Grafana Loki |
| PLEXSPHERE_OBS_AUDIT | obs.audit.> | Grafana Loki (distinct retention) |
Every accepted batch publishes onto the per-Domain subject obs.<signal>.<domainID>. The wildcard > is NATS' multi-token wildcard; combined with the fixed obs.<signal>. prefix each stream's filter captures every well-formed per-Domain publish for its Signal without admitting unrelated traffic.
Why three streams, not one obs.>. The three signals carry different retention and replication classes downstream — metrics flow to Mimir, logs and audit flow to Loki, and audit carries a distinct retention. An operator must be able to size, replicate, and purge each signal class independently; a single shared stream would couple those operations. The per-Domain split, by contrast, is a subject suffix and not a stream boundary: JetStream filters per subject within a stream, so one stream per Signal already isolates Domains for consumption without multiplying stream objects per tenant.
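The subject layout and the stream filters can be illustrated with two small helpers. Both are hypothetical, and `matches` implements only the trailing `>` case of NATS subject matching (one or more remaining tokens), which is all the three filters above need.

```go
package main

import (
	"fmt"
	"strings"
)

// subject builds the per-Domain publish subject obs.<signal>.<domainID>.
func subject(signal, domainID string) string {
	return "obs." + signal + "." + domainID
}

// matches reports whether subj falls under filter. Only a trailing ">" is
// understood; any other filter is compared literally.
func matches(filter, subj string) bool {
	if !strings.HasSuffix(filter, ".>") {
		return filter == subj
	}
	prefix := strings.TrimSuffix(filter, ">")
	// ">" needs at least one token after the prefix.
	return strings.HasPrefix(subj, prefix) && len(subj) > len(prefix)
}

func main() {
	s := subject("metrics", "dom-1")
	fmt.Println(s)
	fmt.Println(matches("obs.metrics.>", s)) // the metrics stream captures it
	fmt.Println(matches("obs.logs.>", s))    // the logs stream does not
}
```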
The subject-versus-header tagging split
The originating context is split across the subject and the message headers, and the split is deliberate:
- The Domain is encoded in the subject (obs.<signal>.<domainID>), so a downstream consumer can subscribe to exactly one Domain's traffic through a subject filter.
- The Signal, Project, Node, record count, and send time ride as message headers:
| Header | Carries |
|---|---|
| X-Plexsphere-Signal | The Signal name (metrics / logs / audit) |
| X-Plexsphere-Project-Id | The originating Node's Project id |
| X-Plexsphere-Node-Id | The originating Node's id |
| X-Plexsphere-Records | The batch's record count (decimal string) |
| X-Plexsphere-Sent-At | The batch send time (RFC3339Nano UTC) |
These buffer headers are the contract the downstream replay / consume path — the Observability Routing context — reads back off the stream to recover the originating Signal, Project, and Node and the batch's record count and send time. They are kept exported and greppable in the buffer adapter precisely so the producer here and that downstream consumer stay in lockstep — any drift in the header set or the subject layout is a coordinated, single-file wire-format change.
Observability and lag metric semantics
The application service emits four Prometheus collectors in the plexsphere namespace, subsystem observability_ingest:
| Metric | Type | Labels | Buckets | Meaning |
|---|---|---|---|---|
| plexsphere_observability_ingest_lag_seconds | HistogramVec | signal, domain_id | 0.25, 1, 5, 15, 60, 300, 900, 3600 | Age in seconds of accepted telemetry at ingest time |
| plexsphere_observability_ingest_bytes_total | CounterVec | signal, domain_id | n/a | Decompressed bytes accepted |
| plexsphere_observability_ingest_records_total | CounterVec | signal, domain_id | n/a | Records accepted |
| plexsphere_observability_ingest_rejects_total | CounterVec | signal, reason | n/a | Batches rejected, per reason code |
The lag metric. Lag is now - SentAt: the gap between the service clock at acceptance time and the client-claimed X-Plexsphere-Sent-At. It is clamped to ≥ 0: a client whose clock runs ahead of the server would otherwise produce a nonsensical negative age, so a clock-skewed client yields a 0 sample rather than a negative one. The lag histogram is the operator's signal for a Node whose batches are falling behind.
The rejects counter is transport-driven. rejects_total is bumped entirely by the transport (via the service's RecordReject seam), covering both the pre-service transport gates (a missing send-at header, an unsupported content-encoding) and the mapped service sentinels. The Ingest pipeline itself never increments it — concentrating the bump in one layer keeps the reason vocabulary in one place and counts each rejected batch exactly once.
The no-node_id-label cardinality rule. No collector carries a node_id label. A Node id is unbounded — every agent that ever reports mints a new series — so a node_id label would multiply each collector's cardinality by the fleet size and eventually overwhelm the metrics backend. The labels that survive are bounded: signal is a three-value closed set, domain_id is bounded by tenant count, and reason is a closed reason vocabulary. Per-Node attribution lives where it belongs — on the structured slog line and, for the one audited refusal, on the audit trail — not on the metric labels.
Audit contract
The transport emits exactly one kind of audit row: the path-id mismatch. When a Node's resolved NSK identity does not equal the URL {id} (a Node attempting to push telemetry on behalf of another Node), the handler records one entry (relation observability.ingest, outcome node_id_mismatch) and refuses the request with 403.
Everything else is deliberately kept off the audit trail:
- Quota refusals are NOT audited. A throttled agent can drive a high volume of 429 refusals; auditing each one would flood the append-only log. Refusals surface through the rejects_total metric instead, where the volume is a counter delta rather than an audit row.
- The success / accept trail belongs to the application service, not the transport: an accepted batch is recorded through the publish onto the buffer and the bytes / records / lag metrics, not as an audit entry.
De-personalisation and PII
Observability ingest is de-personalised by design. Ingest labels carry the Domain, the Project, the Node, and (where relevant) the workload identifier — never an Identity subject or email. The control plane attributes telemetry to where it came from (which tenant, which Node, which workload), not to who a person is, so a dashboard or alert scopes naturally by Domain / Project / Node without ever needing a human identity.
Log lines that inadvertently contain PII are the customer's ingest-side responsibility — plexsphere does not parse arbitrary log bodies for personal data on the customer's behalf. As a mitigation the platform offers a configurable per-Domain regex redaction filter applied at ingest, disabled by default: an operator who wants the front door to strip a known PII pattern out of inbound log lines opts their Domain in.
Composition root
The production wiring is assembled in ../../../cmd/plexsphere/observability_ingest_factory_prod.go. It is opt-in on PLEXSPHERE_OBS_NATS_URL (not PLEXSPHERE_DSN): with the NATS URL unset the surface stays on the 501 observability_ingest_not_provisioned path, so a dev or CI cluster that sets PLEXSPHERE_DSN but has provisioned no observability buffer still boots with the surface inert. When the URL is set, PLEXSPHERE_DSN and a correctly-sized shared NSK wrap key become required, and the six remaining quota / stream knobs are read once (a present-but-invalid value is a hard boot error).
The transport package ../../../internal/transport/http/v1/observability is an anti-corruption seam: it re-declares the domain sentinels locally and never imports the domain module, so the wire-facing status-and-Problem.code mapping is owned at the boundary and the domain core stays free of transport concerns. The production adapter the factory builds translates the domain sentinels onto the local transport sentinels the handler maps to HTTP.
Cross-references
- ../../reference/api/observability.md: the HTTP API reference for the three ingest operations, with the per-operation request / response shapes and the closed Problem.code taxonomy.
- ./routing.md: the Observability Routing context, the egress half that drains the three JetStream buffer streams this front door fills and exports each batch to Grafana Mimir, Grafana Loki, and the audit SIEM.
- ../index.md: the bounded-contexts landing page.
- ../../../internal/observability: the bounded context root.
- ../../../api/openapi/plexsphere-v1.yaml: the OpenAPI spec the three ingest operations originate from.
- ../secrets.md: the Secret Store, the structural cousin whose NSK-authenticated, per-Node / per-Domain rate-limited node-facing posture this context mirrors.