feat: enhance backup archive functionality with blob task management and concurrency handling

This commit is contained in:
shuaiplus
2026-03-16 00:05:11 +08:00
parent fc2667501c
commit 3d33f78a0c
+108 -26
View File
@@ -12,6 +12,8 @@ type SqlRow = Record<string, string | number | null>;
const BACKUP_FORMAT_VERSION = 1; const BACKUP_FORMAT_VERSION = 1;
const BACKUP_APP_VERSION = '1.3.0'; const BACKUP_APP_VERSION = '1.3.0';
const BACKUP_ZIP_COMPRESSION_LEVEL = 6; const BACKUP_ZIP_COMPRESSION_LEVEL = 6;
// Blob-read concurrency used by getBackupBlobReadConcurrency() for every storage kind other than 'kv'.
const BACKUP_R2_BLOB_READ_CONCURRENCY = 8;
// Blob-read concurrency used by getBackupBlobReadConcurrency() when the blob storage kind is 'kv'.
const BACKUP_KV_BLOB_READ_CONCURRENCY = 4;
const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024; const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024;
const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000; const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000;
const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024; const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024;
@@ -54,6 +56,19 @@ export interface BackupArchiveBundle {
manifest: BackupManifest; manifest: BackupManifest;
} }
/** Describes one blob that has to be read from blob storage and placed into the backup archive. */
interface BackupBlobTask {
  /** Which category the blob is counted under in the blob summary. */
  kind: 'attachment' | 'send-file';
  /** Key passed to the blob store to fetch the object. */
  objectKey: string;
  /** Path under which the blob's bytes are stored inside the archive. */
  archivePath: string;
  /** Message of the error thrown when the blob store has no object for `objectKey`. */
  missingMessage: string;
}
/**
 * Outcome of a successfully loaded {@link BackupBlobTask}: the task's archive path and kind,
 * plus the blob's full contents.
 */
