Introduction

This is the reference implementation of the durable streams protocol, built in Rust with Electric SQL for Postgres sync.

What are durable streams?

A durable stream is an append-only log exposed over HTTP. Clients create streams, append messages, and read them back using standard HTTP methods (PUT, POST, GET, DELETE). The protocol adds three capabilities on top of plain HTTP:

  • Offset resumption. Every read returns an offset. Clients save it and resume later without replaying history. Close a tab, reopen it, pick up exactly where you left off.
  • Live delivery. Clients can long-poll or open an SSE connection to receive new messages the instant they arrive.
  • Producer idempotency. Producers tag each append with an epoch and sequence number. The server deduplicates retries automatically, giving exactly-once semantics over an at-least-once transport.
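The idempotency rule can be sketched as a small dedup check. This is an illustrative model, not the server's actual code; the names (shouldApply, lastSeen) are invented for the sketch:

```javascript
// Sketch of server-side producer deduplication (hypothetical names).
// Each producer tags appends with an epoch and sequence number; the server
// remembers the highest pair seen per producer and drops replays, so a
// retried append is applied exactly once.
const lastSeen = new Map(); // producerId -> { epoch, seq }

function shouldApply(producerId, epoch, seq) {
  const prev = lastSeen.get(producerId);
  // A higher epoch always wins (producer restarted); within an epoch,
  // only a strictly larger sequence number is new data.
  const isNew =
    !prev || epoch > prev.epoch || (epoch === prev.epoch && seq > prev.seq);
  if (isNew) lastSeen.set(producerId, { epoch, seq });
  return isNew;
}

shouldApply("p1", 1, 1); // true  - first append
shouldApply("p1", 1, 1); // false - network retry, deduplicated
shouldApply("p1", 1, 2); // true  - next message
shouldApply("p1", 2, 1); // true  - new epoch after restart
```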

What this project demonstrates

This repository contains two things:

  1. A protocol server written in Rust (axum + tokio). It passes the full conformance test suite and supports multiple storage backends (in-memory, file-based, and crash-resilient acid/redb). It has no authentication, no database dependency, and no opinions about deployment.

  2. A full deployment stack that shows how to compose the server with real infrastructure:

    • Envoy as a JWT auth proxy in front of the server
    • Postgres as durable storage
    • Electric SQL for WAL-based change replication
    • A sync service that bridges streams and Postgres bidirectionally

Together they implement the durable sessions pattern: real-time collaborative applications (like AI chat) where messages are delivered instantly via SSE and durably stored in Postgres for querying, analytics, and recovery.

Who should read this

  • Protocol integrators who want to write a client against the durable streams API. Start with the Quickstart and the Protocol section.
  • Operators who want to run the full stack with auth, Postgres, and sync. Start with the Architecture and Deployment sections.
  • Contributors who want to extend the server or contribute to the protocol. Start with Contributing and the Reference section.

Conformance

This implementation targets full conformance with @durable-streams/server-conformance-tests@0.2.2 against spec commit a347312. All 239 conformance tests pass.

TL;DR

Just the server

cargo run

Server at http://localhost:4437. Streams at /v1/stream/.

# Create
curl -X PUT -H "Content-Type: text/plain" http://localhost:4437/v1/stream/demo

# Write
curl -X POST -H "Content-Type: text/plain" -d "hello" http://localhost:4437/v1/stream/demo

# Read
curl http://localhost:4437/v1/stream/demo?offset=-1

# SSE
curl -N http://localhost:4437/v1/stream/demo?offset=-1\&live=sse

# Delete
curl -X DELETE http://localhost:4437/v1/stream/demo

Full stack (auth + Postgres + sync)

docker-compose --profile sync up -d --build

Port    What
4437    DS server (no auth)
8080    Envoy proxy (JWT auth)
54321   Postgres

cd e2e && npm install
TOKEN=$(node generate-token.mjs)

curl -X PUT -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  http://localhost:8080/v1/stream/my-stream

curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  -d '[{"msg": "hello"}]' http://localhost:8080/v1/stream/my-stream

curl -H "Authorization: Bearer $TOKEN" http://localhost:8080/v1/stream/my-stream?offset=-1

Dev mode (everything + UI)

make dev          # server, Envoy, Postgres, Electric, sync, Adminer, heartbeat producer
make dev-ui       # stream browser on :3000 (separate terminal)
make dev-down     # stop

Adminer at http://localhost:8081 (server: postgres, user: postgres, pw: password).

Key env vars

Variable                                  Default     What it does
DS_SERVER__PORT                           4437        Listen port
DS_SERVER__LONG_POLL_TIMEOUT_SECS         30          Long-poll wait
DS_SERVER__SSE_RECONNECT_INTERVAL_SECS    60          SSE reconnect interval (0 = off)
DS_HTTP__CORS_ORIGINS                     *           Allowed origins
DS_LIMITS__MAX_MEMORY_BYTES               104857600   Total memory cap
DS_LIMITS__MAX_STREAM_BYTES               10485760    Per-stream cap

Tests

cargo test                      # unit + integration
make conformance                # protocol conformance suite
make integration-test           # Docker e2e (auth)
make integration-test-sessions  # Docker e2e (auth + Postgres sync)

Production perf build (PGO)

make pgo-train      # generate + merge profile data from benchmark traffic
make release-pgo    # guarded profile-use release build
make pgo-benchmark  # optional compare run for profile-use build

Build the docs

make docs         # build
make docs-serve   # serve with live reload

Quickstart

This page gets you from zero to a working stream in under 5 minutes. You will start the server, create a stream, append data, read it back, and subscribe for live updates.

Prerequisites

  • Rust toolchain (stable)
  • curl

Start the server

cargo run

The server listens on http://localhost:4437 with streams at /v1/stream/.

Create a stream

curl -i -X PUT -H "Content-Type: text/plain" \
  http://localhost:4437/v1/stream/my-stream
HTTP/1.1 201 Created
Location: http://localhost:4437/v1/stream/my-stream
Content-Type: text/plain
Stream-Next-Offset: 0000000000000000_0000000000000000

The stream is created with text/plain content type. The Stream-Next-Offset header shows the initial offset.

Append data

curl -i -X POST -H "Content-Type: text/plain" \
  -d "hello world" \
  http://localhost:4437/v1/stream/my-stream
HTTP/1.1 204 No Content
Stream-Next-Offset: 0000000000000001_000000000000000b

The offset advanced. Append a second message:

curl -i -X POST -H "Content-Type: text/plain" \
  -d "second message" \
  http://localhost:4437/v1/stream/my-stream

Read data

Read all messages from the beginning:

curl -i http://localhost:4437/v1/stream/my-stream?offset=-1
HTTP/1.1 200 OK
Content-Type: text/plain
Stream-Next-Offset: 0000000000000002_0000000000000019
Stream-Up-To-Date: true
ETag: "-1:0000000000000002_0000000000000019"

hello worldsecond message

The body contains the concatenated messages. Save Stream-Next-Offset to resume later without replaying:

curl -i http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_0000000000000019

This returns an empty body with Stream-Up-To-Date: true because there is no new data.
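The save-and-resume loop can be sketched with a stubbed transport. Everything here (makeStream, catchUp, the integer offsets) is invented for the sketch; a real client GETs the stream over HTTP and persists the Stream-Next-Offset header value:

```javascript
// Sketch of offset resumption with a stand-in transport, so the loop logic
// is visible without a running server.
function makeStream(chunks) {
  // Returns a readOnce(offset) stub: offset -1 means "from the beginning".
  return (offset) => {
    const start = offset === -1 ? 0 : offset;
    return { body: chunks.slice(start).join(""), nextOffset: chunks.length };
  };
}

function catchUp(readOnce, savedOffset) {
  // One catch-up read: fetch everything after savedOffset, return the new
  // offset to persist (e.g. in localStorage) for the next session.
  const { body, nextOffset } = readOnce(savedOffset);
  return { body, savedOffset: nextOffset };
}

const readOnce = makeStream(["hello world", "second message"]);
let session = catchUp(readOnce, -1);              // body: "hello worldsecond message"
session = catchUp(readOnce, session.savedOffset); // body: "" (up to date)
```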

Subscribe with SSE

Open a live SSE connection to receive new messages as they arrive:

curl -N http://localhost:4437/v1/stream/my-stream?offset=-1\&live=sse

You will see the existing messages followed by a control event:

event: data
data:hello world

event: data
data:second message

event: control
data:{"streamNextOffset":"0000000000000002_0000000000000019","streamCursor":"...","upToDate":true}

The connection stays open. In another terminal, append more data:

curl -X POST -H "Content-Type: text/plain" \
  -d "live update" \
  http://localhost:4437/v1/stream/my-stream

The SSE connection immediately delivers:

event: data
data:live update

event: control
data:{"streamNextOffset":"0000000000000003_0000000000000024","streamCursor":"...","upToDate":true}
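A minimal parser for these frames might look like this. It is a simplified sketch covering only the event: and data: fields shown above, not the full SSE grammar (comments, id:, multi-line data):

```javascript
// Parse a block of SSE text into { event, data } records. Frames are
// separated by blank lines; each frame carries event: and data: fields.
function parseSse(text) {
  const events = [];
  for (const frame of text.split("\n\n")) {
    let event = "message";
    let data = "";
    for (const line of frame.split("\n")) {
      if (line.startsWith("event:")) event = line.slice(6).trim();
      else if (line.startsWith("data:")) data = line.slice(5);
    }
    if (data !== "") events.push({ event, data });
  }
  return events;
}

const frames = "event: data\ndata:hello world\n\n" +
  'event: control\ndata:{"upToDate":true}\n\n';
parseSse(frames);
// → [ { event: "data", data: "hello world" },
//     { event: "control", data: '{"upToDate":true}' } ]
```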

Delete the stream

curl -i -X DELETE http://localhost:4437/v1/stream/my-stream
HTTP/1.1 204 No Content

Persistent storage

The quickstart uses in-memory storage (the default). For persistence, set DS_STORAGE__MODE:

# Crash-resilient storage using redb
DS_STORAGE__MODE=acid DS_STORAGE__DATA_DIR=./data cargo run

Available modes: memory, file-fast, file-durable, acid (alias: redb). See Configuration for details.

Next steps

Architecture overview

The full deployment has five components. Each solves one problem and composes cleanly with the others.

                    ┌──────────────────────────────────────────────┐
                    │              Docker Compose Stack            │
                    │                                              │
 Client ──────────►│  Envoy (:8080)  ──►  DS Server (:4437)      │
 (browser/curl)    │  JWT validation      append-only log          │
                    │                      SSE delivery             │
                    │                          │                    │
                    │                     Sync Service              │
                    │                      ▲         │              │
                    │                      │         ▼              │
                    │              Electric SQL    Postgres         │
                    │              (WAL reader)    (durable store)  │
                    └──────────────────────────────────────────────┘

Why each piece exists

  • DS server: real-time append-only log with SSE, offset resumption, and producer idempotency. Configurable storage (memory, file, acid/redb).
  • Envoy proxy: JWT authentication. The DS server has no auth, so Envoy validates tokens and forwards the sub claim as X-JWT-Sub.
  • Postgres: durable storage and SQL querying. Even with persistent storage modes, Postgres provides structured access, analytics, and cross-service visibility.
  • Electric SQL: change data capture. Reads the Postgres WAL and exposes a Shape API that delivers table changes as a stream.
  • Sync service: bidirectional bridge. Forwards Postgres changes into DS streams (PG-to-Stream) and DS stream events into Postgres (Stream-to-PG).

Data flows

PG-to-Stream (structured data to real-time)

INSERT INTO items → Postgres WAL → Electric Shape API → Sync Service → DS Server → SSE to clients

An application writes structured data to Postgres. Electric picks up the WAL change, the sync service receives it via the Shape API, and POSTs it as JSON to a DS stream. Connected clients receive it instantly via SSE.

Stream-to-PG (real-time events to durable storage)

Client POST → DS Server → SSE → Sync Service → INSERT INTO session_events

A client appends a session event (chat message, presence update) to a DS stream. The sync service consumes the stream via SSE and inserts each event into Postgres.

What you can swap

The architecture is composable. Each piece can be replaced independently:

  • Auth proxy: Envoy is one option. Any reverse proxy that validates JWTs and forwards claims works (nginx, Caddy, cloud load balancers).
  • Storage: The DS server supports in-memory, file-based, and acid (redb) storage backends, configurable via DS_STORAGE__MODE.
  • Sync layer: Electric SQL is the reference CDC tool. Any WAL reader (Debezium, custom logical replication) could feed the sync service.
  • Database: Postgres is used here because Electric requires it. The sync service pattern works with any database that supports change notifications.

Minimal vs. full stack

You do not need the full stack to use the DS server. The server runs standalone:

cargo run

This gives you a working durable streams server with no dependencies. The full stack adds auth, persistence, and sync for production-like deployments.

See Docker Compose for running the full stack and Quickstart for the minimal server.

DS server

The durable streams server is the core component. It implements the durable streams protocol as an HTTP service: create streams, append messages, read them back with offset resumption, and subscribe for live delivery via long-poll or SSE.

Design principles

  • Protocol-only. The server implements the protocol and nothing else. No authentication, no database, no application logic. This makes it composable with external infrastructure.
  • Thin handlers. HTTP handlers parse requests, validate inputs, call the storage layer, and format responses. No business logic in handlers.
  • Pluggable storage. Streams can run in memory, file-fast / file-durable, or acid mode. acid uses sharded redb files for crash-safe ACID commits.
  • Content-agnostic. The server treats all payloads as opaque bytes (or opaque JSON objects for JSON-mode streams). It does not parse or interpret message content.

What it does

Capability                How
Create / delete streams   PUT / DELETE on /v1/stream/{name}
Append messages           POST with matching Content-Type
Read (catch-up)           GET with offset, returns concatenated data
Read (long-poll)          GET with live=long-poll, waits for new data
Read (SSE)                GET with live=sse, streaming delivery
Producer idempotency      Producer-ID / Epoch / Seq headers for deduplication
JSON mode                 Array flattening on append, array wrapping on read
TTL / expiry              Per-stream TTL with lazy expiration
Stream closure            Immutable close with conflict detection
ETag caching              Range-based ETags, 304 Not Modified

Configuration

The server is configured through layered TOML files and environment variables. See Configuration for the full list.

Key defaults:

Variable                                  Default          Description
DS_SERVER__PORT                           4437             Listen port
DS_SERVER__LONG_POLL_TIMEOUT_SECS         30               Long-poll timeout
DS_SERVER__SSE_RECONNECT_INTERVAL_SECS    60               SSE reconnect interval (0 to disable)
DS_HTTP__CORS_ORIGINS                     *                Allowed CORS origins
DS_STORAGE__MODE                          memory           Storage backend selection
DS_STORAGE__DATA_DIR                      ./data/streams   Persistent backend root directory

Storage backends

  • memory: fastest and simplest, no restart durability.
  • file-fast / file-durable: one file per stream append log; durable mode fsyncs each append.
  • acid: sharded redb backend in ${DATA_DIR}/acid.
    • Routing is deterministic: shard = seahash(stream_name) & (shard_count - 1).
    • Each shard has an independent redb write lock, so different streams can write concurrently on different shards.
    • Every mutating operation is one redb transaction with immediate durability, providing crash-safe all-or-nothing commits.
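The routing rule can be sketched as follows. seahash has no JavaScript stdlib equivalent, so FNV-1a stands in as the hash in this illustration; the bitmask only works because the shard count is a power of two:

```javascript
// Stand-in 32-bit FNV-1a hash (the server uses seahash; this just makes the
// routing shape runnable here).
function fnv1a(s) {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h >>> 0;
}

// shard = hash(stream_name) & (shard_count - 1): deterministic, so the same
// stream always lands on the same shard (and its redb write lock).
function shardFor(streamName, shardCount = 16) {
  return fnv1a(streamName) & (shardCount - 1);
}
```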

Security headers

The server applies security headers via middleware on all responses:

  • Cache-Control: no-store (prevents intermediate caches from serving stale data)
  • X-Content-Type-Options: nosniff
  • Cross-Origin-Resource-Policy: cross-origin

SSE responses use Cache-Control: no-cache instead of no-store.

Memory limits

Two configurable limits prevent unbounded memory growth:

  • DS_LIMITS__MAX_MEMORY_BYTES (default 100 MB): total memory across all streams
  • DS_LIMITS__MAX_STREAM_BYTES (default 10 MB): maximum size per stream

When limits are exceeded, appends return 413 Payload Too Large.
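The two-level check can be sketched as follows (hypothetical names; the real enforcement lives in the server's storage layer):

```javascript
// Sketch of the limit check an append must pass. Exceeding either cap maps
// to HTTP 413; an accepted append maps to 204.
const MAX_MEMORY_BYTES = 104857600; // 100 MB total, all streams
const MAX_STREAM_BYTES = 10485760;  // 10 MB per stream

function checkAppend(totalBytes, streamBytes, appendBytes) {
  if (totalBytes + appendBytes > MAX_MEMORY_BYTES) return 413;
  if (streamBytes + appendBytes > MAX_STREAM_BYTES) return 413;
  return 204;
}
```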

Health check

GET /healthz returns 200 with "ok". This endpoint lives outside the /v1/stream/ namespace and requires no authentication when behind a proxy.

Auth proxy

The DS server has no authentication. This is intentional: it keeps the protocol implementation simple and lets adopters use whatever auth system they already have.

The pattern is to put a reverse proxy in front of the server that validates credentials and forwards identity information.

The pattern

Any reverse proxy that validates JWTs (or other credentials) and forwards the result works. The proxy needs to do three things:

  1. Validate tokens. Check the JWT signature against a JWKS, verify iss, aud, and exp claims.
  2. Forward identity. Extract a claim (typically sub) and pass it to the DS server as a header (e.g., X-JWT-Sub).
  3. Bypass health checks. Let /healthz through without auth so load balancers and orchestrators can probe the server.

Client                    Auth Proxy                DS Server
  │                          │                          │
  │── GET /healthz ─────────►│──── pass through ───────►│
  │                          │                          │
  │── PUT /v1/stream/foo ───►│                          │
  │   Authorization: Bearer  │── validate JWT ──┐       │
  │   eyJ...                 │                  │       │
  │                          │◄─ valid ─────────┘       │
  │                          │── proxy + identity ─────►│
  │                          │                          │
  │── PUT /v1/stream/bar ───►│                          │
  │   (no token)             │── 401 Unauthorized       │
  │                          │   (proxy rejects)        │
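The "validate tokens" step can be sketched as a claims check. This is illustrative only; a real deployment relies on the proxy's JWT filter, and signature verification against the JWKS happens before these checks:

```javascript
// Sketch of post-signature claim validation: issuer, audience, and expiry
// must all match before the proxy forwards the request.
function claimsValid(claims, { issuer, audience, nowSecs }) {
  return (
    claims.iss === issuer &&
    claims.aud === audience &&
    typeof claims.exp === "number" &&
    claims.exp > nowSecs
  );
}

const opts = {
  issuer: "durable-streams-test",
  audience: "durable-streams",
  nowSecs: 1700000000,
};
claimsValid({ iss: "durable-streams-test", aud: "durable-streams", exp: 1700003600, sub: "alice" }, opts); // true
claimsValid({ iss: "wrong-issuer", aud: "durable-streams", exp: 1700003600 }, opts); // false
```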

Streaming considerations

If the proxy sits in front of SSE or long-poll connections, it needs to support long-lived requests:

Setting               Recommendation                        Why
Route timeout         disabled or very large                SSE connections run indefinitely
Stream idle timeout   longer than server's SSE idle close   Let the server control connection lifecycle

The DS server closes idle SSE connections after ~60 seconds by default. The proxy's idle timeout should exceed this so the server, not the proxy, decides when to close.

Proxy options

  • Envoy with jwt_authn HTTP filter
  • nginx with ngx_http_auth_jwt_module
  • Caddy with jwt middleware
  • AWS ALB with OIDC integration
  • Traefik with ForwardAuth middleware
  • Custom middleware in your application server

The DS server only cares that requests arrive on its port. It does not inspect auth headers.

In this repository

The e2e harness uses Envoy as the auth proxy, configured in e2e/envoy.yaml.

Envoy configuration

Setting               Value                                    Notes
JWT validation        local_jwks from e2e/fixtures/jwks.json   Offline validation, no external JWKS endpoint
Issuer                durable-streams-test                     Test-only issuer
Audience              durable-streams                          Test-only audience
Route timeout         0s (disabled)                            Supports SSE
Stream idle timeout   120s                                     Server controls lifecycle at 60s
claim_to_headers      sub as X-JWT-Sub                         Forwards identity
Admin port            9901                                     Debugging dashboard

Test token generation

The e2e/ directory includes an RSA keypair and scripts for minting test JWTs:

cd e2e && npm install

TOKEN=$(node generate-token.mjs)              # valid, 1h expiry
node generate-token.mjs --expired             # expired token
node generate-token.mjs --sub alice           # custom subject
node generate-token.mjs --wrong-issuer        # wrong iss (rejected)

The RSA keypair in e2e/fixtures/ is committed intentionally. These are test-only keys with zero security value. For production, point your proxy at a real identity provider's JWKS endpoint.

Sync layer

The DS server supports multiple storage backends (memory, file, acid/redb). For SQL querying and cross-service visibility, you need a sync layer that bridges DS streams and a database bidirectionally.

The pattern

A sync service sits between the DS server and a database, copying data in both directions:

Database-to-Stream: A change data capture (CDC) tool watches the database for new rows. The sync service receives change events and POSTs them to a DS stream. Connected clients get the changes via SSE.

App writes to DB → CDC tool detects change → Sync service → POST to DS stream → SSE to clients

Stream-to-Database: The sync service subscribes to a DS stream via SSE. Each event is parsed and inserted into the database.

Client POSTs to DS stream → SSE → Sync service → INSERT into database

Why both directions

Neither direction is optional for a production session system:

  • Without Stream-to-DB: Session events live only in DS memory. Server restart loses everything. No SQL queries, no analytics.
  • Without DB-to-Stream: Structured data changes (from CRUD APIs, admin tools, batch jobs) never reach connected clients in real time.

Technology choices

The sync layer has three slots to fill:

Slot           Role                                           Options
Database       Durable storage with change notifications      Postgres, MySQL, MongoDB, any DB with CDC support
CDC tool       Watches DB changes, exposes them as a stream   Electric SQL, Debezium, custom logical replication
Sync service   Bridges CDC events and DS streams              Custom process (any language)

The DS server is agnostic to all three. It only sees HTTP requests.

Database requirements

For the CDC direction (DB-to-Stream), the database needs to support change notifications. For Postgres, this means:

  • wal_level=logical for logical replication
  • Sufficient max_wal_senders and max_replication_slots

Sync service requirements

A sync service needs to:

  1. Subscribe to the CDC tool's change feed
  2. POST changes as JSON arrays to a DS stream (using JSON mode)
  3. Subscribe to a DS stream via SSE for the reverse direction
  4. Parse SSE data events and insert into the database
  5. (Production) persist its SSE offset so it can resume after restart
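Steps 3 to 5 can be sketched as a pure event-to-row mapping (hypothetical names; the real e2e/sync/sync.mjs may differ, and the type key in the payload is an assumption):

```javascript
// Sketch of the Stream-to-DB half of a sync service: turn one parsed SSE
// data event into a row for a session_events-style table, plus the offset
// to persist so the service can resume after restart.
function toSessionEventRow(streamName, dataEvent, streamNextOffset) {
  const payload = JSON.parse(dataEvent);
  return {
    row: {
      stream_name: streamName,
      event_type: payload.type ?? null, // "type" key is an assumption
      payload,                          // stored as JSONB
      ds_offset: streamNextOffset,
    },
    resumeFrom: streamNextOffset,       // save before acking, replay-safe
  };
}
```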

Latency

Typical latencies in a local stack:

  • DB-to-Stream: under 1 second. CDC replication latency is the bottleneck.
  • Stream-to-DB: under 100ms. DS broadcast + SSE delivery + DB insert.

In this repository

The e2e harness uses Postgres + Electric SQL + a Node.js sync service.

Components

Component      Image / location                  Role
Postgres       postgres:17-alpine (port 54321)   Source and sink tables
Electric SQL   electricsql/electric:1.4.2        Reads Postgres WAL, exposes Shape API
Sync service   e2e/sync/sync.mjs                 Bridges Electric Shape API and DS streams

Postgres schema

-- Source table for DB-to-Stream (application writes here)
CREATE TABLE items (
  id SERIAL PRIMARY KEY,
  title TEXT NOT NULL,
  body TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Sink table for Stream-to-DB (sync service writes here)
CREATE TABLE session_events (
  id SERIAL PRIMARY KEY,
  stream_name TEXT NOT NULL,
  event_key TEXT,
  event_type TEXT,
  operation TEXT,
  payload JSONB NOT NULL,
  ds_offset TEXT,
  received_at TIMESTAMPTZ DEFAULT NOW()
);

Sync service configuration

  • ELECTRIC_URL (default http://electric:3000): Electric Shape API
  • DS_SERVER_URL (default http://server:4437): DS server URL (internal, no auth). Use https://... when direct TLS is enabled on the DS server.
  • POSTGRES_URL (default postgresql://postgres:password@postgres:5432/durable_streams): Postgres connection

Running

docker-compose --profile sync up -d --build
docker-compose logs -f sync-service
make integration-test-sessions

Without the sync layer

The DS server works standalone. If you only need real-time delivery without durable storage:

cargo run

Add the sync layer when you need data surviving restarts, SQL queries, or structured data reaching clients in real time.

Docker Compose

The docker-compose.yml at the repository root defines the full stack. Services are organized into profiles so you can run as much or as little as you need.

Profiles

Profile     Services                                 Use case
(default)   DS server, Envoy proxy                   Authenticated protocol server
sync        + Postgres, Electric SQL, sync service   Bidirectional PG sync
dev         + Adminer, heartbeat producer            Developer observability

Starting the stack

Default (server + auth proxy)

docker-compose up -d

This starts the DS server on port 4437 (internal) and Envoy on port 8080 (public). The health check is at http://localhost:8080/healthz.

With sync

docker-compose --profile sync up -d --build

Adds Postgres, Electric SQL, and the sync service. Data flows bidirectionally between DS streams and Postgres.

Full dev stack

make dev

Starts everything: server, Envoy, Postgres, Electric, sync service, Adminer (DB admin UI), and a heartbeat producer. See Dev mode for details.

Building the Docker image

docker-compose build
# or
make docker

The server Dockerfile uses a multi-stage build: compile with rust:latest, run on debian:bookworm-slim.

Stopping

docker-compose down                          # default profile
docker-compose --profile sync down           # sync profile
docker-compose --profile sync --profile dev down  # everything
make dev-down                                 # same as above

Port map

See Port map for all ports and their purposes.

Quick test

# Start the stack
docker-compose up -d

# Health check (no auth)
curl http://localhost:8080/healthz

# Generate a test JWT
cd e2e && npm install
TOKEN=$(node generate-token.mjs)

# Create a stream (authenticated)
curl -X PUT -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: text/plain" \
  http://localhost:8080/v1/stream/test-1

# Append data
curl -X POST -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: text/plain" \
  -d "hello" http://localhost:8080/v1/stream/test-1

# Read back
curl -H "Authorization: Bearer $TOKEN" \
  http://localhost:8080/v1/stream/test-1

# Unauthenticated requests are rejected
curl -X PUT http://localhost:8080/v1/stream/test-2  # 401

Integration tests

# Full automated cycle: build, start, test, tear down
make integration-test

# With sync and sessions tests
make integration-test-sessions

Dev mode

Dev mode starts the full stack plus observability tools for watching data flow through the system in real time.

Starting

# Terminal 1: Start the full stack
make dev

# Terminal 2: Start the visual stream browser
make dev-ui

What you get

Heartbeat producer

A container that exercises both sync directions every 5 seconds:

  • Odd ticks (PG-to-DS): INSERTs into the items Postgres table. Electric picks up the WAL change, the sync service forwards it to the pg-items DS stream.
  • Even ticks (DS-to-PG): POSTs a session event to the session-events DS stream. The sync service consumes it via SSE and INSERTs into the session_events Postgres table.

Watch the logs:

docker-compose logs -f producer

After ~30 seconds, both items and session_events tables will have heartbeat rows.

Adminer (database UI)

Open http://localhost:8081 to browse Postgres tables.

Field      Value
System     PostgreSQL
Server     postgres
Username   postgres
Password   password
Database   durable_streams

Test UI (stream browser)

The test-ui is the official durable-streams stream browser. It is cloned into .dev/ (gitignored) on first run via make dev-ui.

Connect it to the DS server at http://localhost:4437. You can navigate manually to the producer's streams (pg-items, session-events).

Requirements: pnpm, Node 22+

Known limitation: The server does not implement the __registry__ stream used for auto-discovery, so streams will not auto-populate in the sidebar.

Port map

Port    Service    Purpose
4437    server     DS server (direct, no auth)
8080    envoy      JWT auth proxy
9901    envoy      Envoy admin dashboard
54321   postgres   Postgres direct access
8081    adminer    Database admin UI
3000    test-ui    Visual stream browser (host)

Stopping

make dev-down          # stop all containers
rm -rf .dev/           # remove cloned test-ui monorepo (optional)

Production

The Docker stack in this repository is a local development and testing tool. This page covers what to change for production.

Authentication

Replace the test JWKS with a real identity provider:

  • Test setup: Envoy validates JWTs against a local file (e2e/fixtures/jwks.json) with a committed RSA keypair.
  • Production: Point Envoy's remote_jwks at your identity provider's JWKS endpoint (Auth0, Cognito, Keycloak, etc.). Update the issuer and audiences to match.

The DS server itself needs no changes. It is auth-agnostic by design.

Persistent storage

The DS server supports multiple storage modes:

  • memory: in-RAM only; no restart durability.
  • file-fast / file-durable: file-backed per-stream logs.
  • acid: sharded redb backend with ACID commits and immediate durability.

For production durability, use file-durable or acid, or run the sync layer to mirror data into Postgres.

Acid mode tuning

  • DS_STORAGE__MODE=acid and DS_STORAGE__DATA_DIR=/path/to/store persist under ${DATA_DIR}/acid/.
  • DS_STORAGE__ACID_SHARD_COUNT controls write concurrency. Keep it power-of-2 (1..=256), default 16.
  • Writes are serialized per shard (single writer per shard). Increase shard count for highly concurrent write workloads.
  • Acid mode commits with immediate durability (fsync-class semantics on each commit), prioritizing crash safety over raw append latency.

Electric SQL configuration

  • Pin the Electric SQL version in your deployment (the stack uses electricsql/electric:1.4.2).
  • Configure Postgres replication slots carefully. The defaults (max_wal_senders=10, max_replication_slots=10) work for development but may need tuning for production workloads.
  • Ensure wal_level=logical is set in your Postgres configuration. This is required for Electric's logical replication.

Sync service resilience

The reference sync service (e2e/sync/sync.mjs) is a starting point, not production-ready:

  • Offset persistence: The sync service should save its SSE offset (from the id: field) to a Postgres table so it can resume after restart without replaying the full stream.
  • Error handling: Add retries with backoff for Postgres insert failures and SSE reconnection.
  • Scaling: The sync service is a single process. For high-throughput streams, partition by stream name or run multiple instances with offset coordination.

Memory limits

Tune the server's memory limits for your workload:

Variable                      Default   Notes
DS_LIMITS__MAX_MEMORY_BYTES   100 MB    Total across all streams
DS_LIMITS__MAX_STREAM_BYTES   10 MB     Per stream

In production with the sync layer, streams are consumed and can be deleted after sync. When using the default in-memory mode, the store acts as a buffer, not long-term storage. For persistence without the sync layer, use file-durable or acid mode.

Monitoring

  • Log and alert on sync service errors, SSE reconnections, and PG insert failures.
  • The sync service logs events to stdout; aggregate with your preferred log pipeline.
  • Use the Envoy admin dashboard (port 9901 in dev) for proxy metrics and connection debugging.
  • The DS server logs via tracing with configurable levels via RUST_LOG.

CORS

The server defaults to DS_HTTP__CORS_ORIGINS=* (allow all). For production, restrict to your application's domain:

DS_HTTP__CORS_ORIGINS=https://app.example.com cargo run

Multiple origins can be comma-separated.

TLS

Default model: terminate TLS at the proxy layer (Envoy, nginx, cloud load balancer) or at a CDN edge.

Optional model: the DS server can terminate TLS directly when both DS_TLS__CERT_PATH and DS_TLS__KEY_PATH are set.

Recommended topology matrix:

  • Internet-facing traffic: terminate TLS at proxy/edge (this is still the default).
  • Proxy -> DS server hop: use direct TLS on the DS server when you need encrypted in-cluster/intra-box traffic.
  • mTLS: terminate and enforce mTLS at the proxy when possible; DS direct TLS currently covers server-side TLS termination.

HTTP/2 and HTTP/3 are typically negotiated at the proxy/edge. Enabling direct TLS on the DS server secures that hop, while proxy capabilities still govern external ALPN behavior.

Streams

A stream is an append-only log identified by name. Clients create, append to, read from, and delete streams using standard HTTP methods.

Create a stream (PUT)

curl -i -X PUT -H "Content-Type: text/plain" \
  http://localhost:4437/v1/stream/my-stream

Headers:

  • Content-Type (optional): the stream's content type, fixed at creation. Defaults to application/octet-stream if omitted
  • Stream-TTL (optional): time-to-live in seconds
  • Stream-Expires-At (optional): absolute expiration (ISO 8601)
  • Stream-Closed (optional): "true" to create in closed state

Responses:

  • 201 Created with Location, Content-Type, Stream-Next-Offset
  • 200 OK if the stream already exists with the same configuration (idempotent)
  • 409 Conflict if the stream exists with different configuration
  • 400 Bad Request for invalid TTL or empty Content-Type header

The Content-Type is immutable after creation. Comparison is case-insensitive and ignores charset parameters.

Append data (POST)

curl -i -X POST -H "Content-Type: text/plain" \
  -d "hello world" \
  http://localhost:4437/v1/stream/my-stream

Headers:

  • Content-Type (required): must match the stream's content type
  • Stream-Closed (optional): "true" to close the stream (with or without data)

Responses:

  • 204 No Content with Stream-Next-Offset
  • 404 Not Found if stream does not exist
  • 409 Conflict for content-type mismatch or closed stream
  • 400 Bad Request for empty body without Stream-Closed

Each append generates a unique, monotonically increasing offset. Concurrent appends to the same stream are serialized to maintain offset ordering.
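The offsets in the Quickstart look like a zero-padded hex message count plus a cumulative byte length. A toy log reproducing that shape is sketched below, for illustration only: the real encoding is server-internal, and clients must treat offsets as opaque strings:

```javascript
// Toy append-only log whose offsets match the shape seen in the Quickstart
// transcript: {message count}_{cumulative bytes}, each 16 hex digits, so
// offsets sort lexicographically in append order.
function makeLog() {
  let count = 0, bytes = 0;
  const hex = (n) => n.toString(16).padStart(16, "0");
  return {
    append(data) {
      count += 1;
      bytes += data.length;
      return `${hex(count)}_${hex(bytes)}`; // Stream-Next-Offset
    },
  };
}

const log = makeLog();
log.append("hello world");    // "0000000000000001_000000000000000b"
log.append("second message"); // "0000000000000002_0000000000000019"
```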

Read data (GET)

curl -i http://localhost:4437/v1/stream/my-stream?offset=-1

Query parameters:

  • offset: where to start reading
    • -1 (default): beginning of stream
    • now: current tail (returns empty body)
    • hex offset: resume from a specific position

Response headers:

  • Stream-Next-Offset: save this for resumption
  • Stream-Up-To-Date: "true" when at the tail
  • Stream-Closed: "true" when closed and at tail
  • ETag: for conditional requests with If-None-Match

The body contains all messages concatenated from the requested offset to the current tail.

Stream metadata (HEAD)

curl -I http://localhost:4437/v1/stream/my-stream

Returns the same headers as GET (Content-Type, Stream-Next-Offset, Stream-Closed, TTL info) with no body.

Delete a stream (DELETE)

curl -i -X DELETE http://localhost:4437/v1/stream/my-stream

Responses:

  • 204 No Content on success
  • 404 Not Found if stream does not exist

Deleting removes all data and metadata. The stream name can be reused.

Close a stream

Closing a stream prevents further appends. Close by sending a POST with Stream-Closed: true:

# Close without data
curl -X POST -H "Content-Type: text/plain" \
  -H "Stream-Closed: true" \
  http://localhost:4437/v1/stream/my-stream

# Close with a final message
curl -X POST -H "Content-Type: text/plain" \
  -H "Stream-Closed: true" \
  -d "goodbye" http://localhost:4437/v1/stream/my-stream

After closure:

  • Appends return 409 Conflict with Stream-Closed: true
  • Reads still work; Stream-Closed: true appears when the reader is at the tail
  • Closing is idempotent (re-closing returns 204)

TTL and expiry

Streams can have a time-to-live set at creation:

# Expire in 1 hour
curl -X PUT -H "Content-Type: text/plain" \
  -H "Stream-TTL: 3600" \
  http://localhost:4437/v1/stream/temp

# Expire at a specific time
curl -X PUT -H "Content-Type: text/plain" \
  -H "Stream-Expires-At: 2026-12-31T23:59:59Z" \
  http://localhost:4437/v1/stream/temp2

  • Only one of Stream-TTL and Stream-Expires-At may be provided
  • Expired streams return 404 on all operations
  • Expiration is checked lazily on access
  • HEAD returns remaining Stream-TTL and Stream-Expires-At

Read modes

The DS server supports three read modes: catch-up (default), long-poll, and SSE. All use GET requests with different query parameters.

Catch-up (default)

Returns all data from an offset to the current tail, then closes the connection.

# Read everything
curl http://localhost:4437/v1/stream/my-stream?offset=-1

# Resume from a saved offset
curl http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_000000000000000a

The response includes:

  • Body with concatenated message data
  • Stream-Next-Offset for resumption
  • Stream-Up-To-Date: true when at the tail
  • ETag for conditional requests

This is the simplest mode. Use it for one-shot reads or polling.

Long-poll

Waits for new data if the client is already at the tail, avoiding the need for rapid polling.

curl http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_000000000000000a\&live=long-poll

Behavior:

  • If data exists at the offset, returns it immediately (same as catch-up)
  • If at the tail and the stream is open, waits for new data
  • If at the tail and the stream is closed, returns immediately with Stream-Closed: true
  • Returns 204 No Content when the timeout expires with no new data

The timeout defaults to 30 seconds (configurable via DS_SERVER__LONG_POLL_TIMEOUT_SECS).

The response includes a Stream-Cursor header. Echo it back in the cursor query parameter on subsequent requests to enable CDN request collapsing:

curl http://localhost:4437/v1/stream/my-stream?offset=...\&live=long-poll\&cursor=...
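
Putting the pieces together, a long-poll consumer loop might look like the sketch below. The `fetchFn` parameter and `maxRounds` guard are assumptions for illustration (in practice, pass the global `fetch` and loop until closed); the header names and status codes come from the protocol above:

```javascript
// Long-poll consumer loop (sketch): start from the beginning, save
// Stream-Next-Offset after every response, echo Stream-Cursor back,
// and stop when the stream reports closed.
async function longPollLoop(baseUrl, name, onData, fetchFn, maxRounds) {
  let offset = "-1";
  let cursor = null;
  for (let round = 0; round < maxRounds; round++) {
    const params = new URLSearchParams({ offset, live: "long-poll" });
    if (cursor) params.set("cursor", cursor);
    const res = await fetchFn(`${baseUrl}/v1/stream/${name}?${params}`);
    if (res.status === 200) onData(await res.text());
    // 204 means the timeout expired with no new data: just poll again.
    offset = res.headers.get("Stream-Next-Offset") ?? offset;
    cursor = res.headers.get("Stream-Cursor") ?? cursor;
    if (res.headers.get("Stream-Closed") === "true") break;
  }
  return offset; // save for resumption
}
```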

SSE (Server-Sent Events)

Opens a persistent connection that delivers messages as they arrive.

curl -N http://localhost:4437/v1/stream/my-stream?offset=-1\&live=sse

The server returns Content-Type: text/event-stream and streams events:

Event types

event: data -- one per stored message:

event: data
data:hello world

event: control -- metadata after each batch:

event: control
data:{"streamNextOffset":"...","streamCursor":"...","upToDate":true}

Control event fields

Field              Type      When included
streamNextOffset   string    always
streamCursor       string    when stream is open
upToDate           boolean   when caught up
streamClosed       boolean   when closed and all data sent

Connection lifecycle

  1. Historical data is sent as event: data events
  2. A control event with upToDate: true indicates catch-up is complete
  3. The connection stays open, waiting for new data
  4. New appends trigger additional data + control events
  5. If the stream is closed, the final control includes streamClosed: true and the connection closes
  6. Idle connections close after ~60 seconds (configurable via DS_SERVER__SSE_RECONNECT_INTERVAL_SECS)

Binary streams

For content types other than text/* and application/json, SSE data events carry base64-encoded payloads. The response includes stream-sse-data-encoding: base64.

Multi-line data

Multi-line messages are sent as multiple data: lines per SSE spec:

event: data
data:line one
data:line two
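
Per the SSE specification, a consumer reconstructs the original payload by joining consecutive data: lines with newlines. A minimal helper (illustrative):

```javascript
// Join the data: lines of a single SSE event back into the original
// payload. Per the SSE spec, multiple data: lines are concatenated
// with "\n".
function joinSseData(eventLines) {
  return eventLines
    .filter((line) => line.startsWith("data:"))
    .map((line) => line.slice("data:".length))
    .join("\n");
}
```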

Resumption

Save the offset from control events. Reconnect with offset={saved} to resume:

curl -N http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_000000000000000a\&live=sse

The offset=now sentinel skips historical data and receives only new messages.

Producers

Producer headers enable exactly-once append semantics over HTTP. A producer identifies itself with a stable ID, a monotonic epoch for session management, and a monotonic sequence number for per-request deduplication. The server validates state and deduplicates retries automatically.

Producer headers

All three must be provided together (or none):

Header           Format                  Description
Producer-ID      non-empty string        Stable identifier for the producer
Producer-Epoch   integer (0 to 2^53-1)   Incremented on producer restart
Producer-Seq     integer (0 to 2^53-1)   Incremented per request within an epoch

Basic flow

# First message: epoch 0, seq 0
curl -X POST -H "Content-Type: text/plain" \
  -H "Producer-ID: my-producer" \
  -H "Producer-Epoch: 0" \
  -H "Producer-Seq: 0" \
  -d "message 1" \
  http://localhost:4437/v1/stream/my-stream
# → 200 OK (new data accepted)

# Second message: epoch 0, seq 1
curl -X POST -H "Content-Type: text/plain" \
  -H "Producer-ID: my-producer" \
  -H "Producer-Epoch: 0" \
  -H "Producer-Seq: 1" \
  -d "message 2" \
  http://localhost:4437/v1/stream/my-stream
# → 200 OK

Response codes

Code              Meaning                          When
200 OK            New data accepted                seq = lastSeq + 1 (or first append)
204 No Content    Duplicate (already persisted)    seq <= lastSeq
400 Bad Request   Invalid headers                  Partial set, empty ID, non-integer, or epoch bump with seq != 0
403 Forbidden     Epoch fenced (zombie producer)   epoch < server's current epoch
409 Conflict      Sequence gap                     seq > lastSeq + 1
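
The response-code table can be modeled as a pure decision function. This is an illustrative sketch, not the server's code: it assumes per-producer state of `{ epoch, lastSeq }` (or `null` for a first-time producer), that a new producer starts at seq 0 as in the basic flow below, and it omits header-syntax validation (partial sets, non-integers):

```javascript
// Decide the status code for a producer append, given stored state.
function producerDecision(state, epoch, seq) {
  if (state === null) {
    // First append from this producer ID: sequence run must start at 0.
    return seq === 0 ? { code: 200, state: { epoch, lastSeq: 0 } } : { code: 409 };
  }
  if (epoch < state.epoch) return { code: 403 }; // zombie producer, fenced
  if (epoch > state.epoch) {
    // Epoch bump must restart the sequence at 0.
    return seq === 0 ? { code: 200, state: { epoch, lastSeq: 0 } } : { code: 400 };
  }
  if (seq <= state.lastSeq) return { code: 204 }; // duplicate retry
  if (seq === state.lastSeq + 1) return { code: 200, state: { epoch, lastSeq: seq } };
  return { code: 409 }; // sequence gap
}
```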

Duplicate detection

Retrying the same (ID + epoch + seq) is safe and returns 204:

# Retry of seq 0 (already succeeded above)
curl -X POST -H "Content-Type: text/plain" \
  -H "Producer-ID: my-producer" \
  -H "Producer-Epoch: 0" \
  -H "Producer-Seq: 0" \
  -d "message 1" \
  http://localhost:4437/v1/stream/my-stream
# → 204 No Content (duplicate, no data stored)

Epoch fencing (zombie detection)

When a producer restarts, it bumps its epoch and starts seq at 0:

# Producer restarts with epoch 1
curl -X POST -H "Content-Type: text/plain" \
  -H "Producer-ID: my-producer" \
  -H "Producer-Epoch: 1" \
  -H "Producer-Seq: 0" \
  -d "restarted" \
  http://localhost:4437/v1/stream/my-stream
# → 200 OK

After this, the old epoch is fenced. A request from epoch 0 returns 403 Forbidden:

curl -X POST -H "Content-Type: text/plain" \
  -H "Producer-ID: my-producer" \
  -H "Producer-Epoch: 0" \
  -H "Producer-Seq: 2" \
  -d "zombie" \
  http://localhost:4437/v1/stream/my-stream
# → 403 Forbidden (epoch fenced)
# Producer-Epoch: 1 (server's current epoch)

Sequence gaps

If a sequence number is skipped, the server returns 409 Conflict with diagnostic headers:

curl -X POST -H "Content-Type: text/plain" \
  -H "Producer-ID: my-producer" \
  -H "Producer-Epoch: 0" \
  -H "Producer-Seq: 5" \
  -d "skipped" \
  http://localhost:4437/v1/stream/my-stream
# → 409 Conflict
# Producer-Expected-Seq: 1
# Producer-Received-Seq: 5

Multiple producers

Multiple producers can write to the same stream. Each has independent epoch and sequence tracking:

# Producer A writes
curl -X POST -H "Producer-ID: user-alice" \
  -H "Producer-Epoch: 0" -H "Producer-Seq: 0" ...

# Producer B writes
curl -X POST -H "Producer-ID: user-bob" \
  -H "Producer-Epoch: 0" -H "Producer-Seq: 0" ...

Messages are ordered by arrival time, not by producer.

Closing with producer headers

A producer can atomically append and close:

curl -X POST -H "Content-Type: text/plain" \
  -H "Producer-ID: my-producer" \
  -H "Producer-Epoch: 0" \
  -H "Producer-Seq: 2" \
  -H "Stream-Closed: true" \
  -d "final message" \
  http://localhost:4437/v1/stream/my-stream
# → 200 OK, Stream-Closed: true

Retrying the same closing request returns 204 No Content with Stream-Closed: true.

JSON mode

Streams created with Content-Type: application/json operate in JSON mode. JSON mode provides array flattening on append and array wrapping on read.

Creating a JSON stream

curl -X PUT -H "Content-Type: application/json" \
  http://localhost:4437/v1/stream/events

Appending

Single value

A single JSON object is stored as one message:

curl -X POST -H "Content-Type: application/json" \
  -d '{"event": "click", "x": 100}' \
  http://localhost:4437/v1/stream/events

Array flattening

A JSON array is flattened: each element becomes a separate message:

curl -X POST -H "Content-Type: application/json" \
  -d '[{"event": "click"}, {"event": "scroll"}]' \
  http://localhost:4437/v1/stream/events

This stores two messages. Only the top-level array is flattened; nested arrays within objects are preserved.
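
The append rule can be mirrored in a small pure function. This is an illustrative sketch of the semantics described above, not the server's implementation (the server may store raw byte slices rather than re-serialized JSON):

```javascript
// JSON-mode append (illustrative): a top-level array is flattened into one
// message per element; any other JSON value is one message; empty arrays
// and invalid JSON are rejected with 400.
function flattenJsonAppend(body) {
  let value;
  try {
    value = JSON.parse(body);
  } catch {
    return { error: 400 }; // invalid JSON
  }
  if (Array.isArray(value)) {
    if (value.length === 0) return { error: 400 }; // empty array
    return { messages: value.map((v) => JSON.stringify(v)) };
  }
  return { messages: [body] };
}
```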

Rejected inputs

  • Empty arrays ([]) return 400 Bad Request
  • Invalid JSON returns 400 Bad Request

Reading

All messages are wrapped in a JSON array regardless of how they were appended:

curl http://localhost:4437/v1/stream/events?offset=-1
[
  {"event": "click", "x": 100},
  {"event": "click"},
  {"event": "scroll"}
]

An empty stream returns [].

SSE

SSE data events for JSON streams wrap each message in an array:

event: data
data:[{"event":"click","x":100}]

Consumers must unwrap the outer array:

const parsed = JSON.parse(data);
const items = Array.isArray(parsed) ? parsed : [parsed];

See ecosystem interop CI-003 for details on this pattern.

Non-JSON streams

JSON mode only activates for Content-Type: application/json. Other content types (e.g., text/plain) store and return raw bytes with no flattening or wrapping.

Caching

The protocol includes ETag-based caching to avoid redundant data transfer and Cache-Control: no-store to prevent stale data in intermediate caches.

ETags

Every 200 OK GET response includes an ETag header encoding the read range:

ETag: "{start_offset}:{end_offset}"

The start offset is the query parameter value (including sentinels like -1 or now). The end offset is the Stream-Next-Offset from the same response.

Closed stream suffix

When the stream is closed and the reader is at the tail, the ETag has a :c suffix:

ETag: "-1:0000000000000003_000000000000001a:c"

Examples

GET ?offset=-1          → ETag: "-1:0000000000000003_000000000000001a"
GET ?offset=now         → ETag: "now:0000000000000003_000000000000001a"
GET ?offset=0000...0001 → ETag: "0000...0001:0000...0003"
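
The construction rule boils down to a one-liner; a sketch (illustrative helper, following the format described above):

```javascript
// Build the ETag for a 200 OK read: requested offset (including sentinels
// like -1 or now), the returned Stream-Next-Offset, and a :c suffix when
// the stream is closed and the reader is at the tail.
function buildEtag(requestedOffset, nextOffset, closedAtTail) {
  const core = `${requestedOffset}:${nextOffset}`;
  return `"${closedAtTail ? core + ":c" : core}"`;
}
```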

Conditional requests (304)

Clients can send If-None-Match with a previously received ETag:

curl -H 'If-None-Match: "-1:0000000000000003_000000000000001a"' \
  http://localhost:4437/v1/stream/my-stream?offset=-1

If the ETag matches (no new data since last read), the server returns:

HTTP/1.1 304 Not Modified
Stream-Next-Offset: 0000000000000003_000000000000001a
Stream-Up-To-Date: true
Cache-Control: no-store

No body is sent, saving bandwidth.

If the ETag does not match (new data available), the server returns a normal 200 OK with updated data and a new ETag.

Cache-Control

The server includes Cache-Control: no-store on all responses (200, 201, 204, 304, 400, 404, 409, 413). This prevents intermediate HTTP caches (CDNs, proxies) from serving stale stream data.

SSE responses use Cache-Control: no-cache instead, which allows caching but requires revalidation.

Writing a client

You can interact with the DS server using any HTTP client. This page shows patterns for the @durable-streams/client SDK, plain fetch, and SSE consumption.

Using the client SDK

The @durable-streams/client SDK provides typed methods for stream operations:

import { DurableStream, stream } from "@durable-streams/client";

const baseUrl = "http://localhost:4437";

// Create a stream
await DurableStream.create({
  url: `${baseUrl}/v1/stream/my-stream`,
  contentType: "application/json",
});

// Append data
await DurableStream.append({
  url: `${baseUrl}/v1/stream/my-stream`,
  body: JSON.stringify([{ event: "click" }]),
  contentType: "application/json",
});

// Read data
const res = await stream({
  url: `${baseUrl}/v1/stream/my-stream`,
  live: false,
  json: true,
});
const data = await res.json();
const offset = res.offset; // save for resumption

SSE with the client SDK

When using SSE with JSON streams, pass json: true explicitly. The SSE transport uses Content-Type: text/event-stream, which masks the stream's actual content type:

const res = await stream({
  url: `${baseUrl}/v1/stream/my-stream`,
  live: "sse",
  json: true, // required: SSE transport masks the stream's content type
});

for await (const item of res.subscribeJson()) {
  console.log("Received:", item);
}

See ecosystem interop CI-001 for why this hint is necessary.

Using plain fetch

Create

await fetch(`${baseUrl}/v1/stream/my-stream`, {
  method: "PUT",
  headers: { "Content-Type": "text/plain" },
});

Append

await fetch(`${baseUrl}/v1/stream/my-stream`, {
  method: "POST",
  headers: { "Content-Type": "text/plain" },
  body: "hello world",
});

Read with offset resumption

// First read
const res = await fetch(`${baseUrl}/v1/stream/my-stream?offset=-1`);
const body = await res.text();
const nextOffset = res.headers.get("Stream-Next-Offset");

// Resume later (only new messages)
const res2 = await fetch(`${baseUrl}/v1/stream/my-stream?offset=${nextOffset}`);

SSE consumption pattern

Node.js (manual parsing)

const res = await fetch(`${baseUrl}/v1/stream/my-stream?live=sse&offset=-1`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
// The event type must persist across chunk boundaries: an "event:" line
// and its "data:" lines can arrive in different reads.
let currentEventType = null;

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop(); // keep incomplete line
  for (const line of lines) {
    if (line.startsWith("event:")) {
      currentEventType = line.slice(6).trim();
    } else if (line.startsWith("data:") && currentEventType === "data") {
      const payload = line.slice(5);
      console.log("Data:", payload);
    } else if (line.startsWith("data:") && currentEventType === "control") {
      const control = JSON.parse(line.slice(5));
      if (control.streamClosed) {
        console.log("Stream closed");
        reader.cancel();
      }
    } else if (line.startsWith("id:")) {
      const offset = line.slice(3).trim();
      // Save for resumption on reconnect
    }
  }
}

JSON streams over SSE

For JSON-mode streams, each data: line wraps the message in a JSON array. Unwrap it:

if (currentEventType === "data") {
  const parsed = JSON.parse(line.slice(5));
  const items = Array.isArray(parsed) ? parsed : [parsed];
  for (const item of items) {
    console.log("Item:", item);
  }
}

With authentication

When the DS server is behind an Envoy auth proxy, include a Bearer token:

const headers = { Authorization: `Bearer ${token}` };

await fetch(`${baseUrl}/v1/stream/my-stream`, {
  method: "PUT",
  headers: { ...headers, "Content-Type": "text/plain" },
});

Producer idempotency

For exactly-once writes, include producer headers:

let seq = 0;

async function append(data) {
  const res = await fetch(`${baseUrl}/v1/stream/my-stream`, {
    method: "POST",
    headers: {
      "Content-Type": "text/plain",
      "Producer-ID": "my-client-tab-1",
      "Producer-Epoch": "0",
      "Producer-Seq": String(seq),
    },
    body: data,
  });

  if (res.status === 200) {
    seq++; // accepted
  } else if (res.status === 204) {
    seq++; // duplicate, already persisted
  }
  // 409: sequence gap, 403: epoch fenced
}

See Producers for the full protocol.

Electric SQL sync

Electric SQL is a CDC (change data capture) tool for Postgres. It reads the write-ahead log and exposes table changes as an HTTP stream called the Shape API. This page explains the general approach and how the e2e harness implements it.

The pattern

How Electric works

Application → INSERT → Postgres WAL → Electric (replication slot) → Shape API → Subscribers

  1. An application writes to a Postgres table.
  2. Postgres records the change in its WAL.
  3. Electric connects to Postgres via a logical replication slot and reads new WAL entries.
  4. Electric exposes changes through its Shape API: an HTTP endpoint that subscribers poll or stream.
  5. A sync service subscribes to the Shape API and forwards changes wherever they need to go.

Postgres requirements

Electric requires logical replication:

wal_level = logical
max_wal_senders = 10         # at least 1 per Electric instance
max_replication_slots = 10   # at least 1 per Electric instance

These are Postgres server-level settings, not per-database. Without wal_level=logical, Electric fails during startup. See ecosystem interop CI-004 for details.

Shape API subscription

The Electric client library provides ShapeStream for subscribing to table changes:

import { ShapeStream } from "@electric-sql/client";

const stream = new ShapeStream({
  url: `${electricUrl}/v1/shape`,
  params: { table: "my_table" },
});

stream.subscribe(async (messages) => {
  const inserts = messages
    .filter(m => m.headers.operation === "insert")
    .map(m => m.value);
  // Forward inserts to DS stream, message queue, etc.
});

Each message includes headers (operation: insert/update/delete) and the row value. The subscriber decides what to do with changes.

Bridging to a DS stream

To get Postgres changes into a DS stream, the sync service POSTs each batch as a JSON array:

if (inserts.length > 0) {
  await fetch(`${dsServerUrl}/v1/stream/${streamName}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(inserts),
  });
}

JSON mode flattens the array into individual messages. Connected clients receive each change via SSE.

The reverse direction

Electric handles DB-to-Stream. For Stream-to-DB, the sync service subscribes to a DS stream via SSE and inserts events into Postgres. See Durable sessions for the full bidirectional pattern.

Latency

DB-to-Stream latency depends primarily on Electric's replication lag. In local development this is typically under 1 second. In production it depends on WAL volume, replication slot configuration, and network latency to the Electric instance.

In this repository

The e2e harness runs Electric as a Docker container connecting to a Postgres instance.

Docker services

postgres:
  image: postgres:17-alpine
  command: [postgres, -c, wal_level=logical, -c, max_wal_senders=10, -c, max_replication_slots=10]
  environment:
    POSTGRES_DB: durable_streams
    POSTGRES_PASSWORD: password
  ports: ["54321:5432"]

electric:
  image: electricsql/electric:1.4.2
  environment:
    DATABASE_URL: postgresql://postgres:password@postgres:5432/durable_streams
  depends_on: [postgres]

Sync service

The reference sync service (e2e/sync/sync.mjs) subscribes to the items table via Electric and forwards changes to the pg-items DS stream.

Variable        Default                                                        Description
ELECTRIC_URL    http://electric:3000                                           Electric Shape API
DS_SERVER_URL   http://server:4437                                             DS server
POSTGRES_URL    postgresql://postgres:password@postgres:5432/durable_streams   Postgres

Running

docker-compose --profile sync up -d --build
docker-compose logs -f sync-service
make integration-test-sessions

Durable sessions

The durable sessions pattern combines DS streams for real-time delivery with a database for durable querying. It is the primary production use case: collaborative applications like AI chat where messages arrive instantly and are queryable forever.

The pattern

The two requirements

A durable session needs two things that no single component satisfies alone:

  1. Real-time delivery. Users in a session need messages instantly. Polling a database is too slow. SSE is the transport.
  2. Durable querying. Session history must survive server restarts and be queryable for analytics, moderation, and search. A database is the storage.

Client  ← SSE ───  DS Server  ← sync ──  Database
Client  ─ POST ──► DS Server  ─ sync ──► Database

A sync service bridges both directions. Without Stream-to-DB, events are lost on restart. Without DB-to-Stream, structured data changes never reach clients in real time.

Session events

The DS server is content-agnostic. All session structure lives in the JSON payload. A typical event:

{
  "key": "user:alice",
  "type": "presence",
  "operation": "set",
  "value": { "status": "online" }
}

Common event types for collaborative chat:

type       operation      description
presence   set / remove   User join/leave
chunk      append         Chat message fragment (streaming AI response)
message    set            Complete chat message
reaction   set / remove   Emoji reaction
typing     set / remove   Typing indicator

The key identifies the entity and operation describes the mutation. This follows the STATE-PROTOCOL pattern: the stream is an ordered log of state mutations that can be replayed to reconstruct current state.
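
A minimal replay reducer for this pattern might look like the following sketch (illustrative; it assumes set overwrites a key, remove deletes it, and append concatenates string fragments such as streaming AI chunks):

```javascript
// Replay an ordered session-event log into current state (sketch of the
// STATE-PROTOCOL pattern: the stream is a log of state mutations).
function replay(events) {
  const state = new Map();
  for (const { key, operation, value } of events) {
    if (operation === "set") state.set(key, value);
    else if (operation === "remove") state.delete(key);
    else if (operation === "append") state.set(key, (state.get(key) ?? "") + value);
  }
  return state;
}
```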

Building a session

1. Create a JSON-mode stream for the session:

curl -X PUT -H "Content-Type: application/json" \
  http://localhost:4437/v1/stream/session-events

2. Append events with producer idempotency for exactly-once delivery:

curl -X POST -H "Content-Type: application/json" \
  -H "Producer-ID: user-alice-tab-1" \
  -H "Producer-Epoch: 0" \
  -H "Producer-Seq: 0" \
  -d '[{"key":"msg:1","type":"message","operation":"set","value":{"text":"Hello!"}}]' \
  http://localhost:4437/v1/stream/session-events

3. Subscribe for live updates via SSE:

curl -N http://localhost:4437/v1/stream/session-events?offset=-1\&live=sse

4. Resume after disconnect. Save the offset from the last SSE control event and reconnect:

curl -N http://localhost:4437/v1/stream/session-events?offset=0000000000000001_0000000000000042\&live=sse

Database schema design

The sync service needs a sink table for events. Design it to extract queryable fields from the JSON payload:

CREATE TABLE session_events (
  id SERIAL PRIMARY KEY,
  stream_name TEXT NOT NULL,
  event_key TEXT,          -- extracted from payload.key
  event_type TEXT,         -- extracted from payload.type
  operation TEXT,          -- extracted from payload.operation
  payload JSONB NOT NULL,  -- full event for querying
  ds_offset TEXT,          -- for deduplication
  received_at TIMESTAMPTZ DEFAULT NOW()
);
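
A sync service might map each incoming event onto this schema along these lines (illustrative sketch; toRow is a hypothetical helper, and the actual INSERT statement is omitted):

```javascript
// Extract the queryable columns of the session_events sink table from a
// session event payload; the full payload is kept for JSONB querying.
function toRow(streamName, dsOffset, payload) {
  return {
    stream_name: streamName,
    event_key: payload.key ?? null,
    event_type: payload.type ?? null,
    operation: payload.operation ?? null,
    payload, // stored whole as JSONB
    ds_offset: dsOffset, // for deduplication
  };
}
```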

Once events are in the database, query them with SQL:

-- All messages in a session
SELECT * FROM session_events WHERE event_type = 'message' ORDER BY received_at;

-- Active users
SELECT event_key, payload->'value'->>'status'
FROM session_events
WHERE event_type = 'presence' AND operation = 'set'
ORDER BY received_at DESC;

Multiple producers

Multiple producers can write to the same session stream. Each has independent epoch and sequence tracking:

  • User Alice writes from her browser tab (Producer-ID: alice-tab-1)
  • An AI assistant writes responses (Producer-ID: ai-assistant)
  • A bot writes notifications (Producer-ID: notification-bot)

Messages are ordered by arrival time, not by producer.

In this repository

The e2e harness implements the full durable sessions pattern with Postgres, Electric SQL, and a Node.js sync service.

Harness architecture

Test Runner (host)
  │
  ├── PG-to-Stream: INSERT into items → Electric → sync-service → pg-items DS stream
  │
  └── Stream-to-PG: POST to session-events DS stream → sync-service SSE → session_events PG table

Running

# Automated: build, start, test, tear down
make integration-test-sessions

# Manual
docker-compose --profile sync up -d --build
cd e2e && npm install
npx vitest run --reporter=verbose sessions.test.mjs
docker-compose --profile sync down

Test scenarios

The e2e/sessions.test.mjs suite validates 8 scenarios:

Session pattern (5 tests):

  1. Session lifecycle: create, write presence event, read back
  2. Multi-producer chat: user + AI messages, verify ordering
  3. SSE live subscription: subscribe, write, verify real-time delivery
  4. Session recovery: write, save offset, write more, resume
  5. Producer idempotency: retry same seq (204), next seq (200)

Database sync (3 tests):

  1. PG-to-Stream: INSERT into Postgres, poll DS stream until it appears
  2. Stream-to-PG: POST to DS stream, poll Postgres until row appears
  3. Round trip: INSERT into PG, verify in DS stream

Configuration

The DS server supports layered TOML configuration plus env-var overrides.

Load order (later wins):

  1. built-in defaults
  2. config/default.toml (if present)
  3. config/<profile>.toml (if present; selected with --profile)
  4. config/local.toml (if present; intended for local overrides)
  5. --config <path> override file
  6. environment variables

CLI options:

Flag               Description
--profile <name>   Loads config/<name>.toml
--config <path>    Loads an additional TOML file last

Sample config files in this repo:

  • config/default.toml (baseline defaults)
  • config/dev.toml (development overrides)
  • config/prod.toml (production example)

Example:

cargo run -- --profile dev
cargo run -- --profile prod --config /etc/durable-streams/override.toml

Environment variables use the DS_ prefix with double-underscore section separators (e.g., DS_SERVER__PORT).
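
The naming convention can be sketched as a small mapping function (illustrative; it assumes a section.key TOML path, and RUST_LOG is the one variable that does not follow the pattern):

```javascript
// Map a TOML config path like "server.port" to its environment-variable
// override name: DS_ prefix, sections joined with double underscores.
function envVarName(tomlPath) {
  return "DS_" + tomlPath.split(".").map((s) => s.toUpperCase()).join("__");
}

envVarName("server.port"); // "DS_SERVER__PORT"
```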

Server

Variable                Default   Description
DS_SERVER__PORT         4437      TCP port to listen on
DS_HTTP__CORS_ORIGINS   *         Allowed CORS origins. * allows all. Multiple origins can be comma-separated (e.g., https://app.example.com,https://admin.example.com).
RUST_LOG                info      Log level filter (tracing format: debug, info, warn, error, or per-module like durable_streams=debug). Takes precedence over DS_LOG__RUST_LOG.
DS_LOG__RUST_LOG        info      Default log level filter, applied through the TOML config layer. Overridden by RUST_LOG when both are set.

Transport (optional direct TLS)

Variable            Default   Description
DS_TLS__CERT_PATH   unset     Path to PEM certificate for direct TLS termination. Must be set together with DS_TLS__KEY_PATH.
DS_TLS__KEY_PATH    unset     Path to PEM/PKCS#8 private key for direct TLS termination. Must be set together with DS_TLS__CERT_PATH.

Protocol

Variable                                 Default   Description
DS_SERVER__LONG_POLL_TIMEOUT_SECS        30        How long to hold a long-poll request before returning 204 No Content. Set lower (e.g., 2) for fast-feedback testing.
DS_SERVER__SSE_RECONNECT_INTERVAL_SECS   60        SSE reconnect interval in seconds (matches Caddy's sse_reconnect_interval). Enables CDN request collapsing. Set to 0 to disable.

Memory limits

Variable                      Default              Description
DS_LIMITS__MAX_MEMORY_BYTES   104857600 (100 MB)   Maximum total memory across all streams. Appends exceeding this return 413 Payload Too Large.
DS_LIMITS__MAX_STREAM_BYTES   10485760 (10 MB)     Maximum bytes per individual stream.

Storage

Variable                       Default          Description
DS_STORAGE__MODE               memory           Storage backend mode: memory, file-fast, file-durable, acid (alias: redb).
DS_STORAGE__DATA_DIR           ./data/streams   Root directory for persistent backends (file-*, acid).
DS_STORAGE__ACID_SHARD_COUNT   16               Number of redb shards when DS_STORAGE__MODE=acid; must be power-of-2 in 1..=256 (invalid values return an error).

Sync service (e2e stack)

These variables configure the sync service container, not the DS server itself:

Variable        Default                                                        Description
ELECTRIC_URL    http://electric:3000                                           Electric SQL Shape API base URL
DS_SERVER_URL   http://server:4437                                             DS server URL (internal Docker network)
POSTGRES_URL    postgresql://postgres:password@postgres:5432/durable_streams   Postgres connection string

Example

# Development (fast timeouts for testing)
DS_SERVER__LONG_POLL_TIMEOUT_SECS=2 DS_SERVER__SSE_RECONNECT_INTERVAL_SECS=5 cargo run

# Development profile from TOML
cargo run -- --profile dev

# Production (restricted CORS, custom port)
DS_SERVER__PORT=8080 DS_HTTP__CORS_ORIGINS=https://app.example.com cargo run

# Optional direct TLS (proxy->server encryption or direct serving)
DS_TLS__CERT_PATH=/etc/ds/tls/server.crt DS_TLS__KEY_PATH=/etc/ds/tls/server.key cargo run

# Debug logging
RUST_LOG=debug cargo run

API reference

All endpoints are under the /v1/stream/ base path. The health check is at /healthz.

Health check

GET /healthz

Returns 200 OK with body ok. No authentication required.


Create stream

PUT /v1/stream/{name}

Header              Required   Description
Content-Type        no         Stream content type (immutable after creation; defaults to application/octet-stream)
Stream-TTL          no         Time-to-live in seconds
Stream-Expires-At   no         Absolute expiration (ISO 8601)
Stream-Closed       no         "true" to create closed

Body is optional. If provided, it is appended as initial stream data.

Status            Meaning
201 Created       Stream created
200 OK            Idempotent create (same config)
409 Conflict      Stream exists with different config
400 Bad Request   Invalid TTL or empty Content-Type header

Response headers: Location, Content-Type, Stream-Next-Offset


Append data

POST /v1/stream/{name}

Header           Required   Description
Content-Type     yes        Must match stream's content type
Stream-Closed    no         "true" to close (with or without data)
Producer-ID      no*        Producer identifier
Producer-Epoch   no*        Producer epoch
Producer-Seq     no*        Producer sequence number

*Producer headers: all three or none.

Status                  Meaning
200 OK                  New data accepted (producer mode)
204 No Content          Data appended (no producer) or duplicate (producer)
400 Bad Request         Empty body without Stream-Closed, invalid producer headers
403 Forbidden           Epoch fenced (zombie producer)
404 Not Found           Stream does not exist
409 Conflict            Content-Type mismatch, closed stream, or sequence gap
413 Payload Too Large   Memory limit exceeded

Response headers: Stream-Next-Offset, Stream-Closed (if closed), Producer-Epoch, Producer-Seq


Read data (catch-up)

GET /v1/stream/{name}?offset={offset}

Parameter   Default   Description
offset      -1        Start offset. Sentinels: -1 (start), now (tail)

Header          Required   Description
If-None-Match   no         ETag from previous read

Status             Meaning
200 OK             Data returned
304 Not Modified   ETag match (no new data)
400 Bad Request    Invalid offset format
404 Not Found      Stream does not exist or expired

Response headers: Content-Type, Stream-Next-Offset, Stream-Up-To-Date, ETag, Cache-Control, Stream-Closed (if closed and at tail)


Read data (long-poll)

GET /v1/stream/{name}?offset={offset}&live=long-poll[&cursor={cursor}]

Same as catch-up, plus:

Parameter   Description
live        Must be "long-poll"
cursor      Echo of previous Stream-Cursor (optional)

Additional response header: Stream-Cursor

Returns 204 No Content on timeout or closed stream at tail.


Read data (SSE)

GET /v1/stream/{name}?offset={offset}&live=sse

Returns Content-Type: text/event-stream. See Read modes for event format.

Additional response header: stream-sse-data-encoding: base64 (for binary content types)


Stream metadata

HEAD /v1/stream/{name}

Returns the same headers as GET with no body.

Status          Meaning
200 OK          Stream exists
404 Not Found   Stream does not exist or expired

Response headers: Content-Type, Stream-Next-Offset, Stream-Closed, Stream-TTL, Stream-Expires-At, Cache-Control


Delete stream

DELETE /v1/stream/{name}

Status           Meaning
204 No Content   Deleted
404 Not Found    Does not exist

Common response headers

These headers appear on all responses via middleware:

Header                         Value          Notes
Cache-Control                  no-store       no-cache for SSE
X-Content-Type-Options         nosniff
Cross-Origin-Resource-Policy   cross-origin

Port map

All ports used by the Docker Compose stack, organized by profile.

Default profile

Port   Service   Purpose
4437   server    DS server (direct, no auth)
8080   envoy     JWT auth proxy (public endpoint)
9901   envoy     Envoy admin dashboard

Sync profile

Includes all default ports, plus:

Port    Service    Purpose
54321   postgres   Postgres direct access (mapped from 5432)

Internal (Docker network only, no host port):

Service        Internal port   Purpose
electric       3000            Electric Shape API
sync-service   --              No listening port (initiates connections)

Dev profile

Includes all default and sync ports, plus:

| Port | Service | Purpose |
|---|---|---|
| 8081 | adminer | Database admin UI |

The heartbeat producer has no listening port.

Host services (not Docker)

| Port | Service | How to start |
|---|---|---|
| 3000 | test-ui | make dev-ui |

Quick reference

| Port | Service | Profile | Auth required |
|---|---|---|---|
| 3000 | test-ui | host | no |
| 4437 | DS server | default | no |
| 8080 | Envoy proxy | default | yes (JWT) |
| 8081 | Adminer | dev | no |
| 9901 | Envoy admin | default | no |
| 54321 | Postgres | sync | password |

Protocol governance

This implementation strictly adheres to the durable streams protocol specification. This page summarizes the governance policies defined in full in docs/protocol-governance.md.

Default stance

  • The spec and conformance tests are the source of truth.
  • When ambiguous, take the least-committal interpretation that preserves forward compatibility.
  • Never invent semantics because they "feel reasonable". If it is not in spec or tests, it is a gap.

Decision hierarchy

  1. Conformance tests -- behavioral truth. If a test asserts it, we implement it.
  2. Pinned PROTOCOL.md -- textual truth at a specific spec commit SHA.
  3. Maintainer clarification -- linked issue or PR in the spec repo.
  4. Conservative fallback -- recorded as a spec gap. No unrecorded guesses.

Spec pinning

The protocol spec is pinned by git commit SHA, not by branch. The conformance test suite is pinned by npm package version. See SPEC_VERSION.md for current pins:

  • Spec SHA: a347312a47ae510a4a2e3ee7a121d6c8d7d74e50
  • Conformance: @durable-streams/server-conformance-tests@0.2.2

Traceability

Every externally observable behavior (paths, headers, status codes, framing) must link to:

  • A spec section URL with anchor, or
  • A conformance test that asserts it, or
  • An explicit gap in docs/gaps.md

Conservative fallback rules

  • Paths: Follow conformance defaults exactly.
  • Headers: Emit only headers required by spec/tests.
  • Status codes: Use only codes defined by spec/tests.
  • Ordering: Preserve monotonicity, avoid replay/gap surprises.
  • Security: Auth stays out of the server. Auth behavior lives in proxy config.

Required artifacts

These files are maintained and checked in CI:

| File | Purpose |
|---|---|
| SPEC_VERSION.md | Pinned spec SHA and conformance version |
| docs/compatibility.md | Version compatibility matrix |
| docs/decisions.md | Protocol decision log |
| docs/gaps.md | Spec ambiguity log |
| docs/ecosystem-interop.md | Ecosystem integration observations |

Decision log

Implementation decisions are recorded in docs/decisions.md in three sections: Protocol, Architecture, and Operational.

Current protocol decisions

| Decision | Rationale |
|---|---|
| SSE event type data (not message) | Matches PROTOCOL.md exactly; message is the browser default, the protocol uses an explicit data type |
| SSE control event uses typed struct with camelCase | Prevents field name typos, ensures consistent serialization |
| Binary detection: everything not text/* or application/json | Matches PROTOCOL.md binary encoding rule |
| One event: data per stored message | Preserves message boundaries per conformance tests |
| SSE idle close default 60s, configurable | Spec says SHOULD ~60s; configurable allows tuning |
| Idempotent close returns 204 | Same status for initial and repeated close |
| Stream-Closed: true on all success responses for closed streams | Consistent header presence lets clients detect closure without HEAD |
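The binary-detection decision is simple enough to state as code. A sketch of the rule (illustrative, not the server's implementation):

```python
def is_binary(content_type: str) -> bool:
    """Anything not text/* or application/json is binary (base64 over SSE)."""
    mime = content_type.split(";")[0].strip().lower()
    return not (mime.startswith("text/") or mime == "application/json")
```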

Current architecture decisions

| Decision | Rationale |
|---|---|
| acid backend uses sharded redb with fixed hash routing | Preserves per-stream ordering while enabling concurrent writes across shards; manifest prevents silent routing drift |
| No in-place migration from file-* to acid (current limitation) | Keeps first ACID iteration safer; migration tooling can be added separately |
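Fixed hash routing means a stream name always maps to the same shard, so per-stream ordering is preserved while different streams can commit to different shards concurrently. A minimal sketch of the idea (the actual hash function and shard count in the implementation are not specified here; crc32 is a stand-in):

```python
import zlib

def shard_for(stream_name: str, num_shards: int) -> int:
    """Deterministically route a stream to one of num_shards redb shards."""
    return zlib.crc32(stream_name.encode()) % num_shards
```

Because the mapping is a pure function of the name, no routing table is needed at read time; the manifest mentioned above guards against the shard count or hash silently changing between runs.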

Current operational decisions

| Decision | Rationale |
|---|---|
| Health check at /healthz outside /v1/stream/ | Health checks are infrastructure, not protocol API |
| Sessions + bidirectional DB sync replaces Electric-only test | Sessions pattern is the primary production use case |
| CORS: allow all origins by default, configurable via DS_HTTP__CORS_ORIGINS | Not part of protocol semantics; production restricts via env var/proxy |

Decision format

Each decision in the full log includes:

  1. Decision: clear statement of what was decided
  2. Category: protocol, architecture, or operational
  3. Reference: spec/test (protocol) or subsystem/scope (architecture/operational)
  4. Rationale: why this choice was made
  5. Date: when the decision was made

When ambiguous on protocol behavior, decisions reference the corresponding gap in docs/gaps.md.

Benchmarks

This page summarizes the latest benchmark findings from a full 3x matrix run (7 variants x 3 runs = 21 total), with fresh Rust PGO profiles regenerated for this code state.

Core results (mean of run means)

| Variant | RTT latency (ms) | Small throughput (msg/s) | Large throughput (msg/s) |
|---|---|---|---|
| rust-memory | 0.407 | 377,143.9 | 1,837.2 |
| rust-file (file-durable) | 5.379 | 6,441.5 | 530.7 |
| rust-acid (acid) | 5.908 | 6,495.9 | 454.7 |
| node-memory | 0.573 | 334,943.9 | 1,653.9 |
| node-file | 11.377 | 3,240.0 | 360.1 |
| caddy-memory | 0.342 | 313,490.9 | 1,144.4 |
| caddy-acid | 16.067 | 2,305.0 | 271.6 |

Rust acid vs Caddy acid

| Metric | Rust acid | Caddy acid | Relative |
|---|---|---|---|
| RTT latency (ms, lower is better) | 5.908 | 16.067 | Rust acid 2.72x lower |
| Small throughput (msg/s, higher is better) | 6,495.9 | 2,305.0 | Rust acid 2.82x higher |
| Large throughput (msg/s, higher is better) | 454.7 | 271.6 | Rust acid 1.67x higher |

Durability note:

  • Caddy plugin file-backed mode (named caddy-acid in this page) documents a crash-atomicity limitation for append-data + producer-metadata updates.
  • Rust acid uses transactional commits for stream state and message data.
  • The Rust acid numbers above are therefore not achieved by relaxing durability semantics in the same way.

Rust storage modes

| Metric | memory | file-durable | acid |
|---|---|---|---|
| RTT latency (ms) | 0.407 | 5.379 | 5.908 |
| Small throughput (msg/s) | 377,143.9 | 6,441.5 | 6,495.9 |
| Large throughput (msg/s) | 1,837.2 | 530.7 | 454.7 |

Relative to Rust file-durable:

  • acid RTT latency: +9.8%
  • acid small throughput: +0.8%
  • acid large throughput: -14.3%
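These relative figures follow directly from the storage-mode table: each is (acid / file-durable - 1) expressed as a percentage.

```python
def rel_pct(acid: float, file_durable: float) -> float:
    """Relative difference of acid vs file-durable, as a rounded percentage."""
    return round((acid / file_durable - 1) * 100, 1)

# RTT latency:      rel_pct(5.908, 5.379)      -> +9.8
# Small throughput: rel_pct(6495.9, 6441.5)    -> +0.8
# Large throughput: rel_pct(454.7, 530.7)      -> -14.3
```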

Benchmark quality notes

  • These numbers are from local synthetic tests.
  • Load generator and server processes share one machine in this run.
  • For production-grade comparability, run clients and servers on separate hosts and keep sustained windows longer than quick local iterations.

For full methodology and caveats, see /docs/benchmark-report.md in the repo root.

Spec gaps

Ambiguities and edge cases not fully covered by the protocol spec or conformance tests are recorded in docs/gaps.md. This page summarizes the current gaps and known ecosystem interop observations.

Active gaps

| Ambiguity | Our interpretation |
|---|---|
| SSE idle close timing (~60s) | Default 60s, configurable via DS_SERVER__SSE_RECONNECT_INTERVAL_SECS (0 disables). Spec says SHOULD close roughly every ~60s. |

Gaps are not failures. They are explicit acknowledgments that the implementation operates beyond the spec's current coverage.

Non-protocol architecture and operational decisions are tracked in docs/decisions.md.

Ecosystem interop observations

These are recorded in docs/ecosystem-interop.md. They are not protocol gaps -- the protocol is fine. They are rough edges in ecosystem components that developers encounter during integration.

| ID | Summary | Component |
|---|---|---|
| CI-001 | SSE transport masks stream Content-Type; stream() needs json: true hint | @durable-streams/client |
| CI-002 | Session events are opaque JSON; consumers must parse semantics | DS server + sync service |
| CI-003 | SSE data events for JSON streams are array-wrapped: [{...}] not {...} | DS server SSE output |
| CI-004 | Electric requires wal_level=logical in Postgres | Electric SQL + Postgres |
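For CI-003, a consumer can normalize event payloads so downstream code always sees a flat list of messages. A sketch (illustrative; the helper name is hypothetical):

```python
import json

def unwrap_sse_json(event_data: str) -> list:
    """Normalize an SSE data event for a JSON stream into a list of messages.

    Per CI-003, payloads arrive array-wrapped ([{...}] rather than {...});
    tolerate both shapes so the workaround is harmless if the behavior changes.
    """
    parsed = json.loads(event_data)
    return parsed if isinstance(parsed, list) else [parsed]
```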

Proposing upstream clarifications

When a gap needs upstream resolution:

  1. Draft a proposed conformance test based on the "clarifying test" column in docs/gaps.md
  2. Open an issue in github.com/durable-streams/durable-streams
  3. Link the issue in docs/gaps.md
  4. If accepted upstream, update SPEC_VERSION.md and move to resolved gaps

Contributing

Development setup

git clone https://github.com/thesampaton/durable-streams-rust-server.git
cd durable-streams-rust-server
cargo build
cargo test

BDD workflow

This project follows an implicit BDD pattern. The conformance specifications are the primary design artifact. The implementation exists to pass them.

The loop

  1. Start from a failing conformance test that expresses a spec requirement.
  2. Write the minimum implementation to pass it.
  3. Run cargo clippy -- -D warnings -- fix all warnings before moving on.
  4. Run cargo test to verify both unit and conformance tests pass.
  5. Run cargo fmt before considering the task done.
  6. If a spec requirement is ambiguous, clarify the spec document first, then write the test.

Two test layers

Unit tests (#[cfg(test)] mod tests in each source file): test internal logic -- parsing, state machines, stream mechanics. Fast, no I/O.

Conformance tests (tests/ directory): boot the server on a random port, make HTTP requests, assert against the spec. These are acceptance tests. Each test maps to a specific spec requirement.

The conformance tests are deliberately black-box: they interact only through the public HTTP interface.

Code style

  • Clippy pedantic lints are enabled project-wide. Treat all warnings as errors.
  • Use thiserror for error types.
  • Prefer Result over panicking. Reserve unwrap()/expect() for provable invariants.
  • snake_case for functions/variables, CamelCase for types, SCREAMING_SNAKE_CASE for constants.
  • Keep functions under ~40 lines. Extract helpers when they grow longer.
  • Prefer strong typing over stringly-typed APIs. Newtypes and enums for invalid-state prevention.

Commands

cargo build                    # compile
cargo test                     # unit + integration tests
cargo test --lib               # unit tests only
cargo test --test '*'          # integration tests only
cargo test test_name           # single test
cargo clippy -- -D warnings    # lint
cargo fmt                      # format
cargo fmt -- --check           # format check
make conformance               # external conformance suite
make integration-test          # Docker stack e2e tests
make integration-test-sessions # sessions + DB sync tests
make pgo-train                 # generate + merge PGO profile data
make release-pgo               # guarded profile-use release build
make pgo-benchmark             # benchmark profile-use build
make docs                      # build mdbook documentation

For release/performance work, prefer make release-pgo after make pgo-train so artifacts are built from current benchmark-driven profiles.

Documentation

  • Protocol behavior is documented in specs/ as structured markdown.
  • Implementation decisions go in docs/decisions.md.
  • Spec ambiguities go in docs/gaps.md.
  • Ecosystem interop observations go in docs/ecosystem-interop.md.
  • The mdbook documentation site lives in docs/book/. Build with make docs.

Git conventions

  • Write concise commit messages explaining why, not what.
  • Separate logical changes into distinct commits.
  • Spec changes, test additions, and implementation should be distinct commits where practical.

Protocol governance

All protocol-level decisions must comply with the governance policies in docs/protocol-governance.md. Key rule: never invent semantics. If it is not in spec or tests, record it as a gap.