mirror of
https://github.com/shuaiplus/nodewarden.git
synced 2026-06-20 21:00:41 +00:00
feat: enhance backup archive processing with configurable chunk sizes and compression levels
This commit is contained in:
@@ -11,9 +11,12 @@ type SqlRow = Record<string, string | number | null>;
|
|||||||
|
|
||||||
const BACKUP_FORMAT_VERSION = 1;
|
const BACKUP_FORMAT_VERSION = 1;
|
||||||
const BACKUP_APP_VERSION = '1.3.0';
|
const BACKUP_APP_VERSION = '1.3.0';
|
||||||
const BACKUP_ZIP_COMPRESSION_LEVEL = 6;
|
const BACKUP_TEXT_COMPRESSION_LEVEL = 6;
|
||||||
|
const BACKUP_BINARY_COMPRESSION_LEVEL = 1;
|
||||||
const BACKUP_R2_BLOB_READ_CONCURRENCY = 8;
|
const BACKUP_R2_BLOB_READ_CONCURRENCY = 8;
|
||||||
const BACKUP_KV_BLOB_READ_CONCURRENCY = 4;
|
const BACKUP_KV_BLOB_READ_CONCURRENCY = 4;
|
||||||
|
const BACKUP_R2_BLOB_READ_CHUNK_SIZE = 64;
|
||||||
|
const BACKUP_KV_BLOB_READ_CHUNK_SIZE = 32;
|
||||||
const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024;
|
const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024;
|
||||||
const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000;
|
const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000;
|
||||||
const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024;
|
const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024;
|
||||||
@@ -94,6 +97,10 @@ function getBackupBlobReadConcurrency(env: Env): number {
|
|||||||
return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CONCURRENCY : BACKUP_R2_BLOB_READ_CONCURRENCY;
|
return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CONCURRENCY : BACKUP_R2_BLOB_READ_CONCURRENCY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function getBackupBlobReadChunkSize(env: Env): number {
|
||||||
|
return getBlobStorageKind(env) === 'kv' ? BACKUP_KV_BLOB_READ_CHUNK_SIZE : BACKUP_R2_BLOB_READ_CHUNK_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
async function mapWithConcurrency<T, R>(
|
async function mapWithConcurrency<T, R>(
|
||||||
items: readonly T[],
|
items: readonly T[],
|
||||||
concurrency: number,
|
concurrency: number,
|
||||||
@@ -131,26 +138,32 @@ async function loadBackupBlobFiles(env: Env, tasks: BackupBlobTask[]): Promise<{
|
|||||||
let totalBytes = 0;
|
let totalBytes = 0;
|
||||||
let largestObjectBytes = 0;
|
let largestObjectBytes = 0;
|
||||||
|
|
||||||
const loaded = await mapWithConcurrency(tasks, getBackupBlobReadConcurrency(env), async (task) => {
|
const concurrency = getBackupBlobReadConcurrency(env);
|
||||||
const object = await getBlobObject(env, task.objectKey);
|
const chunkSize = getBackupBlobReadChunkSize(env);
|
||||||
if (!object) {
|
|
||||||
throw new Error(task.missingMessage);
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
archivePath: task.archivePath,
|
|
||||||
bytes: await streamToBytes(object.body),
|
|
||||||
kind: task.kind,
|
|
||||||
} satisfies BackupBlobTaskResult;
|
|
||||||
});
|
|
||||||
|
|
||||||
for (const item of loaded) {
|
for (let offset = 0; offset < tasks.length; offset += chunkSize) {
|
||||||
files[item.archivePath] = item.bytes;
|
const chunk = tasks.slice(offset, offset + chunkSize);
|
||||||
totalBytes += item.bytes.byteLength;
|
const loaded = await mapWithConcurrency(chunk, concurrency, async (task) => {
|
||||||
largestObjectBytes = Math.max(largestObjectBytes, item.bytes.byteLength);
|
const object = await getBlobObject(env, task.objectKey);
|
||||||
if (item.kind === 'attachment') {
|
if (!object) {
|
||||||
attachmentFiles += 1;
|
throw new Error(task.missingMessage);
|
||||||
} else {
|
}
|
||||||
sendFiles += 1;
|
return {
|
||||||
|
archivePath: task.archivePath,
|
||||||
|
bytes: await streamToBytes(object.body),
|
||||||
|
kind: task.kind,
|
||||||
|
} satisfies BackupBlobTaskResult;
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const item of loaded) {
|
||||||
|
files[item.archivePath] = item.bytes;
|
||||||
|
totalBytes += item.bytes.byteLength;
|
||||||
|
largestObjectBytes = Math.max(largestObjectBytes, item.bytes.byteLength);
|
||||||
|
if (item.kind === 'attachment') {
|
||||||
|
attachmentFiles += 1;
|
||||||
|
} else {
|
||||||
|
sendFiles += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,6 +218,15 @@ function ensureRowArray(value: unknown, table: string): SqlRow[] {
|
|||||||
return value as SqlRow[];
|
return value as SqlRow[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function createZipEntries(files: Record<string, Uint8Array>): Record<string, Uint8Array | [Uint8Array, { level: 0 | 1 | 6 }]> {
|
||||||
|
const entries: Record<string, Uint8Array | [Uint8Array, { level: 0 | 1 | 6 }]> = {};
|
||||||
|
for (const [path, bytes] of Object.entries(files)) {
|
||||||
|
const isBinaryBlob = path.endsWith('.bin');
|
||||||
|
entries[path] = [bytes, { level: isBinaryBlob ? BACKUP_BINARY_COMPRESSION_LEVEL : BACKUP_TEXT_COMPRESSION_LEVEL }];
|
||||||
|
}
|
||||||
|
return entries;
|
||||||
|
}
|
||||||
|
|
||||||
export function parseBackupArchive(bytes: Uint8Array): { payload: BackupPayload; files: Record<string, Uint8Array> } {
|
export function parseBackupArchive(bytes: Uint8Array): { payload: BackupPayload; files: Record<string, Uint8Array> } {
|
||||||
validateArchiveSize(bytes);
|
validateArchiveSize(bytes);
|
||||||
let zipped: Record<string, Uint8Array>;
|
let zipped: Record<string, Uint8Array>;
|
||||||
@@ -435,7 +457,7 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro
|
|||||||
files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2));
|
files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2));
|
||||||
|
|
||||||
return {
|
return {
|
||||||
bytes: zipSync(files, { level: BACKUP_ZIP_COMPRESSION_LEVEL }),
|
bytes: zipSync(createZipEntries(files)),
|
||||||
fileName: buildBackupFileName(date),
|
fileName: buildBackupFileName(date),
|
||||||
manifest,
|
manifest,
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user