import _, { isEqual } from 'lodash';
import {
  asyncScheduler,
  BehaviorSubject,
  MonoTypeOperatorFunction,
  Observable,
  Observer,
  OperatorFunction,
  SchedulerLike,
  Subject,
  Subscription,
  throwError,
  timer,
} from 'rxjs';
import {
  catchError,
  distinctUntilChanged,
  filter,
  first,
  map,
  mergeMap,
  retryWhen,
  scan,
  takeUntil,
  takeWhile,
  tap,
} from 'rxjs/operators';

export type RxReadOnlyVariable<T> = Pick<RxVariable<T>, 'value' | 'asObservable' | 'subscribe' | 'pipe'>;

export class RxVariable<T> extends BehaviorSubject<T> {}

export const rxSettings = {
  timeUnit: 1000,
};

type MaxAttempts =
  | { type: 'sequential'; count: number } // max errors achter elkaar. let op: als de stream een next emit teruggeeft dan wordt de count = (next -> error -> error -> next) === (0 - 1 - 2 - 0)
  | { type: 'total'; count: number }; // totaal aantal errors OOIT in de stream voorgekomen. count = (error -> next -> error -> next) === (1 - 1 - 2 - 2).

export interface RetryStrategyTiming {
  delay: number; // delay before retrying in ms
  backOffDisabled?: boolean;
  initialBackOffAfter?: number; // undefined = 1s
  maxAttempts?: MaxAttempts; // undefined = no max
}

export interface RetryStrategy extends RetryStrategyTiming {
  onError?: (error: Error) => void; // can be used for logging purposes

  /** only executes the strategy when the predicate returns true. when not set: all errors will be accepted */
  onlyForErrors?: (error: Error) => boolean;
}

export const firstAsPromise = <T>(source: Observable<T>) => source.pipe(first()).toPromise();

/** subscribes to an observable and sends all the values to the subject */
export function bindObservableToSubject<T>(
  source: Observable<T>,
  subject: Subject<T>,
  retryStrategy?: RetryStrategy
): Subscription {
  return source
    .pipe((obs) => (retryStrategy ? retryWithStrategy(retryStrategy)(obs) : obs))
    .subscribe((value) => subject.next(value));
}

export const retryWithStrategy =
  (strategy: RetryStrategy, scheduler: SchedulerLike = asyncScheduler) =>
  <T>(source: Observable<T>) => {
    let sequentialErrorCount = 0;
    let previousBackoffTiming: BackoffTiming | undefined = undefined;

    return source.pipe(
      // on successfull values we reset the sequentialErrorCount
      tap(() => {
        sequentialErrorCount = 0;
      }),
      retryWhen((errors: Observable<any>): Observable<any> => {
        return errors.pipe(
          mergeMap((error, i) => {
            const totalErrorCount = i + 1;
            sequentialErrorCount++;

            if (strategy.onError) {
              strategy.onError(error);
            }

            // if we give a where clause, check if the error matches it
            if (strategy.onlyForErrors && !strategy.onlyForErrors(error)) {
              return throwError(error, scheduler);
            }

            // stop retrying when we reached the total attempt limit
            if (
              strategy.maxAttempts &&
              strategy.maxAttempts.type === 'total' &&
              totalErrorCount > strategy.maxAttempts.count
            ) {
              return throwError(error, scheduler);
            }

            // stop retrying when we reached the sequential attempt limit
            if (
              strategy.maxAttempts &&
              strategy.maxAttempts.type === 'sequential' &&
              sequentialErrorCount > strategy.maxAttempts.count
            ) {
              return throwError(error, scheduler);
            }

            let delay: number;
            if (strategy.backOffDisabled === true) {
              delay = strategy.delay;
              previousBackoffTiming = undefined;
            } else {
              const backOffDelay = backOffDelayTime(previousBackoffTiming, strategy.delay, scheduler.now());
              delay = backOffDelay.delay;
              previousBackoffTiming = backOffDelay;
            }

            // retry after
            return timer(delay, undefined, scheduler);
          })
        );
      })
    );
  };

