import type { ReplicationPushChangeRow } from '@app/data-access/services/ui-state/ui-state.service';
import { UiStateService } from '@app/data-access/services/ui-state/ui-state.service';
import { parseJSON, Logger } from '@oms/ui-util';
import type { RxCollection, RxError, RxReplicationPullStreamItem, RxTypeError } from 'rxdb-oms-app';
// This import causes TS bugs & grinds this file to a halt

//@ts-ignore
import type { RxReplicationState } from 'rxdb-oms-app/plugins/replication';
// This import causes TS bugs & grinds this file to a halt

//@ts-ignore
import { replicateRxCollection } from 'rxdb-oms-app/plugins/replication';
import { Subject, Subscription, merge } from 'rxjs';
import { delay, inject, singleton } from 'tsyringe';
import { SyncronisationSignal } from '../memory/syncronisation.signal';
import { AppDatabase } from './app-database';
import type { MemoryDatabaseCollections } from './collections';
import type { ReplicableRxDocumentType, UiStateDocumentType } from './types';
import { testScoped } from '@app/workspace.registry';
import { DocumentWithSchemaVersion } from './replication-migrations/middleware';
import { UiStateWithContext } from '@oms/generated/frontend';

/** Names of the in-memory collections that are replicated unless the service is given another list. */
export const REPLICATION_COLLECTIONS: Array<keyof MemoryDatabaseCollections> = [
  'snapshots',
  'actions',
  'grids',
  'user_preferences'
];

/** Delay, in milliseconds, between two timer-triggered resyncs (30 seconds). */
const REPLICATION_POLL_INTERVAL = 30 * 1000;

/** Position in the remote change feed: id and update timestamp of the last pulled document. */
type CheckpointType = {
  id: string;
  updatedAt: string;
};

/**
 * Transformation applied to a pulled batch of documents for a given collection.
 * May return the resulting batch directly or as a promise.
 */
type Middleware = (
  documents: DocumentWithSchemaVersion[],
  collection: RxCollection
) => Promise<DocumentWithSchemaVersion[]> | DocumentWithSchemaVersion[];

/**
 * Replicates the configured in-memory RxDB collections through the push/pull
 * operations of `UiStateService`.
 *
 * Lifecycle: `init()` creates one replication per collection and waits for the
 * initial sync; `stopReplication()` removes the replications and releases the
 * poll timer and subscriptions so that `init()` can be called again.
 */
@testScoped
@singleton()
export class RxdbReplicationService {
  /**
   * Passed to every replication as its pull stream. Receives `'RESYNC'` from the
   * poll timer and after push conflicts. `undefined` while replication is not running.
   */
  public pullStream$: Subject<RxReplicationPullStreamItem<UiStateDocumentType, CheckpointType>> | undefined;
  /** Handle of the resync timer started by `init()` when polling is enabled. */
  private poll: ReturnType<typeof setInterval> | undefined;
  /** Collection instances resolved from `replicationCollections`. */
  private collections: Array<MemoryDatabaseCollections[keyof MemoryDatabaseCollections]> = [];
  /** Subscription forwarding replication errors to the synchronisation signal. */
  private errorSubscription?: Subscription;
  /** Middlewares applied, in registration order, to every pulled batch. */
  public middlewares: Middleware[] = [];
  /** Hooks already applied, keyed by collection name, so a hook runs at most once per collection. */
  private registeredHooks: Map<string, Set<(collection: RxCollection) => void>> = new Map();
  /** Functions handed to `addMiddleware`, kept for duplicate detection. */
  private registeredMiddlewares: Set<Middleware> = new Set();
  /** Running replications keyed by collection name. */
  private replicationStates: Map<string, RxReplicationState<UiStateDocumentType, CheckpointType>> = new Map();
  private logger = Logger.named(RxdbReplicationService.name);
  private _isStopping = false;
  private _hasGracefullyStopped = true;
  private _hasInitialized = false;

  constructor(
    @inject(UiStateService) private uiStateService: UiStateService,
    @inject(delay(() => AppDatabase)) private db: AppDatabase,
    @inject(SyncronisationSignal) private syncronisationSignal: SyncronisationSignal,
    public replicationCollections: (keyof MemoryDatabaseCollections)[] = REPLICATION_COLLECTIONS
  ) {
    this.collections = this.replicationCollections.map(
      (collectionName) => this.db.memoryDb.collections[collectionName]
    );

    // NOTE(review): `collections` is always an array here, so this guard never
    // fires. Kept unchanged because tightening it (e.g. rejecting missing
    // entries) could break callers that construct the service early.
    if (!this.collections) {
      throw new Error('Collections not available');
    }
  }

  /** `true` once `init()` has completed and until replication is stopped. */
  public get isInitialized() {
    return this._hasInitialized;
  }

