feat(notifications): enhance NotificationsHub with device status updates and logout notifications

This commit is contained in:
shuaiplus
2026-03-09 01:21:39 +08:00
parent cb137fe0c7
commit 616d6273bb
8 changed files with 187 additions and 40 deletions
+104 -16
View File
@@ -3,6 +3,8 @@ import type { Env } from '../types';
const SIGNALR_RECORD_SEPARATOR = 0x1e;
const SIGNALR_HANDSHAKE_ACK = new Uint8Array([0x7b, 0x7d, SIGNALR_RECORD_SEPARATOR]);
const SIGNALR_UPDATE_TYPE_SYNC_VAULT = 5;
const SIGNALR_UPDATE_TYPE_LOG_OUT = 11;
const SIGNALR_UPDATE_TYPE_DEVICE_STATUS = 12;
const SIGNALR_PING_INTERVAL_MS = 15_000;
type HubProtocol = 'json' | 'messagepack';
@@ -10,6 +12,7 @@ type HubProtocol = 'json' | 'messagepack';
interface ConnectionState {
handshakeComplete: boolean;
protocol: HubProtocol;
deviceIdentifier: string | null;
}
function concatBytes(chunks: Uint8Array[]): Uint8Array {
@@ -123,14 +126,19 @@ function frameSignalRBinary(payload: Uint8Array): Uint8Array {
return concatBytes([new Uint8Array(prefix), payload]);
}
function buildSignalRJsonInvocation(userId: string, revisionDate: string, contextId: string | null): string {
function buildSignalRJsonInvocation(
userId: string,
updateType: number,
revisionDate: string,
contextId: string | null
): string {
return JSON.stringify({
type: 1,
target: 'ReceiveMessage',
arguments: [
{
ContextId: contextId,
Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT,
Type: updateType,
Payload: {
UserId: userId,
Date: revisionDate,
@@ -144,7 +152,12 @@ function buildSignalRJsonPing(): string {
return JSON.stringify({ type: 6 }) + String.fromCharCode(SIGNALR_RECORD_SEPARATOR);
}
function buildSignalRMessagePackInvocation(userId: string, revisionDate: string, contextId: string | null): Uint8Array {
function buildSignalRMessagePackInvocation(
userId: string,
updateType: number,
revisionDate: string,
contextId: string | null
): Uint8Array {
// SignalR MessagePack hub protocol uses an array-based invocation shape:
// [type, headers, invocationId, target, arguments]
const payload = encodeMsgPack([
@@ -155,7 +168,7 @@ function buildSignalRMessagePackInvocation(userId: string, revisionDate: string,
[
{
ContextId: contextId,
Type: SIGNALR_UPDATE_TYPE_SYNC_VAULT,
Type: updateType,
Payload: {
UserId: userId,
Date: new Date(revisionDate),
@@ -189,25 +202,32 @@ export class NotificationsHub {
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/internal/bind-user' && request.method === 'POST') {
const body = (await request.json().catch(() => null)) as { userId?: string } | null;
this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim();
return new Response(null, { status: 204 });
}
if (url.pathname === '/internal/notify' && request.method === 'POST') {
const body = (await request.json().catch(() => null)) as {
revisionDate?: string;
userId?: string;
contextId?: string | null;
updateType?: number;
targetDeviceIdentifier?: string | null;
} | null;
const revisionDate = String(body?.revisionDate || '').trim() || new Date().toISOString();
this.userId = String(request.headers.get('X-NodeWarden-UserId') || body?.userId || this.userId).trim();
const contextId = String(body?.contextId || '').trim() || null;
this.broadcastVaultSync(revisionDate, contextId);
const updateType = Number(body?.updateType || SIGNALR_UPDATE_TYPE_SYNC_VAULT) || SIGNALR_UPDATE_TYPE_SYNC_VAULT;
const targetDeviceIdentifier = String(body?.targetDeviceIdentifier || '').trim() || null;
this.broadcastMessage(updateType, revisionDate, contextId, targetDeviceIdentifier);
return new Response(null, { status: 204 });
}
if (url.pathname === '/internal/online' && request.method === 'GET') {
return new Response(JSON.stringify({ deviceIdentifiers: this.getOnlineDeviceIdentifiers() }), {
status: 200,
headers: {
'Content-Type': 'application/json',
},
});
}
if (url.pathname !== '/notifications/hub') {
return new Response('Not found', { status: 404 });
}
@@ -216,6 +236,12 @@ export class NotificationsHub {
return new Response('Expected websocket', { status: 426 });
}
const requestUserId = String(url.searchParams.get('nw_uid') || '').trim();
const requestDeviceIdentifier = String(url.searchParams.get('nw_did') || '').trim() || null;
if (requestUserId) {
this.userId = requestUserId;
}
if (!this.userId) {
return new Response('Unauthorized', { status: 401 });
}
@@ -228,6 +254,7 @@ export class NotificationsHub {
this.connections.set(server, {
handshakeComplete: false,
protocol: 'messagepack',
deviceIdentifier: requestDeviceIdentifier,
});
this.ensurePingLoop();
@@ -235,12 +262,16 @@ export class NotificationsHub {
void this.handleSocketMessage(server, event.data);
});
server.addEventListener('close', () => {
const shouldBroadcast = !!this.connections.get(server)?.handshakeComplete;
this.connections.delete(server);
this.stopPingLoopIfIdle();
if (shouldBroadcast) this.broadcastDeviceStatus();
});
server.addEventListener('error', () => {
const shouldBroadcast = !!this.connections.get(server)?.handshakeComplete;
this.connections.delete(server);
this.stopPingLoopIfIdle();
if (shouldBroadcast) this.broadcastDeviceStatus();
try {
server.close(1011, 'Socket error');
} catch {
@@ -268,6 +299,7 @@ export class NotificationsHub {
connection.protocol = protocol;
connection.handshakeComplete = true;
socket.send(SIGNALR_HANDSHAKE_ACK);
this.broadcastDeviceStatus();
return;
} catch {
// Ignore malformed pre-handshake payloads.
@@ -317,16 +349,31 @@ export class NotificationsHub {
this.stopPingLoopIfIdle();
}
private broadcastVaultSync(revisionDate: string, contextId: string | null): void {
private getOnlineDeviceIdentifiers(): string[] {
const out = new Set<string>();
for (const connection of this.connections.values()) {
if (!connection.handshakeComplete || !connection.deviceIdentifier) continue;
out.add(connection.deviceIdentifier);
}
return Array.from(out);
}
private broadcastMessage(
updateType: number,
revisionDate: string,
contextId: string | null,
targetDeviceIdentifier: string | null
): void {
if (!this.userId || this.connections.size === 0) return;
for (const [socket, connection] of this.connections) {
if (!connection.handshakeComplete) continue;
if (targetDeviceIdentifier && connection.deviceIdentifier !== targetDeviceIdentifier) continue;
try {
if (connection.protocol === 'json') {
socket.send(buildSignalRJsonInvocation(this.userId, revisionDate, contextId));
socket.send(buildSignalRJsonInvocation(this.userId, updateType, revisionDate, contextId));
} else {
socket.send(buildSignalRMessagePackInvocation(this.userId, revisionDate, contextId));
socket.send(buildSignalRMessagePackInvocation(this.userId, updateType, revisionDate, contextId));
}
} catch {
this.connections.delete(socket);
@@ -340,6 +387,10 @@ export class NotificationsHub {
this.stopPingLoopIfIdle();
}
private broadcastDeviceStatus(): void {
this.broadcastMessage(SIGNALR_UPDATE_TYPE_DEVICE_STATUS, new Date().toISOString(), null, null);
}
}
export async function notifyUserVaultSync(
@@ -347,6 +398,38 @@ export async function notifyUserVaultSync(
userId: string,
revisionDate: string,
contextId?: string | null
): Promise<void> {
return notifyUserUpdate(env, userId, SIGNALR_UPDATE_TYPE_SYNC_VAULT, revisionDate, contextId ?? null, null);
}
export async function notifyUserLogout(
env: Env,
userId: string,
targetDeviceIdentifier?: string | null
): Promise<void> {
return notifyUserUpdate(env, userId, SIGNALR_UPDATE_TYPE_LOG_OUT, new Date().toISOString(), null, targetDeviceIdentifier ?? null);
}
export async function getOnlineUserDevices(env: Env, userId: string): Promise<string[]> {
try {
const id = env.NOTIFICATIONS_HUB.idFromName(userId);
const stub = env.NOTIFICATIONS_HUB.get(id);
const response = await stub.fetch('https://notifications/internal/online');
if (!response.ok) return [];
const body = (await response.json().catch(() => null)) as { deviceIdentifiers?: string[] } | null;
return Array.isArray(body?.deviceIdentifiers) ? body.deviceIdentifiers.filter((value) => !!String(value || '').trim()) : [];
} catch {
return [];
}
}
async function notifyUserUpdate(
env: Env,
userId: string,
updateType: number,
revisionDate: string,
contextId: string | null,
targetDeviceIdentifier: string | null
): Promise<void> {
try {
const id = env.NOTIFICATIONS_HUB.idFromName(userId);
@@ -357,9 +440,14 @@ export async function notifyUserVaultSync(
'Content-Type': 'application/json',
'X-NodeWarden-UserId': userId,
},
body: JSON.stringify({ revisionDate, contextId: contextId || null }),
body: JSON.stringify({
revisionDate,
contextId: contextId || null,
updateType,
targetDeviceIdentifier: targetDeviceIdentifier || null,
}),
});
} catch (error) {
console.error('Failed to broadcast vault sync notification:', error);
console.error('Failed to broadcast realtime notification:', error);
}
}
+10 -1
View File
@@ -1,4 +1,5 @@
import { Env } from '../types';
import { getOnlineUserDevices, notifyUserLogout } from '../durable/notifications-hub';
import { StorageService } from '../services/storage';
import { errorResponse, jsonResponse } from '../utils/response';
import { readKnownDeviceProbe } from '../utils/device';
@@ -46,10 +47,12 @@ export async function handleGetDevices(request: Request, env: Env, userId: strin
export async function handleGetAuthorizedDevices(request: Request, env: Env, userId: string): Promise<Response> {
void request;
const storage = new StorageService(env.DB);
const [devices, trusted] = await Promise.all([
const [devices, trusted, onlineDeviceIdentifiers] = await Promise.all([
storage.getDevicesByUserId(userId),
storage.getTrustedDeviceTokenSummariesByUserId(userId),
getOnlineUserDevices(env, userId),
]);
const onlineSet = new Set(onlineDeviceIdentifiers);
const trustedByIdentifier = new Map<string, { expiresAt: number; tokenCount: number }>();
for (const row of trusted) {
@@ -67,6 +70,7 @@ export async function handleGetAuthorizedDevices(request: Request, env: Env, use
type: device.type,
creationDate: device.createdAt,
revisionDate: device.updatedAt,
online: onlineSet.has(device.deviceIdentifier),
trusted: !!trustedInfo,
trustedTokenCount: trustedInfo?.tokenCount || 0,
trustedUntil: trustedInfo?.expiresAt ? new Date(trustedInfo.expiresAt).toISOString() : null,
@@ -83,6 +87,7 @@ export async function handleGetAuthorizedDevices(request: Request, env: Env, use
type: 14,
creationDate: '',
revisionDate: '',
online: onlineSet.has(row.deviceIdentifier),
trusted: true,
trustedTokenCount: row.tokenCount,
trustedUntil: row.expiresAt ? new Date(row.expiresAt).toISOString() : null,
@@ -136,6 +141,9 @@ export async function handleDeleteDevice(
await storage.deleteTrustedTwoFactorTokensByDevice(userId, normalized);
await storage.deleteRefreshTokensByDevice(userId, normalized);
const deleted = await storage.deleteDevice(userId, normalized);
if (deleted) {
await notifyUserLogout(env, userId, normalized);
}
return jsonResponse({ success: deleted });
}
@@ -154,6 +162,7 @@ export async function handleDeleteAllDevices(request: Request, env: Env, userId:
user.securityStamp = generateUUID();
user.updatedAt = new Date().toISOString();
await storage.saveUser(user);
await notifyUserLogout(env, userId, null);
return jsonResponse({ success: true, removedTrusted, removedSessions: removedSessions ?? 0, removedDevices });
}
+14 -17
View File
@@ -1,5 +1,5 @@
import { AuthService } from '../services/auth';
import type { Env } from '../types';
import type { Env, JWTPayload } from '../types';
import { errorResponse, jsonResponse } from '../utils/response';
import { generateUUID } from '../utils/uuid';
@@ -13,18 +13,17 @@ function extractAccessToken(request: Request): string | null {
return match?.[1]?.trim() || null;
}
async function authenticateNotificationsRequest(request: Request, env: Env): Promise<string | null> {
async function authenticateNotificationsRequest(request: Request, env: Env): Promise<JWTPayload | null> {
const accessToken = extractAccessToken(request);
if (!accessToken) return null;
const auth = new AuthService(env);
const payload = await auth.verifyAccessToken(`Bearer ${accessToken}`);
return payload?.sub || null;
return auth.verifyAccessToken(`Bearer ${accessToken}`);
}
export async function handleNotificationsNegotiate(request: Request, env: Env): Promise<Response> {
const userId = await authenticateNotificationsRequest(request, env);
if (!userId) return errorResponse('Unauthorized', 401);
const payload = await authenticateNotificationsRequest(request, env);
if (!payload?.sub) return errorResponse('Unauthorized', 401);
const connectionId = generateUUID();
return jsonResponse({
@@ -41,21 +40,19 @@ export async function handleNotificationsNegotiate(request: Request, env: Env):
}
export async function handleNotificationsHub(request: Request, env: Env): Promise<Response> {
const userId = await authenticateNotificationsRequest(request, env);
if (!userId) return errorResponse('Unauthorized', 401);
const payload = await authenticateNotificationsRequest(request, env);
if (!payload?.sub) return errorResponse('Unauthorized', 401);
if (request.headers.get('Upgrade')?.toLowerCase() !== 'websocket') {
return errorResponse('Expected websocket', 426);
}
const userId = payload.sub;
const id = env.NOTIFICATIONS_HUB.idFromName(userId);
const stub = env.NOTIFICATIONS_HUB.get(id);
await stub.fetch('https://notifications/internal/bind-user', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-NodeWarden-UserId': userId,
},
body: JSON.stringify({ userId }),
});
return stub.fetch(request);
const forwardedUrl = new URL(request.url);
forwardedUrl.searchParams.set('nw_uid', userId);
if (payload.did) {
forwardedUrl.searchParams.set('nw_did', payload.did);
}
return stub.fetch(new Request(forwardedUrl.toString(), request));
}
+24 -5
View File
@@ -1,7 +1,7 @@
import { useEffect, useMemo, useRef, useState } from 'preact/hooks';
import { Link, Route, Switch, useLocation } from 'wouter';
import { useQuery } from '@tanstack/react-query';
import { ArrowUpDown, Cloud, Clock3, Folder, KeyRound, Lock, LogOut, Send as SendIcon, Settings as SettingsIcon, Shield, ShieldUser } from 'lucide-preact';
import { ArrowUpDown, Cloud, Clock3, Folder as FolderIcon, KeyRound, Lock, LogOut, Send as SendIcon, Settings as SettingsIcon, Shield, ShieldUser } from 'lucide-preact';
import AuthViews from '@/components/AuthViews';
import ConfirmDialog from '@/components/ConfirmDialog';
import ToastHost from '@/components/ToastHost';
@@ -87,7 +87,7 @@ import {
} from '@/lib/export-formats';
import { t } from '@/lib/i18n';
import type { CiphersImportPayload } from '@/lib/api';
import type { AppPhase, AuthorizedDevice, Cipher, Folder, Profile, Send, SendDraft, SessionState, ToastMessage, VaultDraft } from '@/lib/types';
import type { AppPhase, AuthorizedDevice, Cipher, Folder as VaultFolder, Profile, Send, SendDraft, SessionState, ToastMessage, VaultDraft } from '@/lib/types';
interface PendingTotp {
email: string;
@@ -296,6 +296,9 @@ function buildPublicSendUrl(origin: string, accessId: string, keyPart: string):
}
const SIGNALR_RECORD_SEPARATOR = String.fromCharCode(0x1e);
const SIGNALR_UPDATE_TYPE_SYNC_VAULT = 5;
const SIGNALR_UPDATE_TYPE_LOG_OUT = 11;
const SIGNALR_UPDATE_TYPE_DEVICE_STATUS = 12;
interface WebVaultSignalRInvocation {
type?: number;
@@ -371,11 +374,12 @@ export default function App() {
const [toasts, setToasts] = useState<ToastMessage[]>([]);
const [mobileLayout, setMobileLayout] = useState(false);
const [decryptedFolders, setDecryptedFolders] = useState<Folder[]>([]);
const [decryptedFolders, setDecryptedFolders] = useState<VaultFolder[]>([]);
const [decryptedCiphers, setDecryptedCiphers] = useState<Cipher[]>([]);
const [decryptedSends, setDecryptedSends] = useState<Send[]>([]);
const migratedPlainFolderIdsRef = useRef<Set<string>>(new Set());
const silentRefreshVaultRef = useRef<() => Promise<void>>(async () => {});
const refreshAuthorizedDevicesRef = useRef<() => Promise<void>>(async () => {});
useEffect(() => {
const syncInviteFromUrl = () => {
@@ -1031,6 +1035,7 @@ export default function App() {
socket.addEventListener('open', () => {
reconnectAttempts = 0;
void refreshAuthorizedDevicesRef.current();
try {
socket?.send(`{"protocol":"json","version":1}${SIGNALR_RECORD_SEPARATOR}`);
} catch {
@@ -1045,6 +1050,16 @@ export default function App() {
const frames = parseSignalRTextFrames(event.data);
for (const frame of frames) {
if (frame.type !== 1 || frame.target !== 'ReceiveMessage') continue;
const updateType = Number(frame.arguments?.[0]?.Type || 0);
if (updateType === SIGNALR_UPDATE_TYPE_LOG_OUT) {
logoutNow();
return;
}
if (updateType === SIGNALR_UPDATE_TYPE_DEVICE_STATUS) {
void refreshAuthorizedDevicesRef.current();
continue;
}
if (updateType !== SIGNALR_UPDATE_TYPE_SYNC_VAULT) continue;
const contextId = String(frame.arguments?.[0]?.ContextId || '').trim();
if (contextId && contextId === getCurrentDeviceIdentifier()) continue;
void silentRefreshVaultRef.current();
@@ -1053,6 +1068,7 @@ export default function App() {
socket.addEventListener('close', () => {
socket = null;
void refreshAuthorizedDevicesRef.current();
scheduleReconnect();
});
@@ -1084,6 +1100,8 @@ export default function App() {
await authorizedDevicesQuery.refetch();
}
refreshAuthorizedDevicesRef.current = refreshAuthorizedDevices;
async function revokeDeviceTrustAction(device: AuthorizedDevice) {
await revokeAuthorizedDeviceTrust(authedFetch, device.identifier);
await authorizedDevicesQuery.refetch();
@@ -1751,7 +1769,8 @@ export default function App() {
}
function downloadBytesAsFile(bytes: Uint8Array, fileName: string, mimeType: string) {
const blob = new Blob([bytes], { type: mimeType || 'application/octet-stream' });
const payload = bytes.slice();
const blob = new Blob([payload], { type: mimeType || 'application/octet-stream' });
const objectUrl = URL.createObjectURL(blob);
const anchor = document.createElement('a');
anchor.href = objectUrl;
@@ -1967,7 +1986,7 @@ export default function App() {
title={sidebarToggleTitle}
onClick={() => window.dispatchEvent(new CustomEvent('nodewarden:toggle-sidebar'))}
>
<Folder size={16} className="btn-icon" />
<FolderIcon size={16} className="btn-icon" />
</button>
)}
<button type="button" className="btn btn-secondary small mobile-lock-btn" aria-label={t('txt_lock')} title={t('txt_lock')} onClick={handleLock}>
@@ -75,6 +75,7 @@ export default function SecurityDevicesPage(props: SecurityDevicesPageProps) {
<tr>
<th>{t('txt_device')}</th>
<th>{t('txt_type')}</th>
<th>{t('txt_status')}</th>
<th>{t('txt_added')}</th>
<th>{t('txt_last_seen')}</th>
<th>{t('txt_trusted_until')}</th>
@@ -89,6 +90,11 @@ export default function SecurityDevicesPage(props: SecurityDevicesPageProps) {
<div className="muted-inline">{device.identifier}</div>
</td>
<td data-label={t('txt_type')}>{mapDeviceTypeName(device.type)}</td>
<td data-label={t('txt_status')}>
<span className={`device-status-pill ${device.online ? 'online' : 'offline'}`}>
{device.online ? t('txt_online') : t('txt_offline')}
</span>
</td>
<td data-label={t('txt_added')}>{formatDateTime(device.creationDate)}</td>
<td data-label={t('txt_last_seen')}>{formatDateTime(device.revisionDate)}</td>
<td data-label={t('txt_trusted_until')}>
@@ -122,7 +128,7 @@ export default function SecurityDevicesPage(props: SecurityDevicesPageProps) {
))}
{!props.loading && props.devices.length === 0 && (
<tr>
<td colSpan={6}>
<td colSpan={7}>
<div className="empty" style={{ minHeight: 80 }}>{t('txt_no_devices_found')}</div>
</td>
</tr>
+4
View File
@@ -346,6 +346,8 @@ const messages: Record<Locale, Record<string, string>> = {
txt_ssn: "SSN",
txt_state_province: "State / Province",
txt_status: "Status",
txt_online: "Online",
txt_offline: "Offline",
txt_submit: "Submit",
txt_sync: "Sync",
txt_sync_vault: "Sync Vault",
@@ -624,6 +626,8 @@ const zhCNOverrides: Record<string, string> = {
txt_manage_device_sessions_and_30_day_totp_trusted_sessions: '管理设备会话和 30 天 TOTP 受信状态。',
txt_role: '角色',
txt_status: '状态',
txt_online: '在线',
txt_offline: '离线',
txt_actions: '操作',
txt_type: '类型',
txt_revoke_all_trusted: '撤销全部受信任设备',
+1
View File
@@ -304,6 +304,7 @@ export interface AuthorizedDevice {
type: number;
creationDate: string | null;
revisionDate: string | null;
online: boolean;
trusted: boolean;
trustedTokenCount: number;
trustedUntil: string | null;
+23
View File
@@ -1600,6 +1600,29 @@ input[type='file'].input::file-selector-button:hover {
gap: 6px;
}
.device-status-pill {
display: inline-flex;
align-items: center;
justify-content: center;
min-width: 58px;
height: 26px;
padding: 0 10px;
border-radius: 999px;
font-size: 12px;
font-weight: 700;
line-height: 1;
}
.device-status-pill.online {
background: #dcfce7;
color: #166534;
}
.device-status-pill.offline {
background: #e2e8f0;
color: #475569;
}
.dialog-mask {
position: fixed;
inset: 0;