feat: add decodeIncomingMessage function and improve webSocketMessage handling

This commit is contained in:
shuaiplus
2026-03-28 15:28:46 +08:00
parent 10707cf902
commit 144d3d9406
+9 -15
View File
@@ -32,6 +32,12 @@ function encodeUtf8(value: string): Uint8Array {
return new TextEncoder().encode(value); return new TextEncoder().encode(value);
} }
function decodeIncomingMessage(data: string | ArrayBuffer | ArrayBufferView): string {
if (typeof data === 'string') return data;
if (data instanceof ArrayBuffer) return new TextDecoder().decode(new Uint8Array(data));
return new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength));
}
function encodeMsgPackInteger(value: number): Uint8Array { function encodeMsgPackInteger(value: number): Uint8Array {
const normalized = Math.trunc(value); const normalized = Math.trunc(value);
if (normalized >= 0 && normalized <= 0x7f) { if (normalized >= 0 && normalized <= 0x7f) {
@@ -254,14 +260,12 @@ export class NotificationsHub extends DurableObject<Env> {
}); });
} }
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> { async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer | ArrayBufferView): Promise<void> {
const attachment = ws.deserializeAttachment() as WsAttachment | null; const attachment = ws.deserializeAttachment() as WsAttachment | null;
if (!attachment) return; if (!attachment) return;
if (!attachment.handshakeComplete) { if (!attachment.handshakeComplete) {
const text = typeof message === 'string' const text = decodeIncomingMessage(message);
? message
: new TextDecoder().decode(new Uint8Array(message));
const frames = text.split(String.fromCharCode(SIGNALR_RECORD_SEPARATOR)).filter(Boolean); const frames = text.split(String.fromCharCode(SIGNALR_RECORD_SEPARATOR)).filter(Boolean);
for (const frame of frames) { for (const frame of frames) {
try { try {
@@ -279,7 +283,7 @@ export class NotificationsHub extends DurableObject<Env> {
return; return;
} }
if (message instanceof ArrayBuffer) { if (typeof message !== 'string') {
try { try {
ws.send(message); ws.send(message);
} catch { } catch {
@@ -291,11 +295,6 @@ export class NotificationsHub extends DurableObject<Env> {
async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> { async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> {
const attachment = ws.deserializeAttachment() as WsAttachment | null; const attachment = ws.deserializeAttachment() as WsAttachment | null;
const shouldBroadcast = !!attachment?.handshakeComplete; const shouldBroadcast = !!attachment?.handshakeComplete;
try {
ws.close(code, 'Durable Object is closing WebSocket');
} catch {
// ignore close races
}
if (shouldBroadcast && attachment?.userId) { if (shouldBroadcast && attachment?.userId) {
this.broadcastDeviceStatus(attachment.userId); this.broadcastDeviceStatus(attachment.userId);
} }
@@ -304,11 +303,6 @@ export class NotificationsHub extends DurableObject<Env> {
async webSocketError(ws: WebSocket, error: unknown): Promise<void> { async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
const attachment = ws.deserializeAttachment() as WsAttachment | null; const attachment = ws.deserializeAttachment() as WsAttachment | null;
const shouldBroadcast = !!attachment?.handshakeComplete; const shouldBroadcast = !!attachment?.handshakeComplete;
try {
ws.close(1011, 'Socket error');
} catch {
// ignore close races
}
if (shouldBroadcast && attachment?.userId) { if (shouldBroadcast && attachment?.userId) {
this.broadcastDeviceStatus(attachment.userId); this.broadcastDeviceStatus(attachment.userId);
} }