/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.execute;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.TraceScope;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.MutationMetricQueue;
import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableRef;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import co.cask.tephra.Transaction;
import co.cask.tephra.Transaction.VisibilityLevel;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionCodec;
import co.cask.tephra.TransactionConflictException;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import co.cask.tephra.hbase10.TransactionAwareHTable;
import co.cask.tephra.visibility.FenceWait;
import co.cask.tephra.visibility.VisibilityFence;

/**
 *
 * Tracks the uncommitted state of rows mutated through a {@link PhoenixConnection}
 * and is responsible for sending that state to HBase when the connection commits.
 *
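 * <p>
 * MutationState is normally driven indirectly through the JDBC API rather than
 * used directly. A minimal sketch of that flow (hypothetical JDBC URL and table):
 * <pre>{@code
 * try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
 *     conn.createStatement().executeUpdate("UPSERT INTO T VALUES (1, 'a')");
 *     conn.commit(); // flushes the MutationState buffered on the connection
 * }
 * }</pre>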
 */
public class MutationState implements SQLCloseable {
    private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
    private static final TransactionCodec CODEC = new TransactionCodec();
    private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
    private static final int MAX_COMMIT_RETRIES = 3;

    private final PhoenixConnection connection;
    private final long maxSize;
    private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
    private final List<TransactionAware> txAwares;
    private final TransactionContext txContext;
    private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);

    private Transaction tx;
    private long sizeOffset;
    private int numRows = 0;
    private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
    private boolean isExternalTxContext = false;
    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();

    private final MutationMetricQueue mutationMetricQueue;
    private ReadMetricQueue readMetricQueue;

    public MutationState(long maxSize, PhoenixConnection connection) {
        this(maxSize, connection, null, null);
    }

    public MutationState(long maxSize, PhoenixConnection connection, TransactionContext txContext) {
        this(maxSize, connection, null, txContext);
    }

    public MutationState(MutationState mutationState) {
        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction(), null);
    }

    public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
        this(maxSize, connection, null, null, sizeOffset);
    }

    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
        this(maxSize, connection, tx, txContext, 0);
    }

    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext);
        this.sizeOffset = sizeOffset;
    }

    MutationState(long maxSize, PhoenixConnection connection,
            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
            Transaction tx, TransactionContext txContext) {
        this.maxSize = maxSize;
        this.connection = connection;
        this.mutations = mutations;
        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
        this.tx = tx;
        if (tx == null) {
            this.txAwares = Collections.emptyList();
            if (txContext == null) {
                TransactionSystemClient txServiceClient = this.connection
                        .getQueryServices().getTransactionSystemClient();
                this.txContext = new TransactionContext(txServiceClient);
            } else {
                isExternalTxContext = true;
                this.txContext = txContext;
            }
        } else {
            // This code path is only used while running child scans. We can't pass the txContext
            // to child scans since it is not thread safe, so we use the tx member variable instead.
            this.txAwares = Lists.newArrayList();
            this.txContext = null;
        }
    }

    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
        this(maxSize, connection, null, null, sizeOffset);
        this.mutations.put(table, mutations);
        this.numRows = mutations.size();
        this.tx = connection.getMutationState().getTransaction();
        throwIfTooBig();
    }

    public long getMaxSize() {
        return maxSize;
    }

    /**
     * Commit a write fence when creating an index so that we can detect
     * when a data table transaction is started before the create index
     * but completes after it. In this case, we need to rerun the data
     * table transaction after the index creation so that the index rows
     * are generated. See {@link #addDMLFence(PTable)} and TEPHRA-157
     * for more information.
     * @param dataTable the data table upon which an index is being added
     * @throws SQLException
     */
    public void commitDDLFence(PTable dataTable) throws SQLException {
        if (dataTable.isTransactional()) {
            byte[] key = dataTable.getName().getBytes();
            boolean success = false;
            try {
                FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient());
                fenceWait.await(10000, TimeUnit.MILLISECONDS);
                success = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
            } catch (TimeoutException | TransactionFailureException e) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
                        .setSchemaName(dataTable.getSchemaName().getString())
                        .setTableName(dataTable.getTableName().getString())
                        .build().buildException();
            } finally {
                // The client expects a transaction to be in progress on the txContext while the
                // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's
                // finished, we start a new one here.
                // TODO: seems like an autonomous tx capability in Tephra would be useful here.
                try {
                    txContext.start();
                    if (logger.isInfoEnabled() && success) logger.info("Added write fence at ~" + getTransaction().getReadPointer());
                } catch (TransactionFailureException e) {
                    throw TransactionUtil.getTransactionFailureException(e);
                }
            }
        }
    }

    /**
     * Add an entry to the change set representing the DML operation that is starting.
     * These entries will not conflict with each other, but they will conflict with a
     * DDL operation of creating an index. See {@link #commitDDLFence(PTable)} and TEPHRA-157
     * for more information.
     * @param table the table which is doing DML
     * @throws SQLException
     */
    private void addDMLFence(PTable table) throws SQLException {
        if (table.getType() == PTableType.INDEX || !table.isTransactional()) {
            return;
        }
        byte[] logicalKey = table.getName().getBytes();
        TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
        if (this.txContext == null) {
            this.txAwares.add(logicalTxAware);
        } else {
            this.txContext.addTransactionAware(logicalTxAware);
        }
        byte[] physicalKey = table.getPhysicalName().getBytes();
        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
            TransactionAware physicalTxAware = VisibilityFence.create(physicalKey);
            if (this.txContext == null) {
                this.txAwares.add(physicalTxAware);
            } else {
                this.txContext.addTransactionAware(physicalTxAware);
            }
        }
    }

    public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
        Transaction currentTx = getTransaction();
        if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
            return false;
        }
        Set<TableRef> sources = plan.getSourceRefs();
        if (sources.isEmpty()) {
            return false;
        }
        // For a DELETE statement, we're always querying the table being deleted from. This isn't
        // a problem, but it potentially could be if there are other references to the same table
        // nested in the DELETE statement (as a sub query or join, for example).
        TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
        boolean excludeCurrent = false;
        String targetPhysicalName = plan.getTargetRef().getTable().getPhysicalName().getString();
        for (TableRef source : sources) {
            if (source.getTable().isTransactional() && !source.equals(ignoreForExcludeCurrent)) {
                String sourcePhysicalName = source.getTable().getPhysicalName().getString();
                if (targetPhysicalName.equals(sourcePhysicalName)) {
                    excludeCurrent = true;
                    break;
                }
            }
        }
        // If we're querying the same table we're updating, we must exclude our writes to
        // it from being visible.
        if (excludeCurrent) {
            // If any source tables have uncommitted data prior to last checkpoint,
            // then we must create a new checkpoint.
            boolean hasUncommittedData = false;
            for (TableRef source : sources) {
                String sourcePhysicalName = source.getTable().getPhysicalName().getString();
                // Tracking uncommitted physical table names is an optimization that prevents us from
                // having to do a checkpoint if no data has yet been written. If we're using an
                // external transaction context, it's possible that data was already written at the
                // current transaction timestamp, so we always checkpoint in that case if we're
                // reading and writing to the same table.
                if (source.getTable().isTransactional() && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) {
                    hasUncommittedData = true;
                    break;
                }
            }
            if (hasUncommittedData) {
                try {
                    if (txContext == null) {
                        currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
                    } else {
                        txContext.checkpoint();
                        currentTx = tx = txContext.getCurrentTransaction();
                    }
                    // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards
                    // should see all this data.
                    uncommittedPhysicalNames.clear();
                } catch (TransactionFailureException e) {
                    throw new SQLException(e);
                }
            }
            // Since we're querying our own table while mutating it, we must not
            // see our current mutations, otherwise we can get erroneous results (for DELETE)
            // or get into an infinite loop (for UPSERT SELECT).
            currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
            return true;
        }
        return false;
    }

    private void addTransactionParticipant(TransactionAware txAware) throws SQLException {
        if (txContext == null) {
            txAwares.add(txAware);
            assert(tx != null);
            txAware.startTx(tx);
        } else {
            txContext.addTransactionAware(txAware);
        }
    }

    // Though MutationState is not thread safe in general, this method should be because it may
    // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose
    // the Transaction outside of MutationState, this seems reasonable, as the member variables
    // would not change as these threads are running.
    public HTableInterface getHTable(PTable table) throws SQLException {
        HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
        Transaction currentTx;
        if (table.isTransactional() && (currentTx = getTransaction()) != null) {
            TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table);
            // Use the transaction from this (possibly cloned) MutationState: if auto commit is true,
            // a new transaction may already have been started, and we need the original one here.
            txAware.startTx(currentTx);
            htable = txAware;
        }
        return htable;
    }

    public PhoenixConnection getConnection() {
        return connection;
    }

    // Kept private as the Transaction may change when checkpointed. Keeping it private ensures
    // no one holds on to a stale copy.
    private Transaction getTransaction() {
        return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
    }

    public boolean isTransactionStarted() {
        return getTransaction() != null;
    }

    public long getInitialWritePointer() {
        Transaction tx = getTransaction();
        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing
    }

    // For testing
    public long getWritePointer() {
        Transaction tx = getTransaction();
        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer();
    }

    // For testing
    public VisibilityLevel getVisibilityLevel() {
        Transaction tx = getTransaction();
        return tx == null ? null : tx.getVisibilityLevel();
    }

    public boolean startTransaction() throws SQLException {
        if (txContext == null) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
        }

        if (connection.getSCN() != null) {
            throw new SQLExceptionInfo.Builder(
                    SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
                    .build().buildException();
        }

        try {
            if (!isTransactionStarted()) {
                // Clear any transactional state in case transaction was ended outside
                // of Phoenix so we don't carry the old transaction state forward. We
                // cannot call reset() here due to the case of having mutations and
                // then transitioning from non transactional to transactional (which
                // would end up clearing our uncommitted state).
                resetTransactionalState();
                txContext.start();
                return true;
            }
        } catch (TransactionFailureException e) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException();
        }
        return false;
    }

    public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
        state.sizeOffset = 0;
        return state;
    }

    private void throwIfTooBig() {
        if (numRows > maxSize) {
            // TODO: throw SQLException ?
            throw new IllegalArgumentException("MutationState size of " + numRows + " is bigger than max allowed size of " + maxSize);
        }
    }

    public long getUpdateCount() {
        return sizeOffset + numRows;
    }

    private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows,
            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
        PTable table = tableRef.getTable();
        boolean isIndex = table.getType() == PTableType.INDEX;
        boolean incrementRowCount = dstMutations == this.mutations;
        Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
        if (existingRows != null) { // Rows for that table already exist
            // Loop through new rows and replace existing with new
            for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
                // Replace existing row with new row
                RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
                if (existingRowMutationState != null) {
                    Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
                    if (existingValues != PRow.DELETE_MARKER) {
                        Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
                        // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
                        if (newRow != PRow.DELETE_MARKER) {
                            // Merge existing column values with new column values
                            existingRowMutationState.join(rowEntry.getValue());
                            // Now that the existing row has been merged with the new row, replace it back
                            // again (since it was merged with the new one above).
                            existingRows.put(rowEntry.getKey(), existingRowMutationState);
                        }
                    }
                } else {
                    if (incrementRowCount && !isIndex) { // Don't count index rows in row count
                        numRows++;
                    }
                }
            }
            // Put the existing one back now that it's merged
            dstMutations.put(tableRef, existingRows);
        } else {
            // Size new map at batch size as that's what it'll likely grow to.
            Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
            newRows.putAll(srcRows);
            dstMutations.put(tableRef, newRows);
            if (incrementRowCount && !isIndex) {
                numRows += srcRows.size();
            }
        }
    }

    private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations,
            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
        // Merge newMutation with this one, keeping state from newMutation for any overlaps
        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) {
            // Replace existing entries for the table with new entries
            TableRef tableRef = entry.getKey();
            Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
            joinMutationState(tableRef, srcRows, dstMutations);
        }
    }

    /**
     * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence.
     * Combine any metrics collected for the newer mutation.
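     * <p>
     * For example (hypothetical rows): if this state holds row R with column values
     * {A=1, B=2} and the newer state holds R with {B=3}, the combined state holds R
     * with {A=1, B=3}.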
     *
     * @param newMutationState the newer mutation state
     */
    public void join(MutationState newMutationState) {
        if (this == newMutationState) { // Doesn't make sense
            return;
        }
        if (txContext != null) {
            for (TransactionAware txAware : newMutationState.txAwares) {
                txContext.addTransactionAware(txAware);
            }
        } else {
            txAwares.addAll(newMutationState.txAwares);
        }
        this.sizeOffset += newMutationState.sizeOffset;
        joinMutationState(newMutationState.mutations, this.mutations);
        if (!newMutationState.txMutations.isEmpty()) {
            if (txMutations.isEmpty()) {
                txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
            }
            joinMutationState(newMutationState.txMutations, this.txMutations);
        }
        mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue);
        if (readMetricQueue == null) {
            readMetricQueue = newMutationState.readMetricQueue;
        } else if (newMutationState.readMetricQueue != null) {
            readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue);
        }
        throwIfTooBig();
    }

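    /**
     * Overwrite, in place, the bytes of the row timestamp PK column within the given
     * row key with the encoded {@code rowTimestamp} value. The pointer's offset and
     * length are restored before returning, so the caller sees the same window into
     * the (now modified) underlying byte array.
     */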
    private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
        RowKeySchema schema = table.getRowKeySchema();
        int rowTimestampColPos = table.getRowTimestampColPos();
        Field rowTimestampField = schema.getField(rowTimestampColPos);
        byte[] rowTimestampBytes = PLong.INSTANCE.toBytes(rowTimestamp, rowTimestampField.getSortOrder());
        int oldOffset = ptr.getOffset();
        int oldLength = ptr.getLength();
        // Move the pointer to the start byte of the row timestamp pk
        schema.position(ptr, 0, rowTimestampColPos);
        byte[] b = ptr.get();
        int newOffset = ptr.getOffset();
        int length = ptr.getLength();
        for (int i = newOffset; i < newOffset + length; i++) {
            // modify the underlying bytes array with the bytes of the row timestamp
            b[i] = rowTimestampBytes[i - newOffset];
        }
        // move the pointer back to where it was before.
        ptr.set(ptr.get(), oldOffset, oldLength);
        return ptr;
    }

    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) {
        final PTable table = tableRef.getTable();
        final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
                (table.isImmutableRows() || includeMutableIndexes) ?
                        IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) :
                        Iterators.<PTable>emptyIterator();
        final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
        final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
        generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);
        return new Iterator<Pair<byte[],List<Mutation>>>() {
            boolean isFirst = true;

            @Override
            public boolean hasNext() {
                return isFirst || indexes.hasNext();
            }

            @Override
            public Pair<byte[], List<Mutation>> next() {
                if (isFirst) {
                    isFirst = false;
                    return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(), mutationList);
                }
                PTable index = indexes.next();
                List<Mutation> indexMutations;
                try {
                    indexMutations =
                            IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex,
                                    connection.getKeyValueBuilder(), connection);
                    // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
                    if (!sendAll) {
                        TableRef key = new TableRef(index);
                        Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
                        if (rowToColumnMap != null) {
                            final List<Mutation> deleteMutations = Lists.newArrayList();
                            generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null);
                            indexMutations.addAll(deleteMutations);
                        }
                    }
                } catch (SQLException e) {
                    throw new IllegalDataException(e);
                }
                return new Pair<byte[],List<Mutation>>(index.getPhysicalName().getBytes(), indexMutations);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

        };
    }

    private void generateMutations(final TableRef tableRef, long timestamp,
            final Map<ImmutableBytesPtr, RowMutationState> values,
            final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
        final PTable table = tableRef.getTable();
        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
        Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
                values.entrySet().iterator();
        long timestampToUse = timestamp;
        while (iterator.hasNext()) {
            Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
            ImmutableBytesPtr key = rowEntry.getKey();
            RowMutationState state = rowEntry.getValue();
            if (tableWithRowTimestampCol) {
                RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo();
                if (rowTsColInfo.useServerTimestamp()) {
                    // regenerate the key with this timestamp.
                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table);
                } else {
                    if (rowTsColInfo.getTimestamp() != null) {
                        timestampToUse = rowTsColInfo.getTimestamp();
                    }
                }
            }
            PRow row =
                    tableRef.getTable()
                            .newRow(connection.getKeyValueBuilder(), timestampToUse, key);
            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
            if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
                row.delete();
                rowMutations = row.toRowMutations();
                // Row deletes for index tables are processed by running a re-written query
                // against the index table (as this allows for flexibility in being able to
                // delete rows).
                rowMutationsPertainingToIndex = Collections.emptyList();
            } else {
                for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues()
                        .entrySet()) {
                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
                }
                rowMutations = row.toRowMutations();
                rowMutationsPertainingToIndex = rowMutations;
            }
            mutationList.addAll(rowMutations);
            if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
                    .addAll(rowMutationsPertainingToIndex);
        }
    }

    /**
     * Get the unsorted list of HBase mutations for the tables with uncommitted data.
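     * <p>
     * A minimal iteration sketch (hypothetical caller with pending upserts; passing
     * null for the timestamp uses the connection's SCN, or the latest timestamp):
     * <pre>{@code
     * Iterator<Pair<byte[], List<Mutation>>> it = mutationState.toMutations(null);
     * while (it.hasNext()) {
     *     Pair<byte[], List<Mutation>> perTable = it.next();
     *     byte[] physicalTableName = perTable.getFirst();
     *     List<Mutation> hbaseMutations = perTable.getSecond();
     * }
     * }</pre>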
     * @return list of HBase mutations for uncommitted data.
     */
    public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long timestamp) {
        return toMutations(false, timestamp);
    }

    public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
        return toMutations(false, null);
    }

    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
        return toMutations(includeMutableIndexes, null);
    }

    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
        if (!iterator.hasNext()) {
            return Iterators.emptyIterator();
        }
        Long scn = connection.getSCN();
        final long timestamp = (tableTimestamp != null && tableTimestamp != QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
        return new Iterator<Pair<byte[],List<Mutation>>>() {
            private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
            private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();

            private Iterator<Pair<byte[],List<Mutation>>> init() {
                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true);
            }

            @Override
            public boolean hasNext() {
                return innerIterator.hasNext() || iterator.hasNext();
            }

            @Override
            public Pair<byte[], List<Mutation>> next() {
                if (!innerIterator.hasNext()) {
                    current = iterator.next();
                    // Re-initialize the inner iterator for the new table; otherwise the
                    // call below would fail on the already-exhausted inner iterator.
                    innerIterator = init();
                }
                return innerIterator.next();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

        };
    }

    /**
     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
     * has changed.
     * @return the server timestamps to use for the upserts, one per table
     * @throws SQLException if the table or any columns no longer exist
     */
    private long[] validateAll() throws SQLException {
        int i = 0;
        long[] timeStamps = new long[this.mutations.size()];
        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
            TableRef tableRef = entry.getKey();
            timeStamps[i++] = validate(tableRef, entry.getValue());
        }
        return timeStamps;
    }

    private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
        Long scn = connection.getSCN();
        MetaDataClient client = new MetaDataClient(connection);
        long serverTimeStamp = tableRef.getTimeStamp();
        // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
        // so no need to do it again here.
        PTable table = tableRef.getTable();
        MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
        PTable resolvedTable = result.getTable();
        if (resolvedTable == null) {
            throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
        }
        // Always update the table on the tableRef, as the one we've cached may be out of date:
        // it was resolved when the UPSERT VALUES call executed, and the cache may have been updated since.
        tableRef.setTable(resolvedTable);
        List<PTable> indexes = resolvedTable.getIndexes();
        for (PTable idxTable : indexes) {
            // If the index is still active but has a non-zero INDEX_DISABLE_TIMESTAMP value, then infer
            // that our failure mode is to block writes on index failure.
            if (idxTable.getIndexState() == PIndexState.ACTIVE && idxTable.getIndexDisableTimestamp() > 0) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE)
                        .setSchemaName(table.getSchemaName().getString())
                        .setTableName(table.getTableName().getString()).build().buildException();
            }
        }
        long timestamp = result.getMutationTime();
        if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
            serverTimeStamp = timestamp;
            if (result.wasUpdated()) {
                List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
                    RowMutationState valueEntry = rowEntry.getValue();
                    if (valueEntry != null) {
                        Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
                        if (colValues != PRow.DELETE_MARKER) {
                            for (PColumn column : colValues.keySet()) {
                                if (!column.isDynamic())
                                    columns.add(column);
                            }
                        }
                    }
                }
                for (PColumn column : columns) {
                    if (column != null) {
                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                    }
                }
            }
        }
        return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
    }

    private static long calculateMutationSize(List<Mutation> mutations) {
        long byteSize = 0;
        if (GlobalClientMetrics.isMetricsEnabled()) {
            for (Mutation mutation : mutations) {
                byteSize += mutation.heapSize();
            }
        }
        GLOBAL_MUTATION_BYTES.update(byteSize);
        return byteSize;
    }

    private boolean hasKeyValueColumn(PTable table, PTable index) {
        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
        return !maintainer.getAllColumns().isEmpty();
    }

    private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) {
        while (enabledImmutableIndexes.hasNext()) {
            PTable index = enabledImmutableIndexes.next();
            if (index.getIndexType() != IndexType.LOCAL) {
                if (hasKeyValueColumn(table, index)) {
                    keyValueIndexes.add(index);
                } else {
                    rowKeyIndexes.add(index);
                }
            }
        }
    }

    private class MetaDataAwareHTable extends DelegateHTable {
        private final TableRef tableRef;

        private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
            super(delegate);
            this.tableRef = tableRef;
        }

        /**
         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an
         * opportunity to attach our index meta data to the mutations such that we can also undo
         * the index mutations.
         */
        @Override
        public void delete(List<Delete> deletes) throws IOException {
            ServerCache cache = null;
            try {
                PTable table = tableRef.getTable();
                List<PTable> indexes = table.getIndexes();
                Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
                if (enabledIndexes.hasNext()) {
                    List<PTable> keyValueIndexes = Collections.emptyList();
                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
                    boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection);
                    if (table.isImmutableRows()) {
                        List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
                        keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
                        divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
                        // Generate index deletes for immutable indexes that only reference row key
                        // columns and submit directly here.
                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                        for (PTable index : rowKeyIndexes) {
                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
                            HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
                            hindex.delete(indexDeletes);
                        }
                    }

                    // If we have mutable indexes, local immutable indexes, or global immutable indexes
                    // that reference key value columns, setup index meta data and attach here. In this
                    // case updates to the indexes will be generated on the server side.
                    // An alternative would be to let Tephra track the row keys for the immutable index
                    // by adding it as a transaction participant (soon we can prevent any conflict
                    // detection from occurring) with the downside being the additional memory required.
                    if (!keyValueIndexes.isEmpty()) {
                        attachMetaData = true;
                        IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
                    }
                    if (attachMetaData) {
                        cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
                    }
                }
                delegate.delete(deletes);
            } catch (SQLException e) {
                throw new IOException(e);
            } finally {
                if (cache != null) {
                    SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
                }
            }
        }
    }

    @SuppressWarnings("deprecation")
    private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
        int i = 0;
        long[] serverTimeStamps = null;
        boolean sendAll = false;
        if (tableRefIterator == null) {
            serverTimeStamps = validateAll();
            tableRefIterator = mutations.keySet().iterator();
            sendAll = true;
        }

        Map<ImmutableBytesPtr, RowMutationState> valuesMap;
        List<TableRef> txTableRefs = Lists.newArrayListWithExpectedSize(mutations.size());
        // add tracing for this operation
        try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
            Span span = trace.getSpan();
            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
            boolean isTransactional;
            while (tableRefIterator.hasNext()) {
                // at this point we are going through mutations for each table
                final TableRef tableRef = tableRefIterator.next();
                valuesMap = mutations.get(tableRef);
                if (valuesMap == null || valuesMap.isEmpty()) {
                    continue;
                }
                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
                final PTable table = tableRef.getTable();
                // Track tables to which we've sent uncommitted data
                if (isTransactional = table.isTransactional()) {
                    txTableRefs.add(tableRef);
                    addDMLFence(table);
                    uncommittedPhysicalNames.add(table.getPhysicalName().getString());
                }
                boolean isDataTable = true;
                table.getIndexMaintainers(indexMetaDataPtr, connection);
                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
                while (mutationsIterator.hasNext()) {
                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
                    byte[] htableName = pair.getFirst();
                    List<Mutation> mutationList = pair.getSecond();

                    // create a span per target table
                    // TODO maybe we can be smarter about the table name to string here?
                    Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));

                    int retryCount = 0;
                    boolean shouldRetry = false;
                    do {
                        final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;

                        // If we haven't retried yet, retry for this case only, as it's possible that
                        // a split will occur after we send the index metadata cache to all known
                        // region servers.
                        shouldRetry = cache != null;
                        SQLException sqlE = null;
                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
                        try {
                            if (isTransactional) {
                                // If we have indexes, wrap the HTable in a delegate HTable that
                                // will attach the necessary index meta data in the event of a
                                // rollback
                                if (!table.getIndexes().isEmpty()) {
                                    hTable = new MetaDataAwareHTable(hTable, tableRef);
                                }
                                TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
                                // Don't add immutable indexes (those are the only ones that would participate
                                // during a commit), as we don't need conflict detection for these.
                                if (isDataTable) {
                                    // Even for immutable, we need to do this so that an abort has the state
                                    // necessary to generate the rows to delete.
                                    addTransactionParticipant(txnAware);
                                } else {
                                    txnAware.startTx(getTransaction());
                                }
                                hTable = txnAware;
                            }
                            long numMutations = mutationList.size();
                            GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);

                            long startTime = System.currentTimeMillis();
                            child.addTimelineAnnotation("Attempt " + retryCount);
                            hTable.batch(mutationList);
                            if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
                            child.stop();
                            shouldRetry = false;
                            long mutationCommitTime = System.currentTimeMillis() - startTime;
                            GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);

                            long mutationSizeBytes = calculateMutationSize(mutationList);
                            MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                            mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
                        } catch (Exception e) {
                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                            if (inferredE != null) {
                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
                                    // and one of the region servers doesn't have it. Clearing the region cache and retrying
                                    // re-sends the metadata, so the region server will have it on the next attempt.
                                    // If it fails again, we don't retry.
                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
                                    connection.getQueryServices().clearTableRegionCache(htableName);

                                    // add a new child span as this one failed
                                    child.addTimelineAnnotation(msg);
                                    child.stop();
                                    child = Tracing.child(span, "Failed batch, attempting retry");

                                    continue;
                                }
                                e = inferredE;
                            }
                            // Throw to client an exception that indicates the statements that
                            // were not committed successfully.
                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
                        } finally {
                            try {
                                if (cache != null) {
                                    cache.close();
                                }
                            } finally {
                                try {
                                    hTable.close();
                                } catch (IOException e) {
                                    if (sqlE != null) {
                                        sqlE.setNextException(ServerUtil.parseServerException(e));
                                    } else {
                                        sqlE = ServerUtil.parseServerException(e);
                                    }
                                }
                                if (sqlE != null) {
                                    throw sqlE;
                                }
                            }
                        }
                    } while (shouldRetry && retryCount++ < 1);
                    isDataTable = false;
                }
                if (tableRef.getTable().getType() != PTableType.INDEX) {
                    numRows -= valuesMap.size();
                }
                // For transactions, track the statement indexes as we send data
                // over because our CommitException should include all statements
                // involved in the transaction since none of them would have been
                // committed in the event of a failure.
                if (isTransactional) {
                    addUncommittedStatementIndexes(valuesMap.values());
                    if (txMutations.isEmpty()) {
                        txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
                    }
                    // Keep all mutations we've encountered until a commit or rollback.
                    // This is not ideal, but there's no good way to get the values back
                    // in the event that we need to replay the commit.
                    // Copy TableRef so we have the original PTable and know when the
                    // indexes have changed.
                    joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
                }
                // Remove batches as we process them
                if (sendAll) {
                    // Iterating through map key set in this case, so we cannot use
                    // the remove method without getting a concurrent modification
                    // exception.
                    tableRefIterator.remove();
                } else {
                    mutations.remove(tableRef);
                }
            }
        }
    }

    public byte[] encodeTransaction() throws SQLException {
        try {
            return CODEC.encode(getTransaction());
        } catch (IOException e) {
            throw new SQLException(e);
        }
    }

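    /**
     * Decode a transaction previously serialized with {@link #encodeTransaction()}.
     * A round-trip sketch (assuming an active transaction on this MutationState):
     * <pre>{@code
     * byte[] txnBytes = mutationState.encodeTransaction();
     * Transaction sameTx = MutationState.decodeTransaction(txnBytes);
     * }</pre>
     * @return the decoded transaction, or null if {@code txnBytes} is null or empty
     */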
    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
        return (txnBytes == null || txnBytes.length == 0) ? null : CODEC.decode(txnBytes);
    }

    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
        PTable table = tableRef.getTable();
        byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
        ServerCache cache = null;
        byte[] attribValue = null;
        byte[] uuidValue = null;
        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
        if (table.isTransactional()) {
            txState = encodeTransaction();
        }
        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
        if (hasIndexMetaData) {
            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
                uuidValue = cache.getId();
            } else {
                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
                uuidValue = ServerCacheClient.generateId();
            }
        } else if (txState.length == 0) {
            return null;
        }
        // Either set the UUID to be able to access the index metadata from the cache
        // or set the index metadata directly on the Mutation
        for (Mutation mutation : mutations) {
            if (tenantId != null) {
                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
            }
            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
            if (attribValue != null) {
                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
                if (txState.length > 0) {
                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                }
            } else if (!hasIndexMetaData && txState.length > 0) {
                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
        }
        return cache;
    }

    private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations) {
        for (RowMutationState rowMutationState : rowMutations) {
            uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, rowMutationState.getStatementIndexes());
        }
    }

    private int[] getUncommittedStatementIndexes() {
        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
            addUncommittedStatementIndexes(rowMutationMap.values());
        }
        return uncommittedStatementIndexes;
    }

    @Override
    public void close() throws SQLException {
    }

    private void resetState() {
        numRows = 0;
        this.mutations.clear();
        resetTransactionalState();
    }

    private void resetTransactionalState() {
        tx = null;
        txAwares.clear();
        txMutations = Collections.emptyMap();
        uncommittedPhysicalNames.clear();
        uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
    }

    public void rollback() throws SQLException {
        try {
            if (txContext != null && isTransactionStarted()) {
                try {
                    txContext.abort();
                } catch (TransactionFailureException e) {
                    throw TransactionUtil.getTransactionFailureException(e);
                }
            }
        } finally {
            resetState();
        }
    }

    public void commit() throws SQLException {
        Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
        int retryCount = 0;
        do {
            boolean sendSuccessful = false;
            boolean retryCommit = false;
            SQLException sqlE = null;
            try {
                send();
                txMutations = this.txMutations;
                sendSuccessful = true;
            } catch (SQLException e) {
                sqlE = e;
            } finally {
                try {
                    if (txContext != null && isTransactionStarted()) {
                        TransactionFailureException txFailure = null;
                        boolean finishSuccessful = false;
                        try {
                            if (sendSuccessful) {
                                txContext.finish();
                                finishSuccessful = true;
                            }
                        } catch (TransactionFailureException e) {
                            if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount);
                            retryCommit = (e instanceof TransactionConflictException && retryCount < MAX_COMMIT_RETRIES);
                            txFailure = e;
                            SQLException nextE = TransactionUtil.getTransactionFailureException(e);
                            if (sqlE == null) {
                                sqlE = nextE;
                            } else {
                                sqlE.setNextException(nextE);
                            }
                        } finally {
                            // If send fails or finish fails, abort the tx
                            if (!finishSuccessful) {
                                try {
                                    txContext.abort(txFailure);
                                    if (logger.isInfoEnabled()) logger.info("Abort successful");
                                } catch (TransactionFailureException e) {
                                    if (logger.isInfoEnabled()) logger.info("Abort failed with " + e);
                                    SQLException nextE = TransactionUtil.getTransactionFailureException(e);
                                    if (sqlE == null) {
                                        sqlE = nextE;
                                    } else {
                                        sqlE.setNextException(nextE);
                                    }
                                }
                            }
                        }
                    }
                } finally {
                    try {
                        resetState();
                    } finally {
                        if (retryCommit) {
                            startTransaction();
                            // Add back read fences
                            Set<TableRef> txTableRefs = txMutations.keySet();
                            for (TableRef tableRef : txTableRefs) {
                                PTable dataTable = tableRef.getTable();
                                addDMLFence(dataTable);
                            }
                            try {
                                // Only retry if an index was added
                                retryCommit = shouldResubmitTransaction(txTableRefs);
                            } catch (SQLException e) {
                                retryCommit = false;
                                if (sqlE == null) {
                                    sqlE = e;
                                } else {
                                    sqlE.setNextException(e);
                                }
                            }
                        }
                        if (sqlE != null && !retryCommit) {
                            throw sqlE;
                        }
                    }
                }
            }
            // Retry commit once if conflict occurred and index was added
            if (!retryCommit) {
                break;
            }
            retryCount++;
            mutations.putAll(txMutations);
        } while (true);
    }

    /**
     * Determines whether indexes were added to mutated tables while the transaction was in progress.
     * @return true if indexes were added and false otherwise.
     * @throws SQLException
     */
    private boolean shouldResubmitTransaction(Set<TableRef> txTableRefs) throws SQLException {
        if (logger.isInfoEnabled()) logger.info("Checking for index updates as of " + getInitialWritePointer());
        MetaDataClient client = new MetaDataClient(connection);
        PMetaData cache = connection.getMetaDataCache();
        boolean addedAnyIndexes = false;
        boolean allImmutableTables = !txTableRefs.isEmpty();
        for (TableRef tableRef : txTableRefs) {
            PTable dataTable = tableRef.getTable();
            List<PTable> oldIndexes;
            PTableRef ptableRef = cache.getTableRef(dataTable.getKey());
            oldIndexes = ptableRef.getTable().getIndexes();
            // Always check at server for metadata change, as it's possible that the table is configured to not check for metadata changes
            // but in this case, the tx manager is telling us it's likely that there has been a change.
            MetaDataMutationResult result = client.updateCache(dataTable.getTenantId(), dataTable.getSchemaName().getString(), dataTable.getTableName().getString(), true);
            long timestamp = TransactionUtil.getResolvedTime(connection, result);
            tableRef.setTimeStamp(timestamp);
            PTable updatedDataTable = result.getTable();
            if (updatedDataTable == null) {
                throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString());
            }
            allImmutableTables &= updatedDataTable.isImmutableRows();
            tableRef.setTable(updatedDataTable);
            if (!addedAnyIndexes) {
                // TODO: in theory we should do a deep equals check here, as it's possible
                // that an index was dropped and recreated with the same name but different
                // indexed/covered columns.
                addedAnyIndexes = (!oldIndexes.equals(updatedDataTable.getIndexes()));
                if (logger.isInfoEnabled()) logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to " + updatedDataTable.getName().getString() + " with indexes " + updatedDataTable.getIndexes());
            }
        }
        if (logger.isInfoEnabled()) logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "to indexes as of " + getInitialWritePointer() + " over " + (allImmutableTables ? " all immutable tables" : " some mutable tables"));
        // If all tables are immutable, we know the conflict we got was due to our DDL/DML fence.
        // If any indexes were added, then the conflict might be due to DDL/DML fence.
        return allImmutableTables || addedAnyIndexes;
    }

    /**
     * Send to HBase any uncommitted data for transactional tables.
     * @return true if any data was sent and false otherwise.
     * @throws SQLException
     */
    public boolean sendUncommitted() throws SQLException {
        return sendUncommitted(mutations.keySet().iterator());
    }

    /**
     * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a
     * query. In this way, they are visible to subsequent reads but are not actually committed until
     * commit is called.
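     * <p>
     * A sketch of the intended flow (hypothetical caller; Phoenix normally invokes this
     * internally before executing a query within a transaction):
     * <pre>{@code
     * conn.setAutoCommit(false);
     * stmt.executeUpdate("UPSERT INTO T VALUES (1, 'a')"); // buffered client-side
     * PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
     * pconn.getMutationState().sendUncommitted(); // visible to this transaction's reads
     * }</pre>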
     * @param tableRefs
     * @return true if any data was sent and false otherwise.
     * @throws SQLException
     */
    public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {
        Transaction currentTx = getTransaction();
        if (currentTx != null) {
            // Initialize visibility so that transactions see their own writes.
            // The checkpoint() method will set it to not see writes if necessary.
            currentTx.setVisibility(VisibilityLevel.SNAPSHOT);
        }
        Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>() {
            @Override
            public boolean apply(TableRef tableRef) {
                return tableRef.getTable().isTransactional();
            }
        });
        if (filteredTableRefs.hasNext()) {
            // FIXME: strip table alias to prevent equality check from failing due to alias mismatch on null alias.
            // We really should be keying the tables based on the physical table name.
            List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size());
            while (filteredTableRefs.hasNext()) {
                TableRef tableRef = filteredTableRefs.next();
                strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
            }
            startTransaction();
            send(strippedAliases.iterator());
            return true;
        }
        return false;
    }

    public void send() throws SQLException {
        send(null);
    }

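    /**
     * Merge two sorted int arrays into a single sorted array with duplicates removed.
     * A small worked example (hypothetical inputs):
     * <pre>{@code
     * joinSortedIntArrays(new int[] {1, 3, 3, 5}, new int[] {2, 3, 6})
     * // returns {1, 2, 3, 5, 6}
     * }</pre>
     */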
    public static int[] joinSortedIntArrays(int[] a, int[] b) {
        int[] result = new int[a.length + b.length];
        int i = 0, j = 0, k = 0, current;
        while (i < a.length && j < b.length) {
            current = a[i] < b[j] ? a[i++] : b[j++];
            for ( ; i < a.length && a[i] == current; i++);
            for ( ; j < b.length && b[j] == current; j++);
            result[k++] = current;
        }
        while (i < a.length) {
            for (current = a[i++] ; i < a.length && a[i] == current; i++);
            result[k++] = current;
        }
        while (j < b.length) {
            for (current = b[j++] ; j < b.length && b[j] == current; j++);
            result[k++] = current;
        }
        return Arrays.copyOf(result, k);
    }

    @Immutable
    public static class RowTimestampColInfo {
        private final boolean useServerTimestamp;
        private final Long rowTimestamp;

        public static final RowTimestampColInfo NULL_ROWTIMESTAMP_INFO = new RowTimestampColInfo(false, null);

        public RowTimestampColInfo(boolean autoGenerate, Long value) {
            this.useServerTimestamp = autoGenerate;
            this.rowTimestamp = value;
        }

        public boolean useServerTimestamp() {
            return useServerTimestamp;
        }

        public Long getTimestamp() {
            return rowTimestamp;
        }
    }

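    /**
     * Per-row uncommitted state: the latest column values for the row (or
     * {@link PRow#DELETE_MARKER} for a delete), the indexes of the statements
     * that mutated it, and its row timestamp column info.
     */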
    public static class RowMutationState {
        @Nonnull private Map<PColumn,byte[]> columnValues;
        private int[] statementIndexes;
        @Nonnull private final RowTimestampColInfo rowTsColInfo;

        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo) {
            checkNotNull(columnValues);
            checkNotNull(rowTsColInfo);
            this.columnValues = columnValues;
            this.statementIndexes = new int[] {statementIndex};
            this.rowTsColInfo = rowTsColInfo;
        }

        Map<PColumn, byte[]> getColumnValues() {
            return columnValues;
        }

        int[] getStatementIndexes() {
            return statementIndexes;
        }

        void join(RowMutationState newRow) {
            getColumnValues().putAll(newRow.getColumnValues());
            statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
        }

        @Nonnull
        RowTimestampColInfo getRowTimestampColInfo() {
            return rowTsColInfo;
        }

    }

    public ReadMetricQueue getReadMetricQueue() {
        return readMetricQueue;
    }

    public void setReadMetricQueue(ReadMetricQueue readMetricQueue) {
        this.readMetricQueue = readMetricQueue;
    }

    public MutationMetricQueue getMutationMetricQueue() {
        return mutationMetricQueue;
    }

}