blob: 8e9e1de129be4707092b1b05dd7c427f2eb88275 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.compile;
import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
import java.sql.ParameterMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.optimize.QueryOptimizer;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.DeleteStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.sun.istack.NotNull;
public class DeleteCompiler {
/** Shared factory used to build the parse nodes of the SELECT that locates the rows to delete. */
private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
/** Statement on whose behalf the DELETE is compiled; also the source of the connection. */
private final PhoenixStatement statement;
/** Operation reported by every mutation plan produced by this compiler. */
private final Operation operation;

/**
 * Creates a compiler bound to a statement.
 *
 * @param statement the statement executing the DELETE
 * @param operation the operation that the resulting mutation plans report from {@code getOperation()}
 */
public DeleteCompiler(PhoenixStatement statement, Operation operation) {
    this.statement = statement;
    this.operation = operation;
}
/**
 * Reads every row from {@code iterator} and registers a delete marker for it against the target table and,
 * when {@code indexTableRef} is non-null, against that index table as well.
 * <p>
 * When the connection is in auto commit mode, the accumulated mutations are sent to the server each time
 * a multiple of the batch size is reached and the local maps are cleared. The returned state then only
 * holds the final partial batch, and the rows already sent are reported through its size offset.
 * When auto commit is off nothing is sent here, so the returned state holds every row and its size offset
 * is zero.
 *
 * @param childContext   context used to read the rows; its statement supplies the connection
 * @param targetTableRef table whose rows are deleted
 * @param indexTableRef  index table to delete from in the same pass, or {@code null} if none
 * @param iterator       source of the rows to delete
 * @param projector      projector describing the columns of the rows produced by {@code iterator}
 * @param sourceTableRef table the rows were read from; when it equals {@code targetTableRef} the row key
 *                       of each tuple is used as is, otherwise the key is rebuilt from the projected PK columns
 * @return the mutation state holding the not yet committed delete markers
 * @throws SQLException if reading the rows or sending a batch fails
 * @throws IllegalArgumentException if the number of pending mutations exceeds the configured maximum
 */
private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
    PTable table = targetTableRef.getTable();
    PhoenixStatement statement = childContext.getStatement();
    PhoenixConnection connection = statement.getConnection();
    PName tenantId = connection.getTenantId();
    byte[] tenantIdBytes = null;
    if (tenantId != null) {
        tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId);
    }
    final boolean isAutoCommit = connection.getAutoCommit();
    ConnectionQueryServices services = connection.getQueryServices();
    final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
    final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
    Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
    Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
    // If indexTableRef is set, we're deleting the rows from both the index table and
    // the data table through a single query to save executing an additional one.
    if (indexTableRef != null) {
        indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
    }
    List<PColumn> pkColumns = table.getPKColumns();
    boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
    boolean isSharedViewIndex = table.getViewIndexId() != null;
    // Leading PK slots that are not projected by the query: salt byte, tenant id and view index id.
    int offset = (table.getBucketNum() == null ? 0 : 1);
    byte[][] values = new byte[pkColumns.size()][];
    if (isMultiTenant) {
        values[offset++] = tenantIdBytes;
    }
    if (isSharedViewIndex) {
        values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
    }
    try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
        int rowCount = 0;
        while (rs.next()) {
            ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
            // Use tuple directly, as projector would not have all the PK columns from
            // our index table inside of our projection. Since the tables are equal,
            // there's no translation required.
            if (sourceTableRef.equals(targetTableRef)) {
                rs.getCurrentRow().getKey(ptr);
            } else {
                for (int i = offset; i < values.length; i++) {
                    byte[] byteValue = rs.getBytes(i+1-offset);
                    // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
                    // TODO: consider going under the hood and just getting the bytes
                    if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) {
                        byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
                        byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length);
                    }
                    values[i] = byteValue;
                }
                table.newKey(ptr, values);
            }
            // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
            // row key will already have its value.
            mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, connection.getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
            if (indexTableRef != null) {
                ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
                rs.getCurrentRow().getKey(indexPtr);
                indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, connection.getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
            }
            if (mutations.size() > maxSize) {
                throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
            }
            rowCount++;
            // Commit a batch if auto commit is true and we're at our batch size
            if (isAutoCommit && rowCount % batchSize == 0) {
                MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection);
                connection.getMutationState().join(state);
                if (indexTableRef != null) {
                    MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
                    connection.getMutationState().join(indexState);
                }
                connection.getMutationState().send();
                mutations.clear();
                if (indexMutations != null) {
                    indexMutations.clear();
                }
            }
        }
        // If auto commit is true, this last batch will be committed upon return.
        // Rows are only sent (and removed from the map) in auto commit mode; otherwise the map still
        // holds every row, so a non-zero offset would count the already batched rows twice.
        int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0;
        MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection);
        if (indexTableRef != null) {
            // The index mutations are folded into the returned state with a size offset of zero.
            MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
            state.join(indexState);
        }
        return state;
    }
}
/**
 * Iterator factory whose {@link #mutate} hook converts the rows of a result iterator into delete
 * mutations by delegating to {@code deleteRows}. The table references and the projector must be
 * supplied through the setters before the factory is used.
 */
