import type { RemoteMarketDataUpdateEvent } from '@app/common/marketdata/marketdata.events';
import { MARKETDATA_EVENT_TYPE } from '@app/common/marketdata/marketdata.events';
import { UNKNOWN_WINDOW_PROCESS_ID } from '@app/common/window/window.common';
import { MarketdataSubject, marketdata$ } from '@app/data-access/memory/marketdata.subject';
import { ProcessState } from '@app/data-access/memory/process-id.subject';
import { createLogger } from '@oms/shared/util';
import { Observable, filter, from, map, mergeMap } from 'rxjs';
import { inject, singleton } from 'tsyringe';
import { v4 as uuid } from 'uuid';
import { AppWorkspace } from '../../../app-config/workspace.config';
import type { IMarketDataService, MarketDataEvent } from './marketdata.common';
import { MarketData } from './marketdata.common';
import { testScoped } from '@app/workspace.registry';

/** Ticker symbol; used as the key of the service's per-ticker cache. */
type TickerId = string;

/** Identifier of a single subscriber, sent as `fromCompId` in subscribe/unsubscribe events. */
type CompId = string;

/**
 * Cache entry kept for one ticker: the most recently received market data
 * and the ids of the subscribers that have received data for it.
 */
interface LatestDataAndCompSubscribers {
  latestData: MarketData;
  compSubscribers: Set<CompId>;
}

/** Module-scoped logger labelled with the service name. */
const l = createLogger({ label: 'MdaasMarketDataService' });

/**
 * Market data service that observes level-one updates published on the shared
 * `marketdata$` subject and requests/release remote subscriptions by pushing
 * SUBSCRIBE / UNSUBSCRIBE events onto that same subject.
 *
 * The latest data received per ticker is cached in `tickerMap` so it can be read
 * synchronously through {@link read} / {@link readAll}.
 */
@testScoped
@singleton()
export class MdaasMarketDataService implements IMarketDataService {
  /** Delay between attempts to send SUBSCRIBE while the window process id is still unknown. */
  private static readonly SUBSCRIBE_RETRY_DELAY_MS = 5;

  /** Latest data and known subscribers, keyed by ticker. */
  protected tickerMap = new Map<TickerId, LatestDataAndCompSubscribers>();
  /**
   * Retry timers for SUBSCRIBE requests that could not be sent yet because the
   * process id was unknown, keyed by subscriber id. An entry exists only while
   * the SUBSCRIBE has NOT been published.
   */
  protected pendingSubscribeTimers = new Map<CompId, ReturnType<typeof setTimeout>>();
  protected workspace: AppWorkspace;
  protected processService: ProcessState;
  protected marketdata$: MarketdataSubject;

  /**
   * @param workspace Injected application workspace.
   * @param processService Injected process state; supplies the current window process id.
   * @param _marketdata$ Subject carrying market data events; defaults to the shared `marketdata$`.
   */
  constructor(
    @inject(AppWorkspace) workspace: AppWorkspace,
    @inject(ProcessState) processService: ProcessState,
    _marketdata$: MarketdataSubject = marketdata$
  ) {
    this.workspace = workspace;
    this.processService = processService;
    this.marketdata$ = _marketdata$;
  }

  /**
   * Observes level-one updates for the given tickers as a single merged stream.
   * Duplicate tickers are collapsed so each ticker is subscribed once per call.
   *
   * @param tickers Tickers to observe.
   * @returns A cold observable; each subscription opens (and on teardown releases)
   *   its own per-ticker subscriptions.
   */
  public observe(...tickers: string[]): Observable<MarketDataEvent> {
    const uniqueTickers = new Set(tickers);
    const tickers$ = from(uniqueTickers);

    const observedTickers$ = tickers$.pipe(mergeMap((t) => this._observe(t)));

    return observedTickers$;
  }

  /**
   * @param tickers Tickers to check.
   * @returns `true` when every given ticker currently has cached data.
   */
  public hasAllTickers(...tickers: string[]): boolean {
    return tickers.every((t) => this.tickerMap.has(t));
  }

  /**
   * @param ticker Ticker to look up.
   * @returns The latest cached data for the ticker, or `undefined` when none is cached.
   */
  public read(ticker: string): MarketData | undefined {
    return this.tickerMap.get(ticker)?.latestData;
  }

