feat: enhance backup archive processing with configurable chunk sizes and compression levels

This commit is contained in:
shuaiplus
2026-03-16 00:24:14 +08:00
parent 3d33f78a0c
commit 557f4bfbbd
+43 -21
View File
@@ -11,9 +11,12 @@ type SqlRow = Record<string, string | number | null>;
const BACKUP_FORMAT_VERSION = 1; const BACKUP_FORMAT_VERSION = 1;
const BACKUP_APP_VERSION = '1.3.0'; const BACKUP_APP_VERSION = '1.3.0';
const BACKUP_ZIP_COMPRESSION_LEVEL = 6; const BACKUP_TEXT_COMPRESSION_LEVEL = 6;
const BACKUP_BINARY_COMPRESSION_LEVEL = 1;
const BACKUP_R2_BLOB_READ_CONCURRENCY = 8; const BACKUP_R2_BLOB_READ_CONCURRENCY = 8;
const BACKUP_KV_BLOB_READ_CONCURRENCY = 4; const BACKUP_KV_BLOB_READ_CONCURRENCY = 4;
const BACKUP_R2_BLOB_READ_CHUNK_SIZE = 64;
const BACKUP_KV_BLOB_READ_CHUNK_SIZE = 32;
const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024; const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024;
const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000; const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000;
const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024; const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024;
@@ -94,6 +97,10 @@ function getBackupBlobReadConcurrency(env: Env): number {
return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CONCURRENCY : BACKUP_R2_BLOB_READ_CONCURRENCY; return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CONCURRENCY : BACKUP_R2_BLOB_READ_CONCURRENCY;
} }
function getBackupBlobReadChunkSize(env: Env): number {
return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CHUNK_SIZE : BACKUP_R2_BLOB_READ_CHUNK_SIZE;
}
async function mapWithConcurrency<T, R>( async function mapWithConcurrency<T, R>(
items: readonly T[], items: readonly T[],
concurrency: number, concurrency: number,
@@ -131,26 +138,32 @@ async function loadBackupBlobFiles(env: Env, tasks: BackupBlobTask[]): Promise<{
let totalBytes = 0; let totalBytes = 0;
let largestObjectBytes = 0; let largestObjectBytes = 0;
const loaded = await mapWithConcurrency(tasks, getBackupBlobReadConcurrency(env), async (task) => { const concurrency = getBackupBlobReadConcurrency(env);
const object = await getBlobObject(env, task.objectKey); const chunkSize = getBackupBlobReadChunkSize(env);
if (!object) {
throw new Error(task.missingMessage);
}
return {
archivePath: task.archivePath,
bytes: await streamToBytes(object.body),
kind: task.kind,
} satisfies BackupBlobTaskResult;
});
for (const item of loaded) { for (let offset = 0; offset < tasks.length; offset += chunkSize) {
files[item.archivePath] = item.bytes; const chunk = tasks.slice(offset, offset + chunkSize);
totalBytes += item.bytes.byteLength; const loaded = await mapWithConcurrency(chunk, concurrency, async (task) => {
largestObjectBytes = Math.max(largestObjectBytes, item.bytes.byteLength); const object = await getBlobObject(env, task.objectKey);
if (item.kind === 'attachment') { if (!object) {
attachmentFiles += 1; throw new Error(task.missingMessage);
} else { }
sendFiles += 1; return {
archivePath: task.archivePath,
bytes: await streamToBytes(object.body),
kind: task.kind,
} satisfies BackupBlobTaskResult;
});
for (const item of loaded) {
files[item.archivePath] = item.bytes;
totalBytes += item.bytes.byteLength;
largestObjectBytes = Math.max(largestObjectBytes, item.bytes.byteLength);
if (item.kind === 'attachment') {
attachmentFiles += 1;
} else {
sendFiles += 1;
}
} }
} }
@@ -205,6 +218,15 @@ function ensureRowArray(value: unknown, table: string): SqlRow[] {
return value as SqlRow[]; return value as SqlRow[];
} }
function createZipEntries(files: Record<string, Uint8Array>): Record<string, Uint8Array | [Uint8Array, { level: 0 | 1 | 6 }]> {
const entries: Record<string, Uint8Array | [Uint8Array, { level: 0 | 1 | 6 }]> = {};
for (const [path, bytes] of Object.entries(files)) {
const isBinaryBlob = path.endsWith('.bin');
entries[path] = [bytes, { level: isBinaryBlob ? BACKUP_BINARY_COMPRESSION_LEVEL : BACKUP_TEXT_COMPRESSION_LEVEL }];
}
return entries;
}
export function parseBackupArchive(bytes: Uint8Array): { payload: BackupPayload; files: Record<string, Uint8Array> } { export function parseBackupArchive(bytes: Uint8Array): { payload: BackupPayload; files: Record<string, Uint8Array> } {
validateArchiveSize(bytes); validateArchiveSize(bytes);
let zipped: Record<string, Uint8Array>; let zipped: Record<string, Uint8Array>;
@@ -435,7 +457,7 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro
files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2)); files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2));
return { return {
bytes: zipSync(files, { level: BACKUP_ZIP_COMPRESSION_LEVEL }), bytes: zipSync(createZipEntries(files)),
fileName: buildBackupFileName(date), fileName: buildBackupFileName(date),
manifest, manifest,
}; };