private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory {
    private TableRef sourceTableRef;
    private TableRef targetTableRef;
    private TableRef indexTableRef;
    private RowProjector projector;

    private DeletingParallelIteratorFactory(PhoenixConnection connection) {
        super(connection);
    }

    /** Sets the projector describing the rows handed to {@link #mutate}. */
    public void setRowProjector(RowProjector projector) {
        this.projector = projector;
    }

    /** Sets the table the rows are read from. */
    public void setSourceTableRef(TableRef tableRef) {
        this.sourceTableRef = tableRef;
    }

    /** Sets the table the rows are deleted from. */
    public void setTargetTableRef(TableRef tableRef) {
        this.targetTableRef = tableRef;
    }

    /** Sets the index table to delete from alongside the target table; may be {@code null}. */
    public void setIndexTargetTableRef(TableRef indexTableRef) {
        this.indexTableRef = indexTableRef;
    }

    /**
     * Builds the delete mutations for the rows of {@code iterator} using a fresh statement on
     * {@code connection}.
     */
    @Override
    protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException {
        // Read metrics are deliberately not collected in the child context (second argument false):
        // whatever needs to be captured is already collected by the parent statement context that is
        // enclosed in the result iterator used to read the rows.
        StatementContext childContext = new StatementContext(new PhoenixStatement(connection), false);
        return deleteRows(childContext, targetTableRef, indexTableRef, iterator, projector, sourceTableRef);
    }
}
/**
 * Collects the indexes of an immutable-rows table whose state is not {@code DISABLE}.
 *
 * @param tableRef reference to the table whose indexes are inspected
 * @return a new set of the non-disabled indexes, or an empty set when the table does not have
 *         immutable rows or has no indexes at all
 */
private Set<PTable> getNonDisabledImmutableIndexes(TableRef tableRef) {
    PTable table = tableRef.getTable();
    if (!table.isImmutableRows() || table.getIndexes().isEmpty()) {
        return Collections.emptySet();
    }
    Set<PTable> enabledIndexes = Sets.newHashSetWithExpectedSize(table.getIndexes().size());
    for (PTable candidate : table.getIndexes()) {
        if (candidate.getIndexState() == PIndexState.DISABLE) {
            continue;
        }
        enabledIndexes.add(candidate);
    }
    return enabledIndexes;
}
/**
 * Mutation plan that bundles several delete plans. Executing it executes every bundled plan in list
 * order; all descriptive accessors (context, parameters, explain plan, target and source refs) are
 * answered by the first plan, and the mutation state returned is the one produced by the first plan.
 */
private class MultiDeleteMutationPlan implements MutationPlan {
    private final MutationPlan firstPlan;
    private final List<MutationPlan> plans;

    /**
     * @param plans the plans to bundle; must contain at least one element
     */
    public MultiDeleteMutationPlan(@NotNull List<MutationPlan> plans) {
        Preconditions.checkArgument(!plans.isEmpty());
        this.plans = plans;
        this.firstPlan = plans.get(0);
    }

    @Override
    public MutationState execute() throws SQLException {
        MutationState firstState = firstPlan.execute();
        // The remaining plans are executed for their effect only; their states are not returned.
        for (int idx = 1; idx < plans.size(); idx++) {
            plans.get(idx).execute();
        }
        return firstState;
    }

    @Override
    public Operation getOperation() {
        return operation;
    }

    @Override
    public StatementContext getContext() {
        return firstPlan.getContext();
    }

    @Override
    public ParameterMetaData getParameterMetaData() {
        return firstPlan.getParameterMetaData();
    }

    @Override
    public ExplainPlan getExplainPlan() throws SQLException {
        return firstPlan.getExplainPlan();
    }

