/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.compile;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Lists.newArrayListWithCapacity;

import java.sql.ParameterMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.optimize.QueryOptimizer;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.BindParseNode;
import org.apache.phoenix.parse.ColumnName;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SequenceValueParseNode;
import org.apache.phoenix.parse.UpsertStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.ConstraintViolationException;
import org.apache.phoenix.schema.DelegateColumn;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.TypeMismatchException;
import org.apache.phoenix.schema.UpsertColumnsValuesMismatchException;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

public class UpsertCompiler {

    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
            PTable table, MultiRowMutationState mutation,
            PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
            byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
        long columnValueSize = 0;
        Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
        byte[][] pkValues = new byte[table.getPKColumns().size()][];
        // If the table uses salting, the first byte is the salting byte, set to an empty array
        // here and we will fill in the byte later in PRowImpl.
        if (table.getBucketNum() != null) {
            pkValues[0] = new byte[] {0};
        }
        for(int i = 0; i < numSplColumns; i++) {
            pkValues[i + (table.getBucketNum() != null ? 1 : 0)] = values[i];
        }
        Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
        RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
        for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
            byte[] value = values[j];
            PColumn column = table.getColumns().get(columnIndexes[i]);
            if (SchemaUtil.isPKColumn(column)) {
                pkValues[pkSlotIndex[i]] = value;
                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
                    if (!useServerTimestamp) {
                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
                        if (rowTimestamp < 0) {
                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
                        }
                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
                    } 
                }
            } else {
                columnValues.put(column, value);
                columnValueSize += (column.getEstimatedSize() + value.length);
            }
        }
        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
        table.newKey(ptr, pkValues);
        if (table.getIndexType() == IndexType.LOCAL && maintainer != null) {
            byte[] rowKey = maintainer.buildDataRowKey(ptr, viewConstants);
            HRegionLocation region =
                    statement.getConnection().getQueryServices()
                            .getTableRegionLocation(table.getParentName().getBytes(), rowKey);
            byte[] regionPrefix =
                    region.getRegion().getStartKey().length == 0 ? new byte[region
                            .getRegion().getEndKey().length] : region.getRegion()
                            .getStartKey();
            if (regionPrefix.length != 0) {
                ptr.set(ScanRanges.prefixKey(ptr.get(), 0, ptr.getLength(), regionPrefix,
                    regionPrefix.length));
            }
        } 
        mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
    }
    
    public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, boolean prefixSysColValues) throws SQLException {
        PhoenixStatement statement = childContext.getStatement();
        PhoenixConnection connection = statement.getConnection();
        ConnectionQueryServices services = connection.getQueryServices();
        int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
                QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
        int maxSizeBytes =
                services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
                    QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
        int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
        // we automatically flush the mutations when either auto commit is enabled, or
        // the target table is transactional (in that case changes are not visible until we commit)
        final boolean autoFlush = connection.getAutoCommit() || tableRef.getTable().isTransactional();
        int sizeOffset = 0;
        int numSplColumns =
                (tableRef.getTable().isMultiTenant() ? 1 : 0)
                        + (tableRef.getTable().getViewIndexId() != null ? 1 : 0);
        byte[][] values = new byte[columnIndexes.length + numSplColumns][];
        if(prefixSysColValues) {
            int i = 0;
            if(tableRef.getTable().isMultiTenant()) {
                values[i++] = connection.getTenantId().getBytes();
            }
            if(tableRef.getTable().getViewIndexId() != null) {
                values[i++] = PSmallint.INSTANCE.toBytes(tableRef.getTable().getViewIndexId());
            }
        }
        int rowCount = 0;
        MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
        PTable table = tableRef.getTable();
        IndexMaintainer indexMaintainer = null;
        byte[][] viewConstants = null;
        if (table.getIndexType() == IndexType.LOCAL) {
            PTable parentTable =
                    statement
                            .getConnection()
                            .getMetaDataCache()
                            .getTableRef(
                                new PTableKey(statement.getConnection().getTenantId(), table
                                        .getParentName().getString())).getTable();
            indexMaintainer = table.getIndexMaintainer(parentTable, connection);
            viewConstants = IndexUtil.getViewConstants(parentTable);
        }
        try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            while (rs.next()) {
                for (int i = 0, j = numSplColumns; j < values.length; j++, i++) {
                    PColumn column = table.getColumns().get(columnIndexes[i]);
                    byte[] bytes = rs.getBytes(i + 1);
                    ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
                    Object value = rs.getObject(i + 1);
                    int rsPrecision = rs.getMetaData().getPrecision(i + 1);
                    Integer precision = rsPrecision == 0 ? null : rsPrecision;
                    int rsScale = rs.getMetaData().getScale(i + 1);
                    Integer scale = rsScale == 0 ? null : rsScale;
                    // We are guaranteed that the two column will have compatible types,
                    // as we checked that before.
                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), SortOrder.getDefault(), precision,
                            scale, column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
                            .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
                            .buildException(); }
                    column.getDataType().coerceBytes(ptr, value, column.getDataType(), 
                            precision, scale, SortOrder.getDefault(), 
                            column.getMaxLength(), column.getScale(), column.getSortOrder(),
                            table.rowKeyOrderOptimizable());
                    values[j] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                }
                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns);
                rowCount++;
                // Commit a batch if auto commit is true and we're at our batch size
                if (autoFlush && rowCount % batchSize == 0) {
                    MutationState state = new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
                    connection.getMutationState().join(state);
                    connection.getMutationState().send();
                    mutation.clear();
                }
            }

            if (autoFlush) {
                // If auto commit is true, this last batch will be committed upon return
                sizeOffset = rowCount / batchSize * batchSize;
            }
            return new MutationState(tableRef, mutation, sizeOffset, maxSize,
                    maxSizeBytes, connection);
        }
    }

    private static class UpsertingParallelIteratorFactory extends MutatingParallelIteratorFactory {
        private RowProjector projector;
        private int[] columnIndexes;
        private int[] pkSlotIndexes;
        private final TableRef tableRef;
        private final boolean useSeverTimestamp;

        private UpsertingParallelIteratorFactory (PhoenixConnection connection, TableRef tableRef, boolean useServerTimestamp) {
            super(connection);
            this.tableRef = tableRef;
            this.useSeverTimestamp = useServerTimestamp;
        }

        @Override
        protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
            if (parentContext.getSequenceManager().getSequenceCount() > 0) {
                throw new IllegalStateException("Cannot pipeline upsert when sequence is referenced");
            }
            PhoenixStatement statement = new PhoenixStatement(connection);
            /*
             * We don't want to collect any read metrics within the child context. This is because any read metrics that
             * need to be captured are already getting collected in the parent statement context enclosed in the result
             * iterator being used for reading rows out.
             */
            StatementContext childContext = new StatementContext(statement, false);
            // Clone the row projector as it's not thread safe and would be used simultaneously by
            // multiple threads otherwise.
            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp, false);
            return state;
        }
        
        public void setRowProjector(RowProjector projector) {
            this.projector = projector;
        }
        public void setColumnIndexes(int[] columnIndexes) {
            this.columnIndexes = columnIndexes;
        }
        public void setPkSlotIndexes(int[] pkSlotIndexes) {
            this.pkSlotIndexes = pkSlotIndexes;
        }
    }
    
    private final PhoenixStatement statement;
    private final Operation operation;
    
    public UpsertCompiler(PhoenixStatement statement, Operation operation) {
        this.statement = statement;
        this.operation = operation;
    }
    
    private static LiteralParseNode getNodeForRowTimestampColumn(PColumn col) {
        PDataType type = col.getDataType();
        long dummyValue = 0L;
        if (type.isCoercibleTo(PTimestamp.INSTANCE)) {
            return new LiteralParseNode(new Timestamp(dummyValue), PTimestamp.INSTANCE);
        } else if (type == PLong.INSTANCE || type == PUnsignedLong.INSTANCE) {
            return new LiteralParseNode(dummyValue, PLong.INSTANCE);
        }
        throw new IllegalArgumentException();
    }
    
    public MutationPlan compile(UpsertStatement upsert) throws SQLException {
        final PhoenixConnection connection = statement.getConnection();
        ConnectionQueryServices services = connection.getQueryServices();
        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
        final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
        List<ColumnName> columnNodes = upsert.getColumns();
        TableRef tableRefToBe = null;
        PTable table = null;
        Set<PColumn> addViewColumnsToBe = Collections.emptySet();
        Set<PColumn> overlapViewColumnsToBe = Collections.emptySet();
        List<PColumn> allColumnsToBe = Collections.emptyList();
        boolean isTenantSpecific = false;
        boolean isSharedViewIndex = false;
        String tenantIdStr = null;
        ColumnResolver resolver = null;
        int[] columnIndexesToBe;
        int nColumnsToSet = 0;
        int[] pkSlotIndexesToBe;
        List<ParseNode> valueNodes = upsert.getValues();
        List<PColumn> targetColumns;
        NamedTableNode tableNode = upsert.getTable();
        String tableName = tableNode.getName().getTableName();
        String schemaName = tableNode.getName().getSchemaName();
        QueryPlan queryPlanToBe = null;
        int nValuesToSet;
        boolean sameTable = false;
        boolean runOnServer = false;
        boolean serverUpsertSelectEnabled =
                services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT,
                        QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT);
        boolean allowServerMutations =
                services.getProps().getBoolean(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
                        QueryServicesOptions.DEFAULT_ENABLE_SERVER_SIDE_UPSERT_MUTATIONS);
        UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
        boolean useServerTimestampToBe = false;
        

        resolver = FromCompiler.getResolverForMutation(upsert, connection);
        tableRefToBe = resolver.getTables().get(0);
        table = tableRefToBe.getTable();
        // Cannot update:
        // - read-only VIEW
        // - transactional table with a connection having an SCN
        // - table with indexes and SCN set
        // - tables with ROW_TIMESTAMP columns
        if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
            throw new ReadOnlyTableException(schemaName,tableName);
        } else if (connection.isBuildingIndex() && table.getType() != PTableType.INDEX) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_INDEX_UPDATABLE_AT_SCN)
            .setSchemaName(schemaName)
            .setTableName(tableName)
            .build().buildException();
        } else if (table.isTransactional() && connection.getSCN() != null) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode
                    .CANNOT_SPECIFY_SCN_FOR_TXN_TABLE)
                    .setSchemaName(schemaName)
                    .setTableName(tableName).build().buildException();
        } else if (connection.getSCN() != null && !table.getIndexes().isEmpty()
                && !connection.isRunningUpgrade() && !connection.isBuildingIndex()) {
            throw new SQLExceptionInfo
                    .Builder(SQLExceptionCode
                    .CANNOT_UPSERT_WITH_SCN_FOR_TABLE_WITH_INDEXES)
                    .setSchemaName(schemaName)
                    .setTableName(tableName).build().buildException();
        } else if(connection.getSCN() != null && !connection.isRunningUpgrade()
                && !connection.isBuildingIndex() && table.getRowTimestampColPos() >= 0) {
            throw new SQLExceptionInfo
                    .Builder(SQLExceptionCode
                    .CANNOT_UPSERT_WITH_SCN_FOR_ROW_TIMESTAMP_COLUMN)
                    .setSchemaName(schemaName)
                    .setTableName(tableName).build().buildException();
        }
        boolean isSalted = table.getBucketNum() != null;
        isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
        isSharedViewIndex = table.getViewIndexId() != null;
        tenantIdStr = isTenantSpecific ? connection.getTenantId().getString() : null;
        int posOffset = isSalted ? 1 : 0;
        // Setup array of column indexes parallel to values that are going to be set
        allColumnsToBe = table.getColumns();

        nColumnsToSet = 0;
        if (table.getViewType() == ViewType.UPDATABLE) {
            addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumnsToBe.size());
            for (PColumn column : allColumnsToBe) {
                if (column.getViewConstant() != null) {
                    addViewColumnsToBe.add(column);
                }
            }
        }
        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
        // Allow full row upsert if no columns or only dynamic ones are specified and values count match
        if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) {
            nColumnsToSet = allColumnsToBe.size() - posOffset;
            columnIndexesToBe = new int[nColumnsToSet];
            pkSlotIndexesToBe = new int[columnIndexesToBe.length];
            targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
            targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
            int minPKPos = 0;
            if (isSharedViewIndex) {
                PColumn indexIdColumn = table.getPKColumns().get(minPKPos);
                columnIndexesToBe[minPKPos] = indexIdColumn.getPosition();
                targetColumns.set(minPKPos, indexIdColumn);
                minPKPos++;
            }
            if (isTenantSpecific) {
                PColumn tenantColumn = table.getPKColumns().get(minPKPos);
                columnIndexesToBe[minPKPos] = tenantColumn.getPosition();
                targetColumns.set(minPKPos, tenantColumn);
                minPKPos++;
            }
            for (int i = posOffset, j = 0; i < allColumnsToBe.size(); i++) {
                PColumn column = allColumnsToBe.get(i);
                if (SchemaUtil.isPKColumn(column)) {
                    pkSlotIndexesToBe[i-posOffset] = j + posOffset;
                    if (j++ < minPKPos) { // Skip, as it's already been set above
                        continue;
                    }
                    minPKPos = 0;
                }
                columnIndexesToBe[i-posOffset+minPKPos] = i;
                targetColumns.set(i-posOffset+minPKPos, column);
            }
            if (!addViewColumnsToBe.isEmpty()) {
                // All view columns overlap in this case
                overlapViewColumnsToBe = addViewColumnsToBe;
                addViewColumnsToBe = Collections.emptySet();
            }
        } else {
            // Size for worse case
            int numColsInUpsert = columnNodes.size();
            nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) +  + (isSharedViewIndex ? 1 : 0);
            columnIndexesToBe = new int[nColumnsToSet];
            pkSlotIndexesToBe = new int[columnIndexesToBe.length];
            targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
            targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
            Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
            Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
            BitSet columnsBeingSet = new BitSet(table.getColumns().size());
            int i = 0;
            if (isSharedViewIndex) {
                PColumn indexIdColumn = table.getPKColumns().get(i + posOffset);
                columnsBeingSet.set(columnIndexesToBe[i] = indexIdColumn.getPosition());
                pkSlotIndexesToBe[i] = i + posOffset;
                targetColumns.set(i, indexIdColumn);
                i++;
            }
            // Add tenant column directly, as we don't want to resolve it as this will fail
            if (isTenantSpecific) {
                PColumn tenantColumn = table.getPKColumns().get(i + posOffset);
                columnsBeingSet.set(columnIndexesToBe[i] = tenantColumn.getPosition());
                pkSlotIndexesToBe[i] = i + posOffset;
                targetColumns.set(i, tenantColumn);
                i++;
            }
            for (ColumnName colName : columnNodes) {
                ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName());
                PColumn column = ref.getColumn();
                if (IndexUtil.getViewConstantValue(column, ptr)) {
                    if (overlapViewColumnsToBe.isEmpty()) {
                        overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size());
                    }
                    nColumnsToSet--;
                    overlapViewColumnsToBe.add(column);
                    addViewColumnsToBe.remove(column);
                }
                columnsBeingSet.set(columnIndexesToBe[i] = ref.getColumnPosition());
                targetColumns.set(i, column);
                if (SchemaUtil.isPKColumn(column)) {
                    pkSlotIndexesToBe[i] = ref.getPKSlotPosition();
                }
                i++;
            }
            for (PColumn column : addViewColumnsToBe) {
                columnsBeingSet.set(columnIndexesToBe[i] = column.getPosition());
                targetColumns.set(i, column);
                if (SchemaUtil.isPKColumn(column)) {
                    pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column);
                }
                i++;
            }
            // If a table has rowtimestamp col, then we always set it.
            useServerTimestampToBe = table.getRowTimestampColPos() != -1 && !isRowTimestampSet(pkSlotIndexesToBe, table);
            if (useServerTimestampToBe) {
                PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
                // Need to resize columnIndexesToBe and pkSlotIndexesToBe to include this extra column.
                columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, columnIndexesToBe.length + 1);
                pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, pkSlotIndexesToBe.length + 1);
                columnsBeingSet.set(columnIndexesToBe[i] = rowTimestampCol.getPosition());
                pkSlotIndexesToBe[i] = table.getRowTimestampColPos();
                targetColumns.add(rowTimestampCol);
                if (valueNodes != null && !valueNodes.isEmpty()) {
                    valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol));
                }
                nColumnsToSet++;
            }
            for (i = posOffset; i < table.getColumns().size(); i++) {
                PColumn column = table.getColumns().get(i);
                if (!columnsBeingSet.get(i) && !column.isNullable() && column.getExpressionStr() == null) {
                    throw new ConstraintViolationException(SchemaUtil.getColumnDisplayName(column) + " may not be null");
                }
            }
        }
        boolean isAutoCommit = connection.getAutoCommit();
        if (valueNodes == null) {
            SelectStatement select = upsert.getSelect();
            assert(select != null);
            select = SubselectRewriter.flatten(select, connection);
            ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection, false, upsert.getTable().getName());
            select = StatementNormalizer.normalize(select, selectResolver);
            select = prependTenantAndViewConstants(table, select, tenantIdStr, addViewColumnsToBe, useServerTimestampToBe);
            SelectStatement transformedSelect = SubqueryRewriter.transform(select, selectResolver, connection);
            if (transformedSelect != select) {
                selectResolver = FromCompiler.getResolverForQuery(transformedSelect, connection, false, upsert.getTable().getName());
                select = StatementNormalizer.normalize(transformedSelect, selectResolver);
            }
            sameTable = !select.isJoin()
                && tableRefToBe.equals(selectResolver.getTables().get(0));
            /* We can run the upsert in a coprocessor if:
             * 1) from has only 1 table or server UPSERT SELECT is enabled
             * 2) the select query isn't doing aggregation (which requires a client-side final merge)
             * 3) autoCommit is on
             * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
             *    puts for index tables.
             * 5) no limit clause, as the limit clause requires client-side post processing
             * 6) no sequences, as sequences imply that the order of upsert must match the order of
             *    selection. TODO: change this and only force client side if there's a ORDER BY on the sequence value
             * Otherwise, run the query to pull the data from the server
             * and populate the MutationState (upto a limit).
            */
            if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) {
                // We can pipeline the upsert select instead of spooling everything to disk first,
                // if we don't have any post processing that's required.
                parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe);
                // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query,
                // so we might be able to run it entirely on the server side.
                // region space managed by region servers. So we bail out on executing on server side.
                // Disable running upsert select on server side if a table has global mutable secondary indexes on it
                boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows();
                boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery();
                runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit 
                        // We can run the upsert select for initial index population on server side for transactional
                        // tables since the writes do not need to be done transactionally, since we gate the index
                        // usage on successfully writing all data rows.
                        && (!table.isTransactional() || table.getType() == PTableType.INDEX)
                        && !(table.isImmutableRows() && !table.getIndexes().isEmpty())
                        && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1;
            }
            runOnServer &= allowServerMutations;
            // If we may be able to run on the server, add a hint that favors using the data table
            // if all else is equal.
            // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
            // as this would disallow running on the server. We currently use the row projector we
            // get back to figure this out.
            HintNode hint = upsert.getHint();
            if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
                hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
            }
            select = SelectStatement.create(select, hint);
            // Pass scan through if same table in upsert and select so that projection is computed correctly
            // Use optimizer to choose the best plan
            QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), true, false, null);
            queryPlanToBe = compiler.compile();

            if (sameTable) {
                // in the UPSERT INTO X ... SELECT FROM X case enforce the source tableRef's TS
                // as max TS, so that the query can safely restarted and still work of a snapshot
                // (so it won't see its own data in case of concurrent splits)
                // see PHOENIX-4849
                long serverTime = selectResolver.getTables().get(0).getCurrentTime();
                if (serverTime == QueryConstants.UNSET_TIMESTAMP) {
                    // if this is the first time this table is resolved the ref's current time might not be defined, yet
                    // in that case force an RPC to get the server time
                    serverTime = new MetaDataClient(connection).getCurrentTime(schemaName, tableName);
                }
                Scan scan = queryPlanToBe.getContext().getScan();
                ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), serverTime);
            }
            // This is post-fix: if the tableRef is a projected table, this means there are post-processing
            // steps and parallelIteratorFactory did not take effect.
            if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) {
                parallelIteratorFactoryToBe = null;
            }
            nValuesToSet = queryPlanToBe.getProjector().getColumnCount();
            // Cannot auto commit if doing aggregation or topN or salted
            // Salted causes problems because the row may end up living on a different region
        } else {
            nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
        }
        // Resize down to allow a subset of columns to be specifiable
        if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
            nColumnsToSet = nValuesToSet;
            columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
            pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
            for (int i = posOffset + nValuesToSet; i < table.getColumns().size(); i++) {
                PColumn column = table.getColumns().get(i);
                if (!column.isNullable() && column.getExpressionStr() == null) {
                    throw new ConstraintViolationException(SchemaUtil.getColumnDisplayName(column) + " may not be null");
                }
            }
        }
        
        if (nValuesToSet != nColumnsToSet) {
            // We might have added columns, so refresh cache and try again if stale.
            // We have logic to catch MetaNotFoundException and refresh cache  in PhoenixStatement
            // Note that this check is not really sufficient, as a column could have
            // been removed and the added back and we wouldn't detect that here.
            throw new UpsertColumnsValuesMismatchException(schemaName, tableName,
              "Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet);
        }
        final QueryPlan originalQueryPlan = queryPlanToBe;
        RowProjector projectorToBe = null;
        // Optimize only after all checks have been performed
        if (valueNodes == null) {
            queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
            projectorToBe = queryPlanToBe.getProjector();
        }
        final List<PColumn> allColumns = allColumnsToBe;
        final RowProjector projector = projectorToBe;
        final QueryPlan queryPlan = queryPlanToBe;
        final TableRef tableRef = tableRefToBe;
        final Set<PColumn> addViewColumns = addViewColumnsToBe;
        final Set<PColumn> overlapViewColumns = overlapViewColumnsToBe;
        final UpsertingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
        final int[] columnIndexes = columnIndexesToBe;
        final int[] pkSlotIndexes = pkSlotIndexesToBe;
        final boolean useServerTimestamp = useServerTimestampToBe;
        if (table.getRowTimestampColPos() == -1 && useServerTimestamp) {
            throw new IllegalStateException("For a table without row timestamp column, useServerTimestamp cannot be true");
        }
        // TODO: break this up into multiple functions
        ////////////////////////////////////////////////////////////////////
        // UPSERT SELECT
        /////////////////////////////////////////////////////////////////////
        if (valueNodes == null) {
            // Before we re-order, check that for updatable view columns
            // the projected expression either matches the column name or
            // is a constant with the same required value.
            throwIfNotUpdatable(tableRef, overlapViewColumnsToBe, targetColumns, projector, sameTable);
            
            ////////////////////////////////////////////////////////////////////
            // UPSERT SELECT run server-side (maybe)
            /////////////////////////////////////////////////////////////////////
            if (runOnServer) {
                // At most this array will grow bigger by the number of PK columns
                int[] allColumnsIndexes = Arrays.copyOf(columnIndexes, columnIndexes.length + nValuesToSet);
                int[] reverseColumnIndexes = new int[table.getColumns().size()];
                List<Expression> projectedExpressions = Lists.newArrayListWithExpectedSize(reverseColumnIndexes.length);
                Arrays.fill(reverseColumnIndexes, -1);
                for (int i =0; i < nValuesToSet; i++) {
                    projectedExpressions.add(projector.getColumnProjector(i).getExpression());
                    reverseColumnIndexes[columnIndexes[i]] = i;
                }
                /*
                 * Order projected columns and projected expressions with PK columns
                 * leading order by slot position
                 */
                int offset = table.getBucketNum() == null ? 0 : 1;
                for (int i = 0; i < table.getPKColumns().size() - offset; i++) {
                    PColumn column = table.getPKColumns().get(i + offset);
                    int pos = reverseColumnIndexes[column.getPosition()];
                    if (pos == -1) {
                        // Last PK column may be fixed width and nullable
                        // We don't want to insert a null expression b/c
                        // it's not valid to set a fixed width type to null.
                        if (column.getDataType().isFixedWidth()) {
                            continue;
                        }
                        // Add literal null for missing PK columns
                        pos = projectedExpressions.size();
                        Expression literalNull = LiteralExpression.newConstant(null, column.getDataType(), Determinism.ALWAYS);
                        projectedExpressions.add(literalNull);
                        allColumnsIndexes[pos] = column.getPosition();
                    }
                    // Swap select expression at pos with i
                    Collections.swap(projectedExpressions, i, pos);
                    // Swap column indexes and reverse column indexes too
                    int tempPos = allColumnsIndexes[i];
                    allColumnsIndexes[i] = allColumnsIndexes[pos];
                    allColumnsIndexes[pos] = tempPos;
                    reverseColumnIndexes[tempPos] = pos;
                    reverseColumnIndexes[i] = i;
                }
                // If any pk slots are changing and server side UPSERT SELECT is disabled, do not run on server
                if (!serverUpsertSelectEnabled && ExpressionUtil
                        .isPkPositionChanging(new TableRef(table), projectedExpressions)) {
                    runOnServer = false;
                }
                ////////////////////////////////////////////////////////////////////
                // UPSERT SELECT run server-side
                /////////////////////////////////////////////////////////////////////
                if (runOnServer) {
                    // Iterate through columns being projected
                    List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size());
                    int posOff = table.getBucketNum() != null ? 1 : 0;
                    for (int i = 0 ; i < projectedExpressions.size(); i++) {
                        // Must make new column if position has changed
                        PColumn column = allColumns.get(allColumnsIndexes[i]);
                        projectedColumns.add(column.getPosition() == i + posOff ? column : new PColumnImpl(column, i + posOff));
                    }
                    // Build table from projectedColumns
                    // Hack to add default column family to be used on server in case no value column is projected.
                    PTable projectedTable = PTableImpl.builderWithColumns(table, projectedColumns)
                            .setExcludedColumns(ImmutableList.of())
                            .setDefaultFamilyName(PNameFactory.newName(SchemaUtil.getEmptyColumnFamily(table)))
                            .build();
                    
                    SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
                    StatementContext statementContext = queryPlan.getContext();
                    RowProjector aggProjectorToBe = ProjectionCompiler.compile(statementContext, select, GroupBy
                            .EMPTY_GROUP_BY);
                    statementContext.getAggregationManager().compile(queryPlan.getContext()
                            ,GroupBy.EMPTY_GROUP_BY);
                    if (queryPlan.getProjector().projectEveryRow()) {
                        aggProjectorToBe = new RowProjector(aggProjectorToBe,true);
                    }
                    final RowProjector aggProjector = aggProjectorToBe;

                    /*
                     * Transfer over PTable representing subset of columns selected, but all PK columns.
                     * Move columns setting PK first in pkSlot order, adding LiteralExpression of null for any missing ones.
                     * Transfer over List<Expression> for projection.
                     * In region scan, evaluate expressions in order, collecting first n columns for PK and collection non PK in mutation Map
                     * Create the PRow and get the mutations, adding them to the batch
                     */
                    final StatementContext context = queryPlan.getContext();
                    final Scan scan = context.getScan();
                    scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable));
                    scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                    
                    // Ignore order by - it has no impact
                    final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, originalQueryPlan);
                    return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
                }
            }
            ////////////////////////////////////////////////////////////////////
            // UPSERT SELECT run client-side
            /////////////////////////////////////////////////////////////////////
            return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes);
        }

            
        ////////////////////////////////////////////////////////////////////
        // UPSERT VALUES
        /////////////////////////////////////////////////////////////////////
        final byte[][] values = new byte[nValuesToSet][];
        int nodeIndex = 0;
        if (isSharedViewIndex) {
            values[nodeIndex++] = table.getviewIndexIdType().toBytes(table.getViewIndexId());
        }
        if (isTenantSpecific) {
            PName tenantId = connection.getTenantId();
            values[nodeIndex++] = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, isSharedViewIndex);
        }
        
        final int nodeIndexOffset = nodeIndex;
        // Allocate array based on size of all columns in table,
        // since some values may not be set (if they're nullable).
        final StatementContext context = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
        UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context);
        final List<Expression> constantExpressions = Lists.newArrayListWithExpectedSize(valueNodes.size());
        // First build all the expressions, as with sequences we want to collect them all first
        // and initialize them in one batch
        for (ParseNode valueNode : valueNodes) {
            if (!valueNode.isStateless()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build().buildException();
            }
            PColumn column = allColumns.get(columnIndexes[nodeIndex]);
            expressionBuilder.setColumn(column);
            Expression expression = valueNode.accept(expressionBuilder);
            if (expression.getDataType() != null && !expression.getDataType().isCastableTo(column.getDataType())) {
                throw TypeMismatchException.newException(
                        expression.getDataType(), column.getDataType(), "expression: "
                                + expression.toString() + " in column " + column);
            }
            constantExpressions.add(expression);
            nodeIndex++;
        }
        byte[] onDupKeyBytesToBe = null;
        List<Pair<ColumnName,ParseNode>> onDupKeyPairs = upsert.getOnDupKeyPairs();
        if (onDupKeyPairs != null) {
            if (table.isImmutableRows()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
                .setSchemaName(table.getSchemaName().getString())
                .setTableName(table.getTableName().getString())
                .build().buildException();
            }
            if (table.isTransactional()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
                .setSchemaName(table.getSchemaName().getString())
                .setTableName(table.getTableName().getString())
                .build().buildException();
            }
            if (connection.getSCN() != null) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
                .setSchemaName(table.getSchemaName().getString())
                .setTableName(table.getTableName().getString())
                .build().buildException();
            }
            if (SchemaUtil.hasGlobalIndex(table)) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_WITH_GLOBAL_IDX)
                .setSchemaName(table.getSchemaName().getString())
                .setTableName(table.getTableName().getString())
                .build().buildException();
            }
            if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
                onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyIgnore();
            } else {                       // ON DUPLICATE KEY UPDATE;
                int position = table.getBucketNum() == null ? 0 : 1;
                UpdateColumnCompiler compiler = new UpdateColumnCompiler(context);
                int nColumns = onDupKeyPairs.size();
                List<Expression> updateExpressions = Lists.newArrayListWithExpectedSize(nColumns);
                LinkedHashSet<PColumn> updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
                updateColumns.add(new PColumnImpl(
                        table.getPKColumns().get(position).getName(), // Use first PK column name as we know it won't conflict with others
                        null, PVarbinary.INSTANCE, null, null, false, position, SortOrder.getDefault(), 0, null, false, null, false, false, null, table.getPKColumns().get(position).getTimestamp()));
                position++;
                for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) {
                    ColumnName colName = columnPair.getFirst();
                    PColumn updateColumn = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn();
                    if (SchemaUtil.isPKColumn(updateColumn)) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY)
                        .setSchemaName(table.getSchemaName().getString())
                        .setTableName(table.getTableName().getString())
                        .setColumnName(updateColumn.getName().getString())
                        .build().buildException();
                    }
                    final int columnPosition = position++;
                    if (!updateColumns.add(new DelegateColumn(updateColumn) {
                        @Override
                        public int getPosition() {
                            return columnPosition;
                        }
                    })) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY)
                            .setSchemaName(table.getSchemaName().getString())
                            .setTableName(table.getTableName().getString())
                            .setColumnName(updateColumn.getName().getString())
                            .build().buildException();
                    };
                    ParseNode updateNode = columnPair.getSecond();
                    compiler.setColumn(updateColumn);
                    Expression updateExpression = updateNode.accept(compiler);
                    // Check that updateExpression is coercible to updateColumn
                    if (updateExpression.getDataType() != null && !updateExpression.getDataType().isCastableTo(updateColumn.getDataType())) {
                        throw TypeMismatchException.newException(
                                updateExpression.getDataType(), updateColumn.getDataType(), "expression: "
                                        + updateExpression.toString() + " for column " + updateColumn);
                    }
                    if (compiler.isAggregate()) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY)
                            .setSchemaName(table.getSchemaName().getString())
                            .setTableName(table.getTableName().getString())
                            .setColumnName(updateColumn.getName().getString())
                            .build().buildException();
                    }
                    updateExpressions.add(updateExpression);
                }
                PTable onDupKeyTable = PTableImpl.builderWithColumns(table, updateColumns)
                        .build();
                onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyUpdate(onDupKeyTable, updateExpressions);
            }
        }
        final byte[] onDupKeyBytes = onDupKeyBytesToBe;
        
        return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions,
                allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
                connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes);
    }
    
    private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) {
        checkArgument(table.getRowTimestampColPos() != -1, "Call this method only for tables with row timestamp column");
        int rowTimestampColPKSlot = table.getRowTimestampColPos();
        for (int pkSlot : pkSlotIndexes) {
            if (pkSlot == rowTimestampColPKSlot) {
                return true;
            }
        }
        return false;
    }
    
    private static class UpdateColumnCompiler extends ExpressionCompiler {
        private PColumn column;
        
        private UpdateColumnCompiler(StatementContext context) {
            super(context);
        }

        public void setColumn(PColumn column) {
            this.column = column;
        }
        
        @Override
        public Expression visit(BindParseNode node) throws SQLException {
            if (isTopLevel()) {
                context.getBindManager().addParamMetaData(node, column);
                Object value = context.getBindManager().getBindValue(node);
                return LiteralExpression.newConstant(value, column.getDataType(), column.getSortOrder(), Determinism.ALWAYS);
            }
            return super.visit(node);
        }    
        
        @Override
        public Expression visit(LiteralParseNode node) throws SQLException {
            if (isTopLevel()) {
                return LiteralExpression.newConstant(node.getValue(), column.getDataType(), column.getSortOrder(), Determinism.ALWAYS);
            }
            return super.visit(node);
        }
    }
    
    private static class UpsertValuesCompiler extends UpdateColumnCompiler {
        private UpsertValuesCompiler(StatementContext context) {
            super(context);
        }
        
        @Override
        public Expression visit(SequenceValueParseNode node) throws SQLException {
            return context.getSequenceManager().newSequenceReference(node);
        }
    }
    

    private static SelectStatement prependTenantAndViewConstants(PTable table, SelectStatement select, String tenantId, Set<PColumn> addViewColumns, boolean useServerTimestamp) {
        if ((!table.isMultiTenant() || tenantId == null) && table.getViewIndexId() == null && addViewColumns.isEmpty() && !useServerTimestamp) {
            return select;
        }
        List<AliasedNode> selectNodes = newArrayListWithCapacity(select.getSelect().size() + 1 + addViewColumns.size());
        if (table.getViewIndexId() != null) {
            selectNodes.add(new AliasedNode(null, new LiteralParseNode(table.getViewIndexId())));
        }
        if (table.isMultiTenant() && tenantId != null) {
            selectNodes.add(new AliasedNode(null, new LiteralParseNode(tenantId)));
        }
        selectNodes.addAll(select.getSelect());
        for (PColumn column : addViewColumns) {
            byte[] byteValue = column.getViewConstant();
            Object value = column.getDataType().toObject(byteValue, 0, byteValue.length-1);
            selectNodes.add(new AliasedNode(null, new LiteralParseNode(value)));
        }
        if (useServerTimestamp) {
            PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
            selectNodes.add(new AliasedNode(null, getNodeForRowTimestampColumn(rowTimestampCol)));
        }
        return SelectStatement.create(select, selectNodes);
    }
    
    /**
     * Check that none of no columns in our updatable VIEW are changing values.
     * @param tableRef
     * @param overlapViewColumns
     * @param targetColumns
     * @param projector
     * @throws SQLException
     */
    private static void throwIfNotUpdatable(TableRef tableRef, Set<PColumn> overlapViewColumns,
            List<PColumn> targetColumns, RowProjector projector, boolean sameTable) throws SQLException {
        PTable table = tableRef.getTable();
        if (table.getViewType() == ViewType.UPDATABLE && !overlapViewColumns.isEmpty()) {
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            for (int i = 0; i < targetColumns.size(); i++) {
                PColumn targetColumn = targetColumns.get(i);
                if (overlapViewColumns.contains(targetColumn)) {
                    Expression source = projector.getColumnProjector(i).getExpression();
                    if (source.isStateless()) {
                        source.evaluate(null, ptr);
                        if (Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), targetColumn.getViewConstant(), 0, targetColumn.getViewConstant().length-1) == 0) {
                            continue;
                        }
                    }
                    throw new SQLExceptionInfo.Builder(
                            SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
                            .setColumnName(targetColumn.getName().getString())
                            .build().buildException();
                }
            }
        }
    }

    private class ServerUpsertSelectMutationPlan implements MutationPlan {
        private final QueryPlan queryPlan;
        private final TableRef tableRef;
        private final QueryPlan originalQueryPlan;
        private final StatementContext context;
        private final PhoenixConnection connection;
        private final Scan scan;
        private final QueryPlan aggPlan;
        private final RowProjector aggProjector;
        private final int maxSize;
        private final int maxSizeBytes;

        public ServerUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan,
                                              StatementContext context, PhoenixConnection connection,
                                              Scan scan, QueryPlan aggPlan, RowProjector aggProjector,
                                              int maxSize, int maxSizeBytes) {
            this.queryPlan = queryPlan;
            this.tableRef = tableRef;
            this.originalQueryPlan = originalQueryPlan;
            this.context = context;
            this.connection = connection;
            this.scan = scan;
            this.aggPlan = aggPlan;
            this.aggProjector = aggProjector;
            this.maxSize = maxSize;
            this.maxSizeBytes = maxSizeBytes;
        }

        @Override
        public ParameterMetaData getParameterMetaData() {
            return queryPlan.getContext().getBindManager().getParameterMetaData();
        }

        @Override
        public StatementContext getContext() {
            return queryPlan.getContext();
        }

        @Override
        public TableRef getTargetRef() {
            return tableRef;
        }

        @Override
        public QueryPlan getQueryPlan() {
            return aggPlan;
        }

        @Override
        public Set<TableRef> getSourceRefs() {
            return originalQueryPlan.getSourceRefs();
        }

        @Override
        public Operation getOperation() {
          return operation;
        }

        @Override
        public MutationState execute() throws SQLException {
            ImmutableBytesWritable ptr = context.getTempPtr();
            PTable table = tableRef.getTable();
            table.getIndexMaintainers(ptr, context.getConnection());
            byte[] txState = table.isTransactional() ?
                    connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;

            ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
            if (aggPlan.getTableRef().getTable().isTransactional() 
                    || (table.getType() == PTableType.INDEX && table.isTransactional())) {
                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
            if (ptr.getLength() > 0) {
                byte[] uuidValue = ServerCacheClient.generateId();
                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
            }
            ResultIterator iterator = aggPlan.iterator();
            try {
                Tuple row = iterator.next();
                final long mutationCount = (Long) aggProjector.getColumnProjector(0).getValue(row,
                        PLong.INSTANCE, ptr);
                return new MutationState(maxSize, maxSizeBytes, connection) {
                    @Override
                    public long getUpdateCount() {
                        return mutationCount;
                    }
                };
            } finally {
                iterator.close();
            }

        }

        @Override
        public ExplainPlan getExplainPlan() throws SQLException {
            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
            planSteps.add("UPSERT ROWS");
            planSteps.addAll(queryPlanSteps);
            return new ExplainPlan(planSteps);
        }

        @Override
        public Long getEstimatedRowsToScan() throws SQLException {
            return aggPlan.getEstimatedRowsToScan();
        }

        @Override
        public Long getEstimatedBytesToScan() throws SQLException {
            return aggPlan.getEstimatedBytesToScan();
        }

        @Override
        public Long getEstimateInfoTimestamp() throws SQLException {
            return aggPlan.getEstimateInfoTimestamp();
        }
    }

    private class UpsertValuesMutationPlan implements MutationPlan {
        private final StatementContext context;
        private final TableRef tableRef;
        private final int nodeIndexOffset;
        private final List<Expression> constantExpressions;
        private final List<PColumn> allColumns;
        private final int[] columnIndexes;
        private final Set<PColumn> overlapViewColumns;
        private final byte[][] values;
        private final Set<PColumn> addViewColumns;
        private final PhoenixConnection connection;
        private final int[] pkSlotIndexes;
        private final boolean useServerTimestamp;
        private final byte[] onDupKeyBytes;
        private final int maxSize;
        private final int maxSizeBytes;

        public UpsertValuesMutationPlan(StatementContext context, TableRef tableRef, int nodeIndexOffset,
                                        List<Expression> constantExpressions, List<PColumn> allColumns,
                                        int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values,
                                        Set<PColumn> addViewColumns, PhoenixConnection connection,
                                        int[] pkSlotIndexes, boolean useServerTimestamp, byte[] onDupKeyBytes,
                                        int maxSize, int maxSizeBytes) {
            this.context = context;
            this.tableRef = tableRef;
            this.nodeIndexOffset = nodeIndexOffset;
            this.constantExpressions = constantExpressions;
            this.allColumns = allColumns;
            this.columnIndexes = columnIndexes;
            this.overlapViewColumns = overlapViewColumns;
            this.values = values;
            this.addViewColumns = addViewColumns;
            this.connection = connection;
            this.pkSlotIndexes = pkSlotIndexes;
            this.useServerTimestamp = useServerTimestamp;
            this.onDupKeyBytes = onDupKeyBytes;
            this.maxSize = maxSize;
            this.maxSizeBytes = maxSizeBytes;
        }

        @Override
        public ParameterMetaData getParameterMetaData() {
            return context.getBindManager().getParameterMetaData();
        }

        @Override
        public StatementContext getContext() {
            return context;
        }

        @Override
        public TableRef getTargetRef() {
            return tableRef;
        }

        @Override
        public QueryPlan getQueryPlan() {
            return null;
        }

        @Override
        public Set<TableRef> getSourceRefs() {
            return Collections.emptySet();
        }

        @Override
        public Operation getOperation() {
          return operation;
        }

        @Override
        public MutationState execute() throws SQLException {
            ImmutableBytesWritable ptr = context.getTempPtr();
            final SequenceManager sequenceManager = context.getSequenceManager();
            // Next evaluate all the expressions
            int nodeIndex = nodeIndexOffset;
            PTable table = tableRef.getTable();
            Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
                sequenceManager.newSequenceTuple(null);
            for (Expression constantExpression : constantExpressions) {
                PColumn column = allColumns.get(columnIndexes[nodeIndex]);
                constantExpression.evaluate(tuple, ptr);
                Object value = null;
                if (constantExpression.getDataType() != null) {
                    value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(),
                            constantExpression.getMaxLength(), constantExpression.getScale());
                    if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
                        throw TypeMismatchException.newException(
                            constantExpression.getDataType(), column.getDataType(), "expression: "
                                    + constantExpression.toString() + " in column " + column);
                    }
                    if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
                            constantExpression.getSortOrder(), constantExpression.getMaxLength(),
                            constantExpression.getScale(), column.getMaxLength(), column.getScale())) {
                        throw new SQLExceptionInfo.Builder(
                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
                            .setMessage("value=" + constantExpression.toString()).build().buildException();
                    }
                }
                column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(),
                        constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
                        column.getMaxLength(), column.getScale(),column.getSortOrder(),
                        table.rowKeyOrderOptimizable());
                if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
                    throw new SQLExceptionInfo.Builder(
                            SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
                            .setColumnName(column.getName().getString())
                            .setMessage("value=" + constantExpression.toString()).build().buildException();
                }
                values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                nodeIndex++;
            }
            // Add columns based on view
            for (PColumn column : addViewColumns) {
                if (IndexUtil.getViewConstantValue(column, ptr)) {
                    values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                } else {
                    throw new IllegalStateException();
                }
            }
            MultiRowMutationState mutation = new MultiRowMutationState(1);
            IndexMaintainer indexMaintainer = null;
            byte[][] viewConstants = null;
            if (table.getIndexType() == IndexType.LOCAL) {
                PTable parentTable =
                        statement
                                .getConnection()
                                .getMetaDataCache()
                                .getTableRef(
                                    new PTableKey(statement.getConnection().getTenantId(),
                                            table.getParentName().getString())).getTable();
                indexMaintainer = table.getIndexMaintainer(parentTable, connection);
                viewConstants = IndexUtil.getViewConstants(parentTable);
            }
            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
            return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
        }

        @Override
        public ExplainPlan getExplainPlan() throws SQLException {
            List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
            if (context.getSequenceManager().getSequenceCount() > 0) {
                planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
            }
            planSteps.add("PUT SINGLE ROW");
            return new ExplainPlan(planSteps);
        }

        @Override
        public Long getEstimatedRowsToScan() throws SQLException {
            return 0l;
        }

        @Override
        public Long getEstimatedBytesToScan() throws SQLException {
            return 0l;
        }

        @Override
        public Long getEstimateInfoTimestamp() throws SQLException {
            return 0l;
        }
    }

    private class ClientUpsertSelectMutationPlan implements MutationPlan {
        private final QueryPlan queryPlan;
        private final TableRef tableRef;
        private final QueryPlan originalQueryPlan;
        private final UpsertingParallelIteratorFactory parallelIteratorFactory;
        private final RowProjector projector;
        private final int[] columnIndexes;
        private final int[] pkSlotIndexes;
        private final boolean useServerTimestamp;
        private final int maxSize;
        private final int maxSizeBytes;

        public ClientUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan, UpsertingParallelIteratorFactory parallelIteratorFactory, RowProjector projector, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, int maxSize, int maxSizeBytes) {
            this.queryPlan = queryPlan;
            this.tableRef = tableRef;
            this.originalQueryPlan = originalQueryPlan;
            this.parallelIteratorFactory = parallelIteratorFactory;
            this.projector = projector;
            this.columnIndexes = columnIndexes;
            this.pkSlotIndexes = pkSlotIndexes;
            this.useServerTimestamp = useServerTimestamp;
            this.maxSize = maxSize;
            this.maxSizeBytes = maxSizeBytes;
            queryPlan.getContext().setClientSideUpsertSelect(true);
        }

        @Override
        public ParameterMetaData getParameterMetaData() {
            return queryPlan.getContext().getBindManager().getParameterMetaData();
        }

        @Override
        public StatementContext getContext() {
            return queryPlan.getContext();
        }

        @Override
        public TableRef getTargetRef() {
            return tableRef;
        }

        @Override
        public QueryPlan getQueryPlan() {
            return queryPlan;
        }

        @Override
        public Set<TableRef> getSourceRefs() {
            return originalQueryPlan.getSourceRefs();
        }

        @Override
        public Operation getOperation() {
          return operation;
        }

        @Override
        public MutationState execute() throws SQLException {
            ResultIterator iterator = queryPlan.iterator();
            if (parallelIteratorFactory == null) {
                return upsertSelect(new StatementContext(statement, queryPlan.getContext().getScan()), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
            }
            try {
                parallelIteratorFactory.setRowProjector(projector);
                parallelIteratorFactory.setColumnIndexes(columnIndexes);
                parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
                Tuple tuple;
                long totalRowCount = 0;
                StatementContext context = queryPlan.getContext();
                while ((tuple=iterator.next()) != null) {// Runs query
                    Cell kv = tuple.getValue(0);
                    totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
                }
                // Return total number of rows that have been updated. In the case of auto commit being off
                // the mutations will all be in the mutation state of the current connection.
                MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
                /*
                 *  All the metrics collected for measuring the reads done by the parallel mutating iterators
                 *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
                 *  returned mutation state so they can be published on commit.
                 */
                mutationState.setReadMetricQueue(context.getReadMetricsQueue());
                return mutationState;
            } finally {
                iterator.close();
            }
        }

        @Override
        public ExplainPlan getExplainPlan() throws SQLException {
            List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
            planSteps.add("UPSERT SELECT");
            planSteps.addAll(queryPlanSteps);
            return new ExplainPlan(planSteps);
        }

        @Override
        public Long getEstimatedRowsToScan() throws SQLException {
            return queryPlan.getEstimatedRowsToScan();
        }

        @Override
        public Long getEstimatedBytesToScan() throws SQLException {
            return queryPlan.getEstimatedBytesToScan();
        }

        @Override
        public Long getEstimateInfoTimestamp() throws SQLException {
            return queryPlan.getEstimateInfoTimestamp();
        }
    }
}