/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.vxquery.rest.service;

import static java.util.logging.Level.SEVERE;
import static org.apache.vxquery.rest.Constants.ErrorCodes.NOT_FOUND;
import static org.apache.vxquery.rest.Constants.ErrorCodes.PROBLEM_WITH_QUERY;
import static org.apache.vxquery.rest.Constants.ErrorCodes.UNFORSEEN_PROBLEM;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import org.apache.vxquery.compiler.CompilerControlBlock;
import org.apache.vxquery.compiler.algebricks.VXQueryGlobalDataFactory;
import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor;
import org.apache.vxquery.context.DynamicContext;
import org.apache.vxquery.context.DynamicContextImpl;
import org.apache.vxquery.context.RootStaticContextImpl;
import org.apache.vxquery.context.StaticContextImpl;
import org.apache.vxquery.exceptions.ErrorCode;
import org.apache.vxquery.exceptions.SystemException;
import org.apache.vxquery.exceptions.VXQueryRuntimeException;
import org.apache.vxquery.rest.request.QueryRequest;
import org.apache.vxquery.rest.request.QueryResultRequest;
import org.apache.vxquery.rest.response.APIResponse;
import org.apache.vxquery.rest.response.Error;
import org.apache.vxquery.rest.response.QueryResponse;
import org.apache.vxquery.rest.response.QueryResultResponse;
import org.apache.vxquery.rest.response.SyncQueryResponse;
import org.apache.vxquery.result.ResultUtils;
import org.apache.vxquery.xmlquery.ast.ModuleNode;
import org.apache.vxquery.xmlquery.query.Module;
import org.apache.vxquery.xmlquery.query.XMLQueryCompiler;
import org.apache.vxquery.xmlquery.query.XQueryCompilationListener;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;

/**
 * Main class responsible for handling query requests. This class will first
 * compile, then submit query to hyracks and finally fetch results for a given
 * query.
 *
 * @author Erandi Ganepola
 */
public class VXQueryService {

    private static final Logger LOGGER = Logger.getLogger(VXQueryService.class.getName());

    private static final Pattern EMBEDDED_SYSERROR_PATTERN = Pattern.compile("(\\p{javaUpperCase}{4}\\d{4})");

    private volatile State state = State.STOPPED;
    private VXQueryConfig vxQueryConfig;
    private AtomicLong atomicLong = new AtomicLong(0);
    private Map<Long, HyracksJobContext> jobContexts = new ConcurrentHashMap<>();
    private IHyracksClientConnection hyracksClientConnection;
    private HyracksDataset hyracksDataset;

    public VXQueryService(VXQueryConfig config) {
        vxQueryConfig = config;
    }

    /**
     * Starts VXQueryService class by creating a {@link IHyracksClientConnection}
     * which will later be used to submit and retrieve queries and results to/from
     * hyracks.
     */
    public synchronized void start() {
        if (!State.STOPPED.equals(state)) {
            throw new IllegalStateException("VXQueryService is at state : " + state);
        }

        if (vxQueryConfig.getHyracksClientIp() == null) {
            throw new IllegalArgumentException("hyracksClientIp is required to connect to hyracks");
        }

        setState(State.STARTING);

        try {
            hyracksClientConnection =
                    new HyracksConnection(vxQueryConfig.getHyracksClientIp(), vxQueryConfig.getHyracksClientPort());
        } catch (Exception e) {
            LOGGER.log(SEVERE, String.format("Unable to create a hyracks client connection to %s:%d",
                    vxQueryConfig.getHyracksClientIp(), vxQueryConfig.getHyracksClientPort()));
            throw new VXQueryRuntimeException("Unable to create a hyracks client connection", e);
        }

        LOGGER.log(Level.FINE, String.format("Using hyracks connection to %s:%d", vxQueryConfig.getHyracksClientIp(),
                vxQueryConfig.getHyracksClientPort()));

        setState(State.STARTED);
        LOGGER.log(Level.INFO, "VXQueryService started successfully");
    }

    private synchronized void setState(State newState) {
        state = newState;
    }

