feat(notifications): enhance NotificationsHub with device status updates and logout notifications

This commit is contained in:
shuaiplus
2026-03-09 01:21:39 +08:00
parent 1285f6296e
commit bc5efbf2fd
8 changed files with 187 additions and 40 deletions
+104 -16
View File
@@ -3,6 +3,8 @@ import type { Env } from '../types';
const SIGNALR_RECORD_SEPARATOR = 0x1e;
const SIGNALR_HANDSHAKE_ACK = new Uint8Array([0x7b, 0x7d, SIGNALR_RECORD_SEPARATOR]);
const SIGNALR_UPDATE_TYPE_SYNC_VAULT = 5;
const SIGNALR_UPDATE_TYPE_LOG_OUT = 11;
const SIGNALR_UPDATE_TYPE_DEVICE_STATUS = 12;
const SIGNALR_PING_INTERVAL_MS = 15_000;
type HubProtocol = 'json' | 'messagepack';
@@ -10,6 +12,7 @@ type HubProtocol = 'json' | 'messagepack';
interface ConnectionState {
handshakeComplete: boolean;
protocol: HubProtocol;
deviceIdentifier: string | null;
}
function concatBytes(chunks: Uint8Array[]): Uint8Array {
@@ -123,14 +126,19 @@ function frameSignalRBinary(payload: Uint8Array): Uint8Array {
return concatBytes([new Uint8Array(prefix), payload]);
}
function buildSignalRJsonInvocation(userId: string, revisionDate: string, contextId: string | null): string {
function buildSignalRJsonInvocation(
userId: string,
updateType: number,
revisionDate: string,
contextId: string | null
): string {
return JSON.stringify({
type: 1,
target: 'ReceiveMessage',
arguments: [
{
ContextId: contextId,
Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT,
Type: updateType,
Payload: {
UserId: userId,
Date: revisionDate,
@@ -144,7 +152,12 @@ function buildSignalRJsonPing(): string {
return JSON.stringify({ type: 6 }) + String.fromCharCode(SIGNALR_RECORD_SEPARATOR);
}
function buildSignalRMessagePackInvocation(userId: string, revisionDate: string, contextId: string | null): Uint8Array {
function buildSignalRMessagePackInvocation(
userId: string,
updateType: number,
revisionDate: string,
contextId: string | null
): Uint8Array {
// SignalR MessagePack hub protocol uses an array-based invocation shape:
// [type, headers, invocationId, target, arguments]
const payload = encodeMsgPack([
@@ -155,7 +168,7 @@ function buildSignalRMessagePackInvocation(userId: string, revisionDate: string,
[
{
ContextId: contextId,
Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT,
Type: updateType,
Payload: {
UserId: userId,
Date: new Date(revisionDate),
@@ -189,25 +202,32 @@ export class NotificationsHub {
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/internal/bind-user' && request.method === 'POST') {
const body = (await request.json().catch(() => null)) as { userId?: string } | null;
this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim();
return new Response(null, { status: 204 });
}
if (url.pathname === '/internal/notify' && request.method === 'POST') {
const body = (await request.json().catch(() => null)) as {
revisionDate?: string;
userId?: string;
contextId?: string | null;
updateType?: number;
targetDeviceIdentifier?: string | null;
} | null;
const revisionDate = String(body?.revisionDate || '').trim() || new Date().toISOString();
this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim();
const contextId = String(body?.contextId || '').trim() || null;
this.broadcastVaultSync(revisionDate, contextId);
const updateType = Number(body?.updateType || SIGNALR_UPDATE_TYPE_SYNC_VAULT) || SIGNALR_UPDATE_TYPE_SYNC_VAULT;
const targetDeviceIdentifier = String(body?.targetDeviceIdentifier || '').trim() || null;
this.broadcastMessage(updateType, revisionDate, contextId, targetDeviceIdentifier);
return new Response(null, { status: 204 });
}
if (url.pathname === '/internal/online' && request.method === 'GET') {
return new Response(JSON.stringify({ deviceIdentifiers: this.getOnlineDeviceIdentifiers() }), {
status: 200,
headers: {
'Content-Type': 'application/json',
},
});
}
if (url.pathname !== '/notifications/hub') {
return new Response('Not found', { status: 404 });
}
@@ -216,6 +236,12 @@ export class NotificationsHub {
return new Response('Expected websocket', { status: 426 });
}
const requestUserId = String(url.searchParams.get('nw_uid') || '').trim();
const requestDeviceIdentifier = String(url.searchParams.get('nw_did') || '').trim() || null;
if (requestUserId) {
this.userId = requestUserId;
}
if (!this.userId) {
return new Response('Unauthorized', { status: 401 });
}
@@ -228,6 +254,7 @@ export class NotificationsHub {
this.connections.set(server, {
handshakeComplete: false,
protocol: 'messagepack',
deviceIdentifier: requestDeviceIdentifier,
});
this.ensurePingLoop();
@@ -235,12 +262,16 @@ export class NotificationsHub {
void this.handleSocketMessage(server, event.data);
});
server.addEventListener('close', () => {
const shouldBroadcast = !!this.connections.get(server)?.handshakeComplete;
this.connections.delete(server);
this.stopPingLoopIfIdle();
if (shouldBroadcast) this.broadcastDeviceStatus();
});
server.addEventListener('error', () => {
const shouldBroadcast = !!this.connections.get(server)?.handshakeComplete;
this.connections.delete(server);
this.stopPingLoopIfIdle();
if (shouldBroadcast) this.broadcastDeviceStatus();
try {
server.close(1011, 'Socket error');
} catch {
@@ -268,6 +299,7 @@ export class NotificationsHub {
connection.protocol = protocol;
connection.handshakeComplete = true;
socket.send(SIGNALR_HANDSHAKE_ACK);
this.broadcastDeviceStatus();
return;
} catch {
// Ignore malformed pre-handshake payloads.
@@ -317,16 +349,31 @@ export class NotificationsHub {
this.stopPingLoopIfIdle();
}
private broadcastVaultSync(revisionDate: string, contextId: string | null): void {
private getOnlineDeviceIdentifiers(): string[] {
const out = new Set<string>();
for (const connection of this.connections.values()) {
if (!connection.handshakeComplete || !connection.deviceIdentifier) continue;
out.add(connection.deviceIdentifier);
}
return Array.from(out);
}
private broadcastMessage(
updateType: number,
revisionDate: string,
contextId: string | null,
targetDeviceIdentifier: string | null
): void {
if (!this.userId || this.connections.size === 0) return;
for (const [socket, connection] of this.connections) {
if (!connection.handshakeComplete) continue;
if (targetDeviceIdentifier && connection.deviceIdentifier !== targetDeviceIdentifier) continue;
try {
if (connection.protocol === 'json') {
socket.send(buildSignalRJsonInvocation(this.userId, revisionDate, contextId));
socket.send(buildSignalRJsonInvocation(this.userId, updateType, revisionDate, contextId));
} else {
socket.send(buildSignalRMessagePackInvocation(this.userId, revisionDate, contextId));
socket.send(buildSignalRMessagePackInvocation(this.userId, updateType, revisionDate, contextId));
}
} catch {
this.connections.delete(socket);
@@ -340,6 +387,10 @@ export class NotificationsHub {
this.stopPingLoopIfIdle();
}
private broadcastDeviceStatus(): void {
this.broadcastMessage(SIGNALR_UPDATE_TYPE_DEVICE_STATUS, new Date().toISOString(), null, null);
}
}
export async function notifyUserVaultSync(
@@ -347,6 +398,38 @@ export async function notifyUserVaultSync(
userId: string,
revisionDate: string,
contextId?: string | null
): Promise<void> {
return notifyUserUpdate(env, userId, SIGNALR_UPDATE_TYPE_SYNC_VAULT, revisionDate, contextId ?? null, null);
}
export async function notifyUserLogout(
env: Env,
userId: string,
targetDeviceIdentifier?: string | null
): Promise<void> {
return notifyUserUpdate(env, userId, SIGNALR_UPDATE_TYPE_LOG_OUT, new Date().toISOString(), null, targetDeviceIdentifier ?? null);
}
export async function getOnlineUserDevices(env: Env, userId: string): Promise<string[]> {
try {
const id = env.NOTIFICATIONS_HUB.idFromName(userId);
const stub = env.NOTIFICATIONS_HUB.get(id);
const response = await stub.fetch('https://notifications/internal/online');
if (!response.ok) return [];
const body = (await response.json().catch(() => null)) as { deviceIdentifiers?: string[] } | null;
return Array.isArray(body?.deviceIdentifiers) ? body.deviceIdentifiers.filter((value) => !!String(value || '').trim()) : [];
} catch {
return [];
}
}
async function notifyUserUpdate(
env: Env,
userId: string,
updateType: number,
revisionDate: string,
contextId: string | null,
targetDeviceIdentifier: string | null
): Promise<void> {
try {
const id = env.NOTIFICATIONS_HUB.idFromName(userId);
@@ -357,9 +440,14 @@ export async function notifyUserVaultSync(
'Content-Type': 'application/json',
'X-NodeWarden-UserId': userId,
},
body: JSON.stringify({ revisionDate, contextId: contextId || null }),
body: JSON.stringify({
revisionDate,
contextId: contextId || null,
updateType,
targetDeviceIdentifier: targetDeviceIdentifier || null,
}),
});
} catch (error) {
console.error('Failed to broadcast vault sync notification:', error);
console.error('Failed to broadcast realtime notification:', error);
}
}
+10 -1
View File
@@ -1,4 +1,5 @@
import { Env } from '../types';
import { getOnlineUserDevices, notifyUserLogout } from '../durable/notifications-hub';
import { StorageService } from '../services/storage';
import { errorResponse, jsonResponse } from '../utils/response';
import { readKnownDeviceProbe } from '../utils/device';
@@ -46,10 +47,12 @@ export async function handleGetDevices(request: Request, env: Env, userId: strin
export async function handleGetAuthorizedDevices(request: Request, env: Env, userId: string): Promise<Response> {
void request;
const storage = new StorageService(env.DB);
const [devices, trusted] = await Promise.all([
const [devices, trusted, onlineDeviceIdentifiers] = await Promise.all([
storage.getDevicesByUserId(userId),
storage.getTrustedDeviceTokenSummariesByUserId(userId),
getOnlineUserDevices(env, userId),
]);
const onlineSet = new Set(onlineDeviceIdentifiers);
const trustedByIdentifier = new Map<string, { expiresAt: number; tokenCount: number }>();
for (const row of trusted) {
@@ -67,6 +70,7 @@ export async function handleGetAuthorizedDevices(request: Request, env: Env, use
type: device.type,
creationDate: device.createdAt,
revisionDate: device.updatedAt,
online: onlineSet.has(device.deviceIdentifier),
trusted: !!trustedInfo,
trustedTokenCount: trustedInfo?.tokenCount || 0,
trustedUntil: trustedInfo?.expiresAt ? new Date(trustedInfo.expiresAt).toISOString() : null,
@@ -83,6 +87,7 @@ export async function handleGetAuthorizedDevices(request: Request, env: Env, use
type: 14,
creationDate: '',
revisionDate: '',
online: onlineSet.has(row.deviceIdentifier),
trusted: true,
trustedTokenCount: row.tokenCount,
trustedUntil: row.expiresAt ? new Date(row.expiresAt).toISOString() : null,
@@ -136,6 +141,9 @@ export async function handleDeleteDevice(
await storage.deleteTrustedTwoFactorTokensByDevice(userId, normalized);
await storage.deleteRefreshTokensByDevice(userId, normalized);
const deleted = await storage.deleteDevice(userId, normalized);
if (deleted) {
await notifyUserLogout(env, userId, normalized);
}
return jsonResponse({ success: deleted });
}
@@ -154,6 +162,7 @@ export async function handleDeleteAllDevices(request: Request, env: Env, userId:
user.securityStamp = generateUUID();
user.updatedAt = new Date().toISOString();
await storage.saveUser(user);
await notifyUserLogout(env, userId, null);
return jsonResponse({ success: true, removedTrusted, removedSessions: removedSessions ?? 0, removedDevices });
}
+14 -17
View File
@@ -1,5 +1,5 @@
import { AuthService } from '../services/auth';
import type { Env } from '../types';
import type { Env, JWTPayload } from '../types';
import { errorResponse, jsonResponse } from '../utils/response';
import { generateUUID } from '../utils/uuid';
@@ -13,18 +13,17 @@ function extractAccessToken(request: Request): string | null {
return match?.[1]?.trim() || null;
}
async function authenticateNotificationsRequest(request: Request, env: Env): Promise<string | null> {
async function authenticateNotificationsRequest(request: Request, env: Env): Promise<JWTPayload | null> {
const accessToken = extractAccessToken(request);
if (!accessToken) return null;
const auth = new AuthService(env);
const payload = await auth.verifyAccessToken(`Bearer ${accessToken}`);
return payload?.sub || null;
return auth.verifyAccessToken(`Bearer ${accessToken}`);
}
export async function handleNotificationsNegotiate(request: Request, env: Env): Promise<Response> {
const userId = await authenticateNotificationsRequest(request, env);
if (!userId) return errorResponse('Unauthorized', 401);
const payload = await authenticateNotificationsRequest(request, env);
if (!payload?.sub) return errorResponse('Unauthorized', 401);
const connectionId = generateUUID();
return jsonResponse({
@@ -41,21 +40,19 @@ export async function handleNotificationsNegotiate(request: Request, env: Env):
}
export async function handleNotificationsHub(request: Request, env: Env): Promise<Response> {
const userId = await authenticateNotificationsRequest(request, env);
if (!userId) return errorResponse('Unauthorized', 401);
const payload = await authenticateNotificationsRequest(request, env);
if (!payload?.sub) return errorResponse('Unauthorized', 401);
if (request.headers.get('Upgrade')?.toLowerCase() !== 'websocket') {
return errorResponse('Expected websocket', 426);
}
const userId = payload.sub;
const id = env.NOTIFICATIONS_HUB.idFromName(userId);
const stub = env.NOTIFICATIONS_HUB.get(id);
await stub.fetch('https://notifications/internal/bind-user', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-NodeWarden-UserId': userId,
},
body: JSON.stringify({ userId }),
});
return stub.fetch(request);
const forwardedUrl = new URL(request.url);
forwardedUrl.searchParams.set('nw_uid', userId);
if (payload.did) {
forwardedUrl.searchParams.set('nw_did', payload.did);
}
return stub.fetch(new Request(forwardedUrl.toString(), request));
}