import { singleton } from "../fable-library-js.4.20.0/AsyncBuilder.js";
import { mergeInner, zipSeq } from "./Combine.fs.js";
import { Microsoft_FSharp_Control_FSharpAsync__Async_Start$0027_Static_ZF5A23EF, infinite } from "./Core.fs.js";
import { Union } from "../fable-library-js.4.20.0/Types.js";
import { union_type, int32_type, class_type } from "../fable-library-js.4.20.0/Reflection.js";
import { System_IObserver$1__IObserver$1_ToAsyncObserver, AsyncObserver$1_$ctor_Z489C0990, AsyncObserver_autoDetachObserver } from "./AsyncObserver.fs.js";
import { post, receive, start } from "../fable-library-js.4.20.0/MailboxProcessor.js";
import { value } from "../fable-library-js.4.20.0/Option.js";
import { FSharp_Control_IAsyncRxDisposable__IAsyncRxDisposable_ToDisposable, AsyncDisposable_get_Empty, AsyncDisposable_Create_Z56C29B26 } from "./AsyncDisposable.fs.js";
import { ofAsyncWorker } from "./Create.fs.js";
import { subject } from "./Subject.fs.js";

/**
 * Builds an async observable that routes every element of `source` through `nextAsync`.
 *
 * On subscription the downstream observer is wrapped: each source element is passed to
 * `nextAsync(next, element)`, where `next(value)` forwards `value` to the downstream `OnNextAsync`.
 * Whatever `nextAsync` returns becomes the result of the wrapper's `OnNextAsync`. Error and
 * completion notifications are forwarded to the downstream observer unchanged.
 *
 * @param nextAsync Function called as `nextAsync(next, element)` for every source element.
 * @param source Object exposing `SubscribeAsync(observer)`.
 * @returns Object exposing `SubscribeAsync(observer)`, which returns the result of `source.SubscribeAsync`.
 */
export function transformAsync(nextAsync, source) {
    const SubscribeAsync = (downstream) => {
        const relay = {
            // A fresh forwarding function is handed to nextAsync on every element.
            OnNextAsync: (element) => nextAsync((mapped) => downstream.OnNextAsync(mapped), element),
            OnErrorAsync: (error) => downstream.OnErrorAsync(error),
            OnCompletedAsync: () => downstream.OnCompletedAsync(),
        };
        return source.SubscribeAsync(relay);
    };
    return { SubscribeAsync };
}

/**
 * Returns an operator that maps every source element with the async function `mapperAsync` and forwards
 * the mapped value downstream.
 *
 * `mapperAsync(element)` is only invoked when the async computation produced for that element runs
 * (it sits inside `singleton.Delay`).
 *
 * @param mapperAsync Function `(element) => async value`.
 * @returns Function `(source) => observable`, built with `transformAsync`.
 */
export function mapAsync(mapperAsync) {
    const mapThenForward = (next, element) => singleton.Delay(() => {
        const pending = mapperAsync(element);
        return singleton.Bind(pending, (mapped) => singleton.ReturnFrom(next(mapped)));
    });
    return (source) => transformAsync(mapThenForward, source);
}

/**
 * Returns an operator that applies the synchronous function `mapper` to every source element and forwards
 * the result downstream.
 *
 * @param mapper Function `(element) => value`.
 * @returns Function `(source) => observable`, built with `transformAsync`.
 */
export function map(mapper) {
    return (source) => {
        const applyThenForward = (next, element) => {
            const mapped = mapper(element);
            return next(mapped);
        };
        return transformAsync(applyThenForward, source);
    };
}

/**
 * Returns an operator that maps every source element with the async function `mapper`, after the source
 * has been combined with the `infinite` sequence through `zipSeq`.
 *
 * NOTE(review): `mapper` receives whatever `zipSeq(infinite, source)` emits, presumably an
 * (index, element) pair; confirm against Combine.fs.js.
 *
 * @param mapper Async mapping function handed to `mapAsync`.
 * @returns Function `(source) => observable`.
 */
