import { BehaviorSubject, filter, take } from 'rxjs';
import { BroadcastChannel } from 'broadcast-channel';

/**
 * Options controlling how a `BroadcastBehaviorSubject` initializes its value.
 */
export interface BroadcastBehaviorSubjectOptions {
  /**
   * Drives initialization of the subject. `undefined` emissions are ignored.
   * - `true`: this window is treated as a source; it is initialized right away
   *   and answers `request` messages from other windows.
   * - `false`: this window posts a `request` on the channel and waits for a
   *   `reply` (or for the no-reply timeout) before it counts as initialized.
   *
   * When omitted, the subject is initialized immediately and answers requests.
   */
  initialize$?: BehaviorSubject<boolean | undefined>;
  /**
   * When `true`, only the first defined emission of `initialize$` is handled.
   * Default: `false`.
   */
  initializeOnce?: boolean;
}

/**
 * Message exchanged between windows over the broadcast channel.
 * - `update`: a window emitted a new value (carried in `value`).
 * - `request`: a window asks the others for the current value (no `value`).
 * - `reply`: answer to a `request`, carrying the sender's current value.
 */
export type BroadcastBehaviorSubjectEvent<T> = {
  type: 'update' | 'request' | 'reply';
  value?: T;
};

/**
 * Delay in milliseconds to wait for a `reply` after a `request` has been posted;
 * once it elapses the subject is marked as initialized without a remote value.
 */
export const NO_REPLY_TIMEOUT = 2_000;

/**
 * A BehaviorSubject that broadcasts updates to other windows
 * and can initialize its value from other windows.
 * @extends BehaviorSubject
 *
 * Values passed to `next` before the subject is initialized are queued and
 * emitted (and broadcast) in order once initialization has finished.
 *
 * @example
 * ```ts
 * // `isLeader$` emits `true` in the window that is the single source of truth
 * // and `false` in every other window (`undefined` = not decided yet).
 * const isLeader$ = new BehaviorSubject<boolean | undefined>(undefined);
 *
 * // Create a subject that broadcasts updates to other windows
 * // and initializes from the leader window
 * const subject = new BroadcastBehaviorSubject('my-channel', 'initial value', {
 *   initialize$: isLeader$,
 *   initializeOnce: true
 * });
 * ```
 *
 * @param channelName - The name of the broadcast channel
 * @param initialValue - The initial value of the subject
 * @param options - Options for the subject
 */
export class BroadcastBehaviorSubject<T> extends BehaviorSubject<T> {
  protected _channelName: string;
  private _options?: BroadcastBehaviorSubjectOptions;
  private _bc: BroadcastChannel<BroadcastBehaviorSubjectEvent<T>>;
  private _hasInitialized = false;
  private _queue: T[] = [];
  private _canReply = false;
  /** Pending "nobody replied" timer, if a request is currently awaiting a reply */
  private _initTimeout: ReturnType<typeof setTimeout> | undefined;
  /** Subscription to `options.initialize$`, released when the channel is closed */
  private _initSub: { unsubscribe(): void } | undefined;
  /** Set once the channel has been closed; nothing may be posted afterwards */
  private _channelClosed = false;

  constructor(channelName: string, initialValue: T, options?: BroadcastBehaviorSubjectOptions) {
    super(initialValue);
    this._options = options;
    this._channelName = channelName;
    this._bc = new BroadcastChannel<BroadcastBehaviorSubjectEvent<T>>(this._channelName);
    this._initialize();
  }

  /**
   * Wire up the channel message handler and start the initialization flow
   * described by the options.
   */
  _initialize() {
    const { initialize$, initializeOnce = false } = this._options ?? {};

    /**
     * On message handler for remote updates from other windows
     */
    this._bc.onmessage = (data) => this._handleMessage(data);

    if (!initialize$) {
      /**
       * No initialization source configured: the current window doesn't need
       * to initialize and may answer requests right away
       */
      this._canReply = true;
      this._markInitialized();
      return;
    }

    /**
     * React to every defined emission of `initialize$`:
     * `true` makes this window a source, `false` requests the value from other windows
     */
    const sub = initialize$
      .pipe(
        filter((v) => v !== undefined),
        take(initializeOnce ? 1 : Infinity)
      )
      .subscribe((isSource) => {
        if (isSource) {
          this._canReply = true;
          this._markInitialized();
          return;
        }
        this._canReply = false;
        this._requestInitialValue();
      });
    this._initSub = sub;

    /**
     * Also honour the current value directly: a completed BehaviorSubject does not
     * replay its value to new subscribers, so the subscription above may never fire
     */
    if (initialize$.value === true) {
      this._canReply = true;
      if (initializeOnce) {
        sub.unsubscribe();
      }
      this._markInitialized();
    }
  }