    @Override
    public TableRef getTargetRef() {
        return firstPlan.getTargetRef();
    }

    @Override
    public Set<TableRef> getSourceRefs() {
        return firstPlan.getSourceRefs();
    }
}
/**
 * Compiles a DELETE statement into a {@link MutationPlan}.
 * <p>
 * A SELECT over the table's primary key columns (skipping the leading salt byte, tenant id and view
 * index id slots where present) is built from the DELETE's WHERE, ORDER BY and LIMIT clauses, then
 * compiled and optimized. For each resulting query plan one of three mutation plans is created:
 * <ul>
 * <li>a plan that builds the delete markers directly from the point-lookup keys of the scan ranges,
 * without running a query;</li>
 * <li>a plan that marks the scan with {@code DELETE_AGG} and runs it as an ungrouped aggregate whose
 * single result is used as the update count;</li>
 * <li>a plan that runs the query on the client and either sums the counts returned through the
 * {@code DeletingParallelIteratorFactory} (no LIMIT) or calls {@code deleteRows} itself (LIMIT).</li>
 * </ul>
 * If the metadata lookup fails with a {@link MetaDataEntityNotFoundException} and auto commit is off,
 * the cache is refreshed once and compilation is retried when the refresh reported an update.
 *
 * @param delete the parsed DELETE statement
 * @return a single mutation plan, or a {@code MultiDeleteMutationPlan} when more than one plan is needed
 * @throws SQLException if the target is a read-only view, if an SCN is set on the connection for a
 *         transactional table, if an immutable index cannot serve the filter, or if compilation fails
 */
