WebSocket

Nitro 基于 CrossWS 和 H3 提供跨平台的 WebSocket 支持。

WebSocket 可以在客户端与服务器之间实现实时双向通信。Nitro 的 WebSocket 集成适用于所有受支持的部署目标,包括 Node.js、Bun、Deno 以及 Cloudflare Workers。

Read more in CrossWS Documentation.

启用 WebSocket

在 Nitro 配置中启用 WebSocket 支持:

import { defineConfig } from "nitro";

export default defineConfig({
  features: {
    websocket: true,
  },
});

用法

使用 defineWebSocketHandler 创建一个 WebSocket 处理器,并从路由文件中导出。WebSocket 处理器与普通请求处理器一样,遵循相同的文件路由规则。

routes/_ws.ts
import { defineWebSocketHandler } from "nitro";

export default defineWebSocketHandler({
  open(peer) {
    console.log("已连接:", peer.id);
  },
  message(peer, message) {
    console.log("收到消息:", message.text());
    peer.send("来自服务器的问候!");
  },
  close(peer, details) {
    console.log("已断开:", peer.id, details.code, details.reason);
  },
  error(peer, error) {
    console.error("错误:", error);
  },
});
你可以为 WebSocket 处理器使用任意路由路径。例如,routes/chat.ts 会处理 /chat 上的 WebSocket 连接。

从客户端连接

使用浏览器的 WebSocket API 进行连接:

const ws = new WebSocket("ws://localhost:3000/_ws");

ws.addEventListener("open", () => {
  console.log("已连接!");
  ws.send("来自客户端的问候!");
});

ws.addEventListener("message", (event) => {
  console.log("收到:", event.data);
});

钩子

WebSocket 处理器支持以下生命周期钩子:

upgrade

在 WebSocket 连接建立之前调用。你可以用它来校验请求、设置命名空间,或给 peer 附加上下文数据。

routes/chat.ts
import { defineWebSocketHandler } from "nitro";

export default defineWebSocketHandler({
  upgrade(request) {
    const url = new URL(request.url);
    const token = url.searchParams.get("token");
    if (!isValidToken(token)) {
      throw new Response("未授权", { status: 401 });
    }
    return {
      context: { userId: getUserId(token) },
    };
  },
  open(peer) {
    console.log("用户已连接:", peer.context.userId);
  },
  // ...
});

upgrade 钩子可以返回一个对象,包含:

属性类型说明
headersHeadersInit要包含在升级响应中的响应头
namespacestring覆盖此连接的发布/订阅命名空间
contextobject附加到 peer.context 的数据

抛出一个 Response 即可拒绝升级请求。

open

当 WebSocket 连接建立完成,且 peer 已可收发消息时调用。

open(peer) {
  peer.send("欢迎!");
}

message

在收到 peer 发来的消息时调用。

message(peer, message) {
  const text = message.text();
  const data = message.json();
}

close

当 WebSocket 连接关闭时调用。会收到一个 details 对象,其中可能包含 codereason

close(peer, details) {
  console.log(`已关闭:${details.code} - ${details.reason}`);
}

error

在 WebSocket 连接发生错误时调用。

error(peer, error) {
  console.error("WebSocket 错误:", error);
}

Peer

peer 对象表示一个已连接的 WebSocket 客户端。除 upgrade 外,其他所有钩子中都可以访问它。

属性

属性类型说明
idstring当前 peer 的唯一标识
namespacestring当前 peer 所属的发布/订阅命名空间
contextobjectupgrade 阶段设置的任意上下文数据
requestRequest原始升级请求
peersSet<Peer>同一命名空间下的所有已连接 peer
topicsSet<string>当前 peer 订阅的主题
remoteAddressstring?客户端 IP 地址(取决于适配器)
websocketWebSocket底层的 WebSocket 实例

方法

peer.send(data, options?)

直接向当前 peer 发送消息。支持字符串、对象(会序列化为 JSON)以及二进制数据。

peer.send("你好!");
peer.send({ type: "greeting", text: "你好!" });

peer.subscribe(topic)

让当前 peer 订阅一个发布/订阅主题。

peer.subscribe("notifications");

peer.unsubscribe(topic)

取消当前 peer 对某个主题的订阅。

peer.unsubscribe("notifications");

peer.publish(topic, data, options?)

向同一命名空间内订阅了该主题的所有 peer 广播消息。发送消息的 peer 不会收到这条消息。

peer.publish("chat", { user: "小明", text: "大家好!" });

peer.close(code?, reason?)

优雅地关闭 WebSocket 连接。

peer.close(1000, "正常关闭");

peer.terminate()

立即终止连接,不发送关闭帧。

