import { ReplicantError, ReplicantErrorCode } from '../Errors';
import { ActionType } from '../ReplicantActions';
import { Replicant, ReplicantConfig } from '../ReplicantConfig';
import { applyEventHandlerMessage } from '../ReplicantEventHandlerMessages';
import { Message, Messages } from '../ReplicantMessages';
import { Ruleset } from '../ReplicantRuleset';
import type { SharedStates } from '../ReplicantSharedStates';
import { APIMetainfo, ReplicantAsyncActionAPI, ReplicantSyncActionAPI } from '../api/ReplicantAPI';
import { AsyncAPICache } from '../api/impl/ReplicantClientAsyncAPICache';
import { SyncAPICache } from '../api/impl/ReplicantClientSyncAPICache';
import { getPurchaseHistory } from '../api/impl/utils/PurchaseHistory';
import { ActionAnalyticsCallback } from '../common/Analytics';
import { FriendsStatesMap } from '../common/FriendsStatesMap';
import {
    APPLY_MESSAGES_ACTION,
    DEFAULT_MAX_RECENT_MESSAGES_TIME,
    HEARTBEAT_ACTION,
    MAX_REQUEST_DURATION,
} from '../common/ReplicantConstants';
import type { UserMessageItem } from '../common/Types';
import type {
    ABTestDynamicConfig,
    ABTestsDynamicConfig,
    ChatbotMetainfo,
    Entry,
    MetainfoMVCC,
    UserSharedStateItems,
} from '../db/DB';
import logger from '../logger';
import { ReplicationEntry as ServerReplicationEntry } from '../server/ServerReplicant';
import type { ReplicationResult } from '../server/ServerReplicantReplicate';
import { WithMeta, addNonEnumerableSystemStateFields, getSystemStateFieldsFromEntry } from '../systemStateFields';
import { createPromiseWithErrbackFunction, delay } from '../utils/AsyncUtils';
import { setTimeout } from '../utils/EnvUtils';
import { isEventHandlerMessage } from '../utils/MessageUtils';
import { copyModifications, deepCopy, getNamespacedValue } from '../utils/ObjUtils';
import { generateRandomId, isPromise } from '../utils/Utils';
import {
    ClientActionInvocationCallback,
    ClientActionInvocationErrorCallback,
    InternalReplicantOptions,
} from './ClientReplicant';
import { ClientReplicantABTests } from './ClientReplicantABTests';
import { computeClockOffset } from './LoginOrCreateUser';
import ReplicantHttpClient from './ReplicantHttpClient';

export type ClientReplicationEntry<T extends Replicant> = ServerReplicationEntry & {
    callback: (err?: string, result?: AsyncActionPromiseResult<T>) => void;
};

export type OnMessagesReceivedFn<T extends Replicant> = (messages: Message<T['messages']>[]) => void;

type ReplicationBatch<T extends Replicant> = {
    queue: ClientReplicationEntry<T>[];
    batchId: string;
    requestedProfileIds: string[];
    clientRandomSeed: number;
};

export type AsyncActionApiResult = { value: any; async: boolean; failed?: boolean };

type AsyncActionPromiseResult<T extends Replicant> = {
    result: AsyncActionApiResult[];
    batchId: string;
    requestedProfiles: FriendsStatesMap<T['state'], T['ruleset']> | undefined;
    messages: UserMessageItem[];

    /** Timestamp at server while processing async action. */
    serverUpdatedAt: number;
};

type QueuedAction = {
    type: ActionType;
    fnName: string;
    args: unknown;
    callback: (err?: string, val?: any) => void;
    promise?: Promise<any>;
};

export default class ClientReplicantQueue<T extends Replicant> {
    actionInvocationCallbacks: {
        preInvoke?: ClientActionInvocationCallback;
        postInvoke?: ClientActionInvocationCallback;
        cancelled?: ClientActionInvocationCallback;
        failed?: ClientActionInvocationErrorCallback;
    } = {};

    private currentState: WithMeta<T['state'], T['ruleset'], T['sharedStates']>;

    // Passed to onStateChanged and kept with state diff, so only the relevant pieces that react to the state are
    // updated.
    private externalState: WithMeta<T['state'], T['ruleset'], T['sharedStates']>;

    // Queue of replicant actions. The queue is needed because an async action would block all further actions from
    // executing until it resolves.
    private queue: QueuedAction[] = [];

    // Queue to be flushed to the server.
    private replicationBatch: ReplicationBatch<T> = {
        queue: [],
        batchId: generateRandomId(),
        requestedProfileIds: [],
        clientRandomSeed: -1,
    };
    private replicationFlushPromise: Promise<void> | null = null;

    // In case of network failure, store the queue for retrying.
    private batchToRetry: ReplicationBatch<T> | null = null;

    // Locking the client queue if an async action is currently going on.
    private lockPromise: Promise<unknown> | null = null;

    private revId: number;

    private apiMetainfo: APIMetainfo;
    private chatbotMetainfo: ChatbotMetainfo;
    private metainfoMVCC: MetainfoMVCC;

    // Batching info
    private flushTimer: number | NodeJS.Timeout | null = null;

