/*
 * Copyright 2009-2012 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.uci.ics.asterix.aql.translator;

import java.io.File;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import edu.uci.ics.asterix.api.common.APIFramework;
import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
import edu.uci.ics.asterix.api.common.Job;
import edu.uci.ics.asterix.api.common.SessionConfig;
import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.base.Statement.Kind;
import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
import edu.uci.ics.asterix.aql.expression.ControlFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
import edu.uci.ics.asterix.aql.expression.DatasetDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
import edu.uci.ics.asterix.aql.expression.DeleteStatement;
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.ExternalDetailsDecl;
import edu.uci.ics.asterix.aql.expression.FeedDetailsDecl;
import edu.uci.ics.asterix.aql.expression.FunctionDecl;
import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
import edu.uci.ics.asterix.aql.expression.InsertStatement;
import edu.uci.ics.asterix.aql.expression.InternalDetailsDecl;
import edu.uci.ics.asterix.aql.expression.LoadFromFileStatement;
import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
import edu.uci.ics.asterix.aql.expression.Query;
import edu.uci.ics.asterix.aql.expression.SetStatement;
import edu.uci.ics.asterix.aql.expression.TypeDecl;
import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
import edu.uci.ics.asterix.aql.expression.WriteFromQueryResultStatement;
import edu.uci.ics.asterix.aql.expression.WriteStatement;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.file.DatasetOperations;
import edu.uci.ics.asterix.file.FeedOperations;
import edu.uci.ics.asterix.file.IndexOperations;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.types.TypeSignature;
import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.asterix.result.ResultUtils;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionIDFactory;
import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledBeginFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDeleteStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledInsertStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledWriteFromQueryResultStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import edu.uci.ics.asterix.translator.TypeTranslator;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;

/**
 * Provides functionality for executing a batch of AQL statements (queries included)
 * sequentially.
 */
public class AqlTranslator extends AbstractAqlTranslator {

    // The batch of statements that compileAndExecute iterates over, in list order.
    private final List<Statement> aqlStatements;
    // Writer that receives the JSON response printed for each QUERY statement.
    private final PrintWriter out;
    // Session configuration handed to the constructor; stored unchanged.
    private final SessionConfig sessionConfig;
    // Display format handed to the constructor; stored unchanged.
    private final DisplayFormat pdf;
    // Dataverse returned by the most recent DATAVERSE_DECL statement; null until one has run,
    // and reset to null by the dataverse-drop handler when that dataverse is dropped.
    private Dataverse activeDefaultDataverse;
    // The FUNCTION_DECL statements found in aqlStatements, collected once in the constructor.
    private List<FunctionDecl> declaredFunctions;

    /**
     * Creates a translator for one batch of AQL statements.
     *
     * @param aqlStatements
     *            the statements to execute; also scanned for function declarations
     * @param out
     *            writer that receives the responses printed for queries
     * @param pc
     *            the session configuration
     * @param pdf
     *            the display format
     * @throws MetadataException
     *             declared by the signature
     * @throws AsterixException
     *             declared by the signature
     */
    public AqlTranslator(List<Statement> aqlStatements, PrintWriter out, SessionConfig pc, DisplayFormat pdf)
            throws MetadataException, AsterixException {
        this.pdf = pdf;
        this.sessionConfig = pc;
        this.out = out;
        this.aqlStatements = aqlStatements;
        // Function declarations are gathered up front, before any statement is executed.
        this.declaredFunctions = getDeclaredFunctions(aqlStatements);
    }