export function mapiAsync(mapper) {
    // The mapping operator is built once, when mapiAsync is called.
    const mapIndexedAsync = mapAsync(mapper);
    return (source) => {
        const indexed = zipSeq(infinite, source);
        return mapIndexedAsync(indexed);
    };
}

/**
 * Returns an operator that maps every source element with the synchronous function `mapper`, after the
 * source has been combined with the `infinite` sequence through `zipSeq`.
 *
 * NOTE(review): `mapper` receives whatever `zipSeq(infinite, source)` emits, presumably an
 * (index, element) pair; confirm against Combine.fs.js.
 *
 * @param mapper Mapping function handed to `map`.
 * @returns Function `(source) => observable`.
 */
export function mapi(mapper) {
    // The mapping operator is built once, when mapi is called.
    const mapIndexed = map(mapper);
    return (source) => {
        const indexed = zipSeq(infinite, source);
        return mapIndexed(indexed);
    };
}

/**
 * Projects each source element into an observable sequence with `mapper` and merges the resulting
 * sequences into one observable sequence via `mergeInner`.
 *
 * NOTE(review): the literal 0 passed to `mergeInner` presumably means "no concurrency limit";
 * confirm against Combine.fs.js.
 *
 * @param mapper Function `(element) => observable`.
 * @returns Function `(source) => observable`.
 */
export function flatMap(mapper) {
    const toInnerSequences = map(mapper);
    return (source) => {
        const inners = toInnerSequences(source);
        return mergeInner(0, inners);
    };
}

/**
 * Index-aware variant of `flatMap`: projects each source element into an observable sequence with
 * `mapi(mapper)` and merges the resulting sequences into one observable sequence via `mergeInner`.
 *
 * NOTE(review): the literal 0 passed to `mergeInner` presumably means "no concurrency limit";
 * confirm against Combine.fs.js.
 *
 * @param mapper Mapping function handed to `mapi`; expected to produce an observable.
 * @returns Function `(source) => observable`.
 */
export function flatMapi(mapper) {
    const toInnerSequences = mapi(mapper);
    return (source) => {
        const inners = toInnerSequences(source);
        return mergeInner(0, inners);
    };
}

/**
 * Asynchronous variant of `flatMap`: projects each source element into an observable sequence with
 * `mapAsync(mapper)` and merges the resulting sequences into one observable sequence via `mergeInner`.
 *
 * NOTE(review): the literal 0 passed to `mergeInner` presumably means "no concurrency limit";
 * confirm against Combine.fs.js.
 *
 * @param mapper Async mapping function handed to `mapAsync`; expected to produce an observable.
 * @returns Function `(source) => observable`.
 */
export function flatMapAsync(mapper) {
    const toInnerSequences = mapAsync(mapper);
    return (source) => {
        const inners = toInnerSequences(source);
        return mergeInner(0, inners);
    };
}

/**
 * Asynchronous, index-aware variant of `flatMap`: projects each source element into an observable
 * sequence with `mapiAsync(mapperAsync)` and merges the resulting sequences into one observable
 * sequence via `mergeInner`.
 *
 * NOTE(review): the literal 0 passed to `mergeInner` presumably means "no concurrency limit";
 * confirm against Combine.fs.js.
 *
 * @param mapperAsync Async mapping function handed to `mapiAsync`; expected to produce an observable.
 * @returns Function `(source) => observable`.
 */
export function flatMapiAsync(mapperAsync) {
    const toInnerSequences = mapiAsync(mapperAsync);
    return (source) => {
        const inners = toInnerSequences(source);
        return mergeInner(0, inners);
    };
}

/**
 * Projects each source element into an observable sequence with `mapper` and combines the resulting
 * sequences through `mergeInner` with a first argument of 1.
 *
 * NOTE(review): the literal 1 presumably limits merging to one inner sequence at a time, i.e. the
 * inner sequences are concatenated in order; confirm against Combine.fs.js.
 *
 * @param mapper Function `(element) => observable`.
 * @returns Function `(source) => observable`.
 */
