import type { ReplicationPushChangeRow } from '@app/data-access/services/ui-state/ui-state.service';
import { UiStateService } from '@app/data-access/services/ui-state/ui-state.service';
import { parseJSON } from '@oms/shared/util';
import type { RxCollection, RxError, RxReplicationPullStreamItem, RxTypeError } from 'rxdb-v15';
import type { RxReplicationState } from 'rxdb-v15/plugins/replication';
import { replicateRxCollection } from 'rxdb-v15/plugins/replication';
import { Subject, Subscription, merge } from 'rxjs';
import { delay, inject, singleton } from 'tsyringe';
import { SyncronisationSignal } from '../memory/syncronisation.signal';
import { AppDatabase } from './app-database';
import type { MemoryDatabaseCollections } from './collections';
import type { ReplicableRxDocumentType, UiStateDocumentType } from './types';
import { testScoped } from '@app/workspace.registry';

export const REPLICATION_COLLECTIONS: (keyof MemoryDatabaseCollections)[] = ['snapshots', 'actions'];
const REPLICATION_POLL_INTERVAL = 1000 * 30;

type CheckpointType = {
  id: string;
  updatedAt: string;
};

@testScoped
@singleton()
export class RxdbReplicationService {
  public pullStream$: Subject<RxReplicationPullStreamItem<UiStateDocumentType, CheckpointType>> | undefined;
  public replicationStates: RxReplicationState<UiStateDocumentType, CheckpointType>[] = [];
  private poll: ReturnType<typeof setInterval> | undefined;
  private collections: Array<MemoryDatabaseCollections[keyof MemoryDatabaseCollections]> = [];
  private errorSubscription?: Subscription;
  public middlewares: ((
    documents: ReplicableRxDocumentType[],
    collection: RxCollection
  ) => ReplicableRxDocumentType[])[] = [];

  constructor(
    @inject(UiStateService) private uiStateService: UiStateService,
    @inject(delay(() => AppDatabase)) private db: AppDatabase,
    @inject(SyncronisationSignal) private syncronisationSignal: SyncronisationSignal,
    public replicationCollections: (keyof MemoryDatabaseCollections)[] = REPLICATION_COLLECTIONS
  ) {
    this.collections = this.replicationCollections.map(
      (collectionName) => this.db.memoryDb.collections[collectionName]
    );

    if (!this.collections) {
      throw new Error('Collections not available');
    }
  }

  public addMiddleware(
    middleware: (
      documents: ReplicableRxDocumentType[],
      collection: RxCollection
    ) => ReplicableRxDocumentType[]
  ) {
    this.middlewares.push(middleware);
  }

  public registerRxDBHooks(hooks: Array<(collection: RxCollection) => void>) {
    this.collections.forEach((collection) => {
      hooks.forEach((hook) => hook(collection));
    });
  }

  async init(shouldPoll: boolean = true) {
    if (this.pullStream$) {
      console.warn('Replication cannot initialized again');
      return;
    }

    this.pullStream$ = new Subject();

    if (shouldPoll) {
      if (this.poll) {
        clearInterval(this.poll);
      }
      this.poll = setInterval(() => {
        this.pullStream$?.next('RESYNC');
      }, REPLICATION_POLL_INTERVAL);
    }

    this.syncronisationSignal.signal.set({
      isSyncronising: true,
      isSyncronised: false,
      isSyncronisationError: false,
      syncronisationError: null
    });

    this.replicationStates = this.collections.map((collection) => {
      return this.replicate(collection);
    });

    await this.awaitAllCollectionsInSync();

    this.syncronisationSignal.signal.set({
      isSyncronising: false,
      isSyncronised: true,
      isSyncronisationError: false,
      syncronisationError: null
    });

    this.errorSubscription = merge(
      ...this.replicationStates.map((replicationState) => replicationState.error$)
    ).subscribe((error: RxError | RxTypeError) => {
      this.syncronisationSignal.signal.set({
        ...this.syncronisationSignal.signal.get(),
        isSyncronisationError: true,
        syncronisationError: error.message
      });
    });
  }

