mirror of
https://github.com/shuaiplus/nodewarden.git
synced 2026-06-20 13:00:39 +00:00
feat: enhance backup archive functionality with blob task management and concurrency handling
This commit is contained in:
+108
-26
@@ -12,6 +12,8 @@ type SqlRow = Record<string, string | number | null>;
|
|||||||
const BACKUP_FORMAT_VERSION = 1;
|
const BACKUP_FORMAT_VERSION = 1;
|
||||||
const BACKUP_APP_VERSION = '1.3.0';
|
const BACKUP_APP_VERSION = '1.3.0';
|
||||||
const BACKUP_ZIP_COMPRESSION_LEVEL = 6;
|
const BACKUP_ZIP_COMPRESSION_LEVEL = 6;
|
||||||
|
const BACKUP_R2_BLOB_READ_CONCURRENCY = 8;
|
||||||
|
const BACKUP_KV_BLOB_READ_CONCURRENCY = 4;
|
||||||
const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024;
|
const MAX_BACKUP_ARCHIVE_BYTES = 64 * 1024 * 1024;
|
||||||
const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000;
|
const MAX_BACKUP_ARCHIVE_ENTRY_COUNT = 10_000;
|
||||||
const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024;
|
const MAX_BACKUP_EXTRACTED_BYTES = 128 * 1024 * 1024;
|
||||||
@@ -54,6 +56,19 @@ export interface BackupArchiveBundle {
|
|||||||
manifest: BackupManifest;
|
manifest: BackupManifest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * One blob that has to be read from blob storage and placed in the backup archive.
 */
interface BackupBlobTask {
  /** Blob category; decides which summary counter the loaded blob is added to. */
  kind: 'attachment' | 'send-file';
  /** Key passed to the blob storage lookup. */
  objectKey: string;
  /** Key under which the loaded bytes are stored in the archive file map. */
  archivePath: string;
  /** Message of the error thrown when the blob lookup returns nothing. */
  missingMessage: string;
}
|
||||||
|
|
||||||
|
/**
 * Outcome of loading a single backup blob task.
 */
interface BackupBlobTaskResult {
  /** Blob category carried over from the originating task. */
  kind: BackupBlobTask['kind'];
  /** Archive file map key carried over from the originating task. */
  archivePath: string;
  /** Blob contents read fully into memory. */
  bytes: Uint8Array;
}
|
||||||
|
|
||||||
export function parseSendFileId(data: string | null): string | null {
|
export function parseSendFileId(data: string | null): string | null {
|
||||||
if (!data) return null;
|
if (!data) return null;
|
||||||
try {
|
try {
|
||||||
@@ -75,6 +90,79 @@ async function streamToBytes(stream: ReadableStream | null): Promise<Uint8Array>
|
|||||||
return new Uint8Array(buffer);
|
return new Uint8Array(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Chooses how many blob reads may be in flight at once while building a backup.
 *
 * @param env Worker environment used to determine the blob storage kind.
 * @returns The KV read limit when the storage kind is `'kv'`; the R2 read limit
 *   for every other storage kind.
 */
function getBackupBlobReadConcurrency(env: Env): number {
  if (getBlobStorageKind(env) === 'kv') {
    return BACKUP_KV_BLOB_READ_CONCURRENCY;
  }
  return BACKUP_R2_BLOB_READ_CONCURRENCY;
}
|
||||||
|
|
||||||
|
/**
 * Maps `items` through an async `worker` while keeping at most `concurrency`
 * calls in flight, and returns the results in input order.
 *
 * A fixed pool of runner loops pulls the next unclaimed index from a shared
 * cursor, so a slow item never blocks the items behind it.
 *
 * @param items Inputs to process; an empty list resolves to `[]` without calling `worker`.
 * @param concurrency Requested number of parallel calls. It is floored and clamped
 *   to the range `1..items.length`; `NaN` is treated as `1`.
 * @param worker Async mapper invoked with each item and its index.
 * @returns Results positioned at the same index as their input item.
 * @throws Rejects with the first error raised by `worker`. Once any call fails,
 *   no further items are started; calls already in flight are left to settle.
 */
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  concurrency: number,
  worker: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  if (!items.length) return [];

  const results = new Array<R>(items.length);
  let nextIndex = 0;
  // Set by the first failing call so sibling runners stop claiming new items
  // instead of doing work whose result will be discarded.
  let failed = false;

  const runWorker = async (): Promise<void> => {
    while (!failed) {
      const currentIndex = nextIndex;
      nextIndex += 1;
      if (currentIndex >= items.length) return;
      try {
        results[currentIndex] = await worker(items[currentIndex], currentIndex);
      } catch (error: unknown) {
        failed = true;
        throw error;
      }
    }
  };

  // A NaN worker count would produce an empty pool and a result array full of
  // holes, so fall back to sequential processing in that case.
  const requested = Number.isNaN(concurrency) ? 1 : Math.floor(concurrency);
  const workerCount = Math.max(1, Math.min(requested, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => runWorker()));
  return results;
}
|
||||||
|
|
||||||
|
/**
 * Loads every blob described by `tasks` and aggregates the results for the archive.
 *
 * Reads are fanned out with the concurrency limit chosen for the configured
 * blob storage kind.
 *
 * @param env Worker environment used for blob lookups.
 * @param tasks Blobs to load.
 * @returns The loaded bytes keyed by archive path, the number of attachment and
 *   send-file blobs, the summed byte length, and the largest single blob size.
 * @throws Error carrying the task's `missingMessage` when a blob lookup returns nothing.
 */
async function loadBackupBlobFiles(env: Env, tasks: BackupBlobTask[]): Promise<{
  files: Record<string, Uint8Array>;
  attachmentFiles: number;
  sendFiles: number;
  totalBytes: number;
  largestObjectBytes: number;
}> {
  // Reads one blob fully into memory, failing loudly when it does not exist.
  const readBlob = async (task: BackupBlobTask): Promise<BackupBlobTaskResult> => {
    const blob = await getBlobObject(env, task.objectKey);
    if (!blob) {
      throw new Error(task.missingMessage);
    }
    const bytes = await streamToBytes(blob.body);
    return { archivePath: task.archivePath, bytes, kind: task.kind };
  };

  const fetched = await mapWithConcurrency(tasks, getBackupBlobReadConcurrency(env), readBlob);

  const files: Record<string, Uint8Array> = {};
  let attachmentFiles = 0;
  let sendFiles = 0;
  let totalBytes = 0;
  let largestObjectBytes = 0;

  for (const { archivePath, bytes, kind } of fetched) {
    const size = bytes.byteLength;
    files[archivePath] = bytes;
    totalBytes += size;
    if (size > largestObjectBytes) {
      largestObjectBytes = size;
    }
    // Anything that is not an attachment is counted as a send file.
    if (kind === 'attachment') {
      attachmentFiles += 1;
    } else {
      sendFiles += 1;
    }
  }

  return { files, attachmentFiles, sendFiles, totalBytes, largestObjectBytes };
}
|
||||||
|
|
||||||
function buildBackupFileName(date: Date = new Date()): string {
|
function buildBackupFileName(date: Date = new Date()): string {
|
||||||
const parts = [
|
const parts = [
|
||||||
date.getUTCFullYear().toString().padStart(4, '0'),
|
date.getUTCFullYear().toString().padStart(4, '0'),
|
||||||
@@ -268,10 +356,6 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro
|
|||||||
queryRows(env.DB, 'SELECT id, user_id, type, name, notes, data, key, password_hash, password_salt, password_iterations, auth_type, emails, max_access_count, access_count, disabled, hide_email, created_at, updated_at, expiration_date, deletion_date FROM sends ORDER BY created_at ASC'),
|
queryRows(env.DB, 'SELECT id, user_id, type, name, notes, data, key, password_hash, password_salt, password_iterations, auth_type, emails, max_access_count, access_count, disabled, hide_email, created_at, updated_at, expiration_date, deletion_date FROM sends ORDER BY created_at ASC'),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let attachmentBlobCount = 0;
|
|
||||||
let sendFileBlobCount = 0;
|
|
||||||
let totalBlobBytes = 0;
|
|
||||||
let largestObjectBytes = 0;
|
|
||||||
const manifestBase = {
|
const manifestBase = {
|
||||||
formatVersion: BACKUP_FORMAT_VERSION,
|
formatVersion: BACKUP_FORMAT_VERSION,
|
||||||
exportedAt: date.toISOString(),
|
exportedAt: date.toISOString(),
|
||||||
@@ -311,43 +395,41 @@ export async function buildBackupArchive(env: Env, date: Date = new Date()): Pro
|
|||||||
}, null, 2)),
|
}, null, 2)),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const blobTasks: BackupBlobTask[] = [];
|
||||||
for (const row of attachmentRows) {
|
for (const row of attachmentRows) {
|
||||||
const cipherId = String(row.cipher_id || '').trim();
|
const cipherId = String(row.cipher_id || '').trim();
|
||||||
const attachmentId = String(row.id || '').trim();
|
const attachmentId = String(row.id || '').trim();
|
||||||
if (!cipherId || !attachmentId) continue;
|
if (!cipherId || !attachmentId) continue;
|
||||||
const object = await getBlobObject(env, getAttachmentObjectKey(cipherId, attachmentId));
|
blobTasks.push({
|
||||||
if (!object) {
|
archivePath: `attachments/${cipherId}/${attachmentId}.bin`,
|
||||||
throw new Error(`Attachment blob missing for ${cipherId}/${attachmentId}`);
|
objectKey: getAttachmentObjectKey(cipherId, attachmentId),
|
||||||
}
|
kind: 'attachment',
|
||||||
const bytes = await streamToBytes(object.body);
|
missingMessage: `Attachment blob missing for ${cipherId}/${attachmentId}`,
|
||||||
files[`attachments/${cipherId}/${attachmentId}.bin`] = bytes;
|
});
|
||||||
attachmentBlobCount += 1;
|
|
||||||
totalBlobBytes += bytes.byteLength;
|
|
||||||
largestObjectBytes = Math.max(largestObjectBytes, bytes.byteLength);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const row of sendRows) {
|
for (const row of sendRows) {
|
||||||
const sendId = String(row.id || '').trim();
|
const sendId = String(row.id || '').trim();
|
||||||
const fileId = parseSendFileId(typeof row.data === 'string' ? row.data : null);
|
const fileId = parseSendFileId(typeof row.data === 'string' ? row.data : null);
|
||||||
if (!sendId || !fileId) continue;
|
if (!sendId || !fileId) continue;
|
||||||
const object = await getBlobObject(env, getSendFileObjectKey(sendId, fileId));
|
blobTasks.push({
|
||||||
if (!object) {
|
archivePath: `send-files/${sendId}/${fileId}.bin`,
|
||||||
throw new Error(`Send file blob missing for ${sendId}/${fileId}`);
|
objectKey: getSendFileObjectKey(sendId, fileId),
|
||||||
}
|
kind: 'send-file',
|
||||||
const bytes = await streamToBytes(object.body);
|
missingMessage: `Send file blob missing for ${sendId}/${fileId}`,
|
||||||
files[`send-files/${sendId}/${fileId}.bin`] = bytes;
|
});
|
||||||
sendFileBlobCount += 1;
|
|
||||||
totalBlobBytes += bytes.byteLength;
|
|
||||||
largestObjectBytes = Math.max(largestObjectBytes, bytes.byteLength);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const blobFiles = await loadBackupBlobFiles(env, blobTasks);
|
||||||
|
Object.assign(files, blobFiles.files);
|
||||||
|
|
||||||
const manifest: BackupManifest = {
|
const manifest: BackupManifest = {
|
||||||
...manifestBase,
|
...manifestBase,
|
||||||
blobSummary: {
|
blobSummary: {
|
||||||
attachmentFiles: attachmentBlobCount,
|
attachmentFiles: blobFiles.attachmentFiles,
|
||||||
sendFiles: sendFileBlobCount,
|
sendFiles: blobFiles.sendFiles,
|
||||||
totalBytes: totalBlobBytes,
|
totalBytes: blobFiles.totalBytes,
|
||||||
largestObjectBytes,
|
largestObjectBytes: blobFiles.largestObjectBytes,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2));
|
files['manifest.json'] = encoder.encode(JSON.stringify(manifest, null, 2));
|
||||||
|
|||||||
Reference in New Issue
Block a user