    private heartbeatTimer: number | NodeJS.Timeout = 0;

    private isPaused = false;
    private isOnline = true;

    private asyncApiCache: AsyncAPICache<T>;
    private syncApiCache: SyncAPICache<T>;

    private onMessagesReceivedHandlers: OnMessagesReceivedFn<T>[] = [];

    private consistentFetchTimestamps: { [id: string]: number } = {};

    constructor(
        private opts: {
            abTestsDynamicConfig: ABTestsDynamicConfig;
            getTrackedUserIds: () => string[];
            id: string;
            sessionId: string;
            userData: Entry<T['state'], T['sharedStates']>;
            config: ReplicantConfig<T>;
            replicantOptions: InternalReplicantOptions;
            httpClient: ReplicantHttpClient<T>;

            /** @returns Timestamp without clock offset. */
            now: () => number;

            adjustClockOffset: (newOffset: number) => void;
            hooks: {
                onStateChanged: (state: WithMeta<T['state'], T['ruleset'], T['sharedStates']>) => void;
                onMessagePosted: (id: string, message: Message) => void;
                onActionCompleted: (batchId: string) => void;
                onReplicationResultStates: (
                    states: FriendsStatesMap<T['state'], T['ruleset']>,
                    batchId: string,
                ) => void;
                onError: (error: any) => void;
            };
            userAssetsBaseUrl: string;
            abTestsManager: ClientReplicantABTests<T>;
        },
    ) {
        this.currentState = opts.userData.state;
        const systemFields = getSystemStateFieldsFromEntry(opts.id, opts.userData);
        addNonEnumerableSystemStateFields(this.currentState, systemFields, opts.config);
        this.externalState = deepCopy(this.currentState);
        addNonEnumerableSystemStateFields(this.externalState, systemFields, opts.config);
        this.apiMetainfo = opts.userData.metainfo;
        this.chatbotMetainfo = opts.userData.chatbotMetainfo!;
        this.metainfoMVCC = opts.userData.metainfoMVCC;
        this.revId = opts.userData.rev;

        this.syncApiCache = new SyncAPICache({
            id: () => this.opts.id,
            sessionId: () => this.opts.sessionId,
            apiMetaInfo: () => this.apiMetainfo,
            userSharedStates: () => opts.userData.userSharedStates,
            chatbotMetainfo: () => this.chatbotMetainfo,
            messages: () => this.opts.config.messages,
            scheduledActions: () => this.opts.config.scheduledActions,
            sharedStates: () => this.opts.config.sharedStates,
            ruleset: () => this.opts.config.ruleset,
            userAssetsBaseUrl: () => this.opts.userAssetsBaseUrl,
            abTestsApiAccess: () => this.opts.abTestsManager.clientActionsApi,
        });
        this.asyncApiCache = new AsyncAPICache({
            id: () => this.opts.id,
            sessionId: () => this.opts.sessionId,
            apiMetaInfo: () => this.apiMetainfo,
            userSharedStates: () => opts.userData.userSharedStates,
            chatbotMetainfo: () => this.chatbotMetainfo,
            invokeTime: () => this.opts.now(),
            messages: () => this.opts.config.messages,
            scheduledActions: () => this.opts.config.scheduledActions,
            computedProperties: () => this.opts.config.computedProperties,
            asyncGetters: () => this.opts.config.asyncGetters,
            sharedStates: () => this.opts.config.sharedStates,
            ruleset: () => this.opts.config.ruleset,
            onMessagePosted: (receiverId) => {
                this.consistentFetchTimestamps[receiverId] = this.opts.now();
            },
            userAssetsBaseUrl: () => this.opts.userAssetsBaseUrl,
            abTestsApiAccess: () => this.opts.abTestsManager.clientActionsApi,
        });

        if (opts.config.sendActionAnalyticsWithClient) {
            // Set default callback to throw if new callback not registered
            this.setActionAnalyticsCallback(() => {
                throw new Error('Action analytics callback not set');
            });
        }

        this.resetHeartbeatTimer();
    }

    async enqueueAction<TReturn>(key: string, type: ActionType, args: any): Promise<TReturn> {
        const { promise: resultPromise, callback } = createPromiseWithErrbackFunction<TReturn>();

        this.queue.push({
            type,
            fnName: key,
            args,
            callback: (err?: any, result?: any) => {
                if (err !== undefined) {
                    if (this.actionInvocationCallbacks.cancelled && err === 'cancelled') {
                        this.actionInvocationCallbacks.cancelled(key, args);
                    } else if (this.actionInvocationCallbacks.failed) {
                        this.actionInvocationCallbacks.failed(key, args, err.message || err);
                    }
                }
                callback(err, result);
            },
            promise: resultPromise,
        });

        this.resolveQueue();

        const actionResult = await resultPromise;

        if (type === 'async') {
            // Ensure that locking network request finishes before async action `client.invoke` call resolves:
            await this.lockPromise;
        }

        return actionResult;
    }

