diff --git a/src/services/backup-archive.ts b/src/services/backup-archive.ts index e93c876..9bd8f06 100644 --- a/src/services/backup-archive.ts +++ b/src/services/backup-archive.ts @@ -11,9 +11,12 @@ type SqlRow = Record; const BACKUP_FORMAT_VERSION = 1; const BACKUP_APP_VERSION = '1.3.0'; -const BACKUP_ZIP_COMPRESSION_LEVEL = 6; +const BACKUP_TEXT_COMPRESSION_LEVEL = 6; +const BACKUP_BINARY_COMPRESSION_LEVEL = 1; const BACKUP_R2_BLOB_READ_CONCURRENCY = 8; const BACKUP_KV_BLOB_READ_CONCURRENCY = 4; +const BACKUP_R2_BLOB_READ_CHUNK_SIZE = 64; +const BACKUP_KV_BLOB_READ_CHUNK_SIZE = 32; const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024; const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000; const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024; @@ -94,6 +97,10 @@ function getBackupBlobReadConcurrency(env: Env): number { return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CONCURRENCY : BACKUP_R2_BLOB_READ_CONCURRENCY; } +function getBackupBlobReadChunkSize(env: Env): number { + return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CHUNK_SIZE : BACKUP_R2_BLOB_READ_CHUNK_SIZE; +} + async function mapWithConcurrency( items: readonly T[], concurrency: number, @@ -131,26 +138,32 @@ async function loadBackupBlobFiles(env: Env, tasks: BackupBlobTask[]): Promise<{ let totalBytes = 0; let largestObjectBytes = 0; - const loaded = await mapWithConcurrency(tasks, getBackupBlobReadConcurrency(env), async (task) => { - const object = await getBlobObject(env, task.objectKey); - if (!object) { - throw new Error(task.missingMessage); - } - return { - archivePath: task.archivePath, - bytes: await streamToBytes(object.body), - kind: task.kind, - } satisfies BackupBlobTaskResult; - }); + const concurrency = getBackupBlobReadConcurrency(env); + const chunkSize = getBackupBlobReadChunkSize(env); - for (const item of loaded) { - files[item.archivePath] = item.bytes; - totalBytes += item.bytes.byteLength; - largestObjectBytes = Math.max(largestObjectBytes, item.bytes.byteLength); - if (item.kind === 'attachment') { - attachmentFiles += 1; - } else { - sendFiles += 1; + for (let offset = 0; offset < tasks.length; offset += chunkSize) { + const chunk = tasks.slice(offset, offset + chunkSize); + const loaded = await mapWithConcurrency(chunk, concurrency, async (task) => { + const object = await getBlobObject(env, task.objectKey); + if (!object) { + throw new Error(task.missingMessage); + } + return { + archivePath: task.archivePath, + bytes: await streamToBytes(object.body), + kind: task.kind, + } satisfies BackupBlobTaskResult; + }); + + for (const item of loaded) { + files[item.archivePath] = item.bytes; + totalBytes += item.bytes.byteLength; + largestObjectBytes = Math.max(largestObjectBytes, item.bytes.byteLength); + if (item.kind === 'attachment') { + attachmentFiles += 1; + } else { + sendFiles += 1; + } } } @@ -205,6 +218,15 @@ function ensureRowArray(value: unknown, table: string): SqlRow[] { return value as SqlRow[]; } +function createZipEntries(files: Record): Record { + const entries: Record = {}; + for (const [path, bytes] of Object.entries(files)) { + const isBinaryBlob = path.endsWith('.bin'); + entries[path] = [bytes, { level: isBinaryBlob ? BACKUP_BINARY_COMPRESSION_LEVEL : BACKUP_TEXT_COMPRESSION_LEVEL }]; + } + return entries; +} + export function parseBackupArchive(bytes: Uint8Array): { payload: BackupPayload; files: Record } { validateArchiveSize(bytes); let zipped: Record; @@ -435,7 +457,7 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2)); return { - bytes: zipSync(files, { level: BACKUP_ZIP_COMPRESSION_LEVEL }), + bytes: zipSync(createZipEntries(files)), fileName: buildBackupFileName(date), manifest, };