public MutationPlan compile(DeleteStatement delete) throws SQLException {
final PhoenixConnection connection = statement.getConnection();
final boolean isAutoCommit = connection.getAutoCommit();
final boolean hasLimit = delete.getLimit() != null;
final ConnectionQueryServices services = connection.getQueryServices();
List<QueryPlan> queryPlans;
NamedTableNode tableNode = delete.getTable();
String tableName = tableNode.getName().getTableName();
String schemaName = tableNode.getName().getSchemaName();
// A metadata refresh and retry is only attempted when auto commit is off.
boolean retryOnce = !isAutoCommit;
TableRef tableRefToBe;
boolean noQueryReqd = false;
boolean runOnServer = false;
SelectStatement select = null;
ColumnResolver resolverToBe = null;
Set<PTable> immutableIndex = Collections.emptySet();
DeletingParallelIteratorFactory parallelIteratorFactory = null;
QueryPlan dataPlanToBe = null;
// Loop exits through the break at the bottom; the only way around again is the continue in the catch block.
while (true) {
try {
resolverToBe = FromCompiler.getResolverForMutation(delete, connection);
tableRefToBe = resolverToBe.getTables().get(0);
PTable table = tableRefToBe.getTable();
// Cannot update:
// - read-only VIEW
// - transactional table with a connection having an SCN
// TODO: SchemaUtil.isReadOnly(PTable, connection)?
if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
throw new ReadOnlyTableException(schemaName,tableName);
}
else if (table.isTransactional() && connection.getSCN() != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty();
// A LIMIT forces the rows to be read on the client so that they can be counted off.
noQueryReqd = !hasLimit;
// Can't run on same server for transactional data, as we need the row keys for the data
// that is being deleted for conflict detection purposes.
runOnServer = isAutoCommit && noQueryReqd && !table.isTransactional();
HintNode hint = delete.getHint();
// Prefer the data table for a server-side delete unless the user explicitly asked for the index.
if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
}
List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
boolean isSalted = table.getBucketNum() != null;
boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
boolean isSharedViewIndex = table.getViewIndexId() != null;
// Project every PK column except the leading salt byte, tenant id and view index id slots.
for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); i < table.getPKColumns().size(); i++) {
PColumn column = table.getPKColumns().get(i);
aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
}
select = FACTORY.select(
delete.getTable(),
hint, false, aliasedNodes, delete.getWhere(),
Collections.<ParseNode>emptyList(), null,
delete.getOrderBy(), delete.getLimit(),
delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes());
select = StatementNormalizer.normalize(select, resolverToBe);
SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection);
// A different instance means the subquery rewriter changed the statement, so resolve and normalize again.
if (transformedSelect != select) {
resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection);
select = StatementNormalizer.normalize(transformedSelect, resolverToBe);
}
// With a LIMIT no factory is used; the rows are deleted through deleteRows in the plan's execute().
parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection);
QueryOptimizer optimizer = new QueryOptimizer(services);
QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory, new SequenceManager(statement));
dataPlanToBe = compiler.compile();
queryPlans = Lists.newArrayList(mayHaveImmutableIndexes
? optimizer.getApplicablePlans(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
: optimizer.getBestPlan(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
if (mayHaveImmutableIndexes) { // FIXME: this is ugly
// Lookup the table being deleted from in the cache, as it's possible that the
// optimizer updated the cache if it found indexes that were out of date.
// If the index was marked as disabled, it should not be in the list
// of immutable indexes.
table = connection.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
tableRefToBe.setTable(table);
immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
}
} catch (MetaDataEntityNotFoundException e) {
// Catch column/column family not found exception, as our meta data may
// be out of sync. Update the cache once and retry if we were out of sync.
// Otherwise throw, as we'll just get the same error next time.
if (retryOnce) {
retryOnce = false;
MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
if (result.wasUpdated()) {
continue;
}
}
throw e;
}
break;
}
final QueryPlan dataPlan = dataPlanToBe;
final boolean hasImmutableIndexes = !immutableIndex.isEmpty();
// tableRefs is parallel with queryPlans
TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1];
if (hasImmutableIndexes) {
int i = 0;
Iterator<QueryPlan> plans = queryPlans.iterator();
while (plans.hasNext()) {
QueryPlan plan = plans.next();
PTable table = plan.getTableRef().getTable();
if (table.getType() == PTableType.INDEX) { // index plans
tableRefs[i++] = plan.getTableRef();
// Tick off every index that has a plan; whatever is left afterwards has none.
immutableIndex.remove(table);
} else { // data plan
/*
* If we have immutable indexes that we need to maintain, don't execute the data plan
* as we can save a query by piggy-backing on any of the other index queries, since the
* PK columns that we need are always in each index row.
*/
plans.remove();
}
}
/*
* If we have any immutable indexes remaining, then that means that the plan for that index got filtered out
* because it could not be executed. This would occur if a column in the where clause is not found in the
* immutable index.
*/
if (!immutableIndex.isEmpty()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString())
.setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException();
}
}
// Make sure the first plan is targeting deletion from the data table
// In the case of an immutable index, we'll also delete from the index.
// Note that this overwrites the index table ref stored in slot 0 by the loop above.
final TableRef dataTableRef = tableRefs[0] = tableRefToBe;
/*
* Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows
* from the data table, while the others will be for deleting rows from immutable indexes.
*/
List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(tableRefs.length);
for (int i = 0; i < tableRefs.length; i++) {
final TableRef tableRef = tableRefs[i];
final QueryPlan plan = queryPlans.get(i);
// Once cleared, both flags stay cleared for all subsequent iterations of this loop.
if (!plan.getTableRef().equals(tableRef) || !(plan instanceof BaseQueryPlan)) {
runOnServer = false;
noQueryReqd = false; // FIXME: why set this to false in this case?
}
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final StatementContext context = plan.getContext();
// If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all.
// A simple check of the non-existence of a where clause in the parse node is not sufficient, as the where clause
// may have been optimized out. Instead, we check that there's a single SkipScanFilter
if (noQueryReqd
&& (!context.getScan().hasFilter()
|| context.getScan().getFilter() instanceof SkipScanFilter)
&& context.getScanRanges().isPointLookup()) {
// Strategy 1: build the delete markers straight from the point-lookup keys; no query is run.
mutationPlans.add(new MutationPlan() {
@Override
public ParameterMetaData getParameterMetaData() {
return context.getBindManager().getParameterMetaData();
}
@Override
public MutationState execute() throws SQLException {
// We have a point lookup, so we know we have a simple set of fully qualified
// keys for our ranges
ScanRanges ranges = context.getScanRanges();
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
while (iterator.hasNext()) {
mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
}
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
@Override
public ExplainPlan getExplainPlan() throws SQLException {
return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
}
@Override
public StatementContext getContext() {
return context;
}
@Override
public TableRef getTargetRef() {
return dataTableRef;
}
@Override
public Set<TableRef> getSourceRefs() {
// Don't include the target
return Collections.emptySet();
}
@Override
public Operation getOperation() {
return operation;
}
});
} else if (runOnServer) {
// Strategy 2: mark the scan so that the rows are deleted while it runs, and read back a count.
// TODO: better abstraction
Scan scan = context.getScan();
scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
// Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
// The coprocessor will delete each row returned from the scan
// Ignoring ORDER BY, since with auto commit on and no limit makes no difference
SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
if (plan.getProjector().projectEveryRow()) {
projectorToBe = new RowProjector(projectorToBe,true);
}
final RowProjector projector = projectorToBe;
final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
mutationPlans.add(new MutationPlan() {
@Override
public ParameterMetaData getParameterMetaData() {
return context.getBindManager().getParameterMetaData();
}
@Override
public StatementContext getContext() {
return context;
}
@Override
public TableRef getTargetRef() {
return dataTableRef;
}
@Override
public Set<TableRef> getSourceRefs() {
return dataPlan.getSourceRefs();
}
@Override
public Operation getOperation() {
return operation;
}
@Override
public MutationState execute() throws SQLException {
// TODO: share this block of code with UPSERT SELECT
ImmutableBytesWritable ptr = context.getTempPtr();
PTable table = tableRef.getTable();
table.getIndexMaintainers(ptr, context.getConnection());
byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
ServerCache cache = null;
try {
// A non-empty ptr means index maintainers were written into it; register them as index
// metadata and tag the scan with the id of the resulting cache entry.
if (ptr.getLength() > 0) {
IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
byte[] uuidValue = cache.getId();
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
}
ResultIterator iterator = aggPlan.iterator();
try {
// The first row of the aggregate carries the count that is reported as the update count.
Tuple row = iterator.next();
final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
return new MutationState(maxSize, connection) {
@Override
public long getUpdateCount() {
return mutationCount;
}
};
} finally {
iterator.close();
}
} finally {
if (cache != null) {
cache.close();
}
}
}
@Override
public ExplainPlan getExplainPlan() throws SQLException {
List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps();
List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
planSteps.add("DELETE ROWS");
planSteps.addAll(queryPlanSteps);
return new ExplainPlan(planSteps);
}
});
} else {
// Strategy 3: run the query on the client and turn the returned rows into delete markers.
final boolean deleteFromImmutableIndexToo = hasImmutableIndexes && !plan.getTableRef().equals(tableRef);
// The factory is a single instance shared by all plans (null when there is a LIMIT), so these
// setters overwrite whatever an earlier iteration of this loop configured.
if (parallelIteratorFactory != null) {
parallelIteratorFactory.setRowProjector(plan.getProjector());
parallelIteratorFactory.setTargetTableRef(tableRef);
parallelIteratorFactory.setSourceTableRef(plan.getTableRef());
parallelIteratorFactory.setIndexTargetTableRef(deleteFromImmutableIndexToo ? plan.getTableRef() : null);
}
mutationPlans.add( new MutationPlan() {
@Override
public ParameterMetaData getParameterMetaData() {
return context.getBindManager().getParameterMetaData();
}
@Override
public StatementContext getContext() {
return context;
}
@Override
public TableRef getTargetRef() {
return dataTableRef;
}
@Override
public Set<TableRef> getSourceRefs() {
return dataPlan.getSourceRefs();
}
@Override
public Operation getOperation() {
return operation;
}
@Override
public MutationState execute() throws SQLException {
ResultIterator iterator = plan.iterator();
try {
if (!hasLimit) {
// Without a LIMIT the first cell of every returned tuple is decoded as a long and summed.
Tuple tuple;
long totalRowCount = 0;
while ((tuple=iterator.next()) != null) {// Runs query
Cell kv = tuple.getValue(0);
totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
}
// Return total number of rows that have been deleted. In the case of auto commit being off
// the mutations will all be in the mutation state of the current connection.
MutationState state = new MutationState(maxSize, connection, totalRowCount);
// set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
return state;
} else {
// With a LIMIT the rows are read here and turned into delete markers by deleteRows.
return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
}
} finally {
iterator.close();
}
}
@Override
public ExplainPlan getExplainPlan() throws SQLException {
List<String> queryPlanSteps = plan.getExplainPlan().getPlanSteps();
List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
planSteps.add("DELETE ROWS");
planSteps.addAll(queryPlanSteps);
return new ExplainPlan(planSteps);
}
});
}
}
// A single plan is returned as is; several plans are wrapped so that one execute() call runs them all.
return mutationPlans.size() == 1 ? mutationPlans.get(0) : new MultiDeleteMutationPlan(mutationPlans);
}
}