  /**
   * @param tickers Tickers to look up.
   * @returns Latest cached data per ticker, in argument order; `undefined` for uncached tickers.
   */
  public readAll(...tickers: string[]): (MarketData | undefined)[] {
    return tickers.map((t) => this.read(t));
  }

  /**
   * Builds the observable for a single ticker. On subscription it starts listening
   * for level-one updates, then asks the provider to subscribe; on teardown it
   * undoes both and drops the cache entry once no subscriber remains.
   */
  private _observe(ticker: string): Observable<MarketDataEvent> {
    return new Observable<MarketDataEvent>((subscriber) => {
      // Fresh id per subscription so subscribe/unsubscribe events can be paired.
      const fromCompId = uuid();

      const subscription = this.marketdata$
        .pipe(
          filter((e) => e.type === MARKETDATA_EVENT_TYPE.LEVEL_ONE_UPDATE && e.payload.ticker === ticker),
          map((e) => e as RemoteMarketDataUpdateEvent)
        )
        .subscribe((e) => {
          const marketData = new MarketData({ level1: e.payload.data });

          const existingTicker = this.tickerMap.get(ticker);

          if (existingTicker) {
            existingTicker.latestData = marketData;
            existingTicker.compSubscribers.add(fromCompId);
          } else {
            this.tickerMap.set(ticker, {
              latestData: marketData,
              compSubscribers: new Set([fromCompId])
            });
          }
          subscriber.next({ ticker, data: marketData });
        });

      // Listen first, then request data, so no update triggered by SUBSCRIBE is missed.
      this._subscribeTickerToProvider(ticker, fromCompId);

      return () => {
        subscription.unsubscribe();

        // If SUBSCRIBE was still waiting for a process id, cancel the retry so it
        // cannot fire after teardown; nothing was sent, so there is nothing to undo.
        const subscribeWasPending = this._cancelPendingSubscribe(fromCompId);
        if (!subscribeWasPending) {
          this._unsubscribeFromRemote(ticker, fromCompId);
        }

        const existing = this.tickerMap.get(ticker);
        if (existing) {
          existing.compSubscribers.delete(fromCompId);
          if (existing.compSubscribers.size === 0) {
            this.tickerMap.delete(ticker);
          }
        }
      };
    });
  }

  /**
   * Publishes a SUBSCRIBE event for the ticker. While the process id is missing or
   * unknown, the request is retried on a timer that is tracked in
   * `pendingSubscribeTimers` so teardown can cancel it.
   */
  private _subscribeTickerToProvider(ticker: string, fromCompId: string): void {
    const processId = this.processService.processId;

    if (processId && processId !== UNKNOWN_WINDOW_PROCESS_ID) {
      // Request is going out now; it is no longer pending.
      this.pendingSubscribeTimers.delete(fromCompId);

      this.marketdata$.next({
        type: MARKETDATA_EVENT_TYPE.SUBSCRIBE,
        payload: {
          tickers: [ticker],
          fromProcessId: processId,
          fromCompId
        }
      });
      return;
    }

    const timer = setTimeout(() => {
      this._subscribeTickerToProvider(ticker, fromCompId);
    }, MdaasMarketDataService.SUBSCRIBE_RETRY_DELAY_MS);
    this.pendingSubscribeTimers.set(fromCompId, timer);
    l.log('Unknown window id, cannot subcribe to', ticker);
  }

  /**
   * Cancels a not-yet-sent SUBSCRIBE retry for the given subscriber.
   *
   * @returns `true` when a pending retry existed (i.e. SUBSCRIBE was never published).
   */
  private _cancelPendingSubscribe(fromCompId: string): boolean {
    const timer = this.pendingSubscribeTimers.get(fromCompId);
    if (timer === undefined) {
      return false;
    }
    clearTimeout(timer);
    this.pendingSubscribeTimers.delete(fromCompId);
    return true;
  }

  /**
   * Publishes an UNSUBSCRIBE event for the ticker. Does nothing while the process
   * id is missing or unknown.
   */
  private _unsubscribeFromRemote(ticker: string, fromCompId: string): void {
    const processId = this.processService.processId;

    if (processId && processId !== UNKNOWN_WINDOW_PROCESS_ID) {
      this.marketdata$.next({
        type: MARKETDATA_EVENT_TYPE.UNSUBSCRIBE,
        payload: { tickers: [ticker], fromProcessId: processId, fromCompId }
      });
    }
  }
}