  /** `true` while `stopReplication()` is in progress. */
  public get isStopping() {
    return this._isStopping;
  }

  /** `false` from the start of `stopReplication()` until it completes without error. */
  public get hasGracefullyStopped() {
    return this._hasGracefullyStopped;
  }

  /**
   * Registers a middleware for pulled documents. Registering the same function
   * twice is a no-op.
   *
   * @param middleware Transformation to run on every pulled batch.
   */
  public addMiddleware(middleware: Middleware) {
    if (this.registeredMiddlewares.has(middleware)) {
      this.logger.debug('Middleware already registered');
      return;
    }

    this.middlewares.push(async (documents, collection) => {
      return await middleware(documents, collection);
    });

    this.logger.debug(`Registered middleware ${middleware.name || 'anonymous'}`);

    this.registeredMiddlewares.add(middleware);
  }

  /**
   * Invokes each hook once for every replicated collection. A hook that was
   * already applied to a collection name is skipped.
   *
   * @param hooks Functions that receive the collection to attach themselves to.
   */
  public registerRxDBHooks(hooks: Array<(collection: RxCollection) => void>) {
    for (const collection of this.collections) {
      for (const hook of hooks) {
        const collectionHooks = this.registeredHooks.get(collection.name) ?? new Set();

        if (collectionHooks.has(hook)) {
          this.logger.debug(`Hook ${hook.name} already registered for collection ${collection.name}`);
          continue;
        }

        collectionHooks.add(hook);
        this.registeredHooks.set(collection.name, collectionHooks);
        hook(collection);
        this.logger.debug(`Registered hook ${hook.name} for collection ${collection.name}`);
      }
    }
  }

  /**
   * Starts replication for every configured collection and resolves once all of
   * them have finished their initial replication.
   *
   * Does nothing (apart from logging) when a previous stop did not complete
   * gracefully or when replication is already running.
   *
   * @param options.shouldPoll When `true` (default), emits `'RESYNC'` on the pull stream every `REPLICATION_POLL_INTERVAL` ms.
   * @param options.firstInit When `false` (default), existing replication states and memory collections are reset first.
   * @throws Rethrows any error raised during start-up, after rolling the service back to a re-initializable state.
   */
  async init({ shouldPoll = true, firstInit = false }: { shouldPoll?: boolean; firstInit?: boolean } = {}) {
    if (!this._hasGracefullyStopped) {
      this.logger.error('Replication cannot initialize because it did not gracefully stop');
      return;
    }

    if (this.pullStream$) {
      this.logger.debug('Replication cannot initialized again');
      return;
    }

    this.logger.info('Initializing replication');

    this.pullStream$ = new Subject();

    try {
      if (shouldPoll) {
        if (this.poll) {
          clearInterval(this.poll);
        }
        this.logger.info('Initializing replication poll');
        this.poll = setInterval(() => {
          this.pullStream$?.next('RESYNC');
          this.logger.info('Triggering resync');
        }, REPLICATION_POLL_INTERVAL);
      }

      this.syncronisationSignal.signal.set({
        isSyncronising: true,
        isSyncronised: false,
        isSyncronisationError: false,
        syncronisationErrorMessage: null
      });

      if (firstInit === false) {
        await this.resetCollections();
        this.logger.info('Cleared user data from previous session');
      }

      for (const collection of this.collections) {
        if (this.replicationStates.has(collection.name)) {
          this.logger.info(`Replication for collection ${collection.name} already exists`);
          continue;
        }
        const replicationState = this.replicate(collection);
        this.replicationStates.set(collection.name, replicationState);
        this.logger.info(`Registered replicating collection ${collection.name}`);
      }

      this.logger.info('Awaiting all collections in sync');
      await this.awaitAllCollectionsInSync();
      this.logger.info('All collections in sync');

      this.syncronisationSignal.signal.set({
        isSyncronising: false,
        isSyncronised: true,
        isSyncronisationError: false,
        syncronisationErrorMessage: null
      });

      this.errorSubscription = merge(
        ...Array.from(this.replicationStates.values()).map((replicationState) => replicationState.error$)
      ).subscribe((error: RxError | RxTypeError) => {
        this.syncronisationSignal.signal.set({
          ...this.syncronisationSignal.signal.get(),
          isSyncronisationError: true,
          syncronisationErrorMessage: error.message
        });
      });

      this._hasInitialized = true;
    } catch (error) {
      // Without a rollback the service would be stuck: `pullStream$` would block
      // a retry of `init()` and `_hasInitialized === false` would block a stop.
      await this.rollbackFailedInit(error);
      throw error;
    }
  }

  /** Resolves when every registered replication has completed its initial replication. */
  async awaitAllCollectionsInSync() {
    await Promise.all(
      Array.from(this.replicationStates.values()).map((replicationState) =>
        replicationState.awaitInitialReplication()
      )
    );
  }

