/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.master.querymaster;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.*;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.LogicalOptimizer;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.GlobalEngine;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.util.metrics.TajoMetrics;
import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.tajo.TajoProtos.QueryState;

public class QueryMasterTask extends CompositeService {
  private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());

  // query submission directory is private!
  final public static FsPermission STAGING_DIR_PERMISSION =
      FsPermission.createImmutable((short) 0700); // rwx--------

  private QueryId queryId;

  private Session session;

  private QueryContext queryContext;

  private QueryMasterTaskContext queryTaskContext;

  private QueryMaster.QueryMasterContext queryMasterContext;

  private Query query;

  private MasterPlan masterPlan;

  private String jsonExpr;

  private String logicalPlanJson;

  private TajoAsyncDispatcher dispatcher;

  private final long querySubmitTime;

  private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();

  private TajoConf systemConf;

  private AtomicLong lastClientHeartbeat = new AtomicLong(-1);

  private AbstractResourceAllocator resourceAllocator;

  private AtomicBoolean stopped = new AtomicBoolean(false);

  private TajoMetrics queryMetrics;

  private Throwable initError;

  private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
      new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();

  public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
                         QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
                         String logicalPlanJson) {

    super(QueryMasterTask.class.getName());
    this.queryMasterContext = queryMasterContext;
    this.queryId = queryId;
    this.session = session;
    this.queryContext = queryContext;
    this.jsonExpr = jsonExpr;
    this.logicalPlanJson = logicalPlanJson;
    this.querySubmitTime = System.currentTimeMillis();
  }

  @Override
  public void init(Configuration conf) {
    systemConf = (TajoConf)conf;

    try {
      queryTaskContext = new QueryMasterTaskContext();
      String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);

      if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
        resourceAllocator = new TajoResourceAllocator(queryTaskContext);
      } else {
        throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
      }
      addService(resourceAllocator);

      dispatcher = new TajoAsyncDispatcher(queryId.toString());
      addService(dispatcher);

      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
      dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
      dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());

      initStagingDir();

      queryMetrics = new TajoMetrics(queryId.toString());

      super.init(systemConf);
    } catch (Throwable t) {
      LOG.error(t.getMessage(), t);
      initError = t;
    }
  }

  public boolean isStopped() {
    return stopped.get();
  }

  @Override
  public void start() {
    startQuery();
    super.start();
  }

  @Override
  public void stop() {

    if(stopped.getAndSet(true)) {
      return;
    }

    LOG.info("Stopping QueryMasterTask:" + queryId);

    CallFuture future = new CallFuture();

    RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
    NettyClientBase tmClient = null;
    try {
      tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
          TajoMasterProtocol.class, true);
      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
      masterClientService.stopQueryMaster(null, queryId.getProto(), future);
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
    } finally {
      connPool.releaseConnection(tmClient);
    }

    try {
      future.get(3, TimeUnit.SECONDS);
    } catch (Throwable t) {
      LOG.warn(t);
    }

    super.stop();

    //TODO change report to tajo master
    queryMetrics.report(new MetricsConsoleReporter());

    LOG.info("Stopped QueryMasterTask:" + queryId);
  }

  public void handleTaskRequestEvent(TaskRequestEvent event) {
    ExecutionBlockId id = event.getExecutionBlockId();
    query.getSubQuery(id).handleTaskRequestEvent(event);
  }

  public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
    synchronized(diagnostics) {
      if (diagnostics.size() < 10) {
        diagnostics.add(report);
      }
    }

    getEventHandler().handle(new TaskFatalErrorEvent(report));
  }

  public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
    synchronized(diagnostics) {
      return Collections.unmodifiableCollection(diagnostics);
    }
  }

  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
    public void handle(SubQueryEvent event) {
      ExecutionBlockId id = event.getSubQueryId();
      if(LOG.isDebugEnabled()) {
        LOG.debug("SubQueryEventDispatcher:" + id + "," + event.getType());
      }
      query.getSubQuery(id).handle(event);
    }
  }

  private class TaskEventDispatcher
      implements EventHandler<TaskEvent> {
    public void handle(TaskEvent event) {
      QueryUnitId taskId = event.getTaskId();
      if(LOG.isDebugEnabled()) {
        LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
      }
      QueryUnit task = query.getSubQuery(taskId.getExecutionBlockId()).
          getQueryUnit(taskId);
      task.handle(event);
    }
  }

  private class TaskAttemptEventDispatcher
      implements EventHandler<TaskAttemptEvent> {
    public void handle(TaskAttemptEvent event) {
      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
      SubQuery subQuery = query.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
      QueryUnitAttempt attempt = task.getAttempt(attemptId);
      attempt.handle(event);
    }
  }

  private class TaskSchedulerDispatcher
      implements EventHandler<TaskSchedulerEvent> {
    public void handle(TaskSchedulerEvent event) {
      SubQuery subQuery = query.getSubQuery(event.getExecutionBlockId());
      subQuery.getTaskScheduler().handle(event);
    }
  }

  private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
    @Override
    public void handle(LocalTaskEvent event) {
      TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
      if (proxy != null) {
        proxy.killTaskAttempt(event.getTaskAttemptId());
      }
    }
  }

  private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
    @Override
    public void handle(QueryMasterQueryCompletedEvent event) {
      QueryId queryId = event.getQueryId();
      LOG.info("Query completion notified from " + queryId);

      while (!isTerminatedState(query.getState())) {
        try {
          synchronized (this) {
            wait(10);
          }
        } catch (InterruptedException e) {
          LOG.error(e);
        }
      }
      LOG.info("Query final state: " + query.getState());
      queryMasterContext.stopQuery(queryId);
    }

    private boolean isTerminatedState(QueryState state) {
      return
          state == QueryState.QUERY_SUCCEEDED ||
          state == QueryState.QUERY_FAILED ||
          state == QueryState.QUERY_KILLED ||
          state == QueryState.QUERY_ERROR;
    }
  }

  public synchronized void startQuery() {
    try {
      if (query != null) {
        LOG.warn("Query already started");
        return;
      }
      CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
      LogicalPlanner planner = new LogicalPlanner(catalog);
      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
      Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
      LogicalPlan plan = planner.createPlan(session, expr);
      optimizer.optimize(plan);

      GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
      hookManager.addHook(new GlobalEngine.InsertHook());
      hookManager.doHooks(queryContext, plan);

      for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
        LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
        if (scanNodes != null) {
          for (LogicalNode eachScanNode : scanNodes) {
            ScanNode scanNode = (ScanNode) eachScanNode;
            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
          }
        }

        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
        if (scanNodes != null) {
          for (LogicalNode eachScanNode : scanNodes) {
            ScanNode scanNode = (ScanNode) eachScanNode;
            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
          }
        }
      }
      MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
      queryMasterContext.getGlobalPlanner().build(masterPlan);

      query = new Query(queryTaskContext, queryId, querySubmitTime,
          "", queryTaskContext.getEventHandler(), masterPlan);

      dispatcher.register(QueryEventType.class, query);
      queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
    } catch (Throwable t) {
      LOG.error(t.getMessage(), t);
      initError = t;
    }
  }

  private void initStagingDir() throws IOException {
    Path stagingDir = null;
    Path outputDir = null;
    FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);

    try {

      stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString());
      defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));

      // Create a subdirectories
      LOG.info("The staging dir '" + stagingDir + "' is created.");

      queryContext.setStagingDir(stagingDir);

      /////////////////////////////////////////////////
      // Check and Create Output Directory If Necessary
      /////////////////////////////////////////////////
      if (queryContext.hasOutputPath()) {
        outputDir = queryContext.getOutputPath();
        if (!queryContext.isOutputOverwrite()) {
          if (defaultFS.exists(outputDir)) {
            throw new IOException("The output directory '" + outputDir + " already exists.");
          }
        }
      }
    } catch (IOException ioe) {
      if (stagingDir != null && defaultFS.exists(stagingDir)) {
        defaultFS.delete(stagingDir, true);
        LOG.info("The staging directory '" + stagingDir + "' is deleted");
      }

      throw ioe;
    }
  }

  /**
   * It initializes the final output and staging directory and sets
   * them to variables.
   */
  public static Path initStagingDir(TajoConf conf, FileSystem fs, String queryId) throws IOException {

    String realUser;
    String currentUser;
    UserGroupInformation ugi;
    ugi = UserGroupInformation.getLoginUser();
    realUser = ugi.getShortUserName();
    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();

    Path stagingDir = null;

    ////////////////////////////////////////////
    // Create Output Directory
    ////////////////////////////////////////////

    stagingDir = new Path(TajoConf.getStagingDir(conf), queryId);

    if (fs.exists(stagingDir)) {
      throw new IOException("The staging directory '" + stagingDir + "' already exists");
    }
    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
    FileStatus fsStatus = fs.getFileStatus(stagingDir);
    String owner = fsStatus.getOwner();

    if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
      throw new IOException("The ownership on the user's query " +
          "directory " + stagingDir + " is not as expected. " +
          "It is owned by " + owner + ". The directory must " +
          "be owned by the submitter " + currentUser + " or " +
          "by " + realUser);
    }

    if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
      LOG.info("Permissions on staging directory " + stagingDir + " are " +
          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
          "to correct value " + STAGING_DIR_PERMISSION);
      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
    }

    return stagingDir;
  }

  public Query getQuery() {
    return query;
  }

  public void expiredSessionTimeout() {
    stop();
  }

  public QueryMasterTaskContext getQueryTaskContext() {
    return queryTaskContext;
  }

  public EventHandler getEventHandler() {
    return queryTaskContext.getEventHandler();
  }

  public void touchSessionTime() {
    this.lastClientHeartbeat.set(System.currentTimeMillis());
  }

  public long getLastClientHeartbeat() {
    return this.lastClientHeartbeat.get();
  }

  public QueryId getQueryId() {
    return queryId;
  }

  public boolean isInitError() {
    return initError != null;
  }

  public QueryState getState() {
    if(query == null) {
      if (isInitError()) {
        return QueryState.QUERY_ERROR;
      } else {
        return QueryState.QUERY_NOT_ASSIGNED;
      }
    } else {
      return query.getState();
    }
  }

  public String getErrorMessage() {
    if (isInitError()) {
      return StringUtils.stringifyException(initError);
    } else {
      return null;
    }
  }

  public long getQuerySubmitTime() {
    return this.querySubmitTime;
  }

  public class QueryMasterTaskContext {
    EventHandler eventHandler;
    public QueryMaster.QueryMasterContext getQueryMasterContext() {
      return queryMasterContext;
    }

    public Session getSession() {
      return session;
    }

    public QueryContext getQueryContext() {
      return queryContext;
    }

    public TajoConf getConf() {
      return systemConf;
    }

    public Clock getClock() {
      return queryMasterContext.getClock();
    }

    public Query getQuery() {
      return query;
    }

    public QueryId getQueryId() {
      return queryId;
    }

    public AbstractStorageManager getStorageManager() {
      return queryMasterContext.getStorageManager();
    }

    public Path getStagingDir() {
      return queryContext.getStagingDir();
    }

    public synchronized EventHandler getEventHandler() {
      if(eventHandler == null) {
        eventHandler = dispatcher.getEventHandler();
      }
      return eventHandler;
    }

    public TajoAsyncDispatcher getDispatcher() {
      return dispatcher;
    }

    public SubQuery getSubQuery(ExecutionBlockId id) {
      return query.getSubQuery(id);
    }

    public Map<String, TableDesc> getTableDescMap() {
      return tableDescMap;
    }

    public float getProgress() {
      if(query == null) {
        return 0.0f;
      }
      return query.getProgress();
    }

    public AbstractResourceAllocator getResourceAllocator() {
      return resourceAllocator;
    }

    public TajoMetrics getQueryMetrics() {
      return queryMetrics;
    }
  }
}