Skip to content

@hapi/sse v1.0.0

console
npm install @hapi/sse@1.0.0
console
yarn add @hapi/sse@1.0.0
console
pnpm add @hapi/sse@1.0.0

Server-Sent Events plugin for hapi.

Compatibility

Major versionLicenseNode.js
1BSD>= 22

@hapi/sse

SSE (Server-Sent Events) plugin for hapi. WHATWG spec-compliant with subscription-based pub/sub, event replay, backpressure, built-in stats, and lifecycle hooks.

Install

npm install @hapi/sse

Peer dependencies: @hapi/hapi ^21, @hapi/boom ^10.

Quick Start

typescript
import Hapi from '@hapi/hapi';
import { SsePlugin } from '@hapi/sse';

const server = Hapi.server({ port: 3000 });

await server.register({ plugin: SsePlugin });

server.sse.subscription('/chat/{room}');

await server.start();

// Publish from anywhere
await server.sse.publish(
  '/chat/general',
  { text: 'hello', user: 'alice' },
  { event: 'message' },
);

Plugin Options

typescript
await server.register({
    plugin: SsePlugin,
    options: {
        retry: 2000,                            // retry interval in ms (default: 2000, null to disable)
        keepAlive: { interval: 15_000 },         // keep-alive comment interval (default: 15s, false to disable)
        headers: { 'X-Custom': 'value' },        // extra headers on every SSE response
        backpressure: { maxBytes: 65536, strategy: 'drop' },  // optional
        hooks: { ... },                          // optional, see Hooks section
    },
});

API

server.sse.subscription(path, config?)

Registers a subscription route. Clients connect via GET <path>.

typescript
server.sse.subscription('/chat/{room}', {
  auth: 'jwt',
  retry: 5000,
  keepAlive: { interval: 10_000 },
  filter: async (path, message, { credentials, params, internal }) => {
    if (params.room !== internal.targetRoom) {
      return false; // don't deliver
    }
    return { override: { ...message, filtered: true } }; // or transform
  },
  onSubscribe: async (session, path, params) => {},
  onUnsubscribe: (session, path, params) => {},
  onReconnect: async (session, path, params) => {},
  replay: new FiniteReplayer({ size: 100 }), // optional, see Replay section
});

Config options:

OptionTypeDescription
authRouteOptions['auth']hapi auth config for the route
retrynumber | nullOverride plugin-level retry
keepAlive{ interval: number } | falseOverride plugin-level keep-alive
filter(path, message, opts) => boolean | { override } | Promise<...>Per-session delivery filter
onSubscribe(session, path, params) => void | Promise<void>Fires before SSE headers are sent. Throwing a Boom error returns that HTTP error to the client.
onUnsubscribe(session, path, params) => voidFires on client disconnect
onReconnect(session, path, params) => void | Promise<void>Fires when Last-Event-ID is present (after replay). Errors close the session gracefully.
replayReplayerReplay provider for automatic reconnection replay
maxSessionsnumberMaximum concurrent sessions for this subscription. Excess connections receive a 503 response.
maxDurationnumberMaximum connection lifetime in ms. Sessions are closed after this duration (with ±10% jitter to prevent thundering herd reconnections). A : session expired comment is sent before closing.

server.sse.publish(path, data, opts?)

Publishes an event to all matching subscribers. Returns the number of sessions that received the event.

typescript
const delivered = await server.sse.publish(
  '/chat/general',
  { text: 'hello everyone', user: 'alice' },
  {
    event: 'message',
    id: 'msg-42',
    internal: { targetRoom: 'general' }, // passed to filter
    matchMode: 'literal', // 'pattern' (default) or 'literal'
  },
);

console.log(`Delivered to ${delivered} sessions`);

matchMode:

  • 'pattern' (default) — delivers to all sessions on a matching subscription pattern (e.g. /chat/{room})
  • 'literal' — only delivers to sessions whose actual connected path equals path exactly. Useful for parameterized subscriptions where you want to target /chat/general but not /chat/random.

Note: Only events published with an explicit id are recorded by the replayer. Events without an id are delivered but not stored for replay.

server.sse.broadcast(data, opts?)

Sends an event to every connected session across all subscriptions. Returns the delivery count.

typescript
const count = await server.sse.broadcast(
  { text: 'Server restarting in 5 minutes', user: 'system' },
  { event: 'system' },
);

server.sse.eachSession(fn, opts?)

Iterates over connected sessions. Optionally filter by subscription pattern.

