mirror of
https://github.com/shuaiplus/nodewarden.git
synced 2026-06-20 21:00:41 +00:00
feat: implement NotificationsHub for real-time vault sync notifications
- Added NotificationsHub durable object to handle WebSocket connections for vault sync notifications. - Integrated SignalR protocol for message framing and communication. - Updated storage service methods to return revision date and user ID for vault sync notifications. - Enhanced existing handlers (attachments, ciphers, folders, sends, and import) to notify users of vault sync events. - Created new notifications handler for WebSocket negotiation and binding user IDs. - Updated frontend to establish WebSocket connection for receiving vault sync notifications. - Improved CORS headers to support new notification endpoints. - Bumped wrangler version in package.json to 4.71.0.
This commit is contained in:
@@ -0,0 +1,365 @@
|
||||
import type { Env } from '../types';
|
||||
|
||||
const SIGNALR_RECORD_SEPARATOR = 0x1e;
|
||||
const SIGNALR_HANDSHAKE_ACK = new Uint8Array([0x7b, 0x7d, SIGNALR_RECORD_SEPARATOR]);
|
||||
const SIGNALR_UPDATE_TYPE_SYNC_VAULT = 5;
|
||||
const SIGNALR_PING_INTERVAL_MS = 15_000;
|
||||
|
||||
type HubProtocol = 'json' | 'messagepack';
|
||||
|
||||
interface ConnectionState {
|
||||
handshakeComplete: boolean;
|
||||
protocol: HubProtocol;
|
||||
}
|
||||
|
||||
function concatBytes(chunks: Uint8Array[]): Uint8Array {
|
||||
const total = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
|
||||
const out = new Uint8Array(total);
|
||||
let offset = 0;
|
||||
for (const chunk of chunks) {
|
||||
out.set(chunk, offset);
|
||||
offset += chunk.length;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function encodeUtf8(value: string): Uint8Array {
|
||||
return new TextEncoder().encode(value);
|
||||
}
|
||||
|
||||
function encodeMsgPackInteger(value: number): Uint8Array {
|
||||
const normalized = Math.trunc(value);
|
||||
if (normalized >= 0 && normalized <= 0x7f) {
|
||||
return new Uint8Array([normalized]);
|
||||
}
|
||||
if (normalized >= 0 && normalized <= 0xff) {
|
||||
return new Uint8Array([0xcc, normalized]);
|
||||
}
|
||||
if (normalized >= 0 && normalized <= 0xffff) {
|
||||
return new Uint8Array([0xcd, normalized >> 8, normalized & 0xff]);
|
||||
}
|
||||
const safe = normalized >>> 0;
|
||||
return new Uint8Array([
|
||||
0xce,
|
||||
(safe >>> 24) & 0xff,
|
||||
(safe >>> 16) & 0xff,
|
||||
(safe >>> 8) & 0xff,
|
||||
safe & 0xff,
|
||||
]);
|
||||
}
|
||||
|
||||
function encodeMsgPackString(value: string): Uint8Array {
|
||||
const bytes = encodeUtf8(value);
|
||||
const len = bytes.length;
|
||||
if (len < 32) {
|
||||
return concatBytes([new Uint8Array([0xa0 | len]), bytes]);
|
||||
}
|
||||
if (len <= 0xff) {
|
||||
return concatBytes([new Uint8Array([0xd9, len]), bytes]);
|
||||
}
|
||||
return concatBytes([new Uint8Array([0xda, (len >> 8) & 0xff, len & 0xff]), bytes]);
|
||||
}
|
||||
|
||||
function encodeMsgPackTimestamp(date: Date): Uint8Array {
|
||||
const seconds = BigInt(Math.floor(date.getTime() / 1000));
|
||||
const nanos = BigInt(date.getMilliseconds()) * 1000000n;
|
||||
const timestamp = (nanos << 34n) | seconds;
|
||||
const payload = new Uint8Array(8);
|
||||
for (let i = 7; i >= 0; i--) {
|
||||
payload[i] = Number((timestamp >> BigInt((7 - i) * 8)) & 0xffn);
|
||||
}
|
||||
return concatBytes([new Uint8Array([0xc7, 0x08, 0xff]), payload]);
|
||||
}
|
||||
|
||||
function encodeMsgPackArray(values: unknown[]): Uint8Array {
|
||||
const items = values.map(encodeMsgPack);
|
||||
const len = items.length;
|
||||
const header =
|
||||
len < 16
|
||||
? new Uint8Array([0x90 | len])
|
||||
: new Uint8Array([0xdc, (len >> 8) & 0xff, len & 0xff]);
|
||||
return concatBytes([header, ...items]);
|
||||
}
|
||||
|
||||
function encodeMsgPackMap(value: Record<string, unknown>): Uint8Array {
|
||||
const entries = Object.entries(value);
|
||||
const len = entries.length;
|
||||
const header =
|
||||
len < 16
|
||||
? new Uint8Array([0x80 | len])
|
||||
: new Uint8Array([0xde, (len >> 8) & 0xff, len & 0xff]);
|
||||
const chunks: Uint8Array[] = [header];
|
||||
for (const [key, entryValue] of entries) {
|
||||
chunks.push(encodeMsgPackString(key), encodeMsgPack(entryValue));
|
||||
}
|
||||
return concatBytes(chunks);
|
||||
}
|
||||
|
||||
function encodeMsgPack(value: unknown): Uint8Array {
|
||||
if (value === null || value === undefined) return new Uint8Array([0xc0]);
|
||||
if (value instanceof Date) return encodeMsgPackTimestamp(value);
|
||||
if (typeof value === 'string') return encodeMsgPackString(value);
|
||||
if (typeof value === 'number') return encodeMsgPackInteger(value);
|
||||
if (typeof value === 'boolean') return new Uint8Array([value ? 0xc3 : 0xc2]);
|
||||
if (Array.isArray(value)) return encodeMsgPackArray(value);
|
||||
if (value instanceof Uint8Array) {
|
||||
const len = value.length;
|
||||
if (len <= 0xff) return concatBytes([new Uint8Array([0xc4, len]), value]);
|
||||
return concatBytes([new Uint8Array([0xc5, (len >> 8) & 0xff, len & 0xff]), value]);
|
||||
}
|
||||
return encodeMsgPackMap(value as Record<string, unknown>);
|
||||
}
|
||||
|
||||
function frameSignalRBinary(payload: Uint8Array): Uint8Array {
|
||||
const len = payload.length;
|
||||
const prefix: number[] = [];
|
||||
let value = len;
|
||||
do {
|
||||
let current = value & 0x7f;
|
||||
value >>>= 7;
|
||||
if (value > 0) current |= 0x80;
|
||||
prefix.push(current);
|
||||
} while (value > 0);
|
||||
return concatBytes([new Uint8Array(prefix), payload]);
|
||||
}
|
||||
|
||||
function buildSignalRJsonInvocation(userId: string, revisionDate: string, contextId: string | null): string {
|
||||
return JSON.stringify({
|
||||
type: 1,
|
||||
target: 'ReceiveMessage',
|
||||
arguments: [
|
||||
{
|
||||
ContextId: contextId,
|
||||
Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT,
|
||||
Payload: {
|
||||
UserId: userId,
|
||||
Date: revisionDate,
|
||||
},
|
||||
},
|
||||
],
|
||||
}) + String.fromCharCode(SIGNALR_RECORD_SEPARATOR);
|
||||
}
|
||||
|
||||
function buildSignalRJsonPing(): string {
|
||||
return JSON.stringify({ type: 6 }) + String.fromCharCode(SIGNALR_RECORD_SEPARATOR);
|
||||
}
|
||||
|
||||
function buildSignalRMessagePackInvocation(userId: string, revisionDate: string, contextId: string | null): Uint8Array {
|
||||
// SignalR MessagePack hub protocol uses an array-based invocation shape:
|
||||
// [type, headers, invocationId, target, arguments]
|
||||
const payload = encodeMsgPack([
|
||||
1,
|
||||
{},
|
||||
null,
|
||||
'ReceiveMessage',
|
||||
[
|
||||
{
|
||||
ContextId: contextId,
|
||||
Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT,
|
||||
Payload: {
|
||||
UserId: userId,
|
||||
Date: new Date(revisionDate),
|
||||
},
|
||||
},
|
||||
],
|
||||
]);
|
||||
return frameSignalRBinary(payload);
|
||||
}
|
||||
|
||||
function buildSignalRMessagePackPing(): Uint8Array {
|
||||
return frameSignalRBinary(encodeMsgPack([6]));
|
||||
}
|
||||
|
||||
function decodeIncomingMessage(data: string | ArrayBuffer | ArrayBufferView): string {
|
||||
if (typeof data === 'string') return data;
|
||||
if (data instanceof ArrayBuffer) return new TextDecoder().decode(new Uint8Array(data));
|
||||
return new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength));
|
||||
}
|
||||
|
||||
export class NotificationsHub {
|
||||
private readonly connections = new Map<WebSocket, ConnectionState>();
|
||||
private userId = '';
|
||||
private pingTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
constructor(private readonly state: DurableObjectState, private readonly env: Env) {
|
||||
void this.state;
|
||||
void this.env;
|
||||
}
|
||||
|
||||
async fetch(request: Request): Promise<Response> {
|
||||
const url = new URL(request.url);
|
||||
|
||||
if (url.pathname === '/internal/bind-user' && request.method === 'POST') {
|
||||
const body = (await request.json().catch(() => null)) as { userId?: string } | null;
|
||||
this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim();
|
||||
return new Response(null, { status: 204 });
|
||||
}
|
||||
|
||||
if (url.pathname === '/internal/notify' && request.method === 'POST') {
|
||||
const body = (await request.json().catch(() => null)) as {
|
||||
revisionDate?: string;
|
||||
userId?: string;
|
||||
contextId?: string | null;
|
||||
} | null;
|
||||
const revisionDate = String(body?.revisionDate || '').trim() || new Date().toISOString();
|
||||
this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim();
|
||||
const contextId = String(body?.contextId || '').trim() || null;
|
||||
this.broadcastVaultSync(revisionDate, contextId);
|
||||
return new Response(null, { status: 204 });
|
||||
}
|
||||
|
||||
if (url.pathname !== '/notifications/hub') {
|
||||
return new Response('Not found', { status: 404 });
|
||||
}
|
||||
|
||||
if (request.headers.get('Upgrade')?.toLowerCase() !== 'websocket') {
|
||||
return new Response('Expected websocket', { status: 426 });
|
||||
}
|
||||
|
||||
if (!this.userId) {
|
||||
return new Response('Unauthorized', { status: 401 });
|
||||
}
|
||||
|
||||
const pair = new WebSocketPair();
|
||||
const client = pair[0];
|
||||
const server = pair[1];
|
||||
server.accept();
|
||||
|
||||
this.connections.set(server, {
|
||||
handshakeComplete: false,
|
||||
protocol: 'messagepack',
|
||||
});
|
||||
this.ensurePingLoop();
|
||||
|
||||
server.addEventListener('message', (event) => {
|
||||
void this.handleSocketMessage(server, event.data);
|
||||
});
|
||||
server.addEventListener('close', () => {
|
||||
this.connections.delete(server);
|
||||
this.stopPingLoopIfIdle();
|
||||
});
|
||||
server.addEventListener('error', () => {
|
||||
this.connections.delete(server);
|
||||
this.stopPingLoopIfIdle();
|
||||
try {
|
||||
server.close(1011, 'Socket error');
|
||||
} catch {
|
||||
// ignore close races
|
||||
}
|
||||
});
|
||||
|
||||
return new Response(null, {
|
||||
status: 101,
|
||||
webSocket: client,
|
||||
});
|
||||
}
|
||||
|
||||
private async handleSocketMessage(socket: WebSocket, rawData: string | ArrayBuffer | ArrayBufferView): Promise<void> {
|
||||
const connection = this.connections.get(socket);
|
||||
if (!connection) return;
|
||||
|
||||
if (!connection.handshakeComplete) {
|
||||
const text = decodeIncomingMessage(rawData);
|
||||
const frames = text.split(String.fromCharCode(SIGNALR_RECORD_SEPARATOR)).filter(Boolean);
|
||||
for (const frame of frames) {
|
||||
try {
|
||||
const handshake = JSON.parse(frame) as { protocol?: string };
|
||||
const protocol = handshake.protocol === 'json' ? 'json' : 'messagepack';
|
||||
connection.protocol = protocol;
|
||||
connection.handshakeComplete = true;
|
||||
socket.send(SIGNALR_HANDSHAKE_ACK);
|
||||
return;
|
||||
} catch {
|
||||
// Ignore malformed pre-handshake payloads.
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private ensurePingLoop(): void {
|
||||
if (this.pingTimer !== null) return;
|
||||
this.pingTimer = setInterval(() => {
|
||||
this.broadcastPing();
|
||||
}, SIGNALR_PING_INTERVAL_MS);
|
||||
}
|
||||
|
||||
private stopPingLoopIfIdle(): void {
|
||||
if (this.connections.size > 0 || this.pingTimer === null) return;
|
||||
clearInterval(this.pingTimer);
|
||||
this.pingTimer = null;
|
||||
}
|
||||
|
||||
private broadcastPing(): void {
|
||||
if (this.connections.size === 0) {
|
||||
this.stopPingLoopIfIdle();
|
||||
return;
|
||||
}
|
||||
|
||||
for (const [socket, connection] of this.connections) {
|
||||
if (!connection.handshakeComplete) continue;
|
||||
try {
|
||||
if (connection.protocol === 'json') {
|
||||
socket.send(buildSignalRJsonPing());
|
||||
} else {
|
||||
socket.send(buildSignalRMessagePackPing());
|
||||
}
|
||||
} catch {
|
||||
this.connections.delete(socket);
|
||||
try {
|
||||
socket.close(1011, 'Ping send failed');
|
||||
} catch {
|
||||
// ignore close races
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.stopPingLoopIfIdle();
|
||||
}
|
||||
|
||||
private broadcastVaultSync(revisionDate: string, contextId: string | null): void {
|
||||
if (!this.userId || this.connections.size === 0) return;
|
||||
|
||||
for (const [socket, connection] of this.connections) {
|
||||
if (!connection.handshakeComplete) continue;
|
||||
try {
|
||||
if (connection.protocol === 'json') {
|
||||
socket.send(buildSignalRJsonInvocation(this.userId, revisionDate, contextId));
|
||||
} else {
|
||||
socket.send(buildSignalRMessagePackInvocation(this.userId, revisionDate, contextId));
|
||||
}
|
||||
} catch {
|
||||
this.connections.delete(socket);
|
||||
try {
|
||||
socket.close(1011, 'Notification send failed');
|
||||
} catch {
|
||||
// ignore close races
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.stopPingLoopIfIdle();
|
||||
}
|
||||
}
|
||||
|
||||
export async function notifyUserVaultSync(
|
||||
env: Env,
|
||||
userId: string,
|
||||
revisionDate: string,
|
||||
contextId?: string | null
|
||||
): Promise<void> {
|
||||
try {
|
||||
const id = env.NOTIFICATIONS_HUB.idFromName(userId);
|
||||
const stub = env.NOTIFICATIONS_HUB.get(id);
|
||||
await stub.fetch('https://notifications/internal/notify', {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-NodeWarden-UserId': userId,
|
||||
},
|
||||
body: JSON.stringify({ revisionDate, contextId: contextId || null }),
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Failed to broadcast vault sync notification:', error);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user