    resetQueue() {
        // These actions haven't been resolved on the client. Reject their promises.
        this.queue.forEach((e) => e.callback('cancelled'));

        // Nuke the pending queue.
        this.queue = [];
        this.replicationBatch = {
            queue: [],
            batchId: generateRandomId(),
            requestedProfileIds: [],
            clientRandomSeed: -1,
        };

        // If some network calls are blocked to be retried, reject their promises, as they won't be resolved anymore.
        if (this.batchToRetry) {
            this.batchToRetry.queue.forEach((e) => e.callback('cancelled', undefined));
            this.batchToRetry = null;
            // Nuke the current queue, since we're no longer waiting on it;
            this.lockPromise = null;
        }
    }

    onRefresh(userData: Entry<T['state'], T['sharedStates']>) {
        this.currentState = userData.state;
        addNonEnumerableSystemStateFields(
            this.currentState,
            getSystemStateFieldsFromEntry(this.opts.id, userData),
            this.opts.config,
        );
        this.apiMetainfo = userData.metainfo;
        this.revId = userData.rev;

        // Restart heartbeat timer, if needed.
        this.isOnline = true;
        this.resetHeartbeatTimer();

        this.onStateChanged();
    }

    async retryLastRequest() {
        // If there is anything to retry, schedule it for replication.
        if (this.batchToRetry) {
            this.flushReplication();

            await this.replicationFlushPromise;

            // If after retrying the last request, we're still not online, means the retry failed.
            if (!this.isOnline) {
                throw new Error('Retrying last request failed.');
            }

            return true;
        } else {
            return false;
        }
    }

    /**
     * Waits until the action queue is drained and all actions are applied to the local state.
     * Useful when an async action is currently blocking the queue and we need to know when the queue is
     * unblocked.
     */
    async waitForEmptyQueue(): Promise<void> {
        if (this.queue.length === 0 && !this.lockPromise) {
            return;
        }

        // If we're waiting on a response from the server for a replication, wait for it to complete and repeat.
        if (this.lockPromise) {
            await this.lockPromise;
        } else {
            // If there is no request being flushed now, then we're just about to resolve the queue.
            // We need to retry, but not within the same call stack to avoid infinite recursion.
            await delay(0);
        }

        return this.waitForEmptyQueue();
    }

    async flush(opts?: { skipErrorHandlers?: boolean }) {
        const skipErrorHandlers = opts && opts.skipErrorHandlers;

        this.flushReplication();

        const flushPromise = new Promise<void>((resolve, reject) => {
            if (this.replicationFlushPromise) {
                this.replicationFlushPromise
                    .then(() => {
                        // Either the queue has more data in it or already another replication is in progress
                        if (this.replicationBatch.queue.length > 0 || !!this.replicationFlushPromise) {
                            void this.flush(opts).then(() => resolve());
                        } else if (this.lockPromise) {
                            void this.lockPromise.then(() => this.flush(opts).then(() => resolve()));
                        } else {
                            resolve();
                        }
                    })
                    .catch((e) => {
                        if (skipErrorHandlers) {
                            resolve();
                        } else {
                            this.handleError(e);
                            reject(e);
                        }
                    });
            } else {
                resolve();
            }
        });

        if (skipErrorHandlers) {
            return new Promise<void>((res) => {
                this.waitForEmptyQueue()
                    .then(() => res(flushPromise))
                    .catch((e) => res(flushPromise));
            });
        }

        return this.waitForEmptyQueue().then(() => flushPromise);
    }

    async checkForMessages() {
        if (!this.replicationFlushPromise) {
            if (this.replicationBatch.queue.length > 0) {
                this.flushReplication();
            } else {
                logger.debug('Sending heartbeat');

                const { promise, callback } = createPromiseWithErrbackFunction<void>();

                this.queue.push({
                    type: 'sync',
                    fnName: HEARTBEAT_ACTION,
                    args: [],
                    callback,
                    promise,
                });

                this.resolveQueue();

                promise.catch((e) => {
                    // This error should already passed to onError.
                    // Ignore it here.
                });
            }
        }

        // When the current queue resolves, we apply messages immediately through an apply messages action
        await this.replicationFlushPromise;

        // ... Or we have a pending async action and messages apply after that finishes.
        await this.lockPromise;
    }

    getPurchaseHistory() {
        return getPurchaseHistory(this.apiMetainfo);
    }

    getExternalState() {
        return this.externalState;
    }

    setActionAnalyticsCallback(handler: ActionAnalyticsCallback) {
        this.syncApiCache.setAnalyticsCallback(handler);
        this.asyncApiCache.setAnalyticsCallback(handler);
    }

    pause() {
        // Pauses heartbeat.
        this.isPaused = true;
        this.clearHeartbeatTimer();

        if (this.isOnline) {
            // Flushes anything in the queue
            void this.flush();
        }
    }

    resume() {
        // Re-enable heartbeat.
        this.isPaused = false;
        this.resetHeartbeatTimer();
    }

    onMessagesReceived(fn: OnMessagesReceivedFn<T>) {
        this.onMessagesReceivedHandlers.push(fn);
    }

    removeMessagesReceivedHandler(fn: OnMessagesReceivedFn<T>) {
        const handlerIndex = this.onMessagesReceivedHandlers.indexOf(fn);
        if (handlerIndex >= 0) {
            this.onMessagesReceivedHandlers.splice(handlerIndex, 1);
        }
    }