  /**
   * Stops replication: closes the pull stream, removes all replication states
   * and releases the poll timer and error subscription.
   *
   * Does nothing (apart from logging) when replication is not initialized or a
   * stop is already in progress.
   *
   * @throws Rethrows any error raised while removing the replication states. In
   * that case `hasGracefullyStopped` stays `false`.
   */
  async stopReplication() {
    if (!this._hasInitialized) {
      this.logger.warn('Replication cannot stop because it has not been initialized');
      return;
    }

    if (this._isStopping) {
      this.logger.warn('Replication cannot stop because it is already stopping');
      return;
    }

    this._hasGracefullyStopped = false;
    this._isStopping = true;

    this.logger.info('Stopping replication');

    try {
      if (this.pullStream$) {
        this.pullStream$.unsubscribe();
        this.pullStream$ = undefined;
      }

      try {
        await this.clearReplicationStates();
      } finally {
        // Must run even when removal fails: `_hasInitialized` is reset below, so
        // a later stopReplication() would return early and never free these.
        this.releasePollAndErrorSubscription();
      }

      this._hasGracefullyStopped = true;
      this.logger.info('Replication stopped gracefully');
    } catch (error) {
      this.logger.error('Error stopping replication', error);
      throw error;
    } finally {
      this._isStopping = false;
      this._hasInitialized = false;
    }
  }

  /** Clears the poll timer and the replication error subscription, if present. */
  private releasePollAndErrorSubscription() {
    if (this.poll) {
      clearInterval(this.poll);
      this.poll = undefined;
      this.logger.info('Poll interval cleared');
    }

    if (this.errorSubscription) {
      this.errorSubscription.unsubscribe();
      this.errorSubscription = undefined;
      this.logger.info('Error subscription unsubscribed');
    }
  }

  /**
   * Undoes a partially completed `init()` so that it can be retried, and reports
   * the failure on the synchronisation signal.
   *
   * @param error The error that aborted initialization.
   */
  private async rollbackFailedInit(error: unknown) {
    this.logger.error('Error initializing replication', error);
    this.releasePollAndErrorSubscription();

    try {
      await this.clearReplicationStates();
    } catch (cleanupError) {
      // Best effort: the original error is the one the caller needs to see.
      this.logger.error('Error removing replication states after failed initialization', cleanupError);
    }

    // Reset last so a concurrent init() keeps being rejected during the cleanup above.
    this.pullStream$?.unsubscribe();
    this.pullStream$ = undefined;

    this.syncronisationSignal.signal.set({
      isSyncronising: false,
      isSyncronised: false,
      isSyncronisationError: true,
      syncronisationErrorMessage: error instanceof Error ? error.message : String(error)
    });
  }

  /**
   * Removes every registered replication state and empties the registry. When
   * the memory database is already closed the registry is only emptied.
   */
  private async clearReplicationStates() {
    if (this.db.memoryDb.closed) {
      this.replicationStates.clear();
      this.logger.info('Memory database is closed, skipping clearReplicationStates');
      return;
    }

    await Promise.all(
      Array.from(this.replicationStates.values()).map((replicationState) => replicationState.remove())
    );
    this.logger.info(`Removed all replication states`);

    this.replicationStates.clear();
  }

  /** Removes replication states, resets the memory collections and re-resolves the collection instances. */
  private async resetCollections() {
    await this.clearReplicationStates();
    await this.db.resetMemoryCollections();
    this.collections = this.replicationCollections.map(
      (collectionName) => this.db.memoryDb.collections[collectionName]
    );
  }

  /**
   * Reads a document's primary key value using the collection's schema.
   *
   * @throws Error when the schema's primary key does not resolve to a field name.
   */
  private getDocumentId(document: UiStateDocumentType, collection: RxCollection) {
    let primaryKey = collection.schema.jsonSchema.primaryKey;

    // The schema's primary key may be an object carrying the field name in `key`.
    if (typeof primaryKey === 'object' && primaryKey.key) {
      primaryKey = primaryKey.key;
    }

    if (typeof primaryKey !== 'string') {
      throw new Error('Primary key must be a string');
    }

    return String(document[primaryKey as keyof UiStateDocumentType]);
  }

  /**
   * Parses the JSON `document` payload of every pulled item.
   *
   * @param documents Pulled items; `undefined` is treated as an empty list.
   * @returns The items with `document` replaced by its parsed value.
   * @throws Error when a payload does not parse to a value.
   */
  parsePulledDocumentJSON(documents: UiStateWithContext[] | undefined): DocumentWithSchemaVersion[] {
    return (documents ?? []).map((document: UiStateWithContext) => {
      const parsedDocument = parseJSON<ReplicableRxDocumentType>(document.document);
      if (!parsedDocument) {
        throw new Error('Invalid document, must be a ReplicableRxDocumentType');
      }
      return { ...document, document: parsedDocument };
    });
  }

