diff --git a/src/services/backup-archive.ts b/src/services/backup-archive.ts index f84dff2..e93c876 100644 --- a/src/services/backup-archive.ts +++ b/src/services/backup-archive.ts @@ -12,6 +12,8 @@ type SqlRow = Record; const BACKUP_FORMAT_VERSION = 1; const BACKUP_APP_VERSION = '1.3.0'; const BACKUP_ZIP_COMPRESSION_LEVEL = 6; +const BACKUP_R2_BLOB_READ_CONCURRENCY = 8; +const BACKUP_KV_BLOB_READ_CONCURRENCY = 4; const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024; const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000; const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024; @@ -54,6 +56,19 @@ export interface BackupArchiveBundle { manifest: BackupManifest; } +interface BackupBlobTask { + archivePath: string; + objectKey: string; + kind: 'attachment' | 'send-file'; + missingMessage: string; +} + +interface BackupBlobTaskResult { + archivePath: string; + bytes: Uint8Array; + kind: BackupBlobTask['kind']; +} + export function parseSendFileId(data: string | null): string | null { if (!data) return null; try { @@ -75,6 +90,79 @@ async function streamToBytes(stream: ReadableStream | null): Promise return new Uint8Array(buffer); } +function getBackupBlobReadConcurrency(env: Env): number { + return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CONCURRENCY : BACKUP_R2_BLOB_READ_CONCURRENCY; +} + +async function mapWithConcurrency( + items: readonly T[], + concurrency: number, + worker: (item: T, index: number) => Promise +): Promise { + if (!items.length) return []; + + const results = new Array(items.length); + let nextIndex = 0; + + const runWorker = async (): Promise => { + while (true) { + const currentIndex = nextIndex; + nextIndex += 1; + if (currentIndex >= items.length) return; + results[currentIndex] = await worker(items[currentIndex], currentIndex); + } + }; + + const workerCount = Math.max(1, Math.min(concurrency, items.length)); + await Promise.all(Array.from({ length: workerCount }, () => runWorker())); + return results; +} + +async function loadBackupBlobFiles(env: Env, tasks: BackupBlobTask[]): Promise<{ + files: Record; + attachmentFiles: number; + sendFiles: number; + totalBytes: number; + largestObjectBytes: number; +}> { + const files: Record = {}; + let attachmentFiles = 0; + let sendFiles = 0; + let totalBytes = 0; + let largestObjectBytes = 0; + + const loaded = await mapWithConcurrency(tasks, getBackupBlobReadConcurrency(env), async (task) => { + const object = await getBlobObject(env, task.objectKey); + if (!object) { + throw new Error(task.missingMessage); + } + return { + archivePath: task.archivePath, + bytes: await streamToBytes(object.body), + kind: task.kind, + } satisfies BackupBlobTaskResult; + }); + + for (const item of loaded) { + files[item.archivePath] = item.bytes; + totalBytes += item.bytes.byteLength; + largestObjectBytes = Math.max(largestObjectBytes, item.bytes.byteLength); + if (item.kind === 'attachment') { + attachmentFiles += 1; + } else { + sendFiles += 1; + } + } + + return { + files, + attachmentFiles, + sendFiles, + totalBytes, + largestObjectBytes, + }; +} + function buildBackupFileName(date: Date = new Date()): string { const parts = [ date.getUTCFullYear().toString().padStart(4, '0'), @@ -268,10 +356,6 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro queryRows(env.DB, 'SELECT id, user_id, type, name, notes, data, key, password_hash, password_salt, password_iterations, auth_type, emails, max_access_count, access_count, disabled, hide_email, created_at, updated_at, expiration_date, deletion_date FROM sends ORDER BY created_at ASC'), ]); - let attachmentBlobCount = 0; - let sendFileBlobCount = 0; - let totalBlobBytes = 0; - let largestObjectBytes = 0; const manifestBase = { formatVersion: BACKUP_FORMAT_VERSION, exportedAt: date.toISOString(), @@ -311,43 +395,41 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro }, null, 2)), }; + const blobTasks: BackupBlobTask[] = []; for (const row of attachmentRows) { const cipherId = String(row.cipher_id || '').trim(); const attachmentId = String(row.id || '').trim(); if (!cipherId || !attachmentId) continue; - const object = await getBlobObject(env, getAttachmentObjectKey(cipherId, attachmentId)); - if (!object) { - throw new Error(`Attachment blob missing for ${cipherId}/${attachmentId}`); - } - const bytes = await streamToBytes(object.body); - files[`attachments/${cipherId}/${attachmentId}.bin`] = bytes; - attachmentBlobCount += 1; - totalBlobBytes += bytes.byteLength; - largestObjectBytes = Math.max(largestObjectBytes, bytes.byteLength); + blobTasks.push({ + archivePath: `attachments/${cipherId}/${attachmentId}.bin`, + objectKey: getAttachmentObjectKey(cipherId, attachmentId), + kind: 'attachment', + missingMessage: `Attachment blob missing for ${cipherId}/${attachmentId}`, + }); } for (const row of sendRows) { const sendId = String(row.id || '').trim(); const fileId = parseSendFileId(typeof row.data === 'string' ? row.data : null); if (!sendId || !fileId) continue; - const object = await getBlobObject(env, getSendFileObjectKey(sendId, fileId)); - if (!object) { - throw new Error(`Send file blob missing for ${sendId}/${fileId}`); - } - const bytes = await streamToBytes(object.body); - files[`send-files/${sendId}/${fileId}.bin`] = bytes; - sendFileBlobCount += 1; - totalBlobBytes += bytes.byteLength; - largestObjectBytes = Math.max(largestObjectBytes, bytes.byteLength); + blobTasks.push({ + archivePath: `send-files/${sendId}/${fileId}.bin`, + objectKey: getSendFileObjectKey(sendId, fileId), + kind: 'send-file', + missingMessage: `Send file blob missing for ${sendId}/${fileId}`, + }); } + const blobFiles = await loadBackupBlobFiles(env, blobTasks); + Object.assign(files, blobFiles.files); + const manifest: BackupManifest = { ...manifestBase, blobSummary: { - attachmentFiles: attachmentBlobCount, - sendFiles: sendFileBlobCount, - totalBytes: totalBlobBytes, - largestObjectBytes, + attachmentFiles: blobFiles.attachmentFiles, + sendFiles: blobFiles.sendFiles, + totalBytes: blobFiles.totalBytes, + largestObjectBytes: blobFiles.largestObjectBytes, }, }; files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2));