Writing a client

You can interact with the DS server using any HTTP client. This page shows patterns for the @durable-streams/client SDK, plain fetch, and SSE consumption.

Using the client SDK

The @durable-streams/client SDK provides typed methods for stream operations:

import { DurableStream, stream } from "@durable-streams/client";

const baseUrl = "http://localhost:4437";

// Create a stream
await DurableStream.create({
  url: `${baseUrl}/v1/stream/my-stream`,
  contentType: "application/json",
});

// Append data
await DurableStream.append({
  url: `${baseUrl}/v1/stream/my-stream`,
  body: JSON.stringify([{ event: "click" }]),
  contentType: "application/json",
});

// Read data
const res = await stream({
  url: `${baseUrl}/v1/stream/my-stream`,
  live: false,
  json: true,
});
const data = await res.json();
const offset = res.offset; // save for resumption

SSE with the client SDK

When using SSE with JSON streams, pass json: true explicitly. The SSE transport uses Content-Type: text/event-stream, which masks the stream's actual content type:

const res = await stream({
  url: `${baseUrl}/v1/stream/my-stream`,
  live: "sse",
  json: true, // required: SSE transport masks the stream's content type
});

for await (const item of res.subscribeJson()) {
  console.log("Received:", item);
}

See ecosystem interop CI-001 for why this hint is necessary.

Using plain fetch

Create

await fetch(`${baseUrl}/v1/stream/my-stream`, {
  method: "PUT",
  headers: { "Content-Type": "text/plain" },
});

Append

await fetch(`${baseUrl}/v1/stream/my-stream`, {
  method: "POST",
  headers: { "Content-Type": "text/plain" },
  body: "hello world",
});

Read with offset resumption

// First read
const res = await fetch(`${baseUrl}/v1/stream/my-stream?offset=-1`);
const body = await res.text();
const nextOffset = res.headers.get("Stream-Next-Offset");

// Resume later (only new messages)
const res2 = await fetch(`${baseUrl}/v1/stream/my-stream?offset=${nextOffset}`);

SSE consumption pattern

Node.js (manual parsing)

const res = await fetch(`${baseUrl}/v1/stream/my-stream?live=sse&offset=-1`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop(); // keep incomplete line

  let currentEventType = null;
  for (const line of lines) {
    if (line.startsWith("event:")) {
      currentEventType = line.slice(6).trim();
    } else if (line.startsWith("data:") && currentEventType === "data") {
      const payload = line.slice(5);
      console.log("Data:", payload);
    } else if (line.startsWith("data:") && currentEventType === "control") {
      const control = JSON.parse(line.slice(5));
      if (control.streamClosed) {
        console.log("Stream closed");
        reader.cancel();
      }
    } else if (line.startsWith("id:")) {
      const offset = line.slice(3).trim();
      // Save for resumption on reconnect
    }
  }
}

JSON streams over SSE

For JSON-mode streams, each data: line wraps the message in a JSON array. Unwrap it:

if (currentEventType === "data") {
  const parsed = JSON.parse(line.slice(5));
  const items = Array.isArray(parsed) ? parsed : [parsed];
  for (const item of items) {
    console.log("Item:", item);
  }
}

With authentication

When the DS server is behind an Envoy auth proxy, include a Bearer token:

const headers = { Authorization: `Bearer ${token}` };

await fetch(`${baseUrl}/v1/stream/my-stream`, {
  method: "PUT",
  headers: { ...headers, "Content-Type": "text/plain" },
});

Producer idempotency

For exactly-once writes, include producer headers:

let seq = 0;

async function append(data) {
  const res = await fetch(`${baseUrl}/v1/stream/my-stream`, {
    method: "POST",
    headers: {
      "Content-Type": "text/plain",
      "Producer-ID": "my-client-tab-1",
      "Producer-Epoch": "0",
      "Producer-Seq": String(seq),
    },
    body: data,
  });

  if (res.status === 200) {
    seq++; // accepted
  } else if (res.status === 204) {
    seq++; // duplicate, already persisted
  }
  // 409: sequence gap, 403: epoch fenced
}

See Producers for the full protocol.