    calcConsistentFetchIds() {
        const maxRecentMessagesTime =
            this.opts.replicantOptions.devOpts?.maxRecentMessagesTime || DEFAULT_MAX_RECENT_MESSAGES_TIME;
        const referenceTimestamp = this.dateNow() - maxRecentMessagesTime;

        return Object.keys(this.consistentFetchTimestamps).filter(
            (id) => this.consistentFetchTimestamps[id]! > referenceTimestamp,
        );
    }

    private invokeOnMessagesReceived(messages: UserMessageItem<T['messages']>[]): void {
        const nonEventMessages = messages.filter((msg): msg is Message<T['messages']> => !isEventHandlerMessage(msg));

        if (nonEventMessages.length === 0) {
            return;
        }

        for (const handler of this.onMessagesReceivedHandlers) {
            handler(nonEventMessages);
        }
    }

    private invokeAsyncAction(
        entry: QueuedAction,
        state: WithMeta<T['state'], T['ruleset'], T['sharedStates']>,
        api: ReplicantAsyncActionAPI<T>,
    ) {
        const action = getNamespacedValue(this.opts.config.actions, entry.fnName)!;
        if (action.type !== 'async') {
            throw new Error('Inconsistent async / sync function invocation.');
        }

        if (this.actionInvocationCallbacks.preInvoke) {
            this.actionInvocationCallbacks.preInvoke(entry.fnName, entry.args);
        }
        const result = action.fn(state as any, entry.args, api);
        if (this.actionInvocationCallbacks.postInvoke) {
            this.actionInvocationCallbacks.postInvoke(entry.fnName, entry.args);
        }
        return result;
    }

    private invokeSyncAction(
        entry: QueuedAction,
        state: WithMeta<T['state'], T['ruleset'], T['sharedStates']>,
        api: ReplicantSyncActionAPI<T>,
    ) {
        const action = getNamespacedValue(this.opts.config.actions, entry.fnName)!;
        if (action.type !== 'sync') {
            throw new Error('Inconsistent async / sync function invocation.');
        }

        if (this.actionInvocationCallbacks.preInvoke) {
            this.actionInvocationCallbacks.preInvoke(entry.fnName, entry.args);
        }
        const result = action.fn(state as WithMeta<any, any, any>, entry.args, api);
        if (this.actionInvocationCallbacks.postInvoke) {
            this.actionInvocationCallbacks.postInvoke(entry.fnName, entry.args);
        }
        return result;
    }

    private onMessagePosted = (id: string, message: Message) => {
        if (!this.replicationBatch.requestedProfileIds.includes(id) && this.opts.getTrackedUserIds().includes(id)) {
            this.replicationBatch.requestedProfileIds.push(id);
        }

        this.consistentFetchTimestamps[id] = message.timestamp;

        this.opts.hooks.onMessagePosted(id, message);
    };