  /**
   * Returns the documents ordered by ascending `document.lastUpdatedAt`.
   * The input array is left untouched; a sorted copy is returned.
   */
  sortDocumentsByLastUpdatedAt(documents: DocumentWithSchemaVersion[]): DocumentWithSchemaVersion[] {
    return [...documents].sort(
      (a, b) => new Date(a.document.lastUpdatedAt).getTime() - new Date(b.document.lastUpdatedAt).getTime()
    );
  }

  /**
   * Creates the replication for one collection, wiring its push and pull
   * handlers to `UiStateService` and its pull stream to `pullStream$`.
   *
   * @param collection Collection to replicate.
   * @returns The value returned by `replicateRxCollection`.
   */
  replicate(collection: RxCollection) {
    this.logger.debug(`Creating new replicater function for collection ${collection.name}`);
    return replicateRxCollection({
      collection,
      replicationIdentifier: `${collection.name}-replication`,
      push: {
        handler: async (changeRows: ReplicationPushChangeRow[]) => this.pushChanges(collection, changeRows)
      },
      pull: {
        handler: async (checkpointOrNull: CheckpointType | null | undefined, limit: number) =>
          this.pullChanges(collection, checkpointOrNull, limit)
      ,
        stream$: this.pullStream$?.asObservable()
      }
    });
  }

  /**
   * Push handler: sends local changes to the backend and returns the conflicting
   * documents it reports. Any conflict also triggers a resync on the pull stream.
   */
  private async pushChanges(collection: RxCollection, changeRows: ReplicationPushChangeRow[]) {
    this.logger.debug(`Pushing changes for collection ${collection.name}`);
    const indexableFields = changeRows.map(({ newDocumentState }) => ({
      documentId: this.getDocumentId(newDocumentState, collection),
      documentLastUpdatedAt: newDocumentState.lastUpdatedAt,
      documentDeleted: !!newDocumentState._deleted,
      documentSchemaVersion: collection.schema.version,
      documentIsShared: newDocumentState.isShared
    }));
    const result = await this.uiStateService.pushUiState(changeRows, indexableFields, collection.name);
    const conflicts = result.data?.pushUiStates?.documents || [];
    if (conflicts.length > 0) {
      this.pullStream$?.next('RESYNC');
    }
    this.logger.info(`Pushed changes for collection ${collection.name} with ${conflicts.length} conflicts`);
    return conflicts.map((document: string) => JSON.parse(document));
  }

  /**
   * Pull handler: fetches documents changed since the checkpoint, runs them
   * through the registered middlewares and returns them with the next checkpoint.
   *
   * @param checkpointOrNull Last checkpoint; `null`/`undefined` starts from the epoch with an empty id.
   * @param limit Maximum number of documents requested from the backend.
   */
  private async pullChanges(
    collection: RxCollection,
    checkpointOrNull: CheckpointType | null | undefined,
    limit: number
  ) {
    // `??` (rather than a default parameter) so that `null` is handled as well.
    const checkpoint: CheckpointType = checkpointOrNull ?? { id: '', updatedAt: new Date(0).toISOString() };

    this.logger.debug(`Pulling changes for collection ${collection.name}`);
    if (!this.uiStateService) {
      throw new Error('uiStateService not available');
    }
    const result = await this.uiStateService.pullUiState(
      {
        id: checkpoint.id,
        collection: collection.name,
        lastUpdatedAt: checkpoint.updatedAt,
        schemaVersion: collection.schema.version,
        limit
      },
      collection.name
    );
    const parsedDocuments = this.parsePulledDocumentJSON(result.data.pullUiStates?.documents);
    const sortedDocuments = this.sortDocumentsByLastUpdatedAt(parsedDocuments);
    // Taken before the middlewares run, so the checkpoint reflects what the
    // backend returned even if a middleware drops or rewrites documents.
    const newCheckpoint = sortedDocuments.at(-1)?.document;

    let documentsAfterMiddlewares = sortedDocuments;
    for (const middleware of this.middlewares) {
      documentsAfterMiddlewares = await middleware.call(this, documentsAfterMiddlewares, collection);
    }

    const documents = documentsAfterMiddlewares.map(({ document }) => document);
    const noOfDeletedDocuments = documents.filter(({ _deleted }) => _deleted).length;

    this.logger.info(
      `Pulled ${documents.length - noOfDeletedDocuments} active documents for collection ${collection.name}, with ${noOfDeletedDocuments} deleted documents`
    );

    return {
      documents,
      checkpoint: newCheckpoint
        ? {
            id: this.getDocumentId(newCheckpoint, collection),
            updatedAt: newCheckpoint.lastUpdatedAt
          }
        : checkpoint
    };
  }
}
