diff --git a/src/durable/backup-transfer-runner.ts b/src/durable/backup-transfer-runner.ts new file mode 100644 index 0000000..b0b099c --- /dev/null +++ b/src/durable/backup-transfer-runner.ts @@ -0,0 +1,293 @@ +import type { Env } from '../types'; +import type { BackupDestinationRecord } from '../services/backup-config'; +import { + BACKUP_SCHEDULER_WINDOW_MINUTES, + hasBackupSlotBetween, + isBackupDueNow, + loadBackupSettings, +} from '../services/backup-config'; +import { createRemoteBackupTransferSession } from '../services/backup-uploader'; +import { getBlobObject } from '../services/blob-store'; +import { StorageService } from '../services/storage'; +import { notifyUserBackupProgress } from './notifications-hub'; +import { executeConfiguredBackup } from '../handlers/backup'; + +const BACKUP_JOB_STATE_KEY = 'backup.job.state.v1'; +const BACKUP_JOB_LEASE_MS = 10 * 60 * 1000; +const BACKUP_JOB_HEARTBEAT_MS = 30 * 1000; + +interface BackupJobState { + token: string; + reason: string; + acquiredAt: string; + touchedAt: string; + expiresAtMs: number; +} + +interface RemoteAttachmentChunkRequest { + destination: BackupDestinationRecord; + attachments: Array<{ + blobName: string; + }>; +} + +interface ConfiguredBackupRunRequest { + actorUserId?: string | null; + auditMetadata?: Record | null; + destinationId?: string | null; + targetDeviceIdentifier?: string | null; + trigger?: 'manual' | 'scheduled'; +} + +function badRequest(message: string, status: number = 400): Response { + return new Response(JSON.stringify({ error: message }), { + status, + headers: { + 'Content-Type': 'application/json; charset=utf-8', + 'Cache-Control': 'no-store', + }, + }); +} + +export class BackupTransferRunner { + private lastHeartbeatAt = 0; + + constructor( + private readonly state: DurableObjectState, + private readonly env: Env + ) { + } + + private async acquireJob(reason: string): Promise { + const nowMs = Date.now(); + const current = await this.state.storage.get(BACKUP_JOB_STATE_KEY); + if (current?.expiresAtMs && current.expiresAtMs > nowMs) { + return null; + } + + const token = crypto.randomUUID(); + const nowIso = new Date(nowMs).toISOString(); + await this.state.storage.put(BACKUP_JOB_STATE_KEY, { + token, + reason, + acquiredAt: nowIso, + touchedAt: nowIso, + expiresAtMs: nowMs + BACKUP_JOB_LEASE_MS, + }); + this.lastHeartbeatAt = 0; + return token; + } + + private async touchJob(token: string): Promise { + const nowMs = Date.now(); + if (nowMs - this.lastHeartbeatAt < BACKUP_JOB_HEARTBEAT_MS) return; + this.lastHeartbeatAt = nowMs; + + const current = await this.state.storage.get(BACKUP_JOB_STATE_KEY); + if (current?.token !== token) return; + + await this.state.storage.put(BACKUP_JOB_STATE_KEY, { + ...current, + touchedAt: new Date(nowMs).toISOString(), + expiresAtMs: nowMs + BACKUP_JOB_LEASE_MS, + }); + } + + private async releaseJob(token: string): Promise { + const current = await this.state.storage.get(BACKUP_JOB_STATE_KEY); + if (current?.token === token) { + await this.state.storage.delete(BACKUP_JOB_STATE_KEY); + } + } + + private async runConfiguredBackup(request: Request): Promise { + let body: ConfiguredBackupRunRequest; + try { + body = await request.json(); + } catch { + return badRequest('Backup run payload is invalid'); + } + + const trigger = body.trigger === 'scheduled' ? 'scheduled' : 'manual'; + const actorUserId = String(body.actorUserId || '').trim() || null; + if (trigger === 'manual' && !actorUserId) { + return badRequest('Manual backup run requires an actor'); + } + + const token = await this.acquireJob(`${trigger}:${actorUserId || 'system'}`); + if (!token) { + return badRequest('Another backup run is already in progress', 409); + } + + try { + await this.touchJob(token); + const storage = new StorageService(this.env.DB); + const progress = actorUserId + ? async (event: { + operation: 'backup-remote-run'; + step: string; + fileName: string; + stageTitle: string; + stageDetail: string; + done?: boolean; + ok?: boolean; + error?: string | null; + }) => { + await notifyUserBackupProgress( + this.env, + actorUserId, + event, + String(body.targetDeviceIdentifier || '').trim() || null + ); + } + : null; + + const result = await executeConfiguredBackup( + this.env, + storage, + actorUserId, + trigger, + body.destinationId || null, + () => this.touchJob(token), + progress, + body.auditMetadata || null + ); + const settings = await loadBackupSettings(storage, this.env, 'UTC'); + + return new Response(JSON.stringify({ + object: 'backup-runner-result', + result, + settings, + }), { + status: 200, + headers: { + 'Content-Type': 'application/json; charset=utf-8', + 'Cache-Control': 'no-store', + }, + }); + } catch (error) { + return badRequest(error instanceof Error ? error.message : 'Backup run failed', 500); + } finally { + await this.releaseJob(token); + } + } + + private async runScheduledBackups(): Promise { + const token = await this.acquireJob('scheduled'); + if (!token) { + return badRequest('Another backup run is already in progress', 409); + } + + let completed = 0; + try { + await this.touchJob(token); + const storage = new StorageService(this.env.DB); + let scanStartMs = Date.now(); + + while (true) { + await this.touchJob(token); + const settings = await loadBackupSettings(storage, this.env, 'UTC'); + const now = new Date(); + const dueDestinations = settings.destinations.filter((destination) => + isBackupDueNow(destination, now, BACKUP_SCHEDULER_WINDOW_MINUTES) + || hasBackupSlotBetween(destination, new Date(scanStartMs), now) + ); + + if (!dueDestinations.length) { + break; + } + + scanStartMs = now.getTime(); + for (const destination of dueDestinations) { + await this.touchJob(token); + await executeConfiguredBackup( + this.env, + storage, + null, + 'scheduled', + destination.id, + () => this.touchJob(token) + ); + completed += 1; + } + } + + return new Response(JSON.stringify({ + ok: true, + completed, + }), { + status: 200, + headers: { + 'Content-Type': 'application/json; charset=utf-8', + 'Cache-Control': 'no-store', + }, + }); + } catch (error) { + return badRequest(error instanceof Error ? error.message : 'Scheduled backup failed', 500); + } finally { + await this.releaseJob(token); + } + } + + async fetch(request: Request): Promise { + const url = new URL(request.url); + if (request.method !== 'POST') { + return badRequest('Not found', 404); + } + + if (url.pathname === '/internal/run-configured-backup') { + return this.runConfiguredBackup(request); + } + + if (url.pathname === '/internal/run-scheduled-backups') { + return this.runScheduledBackups(); + } + + if (url.pathname !== '/internal/upload-attachment-chunk') { + return badRequest('Not found', 404); + } + + let body: RemoteAttachmentChunkRequest; + try { + body = await request.json(); + } catch { + return badRequest('Attachment chunk payload is invalid'); + } + + if (!body?.destination || !Array.isArray(body.attachments)) { + return badRequest('Attachment chunk payload is invalid'); + } + + const remoteSession = createRemoteBackupTransferSession(body.destination); + let uploaded = 0; + + for (const attachment of body.attachments) { + const blobName = String(attachment?.blobName || '').trim(); + if (!blobName) { + return badRequest('Attachment chunk payload is invalid'); + } + + const object = await getBlobObject(this.env, blobName); + if (!object) { + return badRequest(`Attachment blob missing for ${blobName}`, 409); + } + + const bytes = new Uint8Array(await new Response(object.body).arrayBuffer()); + await remoteSession.putFile(`attachments/${blobName}`, bytes, { + contentType: object.contentType, + }); + uploaded += 1; + } + + return new Response(JSON.stringify({ + ok: true, + uploaded, + }), { + status: 200, + headers: { + 'Content-Type': 'application/json; charset=utf-8', + 'Cache-Control': 'no-store', + }, + }); + } +} diff --git a/src/handlers/backup.ts b/src/handlers/backup.ts index 1fef3fe..db600bf 100644 --- a/src/handlers/backup.ts +++ b/src/handlers/backup.ts @@ -1,6 +1,5 @@ import type { Env, User } from '../types'; import { errorResponse, jsonResponse } from '../utils/response'; -import { generateUUID } from '../utils/uuid'; import { type BackupArchiveBundle, buildBackupArchive, @@ -10,12 +9,11 @@ import { import { type BackupDestinationRecord, type BackupSettingsInput, - BACKUP_SCHEDULER_WINDOW_MINUTES, + type BackupSettings, + type WebDavBackupDestination, getBackupLocalDateKey, getDefaultBackupSettings, getBackupSettingsRepairState, - hasBackupSlotBetween, - isBackupDueNow, loadBackupSettings, normalizeBackupSettingsInput, normalizeImportedBackupSettings, @@ -86,102 +84,6 @@ function getBackupDestinationSummary(destination: BackupDestinationRecord | null }; } -const BACKUP_RUNNER_LOCK_KEY = 'backup.runner.lock.v1'; -const BACKUP_RUNNER_LEASE_MS = 10 * 60 * 1000; -const BACKUP_RUNNER_HEARTBEAT_MS = 30 * 1000; - -// CONTRACT: -// The runner lock is a config-row lease, not a queue. It only prevents two -// backup/restore jobs from overlapping. Manual runs return conflict when the -// lease is held; scheduled runs skip quietly. Never export this row in backups. -interface BackupRunnerLease { - token: string; - touch: () => Promise; - release: () => Promise; -} - -async function acquireBackupRunnerLease(env: Env, reason: string): Promise { - const token = generateUUID(); - const nowMs = Date.now(); - const expiresAtMs = nowMs + BACKUP_RUNNER_LEASE_MS; - const value = JSON.stringify({ - token, - reason, - acquiredAt: new Date(nowMs).toISOString(), - touchedAt: new Date(nowMs).toISOString(), - expiresAtMs, - }); - const result = await env.DB - .prepare( - `INSERT INTO config(key, value) VALUES(?, ?) - ON CONFLICT(key) DO UPDATE SET value = excluded.value - WHERE COALESCE(CAST(json_extract(config.value, '$.expiresAtMs') AS INTEGER), 0) <= ?` - ) - .bind(BACKUP_RUNNER_LOCK_KEY, value, nowMs) - .run(); - - if ((result.meta?.changes || 0) < 1) { - return null; - } - - return { - token, - touch: async () => { - const nextNowMs = Date.now(); - const nextValue = JSON.stringify({ - token, - reason, - acquiredAt: new Date(nowMs).toISOString(), - touchedAt: new Date(nextNowMs).toISOString(), - expiresAtMs: nextNowMs + BACKUP_RUNNER_LEASE_MS, - }); - await env.DB - .prepare( - `UPDATE config - SET value = ? - WHERE key = ? - AND json_extract(value, '$.token') = ?` - ) - .bind(nextValue, BACKUP_RUNNER_LOCK_KEY, token) - .run(); - }, - release: async () => { - await env.DB - .prepare( - `DELETE FROM config - WHERE key = ? - AND json_extract(value, '$.token') = ?` - ) - .bind(BACKUP_RUNNER_LOCK_KEY, token) - .run(); - }, - }; -} - -async function withBackupRunnerLease( - env: Env, - reason: string, - task: (keepAlive: () => Promise) => Promise -): Promise { - const lease = await acquireBackupRunnerLease(env, reason); - if (!lease) return null; - - let lastHeartbeatAt = 0; - const keepAlive = async () => { - const nowMs = Date.now(); - if (nowMs - lastHeartbeatAt < BACKUP_RUNNER_HEARTBEAT_MS) return; - lastHeartbeatAt = nowMs; - await lease.touch(); - }; - - try { - await keepAlive(); - return await task(keepAlive); - } finally { - await lease.release(); - } -} - function ensureBackupBlobName(value: string): string { const normalized = String(value || '').trim().replace(/\\/g, '/').replace(/^\/+|\/+$/g, ''); if (!normalized) { @@ -201,6 +103,36 @@ interface RemoteAttachmentIndexPayload { blobs: Record; } +const REMOTE_ATTACHMENT_SYNC_EXTERNAL_SUBREQUEST_LIMIT = 50; +const REMOTE_ATTACHMENT_SYNC_SUBREQUEST_RESERVE = 6; +const REMOTE_ATTACHMENT_SYNC_MAX_WEB_DAV_BATCH_SIZE = 18; +const REMOTE_ATTACHMENT_SYNC_MAX_S3_BATCH_SIZE = 40; + +function countRemotePathSegments(value: string): number { + return String(value || '').replace(/\\/g, '/').split('/').filter(Boolean).length; +} + +function getRemoteAttachmentSyncBatchSize(destination: BackupDestinationRecord): number { + if (destination.type === 's3') { + return REMOTE_ATTACHMENT_SYNC_MAX_S3_BATCH_SIZE; + } + + const remotePath = String((destination.destination as WebDavBackupDestination).remotePath || ''); + const fixedWebDavDirectoryCalls = countRemotePathSegments(remotePath) + 1; // remotePath plus the shared "attachments" dir. + const available = REMOTE_ATTACHMENT_SYNC_EXTERNAL_SUBREQUEST_LIMIT + - REMOTE_ATTACHMENT_SYNC_SUBREQUEST_RESERVE + - fixedWebDavDirectoryCalls; + + if (available < 2) { + throw new Error('WebDAV remote backup path is too deep for safe attachment batching'); + } + + return Math.max(1, Math.min( + REMOTE_ATTACHMENT_SYNC_MAX_WEB_DAV_BATCH_SIZE, + Math.floor(available / 2) + )); +} + async function loadRemoteAttachmentIndex(session: RemoteBackupTransferSession): Promise> { try { const file = await session.download(REMOTE_ATTACHMENT_INDEX_PATH); @@ -256,7 +188,39 @@ async function saveRemoteAttachmentIndex( }); } -async function executeConfiguredBackup( +async function uploadRemoteAttachmentChunk( + env: Env, + destination: BackupDestinationRecord, + attachments: Array<{ blobName: string }> +): Promise { + if (!attachments.length) return; + const id = env.BACKUP_TRANSFER_RUNNER.idFromName('remote-attachment-sync'); + const stub = env.BACKUP_TRANSFER_RUNNER.get(id); + const response = await stub.fetch('https://backup-transfer/internal/upload-attachment-chunk', { + method: 'POST', + headers: { + 'Content-Type': 'application/json; charset=utf-8', + }, + body: JSON.stringify({ + destination, + attachments, + }), + }); + if (!response.ok) { + let message = `Attachment sync failed: ${response.status}`; + try { + const payload = await response.json<{ error?: string }>(); + if (payload?.error) { + message = payload.error; + } + } catch { + // Ignore JSON parse failures and preserve the status-based error. + } + throw new Error(message); + } +} + +export async function executeConfiguredBackup( env: Env, storage: StorageService, actorUserId: string | null, @@ -331,25 +295,20 @@ async function executeConfiguredBackup( if (destination.includeAttachments) { await touchLease(); const remoteAttachmentIndex = await loadRemoteAttachmentIndex(remoteSession); - let attachmentIndexChanged = false; - for (const attachment of archive.manifest.attachmentBlobs || []) { + const pendingAttachments = (archive.manifest.attachmentBlobs || []) + .filter((attachment) => remoteAttachmentIndex.get(attachment.blobName) !== attachment.sizeBytes); + const attachmentSyncBatchSize = getRemoteAttachmentSyncBatchSize(destination); + for (let i = 0; i < pendingAttachments.length; i += attachmentSyncBatchSize) { await touchLease(); - if (remoteAttachmentIndex.get(attachment.blobName) === attachment.sizeBytes) { - continue; - } - const remotePath = `attachments/${attachment.blobName}`; - const object = await getBlobObject(env, attachment.blobName); - if (!object) { - throw new Error(`Attachment blob missing for ${attachment.blobName}`); - } - const bytes = new Uint8Array(await new Response(object.body).arrayBuffer()); - await remoteSession.putFile(remotePath, bytes, { - contentType: object.contentType, - }); - remoteAttachmentIndex.set(attachment.blobName, attachment.sizeBytes); - attachmentIndexChanged = true; + const chunk = pendingAttachments + .slice(i, i + attachmentSyncBatchSize) + .map((attachment) => ({ blobName: attachment.blobName })); + await uploadRemoteAttachmentChunk(env, destination, chunk); } - if (attachmentIndexChanged) { + if (pendingAttachments.length) { + for (const attachment of pendingAttachments) { + remoteAttachmentIndex.set(attachment.blobName, attachment.sizeBytes); + } await touchLease(); await saveRemoteAttachmentIndex(remoteSession, remoteAttachmentIndex); } @@ -474,6 +433,76 @@ async function executeConfiguredBackup( } } +interface DurableBackupRunResponse { + result: { + fileName: string; + fileSize: number; + remotePath: string; + provider: string; + }; + settings: BackupSettings; +} + +async function runConfiguredBackupInDurableObject( + env: Env, + payload: { + actorUserId: string | null; + auditMetadata?: Record | null; + destinationId?: string | null; + targetDeviceIdentifier?: string | null; + trigger: 'manual' | 'scheduled'; + } +): Promise { + const id = env.BACKUP_TRANSFER_RUNNER.idFromName('configured-backup-runner'); + const stub = env.BACKUP_TRANSFER_RUNNER.get(id); + const response = await stub.fetch('https://backup-transfer/internal/run-configured-backup', { + method: 'POST', + headers: { + 'Content-Type': 'application/json; charset=utf-8', + }, + body: JSON.stringify(payload), + }); + if (response.status === 409) { + return null; + } + if (!response.ok) { + let message = `Backup run failed: ${response.status}`; + try { + const body = await response.json<{ error?: string }>(); + if (body?.error) message = body.error; + } catch { + // Preserve the status-based message when the DO returns a non-JSON error. + } + throw new Error(message); + } + const body = await response.json(); + if (!body?.result || !body?.settings) { + throw new Error('Backup run response is invalid'); + } + return body; +} + +async function runScheduledBackupsInDurableObject(env: Env): Promise { + const id = env.BACKUP_TRANSFER_RUNNER.idFromName('configured-backup-runner'); + const stub = env.BACKUP_TRANSFER_RUNNER.get(id); + const response = await stub.fetch('https://backup-transfer/internal/run-scheduled-backups', { + method: 'POST', + }); + if (response.status === 409) { + return; + } + if (!response.ok) { + let message = `Scheduled backup failed: ${response.status}`; + try { + const body = await response.json<{ error?: string }>(); + if (body?.error) message = body.error; + } catch { + // Preserve the status-based message when the DO returns a non-JSON error. + } + throw new Error(message); + } +} + function toImportStatusCode(message: string): number { const lower = message.toLowerCase(); if (lower.includes('invalid backup') || lower.includes('invalid json')) return 400; @@ -526,30 +555,7 @@ async function runImportAndAudit( } export async function runScheduledBackupIfDue(env: Env): Promise { - await withBackupRunnerLease(env, 'scheduled', async (keepAlive) => { - const storage = new StorageService(env.DB); - let scanStartMs = Date.now(); - - while (true) { - await keepAlive(); - const settings = await loadBackupSettings(storage, env, 'UTC'); - const now = new Date(); - const dueDestinations = settings.destinations.filter((destination) => - isBackupDueNow(destination, now, BACKUP_SCHEDULER_WINDOW_MINUTES) - || hasBackupSlotBetween(destination, new Date(scanStartMs), now) - ); - - if (!dueDestinations.length) { - return; - } - - scanStartMs = now.getTime(); - for (const destination of dueDestinations) { - await keepAlive(); - await executeConfiguredBackup(env, storage, null, 'scheduled', destination.id, keepAlive); - } - } - }); + await runScheduledBackupsInDurableObject(env); } export async function handleGetAdminBackupSettings(request: Request, env: Env, actorUser: User): Promise { @@ -661,33 +667,12 @@ export async function handleRunAdminConfiguredBackup(request: Request, env: Env, return errorResponse('Backup run payload is invalid', 400); } - const targetDeviceIdentifier = String(request.headers.get('X-NodeWarden-Acting-Device-Id') || '').trim() || null; - const progress = async (event: { - operation: 'backup-remote-run'; - step: string; - fileName: string; - stageTitle: string; - stageDetail: string; - done?: boolean; - ok?: boolean; - error?: string | null; - }) => { - await notifyUserBackupProgress(env, actorUser.id, event, targetDeviceIdentifier); - }; - const outcome = await withBackupRunnerLease(env, `manual:${actorUser.id}`, async (keepAlive) => { - const storage = new StorageService(env.DB); - const result = await executeConfiguredBackup( - env, - storage, - actorUser.id, - 'manual', - body?.destinationId || null, - keepAlive, - progress, - auditRequestMetadata(request) - ); - const settings = await loadBackupSettings(storage, env, 'UTC'); - return { result, settings }; + const outcome = await runConfiguredBackupInDurableObject(env, { + actorUserId: actorUser.id, + auditMetadata: auditRequestMetadata(request), + destinationId: body?.destinationId || null, + targetDeviceIdentifier: String(request.headers.get('X-NodeWarden-Acting-Device-Id') || '').trim() || null, + trigger: 'manual', }); if (!outcome) { return errorResponse('Another backup run is already in progress', 409); diff --git a/src/index.ts b/src/index.ts index 1f3d4cf..58bbd5c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,6 @@ import { Env } from './types'; import { NotificationsHub } from './durable/notifications-hub'; +import { BackupTransferRunner } from './durable/backup-transfer-runner'; import { handleRequest } from './router'; import { StorageService } from './services/storage'; import { applyCors, jsonResponse } from './utils/response'; @@ -127,3 +128,4 @@ export default { }; export { NotificationsHub }; +export { BackupTransferRunner }; diff --git a/src/types/index.ts b/src/types/index.ts index 1eafc9c..ec03972 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -2,6 +2,7 @@ export interface Env { DB: D1Database; NOTIFICATIONS_HUB: DurableObjectNamespace; + BACKUP_TRANSFER_RUNNER: DurableObjectNamespace; ASSETS?: { fetch(input: RequestInfo | URL, init?: RequestInit): Promise; }; diff --git a/wrangler.kv.toml b/wrangler.kv.toml index 97f16c7..8ca8f22 100644 --- a/wrangler.kv.toml +++ b/wrangler.kv.toml @@ -22,9 +22,17 @@ database_name = "nodewarden-db" name = "NOTIFICATIONS_HUB" class_name = "NotificationsHub" +[[durable_objects.bindings]] +name = "BACKUP_TRANSFER_RUNNER" +class_name = "BackupTransferRunner" + [[kv_namespaces]] binding = "ATTACHMENTS_KV" [[migrations]] tag = "v1-notifications-hub" new_sqlite_classes = [ "NotificationsHub" ] + +[[migrations]] +tag = "v2-backup-transfer-runner" +new_sqlite_classes = [ "BackupTransferRunner" ] diff --git a/wrangler.toml b/wrangler.toml index b1af92d..df0f11e 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -22,6 +22,10 @@ database_name = "nodewarden-db" name = "NOTIFICATIONS_HUB" class_name = "NotificationsHub" +[[durable_objects.bindings]] +name = "BACKUP_TRANSFER_RUNNER" +class_name = "BackupTransferRunner" + [[r2_buckets]] binding = "ATTACHMENTS" bucket_name = "nodewarden-attachments" @@ -29,3 +33,7 @@ bucket_name = "nodewarden-attachments" [[migrations]] tag = "v1-notifications-hub" new_sqlite_classes = [ "NotificationsHub" ] + +[[migrations]] +tag = "v2-backup-transfer-runner" +new_sqlite_classes = [ "BackupTransferRunner" ]