typescript
await server.sse.eachSession(
  async (session) => {
    session.push({ text: 'ping', user: 'system' });
  },
  { subscription: '/chat/{room}' },
);

server.sse.subscriptions()

Returns a snapshot of all registered subscriptions with active session counts.

typescript
const subs = server.sse.subscriptions();
// [{ pattern: '/chat/{room}', activeSessions: 12 }]

server.sse.closeSessions(pattern)

Closes all sessions for a specific subscription pattern.

typescript
server.sse.closeSessions('/chat/{room}');

server.sse.sessionCount

Total number of active sessions across all subscriptions.

typescript
console.log(server.sse.sessionCount); // 42

server.sse.stats()

Returns built-in metrics tracked by the plugin. No configuration needed.

typescript
const stats = server.sse.stats();
// {
//     totalConnections: 150,
//     totalDisconnections: 108,
//     totalPublishes: 5230,
//     totalBroadcasts: 12,
//     totalEventsDelivered: 48700,
//     activeSessions: 42,
// }
StatDescription
totalConnectionsCumulative subscription connections since server start
totalDisconnectionsCumulative disconnections since server start
totalPublishesNumber of publish() calls
totalBroadcastsNumber of broadcast() calls
totalEventsDeliveredSum of all individual event deliveries across publish and broadcast
activeSessionsCurrent connected session count (same as sessionCount)

Session

The Session object represents a single SSE connection.

typescript
session.push(data, event?, id?)   // Send an event. Returns boolean (false if dropped/closed).
session.comment(text?)            // Send a comment (invisible to EventSource)
session.close()                   // End the connection
session.isOpen                    // true if connection is still active
session.connectedAt               // Unix timestamp (ms) when the session was created
session.lastEventId               // Value of Last-Event-ID header (empty string if absent)
session.request                   // The original hapi Request object

Metadata — attach arbitrary key-value data to a session:

typescript
session.set('userId', 'alice');
session.get('userId'); // 'alice'
session.has('userId'); // true
session.delete('userId'); // true

Metadata persists for the lifetime of the session. Useful for tagging sessions in onSubscribe and reading in filters or eachSession.

Custom Handler Mode

For full control over the stream (e.g. AI-assisted chat responses), use the handler decorator instead of subscriptions:

typescript
server.route({
  method: 'GET',
  path: '/chat/{room}/ai',
  handler: {
    sse: {
      stream: async (request, session) => {
        for (const token of tokens) {
          session.push({ token, user: 'assistant' }, 'token');
        }
        session.close();
      },
      retry: 3000, // override plugin-level retry
      keepAlive: { interval: 10_000 }, // override plugin-level keep-alive
      headers: { 'X-Chat-Bot': 'true' }, // override plugin-level headers
      backpressure: { maxBytes: 32768, strategy: 'close' },
    },
  },
});

Handler options:

OptionTypeDescription
stream(request, session) => void | Promise<void>Required. Called after SSE headers are sent. Errors close the session gracefully.
retrynumber | nullOverride plugin-level retry (default: inherits from plugin)
keepAlive{ interval: number } | falseOverride plugin-level keep-alive (default: inherits from plugin)
headersRecord<string, string>Override plugin-level headers (default: inherits from plugin)
backpressureBackpressureOptionsOverride plugin-level backpressure (default: inherits from plugin)
maxDurationnumberMaximum connection lifetime in ms (with ±10% jitter). Sends a comment before closing.

Event Replay

SSE clients automatically send a Last-Event-ID header when reconnecting after a dropped connection. When a replayer is configured, the plugin uses that ID to find where the client left off and pushes any events published after it — so the client catches up on what it missed while disconnected.

Only events published with an explicit id are recorded. Events without an id are delivered but not stored for replay — this prevents the buffer from filling with unaddressable entries.

Two built-in replayers:

FiniteReplayer

Keeps the last N events in a fixed-size ring buffer. When full, the oldest entry is dropped to make room. Memory usage is predictable — bounded by size.

typescript
import { FiniteReplayer } from '@hapi/sse';

const replayer = new FiniteReplayer({ size: 100, autoId: true });

server.sse.subscription('/chat/{room}', { replay: replayer });

ValidReplayer

Keeps events for a fixed duration. A periodic cleanup timer removes expired entries, so memory usage varies with publish rate but replayed events are never older than ttl.

typescript
import { ValidReplayer } from '@hapi/sse';

const replayer = new ValidReplayer({ ttl: 60_000, autoId: true });

server.sse.subscription('/chat/{room}', { replay: replayer });