    private resolveQueue(): void {
        // If locked, wait until queue is unlocked.
        if (this.lockPromise) {
            const lock = this.lockPromise;
            lock.then(() => {
                // Unlock only if the expiring lock is the current one.
                if (this.lockPromise === lock) {
                    // Release lock and re-run queue.
                    this.lockPromise = null;
                    this.resolveQueue();
                }
            }).catch((e) => null); // The error on the lock is caught in the replication handler.

            return;
        }

        let changes = false;
        let flushNow = false;

        while (this.queue.length > 0 && !this.lockPromise) {
            const entry = this.queue.shift()!;

            if (entry.fnName === HEARTBEAT_ACTION) {
                // System no-op action used to trigger fetching outstanding messages, if any.

                this.replicationBatch.queue.push({
                    fn: entry.fnName,
                    args: entry.args,
                    async: entry.type === 'async',
                    callback: entry.callback,
                });

                changes = true;
                flushNow = true;

                continue;
            }

            // Sync task.
            if (entry.type === 'sync') {
                // Execute locally immediately.

                const isApplyMessagesAction = entry.fnName === APPLY_MESSAGES_ACTION;

                const action = getNamespacedValue(this.opts.config.actions, entry.fnName)!;
                if (isApplyMessagesAction) {
                    if (action) {
                        throw new Error('System action names should not be available.');
                    }
                } else if (!action || action.type !== 'sync') {
                    throw new Error('Inconsistent async / sync function invocation.');
                }

                // Delay posting messages until after the action completes.
                // Save their arguments instead.

                // initialize clockOffset if it's not present, to save it if it's
                // changed via actions API
                this.apiMetainfo.clockOffset = this.apiMetainfo.clockOffset || 0;

                const api = this.syncApiCache.getAPI(this.opts.now());

                let returnVal;

                try {
                    // Run action. on the client.
                    if (isApplyMessagesAction) {
                        withUpdatedAt({
                            state: this.currentState,
                            metainfo: api.meta.apiMetainfo(),
                            updatedAt: this.currentState.updatedAt,
                            fn: (state) => {
                                this.applyMessages(
                                    { metainfo: this.apiMetainfo, state: this.currentState },
                                    entry.args as Message<T['messages']>[],
                                );
                            },
                            userSharedStates: api.meta.userSharedStates(),
                            config: this.opts.config,
                        });
                    } else {
                        returnVal = withUpdatedAt({
                            state: this.currentState,
                            metainfo: api.meta.apiMetainfo(),
                            updatedAt: this.currentState.updatedAt,
                            fn: (state) => {
                                return this.invokeSyncAction(entry, this.currentState, api);
                            },
                            userSharedStates: api.meta.userSharedStates(),
                            config: this.opts.config,
                        });
                    }
                } catch (e: any) {
                    // Restore old state.
                    this.currentState = copyModificationsAndSystemStateFields({
                        oldState: this.currentState,
                        newState: this.externalState,
                        metainfo: this.apiMetainfo,
                        userSharedStates: this.opts.userData.userSharedStates,
                        config: this.opts.config,
                    });

                    // Nuke the pending queue.
                    this.queue.forEach((x) => x.callback('cancelled'));
                    // These actions haven't been resolved on the client. Reject their promises.
                    this.queue = [];

                    entry.callback(e);

                    return;
                }

                // Apply to original.
                this.syncApiCache.updateMetaInfo(this.apiMetainfo);

                this.syncApiCache.updateUserSharedStates(this.opts.userData.userSharedStates);
                // Now, we can post messages.
                this.syncApiCache.getPostedMessages().forEach((args) => this.onMessagePosted(...args));

                // Queue and schedule for server execution.
                this.replicationBatch.queue.push({
                    fn: entry.fnName,
                    args: isApplyMessagesAction
                        ? (entry.args as Message<T['messages']>[]).map((x) => x.id)
                        : entry.args,
                    async: false,
                    callback: entry.callback,
                    // Pass the client time if api.date.now() was used.
                    meta: api.meta.hasUsedDateNow ? { now: this.syncApiCache.getInvokeTime() } : undefined,
                });

                changes = true;

                this.onStateChanged();

                // Invoke onMessageReceived handlers after invoking the onStateChanged
                if (isApplyMessagesAction) {
                    this.invokeOnMessagesReceived(entry.args as UserMessageItem<T['messages']>[]);
                }

                // Execute promise callback after invoking change event handlers.
                entry.callback(undefined, deepCopy(returnVal));

                // This action is done, notify the rest of replicant.
                this.opts.hooks.onActionCompleted(this.replicationBatch.batchId);
            } else if (entry.type === 'async') {
                // Async task.
                // Queue and schedule for server execution.
                const { promise: apiResultPromise, callback: apiResultCallback } =
                    createPromiseWithErrbackFunction<AsyncActionPromiseResult<T>>();

                this.replicationBatch.queue.push({
                    fn: entry.fnName,
                    args: entry.args,
                    async: true,
                    meta: { now: this.opts.now() },
                    callback: apiResultCallback,
                });

                // Wait on the result
                this.lockPromise = apiResultPromise
                    // If we have received a server error, it has been reported to the client already, so we
                    // just reject the action promise.
                    .catch((e) => entry.callback(e))

                    // If there is no error, replay the action on the client with the server results
                    .then((promiseResult) => {
                        if (!promiseResult) {
                            return;
                        }

                        const { result, batchId, requestedProfiles, serverUpdatedAt, messages } = promiseResult;

                        const asyncAPI = this.asyncApiCache.getAPI(result);

                        const action = getNamespacedValue(this.opts.config.actions, entry.fnName)!;
                        if (!action || action.type !== 'async') {
                            return Promise.reject('Action ' + entry.fnName + ' is not an action or is not async.');
                        }

                        // Execute locally with the API result and get the function return value.
                        const res = withUpdatedAt({
                            state: this.currentState,
                            metainfo: this.apiMetainfo,
                            updatedAt: serverUpdatedAt,
                            fn: (state) => {
                                return this.invokeAsyncAction(entry, state, asyncAPI);
                            },
                            userSharedStates: this.opts.userData.userSharedStates,
                            config: this.opts.config,
                        });

                        const handleSuccess = (actionResult: any) => {
                            // Apply any outstanding messages to the state.
                            if (messages.length > 0) {
                                withUpdatedAt({
                                    state: this.currentState,
                                    metainfo: this.apiMetainfo,
                                    updatedAt: serverUpdatedAt,
                                    fn: (state) => {
                                        this.applyMessages(
                                            { metainfo: this.apiMetainfo, state: this.currentState },
                                            messages,
                                        );
                                    },
                                    userSharedStates: this.opts.userData.userSharedStates,
                                    config: this.opts.config,
                                });

                                // Invoke onMessageReceived handlers after invoking the onStateChanged
                                this.invokeOnMessagesReceived(messages as UserMessageItem<T['messages']>[]);
                            }

                            this.opts.hooks.onReplicationResultStates(
                                { ...this.asyncApiCache.getReceivedProfiles(), ...requestedProfiles },
                                batchId,
                            );

                            // Validate that we've used up all results from the server.
                            this.crosscheckAsyncAPICalls(entry.fnName, this.asyncApiCache.getAPIState(), result.length);

                            this.onStateChanged();
                            entry.callback(undefined, deepCopy(actionResult));

                            // The async action is done, notify the rest of replicant.
                            this.opts.hooks.onActionCompleted(this.replicationBatch.batchId);
                        };

                        // If the async action return a promise,
                        // await that Promise and return the value in the action callback.
                        if (typeof res !== 'undefined' && typeof res.then === 'function') {
                            return res.then(handleSuccess).catch((err: any) => {
                                entry.callback(err);
                                this.handleError(err);
                            });
                        } else {
                            handleSuccess(res);
                        }
                    })
                    .catch((e) => {
                        // These are client errors that occurred during the replay of the action.

                        // Fail the action promise
                        entry.callback(e);
                        // and invoke the central error handler.
                        this.handleError(e);
                    });

                changes = true;
                flushNow = true;
                // Done.
            } else {
                throw new Error(`Action of type ${entry.type as any} should not be on the queue!`);
            }
        }

        if (changes) {
            if (!flushNow && !!this.opts.replicantOptions.batchingMaxTime) {
                this.scheduleReplication();
            } else {
                this.flushReplication();
            }

            this.resetHeartbeatTimer();
        }

        if (this.queue.length > 0 || this.lockPromise) {
            this.resolveQueue();
        }
    }

