import {BehaviorSubject, concat, Observable, of, Subject, Subscriber, throwError, timer} from 'rxjs';
import {concatMap, delay, distinctUntilChanged, mergeMap, publishReplay, refCount, skip, tap} from 'rxjs/operators';
import {CollectionResponse} from '../models/shared';
import {TrackingInterface} from '../models/shared/tracking-interface';
import {trackingArrayEquals, trackingEquals, trackingMapEquals} from './tracking';

/**
 * Wraps a Promise-returning function in a cold Observable.
 *
 * Nothing happens until the Observable is subscribed to; on subscription the
 * work is handed to `fromPromiseFunctionWithRetryRecursive`, which invokes
 * `caller` and reports the outcome to the subscriber.
 *
 * @param caller function producing a fresh Promise on every invocation
 * @returns an Observable driven by the retrying helper
 */
export function fromPromiseFunctionWithRetry<T>(caller: () => Promise<T>): Observable<T> {
    return new Observable<T>((observer) => {
        // The helper returns the subscriber it was given; hand that back to
        // the Observable constructor exactly as the helper produced it.
        return fromPromiseFunctionWithRetryRecursive(observer, caller);
    });
}

/**
 * Invokes `caller` and forwards the outcome of its Promise to `subscriber`,
 * retrying with a growing delay when the Promise rejects.
 *
 * - On fulfilment the value is emitted and the subscriber is completed
 *   (unless the subscriber is already closed).
 * - On rejection with an error whose `key` is `'resource_not_found'` the error
 *   is forwarded to the subscriber and no retry is attempted.
 * - On any other rejection (including `null`/`undefined` reasons) `caller` is
 *   invoked again after `backoff` ms, with the delay doubled for the next
 *   attempt and capped at `maxBackoff`. Retrying stops once the subscriber is
 *   closed.
 *
 * @param subscriber receiver of the value / completion / error
 * @param caller function producing a fresh Promise on every attempt
 * @param backoff delay in ms before the next retry
 * @param maxBackoff upper bound in ms for the retry delay
 * @returns the `subscriber` that was passed in
 */
function fromPromiseFunctionWithRetryRecursive<T>(
    subscriber: Subscriber<T>,
    caller: () => Promise<T>,
    backoff = 100,
    maxBackoff = 2000
) {
    const promise = caller(); // run the caller function which returns a Promise
    promise.then((value) => {
        if (!subscriber.closed) {
            subscriber.next(value);
            subscriber.complete();
        }
    }, (error) => {
        // A rejection reason may be null/undefined (e.g. a bare `reject()`);
        // guard the property access so such rejections are retried instead of
        // throwing inside this handler and silently ending the retry loop.
        if (error != null && error.key === 'resource_not_found') {
            if (!subscriber.closed) {
                subscriber.error(error);
            }
            return;
        }
        setTimeout(() => {
            if (!subscriber.closed) {
                // Forward maxBackoff so a caller-supplied cap survives past the first retry.
                fromPromiseFunctionWithRetryRecursive(
                    subscriber,
                    caller,
                    Math.min(backoff * 2, maxBackoff),
                    maxBackoff
                );
            }
        }, backoff);
    });
    return subscriber;
}

/** Optimized 2^n function for use in exponential backoff. Not general purpose
 *  at all: `n` is expected to be a non-negative integer, and the result is
 *  capped at 65536 (2^16) for every `n >= 16`. */
function fastPowerOf2(n: number): number {
    if (n < 16) {
        // `1 << n` is exactly 2^n for 0 <= n < 16. The equivalent-looking
        // `2 << (n - 1)` is wrong for n = 0: the shift count -1 is masked to
        // 31 and the result is 0 rather than 1.
        // eslint-disable-next-line no-bitwise
        return 1 << n;
    } else {
        return 65536;
    }
}

/**
 * Builds a function that maps a stream of errors to a stream of retry
 * signals with growing delays. The shape matches what a notifier for an
 * operator such as `retryWhen` expects (presumably its intended use).
 *
 * For the i-th error (0-based) with `i < retries`, a timer of
 * `min(backoff * fastPowerOf2(i), maxBackoff)` ms is returned; for any later
 * error the error itself is rethrown into the stream.
 *
 * @param retries number of errors that are answered with a delay (default 50)
 * @param backoff base delay in ms (default 100)
 * @param maxBackoff upper bound for a single delay in ms (default 3000)
 */
export function exponentialBackoffRetry({
    retries = 50,
    backoff = 100,
    maxBackoff = 3000
}) {
    // Delay for a given 0-based attempt index, clamped to maxBackoff.
    const delayForAttempt = (attempt: number) =>
        Math.min(backoff * fastPowerOf2(attempt), maxBackoff);

    return (errors: Observable<any>) => errors.pipe(
        mergeMap((failure, attempt) =>
            attempt < retries
                ? timer(delayForAttempt(attempt))
                : throwError(failure)
        )
    );
}

/**
 * Returns the Observable stored under `cacheKey` in `observableCache`,
 * creating and storing it first when the entry is missing (`null` or
 * `undefined`).
 *
 * A newly created Observable is the result of `createObservable()` piped
 * through `publishReplay(1)` and `refCount()` before it is cached.
 * `createObservable` is not called on a cache hit.
 *
 * @param observableCache mutable map used as the cache; written to on a miss
 * @param cacheKey key identifying the Observable in the cache
 * @param createObservable factory used only when the key is not cached yet
 */