    /**
     * Collects every function declaration contained in the given statements.
     *
     * @param statements
     *            the statements to scan
     * @return a new list holding, in encounter order, each statement whose kind is FUNCTION_DECL
     */
    private List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
        List<FunctionDecl> collected = new ArrayList<FunctionDecl>();
        for (Statement candidate : statements) {
            boolean isFunctionDecl = candidate.getKind().equals(Kind.FUNCTION_DECL);
            if (!isFunctionDecl) {
                continue;
            }
            collected.add((FunctionDecl) candidate);
        }
        return collected;
    }

    /**
     * Compiles and submits for execution a list of AQL statements.
     * <p>
     * Statements are processed one at a time, in list order. For each statement a new metadata transaction is begun
     * and a new AqlMetadataProvider is created. The transaction is committed once the statement's handler returns;
     * if the handler or the commit throws, the transaction is aborted and the exception is rethrown wrapped in an
     * AlgebricksException. Afterwards every job specification left in the per-statement job list is run and waited
     * on before the next statement is processed.
     * <p>
     * For QUERY statements a JSON object is printed to the writer given to the constructor: it holds a "handle"
     * array (job id, result set id) when asyncResults is true, or a "results" array filled from the result reader
     * otherwise.
     * 
     * @param hcc
     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
     * @param hdc
     *            A Hyracks dataset client object that is used to read the results.
     * @param asyncResults
     *            True if the results should be read asynchronously or false if we should wait for results to be read.
     * @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
     * @throws Exception
     *             an AlgebricksException wrapping any failure raised while a statement is handled or committed;
     *             failures while running jobs or reading results propagate unwrapped.
     */
    public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, boolean asyncResults)
            throws Exception {
        // Incremented for each QUERY statement so that every query gets its own result set id.
        int resultSetIdCounter = 0;
        List<QueryResult> executionResult = new ArrayList<QueryResult>();
        // State carried from one statement to the next: WRITE statements replace outputFile (and writerFactory when
        // a writer class is named), and config is handed to the SET handler.
        FileSplit outputFile = null;
        IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
        IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
        Map<String, String> config = new HashMap<String, String>();
        List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();

        for (Statement stmt : aqlStatements) {
            // NOTE(review): validateOperation is not defined in the visible part of this file; presumably inherited
            // from AbstractAqlTranslator - confirm.
            validateOperation(activeDefaultDataverse, stmt);
            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
            metadataProvider.setWriterFactory(writerFactory);
            metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
            metadataProvider.setOutputFile(outputFile);
            metadataProvider.setConfig(config);
            // The job list is shared by all statements; empty it before handling this one.
            jobsToExecute.clear();
            try {
                // Dispatch on the statement kind; each handler works inside the metadata transaction begun above.
                switch (stmt.getKind()) {
                    case SET: {
                        handleSetStatement(metadataProvider, stmt, config, jobsToExecute);
                        break;
                    }
                    case DATAVERSE_DECL: {
                        activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }
                    case CREATE_DATAVERSE: {
                        handleCreateDataverseStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }
                    case DATASET_DECL: {
                        handleCreateDatasetStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case CREATE_INDEX: {
                        handleCreateIndexStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case TYPE_DECL: {
                        handleCreateTypeStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }
                    case NODEGROUP_DECL: {
                        handleCreateNodeGroupStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }
                    case DATAVERSE_DROP: {
                        handleDataverseDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case DATASET_DROP: {
                        handleDatasetDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case INDEX_DROP: {
                        handleIndexDropStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case TYPE_DROP: {
                        handleTypeDropStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }
                    case NODEGROUP_DROP: {
                        handleNodegroupDropStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }

                    case CREATE_FUNCTION: {
                        handleCreateFunctionStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }

                    case FUNCTION_DROP: {
                        handleFunctionDropStatement(metadataProvider, stmt, jobsToExecute);
                        break;
                    }

                    case LOAD_FROM_FILE: {
                        handleLoadFromFileStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case WRITE_FROM_QUERY_RESULT: {
                        handleWriteFromQueryResultStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case INSERT: {
                        handleInsertStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }
                    case DELETE: {
                        handleDeleteStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }

                    case BEGIN_FEED: {
                        handleBeginFeedStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }

                    case CONTROL_FEED: {
                        handleControlFeedStatement(metadataProvider, stmt, hcc, jobsToExecute);
                        break;
                    }

                    case QUERY: {
                        // Assign the result set id before compiling so the job loop below can read it back.
                        metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                        executionResult.add(handleQuery(metadataProvider, (Query) stmt, hcc, jobsToExecute));
                        break;
                    }

                    case WRITE: {
                        Pair<IAWriterFactory, FileSplit> result = handleWriteStatement(metadataProvider, stmt,
                                jobsToExecute);
                        // Keep the current writer factory unless the statement named a writer class.
                        if (result.first != null) {
                            writerFactory = result.first;
                        }
                        outputFile = result.second;
                        break;
                    }

                    // No default case: a statement kind not listed above is committed without any action.
                }
                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
            } catch (Exception e) {
                // Roll back the metadata changes of this statement and surface the failure to the caller.
                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
                throw new AlgebricksException(e);
            }
            // Following jobs are run under a separate transaction, that is
            // committed/aborted by the JobEventListener
            for (JobSpecification jobspec : jobsToExecute) {
                JobId jobId = runJob(hcc, jobspec);
                if (stmt.getKind() == Kind.QUERY) {
                    JSONObject response = new JSONObject();

                    if (asyncResults) {
                        // Asynchronous mode: only hand back the ids needed to locate the result later.
                        JSONArray handle = new JSONArray();
                        handle.put(jobId.getId());
                        handle.put(metadataProvider.getResultSetId().getId());
                        response.put("handle", handle);
                    } else {
                        // Synchronous mode: read into one reusable buffer until read() stops returning a
                        // positive count, appending each converted buffer to the results array.
                        ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
                        ResultReader resultReader = new ResultReader(hcc, hdc);
                        resultReader.open(jobId, metadataProvider.getResultSetId());
                        buffer.clear();
                        JSONArray results = new JSONArray();
                        while (resultReader.read(buffer) > 0) {
                            results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
                            buffer.clear();
                        }
                        response.put("results", results);
                    }
                    out.print(response);
                }
                // Wait for this job to finish before starting the next job or statement.
                hcc.waitForCompletion(jobId);
            }
        }
        return executionResult;
    }

    /**
     * Handles a "set" statement by recording the property in the shared configuration map.
     *
     * @param metadataProvider
     *            the metadata provider for the current statement (not used here)
     * @param stmt
     *            the statement; must be a SetStatement
     * @param config
     *            the configuration map that receives the property name/value pair
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     */
    private void handleSetStatement(AqlMetadataProvider metadataProvider, Statement stmt, Map<String, String> config,
            List<JobSpecification> jobsToExecute) throws RemoteException, ACIDException {
        SetStatement setStmt = (SetStatement) stmt;
        config.put(setStmt.getPropName(), setStmt.getPropValue());
    }

    /**
     * Handles a "write" statement by building the output file split and, when the statement names a writer class,
     * instantiating that class reflectively.
     *
     * @param metadataProvider
     *            the metadata provider for the current statement (not used here)
     * @param stmt
     *            the statement; must be a WriteStatement
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @return a pair whose first element is the instantiated writer factory, or null when no writer class was named,
     *         and whose second element is the output file split
     * @throws ClassNotFoundException
     *             if the named writer class cannot be loaded
     * @throws InstantiationException
     *             if the named writer class cannot be instantiated
     * @throws IllegalAccessException
     *             if the named writer class or its no-argument constructor is not accessible
     */
    private Pair<IAWriterFactory, FileSplit> handleWriteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws InstantiationException, IllegalAccessException,
            ClassNotFoundException {
        WriteStatement writeStmt = (WriteStatement) stmt;

        File targetFile = new File(writeStmt.getFileName());
        String nodeName = writeStmt.getNcName().getValue();
        FileSplit split = new FileSplit(nodeName, new FileReference(targetFile));

        String writerClassName = writeStmt.getWriterClassName();
        IAWriterFactory customFactory = writerClassName == null ? null : (IAWriterFactory) Class.forName(
                writerClassName).newInstance();

        return new Pair<IAWriterFactory, FileSplit>(customFactory, split);
    }

    /**
     * Handles a "use dataverse" statement by looking up the named dataverse in the metadata.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context used for the lookup
     * @param stmt
     *            the statement; must be a DataverseDecl
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @return the dataverse with the requested name
     * @throws MetadataException
     *             if no dataverse with that name exists
     */
    private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
        String requestedName = ((DataverseDecl) stmt).getDataverseName().getValue();
        MetadataTransactionContext txn = metadataProvider.getMetadataTxnContext();
        Dataverse found = MetadataManager.INSTANCE.getDataverse(txn, requestedName);
        if (found != null) {
            return found;
        }
        throw new MetadataException("Unknown dataverse " + requestedName);
    }

    /**
     * Handles a "create dataverse" statement by adding the dataverse to the metadata.
     * <p>
     * If a dataverse with the same name already exists, the statement is a no-op when it carries "if not exists" and
     * an error otherwise. The dataverse is never added a second time.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context
     * @param stmt
     *            the statement; must be a CreateDataverseStatement
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @throws AlgebricksException
     *             if the dataverse already exists and "if not exists" was not specified
     */
    private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
            ACIDException {
        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
        String dvName = stmtCreateDataverse.getDataverseName().getValue();
        Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
        if (dv != null) {
            if (stmtCreateDataverse.getIfNotExists()) {
                // Already present and the caller asked for "if not exists": nothing to do. Falling through
                // would try to add a duplicate dataverse.
                return;
            }
            throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
        }
        MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
                stmtCreateDataverse.getFormat()));
    }

    /**
     * Handles a "create dataset" statement: validates the request, records the dataset in the metadata and, for
     * INTERNAL and FEED datasets, runs the dataset-creation job and waits for it to complete.
     * <p>
     * The dataverse is taken from the statement when given, otherwise from the active default dataverse. If the
     * dataset already exists the method returns without changes when "if not exists" was specified.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context and is passed to the job-spec builder
     * @param stmt
     *            the statement; must be a DatasetDecl
     * @param hcc
     *            the Hyracks connection used to run the creation job
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @throws AlgebricksException
     *             if no dataverse can be determined, the dataset already exists (without "if not exists"), the item
     *             type is unknown, or an INTERNAL/FEED dataset is declared over a non-record type
     */
    private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
        DatasetDecl decl = (DatasetDecl) stmt;

        // Resolve the target dataverse: explicit name first, then the active default.
        String dataverseName;
        if (decl.getDataverse() != null) {
            dataverseName = decl.getDataverse().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }
        if (dataverseName == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }

        String datasetName = decl.getName().getValue();
        DatasetType dsType = decl.getDatasetType();
        String itemTypeName = decl.getItemTypeName().getValue();

        Dataset existing = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                dataverseName, datasetName);
        if (existing != null) {
            if (!decl.getIfNotExists()) {
                throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
            }
            return;
        }

        Datatype itemDatatype = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                dataverseName, itemTypeName);
        if (itemDatatype == null) {
            throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
        }

        // Build the type-specific details; remains null for a dataset type not handled below.
        IDatasetDetails datasetDetails = null;
        switch (decl.getDatasetType()) {
            case INTERNAL: {
                if (itemDatatype.getDatatype().getTypeTag() != ATypeTag.RECORD) {
                    throw new AlgebricksException("Can only partition ARecord's.");
                }
                InternalDetailsDecl internalDecl = (InternalDetailsDecl) decl.getDatasetDetailsDecl();
                List<String> keyExprs = internalDecl.getPartitioningExprs();
                String nodeGroup = internalDecl.getNodegroupName().getValue();
                datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
                        InternalDatasetDetails.PartitioningStrategy.HASH, keyExprs, keyExprs, nodeGroup);
                break;
            }
            case EXTERNAL: {
                ExternalDetailsDecl externalDecl = (ExternalDetailsDecl) decl.getDatasetDetailsDecl();
                String adapter = externalDecl.getAdapter();
                Map<String, String> properties = externalDecl.getProperties();
                datasetDetails = new ExternalDatasetDetails(adapter, properties);
                break;
            }
            case FEED: {
                if (itemDatatype.getDatatype().getTypeTag() != ATypeTag.RECORD) {
                    throw new AlgebricksException("Can only partition ARecord's.");
                }
                FeedDetailsDecl feedDecl = (FeedDetailsDecl) decl.getDatasetDetailsDecl();
                List<String> keyExprs = feedDecl.getPartitioningExprs();
                String nodeGroup = feedDecl.getNodegroupName().getValue();
                String adapter = feedDecl.getAdapterFactoryClassname();
                Map<String, String> configuration = feedDecl.getConfiguration();
                FunctionSignature signature = feedDecl.getFunctionSignature();
                datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
                        InternalDatasetDetails.PartitioningStrategy.HASH, keyExprs, keyExprs, nodeGroup, adapter,
                        configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
                break;
            }
        }

        Dataset created = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, decl.getHints(),
                dsType);
        MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), created);

        // Only INTERNAL and FEED datasets get a creation job; EXTERNAL datasets are metadata-only here.
        boolean needsCreationJob = decl.getDatasetType() == DatasetType.INTERNAL
                || decl.getDatasetType() == DatasetType.FEED;
        if (needsCreationJob) {
            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
                    dataverseName);
            JobSpecification createSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
                    metadataProvider);
            JobId jobId = runJob(hcc, createSpec);
            hcc.waitForCompletion(jobId);
        }
    }

    /**
     * Handles a "create index" statement: records the index in the metadata, runs the index-creation job, then
     * builds and runs the secondary-index loading job and waits for it to complete.
     * <p>
     * If the index already exists and "if not exists" was specified, the statement is only flagged as not needing
     * creation and nothing else happens.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context and is passed to the job builders
     * @param stmt
     *            the statement; must be a CreateIndexStatement
     * @param hcc
     *            the Hyracks connection used to run the jobs
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @throws AlgebricksException
     *             if no dataverse can be determined, the dataset does not exist, or the index already exists and
     *             "if not exists" was not specified
     */
    private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        CreateIndexStatement createStmt = (CreateIndexStatement) stmt;

        // Resolve the target dataverse: explicit name first, then the active default.
        String dataverseName;
        if (createStmt.getDataverseName() != null) {
            dataverseName = createStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }
        if (dataverseName == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }

        String datasetName = createStmt.getDatasetName().getValue();
        Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                dataverseName, datasetName);
        if (dataset == null) {
            throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
                    + dataverseName);
        }

        String indexName = createStmt.getIndexName().getValue();
        Index existing = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                datasetName, indexName);
        if (existing != null) {
            if (!createStmt.getIfNotExists()) {
                throw new AlgebricksException("An index with this name " + indexName + " already exists.");
            }
            createStmt.setNeedToCreate(false);
            return;
        }

        Index newIndex = new Index(dataverseName, datasetName, indexName, createStmt.getIndexType(),
                createStmt.getFieldExprs(), createStmt.getGramLength(), false);
        MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), newIndex);
        runCreateIndexJob(hcc, createStmt, metadataProvider);

        // Populate the freshly created index and wait until loading has finished.
        CompiledCreateIndexStatement compiled = new CompiledCreateIndexStatement(newIndex.getIndexName(),
                dataverseName, newIndex.getDatasetName(), newIndex.getKeyFieldNames(), newIndex.getGramLength(),
                newIndex.getIndexType());
        JobSpecification loadSpec = IndexOperations.buildSecondaryIndexLoadingJobSpec(compiled, metadataProvider);
        JobId loadJobId = runJob(hcc, loadSpec);
        hcc.waitForCompletion(loadJobId);
    }

    /**
     * Handles a "create type" statement by computing the declared type and adding it to the metadata.
     * <p>
     * The dataverse is taken from the statement when given, otherwise from the active default dataverse. If a
     * datatype with the same name already exists, the statement is a no-op when it carries "if not exists" and an
     * error otherwise. The builtin-type check is only reached when no such datatype exists yet.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context
     * @param stmt
     *            the statement; must be a TypeDecl
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @throws AlgebricksException
     *             if no dataverse can be determined, the dataverse is unknown, the datatype already exists (without
     *             "if not exists"), or the name clashes with a builtin type
     */
    private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
            MetadataException {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        TypeDecl stmtCreateType = (TypeDecl) stmt;
        String dataverseName = stmtCreateType.getDataverseName() == null ? (activeDefaultDataverse == null ? null
                : activeDefaultDataverse.getDataverseName()) : stmtCreateType.getDataverseName().getValue();
        if (dataverseName == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }
        String typeName = stmtCreateType.getIdent().getValue();
        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
        if (dv == null) {
            // Message spelling corrected (was "Unknonw").
            throw new AlgebricksException("Unknown dataverse " + dataverseName);
        }
        Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
        if (dt != null) {
            if (!stmtCreateType.getIfNotExists()) {
                throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
            }
            // Already defined and "if not exists" was given: leave the existing type untouched.
            return;
        }
        if (builtinTypeMap.get(typeName) != null) {
            throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
        }
        Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, stmtCreateType, dataverseName);
        TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
        IAType type = typeMap.get(typeSignature);
        MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
    }

    /**
     * Handles a "drop dataverse" statement.
     * <p>
     * For every dataset in the dataverse, the secondary indexes of INTERNAL and FEED datasets are dropped first and
     * then the dataset itself; finally the dataverse is removed from the metadata. If the dropped dataverse is the
     * active default dataverse, the active default is cleared.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context and is passed to the drop helpers
     * @param stmt
     *            the statement; must be a DataverseDropStatement
     * @param hcc
     *            the Hyracks connection passed to the drop helpers
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @throws AlgebricksException
     *             if the dataverse does not exist and "if exists" was not specified
     */
    private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
        String dvName = stmtDelete.getDataverseName().getValue();

        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
        if (dv == null) {
            if (!stmtDelete.getIfExists()) {
                throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
            }
            return;
        }

        List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
        for (Dataset dataset : datasets) {
            String datasetName = dataset.getDatasetName();
            DatasetType dsType = dataset.getDatasetType();
            if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
                for (Index index : indexes) {
                    if (index.isSecondaryIndex()) {
                        compileIndexDropStatement(hcc, dvName, datasetName, index.getIndexName(), metadataProvider);
                    }
                }
            }
            compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
        }

        MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
        // Compare names by value: the statement's name and the metadata entity's name are distinct String
        // objects, so a reference comparison (==) would leave a dropped dataverse set as the active default.
        if (activeDefaultDataverse != null && dvName.equals(activeDefaultDataverse.getDataverseName())) {
            activeDefaultDataverse = null;
        }
    }

    /**
     * Handles a "drop dataset" statement.
     * <p>
     * For INTERNAL and FEED datasets the secondary indexes are dropped first, then the dataset itself is dropped.
     * The dataverse is taken from the statement when given, otherwise from the active default dataverse.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context and is passed to the drop helpers
     * @param stmt
     *            the statement; must be a DropStatement
     * @param hcc
     *            the Hyracks connection passed to the drop helpers
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @throws AlgebricksException
     *             if no dataverse can be determined, or the dataset does not exist and "if exists" was not
     *             specified
     */
    private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        DropStatement stmtDelete = (DropStatement) stmt;
        String dataverseName = stmtDelete.getDataverseName() == null ? (activeDefaultDataverse == null ? null
                : activeDefaultDataverse.getDataverseName()) : stmtDelete.getDataverseName().getValue();
        if (dataverseName == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }
        String datasetName = stmtDelete.getDatasetName().getValue();
        Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
        if (ds == null) {
            if (!stmtDelete.getIfExists()) {
                throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
                        + dataverseName + ".");
            }
            return;
        }

        if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
            for (Index index : indexes) {
                // Drop the secondary indexes here, matching the dataverse-drop path; the dataset drop below
                // handles the dataset itself. The previous isPrimaryIndex() check skipped every secondary
                // index and routed the primary index through the index-drop path instead.
                if (index.isSecondaryIndex()) {
                    compileIndexDropStatement(hcc, dataverseName, datasetName, index.getIndexName(),
                            metadataProvider);
                }
            }
        }
        compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
    }

    /**
     * Handles a "drop index" statement for an index of an INTERNAL or FEED dataset.
     * <p>
     * The dataverse is taken from the statement when given, otherwise from the active default dataverse.
     *
     * @param metadataProvider
     *            supplies the metadata transaction context and is passed to the drop helper
     * @param stmt
     *            the statement; must be an IndexDropStatement
     * @param hcc
     *            the Hyracks connection passed to the drop helper
     * @param jobsToExecute
     *            the per-statement job list (not used here)
     * @throws AlgebricksException
     *             if no dataverse can be determined, the dataset does not exist, the dataset is neither INTERNAL
     *             nor FEED, or the index does not exist and "if exists" was not specified
     */
    private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        MetadataTransactionContext txn = metadataProvider.getMetadataTxnContext();
        IndexDropStatement dropStmt = (IndexDropStatement) stmt;
        String datasetName = dropStmt.getDatasetName().getValue();

        // Resolve the target dataverse: explicit name first, then the active default.
        String dataverseName;
        if (dropStmt.getDataverseName() != null) {
            dataverseName = dropStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }
        if (dataverseName == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }

        Dataset dataset = MetadataManager.INSTANCE.getDataset(txn, dataverseName, datasetName);
        if (dataset == null) {
            throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
                    + dataverseName);
        }

        boolean maintainsIndexes = dataset.getDatasetType() == DatasetType.INTERNAL
                || dataset.getDatasetType() == DatasetType.FEED;
        if (!maintainsIndexes) {
            throw new AlgebricksException(datasetName
                    + " is an external dataset. Indexes are not maintained for external datasets.");
        }

        String indexName = dropStmt.getIndexName().getValue();
        Index target = MetadataManager.INSTANCE.getIndex(txn, dataverseName, datasetName, indexName);
        if (target != null) {
            compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
            return;
        }
        if (!dropStmt.getIfExists()) {
            throw new AlgebricksException("There is no index with this name " + indexName + ".");
        }
    }

    /**
     * Handles a type drop statement by removing the named datatype from the metadata.
     * <p>
     * The dataverse is taken from the statement when it names one, otherwise from the active default
     * dataverse. A missing datatype is an error unless the statement's "if exists" flag is set, in which
     * case nothing is done.
     *
     * @param metadataProvider supplies the metadata transaction context
     * @param stmt the statement; cast to {@code TypeDropStatement}
     * @param jobsToExecute not used by this handler
     * @throws AlgebricksException if no dataverse can be resolved, or the datatype does not exist and
     *             "if exists" was not requested
     */
    private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
            ACIDException {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        TypeDropStatement dropStmt = (TypeDropStatement) stmt;

        String dataverseName;
        if (dropStmt.getDataverseName() != null) {
            dataverseName = dropStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }
        if (dataverseName == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }

        String typeName = dropStmt.getTypeName().getValue();
        Datatype existing = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
        if (existing != null) {
            MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
        } else if (!dropStmt.getIfExists()) {
            throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
        }
    }

    /**
     * Handles a nodegroup drop statement by removing the named nodegroup from the metadata.
     * <p>
     * A missing nodegroup is an error unless the statement's "if exists" flag is set, in which case
     * nothing is done.
     *
     * @param metadataProvider supplies the metadata transaction context
     * @param stmt the statement; cast to {@code NodeGroupDropStatement}
     * @param jobsToExecute not used by this handler
     * @throws AlgebricksException if the nodegroup does not exist and "if exists" was not requested
     */
    private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
            ACIDException {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        NodeGroupDropStatement dropStmt = (NodeGroupDropStatement) stmt;
        String nodegroupName = dropStmt.getNodeGroupName().getValue();

        NodeGroup existing = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
        if (existing != null) {
            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
            return;
        }
        if (!dropStmt.getIfExists()) {
            throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
        }
    }

    /**
     * Handles a create function statement by registering a new function in the metadata.
     * <p>
     * The target dataverse is the namespace of the statement's signature when present, otherwise the active
     * default dataverse; it must exist. The function is recorded with {@code Function.RETURNTYPE_VOID},
     * {@code Function.LANGUAGE_AQL} and the SCALAR function kind.
     *
     * @param metadataProvider supplies the metadata transaction context
     * @param stmt the statement; cast to {@code CreateFunctionStatement}
     * @param jobsToExecute not used by this handler
     * @throws AlgebricksException if no dataverse can be resolved or the resolved dataverse does not exist
     */
    private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
            ACIDException {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        CreateFunctionStatement createStmt = (CreateFunctionStatement) stmt;

        String dataverse;
        if (createStmt.getSignature().getNamespace() != null) {
            dataverse = createStmt.getSignature().getNamespace();
        } else if (activeDefaultDataverse != null) {
            dataverse = activeDefaultDataverse.getDataverseName();
        } else {
            dataverse = null;
        }
        if (dataverse == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }

        if (MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse) == null) {
            throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
        }

        Function newFunction = new Function(dataverse, createStmt.getaAterixFunction().getName(), createStmt
                .getaAterixFunction().getArity(), createStmt.getParamList(), Function.RETURNTYPE_VOID,
                createStmt.getFunctionBody(), Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
        MetadataManager.INSTANCE.addFunction(mdTxnCtx, newFunction);
    }

    /**
     * Handles a function drop statement by removing the function with the given signature from the
     * metadata.
     * <p>
     * A missing function is an error unless the statement's "if exists" flag is set, in which case nothing
     * is done.
     *
     * @param metadataProvider supplies the metadata transaction context
     * @param stmt the statement; cast to {@code FunctionDropStatement}
     * @param jobsToExecute not used by this handler
     * @throws AlgebricksException if the function does not exist and "if exists" was not requested
     */
    private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
            AlgebricksException {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
        FunctionSignature signature = stmtDropFunction.getFunctionSignature();
        Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
        if (function == null) {
            if (!stmtDropFunction.getIfExists()) {
                // Message spelling corrected (was "Unknonw").
                throw new AlgebricksException("Unknown function " + signature);
            }
        } else {
            MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
        }
    }

    /**
     * Handles a load statement: queues the job that loads the dataset, followed by one loading job for each
     * of the dataset's secondary indexes.
     * <p>
     * The dataverse is taken from the statement when it names one, otherwise from the active default
     * dataverse. Jobs are only appended to {@code jobsToExecute}; nothing is run here.
     *
     * @param metadataProvider supplies the metadata transaction context and is passed to the job builders
     * @param stmt the statement; cast to {@code LoadFromFileStatement}
     * @param hcc not used by this handler
     * @param jobsToExecute receives the dataset load job and the secondary index load jobs, in that order
     * @throws Exception an {@code AlgebricksException} when no dataverse can be resolved; plus anything
     *             raised by the metadata lookups or the job builders
     */
    private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
        String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
                : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
        // Fail early with the same message the other handlers use, rather than letting a null name reach
        // the data format lookup below.
        if (dataverseName == null) {
            throw new AlgebricksException(" dataverse not specified ");
        }
        String datasetName = loadStmt.getDatasetName().getValue();
        CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, datasetName,
                loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());

        IDataFormat format = getDataFormat(mdTxnCtx, dataverseName);
        Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
        jobsToExecute.add(job.getJobSpec());
        // Also load the dataset's secondary indexes.
        List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                datasetName);
        for (Index index : datasetIndexes) {
            if (!index.isSecondaryIndex()) {
                continue;
            }
            // Create CompiledCreateIndexStatement from metadata entity 'index'.
            CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                    index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
            jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
        }
    }

    /**
     * Handles a write-from-query-result statement: marks the metadata provider as a write transaction,
     * compiles the statement's query and queues the resulting job when one is produced.
     *
     * @param metadataProvider flagged as a write transaction and used for compilation
     * @param stmt the statement; cast to {@code WriteFromQueryResultStatement}
     * @param hcc not used by this handler
     * @param jobsToExecute receives the compiled job specification, if compilation returned one
     * @throws Exception anything raised while rewriting or compiling the query
     */
    private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        metadataProvider.setWriteTransaction(true);
        WriteFromQueryResultStatement writeStmt = (WriteFromQueryResultStatement) stmt;

        // Explicit dataverse on the statement wins; fall back to the session default (may stay null).
        String dataverseName;
        if (writeStmt.getDataverseName() != null) {
            dataverseName = writeStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }

        CompiledWriteFromQueryResultStatement compiledWrite = new CompiledWriteFromQueryResultStatement(
                dataverseName, writeStmt.getDatasetName().getValue(), writeStmt.getQuery(),
                writeStmt.getVarCounter());

        JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, compiledWrite.getQuery(), compiledWrite);
        if (jobSpec == null) {
            return;
        }
        jobsToExecute.add(jobSpec);
    }

    /**
     * Handles an insert statement: marks the metadata provider as a write transaction, compiles the
     * statement's query and queues the resulting job when one is produced.
     *
     * @param metadataProvider flagged as a write transaction and used for compilation
     * @param stmt the statement; cast to {@code InsertStatement}
     * @param hcc not used by this handler
     * @param jobsToExecute receives the compiled job specification, if compilation returned one
     * @throws Exception anything raised while rewriting or compiling the query
     */
    private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        metadataProvider.setWriteTransaction(true);
        InsertStatement insertStmt = (InsertStatement) stmt;

        // Explicit dataverse on the statement wins; fall back to the session default (may stay null).
        String dataverseName;
        if (insertStmt.getDataverseName() != null) {
            dataverseName = insertStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }

        CompiledInsertStatement compiledInsert = new CompiledInsertStatement(dataverseName, insertStmt
                .getDatasetName().getValue(), insertStmt.getQuery(), insertStmt.getVarCounter());

        JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, compiledInsert.getQuery(), compiledInsert);
        if (jobSpec == null) {
            return;
        }
        jobsToExecute.add(jobSpec);
    }

    /**
     * Handles a delete statement: marks the metadata provider as a write transaction, builds the compiled
     * delete statement, compiles its query and queues the resulting job when one is produced.
     *
     * @param metadataProvider flagged as a write transaction, passed to the compiled statement and used
     *            for compilation
     * @param stmt the statement; cast to {@code DeleteStatement}
     * @param hcc not used by this handler
     * @param jobsToExecute receives the compiled job specification, if compilation returned one
     * @throws Exception anything raised while building, rewriting or compiling the query
     */
    private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        metadataProvider.setWriteTransaction(true);
        DeleteStatement deleteStmt = (DeleteStatement) stmt;

        // Explicit dataverse on the statement wins; fall back to the session default (may stay null).
        String dataverseName;
        if (deleteStmt.getDataverseName() != null) {
            dataverseName = deleteStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }

        CompiledDeleteStatement compiledDelete = new CompiledDeleteStatement(deleteStmt.getVariableExpr(),
                dataverseName, deleteStmt.getDatasetName().getValue(), deleteStmt.getCondition(),
                deleteStmt.getDieClause(), deleteStmt.getVarCounter(), metadataProvider);

        JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, compiledDelete.getQuery(), compiledDelete);
        if (jobSpec == null) {
            return;
        }
        jobsToExecute.add(jobSpec);
    }

    /**
     * Rewrites and then compiles a query into a job specification.
     * <p>
     * Both steps go through {@code APIFramework} using the supplied metadata provider. When the provider is
     * flagged as a write transaction, a freshly generated transaction id is assigned to it between the two
     * steps.
     *
     * @param metadataProvider metadata provider used for rewriting and compilation
     * @param query the query to rewrite and compile
     * @param stmt the compiled DML statement the query belongs to, or {@code null} when there is none
     * @return whatever {@code APIFramework.compileQuery} returns; callers in this class treat {@code null}
     *         as "no job to run"
     */
    private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
            ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException,
            ACIDException {
        // Step 1: rewriting, under the ongoing metadata transaction.
        Pair<Query, Integer> rewritten = APIFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                sessionConfig, out, pdf);

        // Write transactions get their own job transaction id before compilation.
        if (metadataProvider.isWriteTransaction()) {
            metadataProvider.setJobTxnId(TransactionIDFactory.generateTransactionId());
        }

        // Step 2: compilation, under the same ongoing metadata transaction.
        return APIFramework.compileQuery(declaredFunctions, metadataProvider, query, rewritten.second,
                stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
    }

    /**
     * Handles a begin feed statement: checks that the target dataset exists and is a feed dataset,
     * initializes the statement against it, compiles the resulting query and queues the job when one is
     * produced.
     *
     * @param metadataProvider supplies the metadata transaction context and is used for compilation
     * @param stmt the statement; cast to {@code BeginFeedStatement}
     * @param hcc not used by this handler
     * @param jobsToExecute receives the compiled job specification, if compilation returned one
     * @throws Exception an {@code AsterixException} when the dataset is unknown, an
     *             {@code IllegalArgumentException} when it is not a feed dataset, plus anything raised by
     *             initialization or compilation
     */
    private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        BeginFeedStatement beginStmt = (BeginFeedStatement) stmt;

        // Explicit dataverse on the statement wins; fall back to the session default (may stay null).
        String dataverseName;
        if (beginStmt.getDataverseName() != null) {
            dataverseName = beginStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }

        CompiledBeginFeedStatement compiledBegin = new CompiledBeginFeedStatement(dataverseName, beginStmt
                .getDatasetName().getValue(), beginStmt.getQuery(), beginStmt.getVarCounter());

        Dataset feedDataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                dataverseName, beginStmt.getDatasetName().getValue());
        if (feedDataset == null) {
            throw new AsterixException("Unknown dataset :" + beginStmt.getDatasetName().getValue());
        }
        if (feedDataset.getDatasetDetails().getDatasetType() != DatasetType.FEED) {
            throw new IllegalArgumentException("Dataset " + beginStmt.getDatasetName().getValue()
                    + " is not a feed dataset");
        }

        // The compiled statement is refreshed with the statement's query after initialization.
        beginStmt.initialize(metadataProvider.getMetadataTxnContext(), feedDataset);
        compiledBegin.setQuery(beginStmt.getQuery());

        JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, beginStmt.getQuery(), compiledBegin);
        if (jobSpec == null) {
            return;
        }
        jobsToExecute.add(jobSpec);
    }

    /**
     * Handles a control feed statement by building the corresponding control feed job and queueing it.
     *
     * @param metadataProvider passed to the feed job builder
     * @param stmt the statement; cast to {@code ControlFeedStatement}
     * @param hcc not used by this handler
     * @param jobsToExecute receives the control feed job specification
     * @throws Exception anything raised while building the job specification
     */
    private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
        ControlFeedStatement controlStmt = (ControlFeedStatement) stmt;

        // Explicit dataverse on the statement wins; fall back to the session default (may stay null).
        String dataverseName;
        if (controlStmt.getDataverseName() != null) {
            dataverseName = controlStmt.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }

        CompiledControlFeedStatement compiledControl = new CompiledControlFeedStatement(
                controlStmt.getOperationType(), dataverseName, controlStmt.getDatasetName().getValue(),
                controlStmt.getAlterAdapterConfParams());
        JobSpecification controlJob = FeedOperations.buildControlFeedJobSpec(compiledControl, metadataProvider);
        jobsToExecute.add(controlJob);
    }

    /**
     * Compiles a query, queues the resulting job when one is produced, and returns a result handle.
     * <p>
     * When a job specification is produced, its JSON form is logged at info level before it is queued.
     *
     * @param metadataProvider used for compilation and as the source of the result set id
     * @param query the query to compile
     * @param hcc not used by this handler
     * @param jobsToExecute receives the compiled job specification, if compilation returned one
     * @return a {@code QueryResult} built from the query and the provider's result set id
     * @throws Exception anything raised while rewriting, compiling or serializing the job specification
     */
    private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
            List<JobSpecification> jobsToExecute) throws Exception {
        JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, query, null);
        if (jobSpec != null) {
            GlobalConfig.ASTERIX_LOGGER.info(jobSpec.toJSON().toString(1));
            jobsToExecute.add(jobSpec);
        }

        QueryResult result = new QueryResult(query, metadataProvider.getResultSetId());
        return result;
    }

    /**
     * Builds the secondary index creation job for a create index statement, starts it and waits for it to
     * complete.
     *
     * @param hcc connection used to start the job and wait on it
     * @param stmtCreateIndex the create index statement to execute
     * @param metadataProvider passed to the job builder
     * @throws Exception an {@code AsterixException} when no job specification could be built, plus
     *             anything raised while building, starting or waiting for the job
     */
    private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
            AqlMetadataProvider metadataProvider) throws Exception {
        // TODO: Eventually CreateIndexStatement and CompiledCreateIndexStatement should be replaced by the
        // corresponding metadata entity. Until then the statement is converted to its compiled form here.
        String dataverseName;
        if (stmtCreateIndex.getDataverseName() != null) {
            dataverseName = stmtCreateIndex.getDataverseName().getValue();
        } else if (activeDefaultDataverse != null) {
            dataverseName = activeDefaultDataverse.getDataverseName();
        } else {
            dataverseName = null;
        }

        CompiledCreateIndexStatement compiledCreate = new CompiledCreateIndexStatement(stmtCreateIndex
                .getIndexName().getValue(), dataverseName, stmtCreateIndex.getDatasetName().getValue(),
                stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), stmtCreateIndex.getIndexType());

        JobSpecification creationJob = IndexOperations.buildSecondaryIndexCreationJobSpec(compiledCreate,
                metadataProvider);
        if (creationJob == null) {
            throw new AsterixException("Failed to create job spec for creating index '"
                    + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
        }

        // Block until the index creation job has finished.
        hcc.waitForCompletion(runJob(hcc, creationJob));
    }

    /**
     * Handles a create nodegroup statement by adding a nodegroup with the listed node controller names to
     * the metadata.
     * <p>
     * An already existing nodegroup of the same name is an error unless the statement's "if not exists"
     * flag is set, in which case nothing is done.
     *
     * @param metadataProvider supplies the metadata transaction context
     * @param stmt the statement; cast to {@code NodegroupDecl}
     * @param jobsToExecute not used by this handler
     * @throws AlgebricksException if the nodegroup already exists and "if not exists" was not requested
     */
    private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
            List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
            ACIDException {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        NodegroupDecl createStmt = (NodegroupDecl) stmt;
        String ngName = createStmt.getNodegroupName().getValue();

        if (MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName) != null) {
            if (!createStmt.getIfNotExists()) {
                throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
            }
            return;
        }

        // Unwrap the identifiers into plain node controller names.
        List<Identifier> ncIdentifiers = createStmt.getNodeControllerNames();
        List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
        for (Identifier ncIdentifier : ncIdentifiers) {
            ncNames.add(ncIdentifier.getValue());
        }
        NodeGroup newGroup = new NodeGroup(ngName, ncNames);
        MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, newGroup);
    }

    /**
     * Starts a single job specification through {@code executeJobArray} and returns its job id.
     *
     * @param hcc connection used to start the job
     * @param spec the job specification to start
     * @return the id of the started job
     * @throws Exception anything raised while starting the job
     */
    private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
        Job[] singleJob = new Job[] { new Job(spec) };
        return executeJobArray(hcc, singleJob, out, pdf)[0];
    }

    /**
     * Drops an index: runs the drop job built by {@code IndexOperations.buildDropSecondaryIndexJobSpec},
     * waits for it to complete, then removes the index entry from the metadata.
     *
     * @param hcc connection used to start the job and wait on it
     * @param dataverseName dataverse that holds the dataset
     * @param datasetName dataset that owns the index
     * @param indexName name of the index to drop
     * @param metadataProvider passed to the job builder and used for the metadata transaction context
     * @throws Exception anything raised while building or running the job or updating the metadata
     */
    private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
            String indexName, AqlMetadataProvider metadataProvider) throws Exception {
        CompiledIndexDropStatement compiledDrop = new CompiledIndexDropStatement(dataverseName, datasetName,
                indexName);
        JobSpecification dropJob = IndexOperations.buildDropSecondaryIndexJobSpec(compiledDrop, metadataProvider);

        // The metadata entry is removed only after the drop job has finished.
        hcc.waitForCompletion(runJob(hcc, dropJob));
        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                indexName);
    }

    /**
     * Drops a dataset: for INTERNAL and FEED datasets runs each drop job in turn, waiting for each to
     * complete, then removes the dataset entry from the metadata. For other dataset types only the
     * metadata entry is removed.
     *
     * @param hcc connection used to start the jobs and wait on them
     * @param dataverseName dataverse that holds the dataset
     * @param datasetName name of the dataset to drop; the dataset is dereferenced without a null check,
     *            so it is expected to exist
     * @param metadataProvider passed to the job builder and used for the metadata transaction context
     * @throws Exception anything raised while building or running the jobs or updating the metadata
     */
    private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
            AqlMetadataProvider metadataProvider) throws Exception {
        MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
        CompiledDatasetDropStatement compiledDrop = new CompiledDatasetDropStatement(dataverseName, datasetName);
        Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);

        boolean hasDropJobs = dataset.getDatasetType() == DatasetType.INTERNAL
                || dataset.getDatasetType() == DatasetType.FEED;
        if (hasDropJobs) {
            JobSpecification[] dropJobs = DatasetOperations.createDropDatasetJobSpec(compiledDrop, metadataProvider);
            // Jobs run strictly one after another.
            for (int i = 0; i < dropJobs.length; i++) {
                hcc.waitForCompletion(runJob(hcc, dropJobs[i]));
            }
        }

        MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
    }

    /**
     * Starts every job in the array, in order, and returns the ids of the started jobs.
     * <p>
     * Each job specification has its maximum number of reattempts set to 0 before it is started. This
     * method does not wait for the jobs to complete.
     *
     * @param hcc connection used to start the jobs
     * @param jobs the jobs to start
     * @param out not used by this method
     * @param pdf not used by this method
     * @return the started job ids, at the same positions as the corresponding entries of {@code jobs}
     * @throws Exception anything raised while starting a job
     */
    public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, DisplayFormat pdf)
            throws Exception {
        JobId[] startedJobIds = new JobId[jobs.length];
        int next = 0;
        for (Job job : jobs) {
            JobSpecification jobSpec = job.getJobSpec();
            jobSpec.setMaxReattempts(0);
            startedJobIds[next++] = hcc.startJob(jobSpec);
        }
        return startedJobIds;
    }

    /**
     * Instantiates the data format configured for a dataverse.
     * <p>
     * The dataverse is looked up in the metadata and the class named by its data format is loaded and
     * instantiated reflectively through its no-argument constructor.
     *
     * @param mdTxnCtx metadata transaction context used for the lookup
     * @param dataverseName name of the dataverse whose data format is wanted
     * @return a new instance of the dataverse's data format class
     * @throws AsterixException if the dataverse does not exist, or the format class cannot be loaded,
     *             instantiated or cast to {@code IDataFormat}
     */
    private static IDataFormat getDataFormat(MetadataTransactionContext mdTxnCtx, String dataverseName)
            throws AsterixException {
        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
        // Report a missing dataverse explicitly instead of surfacing a wrapped NullPointerException.
        if (dataverse == null) {
            throw new AsterixException("There is no dataverse with this name " + dataverseName + ".");
        }
        try {
            return (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
        } catch (Exception e) {
            throw new AsterixException(e);
        }
    }

}
