From f0c57a7f9c7bfefb296d76513f1fb91953f3ea11 Mon Sep 17 00:00:00 2001 From: shuaiplus <2327005759@qq.com> Date: Mon, 9 Mar 2026 00:25:34 +0800 Subject: [PATCH] feat: implement NotificationsHub for real-time vault sync notifications - Added NotificationsHub durable object to handle WebSocket connections for vault sync notifications. - Integrated SignalR protocol for message framing and communication. - Updated storage service methods to return revision date and user ID for vault sync notifications. - Enhanced existing handlers (attachments, ciphers, folders, sends, and import) to notify users of vault sync events. - Created new notifications handler for WebSocket negotiation and binding user IDs. - Updated frontend to establish WebSocket connection for receiving vault sync notifications. - Improved CORS headers to support new notification endpoints. - Bumped wrangler version in package.json to 4.71.0. --- package-lock.json | 78 +++---- package.json | 2 +- src/durable/notifications-hub.ts | 365 +++++++++++++++++++++++++++++++ src/handlers/attachments.ts | 26 ++- src/handlers/ciphers.ts | 37 +++- src/handlers/folders.ts | 20 +- src/handlers/import.ts | 5 +- src/handlers/notifications.ts | 61 ++++++ src/handlers/sends.ts | 44 +++- src/index.ts | 3 + src/router.ts | 22 +- src/services/storage.ts | 13 +- src/types/index.ts | 1 + src/utils/device.ts | 4 + src/utils/response.ts | 30 ++- webapp/src/App.tsx | 127 +++++++++++ wrangler.kv.toml | 8 + wrangler.toml | 9 + 18 files changed, 779 insertions(+), 76 deletions(-) create mode 100644 src/durable/notifications-hub.ts create mode 100644 src/handlers/notifications.ts diff --git a/package-lock.json b/package-lock.json index 8528c46..6ad70c2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25,7 +25,7 @@ "tsx": "^4.21.0", "typescript": "^5.9.3", "vite": "^7.3.1", - "wrangler": "^4.69.0" + "wrangler": "^4.71.0" } }, "node_modules/@babel/code-frame": { @@ -386,14 +386,14 @@ } }, "node_modules/@cloudflare/unenv-preset": { - "version": "2.14.0", - "resolved": "https://registry.npmmirror.com/@cloudflare/unenv-preset/-/unenv-preset-2.14.0.tgz", - "integrity": "sha512-XKAkWhi1nBdNsSEoNG9nkcbyvfUrSjSf+VYVPfOto3gLTZVc3F4g6RASCMh6IixBKCG2yDgZKQIHGKtjcnLnKg==", + "version": "2.15.0", + "resolved": "https://registry.npmmirror.com/@cloudflare/unenv-preset/-/unenv-preset-2.15.0.tgz", + "integrity": "sha512-EGYmJaGZKWl+X8tXxcnx4v2bOZSjQeNI5dWFeXivgX9+YCT69AkzHHwlNbVpqtEUTbew8eQurpyOpeN8fg00nw==", "dev": true, "license": "MIT OR Apache-2.0", "peerDependencies": { "unenv": "2.0.0-rc.24", - "workerd": "^1.20260218.0" + "workerd": "1.20260301.1 || ~1.20260302.1 || ~1.20260303.1 || ~1.20260304.1 || >1.20260305.0 <2.0.0-0" }, "peerDependenciesMeta": { "workerd": { @@ -402,9 +402,9 @@ } }, "node_modules/@cloudflare/workerd-darwin-64": { - "version": "1.20260305.0", - "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-darwin-64/-/workerd-darwin-64-1.20260305.0.tgz", - "integrity": "sha512-chhKOpymo0Eh9J3nymrauMqKGboCc4uz/j0gA1G4gioMnKsN2ZDKJ+qjRZDnCoVGy8u2C4pxlmyIfsXCAfIzhQ==", + "version": "1.20260301.1", + "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-darwin-64/-/workerd-darwin-64-1.20260301.1.tgz", + "integrity": "sha512-+kJvwociLrvy1JV9BAvoSVsMEIYD982CpFmo/yMEvBwxDIjltYsLTE8DLi0mCkGsQ8Ygidv2fD9wavzXeiY7OQ==", "cpu": [ "x64" ], @@ -419,9 +419,9 @@ } }, "node_modules/@cloudflare/workerd-darwin-arm64": { - "version": "1.20260305.0", - "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-darwin-arm64/-/workerd-darwin-arm64-1.20260305.0.tgz", - "integrity": "sha512-K9aG2OQk5bBfOP+fyGPqLcqZ9OR3ra6uwnxJ8f2mveq2A2LsCI7ZeGxQiAj75Ti80ytH/gJffZIx4Np2JtU3aQ==", + "version": "1.20260301.1", + "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-darwin-arm64/-/workerd-darwin-arm64-1.20260301.1.tgz", + "integrity": "sha512-PPIetY3e67YBr9O4UhILK8nbm5TqUDl14qx4rwFNrRSBOvlzuczzbd4BqgpAtbGVFxKp1PWpjAnBvGU/OI/tLQ==", "cpu": [ "arm64" ], @@ -436,9 +436,9 @@ } }, "node_modules/@cloudflare/workerd-linux-64": { - "version": "1.20260305.0", - "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-linux-64/-/workerd-linux-64-1.20260305.0.tgz", - "integrity": "sha512-tt7XUoIw/cYFeGbkPkcZ6XX1aZm26Aju/4ih+DXxOosbBeGshFSrNJDBfAKKOvkjsAZymJ+WWVDBU+hmNaGfwA==", + "version": "1.20260301.1", + "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-linux-64/-/workerd-linux-64-1.20260301.1.tgz", + "integrity": "sha512-Gu5vaVTZuYl3cHa+u5CDzSVDBvSkfNyuAHi6Mdfut7TTUdcb3V5CIcR/mXRSyMXzEy9YxEWIfdKMxOMBjupvYQ==", "cpu": [ "x64" ], @@ -453,9 +453,9 @@ } }, "node_modules/@cloudflare/workerd-linux-arm64": { - "version": "1.20260305.0", - "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-linux-arm64/-/workerd-linux-arm64-1.20260305.0.tgz", - "integrity": "sha512-72QTkY5EzylmvCZ8ZTrnJ9DctmQsfSof1OKyOWqu/pv/B2yACfuPMikq8RpPxvVu7hhS0ztGP6ZvXz72Htq4Zg==", + "version": "1.20260301.1", + "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-linux-arm64/-/workerd-linux-arm64-1.20260301.1.tgz", + "integrity": "sha512-igL1pkyCXW6GiGpjdOAvqMi87UW0LMc/+yIQe/CSzuZJm5GzXoAMrwVTkCFnikk6JVGELrM5x0tGYlxa0sk5Iw==", "cpu": [ "arm64" ], @@ -470,9 +470,9 @@ } }, "node_modules/@cloudflare/workerd-windows-64": { - "version": "1.20260305.0", - "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20260305.0.tgz", - "integrity": "sha512-BA0uaQPOaI2F6mJtBDqplGnQQhpXCzwEMI33p/TnDxtSk9u8CGIfBFuI6uqo8mJ6ijIaPjeBLGOn2CiRMET4qg==", + "version": "1.20260301.1", + "resolved": "https://registry.npmmirror.com/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20260301.1.tgz", + "integrity": "sha512-Q0wMJ4kcujXILwQKQFc1jaYamVsNvjuECzvRrTI8OxGFMx2yq9aOsswViE4X1gaS2YQQ5u0JGwuGi5WdT1Lt7A==", "cpu": [ "x64" ], @@ -2573,16 +2573,16 @@ } }, "node_modules/miniflare": { - "version": "4.20260305.0", - "resolved": "https://registry.npmmirror.com/miniflare/-/miniflare-4.20260305.0.tgz", - "integrity": "sha512-jVhtKJtiwaZa3rI+WgoLvSJmEazDsoUmAPYRUmEe2VO6VSbvkhbnDRm+dsPbYRatgNIExwrpqG1rv96jHiSb0w==", + "version": "4.20260301.1", + "resolved": "https://registry.npmmirror.com/miniflare/-/miniflare-4.20260301.1.tgz", + "integrity": "sha512-fqkHx0QMKswRH9uqQQQOU/RoaS3Wjckxy3CUX3YGJr0ZIMu7ObvI+NovdYi6RIsSPthNtq+3TPmRNxjeRiasog==", "dev": true, "license": "MIT", "dependencies": { "@cspotcode/source-map-support": "0.8.1", "sharp": "^0.34.5", "undici": "7.18.2", - "workerd": "1.20260305.0", + "workerd": "1.20260301.1", "ws": "8.18.0", "youch": "4.1.0-beta.10" }, @@ -3145,9 +3145,9 @@ } }, "node_modules/workerd": { - "version": "1.20260305.0", - "resolved": "https://registry.npmmirror.com/workerd/-/workerd-1.20260305.0.tgz", - "integrity": "sha512-JkhfCLU+w+KbQmZ9k49IcDYc78GBo7eG8Mir8E2+KVjR7otQAmpcLlsous09YLh8WQ3Bt3Mi6/WMStvMAPukeA==", + "version": "1.20260301.1", + "resolved": "https://registry.npmmirror.com/workerd/-/workerd-1.20260301.1.tgz", + "integrity": "sha512-oterQ1IFd3h7PjCfT4znSFOkJCvNQ6YMOyZ40YsnO3nrSpgB4TbJVYWFOnyJAw71/RQuupfVqZZWKvsy8GO3fw==", "dev": true, "hasInstallScript": true, "license": "Apache-2.0", @@ -3159,11 +3159,11 @@ "node": ">=16" }, "optionalDependencies": { - "@cloudflare/workerd-darwin-64": "1.20260305.0", - "@cloudflare/workerd-darwin-arm64": "1.20260305.0", - "@cloudflare/workerd-linux-64": "1.20260305.0", - "@cloudflare/workerd-linux-arm64": "1.20260305.0", - "@cloudflare/workerd-windows-64": "1.20260305.0" + "@cloudflare/workerd-darwin-64": "1.20260301.1", + "@cloudflare/workerd-darwin-arm64": "1.20260301.1", + "@cloudflare/workerd-linux-64": "1.20260301.1", + "@cloudflare/workerd-linux-arm64": "1.20260301.1", + "@cloudflare/workerd-windows-64": "1.20260301.1" } }, "node_modules/wouter": { @@ -3181,20 +3181,20 @@ } }, "node_modules/wrangler": { - "version": "4.69.0", - "resolved": "https://registry.npmmirror.com/wrangler/-/wrangler-4.69.0.tgz", - "integrity": "sha512-EmVfIM65I5b4ITHe3Y9R7zQyf4NUBQ1leStakMlWiVR9n6VlDwuEltyQI2l3i0JciDnWyR3uqe+T6C08ivniTQ==", + "version": "4.71.0", + "resolved": "https://registry.npmmirror.com/wrangler/-/wrangler-4.71.0.tgz", + "integrity": "sha512-j6pSGAncOLNQDRzqtp0EqzYj52CldDP7uz/C9cxVrIgqa5p+cc0b4pIwnapZZAGv9E1Loa3tmPD0aXonH7KTkw==", "dev": true, "license": "MIT OR Apache-2.0", "dependencies": { "@cloudflare/kv-asset-handler": "0.4.2", - "@cloudflare/unenv-preset": "2.14.0", + "@cloudflare/unenv-preset": "2.15.0", "blake3-wasm": "2.1.5", "esbuild": "0.27.3", - "miniflare": "4.20260305.0", + "miniflare": "4.20260301.1", "path-to-regexp": "6.3.0", "unenv": "2.0.0-rc.24", - "workerd": "1.20260305.0" + "workerd": "1.20260301.1" }, "bin": { "wrangler": "bin/wrangler.js", @@ -3207,7 +3207,7 @@ "fsevents": "~2.3.2" }, "peerDependencies": { - "@cloudflare/workers-types": "^4.20260305.0" + "@cloudflare/workers-types": "^4.20260226.1" }, "peerDependenciesMeta": { "@cloudflare/workers-types": { diff --git a/package.json b/package.json index 968410c..8335e59 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "tsx": "^4.21.0", "typescript": "^5.9.3", "vite": "^7.3.1", - "wrangler": "^4.69.0" + "wrangler": "^4.71.0" }, "dependencies": { "@noble/hashes": "^2.0.1", diff --git a/src/durable/notifications-hub.ts b/src/durable/notifications-hub.ts new file mode 100644 index 0000000..6b3a0aa --- /dev/null +++ b/src/durable/notifications-hub.ts @@ -0,0 +1,365 @@ +import type { Env } from '../types'; + +const SIGNALR_RECORD_SEPARATOR = 0x1e; +const SIGNALR_HANDSHAKE_ACK = new Uint8Array([0x7b, 0x7d, SIGNALR_RECORD_SEPARATOR]); +const SIGNALR_UPDATE_TYPE_SYNC_VAULT = 5; +const SIGNALR_PING_INTERVAL_MS = 15_000; + +type HubProtocol = 'json' | 'messagepack'; + +interface ConnectionState { + handshakeComplete: boolean; + protocol: HubProtocol; +} + +function concatBytes(chunks: Uint8Array[]): Uint8Array { + const total = chunks.reduce((sum, chunk) => sum + chunk.length, 0); + const out = new Uint8Array(total); + let offset = 0; + for (const chunk of chunks) { + out.set(chunk, offset); + offset += chunk.length; + } + return out; +} + +function encodeUtf8(value: string): Uint8Array { + return new TextEncoder().encode(value); +} + +function encodeMsgPackInteger(value: number): Uint8Array { + const normalized = Math.trunc(value); + if (normalized >= 0 && normalized <= 0x7f) { + return new Uint8Array([normalized]); + } + if (normalized >= 0 && normalized <= 0xff) { + return new Uint8Array([0xcc, normalized]); + } + if (normalized >= 0 && normalized <= 0xffff) { + return new Uint8Array([0xcd, normalized >> 8, normalized & 0xff]); + } + const safe = normalized >>> 0; + return new Uint8Array([ + 0xce, + (safe >>> 24) & 0xff, + (safe >>> 16) & 0xff, + (safe >>> 8) & 0xff, + safe & 0xff, + ]); +} + +function encodeMsgPackString(value: string): Uint8Array { + const bytes = encodeUtf8(value); + const len = bytes.length; + if (len < 32) { + return concatBytes([new Uint8Array([0xa0 | len]), bytes]); + } + if (len <= 0xff) { + return concatBytes([new Uint8Array([0xd9, len]), bytes]); + } + return concatBytes([new Uint8Array([0xda, (len >> 8) & 0xff, len & 0xff]), bytes]); +} + +function encodeMsgPackTimestamp(date: Date): Uint8Array { + const seconds = BigInt(Math.floor(date.getTime() / 1000)); + const nanos = BigInt(date.getMilliseconds()) * 1000000n; + const timestamp = (nanos << 34n) | seconds; + const payload = new Uint8Array(8); + for (let i = 7; i >= 0; i--) { + payload[i] = Number((timestamp >> BigInt((7 - i) * 8)) & 0xffn); + } + return concatBytes([new Uint8Array([0xc7, 0x08, 0xff]), payload]); +} + +function encodeMsgPackArray(values: unknown[]): Uint8Array { + const items = values.map(encodeMsgPack); + const len = items.length; + const header = + len < 16 + ? new Uint8Array([0x90 | len]) + : new Uint8Array([0xdc, (len >> 8) & 0xff, len & 0xff]); + return concatBytes([header, ...items]); +} + +function encodeMsgPackMap(value: Record): Uint8Array { + const entries = Object.entries(value); + const len = entries.length; + const header = + len < 16 + ? new Uint8Array([0x80 | len]) + : new Uint8Array([0xde, (len >> 8) & 0xff, len & 0xff]); + const chunks: Uint8Array[] = [header]; + for (const [key, entryValue] of entries) { + chunks.push(encodeMsgPackString(key), encodeMsgPack(entryValue)); + } + return concatBytes(chunks); +} + +function encodeMsgPack(value: unknown): Uint8Array { + if (value === null || value === undefined) return new Uint8Array([0xc0]); + if (value instanceof Date) return encodeMsgPackTimestamp(value); + if (typeof value === 'string') return encodeMsgPackString(value); + if (typeof value === 'number') return encodeMsgPackInteger(value); + if (typeof value === 'boolean') return new Uint8Array([value ? 0xc3 : 0xc2]); + if (Array.isArray(value)) return encodeMsgPackArray(value); + if (value instanceof Uint8Array) { + const len = value.length; + if (len <= 0xff) return concatBytes([new Uint8Array([0xc4, len]), value]); + return concatBytes([new Uint8Array([0xc5, (len >> 8) & 0xff, len & 0xff]), value]); + } + return encodeMsgPackMap(value as Record); +} + +function frameSignalRBinary(payload: Uint8Array): Uint8Array { + const len = payload.length; + const prefix: number[] = []; + let value = len; + do { + let current = value & 0x7f; + value >>>= 7; + if (value > 0) current |= 0x80; + prefix.push(current); + } while (value > 0); + return concatBytes([new Uint8Array(prefix), payload]); +} + +function buildSignalRJsonInvocation(userId: string, revisionDate: string, contextId: string | null): string { + return JSON.stringify({ + type: 1, + target: 'ReceiveMessage', + arguments: [ + { + ContextId: contextId, + Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT, + Payload: { + UserId: userId, + Date: revisionDate, + }, + }, + ], + }) + String.fromCharCode(SIGNALR_RECORD_SEPARATOR); +} + +function buildSignalRJsonPing(): string { + return JSON.stringify({ type: 6 }) + String.fromCharCode(SIGNALR_RECORD_SEPARATOR); +} + +function buildSignalRMessagePackInvocation(userId: string, revisionDate: string, contextId: string | null): Uint8Array { + // SignalR MessagePack hub protocol uses an array-based invocation shape: + // [type, headers, invocationId, target, arguments] + const payload = encodeMsgPack([ + 1, + {}, + null, + 'ReceiveMessage', + [ + { + ContextId: contextId, + Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT, + Payload: { + UserId: userId, + Date: new Date(revisionDate), + }, + }, + ], + ]); + return frameSignalRBinary(payload); +} + +function buildSignalRMessagePackPing(): Uint8Array { + return frameSignalRBinary(encodeMsgPack([6])); +} + +function decodeIncomingMessage(data: string | ArrayBuffer | ArrayBufferView): string { + if (typeof data === 'string') return data; + if (data instanceof ArrayBuffer) return new TextDecoder().decode(new Uint8Array(data)); + return new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength)); +} + +export class NotificationsHub { + private readonly connections = new Map(); + private userId = ''; + private pingTimer: ReturnType | null = null; + + constructor(private readonly state: DurableObjectState, private readonly env: Env) { + void this.state; + void this.env; + } + + async fetch(request: Request): Promise { + const url = new URL(request.url); + + if (url.pathname === '/internal/bind-user' && request.method === 'POST') { + const body = (await request.json().catch(() => null)) as { userId?: string } | null; + this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim(); + return new Response(null, { status: 204 }); + } + + if (url.pathname === '/internal/notify' && request.method === 'POST') { + const body = (await request.json().catch(() => null)) as { + revisionDate?: string; + userId?: string; + contextId?: string | null; + } | null; + const revisionDate = String(body?.revisionDate || '').trim() || new Date().toISOString(); + this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim(); + const contextId = String(body?.contextId || '').trim() || null; + this.broadcastVaultSync(revisionDate, contextId); + return new Response(null, { status: 204 }); + } + + if (url.pathname !== '/notifications/hub') { + return new Response('Not found', { status: 404 }); + } + + if (request.headers.get('Upgrade')?.toLowerCase() !== 'websocket') { + return new Response('Expected websocket', { status: 426 }); + } + + if (!this.userId) { + return new Response('Unauthorized', { status: 401 }); + } + + const pair = new WebSocketPair(); + const client = pair[0]; + const server = pair[1]; + server.accept(); + + this.connections.set(server, { + handshakeComplete: false, + protocol: 'messagepack', + }); + this.ensurePingLoop(); + + server.addEventListener('message', (event) => { + void this.handleSocketMessage(server, event.data); + }); + server.addEventListener('close', () => { + this.connections.delete(server); + this.stopPingLoopIfIdle(); + }); + server.addEventListener('error', () => { + this.connections.delete(server); + this.stopPingLoopIfIdle(); + try { + server.close(1011, 'Socket error'); + } catch { + // ignore close races + } + }); + + return new Response(null, { + status: 101, + webSocket: client, + }); + } + + private async handleSocketMessage(socket: WebSocket, rawData: string | ArrayBuffer | ArrayBufferView): Promise { + const connection = this.connections.get(socket); + if (!connection) return; + + if (!connection.handshakeComplete) { + const text = decodeIncomingMessage(rawData); + const frames = text.split(String.fromCharCode(SIGNALR_RECORD_SEPARATOR)).filter(Boolean); + for (const frame of frames) { + try { + const handshake = JSON.parse(frame) as { protocol?: string }; + const protocol = handshake.protocol === 'json' ? 'json' : 'messagepack'; + connection.protocol = protocol; + connection.handshakeComplete = true; + socket.send(SIGNALR_HANDSHAKE_ACK); + return; + } catch { + // Ignore malformed pre-handshake payloads. + } + } + return; + } + } + + private ensurePingLoop(): void { + if (this.pingTimer !== null) return; + this.pingTimer = setInterval(() => { + this.broadcastPing(); + }, SIGNALR_PING_INTERVAL_MS); + } + + private stopPingLoopIfIdle(): void { + if (this.connections.size > 0 || this.pingTimer === null) return; + clearInterval(this.pingTimer); + this.pingTimer = null; + } + + private broadcastPing(): void { + if (this.connections.size === 0) { + this.stopPingLoopIfIdle(); + return; + } + + for (const [socket, connection] of this.connections) { + if (!connection.handshakeComplete) continue; + try { + if (connection.protocol === 'json') { + socket.send(buildSignalRJsonPing()); + } else { + socket.send(buildSignalRMessagePackPing()); + } + } catch { + this.connections.delete(socket); + try { + socket.close(1011, 'Ping send failed'); + } catch { + // ignore close races + } + } + } + + this.stopPingLoopIfIdle(); + } + + private broadcastVaultSync(revisionDate: string, contextId: string | null): void { + if (!this.userId || this.connections.size === 0) return; + + for (const [socket, connection] of this.connections) { + if (!connection.handshakeComplete) continue; + try { + if (connection.protocol === 'json') { + socket.send(buildSignalRJsonInvocation(this.userId, revisionDate, contextId)); + } else { + socket.send(buildSignalRMessagePackInvocation(this.userId, revisionDate, contextId)); + } + } catch { + this.connections.delete(socket); + try { + socket.close(1011, 'Notification send failed'); + } catch { + // ignore close races + } + } + } + + this.stopPingLoopIfIdle(); + } +} + +export async function notifyUserVaultSync( + env: Env, + userId: string, + revisionDate: string, + contextId?: string | null +): Promise { + try { + const id = env.NOTIFICATIONS_HUB.idFromName(userId); + const stub = env.NOTIFICATIONS_HUB.get(id); + await stub.fetch('https://notifications/internal/notify', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-NodeWarden-UserId': userId, + }, + body: JSON.stringify({ revisionDate, contextId: contextId || null }), + }); + } catch (error) { + console.error('Failed to broadcast vault sync notification:', error); + } +} diff --git a/src/handlers/attachments.ts b/src/handlers/attachments.ts index 06d5c02..caf29f9 100644 --- a/src/handlers/attachments.ts +++ b/src/handlers/attachments.ts @@ -1,10 +1,12 @@ import { Env, Attachment, DEFAULT_DEV_SECRET } from '../types'; +import { notifyUserVaultSync } from '../durable/notifications-hub'; import { StorageService } from '../services/storage'; import { jsonResponse, errorResponse } from '../utils/response'; import { generateUUID } from '../utils/uuid'; import { createFileDownloadToken, verifyFileDownloadToken } from '../utils/jwt'; import { cipherToResponse, shouldOmitPasskeysForResponse } from './ciphers'; import { LIMITS } from '../config/limits'; +import { readActingDeviceIdentifier } from '../utils/device'; import { deleteBlobObject, getAttachmentObjectKey, @@ -13,6 +15,15 @@ import { putBlobObject, } from '../services/blob-store'; +async function notifyVaultSyncForRequest( + request: Request, + env: Env, + userId: string, + revisionDate: string +): Promise { + await notifyUserVaultSync(env, userId, revisionDate, readActingDeviceIdentifier(request)); +} + // Format file size to human readable function formatSize(bytes: number): string { if (bytes < 1024) return `${bytes} Bytes`; @@ -73,7 +84,10 @@ export async function handleCreateAttachment( await storage.addAttachmentToCipher(cipherId, attachmentId); // Update cipher revision date - await storage.updateCipherRevisionDate(cipherId); + const revisionInfo = await storage.updateCipherRevisionDate(cipherId); + if (revisionInfo) { + await notifyVaultSyncForRequest(request, env, revisionInfo.userId, revisionInfo.revisionDate); + } // Get updated cipher for response const updatedCipher = await storage.getCipher(cipherId); @@ -165,7 +179,10 @@ export async function handleUploadAttachment( } // Update cipher revision date - await storage.updateCipherRevisionDate(cipherId); + const revisionInfo = await storage.updateCipherRevisionDate(cipherId); + if (revisionInfo) { + await notifyVaultSyncForRequest(request, env, revisionInfo.userId, revisionInfo.revisionDate); + } return new Response(null, { status: 200 }); } @@ -304,7 +321,10 @@ export async function handleDeleteAttachment( await storage.removeAttachmentFromCipher(cipherId, attachmentId); // Update cipher revision date - await storage.updateCipherRevisionDate(cipherId); + const revisionInfo = await storage.updateCipherRevisionDate(cipherId); + if (revisionInfo) { + await notifyVaultSyncForRequest(request, env, revisionInfo.userId, revisionInfo.revisionDate); + } // Get updated cipher for response const updatedCipher = await storage.getCipher(cipherId); diff --git a/src/handlers/ciphers.ts b/src/handlers/ciphers.ts index 318a69e..7504802 100644 --- a/src/handlers/ciphers.ts +++ b/src/handlers/ciphers.ts @@ -1,9 +1,20 @@ import { Env, Cipher, CipherResponse, Attachment } from '../types'; import { StorageService } from '../services/storage'; +import { notifyUserVaultSync } from '../durable/notifications-hub'; import { jsonResponse, errorResponse } from '../utils/response'; import { generateUUID } from '../utils/uuid'; import { deleteAllAttachmentsForCipher } from './attachments'; import { parsePagination, encodeContinuationToken } from '../utils/pagination'; +import { readActingDeviceIdentifier } from '../utils/device'; + +async function notifyVaultSyncForRequest( + request: Request, + env: Env, + userId: string, + revisionDate: string +): Promise { + await notifyUserVaultSync(env, userId, revisionDate, readActingDeviceIdentifier(request)); +} function getAliasedProp(source: any, aliases: string[]): { present: boolean; value: any } { if (!source || typeof source !== 'object') return { present: false, value: undefined }; @@ -276,7 +287,8 @@ export async function handleCreateCipher(request: Request, env: Env, userId: str } await storage.saveCipher(cipher); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse( cipherToResponse(cipher, [], { @@ -342,7 +354,8 @@ export async function handleUpdateCipher(request: Request, env: Env, userId: str } await storage.saveCipher(cipher); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse( cipherToResponse(cipher, [], { @@ -364,7 +377,8 @@ export async function handleDeleteCipher(request: Request, env: Env, userId: str cipher.deletedAt = new Date().toISOString(); cipher.updatedAt = cipher.deletedAt; await storage.saveCipher(cipher); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse( cipherToResponse(cipher, [], { @@ -389,7 +403,8 @@ export async function handleDeleteCipherCompat(request: Request, env: Env, userI if (cipher.deletedAt) { await deleteAllAttachmentsForCipher(env, id); await storage.deleteCipher(id, userId); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return new Response(null, { status: 204 }); } @@ -409,7 +424,8 @@ export async function handlePermanentDeleteCipher(request: Request, env: Env, us await deleteAllAttachmentsForCipher(env, id); await storage.deleteCipher(id, userId); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return new Response(null, { status: 204 }); } @@ -426,7 +442,8 @@ export async function handleRestoreCipher(request: Request, env: Env, userId: st cipher.deletedAt = null; cipher.updatedAt = new Date().toISOString(); await storage.saveCipher(cipher); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse( cipherToResponse(cipher, [], { @@ -464,7 +481,8 @@ export async function handlePartialUpdateCipher(request: Request, env: Env, user cipher.updatedAt = new Date().toISOString(); await storage.saveCipher(cipher); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse( cipherToResponse(cipher, [], { @@ -493,7 +511,10 @@ export async function handleBulkMoveCiphers(request: Request, env: Env, userId: if (!folderOk) return errorResponse('Folder not found', 404); } - await storage.bulkMoveCiphers(body.ids, body.folderId || null, userId); + const revisionDate = await storage.bulkMoveCiphers(body.ids, body.folderId || null, userId); + if (revisionDate) { + await notifyVaultSyncForRequest(request, env, userId, revisionDate); + } return new Response(null, { status: 204 }); } diff --git a/src/handlers/folders.ts b/src/handlers/folders.ts index 9c20870..a569b27 100644 --- a/src/handlers/folders.ts +++ b/src/handlers/folders.ts @@ -1,9 +1,20 @@ import { Env, Folder, FolderResponse } from '../types'; +import { notifyUserVaultSync } from '../durable/notifications-hub'; import { StorageService } from '../services/storage'; import { jsonResponse, errorResponse } from '../utils/response'; +import { readActingDeviceIdentifier } from '../utils/device'; import { generateUUID } from '../utils/uuid'; import { parsePagination, encodeContinuationToken } from '../utils/pagination'; +async function notifyVaultSyncForRequest( + request: Request, + env: Env, + userId: string, + revisionDate: string +): Promise { + await notifyUserVaultSync(env, userId, revisionDate, readActingDeviceIdentifier(request)); +} + // Convert internal folder to API response format function folderToResponse(folder: Folder): FolderResponse { return { @@ -75,7 +86,8 @@ export async function handleCreateFolder(request: Request, env: Env, userId: str }; await storage.saveFolder(folder); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse(folderToResponse(folder), 200); } @@ -102,7 +114,8 @@ export async function handleUpdateFolder(request: Request, env: Env, userId: str folder.updatedAt = new Date().toISOString(); await storage.saveFolder(folder); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse(folderToResponse(folder)); } @@ -118,7 +131,8 @@ export async function handleDeleteFolder(request: Request, env: Env, userId: str await storage.clearFolderFromCiphers(userId, id); await storage.deleteFolder(id, userId); - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return new Response(null, { status: 204 }); } diff --git a/src/handlers/import.ts b/src/handlers/import.ts index 51bd31e..7bfc702 100644 --- a/src/handlers/import.ts +++ b/src/handlers/import.ts @@ -1,6 +1,8 @@ import { Env, Cipher, Folder, CipherType } from '../types'; +import { notifyUserVaultSync } from '../durable/notifications-hub'; import { StorageService } from '../services/storage'; import { errorResponse, jsonResponse } from '../utils/response'; +import { readActingDeviceIdentifier } from '../utils/device'; import { generateUUID } from '../utils/uuid'; import { LIMITS } from '../config/limits'; import { normalizeCipherLoginForStorage, normalizeCipherSshKeyForCompatibility } from './ciphers'; @@ -268,7 +270,8 @@ export async function handleCiphersImport(request: Request, env: Env, userId: st } // Update revision date - await storage.updateRevisionDate(userId); + const revisionDate = await storage.updateRevisionDate(userId); + await notifyUserVaultSync(env, userId, revisionDate, readActingDeviceIdentifier(request)); if (returnCipherMap) { return jsonResponse({ diff --git a/src/handlers/notifications.ts b/src/handlers/notifications.ts new file mode 100644 index 0000000..5796b57 --- /dev/null +++ b/src/handlers/notifications.ts @@ -0,0 +1,61 @@ +import { AuthService } from '../services/auth'; +import type { Env } from '../types'; +import { errorResponse, jsonResponse } from '../utils/response'; +import { generateUUID } from '../utils/uuid'; + +function extractAccessToken(request: Request): string | null { + const url = new URL(request.url); + const queryToken = String(url.searchParams.get('access_token') || '').trim(); + if (queryToken) return queryToken; + + const authHeader = String(request.headers.get('Authorization') || '').trim(); + const match = authHeader.match(/^Bearer\s+(.+)$/i); + return match?.[1]?.trim() || null; +} + +async function authenticateNotificationsRequest(request: Request, env: Env): Promise { + const accessToken = extractAccessToken(request); + if (!accessToken) return null; + + const auth = new AuthService(env); + const payload = await auth.verifyAccessToken(`Bearer ${accessToken}`); + return payload?.sub || null; +} + +export async function handleNotificationsNegotiate(request: Request, env: Env): Promise { + const userId = await authenticateNotificationsRequest(request, env); + if (!userId) return errorResponse('Unauthorized', 401); + + const connectionId = generateUUID(); + return jsonResponse({ + connectionId, + connectionToken: connectionId, + negotiateVersion: 1, + availableTransports: [ + { + transport: 'WebSockets', + transferFormats: ['Text', 'Binary'], + }, + ], + }); +} + +export async function handleNotificationsHub(request: Request, env: Env): Promise { + const userId = await authenticateNotificationsRequest(request, env); + if (!userId) return errorResponse('Unauthorized', 401); + if (request.headers.get('Upgrade')?.toLowerCase() !== 'websocket') { + return errorResponse('Expected websocket', 426); + } + + const id = env.NOTIFICATIONS_HUB.idFromName(userId); + const stub = env.NOTIFICATIONS_HUB.get(id); + await stub.fetch('https://notifications/internal/bind-user', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-NodeWarden-UserId': userId, + }, + body: JSON.stringify({ userId }), + }); + return stub.fetch(request); +} diff --git a/src/handlers/sends.ts b/src/handlers/sends.ts index 3f55337..5119c7d 100644 --- a/src/handlers/sends.ts +++ b/src/handlers/sends.ts @@ -1,7 +1,9 @@ import { Env, Send, SendAuthType, SendResponse, SendType, DEFAULT_DEV_SECRET } from '../types'; +import { notifyUserVaultSync } from '../durable/notifications-hub'; import { StorageService } from '../services/storage'; import { RateLimitService, getClientIdentifier } from '../services/ratelimit'; import { jsonResponse, errorResponse } from '../utils/response'; +import { readActingDeviceIdentifier } from '../utils/device'; import { generateUUID } from '../utils/uuid'; import { parsePagination, encodeContinuationToken } from '../utils/pagination'; import { LIMITS } from '../config/limits'; @@ -23,6 +25,15 @@ const SEND_INACCESSIBLE_MSG = 'Send does not exist or is no longer available'; const SEND_PASSWORD_ITERATIONS = 100_000; const SEND_PASSWORD_LIMIT_SCOPE = 'send-password'; +async function notifyVaultSyncForRequest( + request: Request, + env: Env, + userId: string, + revisionDate: string +): Promise { + await notifyUserVaultSync(env, userId, revisionDate, readActingDeviceIdentifier(request)); +} + function getAliasedProp(source: unknown, aliases: string[]): { present: boolean; value: unknown } { if (!source || typeof source !== 'object') return { present: false, value: undefined }; for (const key of aliases) { @@ -604,7 +615,8 @@ export async function handleCreateSend(request: Request, env: Env, userId: strin } await storage.saveSend(send); - await storage.updateRevisionDate(userId); + let revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse(sendToResponse(send)); } @@ -727,7 +739,8 @@ export async function handleCreateFileSendV2(request: Request, env: Env, userId: } await storage.saveSend(send); - await storage.updateRevisionDate(userId); + let revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse({ fileUploadType: 0, @@ -835,7 +848,8 @@ export async function handleUploadSendFile( return errorResponse('Attachment storage is not configured', 500); } - await storage.updateRevisionDate(userId); + let revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return new Response(null, { status: 200 }); } @@ -981,7 +995,8 @@ export async function handleUpdateSend(request: Request, env: Env, userId: strin send.updatedAt = new Date().toISOString(); await storage.saveSend(send); - await storage.updateRevisionDate(userId); + let revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse(sendToResponse(send)); } @@ -1004,7 +1019,8 @@ export async function handleDeleteSend(request: Request, env: Env, userId: strin } await storage.deleteSend(sendId, userId); - await storage.updateRevisionDate(userId); + let revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return new Response(null, { status: 200 }); } @@ -1021,7 +1037,8 @@ export async function handleRemoveSendPassword(request: Request, env: Env, userI await setSendPassword(send, null); send.updatedAt = new Date().toISOString(); await storage.saveSend(send); - await storage.updateRevisionDate(userId); + let revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse(sendToResponse(send)); } @@ -1039,7 +1056,8 @@ export async function handleRemoveSendAuth(request: Request, env: Env, userId: s send.emails = null; send.updatedAt = new Date().toISOString(); await storage.saveSend(send); - await storage.updateRevisionDate(userId); + let revisionDate = await storage.updateRevisionDate(userId); + await notifyVaultSyncForRequest(request, env, userId, revisionDate); return jsonResponse(sendToResponse(send)); } @@ -1100,7 +1118,8 @@ export async function handleAccessSend(request: Request, env: Env, accessId: str return errorResponse(SEND_INACCESSIBLE_MSG, 404); } send.accessCount += 1; - await storage.updateRevisionDate(send.userId); + const revisionDate = await storage.updateRevisionDate(send.userId); + await notifyVaultSyncForRequest(request, env, send.userId, revisionDate); } const creatorIdentifier = await getCreatorIdentifier(storage, send); @@ -1173,7 +1192,8 @@ export async function handleAccessSendFile( return errorResponse(SEND_INACCESSIBLE_MSG, 404); } send.accessCount += 1; - await storage.updateRevisionDate(send.userId); + const revisionDate = await storage.updateRevisionDate(send.userId); + await notifyVaultSyncForRequest(request, env, send.userId, revisionDate); const token = await createSendFileDownloadToken(send.id, fileId, secret); const url = new URL(request.url); @@ -1213,7 +1233,8 @@ export async function handleAccessSendV2(request: Request, env: Env): Promise { - if (ids.length === 0) return; + async bulkMoveCiphers(ids: string[], folderId: string | null, userId: string): Promise { + if (ids.length === 0) return null; const now = new Date().toISOString(); const uniqueIds = Array.from(new Set(ids)); const patch = JSON.stringify({ @@ -528,7 +528,7 @@ export class StorageService { .run(); } - await this.updateRevisionDate(userId); + return this.updateRevisionDate(userId); } // --- Folders --- @@ -744,12 +744,13 @@ export class StorageService { await this.db.prepare('DELETE FROM attachments WHERE cipher_id = ?').bind(cipherId).run(); } - async updateCipherRevisionDate(cipherId: string): Promise { + async updateCipherRevisionDate(cipherId: string): Promise<{ userId: string; revisionDate: string } | null> { const cipher = await this.getCipher(cipherId); - if (!cipher) return; + if (!cipher) return null; cipher.updatedAt = new Date().toISOString(); await this.saveCipher(cipher); - await this.updateRevisionDate(cipher.userId); + const revisionDate = await this.updateRevisionDate(cipher.userId); + return { userId: cipher.userId, revisionDate }; } // --- Refresh tokens --- diff --git a/src/types/index.ts b/src/types/index.ts index 71dd79d..5ffdc2d 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,6 +1,7 @@ // Environment bindings export interface Env { DB: D1Database; + NOTIFICATIONS_HUB: DurableObjectNamespace; // Prefer R2 when available. Optional to support KV-only deployments. ATTACHMENTS?: R2Bucket; // Optional fallback for attachment/send file storage (no credit card required). diff --git a/src/utils/device.ts b/src/utils/device.ts index ec3fdb6..ad30730 100644 --- a/src/utils/device.ts +++ b/src/utils/device.ts @@ -72,3 +72,7 @@ export function readKnownDeviceProbe(request: Request): { email: string | null; return { email, deviceIdentifier }; } +export function readActingDeviceIdentifier(request: Request): string | null { + return normalizeDeviceIdentifier(request.headers.get('X-NodeWarden-Acting-Device-Id')); +} + diff --git a/src/utils/response.ts b/src/utils/response.ts index d2bac28..26e787c 100644 --- a/src/utils/response.ts +++ b/src/utils/response.ts @@ -1,7 +1,21 @@ import { LIMITS } from '../config/limits'; const CORS_METHODS = 'GET, POST, PUT, DELETE, PATCH, OPTIONS'; -const CORS_HEADERS = 'Content-Type, Authorization, Accept, Device-Type, Bitwarden-Client-Name, Bitwarden-Client-Version, X-Request-Email, X-Device-Identifier, X-Device-Name'; +const DEFAULT_CORS_HEADERS = [ + 'Content-Type', + 'Authorization', + 'Accept', + 'Device-Type', + 'Device-Identifier', + 'Device-Name', + 'Bitwarden-Client-Name', + 'Bitwarden-Client-Version', + 'Bitwarden-Package-Type', + 'Is-Prerelease', + 'X-Request-Email', + 'X-Device-Identifier', + 'X-Device-Name', +]; function isTrustedClientOrigin(origin: string): boolean { // Official browser extension / desktop-webview common origins. @@ -25,9 +39,15 @@ function getAllowedOrigin(request: Request): string | null { } function buildCorsHeaders(request: Request): Record { + const requestedHeaders = String(request.headers.get('Access-Control-Request-Headers') || '') + .split(',') + .map((value) => value.trim()) + .filter(Boolean); + const allowHeaders = Array.from(new Set([...DEFAULT_CORS_HEADERS, ...requestedHeaders])); + const headers: Record = { 'Access-Control-Allow-Methods': CORS_METHODS, - 'Access-Control-Allow-Headers': CORS_HEADERS, + 'Access-Control-Allow-Headers': allowHeaders.join(', '), 'Access-Control-Max-Age': String(LIMITS.cors.preflightMaxAgeSeconds), }; @@ -44,6 +64,12 @@ export function applyCors( request: Request, response: Response ): Response { + // WebSocket upgrade responses must be returned untouched. + const webSocket = (response as Response & { webSocket?: unknown }).webSocket; + if (response.status === 101 || webSocket) { + return response; + } + const headers = new Headers(response.headers); const corsHeaders = buildCorsHeaders(request); for (const [k, v] of Object.entries(corsHeaders)) { diff --git a/webapp/src/App.tsx b/webapp/src/App.tsx index 59c3389..18bc1eb 100644 --- a/webapp/src/App.tsx +++ b/webapp/src/App.tsx @@ -295,6 +295,37 @@ function buildPublicSendUrl(origin: string, accessId: string, keyPart: string): return `${origin}/#/send/${accessId}/${keyPart}`; } +const SIGNALR_RECORD_SEPARATOR = String.fromCharCode(0x1e); + +interface WebVaultSignalRInvocation { + type?: number; + target?: string; + arguments?: Array<{ + ContextId?: string | null; + Type?: number; + Payload?: { + UserId?: string; + Date?: string; + RevisionDate?: string; + }; + }>; +} + +function parseSignalRTextFrames(raw: string): WebVaultSignalRInvocation[] { + return raw + .split(SIGNALR_RECORD_SEPARATOR) + .map((frame) => frame.trim()) + .filter(Boolean) + .map((frame) => { + try { + return JSON.parse(frame) as WebVaultSignalRInvocation; + } catch { + return null; + } + }) + .filter((frame): frame is WebVaultSignalRInvocation => !!frame); +} + async function deriveSendKeyParts(sendKeyMaterial: Uint8Array): Promise<{ enc: Uint8Array; mac: Uint8Array }> { if (sendKeyMaterial.length >= 64) { return { enc: sendKeyMaterial.slice(0, 32), mac: sendKeyMaterial.slice(32, 64) }; @@ -344,6 +375,7 @@ export default function App() { const [decryptedCiphers, setDecryptedCiphers] = useState([]); const [decryptedSends, setDecryptedSends] = useState([]); const migratedPlainFolderIdsRef = useRef>(new Set()); + const silentRefreshVaultRef = useRef<() => Promise>(async () => {}); useEffect(() => { const syncInviteFromUrl = () => { @@ -953,6 +985,101 @@ export default function App() { pushToast('success', t('txt_vault_synced')); } + async function refreshVaultSilently() { + await Promise.all([ciphersQuery.refetch(), foldersQuery.refetch(), sendsQuery.refetch()]); + } + + silentRefreshVaultRef.current = refreshVaultSilently; + + useEffect(() => { + if (phase !== 'app' || !session?.accessToken || !session?.symEncKey || !session?.symMacKey) return; + + let disposed = false; + let socket: WebSocket | null = null; + let reconnectTimer: number | null = null; + let reconnectAttempts = 0; + + const clearReconnectTimer = () => { + if (reconnectTimer !== null) { + window.clearTimeout(reconnectTimer); + reconnectTimer = null; + } + }; + + const scheduleReconnect = () => { + if (disposed) return; + clearReconnectTimer(); + const delay = Math.min(10000, 1000 * Math.max(1, reconnectAttempts + 1)); + reconnectAttempts += 1; + reconnectTimer = window.setTimeout(() => { + reconnectTimer = null; + connect(); + }, delay); + }; + + const connect = () => { + if (disposed) return; + try { + const hubUrl = new URL('/notifications/hub', window.location.origin); + hubUrl.searchParams.set('access_token', session.accessToken); + hubUrl.protocol = hubUrl.protocol === 'https:' ? 'wss:' : 'ws:'; + socket = new WebSocket(hubUrl.toString()); + } catch { + scheduleReconnect(); + return; + } + + socket.addEventListener('open', () => { + reconnectAttempts = 0; + try { + socket?.send(`{"protocol":"json","version":1}${SIGNALR_RECORD_SEPARATOR}`); + } catch { + socket?.close(); + } + }); + + socket.addEventListener('message', (event) => { + if (disposed) return; + if (typeof event.data !== 'string') return; + + const frames = parseSignalRTextFrames(event.data); + for (const frame of frames) { + if (frame.type !== 1 || frame.target !== 'ReceiveMessage') continue; + const contextId = String(frame.arguments?.[0]?.ContextId || '').trim(); + if (contextId && contextId === getCurrentDeviceIdentifier()) continue; + void silentRefreshVaultRef.current(); + } + }); + + socket.addEventListener('close', () => { + socket = null; + scheduleReconnect(); + }); + + socket.addEventListener('error', () => { + try { + socket?.close(); + } catch { + // ignore close races + } + }); + }; + + connect(); + + return () => { + disposed = true; + clearReconnectTimer(); + if (socket && socket.readyState === WebSocket.OPEN) { + try { + socket.close(); + } catch { + // ignore close races + } + } + }; + }, [phase, session?.accessToken, session?.symEncKey, session?.symMacKey]); + async function refreshAuthorizedDevices() { await authorizedDevicesQuery.refetch(); } diff --git a/wrangler.kv.toml b/wrangler.kv.toml index b11672a..47f6c62 100644 --- a/wrangler.kv.toml +++ b/wrangler.kv.toml @@ -14,5 +14,13 @@ command = "npm run build" binding = "DB" database_name = "nodewarden-db" +[[durable_objects.bindings]] +name = "NOTIFICATIONS_HUB" +class_name = "NotificationsHub" + [[kv_namespaces]] binding = "ATTACHMENTS_KV" + +[[migrations]] +tag = "v1-notifications-hub" +new_sqlite_classes = ["NotificationsHub"] diff --git a/wrangler.toml b/wrangler.toml index 683e772..54ca5f7 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -13,7 +13,16 @@ command = "npm run build" [[d1_databases]] binding = "DB" database_name = "nodewarden-db" +database_id = "d67608e7-c1a4-46ba-97fc-8247b6b55329" + +[[durable_objects.bindings]] +name = "NOTIFICATIONS_HUB" +class_name = "NotificationsHub" [[r2_buckets]] binding = "ATTACHMENTS" bucket_name = "nodewarden-attachments" + +[[migrations]] +tag = "v1-notifications-hub" +new_sqlite_classes = [ "NotificationsHub" ]