  async awaitAllCollectionsInSync() {
    await Promise.all(
      this.replicationStates.map((replicationState) => replicationState.awaitInitialReplication())
    );
  }

  stopReplication() {
    if (this.pullStream$) {
      this.pullStream$.unsubscribe();
      this.pullStream$ = undefined;
    }
    if (this.poll) {
      clearInterval(this.poll);
      this.poll = undefined;
    }
    if (this.errorSubscription) {
      this.errorSubscription.unsubscribe();
    }
  }

  private getDocumentId(document: UiStateDocumentType, collection: RxCollection) {
    let primaryKey = collection.schema.jsonSchema.primaryKey;

    if (typeof primaryKey === 'object' && primaryKey.key) {
      primaryKey = primaryKey.key;
    }

    if (typeof primaryKey !== 'string') {
      throw new Error('Primary key must be a string');
    }

    return String(document[primaryKey as keyof UiStateDocumentType]);
  }

  parsePulledDocumentJSON(documents: string[] | undefined): ReplicableRxDocumentType[] {
    return (
      documents?.map((document: string) => {
        const parsedDocument = parseJSON<ReplicableRxDocumentType>(document);
        if (!parsedDocument) {
          throw new Error('Invalid document, must be a ReplicableRxDocumentType');
        }
        return parsedDocument;
      }) || []
    );
  }

  sortDocumentsByLastUpdatedAt(documents: ReplicableRxDocumentType[]): ReplicableRxDocumentType[] {
    return documents.sort(
      (a, b) => new Date(a.lastUpdatedAt).getTime() - new Date(b.lastUpdatedAt).getTime()
    );
  }

  replicate(collection: RxCollection) {
    const _this = this;
    return replicateRxCollection({
      collection,
      replicationIdentifier: `${collection.name}-replication`,
      push: {
        handler: async function (changeRows: ReplicationPushChangeRow[]) {
          const indexableFields = changeRows.map(({ newDocumentState }) => ({
            documentId: _this.getDocumentId(newDocumentState, collection),
            documentLastUpdatedAt: newDocumentState.lastUpdatedAt,
            documentDeleted: !!newDocumentState._deleted
          }));
          const result = await _this.uiStateService.pushUiState(changeRows, indexableFields, collection.name);
          const conflicts = result.data?.pushUiStates?.documents || [];
          if (conflicts.length > 0) {
            _this.pullStream$?.next('RESYNC');
          }
          return conflicts.map((document: string) => JSON.parse(document));
        }
      },
      pull: {
        handler: async function (
          checkpointOrNull: CheckpointType = { id: '', updatedAt: new Date(0).toISOString() },
          limit: number
        ) {
          const lastUpdatedAt = checkpointOrNull.updatedAt;
          const id = checkpointOrNull.id;
          if (!_this.uiStateService) {
            throw new Error('uiStateService not available');
          }
          const result = await _this.uiStateService.pullUiState(
            {
              id,
              collection: collection.name,
              lastUpdatedAt,
              limit
            },
            collection.name
          );
          const parsedAndSortedDocuments = _this.sortDocumentsByLastUpdatedAt(
            _this.parsePulledDocumentJSON(result.data.pullUiStates?.documents)
          );

          const newCheckpoint = parsedAndSortedDocuments.at(-1);

          const documents = _this.middlewares.reduce(
            (acc, middleware) => middleware(acc, collection),
            parsedAndSortedDocuments
          );

          return {
            documents,
            checkpoint: newCheckpoint
              ? {
                  id: _this.getDocumentId(newCheckpoint, collection),
                  updatedAt: newCheckpoint.lastUpdatedAt
                }
              : checkpointOrNull
          };
        },
        stream$: _this.pullStream$?.asObservable()
      }
    });
  }
}