interface BackupBlobTaskResult extends Pick<BackupBlobTask, 'archivePath' | 'kind'> {
  /** Complete blob contents read from storage. */
  bytes: Uint8Array;
}
export function parseSendFileId(data: string | null): string | null { export function parseSendFileId(data: string | null): string | null {
if (!data) return null; if (!data) return null;
try { try {
@@ -75,6 +90,79 @@ async function streamToBytes(stream: ReadableStream | null): Promise<Uint8Array>
return new Uint8Array(buffer); return new Uint8Array(buffer);
} }
/**
 * Picks how many blobs may be read in parallel while building a backup.
 *
 * @param env - Worker environment, forwarded to `getBlobStorageKind`.
 * @returns The KV limit when the storage kind is `'kv'`; the R2 limit for any other kind.
 */
function getBackupBlobReadConcurrency(env: Env): number {
  const storageKind = getBlobStorageKind(env);
  if (storageKind === 'kv') {
    return BACKUP_KV_BLOB_READ_CONCURRENCY;
  }
  return BACKUP_R2_BLOB_READ_CONCURRENCY;
}
/**
 * Maps `items` through an async `worker`, running at most `concurrency` calls at a time.
 *
 * Results are returned in the same order as `items`, regardless of completion order.
 *
 * @param items - Inputs to process; an empty list resolves to `[]` without calling `worker`.
 * @param concurrency - Upper bound on simultaneous `worker` calls. Values below 1 and `NaN`
 *   are treated as 1; fractional values are rounded down; values above `items.length`
 *   (including `Infinity`) are capped at `items.length`.
 * @param worker - Called with each item and its index.
 * @returns The worker results, index-aligned with `items`.
 * @throws Rejects with the first worker error. Once a worker has failed no further items are
 *   started; calls that are already in flight are left to settle on their own.
 */
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  concurrency: number,
  worker: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  if (!items.length) return [];

  const results = new Array<R>(items.length);
  let nextIndex = 0;
  // Set by the first failing worker so the remaining runners stop picking up new items
  // instead of continuing in the background after the returned promise has rejected.
  let failed = false;

  const runWorker = async (): Promise<void> => {
    while (!failed) {
      const currentIndex = nextIndex;
      if (currentIndex >= items.length) return;
      nextIndex += 1;
      try {
        results[currentIndex] = await worker(items[currentIndex], currentIndex);
      } catch (error) {
        failed = true;
        throw error;
      }
    }
  };

  // NaN would propagate through Math.min/Math.max and make Array.from create zero runners,
  // which would resolve with an array of holes without ever calling `worker`.
  const requested = Number.isNaN(concurrency) ? 1 : Math.floor(concurrency);
  const workerCount = Math.max(1, Math.min(requested, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => runWorker()));
  return results;
}
/**
 * Reads the blob for every task and gathers the bytes keyed by archive path, along with
 * the counters describing what was loaded.
 *
 * Reads run through `mapWithConcurrency`, bounded by `getBackupBlobReadConcurrency(env)`.
 *
 * @param env - Worker environment, forwarded to the blob store helpers.
 * @param tasks - Blobs to load.
 * @returns `files` maps each task's `archivePath` to its bytes; `attachmentFiles` counts
 *   results of kind `'attachment'` and `sendFiles` counts all others; `totalBytes` is the
 *   sum of all blob sizes and `largestObjectBytes` the size of the biggest blob
 *   (0 when there are no tasks).
 * @throws Error with the task's `missingMessage` when `getBlobObject` yields no object.
 */
async function loadBackupBlobFiles(env: Env, tasks: BackupBlobTask[]): Promise<{
  files: Record<string, Uint8Array>;
  attachmentFiles: number;
  sendFiles: number;
  totalBytes: number;
  largestObjectBytes: number;
}> {
  // Fetches a single blob and reads it fully into memory.
  const readBlob = async (task: BackupBlobTask): Promise<BackupBlobTaskResult> => {
    const blob = await getBlobObject(env, task.objectKey);
    if (!blob) {
      throw new Error(task.missingMessage);
    }
    const bytes = await streamToBytes(blob.body);
    return { archivePath: task.archivePath, bytes, kind: task.kind };
  };

  const results = await mapWithConcurrency(tasks, getBackupBlobReadConcurrency(env), readBlob);

  const files: Record<string, Uint8Array> = {};
  let attachmentFiles = 0;
  let sendFiles = 0;
  let totalBytes = 0;
  let largestObjectBytes = 0;

  for (const { archivePath, bytes, kind } of results) {
    const size = bytes.byteLength;
    files[archivePath] = bytes;
    totalBytes += size;
    if (size > largestObjectBytes) {
      largestObjectBytes = size;
    }
    if (kind !== 'attachment') {
      sendFiles += 1;
    } else {
      attachmentFiles += 1;
    }
  }

  return { files, attachmentFiles, sendFiles, totalBytes, largestObjectBytes };
}
function buildBackupFileName(date: Date = new Date()): string { function buildBackupFileName(date: Date = new Date()): string {
const parts = [ const parts = [
date.getUTCFullYear().toString().padStart(4, '0'), date.getUTCFullYear().toString().padStart(4, '0'),
@@ -268,10 +356,6 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro
queryRows(env.DB, 'SELECT id, user_id, type, name, notes, data, key, password_hash, password_salt, password_iterations, auth_type, emails, max_access_count, access_count, disabled, hide_email, created_at, updated_at, expiration_date, deletion_date FROM sends ORDER BY created_at ASC'), queryRows(env.DB, 'SELECT id, user_id, type, name, notes, data, key, password_hash, password_salt, password_iterations, auth_type, emails, max_access_count, access_count, disabled, hide_email, created_at, updated_at, expiration_date, deletion_date FROM sends ORDER BY created_at ASC'),
]); ]);
let attachmentBlobCount = 0;
let sendFileBlobCount = 0;
let totalBlobBytes = 0;
let largestObjectBytes = 0;
const manifestBase = { const manifestBase = {
formatVersion: BACKUP_FORMAT_VERSION, formatVersion: BACKUP_FORMAT_VERSION,
exportedAt: date.toISOString(), exportedAt: date.toISOString(),
@@ -311,43 +395,41 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro
}, null, 2)), }, null, 2)),
}; };
const blobTasks: BackupBlobTask[] = [];
for (const row of attachmentRows) { for (const row of attachmentRows) {
const cipherId = String(row.cipher_id || '').trim(); const cipherId = String(row.cipher_id || '').trim();
const attachmentId = String(row.id || '').trim(); const attachmentId = String(row.id || '').trim();
if (!cipherId || !attachmentId) continue; if (!cipherId || !attachmentId) continue;
const object = await getBlobObject(env, getAttachmentObjectKey(cipherId, attachmentId)); blobTasks.push({
if (!object) { archivePath: `attachments/${cipherId}/${attachmentId}.bin`,
throw new Error(`Attachment blob missing for ${cipherId}/${attachmentId}`); objectKey: getAttachmentObjectKey(cipherId, attachmentId),
} kind: 'attachment',
const bytes = await streamToBytes(object.body); missingMessage: `Attachment blob missing for ${cipherId}/${attachmentId}`,
files[`attachments/${cipherId}/${attachmentId}.bin`] = bytes; });
attachmentBlobCount += 1;
totalBlobBytes += bytes.byteLength;
largestObjectBytes = Math.max(largestObjectBytes, bytes.byteLength);
} }
for (const row of sendRows) { for (const row of sendRows) {
const sendId = String(row.id || '').trim(); const sendId = String(row.id || '').trim();
const fileId = parseSendFileId(typeof row.data === 'string' ? row.data : null); const fileId = parseSendFileId(typeof row.data === 'string' ? row.data : null);
if (!sendId || !fileId) continue; if (!sendId || !fileId) continue;
const object = await getBlobObject(env, getSendFileObjectKey(sendId, fileId)); blobTasks.push({
if (!object) { archivePath: `send-files/${sendId}/${fileId}.bin`,
throw new Error(`Send file blob missing for ${sendId}/${fileId}`); objectKey: getSendFileObjectKey(sendId, fileId),
} kind: 'send-file',
const bytes = await streamToBytes(object.body); missingMessage: `Send file blob missing for ${sendId}/${fileId}`,
files[`send-files/${sendId}/${fileId}.bin`] = bytes; });
sendFileBlobCount += 1;
totalBlobBytes += bytes.byteLength;
largestObjectBytes = Math.max(largestObjectBytes, bytes.byteLength);
} }
const blobFiles = await loadBackupBlobFiles(env, blobTasks);
Object.assign(files, blobFiles.files);
const manifest: BackupManifest = { const manifest: BackupManifest = {
...manifestBase, ...manifestBase,
blobSummary: { blobSummary: {
attachmentFiles: attachmentBlobCount, attachmentFiles: blobFiles.attachmentFiles,
sendFiles: sendFileBlobCount, sendFiles: blobFiles.sendFiles,
totalBytes: totalBlobBytes, totalBytes: blobFiles.totalBytes,
largestObjectBytes, largestObjectBytes: blobFiles.largestObjectBytes,
}, },
}; };
files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2)); files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2));