    private scheduleReplication() {
        if (!this.flushTimer) {
            this.flushTimer = setTimeout(() => this.flushReplication(), this.opts.replicantOptions.batchingMaxTime!);
        }
    }

    private flushReplication() {
        // No re-entry.
        if (this.replicationFlushPromise) {
            return;
        }

        if (this.flushTimer) {
            clearTimeout(this.flushTimer as number);
            this.flushTimer = null;
        }

        // Nothing to flush, bail.
        if (this.replicationBatch.queue.length === 0 && (!this.batchToRetry || this.batchToRetry.queue.length === 0)) {
            return;
        }

        if (!this.batchToRetry) {
            this.batchToRetry = this.replicationBatch;
            this.replicationBatch = {
                queue: [],
                requestedProfileIds: [],
                batchId: generateRandomId(),
                clientRandomSeed: -1,
            };
        }

        const batch = this.batchToRetry;

        const syncActions = batch.queue.slice();
        const asyncAction = batch.queue[batch.queue.length - 1]!.async ? syncActions.pop() : null;

        if (syncActions.some((entry) => entry.async)) {
            throw new Error('Too many async actions in batch.');
        }

        // Stamp client random seed to be sent with the replication queue
        if (batch.clientRandomSeed === -1) {
            batch.clientRandomSeed = this.apiMetainfo.random.n;
        }

        this.replicationFlushPromise = this.postReplicationQueue(batch)
            .then((replicationResult) => {
                // Acknowledge the queue in flight as resolved.
                this.batchToRetry = null;

                // Increase `updatedAt` if replication batch resulted in a db write:
                if (replicationResult.newLastUpdated) {
                    setUpdatedAt({
                        state: this.currentState,
                        metainfo: this.apiMetainfo,
                        updatedAt: replicationResult.newLastUpdated,
                        userSharedStates: this.opts.userData.userSharedStates,
                        config: this.opts.config,
                    });
                    this.externalState = { ...this.externalState };
                    const { id, createdAt, updatedAt } = this.currentState;
                    addNonEnumerableSystemStateFields(
                        this.externalState,
                        {
                            id,
                            createdAt,
                            updatedAt,
                            userSharedStates: this.opts.userData.userSharedStates,

                            metainfo: this.apiMetainfo,
                        },
                        this.opts.config,
                    );
                }

                // Validate that we have exactly the number of async results back.
                const asyncActionsCount = asyncAction ? 1 : 0;
                if (asyncActionsCount !== replicationResult.results.length) {
                    throw new Error(
                        'Invalid number of results received from server: ' +
                            replicationResult.results.length +
                            (' expected: ' + asyncActionsCount),
                    );
                }

                for (const entry of syncActions) {
                    entry.callback(undefined, undefined);
                }

                asyncAction?.callback(undefined, {
                    result: replicationResult.results.shift()!,
                    batchId: batch.batchId,
                    requestedProfiles: replicationResult.requestedProfiles,
                    messages: replicationResult.messages,

                    serverUpdatedAt: replicationResult.lastUpdated,
                });

                if (!asyncAction && !this.lockPromise && replicationResult.messages.length > 0) {
                    this.enqueueAction(APPLY_MESSAGES_ACTION, 'sync', replicationResult.messages).catch((e) =>
                        this.handleError(
                            new ReplicantError(
                                `Error applying messages on the client: ${e.message || e}`,
                                'replication_error',
                                'message_errored',
                                'fatal',
                            ),
                        ),
                    );
                }

                if (replicationResult.metainfoDelta?.hasPhoneNumber) {
                    this.apiMetainfo.hasPhoneNumber = replicationResult.metainfoDelta.hasPhoneNumber;
                }
                if (replicationResult.metainfoDelta?.paymentSubscriptionStatus) {
                    this.metainfoMVCC.paymentSubscription = replicationResult.metainfoDelta?.paymentSubscriptionStatus;
                }
                if (replicationResult.metainfoDelta?.purchaseHistory) {
                    const existingHistory = this.apiMetainfo.purchaseHistory ?? [];
                    this.apiMetainfo.purchaseHistory = existingHistory.concat(
                        replicationResult.metainfoDelta.purchaseHistory,
                    );
                }

                // Update local cache of friends and lift locks.
                if (!asyncAction) {
                    this.opts.hooks.onReplicationResultStates(replicationResult.requestedProfiles ?? {}, batch.batchId);
                }

                this.replicationFlushPromise = null;

                // If recovering from an error, mark ourselves as online.
                this.isOnline = true;

                // Run flush replication just in case we've received flush requests while we were flushing.
                this.flushReplication();
            })
            .catch((e) => {
                this.replicationFlushPromise = null;

                // Distribute errors in case of a replication error.
                // Network errors can be retries, so we don't close the promises yet.
                if (e.code !== ReplicantErrorCode.network_error) {
                    // Acknoweldge the queue in flight as resolved, since we received a replication error.
                    this.batchToRetry = null;

                    for (const entry of batch.queue) {
                        entry.callback(e, undefined);
                    }

                    // Any outstanding actions are invalid. Reject their promises.
                    this.resetQueue();
                }

                this.handleError(e);
            });
    }