Message

message 钩子中的 message 对象提供了多种方法,用于以不同格式读取传入数据。

方法返回类型说明
text()string将消息作为 UTF-8 字符串读取
json()T将消息解析为 JSON
uint8Array()Uint8Array将消息读取为字节数组
arrayBuffer()ArrayBuffer将消息读取为 ArrayBuffer
blob()Blob将消息读取为 Blob
message(peer, message) {
  // 按文本解析
  const text = message.text();

  // 按带类型的 JSON 解析
  const data = message.json<{ type: string; payload: unknown }>();
}

发布/订阅

发布/订阅(pub/sub)允许你通过主题向一组已连接的 peer 广播消息。peer 订阅主题后,就能收到发布到这些主题的消息。

routes/chat.ts
import { defineWebSocketHandler } from "nitro";

export default defineWebSocketHandler({
  open(peer) {
    peer.subscribe("chat");
    peer.publish("chat", { system: `${peer} 加入了聊天室` });
    peer.send({ system: "欢迎来到聊天室!" });
  },
  message(peer, message) {
    // 广播给其他所有订阅者
    peer.publish("chat", {
      user: peer.toString(),
      text: message.text(),
    });
    // 回显给发送者
    peer.send({ user: "你", text: message.text() });
  },
  close(peer) {
    peer.publish("chat", { system: `${peer} 离开了聊天室` });
  },
});
peer.publish() 会将消息发送给该主题的所有订阅者,但不包括发送消息的 peer 自己。如果也要发给发布者,请使用 peer.send()

命名空间

命名空间为 WebSocket 连接提供隔离的发布/订阅分组。每个 peer 都属于某一个命名空间,而 peer.publish() 只会向同一命名空间内的 peer 广播。

默认情况下,命名空间由请求 URL 的路径名推导而来。这与动态路由能够自然配合,也就是说每个路径都会拥有自己独立的命名空间:

routes/rooms/[room].ts
import { defineWebSocketHandler } from "nitro";

export default defineWebSocketHandler({
  open(peer) {
    peer.subscribe("messages");
    peer.publish("messages", `${peer} 加入了 ${peer.namespace}`);
  },
  message(peer, message) {
    // 只会发送给同一房间内的 peer
    peer.publish("messages", `${peer}: ${message.text()}`);
  },
  close(peer) {
    peer.publish("messages", `${peer} 离开了`);
  },
});

在这个示例中,连接到 /rooms/game 的客户端会与连接到 /rooms/lobby 的客户端彼此隔离,因为每个路径都对应自己的命名空间。

如果要覆盖默认命名空间,可以在 upgrade 钩子中返回自定义的 namespace

routes/chat.ts
import { defineWebSocketHandler } from "nitro";

export default defineWebSocketHandler({
  upgrade(request) {
    // 按查询参数分组连接,而不是按路径名
    const url = new URL(request.url);
    const channel = url.searchParams.get("channel") || "general";
    return {
      namespace: `chat:${channel}`,
    };
  },
  open(peer) {
    peer.subscribe("messages");
    peer.publish("messages", `${peer} 加入了`);
  },
  message(peer, message) {
    peer.publish("messages", `${peer}: ${message.text()}`);
  },
  close(peer) {
    peer.publish("messages", `${peer} 离开了`);
  },
});

Server-Sent Events(SSE)

Server-Sent Events 在只需要服务端到客户端单向流式推送时,是一种更简单的替代方案。与 WebSocket 不同,SSE 使用标准 HTTP,并支持自动重连。

routes/sse.ts
import { defineHandler } from "nitro";
import { createEventStream } from "nitro/h3";

export default defineHandler((event) => {
  const stream = createEventStream(event);

  const interval = setInterval(async () => {
    await stream.push(`Message @ ${new Date().toLocaleTimeString()}`);
  }, 1000);

  stream.onClosed(() => {
    clearInterval(interval);
  });

  return stream.send();
});

客户端可以通过 EventSource API 连接:

const source = new EventSource("/sse");

source.onmessage = (event) => {
  console.log(event.data);
};

结构化消息

SSE 消息支持可选的 ideventretry 字段:

routes/events.ts
import { defineHandler } from "nitro";
import { createEventStream } from "nitro/h3";

export default defineHandler((event) => {
  const stream = createEventStream(event);
  let id = 0;

  const interval = setInterval(async () => {
    await stream.push({
      id: String(id++),
      event: "update",
      data: JSON.stringify({ value: Math.random() }),
      retry: 3000,
    });
  }, 1000);

  stream.onClosed(() => {
    clearInterval(interval);
  });

  return stream.send();
});
Read more in H3 Documentation.