internal/platform/messaging

internal/platform/messaging is the sole sanctioned wrapper around NATS JetStream for plexsphere. Direct imports of github.com/nats-io/** from any bounded context are rejected by the no-direct-persistence-from-contexts depguard rule in .golangci.yml.

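This document is the authoritative reference for the package's client configuration, stream defaults, EnsureStream, Publish, Replay, the Last-Event-ID mapping, and the readiness probe.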

Client Config

The exported messaging.Config struct captures the inputs to NewClient. The zero value is not usable; callers must set URL at minimum. Every other field has a safe default applied by Config.WithDefaults.

| Field | Type | Default | Purpose |
| --- | --- | --- | --- |
| URL | string | none (REQUIRED) | NATS server URL (e.g. nats://host:4222). Missing URL → error tagged (REQ-003, PX-0003). |
| Name | string | "plexsphere" (DefaultName) | Connection name surfaced to the NATS server for observability. |
| Creds | string | "" | Optional path to a .creds file for JWT authentication. |
| MaxReconnects | int | -1 (DefaultMaxReconnects) | -1 = reconnect indefinitely, matching the SSE replay contract in README § Runtime Topology. |
| ReconnectWait | time.Duration | 2s (DefaultReconnectWait) | Back-off between reconnect attempts. Short by design so transient failures do not trip /readyz for long. |
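As an illustration of the defaults in the table, the following is a minimal sketch and not the package source. The Config stand-in mirrors the documented fields, but the method bodies are assumptions: treating a zero-valued field as "unset" and the Validate helper are both hypothetical.

```go
package main

import (
	"errors"
	"time"
)

// Documented defaults, redeclared here so the sketch is self-contained.
const (
	DefaultName          = "plexsphere"
	DefaultMaxReconnects = -1
	DefaultReconnectWait = 2 * time.Second
)

// Config is a stand-in mirroring the documented messaging.Config fields.
type Config struct {
	URL           string
	Name          string
	Creds         string
	MaxReconnects int
	ReconnectWait time.Duration
}

// WithDefaults returns a copy with every unset optional field filled in.
// Assumption: the zero value of a field means "unset".
func (c Config) WithDefaults() Config {
	if c.Name == "" {
		c.Name = DefaultName
	}
	if c.MaxReconnects == 0 {
		c.MaxReconnects = DefaultMaxReconnects
	}
	if c.ReconnectWait == 0 {
		c.ReconnectWait = DefaultReconnectWait
	}
	return c
}

// Validate is a hypothetical helper: URL is the only required field.
func (c Config) Validate() error {
	if c.URL == "" {
		return errors.New("messaging: URL is required (REQ-003, PX-0003)")
	}
	return nil
}
```

Because WithDefaults works on a copy, the caller's original Config is never mutated, which keeps a shared configuration value safe to reuse.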

Constructing a client

NewClient(ctx, cfg, logger) (*Client, error) validates, applies defaults, opens a tuned NATS connection, and wires a JetStream context. Any failure path closes the underlying connection so no goroutines leak. Close is idempotent and safe on a nil receiver.

After Close every subsequent Publish and Replay call on the same *Client returns the messaging.ErrClosed sentinel — detectable via errors.Is(err, messaging.ErrClosed) — so callers can distinguish a torn-down client from a transient NATS error without separate nil checks. The URL embedded in any connect error is scrubbed via the package-local redactNATSURL helper so passwords inside a nats://user:pw@host URL never survive into log lines.
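The redaction step can be sketched with the standard library alone. The function below is a hypothetical stand-in for redactNATSURL, whose real body is not shown in this document; it assumes that dropping the whole userinfo component is acceptable, which also covers token-only URLs.

```go
package main

import "net/url"

// redactURL is a hypothetical stand-in for the package-local
// redactNATSURL helper. It replaces the userinfo part of a URL so that
// neither a password nor a bare token can reach a log line.
func redactURL(raw string) string {
	u, err := url.Parse(raw)
	if err != nil {
		// Never echo an unparseable URL: it may still contain a secret.
		return "<invalid NATS URL>"
	}
	if u.User != nil {
		u.User = url.User("REDACTED")
	}
	return u.String()
}
```

For example, redactURL("nats://user:pw@host:4222") yields "nats://REDACTED@host:4222", while a URL without credentials passes through unchanged.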

go
client, err := messaging.NewClient(ctx, messaging.Config{
    URL: "nats://nats:4222",
}, logger)
if err != nil { /* ... */ }
defer client.Close()
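The ErrClosed contract described earlier lends itself to a simple retry decision. The sketch below uses an in-memory fakeClient rather than the real *Client, so every type and message here is an assumption; only the errors.Is pattern is taken from the text.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// ErrClosed stands in for the messaging.ErrClosed sentinel.
var ErrClosed = errors.New("messaging: client closed")

// errTransient stands in for any transient NATS failure.
var errTransient = errors.New("nats: timeout")

// fakeClient is a hypothetical in-memory client, not the real *Client.
type fakeClient struct {
	closed atomic.Bool
}

// Close is idempotent and safe on a nil receiver, as documented.
func (c *fakeClient) Close() {
	if c == nil {
		return
	}
	c.closed.Store(true)
}

// Publish wraps the sentinel with %w so errors.Is can still match it.
func (c *fakeClient) Publish(subject string, body []byte) error {
	if c.closed.Load() {
		return fmt.Errorf("publish to %q: %w", subject, ErrClosed)
	}
	return nil // a real client would hand body to JetStream here
}

// shouldRetry reports whether a publish error is worth retrying:
// a torn-down client never recovers, a transient error might.
func shouldRetry(err error) bool {
	return err != nil && !errors.Is(err, ErrClosed)
}
```

Wrapping the sentinel instead of returning it bare lets the error carry context (here, the subject) without breaking detection.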

StreamConfig and defaults

messaging.StreamConfig describes a durable JetStream stream. The zero value is not usable; callers must set Name and Subjects. Defaults match the retention and sizing promised by README § Storage Topology.

| Field | Type | Default | Purpose |
| --- | --- | --- | --- |
| Name | string | none (REQUIRED) | Unique stream identifier. |
| Subjects | []string | none (REQUIRED, non-empty) | Wildcarded subjects the stream binds to. |
| MaxAge | time.Duration | 24h (DefaultStreamMaxAge) | Retention window matching the 24h replay ceiling consumers rely on. |
| MaxBytes | int64 | 1 GiB (DefaultStreamMaxBytes, 1 << 30) | Per-stream on-disk cap. |
| Replicas | int | 3 (DefaultStreamReplicas) | RAFT replica count; matches production JetStream cluster. |
| Storage | jetstream.StorageType | jetstream.FileStorage (iota=0) | File- vs. memory-backed storage. |
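The table can be read as a validate-then-default step. The sketch below redeclares the documented constants and uses a local StorageType so it compiles without the jetstream package; the withDefaults helper and its error messages are hypothetical, and a zero field is assumed to mean "unset".

```go
package main

import (
	"errors"
	"time"
)

// StorageType is a local stand-in for jetstream.StorageType.
type StorageType int

const (
	FileStorage   StorageType = iota // zero value, so file storage is the default
	MemoryStorage
)

// Documented defaults, redeclared for a self-contained sketch.
const (
	DefaultStreamMaxAge   = 24 * time.Hour
	DefaultStreamMaxBytes = int64(1 << 30) // 1 GiB
	DefaultStreamReplicas = 3
)

// StreamConfig mirrors the documented messaging.StreamConfig fields.
type StreamConfig struct {
	Name     string
	Subjects []string
	MaxAge   time.Duration
	MaxBytes int64
	Replicas int
	Storage  StorageType
}

// withDefaults is a hypothetical helper: it rejects a config missing a
// required field, then fills every zero-valued optional field.
func (c StreamConfig) withDefaults() (StreamConfig, error) {
	if c.Name == "" {
		return c, errors.New("messaging: stream Name is required")
	}
	if len(c.Subjects) == 0 {
		return c, errors.New("messaging: at least one subject is required")
	}
	if c.MaxAge == 0 {
		c.MaxAge = DefaultStreamMaxAge
	}
	if c.MaxBytes == 0 {
		c.MaxBytes = DefaultStreamMaxBytes
	}
	if c.Replicas == 0 {
		c.Replicas = DefaultStreamReplicas
	}
	return c, nil
}
```

Storage needs no explicit branch: because FileStorage is the zero value, an unset field already selects file-backed storage.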

EnsureStream

EnsureStream(ctx, client, cfg) (*jetstream.StreamInfo, error) reconciles the desired StreamConfig with the server:

  • If the stream does not exist, it is created.
  • If it exists, the existing StreamInfo is returned without an UpdateStream call — retention and sizing changes are a deliberate out-of-band operation per the messaging plan.

EnsureStream is the expected entry point; direct JetStream API usage inside bounded contexts is forbidden by depguard.
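The create-if-missing, never-update behaviour can be sketched against an in-memory stand-in. The fakeServer type, its methods, and the streamInfo struct are all hypothetical; the real function talks to JetStream and returns a *jetstream.StreamInfo.

```go
package main

import "errors"

// errStreamNotFound stands in for the server's "stream not found" reply.
var errStreamNotFound = errors.New("stream not found")

// streamInfo is a trimmed, hypothetical stand-in for jetstream.StreamInfo.
type streamInfo struct {
	Name     string
	MaxBytes int64
}

// fakeServer is an in-memory stand-in for the JetStream stream API.
type fakeServer struct {
	streams map[string]streamInfo
	creates int // counts create calls so the behaviour is observable
}

func (s *fakeServer) lookup(name string) (streamInfo, error) {
	info, ok := s.streams[name]
	if !ok {
		return streamInfo{}, errStreamNotFound
	}
	return info, nil
}

func (s *fakeServer) create(want streamInfo) streamInfo {
	if s.streams == nil {
		s.streams = map[string]streamInfo{}
	}
	s.streams[want.Name] = want
	s.creates++
	return want
}

// ensureStream creates the stream only when it is missing. An existing
// stream is returned as-is, even if want differs from what is stored.
func ensureStream(s *fakeServer, want streamInfo) (streamInfo, error) {
	got, err := s.lookup(want.Name)
	switch {
	case err == nil:
		return got, nil
	case errors.Is(err, errStreamNotFound):
		return s.create(want), nil
	default:
		return streamInfo{}, err
	}
}
```

A second call with a different MaxBytes returns the originally stored value, which is the point: callers cannot silently resize a production stream by redeploying with new settings.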

Publish

(*Client).Publish(ctx, subject, body, opts...) returns a messaging.PublishAck with the assigned stream sequence and a Duplicate flag set when the server matched the Nats-Msg-Id header against an earlier message inside the stream's dedup window.

go
ack, err := client.Publish(ctx, "sse.v1.events", payload,
    messaging.WithMsgID(lastEventID))
if err != nil { /* ... */ }
if ack.Duplicate {
    // server already saw this Msg-Id — do NOT treat as fresh delivery
}

WithMsgID and dedup window

The WithMsgID(id) option sets the Nats-Msg-Id header. An empty id is treated as a no-op. Within the stream's configured dedup window, publishing the same id again yields PublishAck{Duplicate: true} and does not append a new message.
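To make the dedup semantics concrete, here is a small in-memory simulation. It is not how the server is implemented: fakeStream, its fields, and the use of an elapsed-time offset instead of a wall clock are assumptions chosen to keep the sketch deterministic.

```go
package main

import "time"

// exampleWindow is an illustrative dedup window for the sketch.
const exampleWindow = 24 * time.Hour

// publishAck mirrors the two documented PublishAck fields.
type publishAck struct {
	Sequence  uint64
	Duplicate bool
}

type seenID struct {
	seq uint64
	at  time.Duration // offset from an arbitrary start, not wall time
}

// fakeStream is a hypothetical in-memory model of a JetStream stream.
type fakeStream struct {
	window  time.Duration
	lastSeq uint64
	ids     map[string]seenID
}

// publish appends a message unless msgID was already seen inside the
// dedup window, in which case it reports the original sequence.
func (s *fakeStream) publish(msgID string, now time.Duration) publishAck {
	if msgID != "" {
		if prev, ok := s.ids[msgID]; ok && now-prev.at < s.window {
			return publishAck{Sequence: prev.seq, Duplicate: true}
		}
	}
	s.lastSeq++
	if msgID != "" { // an empty id is a no-op and is never tracked
		if s.ids == nil {
			s.ids = map[string]seenID{}
		}
		s.ids[msgID] = seenID{seq: s.lastSeq, at: now}
	}
	return publishAck{Sequence: s.lastSeq}
}
```

Publishing "evt-1" twice inside the window yields one stored message and a second ack with Duplicate set; publishing it again after the window has elapsed appends a new message.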

Replay

(*Client).Replay(ctx, streamName, startSeq) (ReplayIterator, error) creates an ordered JetStream consumer bound to streamName and returns an iterator delivering messages from startSeq onward.

| Input / output | Contract |
| --- | --- |
| startSeq | 1-based per JetStream semantics. startSeq == 0 → error tagged (REQ-003, PX-0003). |
| ErrStreamNotFound | Exported sentinel returned when streamName does not exist on the server; suffix (REQ-009, PX-0003). |
| ReplayIterator.Next(ctx) | Returns a *ReplayMessage wrapping the delivered message and the server-assigned stream sequence. Per-call ctx cancellation returns ctx.Err() WITHOUT tearing down the iterator; a subsequent Next call resumes delivery. |
| ReplayIterator.Close | MUST be called to release the underlying ordered consumer. Idempotent. |
| ReplayMessage.StreamSequence | Authoritative JetStream stream sequence (msg.Metadata().Sequence.Stream). SSE Last-Event-ID bookmarks MUST persist this value rather than inferring via seq++, because stream sequences have gaps after delete / MaxAge expiry / purge. |
| Ack behaviour | Ordered consumers auto-ack on delivery; callers MUST NOT call Ack on returned messages. |

go
it, err := client.Replay(ctx, "sse_v1", 42)
if errors.Is(err, messaging.ErrStreamNotFound) {
    // translate to HTTP 404 in SSE resume flows
    return err
}
if err != nil {
    return err // any other failure: it is not usable, so do not defer Close
}
defer it.Close()
for {
    rm, err := it.Next(ctx)
    if err != nil {
        return err // includes ctx.Err() when ctx is cancelled
    }
    // rm.StreamSequence is the server-assigned sequence; use it as
    // the SSE id: field, not a client-side counter.
    fmt.Printf("seq=%d body=%s\n", rm.StreamSequence, rm.Msg.Data)
}
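The warning about sequence gaps is easier to see with a toy stream. The sketch below models a stream as a slice whose sequences skip values, as they would after deletes or expiry; sliceIterator and the helper names are hypothetical, and unlike a real ordered consumer this iterator ends when drained instead of waiting for new messages.

```go
package main

import "errors"

// errDrained marks the end of the toy stream.
var errDrained = errors.New("no more messages")

// replayMessage carries only the fields this sketch needs.
type replayMessage struct {
	StreamSequence uint64
	Data           []byte
}

// sliceIterator is a hypothetical in-memory replay iterator.
type sliceIterator struct {
	msgs []replayMessage
	pos  int
}

// replay positions the iterator at the first message whose sequence is
// >= startSeq. The stream slice must be sorted by sequence.
func replay(stream []replayMessage, startSeq uint64) (*sliceIterator, error) {
	if startSeq == 0 {
		return nil, errors.New("startSeq is 1-based (REQ-003, PX-0003)")
	}
	i := 0
	for i < len(stream) && stream[i].StreamSequence < startSeq {
		i++
	}
	return &sliceIterator{msgs: stream, pos: i}, nil
}

// Next returns the next stored message or errDrained.
func (it *sliceIterator) Next() (replayMessage, error) {
	if it.pos >= len(it.msgs) {
		return replayMessage{}, errDrained
	}
	m := it.msgs[it.pos]
	it.pos++
	return m, nil
}

// drainBookmark consumes the iterator and returns the bookmark to
// persist: always the last delivered StreamSequence, never a counter.
func drainBookmark(it *sliceIterator, bookmark uint64) uint64 {
	for {
		m, err := it.Next()
		if err != nil {
			return bookmark
		}
		bookmark = m.StreamSequence
	}
}
```

With stored sequences 1, 2, 5, 9 and startSeq 3, the first delivered message has sequence 5. A client-side counter starting at 3 would label it incorrectly and corrupt the next resume.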

Last-Event-ID ↔ JetStream Sequence

The SSE Last-Event-ID header and the JetStream stream sequence are two representations of the same event ordinal:

  • When the core binary publishes a signed SSE envelope, it calls Publish with WithMsgID(lastEventID). The returned PublishAck.Sequence is the JetStream sequence assigned by the server; it becomes the id: field the client reads as Last-Event-ID on reconnect.
  • When the client reconnects carrying Last-Event-ID: N, the core binary starts a Replay(ctx, stream, N+1) to resume from the next unread envelope. N == 0 means "start from sequence 1", so callers that translate a missing header to zero must bump by one before calling Replay.
  • The dedup window inherits the StreamConfig.MaxAge default of 24h. A Last-Event-ID older than the retention window cannot be replayed; callers fall back to the reconcile-pull path per README § Failure Modes.
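The header-to-sequence translation above can be sketched as a small pure function. The function name is hypothetical, and treating a non-numeric header as an error (rather than falling back to sequence 1) is an assumption about the desired policy.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// startSeqFromLastEventID maps an SSE Last-Event-ID header value to the
// startSeq argument for Replay. A missing header or N == 0 both mean
// "start from sequence 1"; otherwise replay resumes at N+1.
func startSeqFromLastEventID(header string) (uint64, error) {
	header = strings.TrimSpace(header)
	if header == "" {
		return 1, nil
	}
	n, err := strconv.ParseUint(header, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid Last-Event-ID %q: %w", header, err)
	}
	if n == math.MaxUint64 {
		// n+1 would wrap to 0, which Replay rejects.
		return 0, fmt.Errorf("no sequence follows Last-Event-ID %d", n)
	}
	return n + 1, nil
}
```

Keeping the N+1 bump in one place means Replay never sees a zero startSeq from a well-formed header, so the (REQ-003, PX-0003) error is reserved for genuine caller bugs.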

Readiness probe

messaging.ProbeFunc(client) returns a health.ProbeFunc. The probe is total — a nil client returns the canned errNotConnected so /readyz flips to 503 until the client is wired.

| Symbol | Value |
| --- | --- |
| messaging.ProbeName | "jetstream" |
| Error suffix | (REQ-006, PX-0003) |

go
registry.Register(messaging.ProbeName, messaging.ProbeFunc(client))

The probe body deliberately omits the server URL, the credentials path, and any other NATS-side detail because /readyz bodies are served to unauthenticated Kubernetes probes.
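A "total" probe in this sense never panics and never leaks detail. The sketch below assumes a probe signature of func(context.Context) error and a hypothetical conn interface with a Connected method; neither is confirmed by this document, and the error text is illustrative apart from the documented suffix.

```go
package main

import (
	"context"
	"errors"
)

// errNotConnected is a canned error with no URL or credential detail.
var errNotConnected = errors.New("jetstream: not connected (REQ-006, PX-0003)")

// probeFunc is an assumed shape for health.ProbeFunc.
type probeFunc func(ctx context.Context) error

// conn is a hypothetical view of the client used by the probe.
type conn interface {
	Connected() bool
}

// stubConn is a trivial conn for demonstration.
type stubConn bool

func (s stubConn) Connected() bool { return bool(s) }

// newProbe returns a probe that is defined for every input: a nil
// client and a disconnected client both yield the same canned error.
func newProbe(c conn) probeFunc {
	return func(_ context.Context) error {
		if c == nil || !c.Connected() {
			return errNotConnected
		}
		return nil
	}
}

// runProbe is a convenience wrapper for callers without a context.
func runProbe(c conn) error {
	return newProbe(c)(context.Background())
}
```

Returning one fixed error for both failure modes keeps the /readyz body uniform, so an unauthenticated caller learns only that the dependency is down.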

Cross-references