/** removes all nils from the stream. Also typechecks */
export const filterNotNil =
  () =>
  <T>(source: Observable<T | undefined>): Observable<T> =>
    source.pipe((inner) => filter((value: T | undefined) => !_.isNil(value))(inner) as Observable<T>);

/**
 * Debug tools for RxJs pipeline. Logs extra information for observables
 *
 * @param tag Identifier to keep track of what logs belong to what stream
 * @returns
 */
export const rxDebug =
  (tag: string) =>
  <T>(source: Observable<T>): Observable<T> => {
    return Observable.create((observer: Observer<T>) => {
      const started = Date.now();

      console.info(`${tag}.subscribe`, started);

      const subscription = source.subscribe(
        (value) => {
          console.info(`${tag}.next`, value, Date.now(), Date.now() - started);
          observer.next(value);
        },
        (error) => {
          console.error(`${tag}.error`, error, Date.now(), Date.now() - started);
          observer.error(error);
        },
        () => {
          console.info(`${tag}.complete`, Date.now(), Date.now() - started);
          observer.complete();
        }
      );

      return () => {
        console.info(`${tag}.unsubscribe`, Date.now(), Date.now() - started);
        subscription.unsubscribe();
      };
    });
  };

export const delayError =
  (dueTime: number) =>
  <T>(source: Observable<T>): Observable<T> =>
    source.pipe(
      catchError((error) =>
        // start a timer
        timer(dueTime).pipe(
          map(() => {
            throw error;
          })
        )
      )
    );

export interface BackoffTiming {
  delay: number;
  attempt: number; // backoff attempt
  expectedRetryAt: number; // timestamp
}

/** calculate the backoff delay time based on the given input */
export function backOffDelayTime(
  previous: BackoffTiming | undefined,
  initialDelay: number,
  now: number = Date.now()
): BackoffTiming {
  const backOffTiming = [2, 5, 15, 60, 60, 120, 120, 120, 300].map((d) => d * rxSettings.timeUnit);

  let attempt: number;

  // if an error occurs within 15 seconds of the expected retry, we want to continue the backoff
  if (previous && now - previous.expectedRetryAt <= rxSettings.timeUnit * 15) {
    attempt = previous.attempt + 1;
  } else {
    // otherwise we start the backoff
    attempt = 1;
  }

  const extraDelay = !_.isNil(backOffTiming[attempt - 1]) ? backOffTiming[attempt - 1] : _.last(backOffTiming) || 0;
  const totalDelay = initialDelay + extraDelay;

  const timing: BackoffTiming = {
    attempt,
    delay: totalDelay,
    expectedRetryAt: now + totalDelay,
  };

  return timing;
}

export function waitOnPredicate<Value>(
  observable: Observable<Value>,
  predicate: (arg0: Value) => boolean,
  timeout: number
) {
  return observable
    .pipe(
      map(predicate),
      takeWhile((bool) => !bool),
      takeUntil(timer(timeout))
    )
    .toPromise();
}

export function lastValues<T>(count: number): OperatorFunction<T, T[]> {
  return scan<T, T[]>((acc, val) => {
    acc.push(val);
    return acc.slice(-count);
  }, []);
}

/** isEqual casts the value to any */
export function distinctUntilChangedIsEqual<T>(): MonoTypeOperatorFunction<T> {
  return distinctUntilChanged<T>(isEqual);
}

type TimeStampValuePair<T> = { ts: number; val: T };
/** Returns if called within timeframe and then resets */
export function calledXTimesWithinTimeFrame<T = any>(obs: Observable<T>, count: number, timeDifference: number) {
  return obs.pipe(
    map((val) => ({
      ts: Date.now(),
      val,
    })),
    scan<TimeStampValuePair<T>, { found: boolean; list: TimeStampValuePair<T>[] }>(
      (acc, val) => {
        // reset
        if (acc.found) {
          acc.found = false;
          acc.list = [];
        }
        acc.list.push(val);
        acc.list = acc.list.slice(-count);
        const length = acc.list.length;
        if (length >= count && acc.list[length - 1].ts - acc.list[0].ts < timeDifference) {
          acc.found = true;
        }
        return acc;
      },
      { found: false, list: [] }
    ),
    filter((res) => res.found),
    map((acc) => acc.list)
  );
}
