blob: f63c492273be30c14e088e9be7b680b92732c1a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.transaction;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.DelegateHTable;
import org.apache.phoenix.execute.PhoenixTxIndexMutationGenerator;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.tephra.Transaction;
import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.TransactionAwareHTable;
import org.apache.tephra.visibility.FenceWait;
import org.apache.tephra.visibility.VisibilityFence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
public class TephraTransactionContext implements PhoenixTransactionContext {
private static final Logger LOGGER = LoggerFactory.getLogger(TephraTransactionContext.class);
private static final TransactionCodec CODEC = new TransactionCodec();
private final List<TransactionAware> txAwares;
private final TransactionContext txContext;
private Transaction tx;
private TransactionSystemClient txServiceClient;
public TephraTransactionContext() {
this.txServiceClient = null;
this.txAwares = Lists.newArrayList();
this.txContext = null;
}
public TephraTransactionContext(byte[] txnBytes) throws IOException {
this();
this.tx = CODEC.decode(txnBytes);
}
public TephraTransactionContext(PhoenixConnection connection) throws SQLException {
PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider());
assert (client instanceof TephraTransactionClient);
this.txServiceClient = ((TephraTransactionClient)client).getTransactionClient();
this.txAwares = Collections.emptyList();
this.txContext = new TransactionContext(txServiceClient);
}
private TephraTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
assert (ctx instanceof TephraTransactionContext);
TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx;
this.txServiceClient = tephraTransactionContext.txServiceClient;
if (subTask) {
this.tx = tephraTransactionContext.getTransaction();
this.txAwares = Lists.newArrayList();
this.txContext = null;
} else {
this.txAwares = Collections.emptyList();
this.txContext = tephraTransactionContext.getContext();
}
}
@Override
public TransactionFactory.Provider getProvider() {
return TransactionFactory.Provider.TEPHRA;
}
@Override
public void begin() throws SQLException {
if (txContext == null) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
.buildException();
}
try {
txContext.start();
} catch (TransactionFailureException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
}
@Override
public void commit() throws SQLException {
if (txContext == null || !isTransactionRunning()) {
return;
}
try {
txContext.finish();
} catch (TransactionFailureException e) {
if (e instanceof TransactionConflictException) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
}
@Override
public void abort() throws SQLException {
if (txContext == null || !isTransactionRunning()) {
return;
}
try {
txContext.abort();
} catch (TransactionFailureException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
}
@Override
public void checkpoint(boolean hasUncommittedData) throws SQLException {
if (hasUncommittedData) {
try {
if (txContext == null) {
tx = txServiceClient.checkpoint(tx);
} else {
assert (txContext != null);
txContext.checkpoint();
tx = txContext.getCurrentTransaction();
}
} catch (TransactionFailureException e) {
throw new SQLException(e);
}
}
// Since we're querying our own table while mutating it, we must exclude
// see our current mutations, otherwise we can get erroneous results
// (for DELETE)
// or get into an infinite loop (for UPSERT SELECT).
if (txContext == null) {
tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
} else {
assert (txContext != null);
txContext.getCurrentTransaction().setVisibility(
VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
}
}
@Override
public void commitDDLFence(PTable dataTable)
throws SQLException {
byte[] key = dataTable.getName().getBytes();
try {
FenceWait fenceWait = VisibilityFence.prepareWait(key,
txServiceClient);
fenceWait.await(10000, TimeUnit.MILLISECONDS);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Added write fence at ~"
+ getCurrentTransaction().getReadPointer());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e)
.build().buildException();
} catch (TimeoutException | TransactionFailureException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
.setSchemaName(dataTable.getSchemaName().getString())
.setTableName(dataTable.getTableName().getString()).build()
.buildException();
} finally {
// The client expects a transaction to be in progress on the txContext while the
// VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's
// finished, we start a new one here.
// TODO: seems like an autonomous tx capability in Tephra would be useful here.
this.begin();
}
}
@Override
public void markDMLFence(PTable table) {
if (table.getType() == PTableType.INDEX) {
return;
}
byte[] logicalKey = table.getName().getBytes();
TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
if (this.txContext == null) {
this.txAwares.add(logicalTxAware);
} else {
this.txContext.addTransactionAware(logicalTxAware);
}
byte[] physicalKey = table.getPhysicalName().getBytes();
if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
TransactionAware physicalTxAware = VisibilityFence
.create(physicalKey);
if (this.txContext == null) {
this.txAwares.add(physicalTxAware);
} else {
this.txContext.addTransactionAware(physicalTxAware);
}
}
}
@Override
public void join(PhoenixTransactionContext ctx) {
if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
return;
}
assert (ctx instanceof TephraTransactionContext);
TephraTransactionContext tephraContext = (TephraTransactionContext) ctx;
if (txContext != null) {
for (TransactionAware txAware : tephraContext.getAwares()) {
txContext.addTransactionAware(txAware);
}
} else {
txAwares.addAll(tephraContext.getAwares());
}
}
private Transaction getCurrentTransaction() {
return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
}
@Override
public boolean isTransactionRunning() {
return getCurrentTransaction() != null;
}
@Override
public void reset() {
tx = null;
txAwares.clear();
}
@Override
public long getTransactionId() {
Transaction tx = getCurrentTransaction();
return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing
}
@Override
public long getReadPointer() {
Transaction tx = getCurrentTransaction();
if (tx == null) {
return (-1);
}
return tx.getReadPointer();
}
// For testing
@Override
public long getWritePointer() {
Transaction tx = getCurrentTransaction();
return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer();
}
@Override
public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
VisibilityLevel tephraVisibilityLevel = null;
switch (visibilityLevel) {
case SNAPSHOT:
tephraVisibilityLevel = VisibilityLevel.SNAPSHOT;
break;
case SNAPSHOT_EXCLUDE_CURRENT:
tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
break;
case SNAPSHOT_ALL:
tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
break;
default:
assert (false);
}
Transaction tx = getCurrentTransaction();
assert(tx != null);
tx.setVisibility(tephraVisibilityLevel);
}
@Override
public PhoenixVisibilityLevel getVisibilityLevel() {
VisibilityLevel visibilityLevel = null;
Transaction tx = getCurrentTransaction();
assert(tx != null);
visibilityLevel = tx.getVisibilityLevel();
PhoenixVisibilityLevel phoenixVisibilityLevel;
switch (visibilityLevel) {
case SNAPSHOT:
phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT;
break;
case SNAPSHOT_EXCLUDE_CURRENT:
phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
break;
case SNAPSHOT_ALL:
phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL;
default:
phoenixVisibilityLevel = null;
}
return phoenixVisibilityLevel;
}
@Override
public byte[] encodeTransaction() throws SQLException {
Transaction tx = getCurrentTransaction();
assert (tx != null);
try {
byte[] encodedTxBytes = CODEC.encode(tx);
encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1);
encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode();
return encodedTxBytes;
} catch (IOException e) {
throw new SQLException(e);
}
}
/**
* TephraTransactionContext specific functions
*/
Transaction getTransaction() {
return this.getCurrentTransaction();
}
TransactionContext getContext() {
return this.txContext;
}
List<TransactionAware> getAwares() {
return txAwares;
}
void addTransactionAware(TransactionAware txAware) {
if (this.txContext != null) {
txContext.addTransactionAware(txAware);
} else if (this.tx != null) {
txAwares.add(txAware);
assert (tx != null);
txAware.startTx(tx);
}
}
@Override
public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
return new TephraTransactionContext(context, subTask);
}
@Override
public Table getTransactionalTable(Table htable, boolean isConflictFree) {
TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(htable, isConflictFree ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
this.addTransactionAware(transactionAwareHTable);
return transactionAwareHTable;
}
@Override
public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException {
// If we have indexes, wrap the HTable in a delegate HTable that
// will attach the necessary index meta data in the event of a
// rollback
TransactionAwareHTable transactionAwareHTable;
// Don't add immutable indexes (those are the only ones that would participate
// during a commit), as we don't need conflict detection for these.
if (isIndex) {
transactionAwareHTable = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.NONE);
transactionAwareHTable.startTx(getTransaction());
} else {
htable = new RollbackHookHTableWrapper(htable, table, connection);
transactionAwareHTable = new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
// Even for immutable, we need to do this so that an abort has the state
// necessary to generate the rows to delete.
this.addTransactionAware(transactionAwareHTable);
}
return transactionAwareHTable;
}
/**
*
* Wraps Tephra data table HTables to catch when a rollback occurs so
* that index Delete mutations can be generated and applied (as
* opposed to storing them in the Tephra change set). This technique
* allows the Tephra API to be used directly with HBase APIs and
* Phoenix APIs since we can detect the rollback as a callback
* when the Tephra rollback is called.
*
*/
private static class RollbackHookHTableWrapper extends DelegateHTable {
private final PTable table;
private final PhoenixConnection connection;
private RollbackHookHTableWrapper(Table delegate, PTable table, PhoenixConnection connection) {
super(delegate);
this.table = table;
this.connection = connection;
}
/**
* Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach
* our index meta data to the mutations such that we can also undo the index mutations.
*/
@Override
public void delete(List<Delete> deletes) throws IOException {
ServerCache cache = null;
try {
if (deletes.isEmpty()) { return; }
// Attach meta data for server maintained indexes
ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
if (table.getIndexMaintainers(indexMetaDataPtr, connection)) {
cache = IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, deletes, indexMetaDataPtr);
}
// Send deletes for client maintained indexes
List<PTable> indexes = IndexUtil.getClientMaintainedIndexes(table);
if (!indexes.isEmpty()) {
PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection, table, indexes, deletes.get(0)
.getAttributesMap());
Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate,
deletes.iterator());
for (PTable index : indexes) {
byte[] physicalName = index.getPhysicalName().getBytes();
try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) {
List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size());
for (Pair<Mutation, byte[]> mutationPair : indexUpdates) {
if (Bytes.equals(mutationPair.getSecond(), physicalName)) {
indexDeletes.add(mutationPair.getFirst());
}
hindex.batch(indexDeletes);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
}
delegate.delete(deletes);
} catch (SQLException e) {
throw new IOException(e);
} finally {
if (cache != null) {
SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
}
}
}
}
}