| /* |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.compile; |
| |
| import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO; |
| import static org.apache.phoenix.util.NumberUtil.add; |
| |
| import java.io.IOException; |
| import java.sql.ParameterMetaData; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.phoenix.cache.ServerCacheClient; |
| import org.apache.phoenix.cache.ServerCacheClient.ServerCache; |
| import org.apache.phoenix.compile.GroupByCompiler.GroupBy; |
| import org.apache.phoenix.compile.OrderByCompiler.OrderBy; |
| import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; |
| import org.apache.phoenix.coprocessor.MetaDataProtocol; |
| import org.apache.phoenix.exception.SQLExceptionCode; |
| import org.apache.phoenix.exception.SQLExceptionInfo; |
| import org.apache.phoenix.execute.AggregatePlan; |
| import org.apache.phoenix.execute.MutationState; |
| import org.apache.phoenix.execute.MutationState.MultiRowMutationState; |
| import org.apache.phoenix.execute.MutationState.RowMutationState; |
| import org.apache.phoenix.filter.SkipScanFilter; |
| import org.apache.phoenix.hbase.index.ValueGetter; |
| import org.apache.phoenix.hbase.index.covered.update.ColumnReference; |
| import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; |
| import org.apache.phoenix.index.IndexMaintainer; |
| import org.apache.phoenix.index.PhoenixIndexCodec; |
| import org.apache.phoenix.iterate.ResultIterator; |
| import org.apache.phoenix.jdbc.PhoenixConnection; |
| import org.apache.phoenix.jdbc.PhoenixResultSet; |
| import org.apache.phoenix.jdbc.PhoenixStatement; |
| import org.apache.phoenix.jdbc.PhoenixStatement.Operation; |
| import org.apache.phoenix.optimize.QueryOptimizer; |
| import org.apache.phoenix.parse.AliasedNode; |
| import org.apache.phoenix.parse.DeleteStatement; |
| import org.apache.phoenix.parse.HintNode; |
| import org.apache.phoenix.parse.HintNode.Hint; |
| import org.apache.phoenix.parse.NamedTableNode; |
| import org.apache.phoenix.parse.ParseNode; |
| import org.apache.phoenix.parse.ParseNodeFactory; |
| import org.apache.phoenix.parse.SelectStatement; |
| import org.apache.phoenix.parse.TableName; |
| import org.apache.phoenix.query.ConnectionQueryServices; |
| import org.apache.phoenix.query.KeyRange; |
| import org.apache.phoenix.query.QueryConstants; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.query.QueryServicesOptions; |
| import org.apache.phoenix.schema.DelegateColumn; |
| import org.apache.phoenix.schema.PColumn; |
| import org.apache.phoenix.schema.PIndexState; |
| import org.apache.phoenix.schema.PName; |
| import org.apache.phoenix.schema.PRow; |
| import org.apache.phoenix.schema.PTable; |
| import org.apache.phoenix.schema.PTable.IndexType; |
| import org.apache.phoenix.schema.PTableImpl; |
| import org.apache.phoenix.schema.PTableType; |
| import org.apache.phoenix.schema.ReadOnlyTableException; |
| import org.apache.phoenix.schema.SortOrder; |
| import org.apache.phoenix.schema.TableRef; |
| import org.apache.phoenix.schema.tuple.Tuple; |
| import org.apache.phoenix.schema.types.PLong; |
| import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; |
| import org.apache.phoenix.util.ByteUtil; |
| import org.apache.phoenix.util.IndexUtil; |
| import org.apache.phoenix.util.ScanUtil; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.sun.istack.NotNull; |
| |
| public class DeleteCompiler { |
| private static ParseNodeFactory FACTORY = new ParseNodeFactory(); |
| |
| private final PhoenixStatement statement; |
| private final Operation operation; |
| |
| public DeleteCompiler(PhoenixStatement statement, Operation operation) { |
| this.statement = statement; |
| this.operation = operation; |
| } |
| |
| /** |
| * Handles client side deletion of rows for a DELETE statement. We determine the "best" plan to drive the query using |
| * our standard optimizer. The plan may be based on using an index, in which case we need to translate the index row |
| * key to get the data row key used to form the delete mutation. We always collect up the data table mutations, but we |
| * only collect and send the index mutations for global, immutable indexes. Local indexes and mutable indexes are always |
| * maintained on the server side. |
| * @param context StatementContext for the scan being executed |
| * @param iterator ResultIterator for the scan being executed |
| * @param bestPlan QueryPlan used to produce the iterator |
| * @param projectedTableRef TableRef containing all indexed and covered columns across all indexes on the data table |
| * @param otherTableRefs other TableRefs needed to be maintained apart from the one over which the scan is executing. |
| * Might be other index tables (if we're driving off of the data table table), the data table (if we're driving off of |
| * an index table), or a mix of the data table and additional index tables. |
| * @return MutationState representing the uncommitted data across the data table and indexes. Will be joined with the |
| * MutationState on the connection over which the delete is occurring. |
| * @throws SQLException |
| */ |
| private static MutationState deleteRows(StatementContext context, ResultIterator iterator, QueryPlan bestPlan, TableRef projectedTableRef, List<TableRef> otherTableRefs) throws SQLException { |
| RowProjector projector = bestPlan.getProjector(); |
| TableRef tableRef = bestPlan.getTableRef(); |
| PTable table = tableRef.getTable(); |
| PhoenixStatement statement = context.getStatement(); |
| PhoenixConnection connection = statement.getConnection(); |
| PName tenantId = connection.getTenantId(); |
| byte[] tenantIdBytes = null; |
| if (tenantId != null) { |
| tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null); |
| } |
| // we automatically flush the mutations when either auto commit is enabled, or |
| // the target table is transactional (in that case changes are not visible until we commit) |
| final boolean autoFlush = connection.getAutoCommit() || tableRef.getTable().isTransactional(); |
| ConnectionQueryServices services = connection.getQueryServices(); |
| final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); |
| final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); |
| final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); |
| MultiRowMutationState mutations = new MultiRowMutationState(batchSize); |
| List<MultiRowMutationState> otherMutations = null; |
| // If otherTableRefs is not empty, we're deleting the rows from both the index table and |
| // the data table through a single query to save executing an additional one (since we |
| // can always get the data table row key from an index row key). |
| if (!otherTableRefs.isEmpty()) { |
| otherMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size()); |
| for (int i = 0; i < otherTableRefs.size(); i++) { |
| otherMutations.add(new MultiRowMutationState(batchSize)); |
| } |
| } |
| List<PColumn> pkColumns = table.getPKColumns(); |
| boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; |
| boolean isSharedViewIndex = table.getViewIndexId() != null; |
| int offset = (table.getBucketNum() == null ? 0 : 1); |
| byte[][] values = new byte[pkColumns.size()][]; |
| if (isSharedViewIndex) { |
| values[offset++] = table.getviewIndexIdType().toBytes(table.getViewIndexId()); |
| } |
| if (isMultiTenant) { |
| values[offset++] = tenantIdBytes; |
| } |
| try (final PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, context)) { |
| ValueGetter getter = null; |
| if (!otherTableRefs.isEmpty()) { |
| getter = new ValueGetter() { |
| final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); |
| final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(); |
| |
| @Override |
| public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { |
| Cell cell = rs.getCurrentRow().getValue(ref.getFamily(), ref.getQualifier()); |
| if (cell == null) { |
| return null; |
| } |
| valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); |
| return valuePtr; |
| } |
| |
| @Override |
| public byte[] getRowKey() { |
| rs.getCurrentRow().getKey(rowKeyPtr); |
| return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr); |
| } |
| }; |
| } |
| IndexMaintainer scannedIndexMaintainer = null; |
| IndexMaintainer[] maintainers = null; |
| PTable dataTable = table; |
| if (table.getType() == PTableType.INDEX) { |
| if (!otherTableRefs.isEmpty()) { |
| // The data table is always the last one in the list if it's |
| // not chosen as the best of the possible plans. |
| dataTable = otherTableRefs.get(otherTableRefs.size()-1).getTable(); |
| scannedIndexMaintainer = IndexMaintainer.create(dataTable, table, connection); |
| } |
| maintainers = new IndexMaintainer[otherTableRefs.size()]; |
| for (int i = 0; i < otherTableRefs.size(); i++) { |
| // Create IndexMaintainer based on projected table (i.e. SELECT expressions) so that client-side |
| // expressions are used instead of server-side ones. |
| PTable otherTable = otherTableRefs.get(i).getTable(); |
| if (otherTable.getType() == PTableType.INDEX) { |
| // In this case, we'll convert from index row -> data row -> other index row |
| maintainers[i] = IndexMaintainer.create(dataTable, otherTable, connection); |
| } else { |
| maintainers[i] = scannedIndexMaintainer; |
| } |
| } |
| } else if (!otherTableRefs.isEmpty()) { |
| dataTable = table; |
| maintainers = new IndexMaintainer[otherTableRefs.size()]; |
| for (int i = 0; i < otherTableRefs.size(); i++) { |
| // Create IndexMaintainer based on projected table (i.e. SELECT expressions) so that client-side |
| // expressions are used instead of server-side ones. |
| maintainers[i] = IndexMaintainer.create(projectedTableRef.getTable(), otherTableRefs.get(i).getTable(), connection); |
| } |
| |
| } |
| byte[][] viewConstants = IndexUtil.getViewConstants(dataTable); |
| int rowCount = 0; |
| while (rs.next()) { |
| ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map |
| rs.getCurrentRow().getKey(rowKeyPtr); |
| // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the |
| // row key will already have its value. |
| // Check for otherTableRefs being empty required when deleting directly from the index |
| if (otherTableRefs.isEmpty() || isMaintainedOnClient(table)) { |
| mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); |
| } |
| for (int i = 0; i < otherTableRefs.size(); i++) { |
| PTable otherTable = otherTableRefs.get(i).getTable(); |
| ImmutableBytesPtr otherRowKeyPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map |
| // Translate the data table row to the index table row |
| if (table.getType() == PTableType.INDEX) { |
| otherRowKeyPtr.set(scannedIndexMaintainer.buildDataRowKey(rowKeyPtr, viewConstants)); |
| if (otherTable.getType() == PTableType.INDEX) { |
| otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, otherRowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); |
| } |
| } else { |
| otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); |
| } |
| otherMutations.get(i).put(otherRowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); |
| } |
| if (mutations.size() > maxSize) { |
| throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); |
| } |
| rowCount++; |
| // Commit a batch if we are flushing automatically and we're at our batch size |
| if (autoFlush && rowCount % batchSize == 0) { |
| MutationState state = new MutationState(tableRef, mutations, 0, maxSize, maxSizeBytes, connection); |
| connection.getMutationState().join(state); |
| for (int i = 0; i < otherTableRefs.size(); i++) { |
| MutationState indexState = new MutationState(otherTableRefs.get(i), otherMutations.get(i), 0, maxSize, maxSizeBytes, connection); |
| connection.getMutationState().join(indexState); |
| } |
| connection.getMutationState().send(); |
| mutations.clear(); |
| if (otherMutations != null) { |
| for (MultiRowMutationState multiRowMutationState : otherMutations) { |
| multiRowMutationState.clear(); |
| } |
| } |
| } |
| } |
| |
| // If auto flush is true, this last batch will be committed upon return |
| int nCommittedRows = autoFlush ? (rowCount / batchSize * batchSize) : 0; |
| MutationState state = new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection); |
| for (int i = 0; i < otherTableRefs.size(); i++) { |
| MutationState indexState = new MutationState(otherTableRefs.get(i), otherMutations.get(i), 0, maxSize, maxSizeBytes, connection); |
| state.join(indexState); |
| } |
| return state; |
| } |
| } |
| |
| private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory { |
| private QueryPlan queryPlan; |
| private List<TableRef> otherTableRefs; |
| private TableRef projectedTableRef; |
| |
| private DeletingParallelIteratorFactory(PhoenixConnection connection) { |
| super(connection); |
| } |
| |
| @Override |
| protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException { |
| PhoenixStatement statement = new PhoenixStatement(connection); |
| /* |
| * We don't want to collect any read metrics within the child context. This is because any read metrics that |
| * need to be captured are already getting collected in the parent statement context enclosed in the result |
| * iterator being used for reading rows out. |
| */ |
| StatementContext context = new StatementContext(statement, false); |
| MutationState state = deleteRows(context, iterator, queryPlan, projectedTableRef, otherTableRefs); |
| return state; |
| } |
| |
| public void setQueryPlan(QueryPlan queryPlan) { |
| this.queryPlan = queryPlan; |
| } |
| |
| public void setOtherTableRefs(List<TableRef> otherTableRefs) { |
| this.otherTableRefs = otherTableRefs; |
| } |
| |
| public void setProjectedTableRef(TableRef projectedTableRef) { |
| this.projectedTableRef = projectedTableRef; |
| } |
| } |
| |
| private List<PTable> getClientSideMaintainedIndexes(TableRef tableRef) { |
| PTable table = tableRef.getTable(); |
| if (!table.getIndexes().isEmpty()) { |
| List<PTable> nonDisabledIndexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size()); |
| for (PTable index : table.getIndexes()) { |
| if (index.getIndexState() != PIndexState.DISABLE && isMaintainedOnClient(index)) { |
| nonDisabledIndexes.add(index); |
| } |
| } |
| return nonDisabledIndexes; |
| } |
| return Collections.emptyList(); |
| } |
| |
| private class MultiRowDeleteMutationPlan implements MutationPlan { |
| private final List<MutationPlan> plans; |
| private final MutationPlan firstPlan; |
| private final QueryPlan dataPlan; |
| |
| public MultiRowDeleteMutationPlan(QueryPlan dataPlan, @NotNull List<MutationPlan> plans) { |
| Preconditions.checkArgument(!plans.isEmpty()); |
| this.plans = plans; |
| this.firstPlan = plans.get(0); |
| this.dataPlan = dataPlan; |
| } |
| |
| @Override |
| public StatementContext getContext() { |
| return firstPlan.getContext(); |
| } |
| |
| @Override |
| public ParameterMetaData getParameterMetaData() { |
| return firstPlan.getParameterMetaData(); |
| } |
| |
| @Override |
| public ExplainPlan getExplainPlan() throws SQLException { |
| return firstPlan.getExplainPlan(); |
| } |
| |
| @Override |
| public MutationState execute() throws SQLException { |
| MutationState state = firstPlan.execute(); |
| statement.getConnection().getMutationState().join(state); |
| for (MutationPlan plan : plans.subList(1, plans.size())) { |
| statement.getConnection().getMutationState().join(plan.execute()); |
| } |
| return state; |
| } |
| |
| @Override |
| public TableRef getTargetRef() { |
| return firstPlan.getTargetRef(); |
| } |
| |
| @Override |
| public Set<TableRef> getSourceRefs() { |
| return firstPlan.getSourceRefs(); |
| } |
| |
| @Override |
| public Operation getOperation() { |
| return operation; |
| } |
| |
| @Override |
| public Long getEstimatedRowsToScan() throws SQLException { |
| Long estRows = null; |
| for (MutationPlan plan : plans) { |
| /* |
| * If any of the plan doesn't have estimate information available, then we cannot |
| * provide estimate for the overall plan. |
| */ |
| if (plan.getEstimatedRowsToScan() == null) { |
| return null; |
| } |
| estRows = add(estRows, plan.getEstimatedRowsToScan()); |
| } |
| return estRows; |
| } |
| |
| @Override |
| public Long getEstimatedBytesToScan() throws SQLException { |
| Long estBytes = null; |
| for (MutationPlan plan : plans) { |
| /* |
| * If any of the plan doesn't have estimate information available, then we cannot |
| * provide estimate for the overall plan. |
| */ |
| if (plan.getEstimatedBytesToScan() == null) { |
| return null; |
| } |
| estBytes = add(estBytes, plan.getEstimatedBytesToScan()); |
| } |
| return estBytes; |
| } |
| |
| @Override |
| public Long getEstimateInfoTimestamp() throws SQLException { |
| Long estInfoTimestamp = Long.MAX_VALUE; |
| for (MutationPlan plan : plans) { |
| Long timestamp = plan.getEstimateInfoTimestamp(); |
| /* |
| * If any of the plan doesn't have estimate information available, then we cannot |
| * provide estimate for the overall plan. |
| */ |
| if (timestamp == null) { |
| return timestamp; |
| } |
| estInfoTimestamp = Math.min(estInfoTimestamp, timestamp); |
| } |
| return estInfoTimestamp; |
| } |
| |
| @Override |
| public QueryPlan getQueryPlan() { |
| return dataPlan; |
| } |
| } |
| |
| public MutationPlan compile(DeleteStatement delete) throws SQLException { |
| final PhoenixConnection connection = statement.getConnection(); |
| final boolean isAutoCommit = connection.getAutoCommit(); |
| final boolean hasPostProcessing = delete.getLimit() != null; |
| final ConnectionQueryServices services = connection.getQueryServices(); |
| List<QueryPlan> queryPlans; |
| boolean allowServerMutations = |
| services.getProps().getBoolean(QueryServices.ENABLE_SERVER_SIDE_DELETE_MUTATIONS, |
| QueryServicesOptions.DEFAULT_ENABLE_SERVER_SIDE_DELETE_MUTATIONS); |
| NamedTableNode tableNode = delete.getTable(); |
| String tableName = tableNode.getName().getTableName(); |
| String schemaName = tableNode.getName().getSchemaName(); |
| SelectStatement select = null; |
| ColumnResolver resolverToBe = null; |
| DeletingParallelIteratorFactory parallelIteratorFactoryToBe; |
| resolverToBe = FromCompiler.getResolverForMutation(delete, connection); |
| final TableRef targetTableRef = resolverToBe.getTables().get(0); |
| PTable table = targetTableRef.getTable(); |
| // Cannot update: |
| // - read-only VIEW |
| // - transactional table with a connection having an SCN |
| // TODO: SchemaUtil.isReadOnly(PTable, connection)? |
| if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) { |
| throw new ReadOnlyTableException(schemaName,tableName); |
| } |
| else if (table.isTransactional() && connection.getSCN() != null) { |
| throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName) |
| .setTableName(tableName).build().buildException(); |
| } |
| |
| List<PTable> clientSideIndexes = getClientSideMaintainedIndexes(targetTableRef); |
| final boolean hasClientSideIndexes = !clientSideIndexes.isEmpty(); |
| |
| boolean isSalted = table.getBucketNum() != null; |
| boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant(); |
| boolean isSharedViewIndex = table.getViewIndexId() != null; |
| int pkColumnOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); |
| final int pkColumnCount = table.getPKColumns().size() - pkColumnOffset; |
| int selectColumnCount = pkColumnCount; |
| for (PTable index : clientSideIndexes) { |
| selectColumnCount += index.getPKColumns().size() - pkColumnCount; |
| } |
| Set<PColumn> projectedColumns = new LinkedHashSet<PColumn>(selectColumnCount + pkColumnOffset); |
| List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(selectColumnCount); |
| for (int i = isSalted ? 1 : 0; i < pkColumnOffset; i++) { |
| PColumn column = table.getPKColumns().get(i); |
| projectedColumns.add(column); |
| } |
| for (int i = pkColumnOffset; i < table.getPKColumns().size(); i++) { |
| PColumn column = table.getPKColumns().get(i); |
| projectedColumns.add(column); |
| aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null))); |
| } |
| // Project all non PK indexed columns so that we can do the proper index maintenance |
| for (PTable index : table.getIndexes()) { |
| IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); |
| // Go through maintainer as it handles functional indexes correctly |
| for (Pair<String,String> columnInfo : maintainer.getIndexedColumnInfo()) { |
| String familyName = columnInfo.getFirst(); |
| if (familyName != null) { |
| String columnName = columnInfo.getSecond(); |
| boolean hasNoColumnFamilies = table.getColumnFamilies().isEmpty(); |
| PColumn column = hasNoColumnFamilies ? table.getColumnForColumnName(columnName) : table.getColumnFamily(familyName).getPColumnForColumnName(columnName); |
| if(!projectedColumns.contains(column)) { |
| projectedColumns.add(column); |
| aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(hasNoColumnFamilies ? null : TableName.create(null, familyName), '"' + columnName + '"', null))); |
| } |
| } |
| } |
| } |
| select = FACTORY.select(delete.getTable(), delete.getHint(), false, aliasedNodes, delete.getWhere(), |
| Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null, |
| delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), |
| delete.getUdfParseNodes()); |
| select = StatementNormalizer.normalize(select, resolverToBe); |
| |
| SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection); |
| boolean hasPreProcessing = transformedSelect != select; |
| if (transformedSelect != select) { |
| resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection, false, delete.getTable().getName()); |
| select = StatementNormalizer.normalize(transformedSelect, resolverToBe); |
| } |
| final boolean hasPreOrPostProcessing = hasPreProcessing || hasPostProcessing; |
| boolean noQueryReqd = !hasPreOrPostProcessing; |
| // No limit and no sub queries, joins, etc in where clause |
| // Can't run on same server for transactional data, as we need the row keys for the data |
| // that is being upserted for conflict detection purposes. |
| // If we have immutable indexes, we'd increase the number of bytes scanned by executing |
| // separate queries against each index, so better to drive from a single table in that case. |
| boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasClientSideIndexes; |
| HintNode hint = delete.getHint(); |
| if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) { |
| select = SelectStatement.create(select, HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE)); |
| } |
| |
| parallelIteratorFactoryToBe = hasPreOrPostProcessing ? null : new DeletingParallelIteratorFactory(connection); |
| QueryOptimizer optimizer = new QueryOptimizer(services); |
| QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe, new SequenceManager(statement)); |
| final QueryPlan dataPlan = compiler.compile(); |
| // TODO: the select clause should know that there's a sub query, but doesn't seem to currently |
| queryPlans = Lists.newArrayList(!clientSideIndexes.isEmpty() |
| ? optimizer.getApplicablePlans(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe) |
| : optimizer.getBestPlan(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe)); |
| // Filter out any local indexes that don't contain all indexed columns. |
| // We have to do this manually because local indexes are still used |
| // when referenced columns aren't in the index, so they won't be |
| // filtered by the optimizer. |
| queryPlans = new ArrayList<>(queryPlans); |
| Iterator<QueryPlan> iterator = queryPlans.iterator(); |
| while (iterator.hasNext()) { |
| QueryPlan plan = iterator.next(); |
| if (plan.getTableRef().getTable().getIndexType() == IndexType.LOCAL) { |
| if (!plan.getContext().getDataColumns().isEmpty()) { |
| iterator.remove(); |
| } |
| } |
| } |
| if (queryPlans.isEmpty()) { |
| queryPlans = Collections.singletonList(dataPlan); |
| } |
| |
| runOnServer &= queryPlans.get(0).getTableRef().getTable().getType() != PTableType.INDEX; |
| runOnServer &= allowServerMutations; |
| |
| // We need to have all indexed columns available in all immutable indexes in order |
| // to generate the delete markers from the query. We also cannot have any filters |
| // except for our SkipScanFilter for point lookups. |
| // A simple check of the non existence of a where clause in the parse node is not sufficient, as the where clause |
| // may have been optimized out. Instead, we check that there's a single SkipScanFilter |
| // If we can generate a plan for every index, that means all the required columns are available in every index, |
| // hence we can drive the delete from any of the plans. |
| noQueryReqd &= queryPlans.size() == 1 + clientSideIndexes.size(); |
| int queryPlanIndex = 0; |
| while (noQueryReqd && queryPlanIndex < queryPlans.size()) { |
| QueryPlan plan = queryPlans.get(queryPlanIndex++); |
| StatementContext context = plan.getContext(); |
| noQueryReqd &= (!context.getScan().hasFilter() |
| || context.getScan().getFilter() instanceof SkipScanFilter) |
| && context.getScanRanges().isPointLookup(); |
| } |
| |
| final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); |
| final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); |
| |
| // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all. |
| if (noQueryReqd) { |
| // Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows |
| // from the data table, while the others will be for deleting rows from immutable indexes. |
| List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size()); |
| for (final QueryPlan plan : queryPlans) { |
| mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes)); |
| } |
| return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans); |
| } else if (runOnServer) { |
| // TODO: better abstraction |
| final StatementContext context = dataPlan.getContext(); |
| Scan scan = context.getScan(); |
| scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); |
| |
| // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where> |
| // The coprocessor will delete each row returned from the scan |
| // Ignoring ORDER BY, since with auto commit on and no limit makes no difference |
| SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint()); |
| RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY); |
| context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY); |
| if (dataPlan.getProjector().projectEveryRow()) { |
| projectorToBe = new RowProjector(projectorToBe,true); |
| } |
| final RowProjector projector = projectorToBe; |
| final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null, |
| OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, dataPlan); |
| return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes); |
| } else { |
| final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe; |
| List<PColumn> adjustedProjectedColumns = Lists.newArrayListWithExpectedSize(projectedColumns.size()); |
| final int offset = table.getBucketNum() == null ? 0 : 1; |
| Iterator<PColumn> projectedColsItr = projectedColumns.iterator(); |
| int i = 0; |
| while(projectedColsItr.hasNext()) { |
| final int position = i++; |
| adjustedProjectedColumns.add(new DelegateColumn(projectedColsItr.next()) { |
| @Override |
| public int getPosition() { |
| return position + offset; |
| } |
| }); |
| } |
| PTable projectedTable = PTableImpl.builderWithColumns(table, adjustedProjectedColumns) |
| .setType(PTableType.PROJECTED) |
| .build(); |
| final TableRef projectedTableRef = new TableRef(projectedTable, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp()); |
| |
| QueryPlan bestPlanToBe = dataPlan; |
| for (QueryPlan plan : queryPlans) { |
| PTable planTable = plan.getTableRef().getTable(); |
| if (planTable.getIndexState() != PIndexState.BUILDING) { |
| bestPlanToBe = plan; |
| break; |
| } |
| } |
| final QueryPlan bestPlan = bestPlanToBe; |
| final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(clientSideIndexes.size()); |
| for (PTable index : clientSideIndexes) { |
| if (!bestPlan.getTableRef().getTable().equals(index)) { |
| otherTableRefs.add(new TableRef(index, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp())); |
| } |
| } |
| |
| if (!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) { |
| otherTableRefs.add(projectedTableRef); |
| } |
| return new ClientSelectDeleteMutationPlan(targetTableRef, dataPlan, bestPlan, hasPreOrPostProcessing, |
| parallelIteratorFactory, otherTableRefs, projectedTableRef, maxSize, maxSizeBytes, connection); |
| } |
| } |
| |
| private class SingleRowDeleteMutationPlan implements MutationPlan { |
| |
| private final QueryPlan dataPlan; |
| private final PhoenixConnection connection; |
| private final int maxSize; |
| private final StatementContext context; |
| private final int maxSizeBytes; |
| |
| public SingleRowDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, int maxSize, int maxSizeBytes) { |
| this.dataPlan = dataPlan; |
| this.connection = connection; |
| this.maxSize = maxSize; |
| this.context = dataPlan.getContext(); |
| this.maxSizeBytes = maxSizeBytes; |
| } |
| |
| @Override |
| public ParameterMetaData getParameterMetaData() { |
| return context.getBindManager().getParameterMetaData(); |
| } |
| |
| @Override |
| public MutationState execute() throws SQLException { |
| // We have a point lookup, so we know we have a simple set of fully qualified |
| // keys for our ranges |
| ScanRanges ranges = context.getScanRanges(); |
| Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); |
| MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount()); |
| while (iterator.hasNext()) { |
| mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), |
| new RowMutationState(PRow.DELETE_MARKER, 0, |
| statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); |
| } |
| return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection); |
| } |
| |
| @Override |
| public ExplainPlan getExplainPlan() throws SQLException { |
| return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW")); |
| } |
| |
| @Override |
| public QueryPlan getQueryPlan() { |
| return dataPlan; |
| } |
| |
| @Override |
| public StatementContext getContext() { |
| return context; |
| } |
| |
| @Override |
| public TableRef getTargetRef() { |
| return dataPlan.getTableRef(); |
| } |
| |
| @Override |
| public Set<TableRef> getSourceRefs() { |
| // Don't include the target |
| return Collections.emptySet(); |
| } |
| |
| @Override |
| public Operation getOperation() { |
| return operation; |
| } |
| |
| @Override |
| public Long getEstimatedRowsToScan() throws SQLException { |
| return 0l; |
| } |
| |
| @Override |
| public Long getEstimatedBytesToScan() throws SQLException { |
| return 0l; |
| } |
| |
| @Override |
| public Long getEstimateInfoTimestamp() throws SQLException { |
| return 0l; |
| } |
| } |
| |
| private class ServerSelectDeleteMutationPlan implements MutationPlan { |
| private final StatementContext context; |
| private final QueryPlan dataPlan; |
| private final PhoenixConnection connection; |
| private final QueryPlan aggPlan; |
| private final RowProjector projector; |
| private final int maxSize; |
| private final int maxSizeBytes; |
| |
| public ServerSelectDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, QueryPlan aggPlan, |
| RowProjector projector, int maxSize, int maxSizeBytes) { |
| this.context = dataPlan.getContext(); |
| this.dataPlan = dataPlan; |
| this.connection = connection; |
| this.aggPlan = aggPlan; |
| this.projector = projector; |
| this.maxSize = maxSize; |
| this.maxSizeBytes = maxSizeBytes; |
| } |
| |
| @Override |
| public ParameterMetaData getParameterMetaData() { |
| return context.getBindManager().getParameterMetaData(); |
| } |
| |
| @Override |
| public StatementContext getContext() { |
| return context; |
| } |
| |
| @Override |
| public TableRef getTargetRef() { |
| return dataPlan.getTableRef(); |
| } |
| |
| @Override |
| public Set<TableRef> getSourceRefs() { |
| return dataPlan.getSourceRefs(); |
| } |
| |
| @Override |
| public Operation getOperation() { |
| return operation; |
| } |
| |
| @Override |
| public MutationState execute() throws SQLException { |
| // TODO: share this block of code with UPSERT SELECT |
| ImmutableBytesWritable ptr = context.getTempPtr(); |
| PTable table = dataPlan.getTableRef().getTable(); |
| table.getIndexMaintainers(ptr, context.getConnection()); |
| byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; |
| ServerCache cache = null; |
| try { |
| if (ptr.getLength() > 0) { |
| byte[] uuidValue = ServerCacheClient.generateId(); |
| context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); |
| context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); |
| context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState); |
| ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION); |
| } |
| ResultIterator iterator = aggPlan.iterator(); |
| try { |
| Tuple row = iterator.next(); |
| final long mutationCount = (Long) projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); |
| return new MutationState(maxSize, maxSizeBytes, connection) { |
| @Override |
| public long getUpdateCount() { |
| return mutationCount; |
| } |
| }; |
| } finally { |
| iterator.close(); |
| } |
| } finally { |
| if (cache != null) { |
| cache.close(); |
| } |
| } |
| } |
| |
| @Override |
| public ExplainPlan getExplainPlan() throws SQLException { |
| List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); |
| List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); |
| planSteps.add("DELETE ROWS"); |
| planSteps.addAll(queryPlanSteps); |
| return new ExplainPlan(planSteps); |
| } |
| |
| @Override |
| public Long getEstimatedRowsToScan() throws SQLException { |
| return aggPlan.getEstimatedRowsToScan(); |
| } |
| |
| @Override |
| public Long getEstimatedBytesToScan() throws SQLException { |
| return aggPlan.getEstimatedBytesToScan(); |
| } |
| |
| @Override |
| public Long getEstimateInfoTimestamp() throws SQLException { |
| return aggPlan.getEstimateInfoTimestamp(); |
| } |
| |
| @Override |
| public QueryPlan getQueryPlan() { |
| return aggPlan; |
| } |
| } |
| |
| private class ClientSelectDeleteMutationPlan implements MutationPlan { |
| private final StatementContext context; |
| private final TableRef targetTableRef; |
| private final QueryPlan dataPlan; |
| private final QueryPlan bestPlan; |
| private final boolean hasPreOrPostProcessing; |
| private final DeletingParallelIteratorFactory parallelIteratorFactory; |
| private final List<TableRef> otherTableRefs; |
| private final TableRef projectedTableRef; |
| private final int maxSize; |
| private final int maxSizeBytes; |
| private final PhoenixConnection connection; |
| |
| public ClientSelectDeleteMutationPlan(TableRef targetTableRef, QueryPlan dataPlan, QueryPlan bestPlan, |
| boolean hasPreOrPostProcessing, |
| DeletingParallelIteratorFactory parallelIteratorFactory, |
| List<TableRef> otherTableRefs, TableRef projectedTableRef, int maxSize, |
| int maxSizeBytes, PhoenixConnection connection) { |
| this.context = bestPlan.getContext(); |
| this.targetTableRef = targetTableRef; |
| this.dataPlan = dataPlan; |
| this.bestPlan = bestPlan; |
| this.hasPreOrPostProcessing = hasPreOrPostProcessing; |
| this.parallelIteratorFactory = parallelIteratorFactory; |
| this.otherTableRefs = otherTableRefs; |
| this.projectedTableRef = projectedTableRef; |
| this.maxSize = maxSize; |
| this.maxSizeBytes = maxSizeBytes; |
| this.connection = connection; |
| } |
| |
| @Override |
| public ParameterMetaData getParameterMetaData() { |
| return context.getBindManager().getParameterMetaData(); |
| } |
| |
| @Override |
| public StatementContext getContext() { |
| return context; |
| } |
| |
| @Override |
| public TableRef getTargetRef() { |
| return targetTableRef; |
| } |
| |
| @Override |
| public Set<TableRef> getSourceRefs() { |
| return dataPlan.getSourceRefs(); |
| } |
| |
| @Override |
| public Operation getOperation() { |
| return operation; |
| } |
| |
| @Override |
| public MutationState execute() throws SQLException { |
| ResultIterator iterator = bestPlan.iterator(); |
| try { |
| // If we're not doing any pre or post processing, we can produce the delete mutations directly |
| // in the parallel threads executed for the scan |
| if (!hasPreOrPostProcessing) { |
| Tuple tuple; |
| long totalRowCount = 0; |
| if (parallelIteratorFactory != null) { |
| parallelIteratorFactory.setQueryPlan(bestPlan); |
| parallelIteratorFactory.setOtherTableRefs(otherTableRefs); |
| parallelIteratorFactory.setProjectedTableRef(projectedTableRef); |
| } |
| while ((tuple=iterator.next()) != null) {// Runs query |
| Cell kv = tuple.getValue(0); |
| totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault()); |
| } |
| // Return total number of rows that have been deleted from the table. In the case of auto commit being off |
| // the mutations will all be in the mutation state of the current connection. We need to divide by the |
| // total number of tables we updated as otherwise the client will get an inflated result. |
| int totalTablesUpdateClientSide = 1; // data table is always updated |
| PTable bestTable = bestPlan.getTableRef().getTable(); |
| // global immutable tables are also updated client side (but don't double count the data table) |
| if (bestPlan != dataPlan && isMaintainedOnClient(bestTable)) { |
| totalTablesUpdateClientSide++; |
| } |
| for (TableRef otherTableRef : otherTableRefs) { |
| PTable otherTable = otherTableRef.getTable(); |
| // Don't double count the data table here (which morphs when it becomes a projected table, hence this check) |
| if (projectedTableRef != otherTableRef && isMaintainedOnClient(otherTable)) { |
| totalTablesUpdateClientSide++; |
| } |
| } |
| MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount/totalTablesUpdateClientSide); |
| |
| // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. |
| state.setReadMetricQueue(context.getReadMetricsQueue()); |
| |
| return state; |
| } else { |
| // Otherwise, we have to execute the query and produce the delete mutations in the single thread |
| // producing the query results. |
| return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs); |
| } |
| } finally { |
| iterator.close(); |
| } |
| } |
| |
| @Override |
| public ExplainPlan getExplainPlan() throws SQLException { |
| List<String> queryPlanSteps = bestPlan.getExplainPlan().getPlanSteps(); |
| List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); |
| planSteps.add("DELETE ROWS"); |
| planSteps.addAll(queryPlanSteps); |
| return new ExplainPlan(planSteps); |
| } |
| |
| @Override |
| public Long getEstimatedRowsToScan() throws SQLException { |
| return bestPlan.getEstimatedRowsToScan(); |
| } |
| |
| @Override |
| public Long getEstimatedBytesToScan() throws SQLException { |
| return bestPlan.getEstimatedBytesToScan(); |
| } |
| |
| @Override |
| public Long getEstimateInfoTimestamp() throws SQLException { |
| return bestPlan.getEstimateInfoTimestamp(); |
| } |
| |
| @Override |
| public QueryPlan getQueryPlan() { |
| return bestPlan; |
| } |
| } |
| |
| private static boolean isMaintainedOnClient(PTable table) { |
| // Test for not being local (rather than being GLOBAL) so that this doesn't fail |
| // when tested with our projected table. |
| return (table.getIndexType() != IndexType.LOCAL && (table.isTransactional() || table.isImmutableRows())) || |
| (table.getIndexType() == IndexType.LOCAL && (table.isTransactional() && |
| table.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER) ) ); |
| } |
| |
| } |