export function concatMap(mapper) {
    const toInnerSequences = map(mapper);
    return (source) => {
        const inners = toInnerSequences(source);
        return mergeInner(1, inners);
    };
}

/**
 * Command sent to the agent inside `switchLatest`.
 *
 * `tag` selects the case and `fields` carries its payload. As used in this file: tag 0
 * ("InnerObservable") carries the inner observable, tag 1 ("InnerCompleted") carries the id of the
 * inner subscription that completed, tags 2 ("Completed") and 3 ("Dispose") carry no payload.
 */
export class InnerSubscriptionCmd$1 extends Union {
    constructor(tag, fields) {
        super();
        Object.assign(this, { tag, fields });
    }
    /** Case names, listed in tag order. */
    cases() {
        const names = [
            "InnerObservable",
            "InnerCompleted",
            "Completed",
            "Dispose",
        ];
        return names;
    }
}

/**
 * Builds the reflection descriptor of `InnerSubscriptionCmd$1` for the generic argument `gen0`.
 *
 * @param gen0 Reflection info of the element type.
 * @returns The value produced by `union_type` for this union.
 */
export function InnerSubscriptionCmd$1_$reflection(gen0) {
    // One entry per case, in tag order; each entry lists that case's [fieldName, fieldType] pairs.
    const describeCases = () => {
        const innerObservable = [["Item", class_type("FSharp.Control.IAsyncObservable`1", [gen0])]];
        const innerCompleted = [["Item", int32_type]];
        const completed = [];
        const dispose = [];
        return [innerObservable, innerCompleted, completed, dispose];
    };
    return union_type("FSharp.Control.Transform.InnerSubscriptionCmd`1", [gen0], InnerSubscriptionCmd$1, describeCases);
}

/**
 * Transforms an observable sequence of observable sequences into an observable sequence producing values only from
 * the most recent observable sequence.
 *
 * All bookkeeping is serialised through an agent that processes `InnerSubscriptionCmd$1` commands:
 *   - tag 0 (InnerObservable): dispose the current inner subscription, if any, and subscribe to the new inner;
 *   - tag 1 (InnerCompleted):  the inner subscription with the given id completed;
 *   - tag 2 (Completed):       the outer sequence completed;
 *   - tag 3 (Dispose):         the subscription returned to the caller was disposed.
 * The agent state is `[current, isStopped, currentId]`: the active inner subscription (or undefined), whether
 * the outer sequence has completed or the subscription was disposed, and the id of the latest inner.
 *
 * The downstream observer is completed once the outer sequence has completed and no inner sequence is still
 * active. Errors from the outer sequence and from any inner sequence are forwarded downstream.
 *
 * @param source Observable whose elements are observables (objects exposing `SubscribeAsync`).
 * @returns Object exposing `SubscribeAsync(o)`, an async computation yielding a disposable that disposes the
 * outer subscription and then tells the agent to dispose the active inner subscription.
 */
