WebSocket
WebSocket 可以在客户端与服务器之间实现实时双向通信。Nitro 的 WebSocket 集成适用于所有受支持的部署目标,包括 Node.js、Bun、Deno 以及 Cloudflare Workers。
启用 WebSocket
在 Nitro 配置中启用 WebSocket 支持:
import { defineConfig } from "nitro";
export default defineConfig({
features: {
websocket: true,
},
});
用法
使用 defineWebSocketHandler 创建一个 WebSocket 处理器,并从路由文件中导出。WebSocket 处理器与普通请求处理器一样,遵循相同的文件路由规则。
import { defineWebSocketHandler } from "nitro";
export default defineWebSocketHandler({
open(peer) {
console.log("已连接:", peer.id);
},
message(peer, message) {
console.log("收到消息:", message.text());
peer.send("来自服务器的问候!");
},
close(peer, details) {
console.log("已断开:", peer.id, details.code, details.reason);
},
error(peer, error) {
console.error("错误:", error);
},
});
routes/chat.ts 会处理 /chat 上的 WebSocket 连接。从客户端连接
使用浏览器的 WebSocket API 进行连接:
const ws = new WebSocket("ws://localhost:3000/_ws");
ws.addEventListener("open", () => {
console.log("已连接!");
ws.send("来自客户端的问候!");
});
ws.addEventListener("message", (event) => {
console.log("收到:", event.data);
});
钩子
WebSocket 处理器支持以下生命周期钩子:
upgrade
在 WebSocket 连接建立之前调用。你可以用它来校验请求、设置命名空间,或给 peer 附加上下文数据。
import { defineWebSocketHandler } from "nitro";
export default defineWebSocketHandler({
upgrade(request) {
const url = new URL(request.url);
const token = url.searchParams.get("token");
if (!isValidToken(token)) {
throw new Response("未授权", { status: 401 });
}
return {
context: { userId: getUserId(token) },
};
},
open(peer) {
console.log("用户已连接:", peer.context.userId);
},
// ...
});
upgrade 钩子可以返回一个对象,包含:
| 属性 | 类型 | 说明 |
|---|---|---|
headers | HeadersInit | 要包含在升级响应中的响应头 |
namespace | string | 覆盖此连接的发布/订阅命名空间 |
context | object | 附加到 peer.context 的数据 |
抛出一个 Response 即可拒绝升级请求。
open
当 WebSocket 连接建立完成,且 peer 已可收发消息时调用。
open(peer) {
peer.send("欢迎!");
}
message
在收到 peer 发来的消息时调用。
message(peer, message) {
const text = message.text();
const data = message.json();
}
close
当 WebSocket 连接关闭时调用。会收到一个 details 对象,其中可能包含 code 和 reason。
close(peer, details) {
console.log(`已关闭:${details.code} - ${details.reason}`);
}
error
在 WebSocket 连接发生错误时调用。
error(peer, error) {
console.error("WebSocket 错误:", error);
}
Peer
peer 对象表示一个已连接的 WebSocket 客户端。除 upgrade 外,其他所有钩子中都可以访问它。
属性
| 属性 | 类型 | 说明 |
|---|---|---|
id | string | 当前 peer 的唯一标识 |
namespace | string | 当前 peer 所属的发布/订阅命名空间 |
context | object | 在 upgrade 阶段设置的任意上下文数据 |
request | Request | 原始升级请求 |
peers | Set<Peer> | 同一命名空间下的所有已连接 peer |
topics | Set<string> | 当前 peer 订阅的主题 |
remoteAddress | string? | 客户端 IP 地址(取决于适配器) |
websocket | WebSocket | 底层的 WebSocket 实例 |
方法
peer.send(data, options?)
直接向当前 peer 发送消息。支持字符串、对象(会序列化为 JSON)以及二进制数据。
peer.send("你好!");
peer.send({ type: "greeting", text: "你好!" });
peer.subscribe(topic)
让当前 peer 订阅一个发布/订阅主题。
peer.subscribe("notifications");
peer.unsubscribe(topic)
取消当前 peer 对某个主题的订阅。
peer.unsubscribe("notifications");
peer.publish(topic, data, options?)
向同一命名空间内订阅了该主题的所有 peer 广播消息。发送消息的 peer 不会收到这条消息。
peer.publish("chat", { user: "小明", text: "大家好!" });
peer.close(code?, reason?)
优雅地关闭 WebSocket 连接。
peer.close(1000, "正常关闭");
peer.terminate()
立即终止连接,不发送关闭帧。
Message
message 钩子中的 message 对象提供了多种方法,用于以不同格式读取传入数据。
| 方法 | 返回类型 | 说明 |
|---|---|---|
text() | string | 将消息作为 UTF-8 字符串读取 |
json() | T | 将消息解析为 JSON |
uint8Array() | Uint8Array | 将消息读取为字节数组 |
arrayBuffer() | ArrayBuffer | 将消息读取为 ArrayBuffer |
blob() | Blob | 将消息读取为 Blob |
message(peer, message) {
// 按文本解析
const text = message.text();
// 按带类型的 JSON 解析
const data = message.json<{ type: string; payload: unknown }>();
}
发布/订阅
发布/订阅(pub/sub)允许你通过主题向一组已连接的 peer 广播消息。peer 订阅主题后,就能收到发布到这些主题的消息。
import { defineWebSocketHandler } from "nitro";
export default defineWebSocketHandler({
open(peer) {
peer.subscribe("chat");
peer.publish("chat", { system: `${peer} 加入了聊天室` });
peer.send({ system: "欢迎来到聊天室!" });
},
message(peer, message) {
// 广播给其他所有订阅者
peer.publish("chat", {
user: peer.toString(),
text: message.text(),
});
// 回显给发送者
peer.send({ user: "你", text: message.text() });
},
close(peer) {
peer.publish("chat", { system: `${peer} 离开了聊天室` });
},
});
peer.publish() 会将消息发送给该主题的所有订阅者,但不包括发送消息的 peer 自己。如果也要发给发布者,请使用 peer.send()。命名空间
命名空间为 WebSocket 连接提供隔离的发布/订阅分组。每个 peer 都属于某一个命名空间,而 peer.publish() 只会向同一命名空间内的 peer 广播。
默认情况下,命名空间由请求 URL 的路径名推导而来。这与动态路由能够自然配合,也就是说每个路径都会拥有自己独立的命名空间:
import { defineWebSocketHandler } from "nitro";
export default defineWebSocketHandler({
open(peer) {
peer.subscribe("messages");
peer.publish("messages", `${peer} 加入了 ${peer.namespace}`);
},
message(peer, message) {
// 只会发送给同一房间内的 peer
peer.publish("messages", `${peer}: ${message.text()}`);
},
close(peer) {
peer.publish("messages", `${peer} 离开了`);
},
});
在这个示例中,连接到 /rooms/game 的客户端会与连接到 /rooms/lobby 的客户端彼此隔离,因为每个路径都对应自己的命名空间。
如果要覆盖默认命名空间,可以在 upgrade 钩子中返回自定义的 namespace:
import { defineWebSocketHandler } from "nitro";
export default defineWebSocketHandler({
upgrade(request) {
// 按查询参数分组连接,而不是按路径名
const url = new URL(request.url);
const channel = url.searchParams.get("channel") || "general";
return {
namespace: `chat:${channel}`,
};
},
open(peer) {
peer.subscribe("messages");
peer.publish("messages", `${peer} 加入了`);
},
message(peer, message) {
peer.publish("messages", `${peer}: ${message.text()}`);
},
close(peer) {
peer.publish("messages", `${peer} 离开了`);
},
});
Server-Sent Events(SSE)
Server-Sent Events 在只需要服务端到客户端单向流式推送时,是一种更简单的替代方案。与 WebSocket 不同,SSE 使用标准 HTTP,并支持自动重连。
import { defineHandler } from "nitro";
import { createEventStream } from "nitro/h3";
export default defineHandler((event) => {
const stream = createEventStream(event);
const interval = setInterval(async () => {
await stream.push(`Message @ ${new Date().toLocaleTimeString()}`);
}, 1000);
stream.onClosed(() => {
clearInterval(interval);
});
return stream.send();
});
客户端可以通过 EventSource API 连接:
const source = new EventSource("/sse");
source.onmessage = (event) => {
console.log(event.data);
};
结构化消息
SSE 消息支持可选的 id、event 和 retry 字段:
import { defineHandler } from "nitro";
import { createEventStream } from "nitro/h3";
export default defineHandler((event) => {
const stream = createEventStream(event);
let id = 0;
const interval = setInterval(async () => {
await stream.push({
id: String(id++),
event: "update",
data: JSON.stringify({ value: Math.random() }),
retry: 3000,
});
}, 1000);
stream.onClosed(() => {
clearInterval(interval);
});
return stream.send();
});