feat: implement BackupTransferRunner for managing backup processes and enhance backup handling

This commit is contained in:
shuaiplus
2026-06-07 20:43:43 +08:00
parent bfea5d0a1c
commit af70cab766
6 changed files with 466 additions and 169 deletions
+154 -169
View File
@@ -1,6 +1,5 @@
import type { Env, User } from '../types';
import { errorResponse, jsonResponse } from '../utils/response';
import { generateUUID } from '../utils/uuid';
import {
type BackupArchiveBundle,
buildBackupArchive,
@@ -10,12 +9,11 @@ import {
import {
type BackupDestinationRecord,
type BackupSettingsInput,
BACKUP_SCHEDULER_WINDOW_MINUTES,
type BackupSettings,
type WebDavBackupDestination,
getBackupLocalDateKey,
getDefaultBackupSettings,
getBackupSettingsRepairState,
hasBackupSlotBetween,
isBackupDueNow,
loadBackupSettings,
normalizeBackupSettingsInput,
normalizeImportedBackupSettings,
@@ -86,102 +84,6 @@ function getBackupDestinationSummary(destination: BackupDestinationRecord | null
};
}
const BACKUP_RUNNER_LOCK_KEY = 'backup.runner.lock.v1';
const BACKUP_RUNNER_LEASE_MS = 10 * 60 * 1000;
const BACKUP_RUNNER_HEARTBEAT_MS = 30 * 1000;
// CONTRACT:
// The runner lock is a config-row lease, not a queue. It only prevents two
// backup/restore jobs from overlapping. Manual runs return conflict when the
// lease is held; scheduled runs skip quietly. Never export this row in backups.
interface BackupRunnerLease {
token: string;
touch: () => Promise<void>;
release: () => Promise<void>;
}
async function acquireBackupRunnerLease(env: Env, reason: string): Promise<BackupRunnerLease | null> {
const token = generateUUID();
const nowMs = Date.now();
const expiresAtMs = nowMs + BACKUP_RUNNER_LEASE_MS;
const value = JSON.stringify({
token,
reason,
acquiredAt: new Date(nowMs).toISOString(),
touchedAt: new Date(nowMs).toISOString(),
expiresAtMs,
});
const result = await env.DB
.prepare(
`INSERT INTO config(key, value) VALUES(?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value
WHERE COALESCE(CAST(json_extract(config.value, '$.expiresAtMs') AS INTEGER), 0) <= ?`
)
.bind(BACKUP_RUNNER_LOCK_KEY, value, nowMs)
.run();
if ((result.meta?.changes || 0) < 1) {
return null;
}
return {
token,
touch: async () => {
const nextNowMs = Date.now();
const nextValue = JSON.stringify({
token,
reason,
acquiredAt: new Date(nowMs).toISOString(),
touchedAt: new Date(nextNowMs).toISOString(),
expiresAtMs: nextNowMs + BACKUP_RUNNER_LEASE_MS,
});
await env.DB
.prepare(
`UPDATE config
SET value = ?
WHERE key = ?
AND json_extract(value, '$.token') = ?`
)
.bind(nextValue, BACKUP_RUNNER_LOCK_KEY, token)
.run();
},
release: async () => {
await env.DB
.prepare(
`DELETE FROM config
WHERE key = ?
AND json_extract(value, '$.token') = ?`
)
.bind(BACKUP_RUNNER_LOCK_KEY, token)
.run();
},
};
}
async function withBackupRunnerLease<T>(
env: Env,
reason: string,
task: (keepAlive: () => Promise<void>) => Promise<T>
): Promise<T | null> {
const lease = await acquireBackupRunnerLease(env, reason);
if (!lease) return null;
let lastHeartbeatAt = 0;
const keepAlive = async () => {
const nowMs = Date.now();
if (nowMs - lastHeartbeatAt < BACKUP_RUNNER_HEARTBEAT_MS) return;
lastHeartbeatAt = nowMs;
await lease.touch();
};
try {
await keepAlive();
return await task(keepAlive);
} finally {
await lease.release();
}
}
function ensureBackupBlobName(value: string): string {
const normalized = String(value || '').trim().replace(/\\/g, '/').replace(/^\/+|\/+$/g, '');
if (!normalized) {
@@ -201,6 +103,36 @@ interface RemoteAttachmentIndexPayload {
blobs: Record<string, { sizeBytes: number; updatedAt: string }>;
}
const REMOTE_ATTACHMENT_SYNC_EXTERNAL_SUBREQUEST_LIMIT = 50;
const REMOTE_ATTACHMENT_SYNC_SUBREQUEST_RESERVE = 6;
const REMOTE_ATTACHMENT_SYNC_MAX_WEB_DAV_BATCH_SIZE = 18;
const REMOTE_ATTACHMENT_SYNC_MAX_S3_BATCH_SIZE = 40;
function countRemotePathSegments(value: string): number {
return String(value || '').replace(/\\/g, '/').split('/').filter(Boolean).length;
}
function getRemoteAttachmentSyncBatchSize(destination: BackupDestinationRecord): number {
if (destination.type === 's3') {
return REMOTE_ATTACHMENT_SYNC_MAX_S3_BATCH_SIZE;
}
const remotePath = String((destination.destination as WebDavBackupDestination).remotePath || '');
const fixedWebDavDirectoryCalls = countRemotePathSegments(remotePath) + 1; // remotePath plus the shared "attachments" dir.
const available = REMOTE_ATTACHMENT_SYNC_EXTERNAL_SUBREQUEST_LIMIT
- REMOTE_ATTACHMENT_SYNC_SUBREQUEST_RESERVE
- fixedWebDavDirectoryCalls;
if (available < 2) {
throw new Error('WebDAV remote backup path is too deep for safe attachment batching');
}
return Math.max(1, Math.min(
REMOTE_ATTACHMENT_SYNC_MAX_WEB_DAV_BATCH_SIZE,
Math.floor(available / 2)
));
}
async function loadRemoteAttachmentIndex(session: RemoteBackupTransferSession): Promise<Map<string, number>> {
try {
const file = await session.download(REMOTE_ATTACHMENT_INDEX_PATH);
@@ -256,7 +188,39 @@ async function saveRemoteAttachmentIndex(
});
}
async function executeConfiguredBackup(
async function uploadRemoteAttachmentChunk(
env: Env,
destination: BackupDestinationRecord,
attachments: Array<{ blobName: string }>
): Promise<void> {
if (!attachments.length) return;
const id = env.BACKUP_TRANSFER_RUNNER.idFromName('remote-attachment-sync');
const stub = env.BACKUP_TRANSFER_RUNNER.get(id);
const response = await stub.fetch('https://backup-transfer/internal/upload-attachment-chunk', {
method: 'POST',
headers: {
'Content-Type': 'application/json; charset=utf-8',
},
body: JSON.stringify({
destination,
attachments,
}),
});
if (!response.ok) {
let message = `Attachment sync failed: ${response.status}`;
try {
const payload = await response.json<{ error?: string }>();
if (payload?.error) {
message = payload.error;
}
} catch {
// Ignore JSON parse failures and preserve the status-based error.
}
throw new Error(message);
}
}
export async function executeConfiguredBackup(
env: Env,
storage: StorageService,
actorUserId: string | null,
@@ -331,25 +295,20 @@ async function executeConfiguredBackup(
if (destination.includeAttachments) {
await touchLease();
const remoteAttachmentIndex = await loadRemoteAttachmentIndex(remoteSession);
let attachmentIndexChanged = false;
for (const attachment of archive.manifest.attachmentBlobs || []) {
const pendingAttachments = (archive.manifest.attachmentBlobs || [])
.filter((attachment) => remoteAttachmentIndex.get(attachment.blobName) !== attachment.sizeBytes);
const attachmentSyncBatchSize = getRemoteAttachmentSyncBatchSize(destination);
for (let i = 0; i < pendingAttachments.length; i += attachmentSyncBatchSize) {
await touchLease();
if (remoteAttachmentIndex.get(attachment.blobName) === attachment.sizeBytes) {
continue;
}
const remotePath = `attachments/${attachment.blobName}`;
const object = await getBlobObject(env, attachment.blobName);
if (!object) {
throw new Error(`Attachment blob missing for ${attachment.blobName}`);
}
const bytes = new Uint8Array(await new Response(object.body).arrayBuffer());
await remoteSession.putFile(remotePath, bytes, {
contentType: object.contentType,
});
remoteAttachmentIndex.set(attachment.blobName, attachment.sizeBytes);
attachmentIndexChanged = true;
const chunk = pendingAttachments
.slice(i, i + attachmentSyncBatchSize)
.map((attachment) => ({ blobName: attachment.blobName }));
await uploadRemoteAttachmentChunk(env, destination, chunk);
}
if (attachmentIndexChanged) {
if (pendingAttachments.length) {
for (const attachment of pendingAttachments) {
remoteAttachmentIndex.set(attachment.blobName, attachment.sizeBytes);
}
await touchLease();
await saveRemoteAttachmentIndex(remoteSession, remoteAttachmentIndex);
}
@@ -474,6 +433,76 @@ async function executeConfiguredBackup(
}
}
interface DurableBackupRunResponse {
result: {
fileName: string;
fileSize: number;
remotePath: string;
provider: string;
};
settings: BackupSettings;
}
async function runConfiguredBackupInDurableObject(
env: Env,
payload: {
actorUserId: string | null;
auditMetadata?: Record<string, unknown> | null;
destinationId?: string | null;
targetDeviceIdentifier?: string | null;
trigger: 'manual' | 'scheduled';
}
): Promise<DurableBackupRunResponse | null> {
const id = env.BACKUP_TRANSFER_RUNNER.idFromName('configured-backup-runner');
const stub = env.BACKUP_TRANSFER_RUNNER.get(id);
const response = await stub.fetch('https://backup-transfer/internal/run-configured-backup', {
method: 'POST',
headers: {
'Content-Type': 'application/json; charset=utf-8',
},
body: JSON.stringify(payload),
});
if (response.status === 409) {
return null;
}
if (!response.ok) {
let message = `Backup run failed: ${response.status}`;
try {
const body = await response.json<{ error?: string }>();
if (body?.error) message = body.error;
} catch {
// Preserve the status-based message when the DO returns a non-JSON error.
}
throw new Error(message);
}
const body = await response.json<DurableBackupRunResponse>();
if (!body?.result || !body?.settings) {
throw new Error('Backup run response is invalid');
}
return body;
}
async function runScheduledBackupsInDurableObject(env: Env): Promise<void> {
const id = env.BACKUP_TRANSFER_RUNNER.idFromName('configured-backup-runner');
const stub = env.BACKUP_TRANSFER_RUNNER.get(id);
const response = await stub.fetch('https://backup-transfer/internal/run-scheduled-backups', {
method: 'POST',
});
if (response.status === 409) {
return;
}
if (!response.ok) {
let message = `Scheduled backup failed: ${response.status}`;
try {
const body = await response.json<{ error?: string }>();
if (body?.error) message = body.error;
} catch {
// Preserve the status-based message when the DO returns a non-JSON error.
}
throw new Error(message);
}
}
function toImportStatusCode(message: string): number {
const lower = message.toLowerCase();
if (lower.includes('invalid backup') || lower.includes('invalid json')) return 400;
@@ -526,30 +555,7 @@ async function runImportAndAudit(
}
export async function runScheduledBackupIfDue(env: Env): Promise<void> {
await withBackupRunnerLease(env, 'scheduled', async (keepAlive) => {
const storage = new StorageService(env.DB);
let scanStartMs = Date.now();
while (true) {
await keepAlive();
const settings = await loadBackupSettings(storage, env, 'UTC');
const now = new Date();
const dueDestinations = settings.destinations.filter((destination) =>
isBackupDueNow(destination, now, BACKUP_SCHEDULER_WINDOW_MINUTES)
|| hasBackupSlotBetween(destination, new Date(scanStartMs), now)
);
if (!dueDestinations.length) {
return;
}
scanStartMs = now.getTime();
for (const destination of dueDestinations) {
await keepAlive();
await executeConfiguredBackup(env, storage, null, 'scheduled', destination.id, keepAlive);
}
}
});
await runScheduledBackupsInDurableObject(env);
}
export async function handleGetAdminBackupSettings(request: Request, env: Env, actorUser: User): Promise<Response> {
@@ -661,33 +667,12 @@ export async function handleRunAdminConfiguredBackup(request: Request, env: Env,
return errorResponse('Backup run payload is invalid', 400);
}
const targetDeviceIdentifier = String(request.headers.get('X-NodeWarden-Acting-Device-Id') || '').trim() || null;
const progress = async (event: {
operation: 'backup-remote-run';
step: string;
fileName: string;
stageTitle: string;
stageDetail: string;
done?: boolean;
ok?: boolean;
error?: string | null;
}) => {
await notifyUserBackupProgress(env, actorUser.id, event, targetDeviceIdentifier);
};
const outcome = await withBackupRunnerLease(env, `manual:${actorUser.id}`, async (keepAlive) => {
const storage = new StorageService(env.DB);
const result = await executeConfiguredBackup(
env,
storage,
actorUser.id,
'manual',
body?.destinationId || null,
keepAlive,
progress,
auditRequestMetadata(request)
);
const settings = await loadBackupSettings(storage, env, 'UTC');
return { result, settings };
const outcome = await runConfiguredBackupInDurableObject(env, {
actorUserId: actorUser.id,
auditMetadata: auditRequestMetadata(request),
destinationId: body?.destinationId || null,
targetDeviceIdentifier: String(request.headers.get('X-NodeWarden-Acting-Device-Id') || '').trim() || null,
trigger: 'manual',
});
if (!outcome) {
return errorResponse('Another backup run is already in progress', 409);