mirror of
https://github.com/immich-app/immich.git
synced 2026-02-22 04:40:34 +09:00
Some checks failed
Test / Unit Test CLI (Windows) (push) Blocked by required conditions
Test / Medium Tests (Server) (push) Blocked by required conditions
Test / End-to-End Tests (Server & CLI) (push) Blocked by required conditions
Test / End-to-End Tests (Web) (push) Blocked by required conditions
CLI Build / CLI Publish (push) Successful in 36s
CodeQL / Analyze (javascript) (push) Failing after 1m51s
CodeQL / Analyze (python) (push) Failing after 25s
Docker / pre-job (push) Successful in 8s
Docs build / pre-job (push) Successful in 5s
Static Code Analysis / pre-job (push) Successful in 6s
Test / pre-job (push) Successful in 6s
Test / ShellCheck (push) Successful in 9s
Test / OpenAPI Clients (push) Failing after 2m24s
Test / TypeORM Checks (push) Failing after 13s
CLI Build / Docker (push) Failing after 1m12s
Docker / Re-Tag ML () (push) Failing after 4s
Docker / Re-Tag ML (-armnn) (push) Failing after 4s
Docker / Re-Tag ML (-cuda) (push) Failing after 4s
Docker / Re-Tag ML (-openvino) (push) Failing after 5s
Docker / Re-Tag Server () (push) Failing after 4s
Docker / Build and Push ML (armnn, linux/arm64, -armnn) (push) Has been skipped
Docker / Build and Push ML (cpu, linux/amd64,linux/arm64) (push) Has been skipped
Docker / Build and Push ML (cuda, linux/amd64, -cuda) (push) Has been skipped
Docker / Build and Push ML (openvino, linux/amd64, -openvino) (push) Has been skipped
Docker / Build and Push Server (cpu, linux/amd64,linux/arm64) (push) Has been skipped
Docs build / Docs Build (push) Failing after 3m45s
Static Code Analysis / Run Dart Code Analysis (push) Has been skipped
Test / Test & Lint Server (push) Has been skipped
Test / Unit Test CLI (push) Successful in 18s
Test / Test & Lint Web (push) Has been skipped
Test / End-to-End Lint (push) Successful in 21s
Test / Unit Test Mobile (push) Has been skipped
Test / Unit Test ML (push) Has been skipped
Docker / Docker Build & Push ML Success (push) Successful in 2s
Docker / Docker Build & Push Server Success (push) Successful in 2s
132 lines
3.8 KiB
TypeScript
132 lines
3.8 KiB
TypeScript
import * as fastq from 'fastq';
|
|
import { uniqueId } from 'lodash-es';
|
|
|
|
/**
 * Bookkeeping record the queue keeps for every item that is pushed.
 *
 * @template T - The type of the data handed to the worker.
 * @template R - The type of the value produced by the worker.
 */
export type Task<T, R> = {
  /** Identifier assigned by the queue when the task is pushed. */
  readonly id: string;
  /** Lifecycle state: `idle` until picked up, then `processing`, and finally `succeeded` or `failed`. */
  status: 'idle' | 'processing' | 'succeeded' | 'failed';
  /** The payload passed to the worker function. */
  data: T;
  /**
   * Whatever the worker threw on the most recent attempt; reset to `undefined` when an attempt starts.
   * (`unknown` already includes `undefined`, so no explicit union is needed.)
   */
  error: unknown;
  /** Number of attempts started so far (incremented at the beginning of each attempt). */
  count: number;
  // TODO: It could be useful to add a progress property.
  // TODO: It could be useful to add start_at/end_at/duration properties.
  /** Value returned by the worker; stays `undefined` until an attempt succeeds. */
  result: undefined | R;
};
|
|
|
|
/**
 * Optional settings accepted by the `Queue` constructor.
 * Any property left out falls back to the value in `defaultQueueOptions`.
 */
export type QueueOptions = {
  /** When true, the queue logs each task state change (processing, succeeded, failed, retry) with `console.log`. */
  verbose?: boolean;
  /** Concurrency value handed to the underlying fastq queue. */
  concurrency?: number;
  /**
   * Upper bound on the number of attempts per task: a failed task is run again
   * while its attempt count is still below this value, so `0` and `1` both
   * result in a single attempt.
   */
  retry?: number;
  // TODO: It could be useful to add a timeout property for retries.
};
|
|
|
|
/** Queue options after defaults have been applied: the same properties as `QueueOptions`, but none of them optional. */
export type ComputedQueueOptions = Required<QueueOptions>;
|
|
|
|
/**
 * Values used for any queue option the caller does not supply:
 * one task at a time, a single attempt per task, and no logging.
 */