    /**
     * Submits a query to hyracks to be run after compiling. Required intermediate
     * results and metrics are also calculated according to the
     * {@link QueryRequest}. Checks if this class has started before moving further.
     *
     * @param request
     *            {@link QueryRequest} containing information about the query to be
     *            executed and the merics required along with the results
     * @return AsyncQueryResponse if no error occurs | ErrorResponse else
     */
    public APIResponse execute(final QueryRequest request) {
        QueryRequest indexingRequest = new QueryRequest("show-indexes()");
        indexingRequest.setAsync(false);
        SyncQueryResponse indexingResponse = (SyncQueryResponse) execute(indexingRequest, new ArrayList<>());
        LOGGER.log(Level.FINE, String.format("Found indexes: %s", indexingResponse.getResults()));

        List<String> collections = Arrays.asList(indexingResponse.getResults().split("\n"));
        return execute(request, collections);
    }

    private APIResponse execute(final QueryRequest request, List<String> collections) {
        if (!State.STARTED.equals(state)) {
            throw new IllegalStateException("VXQueryService is at state : " + state);
        }

        String query = request.getStatement();
        final ResultSetId resultSetId = createResultSetId();

        QueryResponse response = APIResponse.newQueryResponse(request, resultSetId);
        response.setStatement(query);

        // Obtaining the node controller information from hyracks client connection
        Map<String, NodeControllerInfo> nodeControllerInfos = null;
        try {
            nodeControllerInfos = hyracksClientConnection.getNodeControllerInfos();
        } catch (HyracksException e) {
            LOGGER.log(Level.SEVERE, String.format("Error occurred when obtaining NC info: '%s'", e.getMessage()));
            return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM)
                    .withMessage("Hyracks connection problem: " + e.getMessage()).build());
        }
        if (nodeControllerInfos.isEmpty()) {
            return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM)
                    .withMessage("No NodeControllers available").build());
        }

        // Adding a query compilation listener
        VXQueryCompilationListener listener = new VXQueryCompilationListener(response,
                request.isShowAbstractSyntaxTree(), request.isShowTranslatedExpressionTree(),
                request.isShowOptimizedExpressionTree(), request.isShowRuntimePlan());

        Date start = new Date();
        // Compiling the XQuery given
        final XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, request.getFrameSize(),
                vxQueryConfig.getAvailableProcessors(), vxQueryConfig.getJoinHashSize(),
                vxQueryConfig.getMaximumDataSize(), vxQueryConfig.getHdfsConf());
        CompilerControlBlock compilerControlBlock = new CompilerControlBlock(
                new StaticContextImpl(RootStaticContextImpl.INSTANCE), resultSetId, request.getSourceFileMap());
        try {
            compiler.compile(null, new StringReader(query), compilerControlBlock, request.getOptimization(),
                    collections);
        } catch (AlgebricksException e) {
            LOGGER.log(Level.SEVERE, String.format("Error occurred when compiling query: '%s' with message: '%s'",
                    query, e.getMessage()));
            return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(PROBLEM_WITH_QUERY)
                    .withMessage("Query compilation failure: " + e.getMessage()).build());
        } catch (SystemException e) {
            LOGGER.log(Level.SEVERE, String.format("Error occurred when compiling query: '%s' with message: '%s'",
                    query, e.getMessage()));
            return APIResponse.newErrorResponse(request.getRequestId(),
                    new Error(PROBLEM_WITH_QUERY, "Query compilation failure: " + e.getCode()));
        }

        if (request.isShowMetrics()) {
            response.getMetrics().setCompileTime(new Date().getTime() - start.getTime());
        }

        if (request.isCompileOnly()) {
            return response;
        }

        Module module = compiler.getModule();
        JobSpecification js = module.getHyracksJobSpecification();
        DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext());
        js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory()));

        HyracksJobContext hyracksJobContext;
        start = new Date();
        if (!request.isAsync()) {
            for (int i = 0; i < request.getRepeatExecutions(); i++) {
                try {
                    hyracksJobContext = executeJob(js, resultSetId, request);

                } catch (Exception e) {
                    LOGGER.log(SEVERE, "Error occurred when submitting job to hyracks for query: " + query, e);
                    return APIResponse.newErrorResponse(request.getRequestId(),
                            Error.builder().withCode(UNFORSEEN_PROBLEM)
                                    .withMessage("Error occurred when starting hyracks job").build());
                }
                try {
                    String results = readResults(hyracksJobContext);
                    ((SyncQueryResponse) response).setResults(results);
                } catch (HyracksException e) {
                    LOGGER.log(Level.SEVERE, "Error occurred when reading results", e);
                    SystemException se = getSystemException(e);
                    return APIResponse.newErrorResponse(request.getRequestId(), new Error(UNFORSEEN_PROBLEM,
                            String.format("Error occurred when reading results: %s", se != null ? se.getCode() : "")));
                } catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Error occurred when reading results", e);
                    return APIResponse.newErrorResponse(request.getRequestId(),
                            new Error(UNFORSEEN_PROBLEM, "Error occurred when reading results: " + e.getMessage()));
                }
            }
        } else {
            try {
                hyracksJobContext = executeJob(js, resultSetId, request);
            } catch (Exception e) {
                LOGGER.log(SEVERE, "Error occurred when submitting job to hyracks for query: " + query, e);
                return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM)
                        .withMessage("Error occurred when starting hyracks job").build());
            }
            jobContexts.put(resultSetId.getId(), hyracksJobContext);
        }

        if (request.isShowMetrics()) {
            response.getMetrics().setElapsedTime(new Date().getTime() - start.getTime());
        }

        return response;
    }

    private HyracksJobContext executeJob(JobSpecification js, ResultSetId resultSetId, QueryRequest request)
            throws Exception {
        HyracksJobContext hyracksJobContext;
        JobId jobId = hyracksClientConnection.startJob(js, EnumSet.of(JobFlag.PROFILE_RUNTIME));
        hyracksJobContext = new HyracksJobContext(jobId, js.getFrameSize(), resultSetId);

        return hyracksJobContext;
    }

    private static SystemException getSystemException(HyracksException e) {
        Throwable t = e;
        Throwable candidate = t instanceof SystemException ? t : null;
        while (t.getCause() != null) {
            t = t.getCause();
            if (t instanceof SystemException) {
                candidate = t;
            }
        }

        t = candidate == null ? t : candidate;
        final String message = t.getMessage();
        if (message != null) {
            Matcher m = EMBEDDED_SYSERROR_PATTERN.matcher(message);
            if (m.find()) {
                String eCode = m.group(1);
                return new SystemException(ErrorCode.valueOf(eCode), e);
            }
        }
        return null;
    }

    /**
     * Returns the query results for a given result set id.
     *
     * @param request
     *            {@link QueryResultRequest} with result ID required
     * @return Either a {@link QueryResultResponse} if no error occurred |
     *         {@link org.apache.vxquery.rest.response.ErrorResponse} else.
     */
    public APIResponse getResult(QueryResultRequest request) {
        if (jobContexts.containsKey(request.getResultId())) {
            QueryResultResponse resultResponse = APIResponse.newQueryResultResponse(request.getRequestId());
            Date start = new Date();
            try {
                String results = readResults(jobContexts.get(request.getResultId()));
                resultResponse.setResults(results);
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Error occurred when reading results for id : " + request.getResultId());
                return APIResponse.newErrorResponse(request.getRequestId(), new Error(UNFORSEEN_PROBLEM,
                        "Error occurred when reading results for: " + request.getResultId()));
            }

            if (request.isShowMetrics()) {
                resultResponse.getMetrics().setElapsedTime(new Date().getTime() - start.getTime());
            }

            return resultResponse;
        } else {
            return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(NOT_FOUND)
                    .withMessage("No query found for result ID : " + request.getResultId()).build());
        }
    }

    /**
     * Reads results from hyracks given the {@link HyracksJobContext} containing
     * {@link ResultSetId} and {@link JobId} mapping.
     *
     * @param jobContext
     *            mapoing between the {@link ResultSetId} and corresponding hyracks
     *            {@link JobId}
     * @return Results of the given query
     * @throws Exception
     *             IOErrors and etc
     */
    private String readResults(HyracksJobContext jobContext) throws Exception {
        int nReaders = 1;

        if (hyracksDataset == null) {
            hyracksDataset = new HyracksDataset(hyracksClientConnection, jobContext.getFrameSize(), nReaders);
        }

        FrameManager resultDisplayFrameMgr = new FrameManager(jobContext.getFrameSize());
        IFrame frame = new VSizeFrame(resultDisplayFrameMgr);
        IHyracksDatasetReader reader = hyracksDataset.createReader(jobContext.getJobId(), jobContext.getResultSetId());
        OutputStream resultStream = new ByteArrayOutputStream();

        // This loop is required for XTests to reliably identify the error code of
        // SystemException.
        while (reader.getResultStatus().getState() == DatasetJobRecord.State.RUNNING) {
            Thread.sleep(100);
        }

        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
        try (PrintWriter writer = new PrintWriter(resultStream, true)) {
            while (reader.read(frame) > 0) {
                writer.print(ResultUtils.getStringFromBuffer(frame.getBuffer(), frameTupleAccessor));
                writer.flush();
                frame.getBuffer().clear();
            }
        }

        hyracksClientConnection.waitForCompletion(jobContext.getJobId());
        LOGGER.log(Level.FINE, String.format("Result for resultId %d completed", jobContext.getResultSetId().getId()));
        return resultStream.toString();
    }

    /**
     * Create a unique result set id to get the correct query back from the cluster.
     *
     * @return Result Set id generated with current system time.
     */
    protected ResultSetId createResultSetId() {
        long resultSetId = atomicLong.incrementAndGet();
        LOGGER.log(Level.FINE, String.format("Creating result set with ID : %d", resultSetId));
        return new ResultSetId(resultSetId);
    }

    public synchronized void stop() {
        if (!State.STOPPED.equals(state)) {
            setState(State.STOPPING);
            LOGGER.log(Level.FINE, "Stooping VXQueryService");
            setState(State.STOPPED);
            LOGGER.log(Level.INFO, "VXQueryService stopped successfully");
        } else {
            LOGGER.log(Level.INFO, "VXQueryService is already in state : " + state);
        }
    }

    public State getState() {
        return state;
    }

    /**
     * A {@link XQueryCompilationListener} implementation to be used to add
     * AbstractSyntaxTree, RuntimePlan and etc to the {@link QueryResponse} if
     * requested by the user.
     */
    private class VXQueryCompilationListener implements XQueryCompilationListener {
        private QueryResponse response;
        private boolean showAbstractSyntaxTree;
        private boolean showTranslatedExpressionTree;
        private boolean showOptimizedExpressionTree;
        private boolean showRuntimePlan;

        public VXQueryCompilationListener(QueryResponse response, boolean showAbstractSyntaxTree,
                boolean showTranslatedExpressionTree, boolean showOptimizedExpressionTree, boolean showRuntimePlan) {
            this.response = response;
            this.showAbstractSyntaxTree = showAbstractSyntaxTree;
            this.showTranslatedExpressionTree = showTranslatedExpressionTree;
            this.showOptimizedExpressionTree = showOptimizedExpressionTree;
            this.showRuntimePlan = showRuntimePlan;
        }

        @Override
        public void notifyParseResult(ModuleNode moduleNode) {
            if (showAbstractSyntaxTree) {
                response.setAbstractSyntaxTree(new XStream(new DomDriver()).toXML(moduleNode));
            }
        }

        @Override
        public void notifyTranslationResult(Module module) {
            if (showTranslatedExpressionTree) {
                response.setTranslatedExpressionTree(appendPrettyPlan(new StringBuilder(), module).toString());
            }
        }

        @Override
        public void notifyTypecheckResult(Module module) {
        }

        @Override
        public void notifyCodegenResult(Module module) {
            if (showRuntimePlan) {
                JobSpecification jobSpec = module.getHyracksJobSpecification();
                try {
                    response.setRuntimePlan(jobSpec.toJSON().toString());
                } catch (IOException e) {
                    LOGGER.log(SEVERE,
                            "Error occurred when obtaining runtime plan from job specification : " + jobSpec.toString(),
                            e);
                }
            }
        }

        @Override
        public void notifyOptimizedResult(Module module) {
            if (showOptimizedExpressionTree) {
                response.setOptimizedExpressionTree(appendPrettyPlan(new StringBuilder(), module).toString());
            }
        }

        @SuppressWarnings("Duplicates")
        private StringBuilder appendPrettyPlan(StringBuilder sb, Module module) {
            try {
                ILogicalExpressionVisitor<String, Integer> ev =
                        new VXQueryLogicalExpressionPrettyPrintVisitor(module.getModuleContext());
                AlgebricksAppendable buffer = new AlgebricksAppendable();
                LogicalOperatorPrettyPrintVisitor v = new LogicalOperatorPrettyPrintVisitor(buffer, ev);
                PlanPrettyPrinter.printPlan(module.getBody(), v, 0);
                sb.append(buffer.toString());
            } catch (AlgebricksException e) {
                LOGGER.log(SEVERE, "Error occurred when pretty printing expression : " + e.getMessage());
            }
            return sb;
        }
    }
}