    private applyMessages(
        entry: { metainfo: { [key: string]: unknown }; state: { [key: string]: unknown } },
        messages: UserMessageItem[],
    ): void {
        const messagesConfig = this.opts.config.messages;

        for (const msg of messages) {
            if (isEventHandlerMessage(msg)) {
                try {
                    applyEventHandlerMessage(entry, msg);
                } catch (error) {
                    // Ignore event handler messages with inapplicable state diff. The client may have
                    // invoked actions after the event handler invocation, changing state so that the
                    // event handler's state diff is no longer applicable.
                    const ignoreError = error instanceof ReplicantError && error.subCode === 'inapplicable_diff';

                    if (!ignoreError) {
                        throw error;
                    }
                }

                continue;
            }

            const msgConfig = getNamespacedValue(messagesConfig as Messages, msg.name);

            if (!msgConfig) {
                // Ignore messages that we do not recognize. They may be sent to us from a player
                // playing a newer version of the game, or we may be requesting user profiles that
                continue;
            }

            const validationErr = msgConfig.schema.validate(msg.args);
            if (validationErr) {
                // Ignore messages with arguments that we do not recognize.
                // They may be sent to us from a player playing a different version of the game.
                continue;
            }

            msgConfig.reducer(entry.state as WithMeta<any, any, any>, msg.args, {
                senderId: msg.sender,
                timestamp: msg.timestamp,
            });
        }
    }

    private handleError(e: ReplicantError) {
        // If we hit an error, stop checking for messages, until we login again.
        this.isOnline = false;

        this.clearHeartbeatTimer();

        this.opts.hooks.onError(e);
    }

    private async postReplicationQueue(
        batch: ReplicationBatch<T>,
    ): Promise<Omit<ReplicationResult<T['state'], T['ruleset']>, 'rev'>> {
        const { queue, requestedProfileIds, clientRandomSeed } = batch;

        const payload = {
            abTestsDynamicConfig: this.opts.abTestsDynamicConfig as { [testId: string]: ABTestDynamicConfig },
            queue: queue.map(
                (q): ServerReplicationEntry => ({
                    fn: q.fn,
                    async: q.async,
                    args: q.args,
                    meta: q.meta,
                }),
            ),
            rev: this.revId,
            requestedProfileIds,
            consistentFetchIds: this.calcConsistentFetchIds(),
            sid: this.opts.sessionId,
            sessionName: this.opts.replicantOptions.sessionName,
            clientRandomSeed,
            crqid: generateRandomId(), // client request id, used to
        };

        const timeStart = this.dateNow();
        const json = await this.opts.httpClient.doPostReplicationRequest(payload);
        const timeEnd = this.dateNow();

        if (timeEnd - timeStart < MAX_REQUEST_DURATION) {
            const clockOffset = computeClockOffset(timeStart, timeEnd, json);
            this.opts.adjustClockOffset(clockOffset);
        }
        if (!json.data?.results || typeof json.data.rev !== 'number') {
            throw new Error('Invalid replication response.');
        }

        // Restamp local revId
        this.revId = json.data.rev;

        return {
            results: json.data.results,
            messages: json.data.messages,
            metainfoDelta: json.data.metainfoDelta,
            requestedProfiles: json.data.requestedProfiles,
            lastUpdated: json.data.lastUpdated,
            newLastUpdated: json.data.newLastUpdated,
        };
    }

    private resetHeartbeatTimer() {
        this.clearHeartbeatTimer();
        const interval = this.opts.replicantOptions.checkForMessagesInterval;

        if (!interval || !this.isOnline || this.isPaused) {
            return;
        }

        this.heartbeatTimer = setTimeout(() => this.scheduleCheckForMessages(), interval);
    }

    private clearHeartbeatTimer() {
        if (this.heartbeatTimer) {
            clearTimeout(this.heartbeatTimer as number);
            this.heartbeatTimer = 0;
        }
    }