export function switchLatest(source) {
    return {
        SubscribeAsync(o) {
            const [safeObv, autoDetach] = AsyncObserver_autoDetachObserver(o);
            const innerAgent = start((inbox) => {
                // Observer attached to the inner sequence identified by `id`. Values and errors go straight
                // downstream; completion is reported to the agent, which decides whether to complete downstream.
                const innerObserver = (id) => ({
                    OnNextAsync(x) {
                        return safeObv.OnNextAsync(x);
                    },
                    OnErrorAsync(err) {
                        return safeObv.OnErrorAsync(err);
                    },
                    OnCompletedAsync() {
                        return singleton.Delay(() => {
                            post(inbox, new InnerSubscriptionCmd$1(1, [id]));
                            return singleton.Zero();
                        });
                    },
                });
                // Disposes the active inner subscription when there is one.
                const disposeCurrent = (current) => ((current != null)
                    ? singleton.Bind(value(current).DisposeAsync(), () => singleton.Return(undefined))
                    : singleton.Zero());
                // Computes the next agent state for one command.
                const handle = (cmd, current, isStopped, currentId) => {
                    switch (cmd.tag) {
                        case 1: {
                            if (cmd.fields[0] !== currentId) {
                                // Completion of an inner that has already been replaced: ignore.
                                return singleton.Return([current, isStopped, currentId]);
                            }
                            if (isStopped) {
                                // Outer already finished and the last inner is done: complete downstream.
                                return singleton.Bind(safeObv.OnCompletedAsync(), () => singleton.Return([undefined, true, currentId]));
                            }
                            // The active inner finished while the outer is still running. Release it and clear
                            // `current`; otherwise a later Completed command would still see an active inner
                            // and the downstream observer would never be completed.
                            return singleton.Bind(disposeCurrent(current), () => singleton.Return([undefined, isStopped, currentId]));
                        }
                        case 2:
                            // Outer completed: complete downstream now only when no inner is active.
                            return singleton.Combine((current == null) ? singleton.Bind(safeObv.OnCompletedAsync(), () => singleton.Return(undefined)) : singleton.Zero(), singleton.Delay(() => singleton.Return([current, true, currentId])));
                        case 3:
                            return singleton.Combine(disposeCurrent(current), singleton.Delay(() => singleton.Return([undefined, true, currentId])));
                        default: {
                            // New inner sequence: switch to it under a fresh id.
                            const nextId = (currentId + 1) | 0;
                            return singleton.Combine(disposeCurrent(current), singleton.Delay(() => singleton.Bind(cmd.fields[0].SubscribeAsync(innerObserver(nextId)), (subscription) => singleton.Return([subscription, isStopped, nextId]))));
                        }
                    }
                };
                const messageLoop = (state) => {
                    const current = state[0];
                    const isStopped = state[1];
                    const currentId = state[2] | 0;
                    return singleton.Delay(() => singleton.Bind(receive(inbox), (cmd) => singleton.Bind(singleton.Delay(() => handle(cmd, current, isStopped, currentId)), (next) => singleton.ReturnFrom(messageLoop([next[0], next[1], next[2]])))));
                };
                return messageLoop([undefined, false, 0]);
            });
            return singleton.Delay(() => {
                // Observer for the outer sequence: errors go downstream, everything else goes to the agent.
                const outerObserver = AsyncObserver$1_$ctor_Z489C0990((ns) => singleton.Delay(() => {
                    switch (ns.tag) {
                        case 1:
                            return singleton.Bind(safeObv.OnErrorAsync(ns.fields[0]), () => singleton.Return(undefined));
                        case 2: {
                            post(innerAgent, new InnerSubscriptionCmd$1(2, []));
                            return singleton.Zero();
                        }
                        default: {
                            post(innerAgent, new InnerSubscriptionCmd$1(0, [ns.fields[0]]));
                            return singleton.Zero();
                        }
                    }
                }));
                return singleton.Bind(autoDetach(source.SubscribeAsync(outerObserver)), (outerSubscription) => singleton.Return(AsyncDisposable_Create_Z56C29B26(() => singleton.Delay(() => singleton.Bind(outerSubscription.DisposeAsync(), () => {
                    post(innerAgent, new InnerSubscriptionCmd$1(3, []));
                    return singleton.Zero();
                })))));
            });
        },
    };
}

/**
 * Asynchronously transforms the items emitted by a source sequence into observable sequences, and mirrors the
 * items emitted by the most recently produced observable sequence.
 *
 * @param mapperAsync Async mapping function handed to `mapAsync`; expected to produce an observable.
 * @returns Function `(source) => observable`, i.e. `switchLatest` applied to the mapped source.
 */
export function flatMapLatestAsync(mapperAsync) {
    const toInnerSequences = mapAsync(mapperAsync);
    return (source) => {
        const inners = toInnerSequences(source);
        return switchLatest(inners);
    };
}

