/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import debug from 'debug';
import DubboAgent from './dubbo-agent';
import {ScheduleError, SocketError, ZookeeperTimeoutError} from './err';
import Queue from './queue';
import {IZkClientProps} from './types';
import {traceErr, traceInfo} from './util';
import {ZkRegistry} from './zookeeper';

const log = debug('dubbo:scheduler');
const enum STATUS {
  PADDING = 'padding',
  READY = 'ready',
  FAILED = 'failded',
  NO_AGENT = 'no_agent',
}

/**
 * scheduler
 * 1. 初始化zookeeper和socket-agent
 * 2. 接受所有的socket-worker的事件
 * 3. 处理用户的请求
 * 4. 接受zookeeper的变化，更新Server-agent
 */
export default class Scheduler {
  constructor(props: IZkClientProps, queue: Queue) {
    log(`new:|> %O`, props);
    this._status = STATUS.PADDING;

    this._queue = queue;
    this._queue.subscribe(this._handleQueueRequest);

    this._dubboAgent = new DubboAgent();

    //init ZkClient and subscribe
    this._zkClient = ZkRegistry.from(props).subscribe({
      onData: this._handleZkClientOnData,
      onError: this._handleZkClientError,
    });
  }

  private _status: STATUS;
  private _queue: Queue;
  private _zkClient: ZkRegistry;
  private _dubboAgent: DubboAgent;

  /**
   * static factory method
   * @param props
   */
  static from(props: IZkClientProps, queue: Queue) {
    return new Scheduler(props, queue);
  }

  /**
   * handle request in queue
   * @param requestId
   */
  private _handleQueueRequest = requestId => {
    //record current status
    log(`handle requestId ${requestId}, current status: ${this._status}`);

    switch (this._status) {
      case STATUS.READY:
        //发起dubbo的调用
        this._handleDubboInvoke(requestId);
        break;
      case STATUS.PADDING:
        log('current scheduler was padding');
        break;
      case STATUS.NO_AGENT:
        this._handleFailed(
          requestId,
          new ScheduleError('Zookeeper Can not be find any agents'),
        );
        break;
      case STATUS.FAILED:
        this._handleFailed(
          requestId,
          new ScheduleError('ZooKeeper Could not be connected'),
        );
        break;
    }
  };

  /**
   * 处理zookeeper的数据
   */
  private _handleZkClientOnData = (agentSet: Set<string>) => {
    //获取负载列表
    log(`get agent address:=> %O`, agentSet);

    //如果负载为空，也就是没有任何provider提供服务
    if (agentSet.size === 0) {
      this._status = STATUS.NO_AGENT;
      //将队列中的所有dubbo调用全调用失败
      const err = new ScheduleError('Can not be find any agents');
      this._queue.allFailed(err);
      traceErr(err);
      return;
    }

    //初始化dubboAgent
    this._dubboAgent.from(agentSet).subscribe({
      onConnect: this._handleOnConnect,
      onData: this._handleOnData,
      onClose: this._handleOnClose,
    });
  };

  /**
   * 处理zookeeper的错误
   */
  private _handleZkClientError = err => {
    log(err);
    //说明zookeeper连接不上
    if (err instanceof ZookeeperTimeoutError) {
      this._status = STATUS.FAILED;
    }
  };

  /**
   * 处理schedule的failed状态
   */
  private _handleFailed = (requestId: number, err: Error) => {
    log('#requestId: %d scheduler was failed, err: %s', requestId, err);
    this._queue.failed(requestId, err);
  };

  /**
   * 发起dubbo调用
   * @param ctx
   * @param agentHostList
   */
  private _handleDubboInvoke(requestId: number) {
    //get request context
    const ctx = this._queue.requestQueue.get(requestId);
    //get socket agent list
    const agentAddrList = this._zkClient.getAgentAddrList(ctx);
    log('agentAddrSet-> %O', agentAddrList);
    const worker = this._dubboAgent.getAvailableSocketWorker(agentAddrList);

    //if could not find any available socket agent worker
    if (!worker) {
      const {requestId, dubboInterface, version, group} = ctx;
      const msg = `requestId#${requestId}:Could not find any agent worker with ${dubboInterface}#${version}#${group} agentList: ${agentAddrList.join(
        ',',
      )}`;
      const err = new ScheduleError(msg);
      this._handleFailed(requestId, err);
      log(err);
      traceErr(err);
      return;
    }

    ctx.invokeHost = worker.host;
    ctx.invokePort = worker.port;

    const providerProps = this._zkClient.getDubboServiceProp(ctx);
    this._queue.consume(ctx.requestId, worker, providerProps);
  }

  private _handleOnConnect = ({pid, host, port}) => {
    log(`scheduler receive SocketWorker connect pid#${pid} ${host}:${port}`);
    const agentHost = `${host}:${port}`;
    this._status = STATUS.READY;
    traceInfo(
      `scheduler receive SocketWorker connect pid#${pid} ${host}:${port}`,
    );

    for (let ctx of this._queue.requestQueue.values()) {
      if (ctx.isNotScheduled) {
        const agentHostList = this._zkClient.getAgentAddrList(ctx);
        log('agentHostList-> %O', agentHostList);
        //当前的socket是否可以处理当前的请求
        if (agentHostList.indexOf(agentHost) != -1) {
          this._handleDubboInvoke(ctx.requestId);
        }
      }
    }
  };

  /**
   * 当收到数据的时候
   */
  private _handleOnData = ({requestId, res, err}) => {
    if (err) {
      this._queue.failed(requestId, err);
    } else {
      this._queue.resolve(requestId, res);
    }
  };

  /**
   * 处理某一个SocketWorker被关闭的状态
   */
  private _handleOnClose = ({pid}) => {
    log(`SocketWorker#${pid} was close`);

    //查询之前哪些接口的方法被pid调用, 然后直接failfast
    const {requestQueue} = this._queue;
    for (let [requestId, ctx] of requestQueue) {
      if (ctx.pid === pid) {
        this._handleFailed(
          requestId,
          new SocketError(`SocketWorker#${pid} had closed.`),
        );
      }
    }
  };
}
