import { eventChannel } from 'redux-saga';

import {
  take,
  takeEvery,
  call,
  fork,
  put, select,
} from 'redux-saga/effects';

import { prop } from 'ramda';
import { socketTypes } from './index';
import socketConnection from '../socket';
import { sagasManager } from '../../../index';
import { uiActions, uiSelectors } from '../../../../state/ui';
import { subscribeToUserWsChannels } from '../../../../state/user/operations';

function socketEmitter(event, topic) {
  return eventChannel((emitter) => {
    const { ws } = socketConnection.getSocketConnection();
    if (!ws) throw Error('Ws object is empty, can`t listening event');
    const topicSubscribe = ws.getSubscription(topic);
    if (!topicSubscribe) {
      throw Error('Can not find this topic in this WS channel');
    }
    topicSubscribe.on(event, (payload) => {
      emitter(payload);
    });
    return () => console.error('off');
  });
}

function* listen(event, topic, action) {
  const isWsError = yield select(uiSelectors.getIsWsError);
  if (!isWsError) {
    const channel = yield call(socketEmitter, event, topic, action);
    while (true) {
      try {
        const payload = yield take(channel);
        yield put(action({
          ...payload,
        }));
      } catch (err) {
        console.error('socket error:', err);
      }
    }
  } else {
    console.warn('Can not listen event on channel, socket connection is broken');
  }
}

const onCloseSocketCb = () => sagasManager.addSagaToRoot(function* watcher() {
  yield put(uiActions.setWsError(true));
});

const onOpenSocketCb = ({ afterOpenCb }) => sagasManager.addSagaToRoot(function* watcher() {
  yield put(uiActions.setWsError(false));
  yield subscribeToUserWsChannels();
  if (afterOpenCb) yield afterOpenCb();
});

function* connectionSocket() {
  while (true) {
    const { payload } = yield take(socketTypes.CONNECT_SOCKET_TO_SERVER);
    const isReconnect = prop('isReconnect', payload);
    const token = localStorage.getItem('authToken');
    if (isReconnect) {
      yield call(socketConnection.initSocketConnection, {
        token,
        onCloseCb: onCloseSocketCb,
        onOpenCb: () => onOpenSocketCb(payload),
      });
    } else {
      yield call(socketConnection.initSocketConnection, {
        token,
        onCloseCb: onCloseSocketCb,
        onOpenCb: ({ isSubscriptions }) => !isSubscriptions && onOpenSocketCb(),
      });
      yield subscribeToUserWsChannels();
    }
  }
}

function* callListener(action) {
  yield call(listen, action.payload.event, action.payload.topic, action.payload.action);
}

function* subscribe() {
  yield takeEvery(socketTypes.LISTEN_EVENT, callListener);
}

function* unsubscribe() {
  while (true) {
    const { payload: { topic } } = yield take(socketTypes.UNSUBSCRIBE_CHANNEL);
    const isWsError = yield select(uiSelectors.getIsWsError);
    if (!isWsError) {
      const { ws } = socketConnection.getSocketConnection();
      const topicSubscribe = ws.getSubscription(topic);
      if (topicSubscribe) {
        topicSubscribe.on('close', () => {
          console.log('close');
        });
        topicSubscribe.close();
      }
    } else {
      console.warn('Can not subscribe to channel, socket connection is broken');
    }
  }
}

function* subscribeToChannel() {
  while (true) {
    const { payload: { topic } } = yield take(socketTypes.SUBSCRIBE_CHANNEL);
    const isWsError = yield select(state => uiSelectors.getIsWsError(state));
    if (!isWsError) {
      const ws = socketConnection.getSocketConnection();
      yield call(ws.subscribe, topic);
    } else {
      console.warn('Can not subscribe to channel, socket connection is broken');
    }
  }
}

sagasManager.addSagaToRoot(function* watcher() {
  yield fork(connectionSocket);
  yield fork(subscribeToChannel);
  yield fork(subscribe);
  yield fork(unsubscribe);
});