/**
 * Transforms the items emitted by a source sequence into observable sequences, and mirrors the items emitted
 * by the most recently produced observable sequence.
 *
 * @param mapper Function `(element) => observable`, handed to `map`.
 * @returns Function `(source) => observable`, i.e. `switchLatest` applied to the mapped source.
 */
export function flatMapLatest(mapper) {
    const toInnerSequences = map(mapper);
    return (source) => {
        const inners = toInnerSequences(source);
        return switchLatest(inners);
    };
}

/**
 * Returns an observable sequence that mirrors `source` and, when an error is signalled, continues with the
 * sequence returned by `handler(error)`.
 *
 * Every sequence switched to is observed the same way, so an error from a handler sequence calls `handler`
 * again. Before subscribing to the next sequence the currently stored subscription is disposed.
 *
 * @param handler Function `(error) => observable` producing the sequence to continue with.
 * @param source Observable to start with.
 * @returns Object exposing `SubscribeAsync(o)`, an async computation yielding a disposable that disposes
 * whichever subscription is stored at the time it is disposed.
 */
export function catch$(handler, source) {
    const SubscribeAsync = (downstream) => singleton.Delay(() => {
        // Subscription to whichever sequence is currently being mirrored.
        let active = AsyncDisposable_get_Empty();
        const switchTo = (next) => singleton.Delay(() => {
            const guard = {
                OnNextAsync: (item) => downstream.OnNextAsync(item),
                // handler runs as soon as the error is signalled; the switch itself is the returned async.
                OnErrorAsync: (error) => switchTo(handler(error)),
                OnCompletedAsync: () => downstream.OnCompletedAsync(),
            };
            const releasePrevious = active.DisposeAsync();
            return singleton.Bind(releasePrevious, () => singleton.Bind(next.SubscribeAsync(guard), (subscription) => {
                active = subscription;
                return singleton.Zero();
            }));
        });
        const started = switchTo(source);
        return singleton.Bind(started, () => singleton.Return(AsyncDisposable_Create_Z56C29B26(() => active.DisposeAsync())));
    });
    return { SubscribeAsync };
}

/**
 * Returns an observable sequence that resubscribes to `source` when it signals an error, at most
 * `retryCount` times per subscription; once the budget is used up the error is passed on to the observer.
 *
 * The remaining-retries counter is created for each subscription, so every subscriber gets the full
 * `retryCount` budget regardless of how many errors earlier subscriptions ran into.
 *
 * NOTE(review): a negative `retryCount` never reaches 0 and therefore retries without limit; confirm
 * whether callers rely on that.
 *
 * @param retryCount Number of resubscriptions allowed per subscription.
 * @param source Observable to (re)subscribe to.
 * @returns Object exposing `SubscribeAsync(o)`, delegating to `catch$` with a per-subscription handler.
 */
export function retry(retryCount, source) {
    return {
        SubscribeAsync(o) {
            let remaining = retryCount;
            const resubscribeOrFail = (exn) => {
                if (remaining === 0) {
                    // Budget exhausted: continue with a sequence that only reports the error.
                    return ofAsyncWorker((obv, _arg) => obv.OnErrorAsync(exn));
                }
                remaining = ((remaining - 1) | 0);
                return source;
            };
            return catch$(resubscribeOrFail, source).SubscribeAsync(o);
        },
    };
}

/**
 * Command sent to the agent inside `share`.
 *
 * As used in this file: tag 0 ("Connect") is posted when an observer subscribes and tag 1 ("Dispose")
 * when a subscription is disposed. Neither case carries a payload.
 */
export class Cmd extends Union {
    constructor(tag, fields) {
        super();
        Object.assign(this, { tag, fields });
    }
    /** Case names, listed in tag order. */
    cases() {
        const names = [
            "Connect",
            "Dispose",
        ];
        return names;
    }
}

/**
 * Builds the reflection descriptor of the `Cmd` union.
 *
 * @returns The value produced by `union_type` for this union.
 */
