Introduction
This is the reference implementation of the durable streams protocol, built in Rust with Electric SQL for Postgres sync.
What are durable streams?
A durable stream is an append-only log exposed over HTTP. Clients create streams, append messages, and read them back using standard HTTP methods (PUT, POST, GET, DELETE). The protocol adds three capabilities on top of plain HTTP:
- Offset resumption. Every read returns an offset. Clients save it and resume later without replaying history. Close a tab, reopen it, pick up exactly where you left off.
- Live delivery. Clients can long-poll or open an SSE connection to receive new messages the instant they arrive.
- Producer idempotency. Producers tag each append with an epoch and sequence number. The server deduplicates retries automatically, giving exactly-once semantics over an at-least-once transport.
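The dedup rule above can be modeled as a per-producer high-water mark over (epoch, seq). The sketch below is illustrative only (class and method names are made up, not the server's Rust internals): an append is applied once, and any retry carrying a stale pair is dropped.

```javascript
// Hypothetical model of producer idempotency, not the server's actual code.
// Each producer ID tracks the highest (epoch, seq) it has acknowledged; a
// retried append with a stale pair is dropped instead of being re-applied.
class ProducerDedup {
  constructor() {
    this.state = new Map(); // producerId -> { epoch, seq }
  }
  // Returns true if the append should be applied, false if it is a duplicate.
  shouldApply(producerId, epoch, seq) {
    const last = this.state.get(producerId);
    if (last) {
      if (epoch < last.epoch) return false; // stale epoch (old producer session)
      if (epoch === last.epoch && seq <= last.seq) return false; // retried append
    }
    this.state.set(producerId, { epoch, seq });
    return true;
  }
}

const dedup = new ProducerDedup();
console.log(dedup.shouldApply("p1", 1, 1)); // first append: applied
console.log(dedup.shouldApply("p1", 1, 1)); // retry of the same append: dropped
console.log(dedup.shouldApply("p1", 1, 2)); // next sequence number: applied
```

This is how exactly-once semantics fall out of an at-least-once transport: the client may retry freely, and the server-side high-water mark makes every retry a no-op.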
What this project demonstrates
This repository contains two things:
- A protocol server written in Rust (axum + tokio). It passes the full conformance test suite and supports multiple storage backends (in-memory, file-based, and crash-resilient acid/redb). It has no authentication, no database dependency, and no opinions about deployment.
- A full deployment stack that shows how to compose the server with real infrastructure:
- Envoy as a JWT auth proxy in front of the server
- Postgres as durable storage
- Electric SQL for WAL-based change replication
- A sync service that bridges streams and Postgres bidirectionally
Together they implement the durable sessions pattern: real-time collaborative applications (like AI chat) where messages are delivered instantly via SSE and durably stored in Postgres for querying, analytics, and recovery.
Who should read this
- Protocol integrators who want to write a client against the durable streams API. Start with the Quickstart and the Protocol section.
- Operators who want to run the full stack with auth, Postgres, and sync. Start with the Architecture and Deployment sections.
- Contributors who want to extend the server or contribute to the protocol. Start with Contributing and the Reference section.
Conformance
This implementation targets full conformance with @durable-streams/server-conformance-tests@0.2.2 against spec commit a347312. All 239 conformance tests pass.
TL;DR
Just the server
cargo run
Server at http://localhost:4437. Streams at /v1/stream/.
# Create
curl -X PUT -H "Content-Type: text/plain" http://localhost:4437/v1/stream/demo
# Write
curl -X POST -H "Content-Type: text/plain" -d "hello" http://localhost:4437/v1/stream/demo
# Read
curl http://localhost:4437/v1/stream/demo?offset=-1
# SSE
curl -N http://localhost:4437/v1/stream/demo?offset=-1\&live=sse
# Delete
curl -X DELETE http://localhost:4437/v1/stream/demo
Full stack (auth + Postgres + sync)
docker-compose --profile sync up -d --build
| Port | What |
|---|---|
| 4437 | DS server (no auth) |
| 8080 | Envoy proxy (JWT auth) |
| 54321 | Postgres |
cd e2e && npm install
TOKEN=$(node generate-token.mjs)
curl -X PUT -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
http://localhost:8080/v1/stream/my-stream
curl -X POST -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
-d '[{"msg": "hello"}]' http://localhost:8080/v1/stream/my-stream
curl -H "Authorization: Bearer $TOKEN" http://localhost:8080/v1/stream/my-stream?offset=-1
Dev mode (everything + UI)
make dev # server, Envoy, Postgres, Electric, sync, Adminer, heartbeat producer
make dev-ui # stream browser on :3000 (separate terminal)
make dev-down # stop
Adminer at http://localhost:8081 (server: postgres, user: postgres, pw: password).
Key env vars
| Variable | Default | What it does |
|---|---|---|
| DS_SERVER__PORT | 4437 | Listen port |
| DS_SERVER__LONG_POLL_TIMEOUT_SECS | 30 | Long-poll wait |
| DS_SERVER__SSE_RECONNECT_INTERVAL_SECS | 60 | SSE reconnect interval (0 = off) |
| DS_HTTP__CORS_ORIGINS | * | Allowed origins |
| DS_LIMITS__MAX_MEMORY_BYTES | 104857600 | Total memory cap |
| DS_LIMITS__MAX_STREAM_BYTES | 10485760 | Per-stream cap |
Tests
cargo test # unit + integration
make conformance # protocol conformance suite
make integration-test # Docker e2e (auth)
make integration-test-sessions # Docker e2e (auth + Postgres sync)
Production perf build (PGO)
make pgo-train # generate + merge profile data from benchmark traffic
make release-pgo # guarded profile-use release build
make pgo-benchmark # optional compare run for profile-use build
Build the docs
make docs # build
make docs-serve # serve with live reload
Quickstart
This page gets you from zero to a working stream in under 5 minutes. You will start the server, create a stream, append data, read it back, and subscribe for live updates.
Prerequisites
- Rust toolchain (stable)
- curl
Start the server
cargo run
The server listens on http://localhost:4437 with streams at /v1/stream/.
Create a stream
curl -i -X PUT -H "Content-Type: text/plain" \
http://localhost:4437/v1/stream/my-stream
HTTP/1.1 201 Created
Location: http://localhost:4437/v1/stream/my-stream
Content-Type: text/plain
Stream-Next-Offset: 0000000000000000_0000000000000000
The stream is created with text/plain content type. The Stream-Next-Offset header shows the initial offset.
Append data
curl -i -X POST -H "Content-Type: text/plain" \
-d "hello world" \
http://localhost:4437/v1/stream/my-stream
HTTP/1.1 204 No Content
Stream-Next-Offset: 0000000000000001_000000000000000b
The offset advanced. Append a second message:
curl -i -X POST -H "Content-Type: text/plain" \
-d "second message" \
http://localhost:4437/v1/stream/my-stream
Read data
Read all messages from the beginning:
curl -i http://localhost:4437/v1/stream/my-stream?offset=-1
HTTP/1.1 200 OK
Content-Type: text/plain
Stream-Next-Offset: 0000000000000002_0000000000000019
Stream-Up-To-Date: true
ETag: "-1:0000000000000002_0000000000000019"
hello worldsecond message
The body contains the concatenated messages. Save Stream-Next-Offset to resume later without replaying:
curl -i http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_0000000000000019
This returns an empty body with Stream-Up-To-Date: true because there is no new data.
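Clients should treat offsets as opaque tokens and simply echo them back. That said, the transcripts above are internally consistent: the two hex fields grow with the message count and the cumulative byte position. The decoder below exists purely to illustrate that consistency in the examples on this page; it is an observation, not a documented format guarantee.

```javascript
// Illustrative only: decode the two 16-digit hex fields seen in the offsets
// above. Real clients must treat offsets as opaque resumption tokens.
function decodeOffset(offset) {
  const [count, bytes] = offset.split("_");
  return { messages: parseInt(count, 16), bytes: parseInt(bytes, 16) };
}

// After "hello world" (11 bytes): 1 message, byte position 0xb = 11.
console.log(decodeOffset("0000000000000001_000000000000000b"));
// After "second message" (14 more bytes): 2 messages, position 0x19 = 25.
console.log(decodeOffset("0000000000000002_0000000000000019"));
```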
Subscribe with SSE
Open a live SSE connection to receive new messages as they arrive:
curl -N http://localhost:4437/v1/stream/my-stream?offset=-1\&live=sse
You will see the existing messages followed by a control event:
event: data
data:hello world
event: data
data:second message
event: control
data:{"streamNextOffset":"0000000000000002_0000000000000019","streamCursor":"...","upToDate":true}
The connection stays open. In another terminal, append more data:
curl -X POST -H "Content-Type: text/plain" \
-d "live update" \
http://localhost:4437/v1/stream/my-stream
The SSE connection immediately delivers:
event: data
data:live update
event: control
data:{"streamNextOffset":"0000000000000003_0000000000000024","streamCursor":"...","upToDate":true}
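The frames above follow the standard text/event-stream layout: blank-line-separated blocks of `event:` and `data:` fields. A minimal parser for exactly these frames can be sketched as follows (a simplification, not a full SSE implementation — it ignores comments, `id:`, and `retry:` fields):

```javascript
// Minimal parser for the SSE frames shown above. Frames are separated by a
// blank line; each frame carries an event name and a data payload.
function parseSse(text) {
  return text.split("\n\n").filter(Boolean).map((frame) => {
    const out = { event: "message", data: "" };
    for (const line of frame.split("\n")) {
      if (line.startsWith("event:")) out.event = line.slice(6).trim();
      else if (line.startsWith("data:")) out.data += line.slice(5);
    }
    return out;
  });
}

const frames = parseSse(
  "event: data\ndata:live update\n\n" +
  'event: control\ndata:{"upToDate":true}\n\n'
);
console.log(frames[0]); // the data frame, with payload "live update"
console.log(JSON.parse(frames[1].data).upToDate); // control frame: true
```

A real client would dispatch on the event name: append `data` payloads to its local view, and save `streamNextOffset` from each `control` frame for resumption.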
Delete the stream
curl -i -X DELETE http://localhost:4437/v1/stream/my-stream
HTTP/1.1 204 No Content
Persistent storage
The quickstart uses in-memory storage (the default). For persistence, set DS_STORAGE__MODE:
# Crash-resilient storage using redb
DS_STORAGE__MODE=acid DS_STORAGE__DATA_DIR=./data cargo run
Available modes: memory, file-fast, file-durable, acid (alias: redb). See Configuration for details.
Next steps
- Architecture overview to understand the full stack
- Docker Compose to run with auth and Postgres
- Protocol reference for the complete API
- Configuration for all environment variables
Architecture overview
The full deployment has five components. Each solves one problem and composes cleanly with the others.
┌──────────────────────────────────────────────┐
│ Docker Compose Stack │
│ │
Client ──────────►│ Envoy (:8080) ──► DS Server (:4437) │
(browser/curl) │ JWT validation append-only log │
│ SSE delivery │
│ │ │
│ Sync Service │
│ ▲ │ │
│ │ ▼ │
│ Electric SQL Postgres │
│ (WAL reader) (durable store) │
└──────────────────────────────────────────────┘
Why each piece exists
| Component | Problem it solves |
|---|---|
| DS server | Real-time append-only log with SSE, offset resumption, and producer idempotency. Configurable storage (memory, file, acid/redb). |
| Envoy proxy | JWT authentication. The DS server has no auth, so Envoy validates tokens and forwards the sub claim as X-JWT-Sub. |
| Postgres | Durable storage and SQL querying. Even with persistent storage modes, Postgres provides structured access, analytics, and cross-service visibility. |
| Electric SQL | Change data capture. Reads the Postgres WAL and exposes a Shape API that delivers table changes as a stream. |
| Sync service | Bidirectional bridge. Forwards Postgres changes into DS streams (PG-to-Stream) and DS stream events into Postgres (Stream-to-PG). |
Data flows
PG-to-Stream (structured data to real-time)
INSERT INTO items → Postgres WAL → Electric Shape API → Sync Service → DS Server → SSE to clients
An application writes structured data to Postgres. Electric picks up the WAL change, the sync service receives it via the Shape API, and POSTs it as JSON to a DS stream. Connected clients receive it instantly via SSE.
Stream-to-PG (real-time events to durable storage)
Client POST → DS Server → SSE → Sync Service → INSERT INTO session_events
A client appends a session event (chat message, presence update) to a DS stream. The sync service consumes the stream via SSE and inserts each event into Postgres.
What you can swap
The architecture is composable. Each piece can be replaced independently:
- Auth proxy: Envoy is one option. Any reverse proxy that validates JWTs and forwards claims works (nginx, Caddy, cloud load balancers).
- Storage: The DS server supports in-memory, file-based, and acid (redb) storage backends, configurable via DS_STORAGE__MODE.
- Sync layer: Electric SQL is the reference CDC tool. Any WAL reader (Debezium, custom logical replication) could feed the sync service.
- Database: Postgres is used here because Electric requires it. The sync service pattern works with any database that supports change notifications.
Minimal vs. full stack
You do not need the full stack to use the DS server. The server runs standalone:
cargo run
This gives you a working durable streams server with no dependencies. The full stack adds auth, persistence, and sync for production-like deployments.
See Docker Compose for running the full stack and Quickstart for the minimal server.
DS server
The durable streams server is the core component. It implements the durable streams protocol as an HTTP service: create streams, append messages, read them back with offset resumption, and subscribe for live delivery via long-poll or SSE.
Design principles
- Protocol-only. The server implements the protocol and nothing else. No authentication, no database, no application logic. This makes it composable with external infrastructure.
- Thin handlers. HTTP handlers parse requests, validate inputs, call the storage layer, and format responses. No business logic in handlers.
- Pluggable storage. Streams can run in memory, file-fast/file-durable, or acid mode. The acid mode uses sharded redb files for crash-safe ACID commits.
- Content-agnostic. The server treats all payloads as opaque bytes (or opaque JSON objects for JSON-mode streams). It does not parse or interpret message content.
What it does
| Capability | How |
|---|---|
| Create / delete streams | PUT / DELETE on /v1/stream/{name} |
| Append messages | POST with matching Content-Type |
| Read (catch-up) | GET with offset, returns concatenated data |
| Read (long-poll) | GET with live=long-poll, waits for new data |
| Read (SSE) | GET with live=sse, streaming delivery |
| Producer idempotency | Producer-ID / Epoch / Seq headers for deduplication |
| JSON mode | Array flattening on append, array wrapping on read |
| TTL / expiry | Per-stream TTL with lazy expiration |
| Stream closure | Immutable close with conflict detection |
| ETag caching | Range-based ETags, 304 Not Modified |
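The JSON-mode row above (array flattening on append, array wrapping on read) can be sketched as below. Function names and the storage shape are hypothetical; only the flatten/wrap behavior is taken from the table:

```javascript
// Sketch of JSON mode: an appended array is flattened into one message per
// element, and a read wraps the stored messages back into a single array.
function appendJson(stored, body) {
  const parsed = JSON.parse(body);
  const items = Array.isArray(parsed) ? parsed : [parsed];
  for (const item of items) stored.push(JSON.stringify(item));
  return stored;
}

function readJson(stored) {
  return `[${stored.join(",")}]`; // wrap all messages into one JSON array
}

const stored = [];
appendJson(stored, '[{"a":1},{"a":2}]'); // array: flattened into two messages
appendJson(stored, '{"a":3}');           // single object: one message
console.log(readJson(stored)); // [{"a":1},{"a":2},{"a":3}]
```

This is why a reader always gets valid JSON back regardless of how many appends produced the data: each message is one JSON value, and the read concatenates them inside one array.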
Configuration
The server is configured through layered TOML files and environment variables. See Configuration for the full list.
Key defaults:
| Variable | Default | Description |
|---|---|---|
| DS_SERVER__PORT | 4437 | Listen port |
| DS_SERVER__LONG_POLL_TIMEOUT_SECS | 30 | Long-poll timeout |
| DS_SERVER__SSE_RECONNECT_INTERVAL_SECS | 60 | SSE reconnect interval (0 to disable) |
| DS_HTTP__CORS_ORIGINS | * | Allowed CORS origins |
| DS_STORAGE__MODE | memory | Storage backend selection |
| DS_STORAGE__DATA_DIR | ./data/streams | Persistent backend root directory |
Storage backends
- memory: fastest and simplest, no restart durability.
- file-fast / file-durable: one file per stream append log; durable mode fsyncs each append.
- acid: sharded redb backend in ${DATA_DIR}/acid.
  - Routing is deterministic: shard = seahash(stream_name) & (shard_count - 1).
  - Each shard has an independent redb write lock, so different streams can write concurrently on different shards.
  - Every mutating operation is one redb transaction with immediate durability, providing crash-safe all-or-nothing commits.
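The shard-routing rule (hash the stream name, then mask with shard_count - 1) can be sketched as below. The server uses seahash; since that is not available in Node's standard library, FNV-1a stands in here purely to illustrate the masking logic:

```javascript
// FNV-1a as a stand-in hash (the server uses seahash; only the routing
// pattern — hash then power-of-two mask — is illustrated here).
function fnv1a(s) {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

function shardFor(streamName, shardCount) {
  // The mask trick only selects a valid shard when shardCount is a power of 2.
  if ((shardCount & (shardCount - 1)) !== 0)
    throw new Error("shard count must be a power of 2");
  return fnv1a(streamName) & (shardCount - 1);
}

// The same name always routes to the same shard, so each stream's writes
// stay serialized while different streams can land on different shards.
console.log(shardFor("my-stream", 16) === shardFor("my-stream", 16)); // true
```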
Security headers
The server applies security headers via middleware on all responses:
- Cache-Control: no-store (prevents intermediate caches from serving stale data)
- X-Content-Type-Options: nosniff
- Cross-Origin-Resource-Policy: cross-origin
SSE responses use Cache-Control: no-cache instead of no-store.
Memory limits
Two configurable limits prevent unbounded memory growth:
- DS_LIMITS__MAX_MEMORY_BYTES (default 100 MB): total memory across all streams
- DS_LIMITS__MAX_STREAM_BYTES (default 10 MB): maximum size per stream
When limits are exceeded, appends return 413 Payload Too Large.
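The two-limit check can be sketched as follows. Names and the bookkeeping shape are hypothetical (the server tracks this internally in Rust); the point is that an append is rejected if it would push either counter past its cap:

```javascript
// Sketch of the limit checks: reject an append with 413 if it would exceed
// the per-stream cap or the total memory cap. Illustrative names only.
function checkAppend(totals, stream, appendBytes, limits) {
  if ((totals.streamBytes[stream] ?? 0) + appendBytes > limits.maxStreamBytes)
    return 413; // would exceed the per-stream cap
  if (totals.memoryBytes + appendBytes > limits.maxMemoryBytes)
    return 413; // would exceed the total cap
  return 204; // accepted
}

const limits = { maxMemoryBytes: 104857600, maxStreamBytes: 10485760 };
const totals = { memoryBytes: 0, streamBytes: { demo: 10485000 } };
console.log(checkAppend(totals, "demo", 500, limits));  // 204: under both caps
console.log(checkAppend(totals, "demo", 2000, limits)); // 413: per-stream cap hit
```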
Health check
GET /healthz returns 200 with "ok". This endpoint lives outside the /v1/stream/ namespace and requires no authentication when behind a proxy.
Auth proxy
The DS server has no authentication. This is intentional: it keeps the protocol implementation simple and lets adopters use whatever auth system they already have.
The pattern is to put a reverse proxy in front of the server that validates credentials and forwards identity information.
The pattern
Any reverse proxy that validates JWTs (or other credentials) and forwards the result works. The proxy needs to do three things:
- Validate tokens. Check the JWT signature against a JWKS; verify the iss, aud, and exp claims.
- Forward identity. Extract a claim (typically sub) and pass it to the DS server as a header (e.g., X-JWT-Sub).
- Bypass health checks. Let /healthz through without auth so load balancers and orchestrators can probe the server.
Client Auth Proxy DS Server
│ │ │
│── GET /healthz ─────────►│──── pass through ───────►│
│ │ │
│── PUT /v1/stream/foo ───►│ │
│ Authorization: Bearer │── validate JWT ──┐ │
│ eyJ... │ │ │
│ │◄─ valid ─────────┘ │
│ │── proxy + identity ─────►│
│ │ │
│── PUT /v1/stream/bar ───►│ │
│ (no token) │── 401 Unauthorized │
│ │ (proxy rejects) │
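The claim checks from step 1 and the identity forwarding from step 2 can be sketched on a decoded JWT payload. Signature verification against the JWKS is deliberately omitted here; a real proxy (Envoy's jwt_authn, nginx's JWT module, etc.) must verify the signature before any of these checks matter:

```javascript
// Sketch of the proxy's claim checks on an already-decoded JWT payload.
// A real proxy verifies the signature against the JWKS first.
function checkClaims(payload, { issuer, audience, nowSecs }) {
  if (payload.iss !== issuer) return { ok: false, reason: "wrong issuer" };
  if (payload.aud !== audience) return { ok: false, reason: "wrong audience" };
  if (typeof payload.exp !== "number" || payload.exp <= nowSecs)
    return { ok: false, reason: "expired" };
  // Step 2: forward the identity claim to the DS server as a header.
  return { ok: true, headers: { "X-JWT-Sub": payload.sub } };
}

const result = checkClaims(
  { iss: "durable-streams-test", aud: "durable-streams", exp: 2000000000, sub: "alice" },
  { issuer: "durable-streams-test", audience: "durable-streams", nowSecs: 1700000000 }
);
console.log(result); // ok, with X-JWT-Sub: alice forwarded downstream
```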
Streaming considerations
If the proxy sits in front of SSE or long-poll connections, it needs to support long-lived requests:
| Setting | Recommendation | Why |
|---|---|---|
| Route timeout | disabled or very large | SSE connections run indefinitely |
| Stream idle timeout | longer than server's SSE idle close | Let the server control connection lifecycle |
The DS server closes idle SSE connections after ~60 seconds by default. The proxy's idle timeout should exceed this so the server, not the proxy, decides when to close.
Proxy options
- Envoy with the jwt_authn HTTP filter
- nginx with ngx_http_auth_jwt_module
- Caddy with jwt middleware
- AWS ALB with OIDC integration
- Traefik with ForwardAuth middleware
- Custom middleware in your application server
The DS server only cares that requests arrive on its port. It does not inspect auth headers.
In this repository
The e2e harness uses Envoy as the auth proxy, configured in e2e/envoy.yaml.
Envoy configuration
| Setting | Value | Notes |
|---|---|---|
| JWT validation | local_jwks from e2e/fixtures/jwks.json | Offline validation, no external JWKS endpoint |
| Issuer | durable-streams-test | Test-only issuer |
| Audience | durable-streams | Test-only audience |
| Route timeout | 0s (disabled) | Supports SSE |
| Stream idle timeout | 120s | Server controls lifecycle at 60s |
| claim_to_headers | sub as X-JWT-Sub | Forwards identity |
| Admin port | 9901 | Debugging dashboard |
Test token generation
The e2e/ directory includes an RSA keypair and scripts for minting test JWTs:
cd e2e && npm install
TOKEN=$(node generate-token.mjs) # valid, 1h expiry
node generate-token.mjs --expired # expired token
node generate-token.mjs --sub alice # custom subject
node generate-token.mjs --wrong-issuer # wrong iss (rejected)
The RSA keypair in e2e/fixtures/ is committed intentionally. These are test-only keys with zero security value. For production, point your proxy at a real identity provider's JWKS endpoint.
Sync layer
The DS server supports multiple storage backends (memory, file, acid/redb). For SQL querying and cross-service visibility, you need a sync layer that bridges DS streams and a database bidirectionally.
The pattern
A sync service sits between the DS server and a database, copying data in both directions:
Database-to-Stream: A change data capture (CDC) tool watches the database for new rows. The sync service receives change events and POSTs them to a DS stream. Connected clients get the changes via SSE.
App writes to DB → CDC tool detects change → Sync service → POST to DS stream → SSE to clients
Stream-to-Database: The sync service subscribes to a DS stream via SSE. Each event is parsed and inserted into the database.
Client POSTs to DS stream → SSE → Sync service → INSERT into database
Why both directions
Neither direction is optional for a production session system:
- Without Stream-to-DB: Session events live only in DS memory. Server restart loses everything. No SQL queries, no analytics.
- Without DB-to-Stream: Structured data changes (from CRUD APIs, admin tools, batch jobs) never reach connected clients in real time.
Technology choices
The sync layer has three slots to fill:
| Slot | Role | Options |
|---|---|---|
| Database | Durable storage with change notifications | Postgres, MySQL, MongoDB, any DB with CDC support |
| CDC tool | Watches DB changes, exposes them as a stream | Electric SQL, Debezium, custom logical replication |
| Sync service | Bridges CDC events and DS streams | Custom process (any language) |
The DS server is agnostic to all three. It only sees HTTP requests.
Database requirements
For the CDC direction (DB-to-Stream), the database needs to support change notifications. For Postgres, this means:
- wal_level=logical for logical replication
- Sufficient max_wal_senders and max_replication_slots
Sync service requirements
A sync service needs to:
- Subscribe to the CDC tool's change feed
- POST changes as JSON arrays to a DS stream (using JSON mode)
- Subscribe to a DS stream via SSE for the reverse direction
- Parse SSE data events and insert into the database
- (Production) persist its SSE offset so it can resume after restart
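Requirement 5 (offset persistence) is the piece most often skipped in prototypes. A sketch, with an in-memory map standing in for the Postgres table a production sync service would use:

```javascript
// Sketch of sync-service offset persistence. The Map stands in for a
// Postgres table keyed by stream name; names here are illustrative.
class OffsetStore {
  constructor() { this.offsets = new Map(); }
  save(stream, offset) { this.offsets.set(stream, offset); }
  load(stream) { return this.offsets.get(stream) ?? "-1"; } // -1 = from start
}

// Build the SSE subscription URL from the last persisted offset, so a
// restarted sync service resumes instead of replaying the whole stream.
function resumeUrl(base, stream, store) {
  return `${base}/v1/stream/${stream}?offset=${store.load(stream)}&live=sse`;
}

const store = new OffsetStore();
console.log(resumeUrl("http://server:4437", "session-events", store)); // offset=-1
// After processing a batch, persist the offset the server handed back:
store.save("session-events", "0000000000000005_00000000000001a0");
console.log(resumeUrl("http://server:4437", "session-events", store));
```

The save should happen in the same database transaction as the inserted events, so a crash between insert and save cannot cause silent gaps (at worst, a replayed batch).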
Latency
Typical latencies in a local stack:
- DB-to-Stream: under 1 second. CDC replication latency is the bottleneck.
- Stream-to-DB: under 100ms. DS broadcast + SSE delivery + DB insert.
In this repository
The e2e harness uses Postgres + Electric SQL + a Node.js sync service.
Components
| Component | Image / location | Role |
|---|---|---|
| Postgres | postgres:17-alpine (port 54321) | Source and sink tables |
| Electric SQL | electricsql/electric:1.4.2 | Reads Postgres WAL, exposes Shape API |
| Sync service | e2e/sync/sync.mjs | Bridges Electric Shape API and DS streams |
Postgres schema
-- Source table for DB-to-Stream (application writes here)
CREATE TABLE items (
id SERIAL PRIMARY KEY,
title TEXT NOT NULL,
body TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Sink table for Stream-to-DB (sync service writes here)
CREATE TABLE session_events (
id SERIAL PRIMARY KEY,
stream_name TEXT NOT NULL,
event_key TEXT,
event_type TEXT,
operation TEXT,
payload JSONB NOT NULL,
ds_offset TEXT,
received_at TIMESTAMPTZ DEFAULT NOW()
);
Sync service configuration
| Variable | Default | Description |
|---|---|---|
| ELECTRIC_URL | http://electric:3000 | Electric Shape API |
| DS_SERVER_URL | http://server:4437 | DS server URL (internal, no auth). Use https://... when direct TLS is enabled on the DS server. |
| POSTGRES_URL | postgresql://postgres:password@postgres:5432/durable_streams | Postgres connection |
Running
docker-compose --profile sync up -d --build
docker-compose logs -f sync-service
make integration-test-sessions
Without the sync layer
The DS server works standalone. If you only need real-time delivery without durable storage:
cargo run
Add the sync layer when you need data surviving restarts, SQL queries, or structured data reaching clients in real time.
Docker Compose
The docker-compose.yml at the repository root defines the full stack. Services are organized into profiles so you can run as much or as little as you need.
Profiles
| Profile | Services | Use case |
|---|---|---|
| (default) | DS server, Envoy proxy | Authenticated protocol server |
| sync | + Postgres, Electric SQL, sync service | Bidirectional PG sync |
| dev | + Adminer, heartbeat producer | Developer observability |
Starting the stack
Default (server + auth proxy)
docker-compose up -d
This starts the DS server on port 4437 (internal) and Envoy on port 8080 (public). The health check is at http://localhost:8080/healthz.
With sync
docker-compose --profile sync up -d --build
Adds Postgres, Electric SQL, and the sync service. Data flows bidirectionally between DS streams and Postgres.
Full dev stack
make dev
Starts everything: server, Envoy, Postgres, Electric, sync service, Adminer (DB admin UI), and a heartbeat producer. See Dev mode for details.
Building the Docker image
docker-compose build
# or
make docker
The server Dockerfile uses a multi-stage build: compile with rust:latest, run on debian:bookworm-slim.
Stopping
docker-compose down # default profile
docker-compose --profile sync down # sync profile
docker-compose --profile sync --profile dev down # everything
make dev-down # same as above
Port map
See Port map for all ports and their purposes.
Quick test
# Start the stack
docker-compose up -d
# Health check (no auth)
curl http://localhost:8080/healthz
# Generate a test JWT
cd e2e && npm install
TOKEN=$(node generate-token.mjs)
# Create a stream (authenticated)
curl -X PUT -H "Authorization: Bearer $TOKEN" \
-H "Content-Type: text/plain" \
http://localhost:8080/v1/stream/test-1
# Append data
curl -X POST -H "Authorization: Bearer $TOKEN" \
-H "Content-Type: text/plain" \
-d "hello" http://localhost:8080/v1/stream/test-1
# Read back
curl -H "Authorization: Bearer $TOKEN" \
http://localhost:8080/v1/stream/test-1
# Unauthenticated requests are rejected
curl -X PUT http://localhost:8080/v1/stream/test-2 # 401
Integration tests
# Full automated cycle: build, start, test, tear down
make integration-test
# With sync and sessions tests
make integration-test-sessions
Dev mode
Dev mode starts the full stack plus observability tools for watching data flow through the system in real time.
Starting
# Terminal 1: Start the full stack
make dev
# Terminal 2: Start the visual stream browser
make dev-ui
What you get
Heartbeat producer
A container that exercises both sync directions every 5 seconds:
- Odd ticks (PG-to-DS): INSERTs into the items Postgres table. Electric picks up the WAL change, and the sync service forwards it to the pg-items DS stream.
- Even ticks (DS-to-PG): POSTs a session event to the session-events DS stream. The sync service consumes it via SSE and INSERTs into the session_events Postgres table.
Watch the logs:
docker-compose logs -f producer
After ~30 seconds, both items and session_events tables will have heartbeat rows.
Adminer (database UI)
Open http://localhost:8081 to browse Postgres tables.
| Field | Value |
|---|---|
| System | PostgreSQL |
| Server | postgres |
| Username | postgres |
| Password | password |
| Database | durable_streams |
Test UI (stream browser)
The test-ui is the official durable-streams stream browser. It is cloned into .dev/ (gitignored) on first run via make dev-ui.
Connect it to the DS server at http://localhost:4437. You can navigate to the producer's streams (pg-items, session-events) manually.
Requirements: pnpm, Node 22+
Known limitation: The server does not implement the __registry__ stream used for auto-discovery, so streams will not auto-populate in the sidebar.
Port map
| Port | Service | Purpose |
|---|---|---|
| 4437 | server | DS server (direct, no auth) |
| 8080 | envoy | JWT auth proxy |
| 9901 | envoy | Envoy admin dashboard |
| 54321 | postgres | Postgres direct access |
| 8081 | adminer | Database admin UI |
| 3000 | test-ui | Visual stream browser (host) |
Stopping
make dev-down # stop all containers
rm -rf .dev/ # remove cloned test-ui monorepo (optional)
Production
The Docker stack in this repository is a local development and testing tool. This page covers what to change for production.
Authentication
Replace the test JWKS with a real identity provider:
- Test setup: Envoy validates JWTs against a local file (e2e/fixtures/jwks.json) with a committed RSA keypair.
- Production: Point Envoy's remote_jwks at your identity provider's JWKS endpoint (Auth0, Cognito, Keycloak, etc.). Update the issuer and audiences to match.
The DS server itself needs no changes. It is auth-agnostic by design.
Persistent storage
The DS server supports multiple storage modes:
- memory: in-RAM only; no restart durability.
- file-fast / file-durable: file-backed per-stream logs.
- acid: sharded redb backend with ACID commits and immediate durability.
For production durability, use file-durable or acid, or run the sync layer to mirror data into Postgres.
Acid mode tuning
- DS_STORAGE__MODE=acid and DS_STORAGE__DATA_DIR=/path/to/store persist data under ${DATA_DIR}/acid/.
- DS_STORAGE__ACID_SHARD_COUNT controls write concurrency. Keep it a power of 2 (1..=256); the default is 16.
- Writes are serialized per shard (single writer per shard). Increase the shard count for highly concurrent write workloads.
- Acid mode commits with immediate durability (fsync-class semantics on each commit), prioritizing crash safety over raw append latency.
Electric SQL configuration
- Pin the Electric SQL version in your deployment (the stack uses electricsql/electric:1.4.2).
- Configure Postgres replication slots carefully. The defaults (max_wal_senders=10, max_replication_slots=10) work for development but may need tuning for production workloads.
- Ensure wal_level=logical is set in your Postgres configuration. This is required for Electric's logical replication.
Sync service resilience
The reference sync service (e2e/sync/sync.mjs) is a starting point, not production-ready:
- Offset persistence: The sync service should save its SSE offset (from the id: field) to a Postgres table so it can resume after restart without replaying the full stream.
- Error handling: Add retries with backoff for Postgres insert failures and SSE reconnection.
- Scaling: The sync service is a single process. For high-throughput streams, partition by stream name or run multiple instances with offset coordination.
Memory limits
Tune the server's memory limits for your workload:
| Variable | Default | Notes |
|---|---|---|
| DS_LIMITS__MAX_MEMORY_BYTES | 100 MB | Total across all streams |
| DS_LIMITS__MAX_STREAM_BYTES | 10 MB | Per stream |
In production with the sync layer, streams are consumed and can be deleted after sync. When using the default in-memory mode, the store acts as a buffer, not long-term storage. For persistence without the sync layer, use file-durable or acid mode.
Monitoring
- Log and alert on sync service errors, SSE reconnections, and PG insert failures.
- The sync service logs events to stdout; aggregate with your preferred log pipeline.
- Use the Envoy admin dashboard (port 9901 in dev) for proxy metrics and connection debugging.
- The DS server logs via tracing, with levels configurable via RUST_LOG.
CORS
The server defaults to DS_HTTP__CORS_ORIGINS=* (allow all). For production, restrict to your application's domain:
DS_HTTP__CORS_ORIGINS=https://app.example.com cargo run
Multiple origins can be comma-separated.
TLS
Default model: terminate TLS at the proxy layer (Envoy, nginx, cloud load balancer) or at a CDN edge.
Optional model: the DS server can terminate TLS directly when both DS_TLS__CERT_PATH and DS_TLS__KEY_PATH are set.
Recommended topology matrix:
- Internet-facing traffic: terminate TLS at proxy/edge (this is still the default).
- Proxy -> DS server hop: use direct TLS on the DS server when you need encrypted in-cluster/intra-box traffic.
- mTLS: terminate and enforce mTLS at the proxy when possible; DS direct TLS currently covers server-side TLS termination.
HTTP/2 and HTTP/3 are typically negotiated at the proxy/edge. Enabling direct TLS on the DS server secures that hop, while proxy capabilities still govern external ALPN behavior.
Streams
A stream is an append-only log identified by name. Clients create, append to, read from, and delete streams using standard HTTP methods.
Create a stream (PUT)
curl -i -X PUT -H "Content-Type: text/plain" \
http://localhost:4437/v1/stream/my-stream
Headers:
- Content-Type (optional): the stream's content type, fixed at creation. Defaults to application/octet-stream if omitted
- Stream-TTL (optional): time-to-live in seconds
- Stream-Expires-At (optional): absolute expiration (ISO 8601)
- Stream-Closed (optional): "true" to create in closed state
Responses:
- 201 Created with Location, Content-Type, Stream-Next-Offset
- 200 OK if the stream already exists with the same configuration (idempotent)
- 409 Conflict if the stream exists with different configuration
- 400 Bad Request for invalid TTL or empty Content-Type header
The Content-Type is immutable after creation. Comparison is case-insensitive and ignores charset parameters.
Append data (POST)
curl -i -X POST -H "Content-Type: text/plain" \
-d "hello world" \
http://localhost:4437/v1/stream/my-stream
Headers:
- Content-Type (required): must match the stream's content type
- Stream-Closed (optional): "true" to close the stream (with or without data)
Responses:
- 204 No Content with Stream-Next-Offset
- 404 Not Found if stream does not exist
- 409 Conflict for content-type mismatch or closed stream
- 400 Bad Request for empty body without Stream-Closed
Each append generates a unique, monotonically increasing offset. Concurrent appends to the same stream are serialized to maintain offset ordering.
Read data (GET)
curl -i http://localhost:4437/v1/stream/my-stream?offset=-1
Query parameters:
- offset: where to start reading
  - -1 (default): beginning of stream
  - now: current tail (returns empty body)
  - hex offset: resume from a specific position
Response headers:
- Stream-Next-Offset: save this for resumption
- Stream-Up-To-Date: "true" when at the tail
- Stream-Closed: "true" when closed and at tail
- ETag: for conditional requests with If-None-Match
The body contains all messages concatenated from the requested offset to the current tail.
Stream metadata (HEAD)
curl -I http://localhost:4437/v1/stream/my-stream
Returns the same headers as GET (Content-Type, Stream-Next-Offset, Stream-Closed, TTL info) with no body.
Delete a stream (DELETE)
curl -i -X DELETE http://localhost:4437/v1/stream/my-stream
- 204 No Content on success
- 404 Not Found if stream does not exist
Deleting removes all data and metadata. The stream name can be reused.
Close a stream
Closing a stream prevents further appends. Close by sending a POST with Stream-Closed: true:
# Close without data
curl -X POST -H "Content-Type: text/plain" \
-H "Stream-Closed: true" \
http://localhost:4437/v1/stream/my-stream
# Close with a final message
curl -X POST -H "Content-Type: text/plain" \
-H "Stream-Closed: true" \
-d "goodbye" http://localhost:4437/v1/stream/my-stream
After closure:
- Appends return 409 Conflict with Stream-Closed: true
- Reads still work; Stream-Closed: true appears when the reader is at the tail
- Closing is idempotent (re-closing returns 204)
TTL and expiry
Streams can have a time-to-live set at creation:
# Expire in 1 hour
curl -X PUT -H "Content-Type: text/plain" \
-H "Stream-TTL: 3600" \
http://localhost:4437/v1/stream/temp
# Expire at a specific time
curl -X PUT -H "Content-Type: text/plain" \
-H "Stream-Expires-At: 2026-12-31T23:59:59Z" \
http://localhost:4437/v1/stream/temp2
- Only one of `Stream-TTL` and `Stream-Expires-At` may be provided
- Expired streams return 404 on all operations
- Expiration is checked lazily on access
- HEAD returns remaining `Stream-TTL` and `Stream-Expires-At`
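A client that caches HEAD responses can turn the absolute `Stream-Expires-At` timestamp into a relative deadline before deciding whether to recreate a stream. A small sketch (the helper name is illustrative):

```javascript
// Sketch: milliseconds until a stream expires, from the Stream-Expires-At
// header value (ISO 8601). A negative result means the stream has already
// expired and the server will answer 404.
function msUntilExpiry(expiresAt, nowMs = Date.now()) {
  return Date.parse(expiresAt) - nowMs;
}

console.log(msUntilExpiry("2026-12-31T23:59:59Z", Date.parse("2026-12-31T23:59:58Z"))); // → 1000
```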
Read modes
The DS server supports three read modes: catch-up (default), long-poll, and SSE. All use GET requests with different query parameters.
Catch-up (default)
Returns all data from an offset to the current tail, then closes the connection.
# Read everything
curl http://localhost:4437/v1/stream/my-stream?offset=-1
# Resume from a saved offset
curl http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_000000000000000a
The response includes:
- Body with concatenated message data
- `Stream-Next-Offset` for resumption
- `Stream-Up-To-Date: true` when at the tail
- `ETag` for conditional requests
This is the simplest mode. Use it for one-shot reads or polling.
Long-poll
Waits for new data if the client is already at the tail, avoiding the need for rapid polling.
curl http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_000000000000000a\&live=long-poll
Behavior:
- If data exists at the offset, returns it immediately (same as catch-up)
- If at the tail and the stream is open, waits for new data
- If at the tail and the stream is closed, returns immediately with `Stream-Closed: true`
- Returns `204 No Content` when the timeout expires with no new data
The timeout defaults to 30 seconds (configurable via DS_SERVER__LONG_POLL_TIMEOUT_SECS).
The response includes a Stream-Cursor header. Echo it back in the cursor query parameter on subsequent requests to enable CDN request collapsing:
curl "http://localhost:4437/v1/stream/my-stream?offset=...&live=long-poll&cursor=..."
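A long-poll consumer is a loop: read, save `Stream-Next-Offset`, and poll again while echoing the cursor. A sketch using plain `fetch` (function names are illustrative; only the URL-building logic is exercised below):

```javascript
// Sketch: long-poll loop. buildPollUrl echoes the previous Stream-Cursor
// value back as the `cursor` query parameter for CDN request collapsing.
function buildPollUrl(baseUrl, name, offset, cursor) {
  const u = new URL(`${baseUrl}/v1/stream/${name}`);
  u.searchParams.set("offset", offset);
  u.searchParams.set("live", "long-poll");
  if (cursor) u.searchParams.set("cursor", cursor);
  return u.toString();
}

async function pollLoop(baseUrl, name, offset = "-1") {
  let cursor = null;
  for (;;) {
    const res = await fetch(buildPollUrl(baseUrl, name, offset, cursor));
    cursor = res.headers.get("Stream-Cursor") ?? cursor;
    if (res.status === 200) {
      console.log(await res.text()); // new data
      offset = res.headers.get("Stream-Next-Offset");
    }
    // 204 No Content: timeout with no new data — poll again at the same offset
  }
}

console.log(buildPollUrl("http://localhost:4437", "my-stream", "-1", null));
// → http://localhost:4437/v1/stream/my-stream?offset=-1&live=long-poll
```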
SSE (Server-Sent Events)
Opens a persistent connection that delivers messages as they arrive.
curl -N http://localhost:4437/v1/stream/my-stream?offset=-1\&live=sse
The server returns Content-Type: text/event-stream and streams events:
Event types
event: data -- one per stored message:
event: data
data:hello world
event: control -- metadata after each batch:
event: control
data:{"streamNextOffset":"...","streamCursor":"...","upToDate":true}
Control event fields
| Field | Type | When included |
|---|---|---|
| `streamNextOffset` | string | always |
| `streamCursor` | string | when stream is open |
| `upToDate` | boolean | when caught up |
| `streamClosed` | boolean | when closed and all data sent |
Connection lifecycle
- Historical data is sent as `event: data` events
- A `control` event with `upToDate: true` indicates catch-up is complete
- The connection stays open, waiting for new data
- New appends trigger additional `data` + `control` events
- If the stream is closed, the final `control` includes `streamClosed: true` and the connection closes
- Idle connections close after ~60 seconds (configurable via `DS_SERVER__SSE_RECONNECT_INTERVAL_SECS`)
Binary streams
For content types other than text/* and application/json, SSE data events carry base64-encoded payloads. The response includes stream-sse-data-encoding: base64.
Multi-line data
Multi-line messages are sent as multiple data: lines per SSE spec:
event: data
data:line one
data:line two
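A consumer handling multi-line messages must buffer `data:` lines until the blank line that terminates the event, then join them with newlines. A self-contained sketch of that reassembly (a simplified illustration, not the SDK parser):

```javascript
// Sketch: reassembling SSE events from raw lines. Consecutive data: lines
// belonging to one event are joined with "\n"; a blank line ends the event.
function parseSseEvents(lines) {
  const events = [];
  let type = "message"; // SSE default when no event: line is present
  let data = [];
  for (const line of lines) {
    if (line === "") {
      if (data.length > 0) events.push({ type, data: data.join("\n") });
      type = "message";
      data = [];
    } else if (line.startsWith("event:")) {
      type = line.slice(6).trim();
    } else if (line.startsWith("data:")) {
      data.push(line.slice(5));
    }
  }
  return events;
}

const events = parseSseEvents(["event: data", "data:line one", "data:line two", ""]);
console.log(JSON.stringify(events));
// → [{"type":"data","data":"line one\nline two"}]
```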
Resumption
Save the offset from control events. Reconnect with offset={saved} to resume:
curl -N http://localhost:4437/v1/stream/my-stream?offset=0000000000000002_000000000000000a\&live=sse
The offset=now sentinel skips historical data and receives only new messages.
Producers
Producer headers enable exactly-once append semantics over HTTP. A producer identifies itself with a stable ID, uses a monotonic epoch for session management, and a monotonic sequence number for per-request deduplication. The server validates state and deduplicates retries automatically.
Producer headers
All three must be provided together (or none):
| Header | Format | Description |
|---|---|---|
| `Producer-ID` | non-empty string | Stable identifier for the producer |
| `Producer-Epoch` | integer (0 to 2^53-1) | Incremented on producer restart |
| `Producer-Seq` | integer (0 to 2^53-1) | Incremented per request within an epoch |
Basic flow
# First message: epoch 0, seq 0
curl -X POST -H "Content-Type: text/plain" \
-H "Producer-ID: my-producer" \
-H "Producer-Epoch: 0" \
-H "Producer-Seq: 0" \
-d "message 1" \
http://localhost:4437/v1/stream/my-stream
# → 200 OK (new data accepted)
# Second message: epoch 0, seq 1
curl -X POST -H "Content-Type: text/plain" \
-H "Producer-ID: my-producer" \
-H "Producer-Epoch: 0" \
-H "Producer-Seq: 1" \
-d "message 2" \
http://localhost:4437/v1/stream/my-stream
# → 200 OK
Response codes
| Code | Meaning | When |
|---|---|---|
| `200 OK` | New data accepted | seq = lastSeq + 1 (or first append) |
| `204 No Content` | Duplicate (already persisted) | seq <= lastSeq |
| `400 Bad Request` | Invalid headers | Partial set, empty ID, non-integer, or epoch bump with seq != 0 |
| `403 Forbidden` | Epoch fenced (zombie producer) | epoch < server's current epoch |
| `409 Conflict` | Sequence gap | seq > lastSeq + 1 |
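A producer can drive its retry logic directly off these codes. A sketch of the decision table in code (the action names are illustrative):

```javascript
// Sketch: mapping append response codes to producer actions. 200 and 204
// both advance the local sequence counter; 409 means the producer should
// consult Producer-Expected-Seq and resynchronize; 403 means a newer epoch
// has taken over and this instance should stop writing.
function producerAction(status) {
  switch (status) {
    case 200: return "advance-seq"; // new data accepted
    case 204: return "advance-seq"; // duplicate; already persisted
    case 400: return "fix-headers"; // malformed or partial producer headers
    case 403: return "stop";        // fenced: zombie producer
    case 409: return "resync";      // sequence gap
    default:  return "retry";       // transient error; safe to retry same seq
  }
}

console.log(producerAction(204)); // → advance-seq
```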
Duplicate detection
Retrying the same (ID + epoch + seq) is safe and returns 204:
# Retry of seq 0 (already succeeded above)
curl -X POST -H "Content-Type: text/plain" \
-H "Producer-ID: my-producer" \
-H "Producer-Epoch: 0" \
-H "Producer-Seq: 0" \
-d "message 1" \
http://localhost:4437/v1/stream/my-stream
# → 204 No Content (duplicate, no data stored)
Epoch fencing (zombie detection)
When a producer restarts, it bumps its epoch and starts seq at 0:
# Producer restarts with epoch 1
curl -X POST -H "Content-Type: text/plain" \
-H "Producer-ID: my-producer" \
-H "Producer-Epoch: 1" \
-H "Producer-Seq: 0" \
-d "restarted" \
http://localhost:4437/v1/stream/my-stream
# → 200 OK
After this, the old epoch is fenced. A request from epoch 0 returns 403 Forbidden:
curl -X POST -H "Content-Type: text/plain" \
-H "Producer-ID: my-producer" \
-H "Producer-Epoch: 0" \
-H "Producer-Seq: 2" \
-d "zombie" \
http://localhost:4437/v1/stream/my-stream
# → 403 Forbidden (epoch fenced)
# Producer-Epoch: 1 (server's current epoch)
Sequence gaps
If a sequence number is skipped, the server returns 409 Conflict with diagnostic headers:
curl -X POST -H "Content-Type: text/plain" \
-H "Producer-ID: my-producer" \
-H "Producer-Epoch: 0" \
-H "Producer-Seq: 5" \
-d "skipped" \
http://localhost:4437/v1/stream/my-stream
# → 409 Conflict
# Producer-Expected-Seq: 1
# Producer-Received-Seq: 5
Multiple producers
Multiple producers can write to the same stream. Each has independent epoch and sequence tracking:
# Producer A writes
curl -X POST -H "Producer-ID: user-alice" \
-H "Producer-Epoch: 0" -H "Producer-Seq: 0" ...
# Producer B writes
curl -X POST -H "Producer-ID: user-bob" \
-H "Producer-Epoch: 0" -H "Producer-Seq: 0" ...
Messages are ordered by arrival time, not by producer.
Closing with producer headers
A producer can atomically append and close:
curl -X POST -H "Content-Type: text/plain" \
-H "Producer-ID: my-producer" \
-H "Producer-Epoch: 0" \
-H "Producer-Seq: 2" \
-H "Stream-Closed: true" \
-d "final message" \
http://localhost:4437/v1/stream/my-stream
# → 200 OK, Stream-Closed: true
Retrying the same closing request returns 204 No Content with Stream-Closed: true.
JSON mode
Streams created with Content-Type: application/json operate in JSON mode. JSON mode provides array flattening on append and array wrapping on read.
Creating a JSON stream
curl -X PUT -H "Content-Type: application/json" \
http://localhost:4437/v1/stream/events
Appending
Single value
A single JSON object is stored as one message:
curl -X POST -H "Content-Type: application/json" \
-d '{"event": "click", "x": 100}' \
http://localhost:4437/v1/stream/events
Array flattening
A JSON array is flattened: each element becomes a separate message:
curl -X POST -H "Content-Type: application/json" \
-d '[{"event": "click"}, {"event": "scroll"}]' \
http://localhost:4437/v1/stream/events
This stores two messages. Only the top-level array is flattened; nested arrays within objects are preserved.
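Flattening makes client-side batching cheap: collect events locally and POST them as a single array. A sketch (the helper name is illustrative), guarding against the empty-array case the server rejects:

```javascript
// Sketch: serializing a client-side batch for a single POST. The server
// flattens the top-level array into one stored message per element; an
// empty array would be rejected with 400 Bad Request, so guard against it.
function toBatchBody(events) {
  if (events.length === 0) {
    throw new Error("refusing to send an empty array");
  }
  return JSON.stringify(events);
}

console.log(toBatchBody([{ event: "click" }, { event: "scroll" }]));
// → [{"event":"click"},{"event":"scroll"}]
```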
Rejected inputs
- Empty arrays (`[]`) return `400 Bad Request`
- Invalid JSON returns `400 Bad Request`
Reading
All messages are wrapped in a JSON array regardless of how they were appended:
curl http://localhost:4437/v1/stream/events?offset=-1
[
{"event": "click", "x": 100},
{"event": "click"},
{"event": "scroll"}
]
An empty stream returns [].
SSE
SSE data events for JSON streams wrap each message in an array:
event: data
data:[{"event":"click","x":100}]
Consumers must unwrap the outer array:
const parsed = JSON.parse(data);
const items = Array.isArray(parsed) ? parsed : [parsed];
See ecosystem interop CI-003 for details on this pattern.
Non-JSON streams
JSON mode only activates for Content-Type: application/json. Other content types (e.g., text/plain) store and return raw bytes with no flattening or wrapping.
Caching
The protocol includes ETag-based caching to avoid redundant data transfer and Cache-Control: no-store to prevent stale data in intermediate caches.
ETags
Every 200 OK GET response includes an ETag header encoding the read range:
ETag: "{start_offset}:{end_offset}"
The start offset is the query parameter value (including sentinels like -1 or now). The end offset is the Stream-Next-Offset from the same response.
Closed stream suffix
When the stream is closed and the reader is at the tail, the ETag has a :c suffix:
ETag: "-1:0000000000000003_000000000000001a:c"
Examples
GET ?offset=-1 → ETag: "-1:0000000000000003_000000000000001a"
GET ?offset=now → ETag: "now:0000000000000003_000000000000001a"
GET ?offset=0000...0001 → ETag: "0000...0001:0000...0003"
Conditional requests (304)
Clients can send If-None-Match with a previously received ETag:
curl -H 'If-None-Match: "-1:0000000000000003_000000000000001a"' \
http://localhost:4437/v1/stream/my-stream?offset=-1
If the ETag matches (no new data since last read), the server returns:
HTTP/1.1 304 Not Modified
Stream-Next-Offset: 0000000000000003_000000000000001a
Stream-Up-To-Date: true
Cache-Control: no-store
No body is sent, saving bandwidth.
If the ETag does not match (new data available), the server returns a normal 200 OK with updated data and a new ETag.
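A polling client only needs to keep the last body and ETag together, replacing both on 200 and leaving them untouched on 304. A minimal sketch of that state transition (names are illustrative; the real request would send the saved ETag in `If-None-Match`):

```javascript
// Sketch: cache state update for conditional polling. 304 keeps the cached
// body and ETag; 200 replaces both with the fresh response values.
function applyPollResponse(cache, status, body, etag) {
  if (status === 304) return cache;          // nothing new since last read
  if (status === 200) return { body, etag }; // new data and a new ETag
  throw new Error(`unexpected status ${status}`);
}

let cache = { body: "", etag: null };
cache = applyPollResponse(cache, 200, "hello", '"-1:0000000000000001_0000000000000005"');
cache = applyPollResponse(cache, 304, null, null);
console.log(cache.body); // → hello
```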
Cache-Control
The server includes Cache-Control: no-store on all responses (200, 201, 204, 304, 400, 404, 409, 413). This prevents intermediate HTTP caches (CDNs, proxies) from serving stale stream data.
SSE responses use Cache-Control: no-cache instead, which allows caching but requires revalidation.
Writing a client
You can interact with the DS server using any HTTP client. This page shows patterns for the @durable-streams/client SDK, plain fetch, and SSE consumption.
Using the client SDK
The @durable-streams/client SDK provides typed methods for stream operations:
import { DurableStream, stream } from "@durable-streams/client";
const baseUrl = "http://localhost:4437";
// Create a stream
await DurableStream.create({
url: `${baseUrl}/v1/stream/my-stream`,
contentType: "application/json",
});
// Append data
await DurableStream.append({
url: `${baseUrl}/v1/stream/my-stream`,
body: JSON.stringify([{ event: "click" }]),
contentType: "application/json",
});
// Read data
const res = await stream({
url: `${baseUrl}/v1/stream/my-stream`,
live: false,
json: true,
});
const data = await res.json();
const offset = res.offset; // save for resumption
SSE with the client SDK
When using SSE with JSON streams, pass json: true explicitly. The SSE transport uses Content-Type: text/event-stream, which masks the stream's actual content type:
const res = await stream({
url: `${baseUrl}/v1/stream/my-stream`,
live: "sse",
json: true, // required: SSE transport masks the stream's content type
});
for await (const item of res.subscribeJson()) {
console.log("Received:", item);
}
See ecosystem interop CI-001 for why this hint is necessary.
Using plain fetch
Create
await fetch(`${baseUrl}/v1/stream/my-stream`, {
method: "PUT",
headers: { "Content-Type": "text/plain" },
});
Append
await fetch(`${baseUrl}/v1/stream/my-stream`, {
method: "POST",
headers: { "Content-Type": "text/plain" },
body: "hello world",
});
Read with offset resumption
// First read
const res = await fetch(`${baseUrl}/v1/stream/my-stream?offset=-1`);
const body = await res.text();
const nextOffset = res.headers.get("Stream-Next-Offset");
// Resume later (only new messages)
const res2 = await fetch(`${baseUrl}/v1/stream/my-stream?offset=${nextOffset}`);
SSE consumption pattern
Node.js (manual parsing)
const res = await fetch(`${baseUrl}/v1/stream/my-stream?live=sse&offset=-1`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop(); // keep incomplete line
let currentEventType = null;
for (const line of lines) {
if (line.startsWith("event:")) {
currentEventType = line.slice(6).trim();
} else if (line.startsWith("data:") && currentEventType === "data") {
const payload = line.slice(5);
console.log("Data:", payload);
} else if (line.startsWith("data:") && currentEventType === "control") {
const control = JSON.parse(line.slice(5));
if (control.streamClosed) {
console.log("Stream closed");
reader.cancel();
}
} else if (line.startsWith("id:")) {
const offset = line.slice(3).trim();
// Save for resumption on reconnect
}
}
}
JSON streams over SSE
For JSON-mode streams, each data: line wraps the message in a JSON array. Unwrap it:
if (currentEventType === "data") {
const parsed = JSON.parse(line.slice(5));
const items = Array.isArray(parsed) ? parsed : [parsed];
for (const item of items) {
console.log("Item:", item);
}
}
With authentication
When the DS server is behind an Envoy auth proxy, include a Bearer token:
const headers = { Authorization: `Bearer ${token}` };
await fetch(`${baseUrl}/v1/stream/my-stream`, {
method: "PUT",
headers: { ...headers, "Content-Type": "text/plain" },
});
Producer idempotency
For exactly-once writes, include producer headers:
let seq = 0;
async function append(data) {
const res = await fetch(`${baseUrl}/v1/stream/my-stream`, {
method: "POST",
headers: {
"Content-Type": "text/plain",
"Producer-ID": "my-client-tab-1",
"Producer-Epoch": "0",
"Producer-Seq": String(seq),
},
body: data,
});
if (res.status === 200) {
seq++; // accepted
} else if (res.status === 204) {
seq++; // duplicate, already persisted
}
// 409: sequence gap, 403: epoch fenced
}
See Producers for the full protocol.
Electric SQL sync
Electric SQL is a CDC (change data capture) tool for Postgres. It reads the write-ahead log and exposes table changes as an HTTP stream called the Shape API. This page explains the general approach and how the e2e harness implements it.
The pattern
How Electric works
Application → INSERT → Postgres WAL → Electric (replication slot) → Shape API → Subscribers
- An application writes to a Postgres table.
- Postgres records the change in its WAL.
- Electric connects to Postgres via a logical replication slot and reads new WAL entries.
- Electric exposes changes through its Shape API: an HTTP endpoint that subscribers poll or stream.
- A sync service subscribes to the Shape API and forwards changes wherever they need to go.
Postgres requirements
Electric requires logical replication:
wal_level = logical
max_wal_senders = 10 # at least 1 per Electric instance
max_replication_slots = 10 # at least 1 per Electric instance
These are Postgres server-level settings, not per-database. Without wal_level=logical, Electric fails during startup. See ecosystem interop CI-004 for details.
Shape API subscription
The Electric client library provides ShapeStream for subscribing to table changes:
import { ShapeStream } from "@electric-sql/client";
const stream = new ShapeStream({
url: `${electricUrl}/v1/shape`,
params: { table: "my_table" },
});
stream.subscribe(async (messages) => {
const inserts = messages
.filter(m => m.headers.operation === "insert")
.map(m => m.value);
// Forward inserts to DS stream, message queue, etc.
});
Each message includes headers (operation: insert/update/delete) and the row value. The subscriber decides what to do with changes.
Bridging to a DS stream
To get Postgres changes into a DS stream, the sync service POSTs each batch as a JSON array:
if (inserts.length > 0) {
await fetch(`${dsServerUrl}/v1/stream/${streamName}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(inserts),
});
}
JSON mode flattens the array into individual messages. Connected clients receive each change via SSE.
The reverse direction
Electric handles DB-to-Stream. For Stream-to-DB, the sync service subscribes to a DS stream via SSE and inserts events into Postgres. See Durable sessions for the full bidirectional pattern.
Latency
DB-to-Stream latency depends primarily on Electric's replication lag. In local development this is typically under 1 second. In production it depends on WAL volume, replication slot configuration, and network latency to the Electric instance.
In this repository
The e2e harness runs Electric as a Docker container connecting to a Postgres instance.
Docker services
postgres:
image: postgres:17-alpine
command: [postgres, -c, wal_level=logical, -c, max_wal_senders=10, -c, max_replication_slots=10]
environment:
POSTGRES_DB: durable_streams
POSTGRES_PASSWORD: password
ports: ["54321:5432"]
electric:
image: electricsql/electric:1.4.2
environment:
DATABASE_URL: postgresql://postgres:password@postgres:5432/durable_streams
depends_on: [postgres]
Sync service
The reference sync service (e2e/sync/sync.mjs) subscribes to the items table via Electric and forwards changes to the pg-items DS stream.
| Variable | Default | Description |
|---|---|---|
| `ELECTRIC_URL` | `http://electric:3000` | Electric Shape API |
| `DS_SERVER_URL` | `http://server:4437` | DS server |
| `POSTGRES_URL` | `postgresql://postgres:password@postgres:5432/durable_streams` | Postgres |
Running
docker-compose --profile sync up -d --build
docker-compose logs -f sync-service
make integration-test-sessions
Durable sessions
The durable sessions pattern combines DS streams for real-time delivery with a database for durable querying. It is the primary production use case: collaborative applications like AI chat where messages arrive instantly and are queryable forever.
The pattern
The two requirements
A durable session needs two things that no single component satisfies alone:
- Real-time delivery. Users in a session need messages instantly. Polling a database is too slow. SSE is the transport.
- Durable querying. Session history must survive server restarts and be queryable for analytics, moderation, and search. A database is the storage.
Client ← SSE ─── DS Server ← sync ── Database
Client ─ POST ──► DS Server ─ sync ──► Database
A sync service bridges both directions. Without Stream-to-DB, events are lost on restart. Without DB-to-Stream, structured data changes never reach clients in real time.
Session events
The DS server is content-agnostic. All session structure lives in the JSON payload. A typical event:
{
"key": "user:alice",
"type": "presence",
"operation": "set",
"value": { "status": "online" }
}
Common event types for collaborative chat:
| type | operation | description |
|---|---|---|
| `presence` | set / remove | User join/leave |
| `chunk` | append | Chat message fragment (streaming AI response) |
| `message` | set | Complete chat message |
| `reaction` | set / remove | Emoji reaction |
| `typing` | set / remove | Typing indicator |
The key identifies the entity and operation describes the mutation. This follows the STATE-PROTOCOL pattern: the stream is an ordered log of state mutations that can be replayed to reconstruct current state.
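Because the stream is a replayable mutation log, current session state is just a fold over the events. A sketch of that reducer, following the key/operation shape above (the handling of each operation is illustrative, not prescribed by the protocol):

```javascript
// Sketch: replaying a session event log into current state.
// set replaces the entry, remove deletes it, append concatenates
// (e.g., streaming AI response chunks).
function reduceState(events) {
  const state = new Map();
  for (const { key, operation, value } of events) {
    if (operation === "set") {
      state.set(key, value);
    } else if (operation === "remove") {
      state.delete(key);
    } else if (operation === "append") {
      state.set(key, (state.get(key) ?? "") + value);
    }
  }
  return state;
}

const state = reduceState([
  { key: "user:alice", type: "presence", operation: "set", value: { status: "online" } },
  { key: "msg:1", type: "chunk", operation: "append", value: "Hel" },
  { key: "msg:1", type: "chunk", operation: "append", value: "lo" },
  { key: "user:alice", type: "presence", operation: "remove" },
]);
console.log(state.get("msg:1"));      // → Hello
console.log(state.has("user:alice")); // → false
```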
Building a session
1. Create a JSON-mode stream for the session:
curl -X PUT -H "Content-Type: application/json" \
http://localhost:4437/v1/stream/session-events
2. Append events with producer idempotency for exactly-once delivery:
curl -X POST -H "Content-Type: application/json" \
-H "Producer-ID: user-alice-tab-1" \
-H "Producer-Epoch: 0" \
-H "Producer-Seq: 0" \
-d '[{"key":"msg:1","type":"message","operation":"set","value":{"text":"Hello!"}}]' \
http://localhost:4437/v1/stream/session-events
3. Subscribe for live updates via SSE:
curl -N http://localhost:4437/v1/stream/session-events?offset=-1\&live=sse
4. Resume after disconnect. Save the offset from the last SSE control event and reconnect:
curl -N http://localhost:4437/v1/stream/session-events?offset=0000000000000001_0000000000000042\&live=sse
Database schema design
The sync service needs a sink table for events. Design it to extract queryable fields from the JSON payload:
CREATE TABLE session_events (
id SERIAL PRIMARY KEY,
stream_name TEXT NOT NULL,
event_key TEXT, -- extracted from payload.key
event_type TEXT, -- extracted from payload.type
operation TEXT, -- extracted from payload.operation
payload JSONB NOT NULL, -- full event for querying
ds_offset TEXT, -- for deduplication
received_at TIMESTAMPTZ DEFAULT NOW()
);
Once events are in the database, query them with SQL:
-- All messages in a session
SELECT * FROM session_events WHERE event_type = 'message' ORDER BY received_at;
-- Active users
SELECT event_key, payload->'value'->>'status'
FROM session_events
WHERE event_type = 'presence' AND operation = 'set'
ORDER BY received_at DESC;
Multiple producers
Multiple producers can write to the same session stream. Each has independent epoch and sequence tracking:
- User Alice writes from her browser tab (`Producer-ID: alice-tab-1`)
- An AI assistant writes responses (`Producer-ID: ai-assistant`)
- A bot writes notifications (`Producer-ID: notification-bot`)
Messages are ordered by arrival time, not by producer.
In this repository
The e2e harness implements the full durable sessions pattern with Postgres, Electric SQL, and a Node.js sync service.
Harness architecture
Test Runner (host)
│
├── PG-to-Stream: INSERT into items → Electric → sync-service → pg-items DS stream
│
└── Stream-to-PG: POST to session-events DS stream → sync-service SSE → session_events PG table
Running
# Automated: build, start, test, tear down
make integration-test-sessions
# Manual
docker-compose --profile sync up -d --build
cd e2e && npm install
npx vitest run --reporter=verbose sessions.test.mjs
docker-compose --profile sync down
Test scenarios
The e2e/sessions.test.mjs suite validates 8 scenarios:
Session pattern (5 tests):
- Session lifecycle: create, write presence event, read back
- Multi-producer chat: user + AI messages, verify ordering
- SSE live subscription: subscribe, write, verify real-time delivery
- Session recovery: write, save offset, write more, resume
- Producer idempotency: retry same seq (204), next seq (200)
Database sync (3 tests):
- PG-to-Stream: INSERT into Postgres, poll DS stream until it appears
- Stream-to-PG: POST to DS stream, poll Postgres until row appears
- Round trip: INSERT into PG, verify in DS stream
Configuration
The DS server supports layered TOML configuration plus env-var overrides.
Load order (later wins):
- built-in defaults
- `config/default.toml` (if present)
- `config/<profile>.toml` (if present; selected with `--profile`)
- `config/local.toml` (if present; intended for local overrides)
- `--config <path>` override file
- environment variables
CLI options:
| Flag | Description |
|---|---|
| `--profile <name>` | Loads `config/<name>.toml` |
| `--config <path>` | Loads an additional TOML file last |
Sample config files in this repo:
- `config/default.toml` (baseline defaults)
- `config/dev.toml` (development overrides)
- `config/prod.toml` (production example)
Example:
cargo run -- --profile dev
cargo run -- --profile prod --config /etc/durable-streams/override.toml
Environment variables use the DS_ prefix with double-underscore section separators (e.g., DS_SERVER__PORT).
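As an illustrative sketch of that mapping (assuming lowercase TOML keys, a common convention for layered Rust configuration; the exact key names in this repo's TOML files may differ), `DS_SERVER__PORT=8080` and `DS_SERVER__LONG_POLL_TIMEOUT_SECS=2` would correspond to:

```toml
# Hypothetical TOML equivalent — section name before the double
# underscore, key name after it.
[server]
port = 8080
long_poll_timeout_secs = 2
```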
Server
| Variable | Default | Description |
|---|---|---|
| `DS_SERVER__PORT` | `4437` | TCP port to listen on |
| `DS_HTTP__CORS_ORIGINS` | `*` | Allowed CORS origins. `*` allows all. Multiple origins can be comma-separated (e.g., `https://app.example.com,https://admin.example.com`). |
| `RUST_LOG` | `info` | Log level filter (tracing format: `debug`, `info`, `warn`, `error`, or per-module like `durable_streams=debug`). Takes precedence over `DS_LOG__RUST_LOG`. |
| `DS_LOG__RUST_LOG` | `info` | Default log level filter, applied through the TOML config layer. Overridden by `RUST_LOG` when both are set. |
Transport (optional direct TLS)
| Variable | Default | Description |
|---|---|---|
| `DS_TLS__CERT_PATH` | unset | Path to PEM certificate for direct TLS termination. Must be set together with `DS_TLS__KEY_PATH`. |
| `DS_TLS__KEY_PATH` | unset | Path to PEM/PKCS#8 private key for direct TLS termination. Must be set together with `DS_TLS__CERT_PATH`. |
Protocol
| Variable | Default | Description |
|---|---|---|
| `DS_SERVER__LONG_POLL_TIMEOUT_SECS` | `30` | How long to hold a long-poll request before returning 204 No Content. Set lower (e.g., `2`) for fast-feedback testing. |
| `DS_SERVER__SSE_RECONNECT_INTERVAL_SECS` | `60` | SSE reconnect interval in seconds (matches Caddy's `sse_reconnect_interval`). Enables CDN request collapsing. Set to `0` to disable. |
Memory limits
| Variable | Default | Description |
|---|---|---|
| `DS_LIMITS__MAX_MEMORY_BYTES` | `104857600` (100 MB) | Maximum total memory across all streams. Appends exceeding this return 413 Payload Too Large. |
| `DS_LIMITS__MAX_STREAM_BYTES` | `10485760` (10 MB) | Maximum bytes per individual stream. |
Storage
| Variable | Default | Description |
|---|---|---|
| `DS_STORAGE__MODE` | `memory` | Storage backend mode: `memory`, `file-fast`, `file-durable`, `acid` (alias: `redb`). |
| `DS_STORAGE__DATA_DIR` | `./data/streams` | Root directory for persistent backends (`file-*`, `acid`). |
| `DS_STORAGE__ACID_SHARD_COUNT` | `16` | Number of redb shards when `DS_STORAGE__MODE=acid`; must be a power of two in 1..=256 (invalid values return an error). |
Sync service (e2e stack)
These variables configure the sync service container, not the DS server itself:
| Variable | Default | Description |
|---|---|---|
| `ELECTRIC_URL` | `http://electric:3000` | Electric SQL Shape API base URL |
| `DS_SERVER_URL` | `http://server:4437` | DS server URL (internal Docker network) |
| `POSTGRES_URL` | `postgresql://postgres:password@postgres:5432/durable_streams` | Postgres connection string |
Example
# Development (fast timeouts for testing)
DS_SERVER__LONG_POLL_TIMEOUT_SECS=2 DS_SERVER__SSE_RECONNECT_INTERVAL_SECS=5 cargo run
# Development profile from TOML
cargo run -- --profile dev
# Production (restricted CORS, custom port)
DS_SERVER__PORT=8080 DS_HTTP__CORS_ORIGINS=https://app.example.com cargo run
# Optional direct TLS (proxy->server encryption or direct serving)
DS_TLS__CERT_PATH=/etc/ds/tls/server.crt DS_TLS__KEY_PATH=/etc/ds/tls/server.key cargo run
# Debug logging
RUST_LOG=debug cargo run
API reference
All endpoints are under the /v1/stream/ base path. The health check is at /healthz.
Health check
GET /healthz
Returns 200 OK with body ok. No authentication required.
Create stream
PUT /v1/stream/{name}
| Header | Required | Description |
|---|---|---|
| `Content-Type` | no | Stream content type (immutable after creation; defaults to `application/octet-stream`) |
| `Stream-TTL` | no | Time-to-live in seconds |
| `Stream-Expires-At` | no | Absolute expiration (ISO 8601) |
| `Stream-Closed` | no | `"true"` to create closed |
Body is optional. If provided, it is appended as initial stream data.
| Status | Meaning |
|---|---|
| `201 Created` | Stream created |
| `200 OK` | Idempotent create (same config) |
| `409 Conflict` | Stream exists with different config |
| `400 Bad Request` | Invalid TTL or empty Content-Type header |
Response headers: Location, Content-Type, Stream-Next-Offset
Append data
POST /v1/stream/{name}
| Header | Required | Description |
|---|---|---|
| `Content-Type` | yes | Must match stream's content type |
| `Stream-Closed` | no | `"true"` to close (with or without data) |
| `Producer-ID` | no* | Producer identifier |
| `Producer-Epoch` | no* | Producer epoch |
| `Producer-Seq` | no* | Producer sequence number |
*Producer headers: all three or none.
| Status | Meaning |
|---|---|
| `200 OK` | New data accepted (producer mode) |
| `204 No Content` | Data appended (no producer) or duplicate (producer) |
| `400 Bad Request` | Empty body without Stream-Closed, invalid producer headers |
| `403 Forbidden` | Epoch fenced (zombie producer) |
| `404 Not Found` | Stream does not exist |
| `409 Conflict` | Content-Type mismatch, closed stream, or sequence gap |
| `413 Payload Too Large` | Memory limit exceeded |
Response headers: Stream-Next-Offset, Stream-Closed (if closed), Producer-Epoch, Producer-Seq
Read data (catch-up)
GET /v1/stream/{name}?offset={offset}
| Parameter | Default | Description |
|---|---|---|
| `offset` | `-1` | Start offset. Sentinels: `-1` (start), `now` (tail) |
| Header | Required | Description |
|---|---|---|
| `If-None-Match` | no | ETag from previous read |
| Status | Meaning |
|---|---|
| `200 OK` | Data returned |
| `304 Not Modified` | ETag match (no new data) |
| `400 Bad Request` | Invalid offset format |
| `404 Not Found` | Stream does not exist or expired |
Response headers: Content-Type, Stream-Next-Offset, Stream-Up-To-Date, ETag, Cache-Control, Stream-Closed (if closed and at tail)
Read data (long-poll)
GET /v1/stream/{name}?offset={offset}&live=long-poll[&cursor={cursor}]
Same as catch-up, plus:
| Parameter | Description |
|---|---|
| `live` | Must be `"long-poll"` |
| `cursor` | Echo of previous `Stream-Cursor` (optional) |
Additional response header: Stream-Cursor
Returns 204 No Content on timeout or closed stream at tail.
Read data (SSE)
GET /v1/stream/{name}?offset={offset}&live=sse
Returns Content-Type: text/event-stream. See Read modes for event format.
Additional response header: stream-sse-data-encoding: base64 (for binary content types)
Stream metadata
HEAD /v1/stream/{name}
Returns the same headers as GET with no body.
| Status | Meaning |
|---|---|
| `200 OK` | Stream exists |
| `404 Not Found` | Stream does not exist or expired |
Response headers: Content-Type, Stream-Next-Offset, Stream-Closed, Stream-TTL, Stream-Expires-At, Cache-Control
Delete stream
DELETE /v1/stream/{name}
| Status | Meaning |
|---|---|
| `204 No Content` | Deleted |
| `404 Not Found` | Does not exist |
Common response headers
These headers appear on all responses via middleware:
| Header | Value | Notes |
|---|---|---|
| `Cache-Control` | `no-store` | `no-cache` for SSE |
| `X-Content-Type-Options` | `nosniff` | |
| `Cross-Origin-Resource-Policy` | `cross-origin` | |
Port map
All ports used by the Docker Compose stack, organized by profile.
Default profile
| Port | Service | Purpose |
|---|---|---|
| 4437 | server | DS server (direct, no auth) |
| 8080 | envoy | JWT auth proxy (public endpoint) |
| 9901 | envoy | Envoy admin dashboard |
Sync profile
Includes all default ports, plus:
| Port | Service | Purpose |
|---|---|---|
| 54321 | postgres | Postgres direct access (mapped from 5432) |
Internal (Docker network only, no host port):
| Service | Internal port | Purpose |
|---|---|---|
| electric | 3000 | Electric Shape API |
| sync-service | -- | No listening port (initiates connections) |
Dev profile
Includes all default and sync ports, plus:
| Port | Service | Purpose |
|---|---|---|
| 8081 | adminer | Database admin UI |
The heartbeat producer has no listening port.
Host services (not Docker)
| Port | Service | How to start |
|---|---|---|
| 3000 | test-ui | make dev-ui |
Quick reference
| Port | Service | Profile | Auth required |
|---|---|---|---|
| 3000 | test-ui | host | no |
| 4437 | DS server | default | no |
| 8080 | Envoy proxy | default | yes (JWT) |
| 8081 | Adminer | dev | no |
| 9901 | Envoy admin | default | no |
| 54321 | Postgres | sync | password |
Protocol governance
This implementation strictly adheres to the durable streams protocol specification. This page summarizes the governance policies defined in full in docs/protocol-governance.md.
Default stance
- The spec and conformance tests are the source of truth.
- When ambiguous, take the least-committal interpretation that preserves forward compatibility.
- Never invent semantics because they "feel reasonable". If it is not in spec or tests, it is a gap.
Decision hierarchy
- Conformance tests -- behavioral truth. If a test asserts it, we implement it.
- Pinned PROTOCOL.md -- textual truth at a specific spec commit SHA.
- Maintainer clarification -- linked issue or PR in the spec repo.
- Conservative fallback -- recorded as a spec gap. No unrecorded guesses.
Spec pinning
The protocol spec is pinned by git commit SHA, not by branch. The conformance test suite is pinned by npm package version. See SPEC_VERSION.md for current pins:
- Spec SHA: `a347312a47ae510a4a2e3ee7a121d6c8d7d74e50`
- Conformance: `@durable-streams/server-conformance-tests@0.2.2`
Traceability
Every externally observable behavior (paths, headers, status codes, framing) must link to:
- A spec section URL with anchor, or
- A conformance test that asserts it, or
- An explicit gap in `docs/gaps.md`
Conservative fallback rules
- Paths: Follow conformance defaults exactly.
- Headers: Emit only headers required by spec/tests.
- Status codes: Use only codes defined by spec/tests.
- Ordering: Preserve monotonicity, avoid replay/gap surprises.
- Security: Auth stays out of the server. Auth behavior lives in proxy config.
Required artifacts
These files are maintained and checked in CI:
| File | Purpose |
|---|---|
| `SPEC_VERSION.md` | Pinned spec SHA and conformance version |
| `docs/compatibility.md` | Version compatibility matrix |
| `docs/decisions.md` | Protocol decision log |
| `docs/gaps.md` | Spec ambiguity log |
| `docs/ecosystem-interop.md` | Ecosystem integration observations |
Decision log
Implementation decisions are recorded in docs/decisions.md in three sections: Protocol, Architecture, and Operational.
Current protocol decisions
| Decision | Rationale |
|---|---|
| SSE event type `data` (not `message`) | Matches PROTOCOL.md exactly; `message` is the browser default, the protocol uses an explicit `data` type |
| SSE control event uses typed struct with camelCase | Prevents field name typos, ensures consistent serialization |
| Binary detection: everything not `text/*` or `application/json` | Matches the PROTOCOL.md binary encoding rule |
| One `event: data` per stored message | Preserves message boundaries per conformance tests |
| SSE idle close default 60s, configurable | Spec says SHOULD ~60s; configurable allows tuning |
| Idempotent close returns 204 | Same status for initial and repeated close |
| `Stream-Closed: true` on all success responses for closed streams | Consistent header presence lets clients detect closure without HEAD |
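Two of the decisions above (explicit `event: data`, one event per stored message) determine the SSE wire shape. A sketch of that framing; the helper name is illustrative and the real encoder is more involved:

```rust
/// Illustrative framing of one stored message as an SSE `data` event.
/// The browser's default event name is `message`; the protocol instead
/// mandates an explicit `event: data` line per message.
pub fn frame_sse_data_event(payload: &str) -> String {
    // One event per stored message preserves message boundaries;
    // the blank line terminates the event per the SSE format.
    format!("event: data\ndata: {payload}\n\n")
}
```

Note this sketch assumes a single-line payload; real SSE encoders split multi-line payloads into repeated `data:` lines.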
Current architecture decisions
| Decision | Rationale |
|---|---|
| `acid` backend uses sharded redb with fixed hash routing | Preserves per-stream ordering while enabling concurrent writes across shards; manifest prevents silent routing drift |
| No in-place migration from `file-*` to `acid` (current limitation) | Keeps first ACID iteration safer; migration tooling can be added separately |
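Fixed hash routing means a stream name always maps to the same shard, so all appends to one stream stay ordered within a single writer while different streams spread across shards. A minimal sketch (hasher choice and shard count are illustrative, not what the `acid` backend actually uses):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch of fixed hash routing: the same stream name always yields the
/// same shard index, preserving per-stream ordering while allowing
/// concurrent writes to different shards.
pub fn shard_for(stream_name: &str, shard_count: u64) -> u64 {
    let mut h = DefaultHasher::new();
    stream_name.hash(&mut h);
    h.finish() % shard_count
}
```

This is also why the manifest matters: if the shard count or hash function changed silently between runs, existing streams would route to the wrong shard.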
Current operational decisions
| Decision | Rationale |
|---|---|
| Health check at `/healthz`, outside `/v1/stream/` | Health checks are infrastructure, not protocol API |
| Sessions + bidirectional DB sync replaces Electric-only test | Sessions pattern is the primary production use case |
| CORS: allow all origins by default, configurable via `DS_HTTP__CORS_ORIGINS` | Not part of protocol semantics; production restricts via env var/proxy |
Decision format
Each decision in the full log includes:
- Decision: clear statement of what was decided
- Category: protocol, architecture, or operational
- Reference: spec/test (protocol) or subsystem/scope (architecture/operational)
- Rationale: why this choice was made
- Date: when the decision was made
When ambiguous on protocol behavior, decisions reference the corresponding gap in docs/gaps.md.
Benchmarks
This page summarizes the latest benchmark findings from a full 3x matrix run (7 variants × 3 runs = 21 runs in total), with fresh Rust PGO profiles regenerated for this code state.
Core results (mean of run means)
| Variant | RTT latency (ms) | Small throughput (msg/s) | Large throughput (msg/s) |
|---|---|---|---|
| rust-memory | 0.407 | 377,143.9 | 1,837.2 |
| rust-file (file-durable) | 5.379 | 6,441.5 | 530.7 |
| rust-acid (acid) | 5.908 | 6,495.9 | 454.7 |
| node-memory | 0.573 | 334,943.9 | 1,653.9 |
| node-file | 11.377 | 3,240.0 | 360.1 |
| caddy-memory | 0.342 | 313,490.9 | 1,144.4 |
| caddy-acid | 16.067 | 2,305.0 | 271.6 |
Rust acid vs Caddy acid
| Metric | Rust acid | Caddy acid | Relative |
|---|---|---|---|
| RTT latency (ms, lower is better) | 5.908 | 16.067 | Rust acid 2.72x lower |
| Small throughput (msg/s, higher is better) | 6,495.9 | 2,305.0 | Rust acid 2.82x higher |
| Large throughput (msg/s, higher is better) | 454.7 | 271.6 | Rust acid 1.67x higher |
Durability note:
- The Caddy plugin's file-backed mode (named caddy-acid on this page) documents a crash-atomicity limitation for append-data + producer-metadata updates.
- Rust `acid` uses transactional commits for stream state and message data.
- So the Rust `acid` result is not achieved by relaxing durability semantics in the same way.
Rust storage modes
| Metric | memory | file-durable | acid |
|---|---|---|---|
| RTT latency (ms) | 0.407 | 5.379 | 5.908 |
| Small throughput (msg/s) | 377,143.9 | 6,441.5 | 6,495.9 |
| Large throughput (msg/s) | 1,837.2 | 530.7 | 454.7 |
Relative to Rust file-durable:
- `acid` RTT latency: +9.8%
- `acid` small throughput: +0.8%
- `acid` large throughput: -14.3%
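These relative figures follow directly from the storage-mode table: each is (acid − file-durable) / file-durable, rounded to one decimal place. A small sketch of that arithmetic (function name is ours, not from the benchmark harness):

```rust
/// Percentage delta of `acid` relative to `file-durable`,
/// rounded to one decimal place, matching the figures above.
pub fn pct_delta(file_durable: f64, acid: f64) -> f64 {
    ((acid - file_durable) / file_durable * 1000.0).round() / 10.0
}
```

Note the sign convention: a positive RTT delta means `acid` is slower, while a positive throughput delta means it is faster.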
Benchmark quality notes
- These numbers are from local synthetic tests.
- Load generator and server processes share one machine in this run.
- For production-grade comparability, run clients and servers on separate hosts and keep sustained windows longer than quick local iterations.
For full methodology and caveats, see `/docs/benchmark-report.md` in the repo root.
Spec gaps
Ambiguities and edge cases not fully covered by the protocol spec or conformance tests are recorded in docs/gaps.md. This page summarizes the current gaps and known ecosystem interop observations.
Active gaps
| Ambiguity | Our interpretation |
|---|---|
| SSE idle close timing (~60s) | Default 60s, configurable via DS_SERVER__SSE_RECONNECT_INTERVAL_SECS (0 disables). Spec says SHOULD close roughly every ~60s. |
Gaps are not failures. They are explicit acknowledgments that the implementation operates beyond the spec's current coverage.
Non-protocol architecture and operational decisions are tracked in docs/decisions.md.
Ecosystem interop observations
These are recorded in docs/ecosystem-interop.md. They are not protocol gaps -- the protocol is fine. They are rough edges in ecosystem components that developers encounter during integration.
| ID | Summary | Component |
|---|---|---|
| CI-001 | SSE transport masks stream Content-Type; stream() needs json: true hint | @durable-streams/client |
| CI-002 | Session events are opaque JSON; consumers must parse semantics | DS server + sync service |
| CI-003 | SSE data events for JSON streams are array-wrapped: [{...}] not {...} | DS server SSE output |
| CI-004 | Electric requires wal_level=logical in Postgres | Electric SQL + Postgres |
Proposing upstream clarifications
When a gap needs upstream resolution:
- Draft a proposed conformance test based on the clarifying-test column in `docs/gaps.md`
- Open an issue in github.com/durable-streams/durable-streams
- Link the issue in `docs/gaps.md`
- If accepted upstream, update `SPEC_VERSION.md` and move the entry to resolved gaps
Contributing
Development setup
```
git clone https://github.com/thesampaton/durable-streams-rust-server.git
cd durable-streams-rust-server
cargo build
cargo test
```
BDD workflow
This project follows an implicit BDD pattern. The conformance specifications are the primary design artifact. The implementation exists to pass them.
The loop
- Start from a failing conformance test that expresses a spec requirement.
- Write the minimum implementation to pass it.
- Run `cargo clippy -- -D warnings` -- fix all warnings before moving on.
- Run `cargo test` to verify both unit and conformance tests pass.
- Run `cargo fmt` before considering the task done.
- If a spec requirement is ambiguous, clarify the spec document first, then write the test.
Two test layers
Unit tests (`#[cfg(test)] mod tests` in each source file): test internal logic -- parsing, state machines, stream mechanics. Fast, no I/O.
Conformance tests (`tests/` directory): boot the server on a random port, make HTTP requests, assert against the spec. These are acceptance tests. Each test maps to a specific spec requirement.
The conformance tests are deliberately black-box: they interact only through the public HTTP interface.
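"A random port" is typically obtained by binding to port 0 and letting the OS assign one, so parallel test runs never collide. A sketch of that idiom (the function name is ours; the suite's actual bootstrap differs):

```rust
use std::net::TcpListener;

/// Ask the OS for a free port by binding to port 0, then report the
/// port it assigned. A test harness would hand this (or the listener
/// itself) to the server under test.
pub fn reserve_random_port() -> u16 {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    listener.local_addr().expect("no local addr").port()
}
```

Passing the bound listener directly to the server (rather than just the port number) avoids a race where another process grabs the port in between.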
Code style
- Clippy pedantic lints are enabled project-wide. Treat all warnings as errors.
- Use `thiserror` for error types.
- Prefer `Result` over panicking. Reserve `unwrap()`/`expect()` for provable invariants.
- `snake_case` for functions/variables, `CamelCase` for types, `SCREAMING_SNAKE_CASE` for constants.
- Keep functions under ~40 lines. Extract helpers when they grow longer.
- Prefer strong typing over stringly-typed APIs. Newtypes and enums for invalid-state prevention.
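The newtype guideline can be sketched in a few lines: validate once at the boundary, then the type itself guarantees the invariant everywhere downstream. (The `StreamName` type and its validation rule here are illustrative, not the server's actual types.)

```rust
/// Illustrative newtype: a stream name that cannot be constructed empty,
/// so downstream code never needs to re-check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamName(String);

impl StreamName {
    /// Validation happens once, at construction.
    pub fn new(raw: &str) -> Result<Self, String> {
        if raw.is_empty() {
            return Err("stream name must be non-empty".into());
        }
        Ok(StreamName(raw.to_owned()))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}
```

Functions that accept `StreamName` instead of `&str` make the invalid state unrepresentable rather than merely checked.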
Commands
```
cargo build                     # compile
cargo test                      # unit + integration tests
cargo test --lib                # unit tests only
cargo test --test '*'           # integration tests only
cargo test test_name            # single test
cargo clippy -- -D warnings     # lint
cargo fmt                       # format
cargo fmt -- --check            # format check
make conformance                # external conformance suite
make integration-test           # Docker stack e2e tests
make integration-test-sessions  # sessions + DB sync tests
make pgo-train                  # generate + merge PGO profile data
make release-pgo                # guarded profile-use release build
make pgo-benchmark              # benchmark profile-use build
make docs                       # build mdbook documentation
```
For release/performance work, prefer `make release-pgo` after `make pgo-train` so artifacts are built from current benchmark-driven profiles.
Documentation
- Protocol behavior is documented in `specs/` as structured markdown.
- Implementation decisions go in `docs/decisions.md`.
- Spec ambiguities go in `docs/gaps.md`.
- Ecosystem interop observations go in `docs/ecosystem-interop.md`.
- The mdbook documentation site lives in `docs/book/`. Build with `make docs`.
Git conventions
- Write concise commit messages explaining why, not what.
- Separate logical changes into distinct commits.
- Spec changes, test additions, and implementation should be distinct commits where practical.
Protocol governance
All protocol-level decisions must comply with the governance policies in docs/protocol-governance.md. Key rule: never invent semantics. If it is not in spec or tests, record it as a gap.