internal/platform/messaging
internal/platform/messaging is the sole sanctioned wrapper around NATS JetStream for plexsphere. Direct imports of github.com/nats-io/** from any bounded context are rejected by the no-direct-persistence-from-contexts depguard rule in .golangci.yml.
This document is the authoritative reference for:
- Client Config
- StreamConfig and defaults
- Publish contract and dedup
- Replay contract and cursor semantics
- Last-Event-ID ↔ JetStream Sequence mapping
- Readiness probe
Client Config
The exported messaging.Config struct captures the inputs to NewClient. The zero value is not usable; callers must set URL at minimum. Every other field has a safe default applied by Config.WithDefaults.
| Field | Type | Default | Purpose |
|---|---|---|---|
| `URL` | string | — (REQUIRED) | NATS server URL (e.g. nats://host:4222). Missing URL → error tagged (REQ-003, PX-0003). |
| `Name` | string | "plexsphere" (DefaultName) | Connection name surfaced to the NATS server for observability. |
| `Creds` | string | "" | Optional path to a .creds file for JWT authentication. |
| `MaxReconnects` | int | -1 (DefaultMaxReconnects) | -1 = reconnect indefinitely, matching the SSE replay contract in README § Runtime Topology. |
| `ReconnectWait` | time.Duration | 2s (DefaultReconnectWait) | Back-off between reconnect attempts. Short by design so transient failures do not trip /readyz for long. |
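
The defaulting rules in the table can be sketched as follows. This is a minimal illustration, not the package source: the `Config` type below is a local stand-in, `Validate` is a hypothetical helper, and treating a zero `MaxReconnects` as "unset" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config is a local stand-in for messaging.Config, declared here only so the
// sketch compiles on its own.
type Config struct {
	URL           string
	Name          string
	Creds         string
	MaxReconnects int
	ReconnectWait time.Duration
}

// Default values copied from the table above.
const (
	DefaultName          = "plexsphere"
	DefaultMaxReconnects = -1
	DefaultReconnectWait = 2 * time.Second
)

// WithDefaults returns a copy with unset fields filled in.
// Assumption: a zero MaxReconnects means "unset"; the real package may
// treat an explicit zero differently.
func (c Config) WithDefaults() Config {
	if c.Name == "" {
		c.Name = DefaultName
	}
	if c.MaxReconnects == 0 {
		c.MaxReconnects = DefaultMaxReconnects
	}
	if c.ReconnectWait == 0 {
		c.ReconnectWait = DefaultReconnectWait
	}
	return c
}

// Validate is a hypothetical helper that rejects a configuration without a URL.
func (c Config) Validate() error {
	if c.URL == "" {
		return errors.New("messaging: URL is required (REQ-003, PX-0003)")
	}
	return nil
}

func main() {
	cfg := Config{URL: "nats://nats:4222"}.WithDefaults()
	fmt.Println(cfg.Name, cfg.MaxReconnects, cfg.ReconnectWait)
	fmt.Println(Config{}.Validate())
}
```

Returning a copy from `WithDefaults` keeps the caller's value unchanged, which makes the defaulting step safe to apply more than once.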
Constructing a client
NewClient(ctx, cfg, logger) (*Client, error) validates, applies defaults, opens a tuned NATS connection, and wires a JetStream context. Any failure path closes the underlying connection so no goroutines leak. Close is idempotent and safe on a nil receiver.
After Close every subsequent Publish and Replay call on the same *Client returns the messaging.ErrClosed sentinel — detectable via errors.Is(err, messaging.ErrClosed) — so callers can distinguish a torn-down client from a transient NATS error without separate nil checks. The URL embedded in any connect error is scrubbed via the package-local redactNATSURL helper so passwords inside a nats://user:pw@host URL never survive into log lines.
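
As a rough illustration of the two behaviours described above, the sketch below redacts a password from a connect URL and detects a wrapped sentinel with `errors.Is`. The `redactURL` function is a hypothetical equivalent of `redactNATSURL` (whose implementation is not shown in this document), and `ErrClosed` here is a local stand-in for `messaging.ErrClosed`.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// ErrClosed is a local stand-in for messaging.ErrClosed.
var ErrClosed = errors.New("messaging: client closed")

// redactURL is a hypothetical equivalent of the package-local redactNATSURL
// helper; the real implementation may differ.
func redactURL(raw string) string {
	u, err := url.Parse(raw)
	if err != nil {
		// Never echo an unparsable URL: it may still contain a secret.
		return "<invalid URL>"
	}
	// Redacted replaces any password in the userinfo with "xxxxx".
	return u.Redacted()
}

func main() {
	fmt.Println(redactURL("nats://user:pw@host:4222"))

	// A wrapped sentinel stays detectable through errors.Is.
	err := fmt.Errorf("publish sse.v1.events: %w", ErrClosed)
	fmt.Println(errors.Is(err, ErrClosed))
}
```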
```go
client, err := messaging.NewClient(ctx, messaging.Config{
	URL: "nats://nats:4222",
}, logger)
if err != nil { /* ... */ }
defer client.Close()
```

StreamConfig and defaults
messaging.StreamConfig describes a durable JetStream stream. The zero value is not usable; callers must set Name and Subjects. Defaults match the retention and sizing promised by README § Storage Topology.
| Field | Type | Default | Purpose |
|---|---|---|---|
| `Name` | string | — (REQUIRED) | Unique stream identifier. |
| `Subjects` | []string | — (REQUIRED, non-empty) | Wildcarded subjects the stream binds to. |
| `MaxAge` | time.Duration | 24h (DefaultStreamMaxAge) | Retention window matching the 24h replay ceiling consumers rely on. |
| `MaxBytes` | int64 | 1 GiB (DefaultStreamMaxBytes, 1 << 30) | Per-stream on-disk cap. |
| `Replicas` | int | 3 (DefaultStreamReplicas) | RAFT replica count; matches production JetStream cluster. |
| `Storage` | jetstream.StorageType | jetstream.FileStorage (iota=0) | File- vs. memory-backed storage. |
EnsureStream
EnsureStream(ctx, client, cfg) (*jetstream.StreamInfo, error) reconciles the desired StreamConfig with the server:
- If the stream does not exist, it is created.
- If it exists, the existing `StreamInfo` is returned without an `UpdateStream` call; retention and sizing changes are a deliberate out-of-band operation per the messaging plan.
EnsureStream is the expected entry point; direct JetStream API usage inside bounded contexts is forbidden by depguard.
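
The create-or-return-existing behaviour can be modelled without a NATS server. In the sketch below, `fakeServer`, `ensureStream`, and `errStreamNotFound` are hypothetical names, and the trimmed `StreamConfig` is a local stand-in; only the reconcile logic follows the description above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// StreamConfig is a trimmed local stand-in for messaging.StreamConfig.
type StreamConfig struct {
	Name     string
	Subjects []string
	MaxAge   time.Duration
}

// DefaultStreamMaxAge mirrors the 24h default from the table above.
const DefaultStreamMaxAge = 24 * time.Hour

var errStreamNotFound = errors.New("stream not found")

// fakeServer is a hypothetical in-memory substitute for the JetStream server.
type fakeServer struct {
	streams map[string]StreamConfig
}

func newFakeServer() *fakeServer {
	return &fakeServer{streams: make(map[string]StreamConfig)}
}

func (s *fakeServer) lookup(name string) (StreamConfig, error) {
	cfg, ok := s.streams[name]
	if !ok {
		return StreamConfig{}, errStreamNotFound
	}
	return cfg, nil
}

// ensureStream creates the stream when it is missing and otherwise returns
// the stored configuration untouched (no update call is made).
func ensureStream(s *fakeServer, want StreamConfig) (StreamConfig, error) {
	if want.Name == "" || len(want.Subjects) == 0 {
		return StreamConfig{}, errors.New("stream config: Name and Subjects are required")
	}
	if want.MaxAge == 0 {
		want.MaxAge = DefaultStreamMaxAge
	}
	have, err := s.lookup(want.Name)
	if err == nil {
		return have, nil // exists: requested changes are deliberately ignored
	}
	if !errors.Is(err, errStreamNotFound) {
		return StreamConfig{}, err
	}
	s.streams[want.Name] = want
	return want, nil
}

func main() {
	srv := newFakeServer()
	subjects := []string{"sse.v1.>"}
	first, _ := ensureStream(srv, StreamConfig{Name: "sse_v1", Subjects: subjects})
	second, _ := ensureStream(srv, StreamConfig{Name: "sse_v1", Subjects: subjects, MaxAge: time.Hour})
	fmt.Println(first.MaxAge == second.MaxAge)
}
```

The second call asks for a shorter `MaxAge`, yet the stored configuration comes back unchanged, which is why retention changes need a separate, deliberate operation.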
Publish
(*Client).Publish(ctx, subject, body, opts...) returns a messaging.PublishAck with the assigned stream sequence and a Duplicate flag set when the server matched the Nats-Msg-Id header against an earlier message inside the stream's dedup window.
```go
ack, err := client.Publish(ctx, "sse.v1.events", payload,
	messaging.WithMsgID(lastEventID))
if err != nil { /* ... */ }
if ack.Duplicate {
	// server already saw this Msg-Id — do NOT treat as fresh delivery
}
```

WithMsgID and dedup window
The WithMsgID(id) option sets the Nats-Msg-Id header. An empty id is treated as a no-op. Within the stream's configured dedup window, republishing with the same id yields PublishAck{Duplicate: true} and does not append a new message.
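
The dedup contract can be illustrated with a small in-memory model. This is a sketch under stated assumptions: `fakeStream` is hypothetical, it ignores the time bound of the dedup window, and it assumes a duplicate ack carries the sequence of the original message.

```go
package main

import "fmt"

// PublishAck is a local stand-in for messaging.PublishAck.
type PublishAck struct {
	Sequence  uint64
	Duplicate bool
}

// fakeStream models dedup by message ID only; the real dedup window is
// time-bounded, which this sketch does not simulate.
type fakeStream struct {
	lastSeq uint64
	seen    map[string]uint64 // Nats-Msg-Id -> sequence of the first publish
}

func newFakeStream() *fakeStream {
	return &fakeStream{seen: make(map[string]uint64)}
}

// publish appends a message unless msgID was already seen. An empty msgID
// disables dedup, matching the no-op behaviour of WithMsgID("").
func (s *fakeStream) publish(msgID string) PublishAck {
	if msgID != "" {
		if seq, ok := s.seen[msgID]; ok {
			return PublishAck{Sequence: seq, Duplicate: true}
		}
	}
	s.lastSeq++
	if msgID != "" {
		s.seen[msgID] = s.lastSeq
	}
	return PublishAck{Sequence: s.lastSeq}
}

func main() {
	s := newFakeStream()
	fmt.Println(s.publish("evt-1")) // first publish: appended
	fmt.Println(s.publish("evt-1")) // same ID: flagged, nothing appended
	fmt.Println(s.publish(""))      // no ID: always appended
}
```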
Replay
(*Client).Replay(ctx, streamName, startSeq) (ReplayIterator, error) creates an ordered JetStream consumer bound to streamName and returns an iterator delivering messages from startSeq onward.
| Input / output | Contract |
|---|---|
| `startSeq` | 1-based per JetStream semantics. startSeq == 0 → error tagged (REQ-003, PX-0003). |
| `ErrStreamNotFound` | Exported sentinel returned when streamName does not exist on the server; suffix (REQ-009, PX-0003). |
| `ReplayIterator.Next(ctx)` | Returns a *ReplayMessage wrapping the delivered message and the server-assigned stream sequence. Per-call ctx cancellation returns ctx.Err() WITHOUT tearing down the iterator; a subsequent Next call resumes delivery. |
| `ReplayIterator.Close` | MUST be called to release the underlying ordered consumer. Idempotent. |
| `ReplayMessage.StreamSequence` | Authoritative JetStream stream sequence (msg.Metadata().Sequence.Stream). SSE Last-Event-ID bookmarks MUST persist this value rather than inferring via seq++, because stream sequences have gaps after delete / MaxAge expiry / purge. |
| Ack behaviour | Ordered consumers auto-ack on delivery; callers MUST NOT call Ack on returned messages. |
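
To see why a bookmark must be the delivered StreamSequence rather than a client-side counter, consider a stream with gaps. The sketch below is a toy model: `replayFrom` and the retained sequence list are hypothetical and stand in for an ordered consumer reading a real stream.

```go
package main

import "fmt"

// replayFrom returns the retained sequences at or after startSeq, the way a
// consumer skips over sequences that no longer exist in the stream.
func replayFrom(retained []uint64, startSeq uint64) []uint64 {
	var out []uint64
	for _, seq := range retained {
		if seq >= startSeq {
			out = append(out, seq)
		}
	}
	return out
}

func main() {
	// Hypothetical stream in which sequences 3 and 4 were deleted or expired.
	retained := []uint64{1, 2, 5, 6}

	// A consumer reads the first three messages, then disconnects.
	delivered := replayFrom(retained, 1)[:3]
	counter := uint64(len(delivered))       // naive seq++ bookmark
	bookmark := delivered[len(delivered)-1] // server-assigned sequence

	fmt.Println("counter resume:", replayFrom(retained, counter+1))
	fmt.Println("bookmark resume:", replayFrom(retained, bookmark+1))
}
```

In this model the counter-based resume starts at 4 and redelivers sequence 5, while the bookmark-based resume starts at 6 and delivers only the unread message.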
```go
it, err := client.Replay(ctx, "sse_v1", 42)
if err != nil {
	if errors.Is(err, messaging.ErrStreamNotFound) {
		// translate to HTTP 404 in SSE resume flows
	}
	return err // the iterator is unusable here; do not fall through to Close
}
defer it.Close()
for {
	rm, err := it.Next(ctx)
	if err != nil {
		break
	}
	// rm.StreamSequence is the server-assigned sequence; use it as
	// the SSE id: field, not a client-side counter.
	fmt.Printf("seq=%d body=%s\n", rm.StreamSequence, rm.Msg.Data)
}
```

Last-Event-ID ↔ JetStream Sequence
The SSE Last-Event-ID header and the JetStream stream sequence are two representations of the same event ordinal:
- When the core binary publishes a signed SSE envelope, it calls `Publish` with `WithMsgID(lastEventID)`. The returned `PublishAck.Sequence` is the JetStream sequence assigned by the server; it becomes the `id:` field the client reads as `Last-Event-ID` on reconnect.
- When the client reconnects carrying `Last-Event-ID: N`, the core binary starts a `Replay(ctx, stream, N+1)` to resume from the next unread envelope. `N == 0` means "start from sequence 1", so callers that translate a missing header to zero must bump by one before calling `Replay`.
- The dedup window inherits the `StreamConfig.MaxAge` default of 24h. A `Last-Event-ID` older than the retention window cannot be replayed; callers fall back to the reconcile-pull path per README § Failure Modes.
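
The header-to-sequence mapping described above might look like the following. `startSeqFor` is a hypothetical helper, not part of the package; it treats a missing header as N = 0 and guards against the wrap-around that would otherwise produce the invalid startSeq of 0.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"strconv"
)

// startSeqFor maps an SSE Last-Event-ID header value to the startSeq argument
// for Replay. A missing header is treated as N = 0, so replay starts at 1.
func startSeqFor(lastEventID string) (uint64, error) {
	if lastEventID == "" {
		return 1, nil
	}
	n, err := strconv.ParseUint(lastEventID, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid Last-Event-ID %q: %w", lastEventID, err)
	}
	if n == math.MaxUint64 {
		// n+1 would wrap to 0, which Replay rejects.
		return 0, errors.New("Last-Event-ID out of range")
	}
	return n + 1, nil
}

func main() {
	for _, header := range []string{"", "0", "41", "not-a-number"} {
		seq, err := startSeqFor(header)
		fmt.Printf("header=%q startSeq=%d err=%v\n", header, seq, err)
	}
}
```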
Readiness probe
messaging.ProbeFunc(client) returns a health.ProbeFunc. The probe is total — a nil client returns the canned errNotConnected so /readyz flips to 503 until the client is wired.
| Symbol | Value |
|---|---|
| `messaging.ProbeName` | "jetstream" |
| Error suffix | (REQ-006, PX-0003) |
```go
registry.Register(messaging.ProbeName, messaging.ProbeFunc(client))
```

The probe body deliberately omits the server URL, credentials path, or any NATS-side detail because /readyz bodies are served to unauthenticated Kubernetes probes.
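
A total probe, one that is safe to call with a nil client, can be sketched as below. The `ProbeFunc` signature is an assumption about `health.ProbeFunc`, and `Client`, `probeFor`, and `runProbe` are hypothetical stand-ins; only the nil-client behaviour and the fixed error text without connection details follow the description above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ProbeFunc is an assumed shape for health.ProbeFunc; the real signature is
// not shown in this document.
type ProbeFunc func(ctx context.Context) error

// errNotConnected is a fixed message with no URL, credentials path, or
// server detail, so it is safe to serve to unauthenticated callers.
var errNotConnected = errors.New("jetstream: not connected (REQ-006, PX-0003)")

// Client is a hypothetical stand-in for *messaging.Client.
type Client struct {
	connected bool
}

// probeFor never panics: a nil or disconnected client yields errNotConnected.
func probeFor(c *Client) ProbeFunc {
	return func(ctx context.Context) error {
		if c == nil || !c.connected {
			return errNotConnected
		}
		return ctx.Err() // nil unless the probe's context was cancelled
	}
}

// runProbe is a small convenience wrapper for the examples below.
func runProbe(c *Client) error {
	return probeFor(c)(context.Background())
}

func main() {
	fmt.Println(runProbe(nil))
	fmt.Println(runProbe(&Client{connected: true}))
}
```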
Cross-references
- `../../contributing/layout.md`: the bounded-context map locating this package in the codebase.
- `../../../internal/platform/messaging/`: the package source.
- `../index.md`: the Reference quadrant index.