  /**
   * Override next to:
   * 1. Add to queue if not initialized
   * 2. Update the current value
   * 3. Send update to other windows (unless the channel is already closed)
   *
   * @param value
   * @returns {void}
   */
  next(value: T): void {
    // Add to queue if not initialized
    if (!this._hasInitialized) {
      this._queue.push(value);
      return;
    }

    // BehaviorSubject.next stores the value itself, no need to touch `_value` here
    super.next(value);

    // Posting on a closed channel is an error, so stop broadcasting after close
    if (this._channelClosed) {
      return;
    }

    this._bc
      .postMessage({
        type: 'update',
        value
      })
      .catch(console.error);
  }

  /**
   * Override unsubscribe to close the broadcast channel
   */
  unsubscribe() {
    super.unsubscribe();
    this._closeChannel();
  }

  /**
   * Override complete to close the broadcast channel
   */
  complete() {
    super.complete();
    this._closeChannel();
  }

  /**
   * Handle a message received from another window
   */
  private _handleMessage(data: BroadcastBehaviorSubjectEvent<T>) {
    switch (data.type) {
      /**
       * If a regular update, just update the value.
       * Compared against `undefined` (not truthiness) so that
       * `0`, `''`, `false` and `null` are propagated as well
       */
      case 'update':
        if (data.value !== undefined) {
          super.next(data.value);
        }
        return;

      /**
       * If a request for initial value, reply with the current value
       * If the current window can reply
       */
      case 'request':
        if (this._canReply) {
          this._bc
            .postMessage({
              type: 'reply',
              value: this.getValue()
            })
            .catch(console.error);
        }
        return;

      /**
       * If a reply to an initial value request, set the current value in the subject
       * and drain the queue (now that the subject has been initialized it can emit values & won't initialize again)
       */
      case 'reply':
        if (data.value !== undefined && !this._hasInitialized) {
          this._hasInitialized = true;
          this._clearInitTimeout();
          // Apply the remote value first so queued local values end up on top of it
          super.next(data.value);
          this._drainQueue();
        }
        return;
    }
  }

  /**
   * Ask the other windows for the current value and fall back to
   * initializing without one when nobody answers in time
   */
  private _requestInitialValue() {
    if (this._channelClosed) {
      return;
    }

    this._bc
      .postMessage({
        type: 'request'
      })
      .then(() => {
        // Nothing to wait for if a reply already arrived or the channel was closed meanwhile
        if (this._hasInitialized || this._channelClosed) {
          return;
        }

        // Never keep more than one pending timer around
        this._clearInitTimeout();

        // If no reply after NO_REPLY_TIMEOUT ms, assume no other windows are open and initialize
        this._initTimeout = setTimeout(() => {
          this._initTimeout = undefined;
          this._markInitialized();
        }, NO_REPLY_TIMEOUT);
      })
      .catch((e) => {
        // If error, assume no other windows are open and initialize & clear timeout
        console.error(e);
        this._markInitialized();
      });
  }

  /**
   * Flag the subject as initialized, cancel a pending no-reply timer
   * and emit everything that was queued in the meantime
   */
  private _markInitialized() {
    this._hasInitialized = true;
    this._clearInitTimeout();
    this._drainQueue();
  }

  /**
   * Cancel the pending no-reply timer, if any
   */
  private _clearInitTimeout() {
    if (this._initTimeout !== undefined) {
      clearTimeout(this._initTimeout);
      this._initTimeout = undefined;
    }
  }

  /**
   * Release everything tied to the channel: the pending timer, the
   * `initialize$` subscription and the channel itself. Safe to call repeatedly
   */
  private _closeChannel() {
    this._clearInitTimeout();
    this._initSub?.unsubscribe();
    this._initSub = undefined;

    if (this._channelClosed) {
      return;
    }
    this._channelClosed = true;
    this._bc.close().catch(console.error);
  }

  /**
   * Drain the queue of values that were added before the subject was initialized
   */
  private _drainQueue() {
    // `next` would push the value straight back while uninitialized
    if (!this._hasInitialized) {
      return;
    }
    while (this._queue.length > 0) {
      this.next(this._queue.shift() as T);
    }
  }
}
