import { isNil } from 'lodash';
import { wait } from './TimeUtils';

/**
 * Construction options for {@link Batcher}.
 */
export interface BatcherParams {
  /**
   * Number of buffered documents at which `Batcher.process` flushes the
   * current batch (the check is `batch.length >= batchSize`).
   */
  batchSize: number;
  /**
   * Value handed to `wait` after each full batch has been flushed; the call is
   * skipped unless this is greater than 0.
   * NOTE(review): presumably milliseconds, going by the name — confirm against
   * `wait` in `./TimeUtils`.
   */
  waitTimeMs: number;
}

/**
 * Groups the items of a synchronous or asynchronous sequence into batches and
 * hands each batch to a caller-supplied callback, optionally reporting a
 * running count of flushed items.
 */
export class Batcher {
  private readonly batchSize: number;
  private readonly waitTimeMs: number;

  /**
   * @param params - `batchSize` is the buffered-item count that triggers a
   *   flush; `waitTimeMs` is passed to `wait` after each full batch when it is
   *   greater than 0.
   */
  public constructor(params: BatcherParams) {
    this.batchSize = params.batchSize;
    this.waitTimeMs = params.waitTimeMs;
  }

  /**
   * Consumes `documents` sequentially, buffering every non-nil item and
   * flushing the buffer whenever it reaches `batchSize` items. Whatever is left
   * in the buffer once the input is exhausted is flushed as a final, possibly
   * smaller, batch.
   *
   * `onProgress` (when given) is called after every full batch and once more
   * after the input is exhausted — even if the trailing batch is empty, so the
   * last count can be reported twice and an empty input reports `0`.
   *
   * @param documents - Source items; `null`/`undefined` entries are skipped.
   * @param onFlushBatch - Receives each non-empty batch and is awaited before
   *   reading continues. Each call gets its own array, which is not reused
   *   afterwards, so the callback may keep or mutate it.
   * @param onProgress - Receives the cumulative number of items flushed so far.
   * @returns A promise that settles once all batches and progress reports are
   *   done; rejections from the callbacks or the source propagate to the caller.
   */
  public async process<T>(
    documents: Iterable<T | null> | AsyncIterable<T | null>,
    onFlushBatch: (batch: T[]) => void | Promise<void>,
    onProgress?: (processedCount: number) => void | Promise<void>
  ): Promise<void> {
    let batch: T[] = [];
    let processedCount = 0;

    for await (const doc of documents) {
      if (isNil(doc)) {
        continue;
      }

      batch.push(doc);

      if (batch.length >= this.batchSize) {
        // Count what was handed over, not what is left in the array after the
        // callback ran: a consumer may drain or truncate the batch it receives.
        const flushedCount = await this.flushBatch(batch, onFlushBatch);
        processedCount += flushedCount;
        await this.reportProgress(processedCount, onProgress);

        // Start a fresh array so the one given to the callback is never touched again.
        batch = [];

        if (this.waitTimeMs > 0) {
          // `wait` comes from ./TimeUtils; presumably a delay between batches.
          await wait(this.waitTimeMs);
        }
      }
    }

    // Flush the remainder (no-op when empty) and always emit a final progress report.
    const remainderCount = await this.flushBatch(batch, onFlushBatch);
    processedCount += remainderCount;
    await this.reportProgress(processedCount, onProgress);
  }

  /**
   * Passes `batch` to `onFlushBatch` unless it is empty.
   *
   * @returns The number of items the batch held before the callback was
   *   invoked (0 when nothing was flushed).
   */
  private async flushBatch<T>(batch: T[], onFlushBatch: (batch: T[]) => void | Promise<void>): Promise<number> {
    const size = batch.length;
    if (size > 0) {
      await onFlushBatch(batch);
    }
    return size;
  }

  /** Invokes and awaits `onProgress` with the cumulative count, if a callback was supplied. */
  private async reportProgress(
    processedCount: number,
    onProgress?: (processedCount: number) => void | Promise<void>
  ): Promise<void> {
    if (onProgress) {
      await onProgress(processedCount);
    }
  }
}