Call replayer.stop() to clear the cleanup timer (handled automatically on server stop).

Options:

OptionTypeDescription
sizenumber(FiniteReplayer) Max entries to keep
ttlnumber(ValidReplayer) Entry lifetime in ms
autoIdbooleanAuto-generate sequential IDs when events have no explicit ID

Custom replayer — implement the Replayer interface:

typescript
import type { Replayer, ReplayEntry } from '@hapi/sse';

class RedisReplayer implements Replayer {
  record(entry: ReplayEntry): void {
    /* store to Redis stream */
  }
  replay(lastEventId: string): ReplayEntry[] {
    /* read from Redis */
  }
  stop?(): void {
    /* cleanup */
  }
}

Backpressure

Protects against slow consumers accumulating unbounded memory. Uses Node's writableLength to accurately measure bytes queued in the kernel buffer. Configurable at plugin level or per-handler.

typescript
// Plugin level — applies to all subscription sessions
await server.register({
    plugin: SsePlugin,
    options: {
        backpressure: { maxBytes: 65536, strategy: 'close' },
    },
});

// Handler level — overrides plugin level
server.route({
    method: 'GET',
    path: '/chat/{room}/ai',
    handler: {
        sse: {
            stream: async (req, session) => { ... },
            backpressure: { maxBytes: 32768, strategy: 'drop' },
        },
    },
});

Strategies:

StrategyBehavior
'close'Closes the session when pending bytes exceed maxBytes
'drop'Silently drops the event but keeps the session open

When backpressure triggers, session.push() returns false.

Hooks

Optional lifecycle hooks for side effects (logging, external telemetry). All hooks are wrapped in try/catch — errors never break the stream.

typescript
await server.register({
  plugin: SsePlugin,
  options: {
    hooks: {
      onSession: (session, path, params) => {
        console.log(`Joined: ${path}`);
      },
      onSessionClose: (session, path, params) => {
        console.log(`Left: ${path}`);
      },
      onPublish: (path, data, deliveryCount) => {
        console.log(`Message in ${path}: ${deliveryCount} recipients`);
      },
    },
  },
});

For metrics, prefer server.sse.stats() which tracks counters automatically. Use hooks for side effects like logging or pushing to external systems.

Generics

Subscription config and publish are generic for type-safe event payloads:

typescript
interface ChatMessage {
  text: string;
  user: string;
}

server.sse.subscription<ChatMessage>('/chat/{room}', {
  filter: (path, message) => {
    // message is typed as ChatMessage
    return message.user !== 'blocked';
  },
});

await server.sse.publish<ChatMessage>('/chat/general', {
  text: 'hello',
  user: 'alice',
});

Security

The plugin includes several built-in defenses against known SSE attack vectors:

Retry floor — The retry value is silently clamped to a minimum of 1000ms. This prevents reconnection storm attacks where a malicious or misconfigured retry: 0 causes clients to reconnect thousands of times per second. Setting retry: null disables the retry field entirely (no clamping).

Last-Event-ID sanitization — Control characters (\x00\x1f) are stripped from the incoming Last-Event-ID header. This prevents null byte injection and CRLF attacks via the reconnection header.

Connection limiting — Use maxSessions on subscriptions to cap concurrent connections. Excess connections receive an HTTP 503 response before SSE headers are sent, preventing connection exhaustion.

Connection TTL — Use maxDuration to enforce a maximum connection lifetime. A ±10% jitter is applied to prevent thundering herd reconnections when many clients connect at the same time. Clients automatically reconnect via the standard SSE reconnection mechanism.

CRLF injection protection — The EventBuffer serializer strips or splits newlines in event and id fields, and splits data fields on line terminators. This prevents SSE event injection attacks (CVE-2026-33128, CVE-2026-22735, CVE-2026-29085 pattern).

Backpressure — Slow consumers are handled via configurable backpressure strategies (drop or close), preventing unbounded memory growth from write buffer accumulation.

Not in scope — Origin header validation, CSRF protection, and authentication are handled by hapi's auth system and middleware (onPreAuth extensions or reverse proxy configuration), not by the SSE plugin.

Exports

typescript
// Classes
export { EventBuffer, Session, SsePlugin, FiniteReplayer, ValidReplayer };

// Types
export type {
  SsePluginOptions,
  SseApi,
  SseHandlerOptions,
  SseHooks,
  SseStats,
  SubscriptionConfig,
  SubscriptionInfo,
  FilterOptions,
  BackpressureOptions,
  Replayer,
  ReplayEntry,
};

Deploys by Netlify