    private scheduleCheckForMessages(): void {
        if (
            (this.replicationFlushPromise ?? this.queue.length > 0) ||
            this.replicationBatch.queue.length > 0 ||
            this.batchToRetry
        ) {
            // No need to scheudle anything, as we'll get the results back with the next planned replication.
            this.resetHeartbeatTimer();
            return;
        }

        this.queue.push({
            type: 'sync',
            fnName: HEARTBEAT_ACTION,
            args: [],
            callback: () => null,
        });

        this.resolveQueue();
    }

    private onStateChanged() {
        this.externalState = copyModificationsAndSystemStateFields({
            oldState: this.externalState,
            newState: this.currentState,
            metainfo: this.apiMetainfo,
            userSharedStates: this.opts.userData.userSharedStates,
            config: this.opts.config,
        });

        this.opts.hooks.onStateChanged(this.externalState);
    }

    private dateNow() {
        return this.opts.replicantOptions.devOpts?.dateNow?.() || Date.now();
    }

    private crosscheckAsyncAPICalls(
        fnName: string,
        apiState: { resultIndex: number; apiCallQueue: string[] },
        resultsCount: number,
    ) {
        if (apiState.resultIndex !== resultsCount) {
            throw new ReplicantError(
                `Action ${fnName} used ${apiState.resultIndex} api calls: ` +
                    apiState.apiCallQueue.join(', ') +
                    `; but server provided ${resultsCount} results!`,
                'replication_error',
                'async_action_api_error',
            );
        }
    }
}

function copyModificationsAndSystemStateFields<
    TState,
    TRuleset extends Ruleset,
    TSharedStates extends SharedStates,
>(opts: {
    oldState: WithMeta<TState, TRuleset, TSharedStates>;
    newState: WithMeta<TState, TRuleset, TSharedStates>;
    metainfo: APIMetainfo;
    userSharedStates: UserSharedStateItems<TSharedStates>;
    config: { sharedStates?: TSharedStates };
}): WithMeta<TState, TRuleset, TSharedStates> {
    const result = copyModifications(opts.oldState, opts.newState);
    if (result !== opts.oldState) {
        // Restore non-enumerable properties wiped by `copyModifications`:

        const { id, createdAt, updatedAt } = opts.newState;
        addNonEnumerableSystemStateFields(
            result,
            { id, createdAt, updatedAt, metainfo: opts.metainfo, userSharedStates: opts.userSharedStates },
            opts.config,
        );
    }
    return result;
}

function setUpdatedAt(opts: {
    state: { createdAt: number; id: string };
    metainfo: APIMetainfo;
    updatedAt: number;
    userSharedStates: UserSharedStateItems<any>;
    config: { sharedStates?: SharedStates };
}) {
    addNonEnumerableSystemStateFields(
        opts.state,
        {
            createdAt: opts.state.createdAt,
            id: opts.state.id,
            updatedAt: opts.updatedAt,
            metainfo: opts.metainfo,
            userSharedStates: opts.userSharedStates,
        },
        opts.config,
    );
}

/**
 * Populate `state` with `updatedAt` for the duration of `fn` and restore to
 * original value after `fn` either returns, throws or its return Promise resolves or rejects.
 */
function withUpdatedAt<TUserState, TRuleset extends Ruleset, TSharedStates extends SharedStates>(opts: {
    state: WithMeta<TUserState, TRuleset, TSharedStates>;
    metainfo: APIMetainfo;
    updatedAt: number;
    fn: (s: WithMeta<TUserState, TRuleset, TSharedStates>) => any;
    userSharedStates: UserSharedStateItems<TSharedStates>;
    config: { sharedStates?: TSharedStates };
}): any {
    const oldUpdatedAt = opts.state.updatedAt;
    setUpdatedAt({
        state: opts.state,
        metainfo: opts.metainfo,
        updatedAt: opts.updatedAt,
        userSharedStates: opts.userSharedStates,
        config: opts.config,
    });

    try {
        const result = opts.fn(opts.state);

        if (isPromise(result)) {
            return result
                .then((resolvedResult) => {
                    setUpdatedAt({
                        state: opts.state,
                        metainfo: opts.metainfo,
                        updatedAt: oldUpdatedAt,
                        userSharedStates: opts.userSharedStates,
                        config: opts.config,
                    });
                    return resolvedResult;
                })
                .catch((error: Error) => {
                    setUpdatedAt({
                        state: opts.state,
                        metainfo: opts.metainfo,
                        updatedAt: oldUpdatedAt,
                        userSharedStates: opts.userSharedStates,
                        config: opts.config,
                    });
                    return Promise.reject(error);
                });
        } else {
            setUpdatedAt({
                state: opts.state,
                metainfo: opts.metainfo,
                updatedAt: oldUpdatedAt,
                userSharedStates: opts.userSharedStates,
                config: opts.config,
            });
            return result;
        }
    } catch (error) {
        setUpdatedAt({
            state: opts.state,
            metainfo: opts.metainfo,
            updatedAt: oldUpdatedAt,
            userSharedStates: opts.userSharedStates,
            config: opts.config,
        });
        throw error;
    }
}