export const defaultQueueOptions = {
  concurrency: 1,
  retry: 0,
  verbose: false,
};
|
|
|
|
/**
 * An in-memory queue that processes tasks in parallel with a given concurrency.
 *
 * Every pushed item is wrapped in a {@link Task} record that tracks its status,
 * attempt count, result and last error. Records are never removed from the
 * internal store, so they remain available through {@link Queue.tasks} and
 * {@link Queue.getTask} for as long as the queue instance is alive.
 *
 * @see {@link https://www.npmjs.com/package/fastq}
 * @template T - The type of the worker task data.
 * @template R - The type of the worker output data.
 */
export class Queue<T, R> {
  private readonly queue: fastq.queueAsPromised<string, Task<T, R>>;
  private readonly store = new Map<string, Task<T, R>>();
  readonly options: ComputedQueueOptions;
  readonly worker: (data: T) => Promise<R>;

  /**
   * Create a new queue.
   * @param worker - The worker function that processes the task.
   * @param options - The queue options; omitted or `undefined` properties use {@link defaultQueueOptions}.
   */
  constructor(worker: (data: T) => Promise<R>, options?: QueueOptions) {
    // Resolve each option on its own instead of spreading: a spread would let an
    // explicitly passed `undefined` (e.g. `{ concurrency: undefined }`) overwrite
    // the default and leave `options` violating its `Required<>` type.
    this.options = {
      concurrency: options?.concurrency ?? defaultQueueOptions.concurrency,
      retry: options?.retry ?? defaultQueueOptions.retry,
      verbose: options?.verbose ?? defaultQueueOptions.verbose,
    };
    this.worker = worker;
    this.queue = this.buildQueue();
  }

  /** A snapshot array of every task record known to the queue, in insertion order. */
  get tasks(): Task<T, R>[] {
    return [...this.store.values()];
  }

  /**
   * Look up a task record by id.
   * @param id - The task identifier.
   * @returns The matching task record.
   * @throws Error if no task with the given id exists.
   */
  getTask(id: string): Task<T, R> {
    const task = this.store.get(id);
    if (!task) {
      throw new Error(`Task with id ${id} not found`);
    }
    return task;
  }

  /**
   * Wait for the queue to be empty.
   * @returns A promise that resolves when all tasks in the queue have been processed by a worker.
   * This promise could be ignored as it will not lead to an `unhandledRejection`.
   */
  drained(): Promise<void> {
    return this.queue.drained();
  }

  /**
   * Add a task at the end of the queue.
   * @see {@link https://www.npmjs.com/package/fastq}
   * @param data - The payload to hand to the worker.
   * @returns A promise that resolves with the task record once the task has finished.
   * Worker failures are captured on the record (`status === 'failed'`, `error` set)
   * rather than surfaced as a rejection, so inspect the returned task to detect them.
   */
  async push(data: T): Promise<Task<T, R>> {
    const id = uniqueId();
    const task: Task<T, R> = { id, status: 'idle', error: undefined, count: 0, data, result: undefined };
    // Register the record before queueing so the worker callback can always find it.
    this.store.set(id, task);
    return this.queue.push(id);
  }

  // TODO: Support more function delegation to fastq.

  /** Build the underlying fastq queue; it carries task ids and resolves them to records when run. */
  private buildQueue(): fastq.queueAsPromised<string, Task<T, R>> {
    // `async` turns a failed lookup in `getTask` into a rejected promise instead of a synchronous throw.
    return fastq.promise(async (id: string) => this.work(this.getTask(id)), this.options.concurrency);
  }

  /**
   * Run the worker for a task, re-running it after a failure while the task's
   * attempt count is below `options.retry`.
   * @param task - The task record to process; it is mutated in place.
   * @returns The same task record, with `status` set to `succeeded` or `failed`.
   */
  private async work(task: Task<T, R>): Promise<Task<T, R>> {
    const { retry, verbose } = this.options;
    for (;;) {
      task.count += 1;
      task.error = undefined;
      task.status = 'processing';
      if (verbose) {
        console.log('[task] processing:', task);
      }
      try {
        task.result = await this.worker(task.data);
        task.status = 'succeeded';
        if (verbose) {
          console.log('[task] succeeded:', task);
        }
        return task;
      } catch (error) {
        task.error = error;
        task.status = 'failed';
        if (verbose) {
          console.log('[task] failed:', task);
        }
        // `retry` caps the total number of attempts; stop once it has been reached.
        if (!(retry > 0 && task.count < retry)) {
          return task;
        }
        if (verbose) {
          console.log('[task] retry:', task);
        }
      }
    }
  }
}
|