export function Cmd_$reflection() {
    // One (empty) field list per case, in tag order.
    const describeCases = () => {
        const connect = [];
        const dispose = [];
        return [connect, dispose];
    };
    return union_type("FSharp.Control.Transform.Cmd", [], Cmd, describeCases);
}

/**
 * Shares a single subscription to `source` among multiple observers.
 *
 * Observers subscribe to the stream side of a subject. An agent counts Connect/Dispose commands: the
 * Connect that arrives while the count is 0 subscribes the subject's observer side to `source`, and the
 * Dispose that arrives while the count is 1 disposes that source subscription again.
 *
 * @param source Observable to multicast.
 * @returns Object exposing `SubscribeAsync(o)`, an async computation yielding a disposable that posts a
 * Dispose command and disposes the observer's subscription to the subject.
 */
export function share(source) {
    const [dispatch, stream] = subject();
    const mb = start((inbox) => {
        // Computes the next [count, sourceSubscription] state for one command.
        const handle = (cmd, count, subscription) => {
            if (cmd.tag === 1) {
                if (count === 1) {
                    // Last observer left: release the source subscription.
                    return singleton.Bind(subscription.DisposeAsync(), () => singleton.Return([count - 1, AsyncDisposable_get_Empty()]));
                }
                return singleton.Return([count - 1, subscription]);
            }
            if (count === 0) {
                // First observer arrived: connect the subject to the source.
                return singleton.Bind(source.SubscribeAsync(dispatch), (connection) => singleton.Return([count + 1, connection]));
            }
            return singleton.Return([count + 1, subscription]);
        };
        const messageLoop = (count, subscription) => singleton.Delay(() =>
            singleton.Bind(receive(inbox), (cmd) =>
                singleton.Bind(singleton.Delay(() => handle(cmd, count, subscription)), (state) =>
                    singleton.ReturnFrom(messageLoop(state[0], state[1])))));
        return messageLoop(0, AsyncDisposable_get_Empty());
    });
    const SubscribeAsync = (o) => singleton.Delay(() => {
        // The Connect command is posted before the observer is attached to the subject.
        post(mb, new Cmd(0, []));
        return singleton.Bind(stream.SubscribeAsync(o), (attachment) => {
            const release = () => {
                post(mb, new Cmd(1, []));
                return attachment.DisposeAsync();
            };
            return singleton.Return(AsyncDisposable_Create_Z56C29B26(release));
        });
    });
    return { SubscribeAsync };
}

/**
 * Adapts an async observable to an object exposing a synchronous `Subscribe(obv)` method.
 *
 * `Subscribe` starts the async subscription to `source` and immediately returns a disposable. Each call
 * keeps its own state, so disposing one subscription never touches another. The returned disposable looks
 * up the subscription at the moment it is disposed; if it is disposed before the async subscription has
 * been established, that subscription is disposed as soon as it arrives.
 *
 * @param source Object exposing `SubscribeAsync(observer)`.
 * @returns Object exposing `Subscribe(obv)`, which returns the result of converting an async disposable
 * with `ToDisposable`.
 */
export function toObservable(source) {
    return {
        Subscribe(obv) {
            // Placeholder until the async subscription below has been established.
            let subscription = AsyncDisposable_get_Empty();
            let disposed = false;
            Microsoft_FSharp_Control_FSharpAsync__Async_Start$0027_Static_ZF5A23EF(singleton.Delay(() => {
                const aobv = System_IObserver$1__IObserver$1_ToAsyncObserver(obv);
                return singleton.Bind(source.SubscribeAsync(aobv), (_arg) => {
                    subscription = _arg;
                    // Disposed while still subscribing: release the late subscription right away.
                    return disposed ? _arg.DisposeAsync() : singleton.Zero();
                });
            }));
            // Read `subscription` when disposing rather than now; at this point it may still be the placeholder.
            return FSharp_Control_IAsyncRxDisposable__IAsyncRxDisposable_ToDisposable(AsyncDisposable_Create_Z56C29B26(() => singleton.Delay(() => {
                disposed = true;
                return subscription.DisposeAsync();
            })));
        },
    };
}