export function shareCached<T>(
    observableCache: {[id: string]: Observable<T>},
    cacheKey: string,
    createObservable: () => Observable<T>
) {
    const cached: Observable<any> = observableCache[cacheKey];
    if (cached != null) {
        // cache hit: reuse the already shared stream
        return cached;
    }

    // cache miss: build the shared stream and remember it
    const shared = createObservable().pipe(publishReplay(1), refCount());
    observableCache[cacheKey] = shared;
    return shared;
}

/** The function `caller` should return a Promise. Once this Promise is
 *  resolved the result will be sent in the returned stream, and a timer
 *  with `pollingDelay` ms duration is started. When this timer is done
 *  the process will be repeated and `caller` will be called again. This
 *  loop continues until the returned stream is unsubscribed from.
 *
 *  Every subscription runs its own polling loop with its own internal
 *  trigger, so several subscribers of the same returned Observable do not
 *  feed triggers into each other.
 *
 *  @param caller function producing a fresh Promise for every poll
 *  @param pollingDelay pause in ms between a finished call and the next one
 *  @returns a cold Observable emitting the result of each poll */
export function callPolling<T>(
    caller: () => Promise<T>,
    pollingDelay: number
): Observable<T> {
    // inspired by https://blog.strongbrew.io/rxjs-polling/
    return new Observable<T>(subscriber => {
        /* Root trigger, created per subscription; its internal value is not
         * used. Sharing one subject between subscriptions would deliver every
         * subscriber's trigger to all subscribers, making the concatMap
         * queues below grow without bound. */
        const timerSubject = new BehaviorSubject<T>(null);
        /* helper that triggers timerSubject after `pollingDelay` ms; skip(1)
         * drops its only value so nothing but poll results is emitted */
        const delayedPollTrigger: Observable<T> = of(null).pipe(delay(pollingDelay), tap(() => timerSubject.next(null)), skip(1));
        /* do the call; queue `delayedPollTrigger` *after* call completes */
        return timerSubject
            .pipe(concatMap(() => concat(fromPromiseFunctionWithRetry(caller), delayedPollTrigger)))
            .subscribe(subscriber);
    });
}

/** Polls `caller` through `callPolling` and only lets a value through when
 *  `trackingEquals` reports that it differs from the previously emitted one. */
export function callPollingDistinctObject<T extends TrackingInterface<T>>(
    caller: () => Promise<T>,
    pollingDelay: number
): Observable<T> {
    const polled = callPolling<T>(caller, pollingDelay);
    const dropUnchanged = distinctUntilChanged(
        (previous: T, current: T) => trackingEquals(previous, current)
    );
    return polled.pipe(dropUnchanged);
}

/** Polls `caller` through `callPolling` and only lets an array through when
 *  `trackingArrayEquals` reports that it differs from the previously emitted
 *  one. */
export function callPollingDistinctArray<T extends TrackingInterface<T>>(
    caller: () => Promise<T[]>,
    pollingDelay: number
): Observable<T[]> {
    const polled = callPolling<T[]>(caller, pollingDelay);
    const dropUnchanged = distinctUntilChanged(
        (previous: T[], current: T[]) => trackingArrayEquals(previous, current)
    );
    return polled.pipe(dropUnchanged);
}

/** Polls `caller` through `callPolling` and only lets a string-keyed map
 *  through when `trackingMapEquals` reports that it differs from the
 *  previously emitted one. */
export function callPollingDistinctMap<T extends TrackingInterface<T>>(
    caller: () => Promise<{[key: string]: T}>,
    pollingDelay: number
): Observable<{[key: string]: T}> {
    const polled = callPolling<{[key: string]: T}>(caller, pollingDelay);
    const dropUnchanged = distinctUntilChanged(
        (previous: {[key: string]: T}, current: {[key: string]: T}) =>
            trackingMapEquals(previous, current)
    );
    return polled.pipe(dropUnchanged);
}

/** Polls `caller` through `callPolling` and only lets a response through when
 *  `trackingArrayEquals` reports that its `items` differ from the `items` of
 *  the previously emitted response. Only `items` takes part in the comparison. */
export function callPollingDistinctCollectionResponse<T extends TrackingInterface<T>>(
    caller: () => Promise<CollectionResponse<T>>,
    pollingDelay: number
): Observable<CollectionResponse<T>> {
    const polled = callPolling<CollectionResponse<T>>(caller, pollingDelay);
    const dropUnchanged = distinctUntilChanged(
        (previous: CollectionResponse<T>, current: CollectionResponse<T>) =>
            trackingArrayEquals(previous.items, current.items)
    );
    return polled.pipe(dropUnchanged);
}

/** Tells whether an rxjs Subject currently has at least one observer.
 *  Returns false for a null/undefined subject or one without an `observers`
 *  list. */
export function subjectIsObserved(subject: Subject<any>) {
    if (subject == null || subject.observers == null) {
        return false;
    }
    return subject.observers.length > 0;
}
