blob: ad013092a8df32abd3738a265f70948d67b73438 [file] [log] [blame]
package org.apache.phoenix.transaction;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TxConstants;
import org.apache.tephra.distributed.PooledClientProvider;
import org.apache.tephra.distributed.TransactionServiceClient;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.apache.tephra.util.TxUtils;
import org.apache.tephra.visibility.FenceWait;
import org.apache.tephra.visibility.VisibilityFence;
import org.apache.tephra.zookeeper.TephraZKClientService;
import org.apache.tephra.distributed.TransactionService;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.inject.util.Providers;
import org.slf4j.Logger;
/**
 * {@link PhoenixTransactionContext} implementation that delegates to Apache Tephra.
 *
 * <p>An instance operates in one of two modes, fixed at construction time:
 * <ul>
 *   <li><b>context mode</b> ({@code txContext != null}): a Tephra {@link TransactionContext}
 *       owns the transaction life cycle and the registered {@link TransactionAware}s;</li>
 *   <li><b>detached mode</b> ({@code txContext == null}): the instance only carries a
 *       {@link Transaction} (decoded from bytes or copied from a parent context) and keeps
 *       its own list of {@link TransactionAware}s in {@code txAwares}.</li>
 * </ul>
 *
 * <p>The Tephra client/service handles are held in static fields and are therefore shared by
 * every instance in the JVM. No synchronization is performed on them in this class.
 */
public class TephraTransactionContext implements PhoenixTransactionContext {
    private static final TransactionCodec CODEC = new TransactionCodec();

    // JVM-wide Tephra handles, populated by setInMemoryTransactionClient /
    // setTransactionClient / setupTxManager and cleared by tearDownTxManager.
    private static TransactionSystemClient txClient = null;
    private static ZKClientService zkClient = null;
    private static TransactionService txService = null;
    private static TransactionManager txManager = null;

    // Used only in detached mode; in context mode this is an immutable empty list.
    private final List<TransactionAware> txAwares;
    // Null in detached mode.
    private final TransactionContext txContext;
    private Transaction tx;
    private TransactionSystemClient txServiceClient;
    // Failure recorded by commit() so that a following abort() can pass it to Tephra.
    private TransactionFailureException commitFailure;

    /**
     * Creates a detached context with no transaction, no client and an empty, mutable
     * list of transaction-awares.
     */
    public TephraTransactionContext() {
        this.txServiceClient = null;
        this.txAwares = Lists.newArrayList();
        this.txContext = null;
    }

    /**
     * Creates a detached context from a serialized transaction.
     *
     * @param txnBytes encoded transaction; {@code null} or empty leaves the context without
     *                 a transaction
     * @throws IOException if the bytes cannot be decoded
     */
    public TephraTransactionContext(byte[] txnBytes) throws IOException {
        this();
        this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC.decode(txnBytes) : null;
    }

    /**
     * Creates a context-mode instance bound to the shared transaction client. The shared
     * client must have been set beforehand (checked by assertion only).
     *
     * @param connection not read by this constructor
     */
    public TephraTransactionContext(PhoenixConnection connection) {
        assert (txClient != null);
        this.txServiceClient = txClient;
        this.txAwares = Collections.emptyList();
        this.txContext = new TransactionContext(txServiceClient);
    }

    /**
     * Creates a context derived from an existing one.
     *
     * @param ctx        parent context; must be a {@code TephraTransactionContext}
     * @param connection not read by this constructor
     * @param subTask    when {@code true} the new instance is detached and copies the parent's
     *                   current transaction; otherwise it shares the parent's
     *                   {@link TransactionContext}
     */
    public TephraTransactionContext(PhoenixTransactionContext ctx,
            PhoenixConnection connection, boolean subTask) {
        assert (txClient != null);
        this.txServiceClient = txClient;
        assert (ctx instanceof TephraTransactionContext);
        TephraTransactionContext parent = (TephraTransactionContext) ctx;
        if (subTask) {
            this.tx = parent.getTransaction();
            this.txAwares = Lists.newArrayList();
            this.txContext = null;
        } else {
            this.txAwares = Collections.emptyList();
            this.txContext = parent.getContext();
        }
        this.commitFailure = null;
    }

    /**
     * Installs an {@link InMemoryTxSystemClient}, built on a new {@link TransactionManager}
     * for {@code config}, as both the shared client and this instance's client.
     */
    @Override
    public void setInMemoryTransactionClient(Configuration config) {
        TransactionManager txnManager = new TransactionManager(config);
        txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager);
    }

    /**
     * Starts a ZooKeeper client for Tephra service discovery and installs a
     * {@link TransactionServiceClient} as both the shared client and this instance's client.
     *
     * <p>The quorum is read from {@code props}; when absent it is derived from
     * {@code connectionInfo} as {@code quorum:port}.
     *
     * @return the started ZooKeeper client service; this method does not stop it
     */
    @Override
    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props,
            ConnectionInfo connectionInfo) {
        String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
        if (zkQuorumServersString == null) {
            zkQuorumServersString = connectionInfo.getZookeeperQuorum() + ":" + connectionInfo.getPort();
        }
        int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);

        // Create instance of the tephra zookeeper client
        ZKClientService txZKClientService = ZKClientServices.delegate(
            ZKClients.reWatchOnExpire(
                ZKClients.retryOnFailure(
                    new TephraZKClientService(zkQuorumServersString, timeOut, null,
                        ArrayListMultimap.<String, byte[]>create()),
                    RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
            )
        );
        txZKClientService.startAndWait();

        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
        PooledClientProvider pooledClientProvider = new PooledClientProvider(config, zkDiscoveryService);
        txClient = this.txServiceClient = new TransactionServiceClient(config, pooledClientProvider);
        return txZKClientService;
    }

    /**
     * Starts a transaction on the underlying {@link TransactionContext}.
     *
     * @throws SQLException with {@code NULL_TRANSACTION_CONTEXT} in detached mode, or with
     *                      {@code TRANSACTION_FAILED} if Tephra fails to start the transaction
     */
    @Override
    public void begin() throws SQLException {
        if (txContext == null) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT)
                .build().buildException();
        }
        try {
            txContext.start();
        } catch (TransactionFailureException e) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
                .setMessage(e.getMessage()).setRootCause(e).build()
                .buildException();
        }
    }

    /**
     * Finishes the current transaction. Does nothing in detached mode or when no transaction
     * is running.
     *
     * @throws SQLException with {@code TRANSACTION_CONFLICT_EXCEPTION} on a Tephra conflict,
     *                      otherwise {@code TRANSACTION_FAILED}; the Tephra failure is kept so
     *                      a later {@link #abort()} can hand it back to Tephra
     */
    @Override
    public void commit() throws SQLException {
        if (txContext == null || !isTransactionRunning()) {
            return;
        }
        try {
            txContext.finish();
        } catch (TransactionFailureException e) {
            this.commitFailure = e;
            SQLExceptionCode code = (e instanceof TransactionConflictException)
                ? SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION
                : SQLExceptionCode.TRANSACTION_FAILED;
            throw new SQLExceptionInfo.Builder(code)
                .setMessage(e.getMessage()).setRootCause(e).build()
                .buildException();
        }
    }

    /**
     * Aborts the current transaction. Does nothing in detached mode or when no transaction
     * is running. If a previous {@link #commit()} failed, that failure is passed to Tephra's
     * abort and then cleared.
     *
     * @throws SQLException with {@code TRANSACTION_FAILED} if Tephra reports a failure
     */
    @Override
    public void abort() throws SQLException {
        if (txContext == null || !isTransactionRunning()) {
            return;
        }
        try {
            if (commitFailure != null) {
                txContext.abort(commitFailure);
                commitFailure = null;
            } else {
                txContext.abort();
            }
        } catch (TransactionFailureException e) {
            this.commitFailure = null;
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
                .setMessage(e.getMessage()).setRootCause(e).build()
                .buildException();
        }
    }

    /**
     * Optionally checkpoints the transaction and then switches its visibility to
     * {@code SNAPSHOT_EXCLUDE_CURRENT}.
     *
     * @param hasUncommittedData when {@code true} a Tephra checkpoint is taken first
     * @throws SQLException wrapping the Tephra failure if the checkpoint fails
     */
    @Override
    public void checkpoint(boolean hasUncommittedData) throws SQLException {
        // NOTE(review): in detached mode this dereferences tx without a null check - confirm
        // callers only checkpoint while a transaction is present.
        if (hasUncommittedData) {
            try {
                if (txContext == null) {
                    tx = txServiceClient.checkpoint(tx);
                } else {
                    txContext.checkpoint();
                    tx = txContext.getCurrentTransaction();
                }
            } catch (TransactionFailureException e) {
                throw new SQLException(e);
            }
        }

        // Since we're querying our own table while mutating it, we must not see
        // our current mutations, otherwise we can get erroneous results (for DELETE)
        // or get into an infinite loop (for UPSERT SELECT).
        if (txContext == null) {
            tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
        } else {
            txContext.getCurrentTransaction().setVisibility(
                VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
        }
    }

    /**
     * Waits (up to 10 seconds) on a Tephra visibility fence keyed by the table name.
     *
     * @param dataTable table whose name is used as the fence key
     * @param logger    receives an info message once the fence wait has completed
     * @throws SQLException with {@code INTERRUPTED_EXCEPTION} if the wait is interrupted (the
     *                      thread's interrupt flag is restored), or with
     *                      {@code TX_UNABLE_TO_GET_WRITE_FENCE} on timeout or Tephra failure
     */
    @Override
    public void commitDDLFence(PTable dataTable, Logger logger)
            throws SQLException {
        byte[] key = dataTable.getName().getBytes();
        try {
            FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient);
            fenceWait.await(10000, TimeUnit.MILLISECONDS);

            if (logger.isInfoEnabled()) {
                logger.info("Added write fence at ~"
                    + getCurrentTransaction().getReadPointer());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
                .setRootCause(e)
                .build().buildException();
        } catch (TimeoutException | TransactionFailureException e) {
            // Keep the underlying cause so the reason for the fence failure is not lost.
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
                .setSchemaName(dataTable.getSchemaName().getString())
                .setTableName(dataTable.getTableName().getString())
                .setRootCause(e).build()
                .buildException();
        }
    }

    /**
     * Registers a visibility fence for the table's logical name and, when it differs, for its
     * physical name as well.
     */
    public void markDMLFence(PTable table) {
        byte[] logicalKey = table.getName().getBytes();
        registerFenceAware(VisibilityFence.create(logicalKey));

        byte[] physicalKey = table.getPhysicalName().getBytes();
        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
            registerFenceAware(VisibilityFence.create(physicalKey));
        }
    }

    /** Adds a fence to the transaction context, or to the local list in detached mode. */
    private void registerFenceAware(TransactionAware fenceAware) {
        if (this.txContext == null) {
            this.txAwares.add(fenceAware);
        } else {
            this.txContext.addTransactionAware(fenceAware);
        }
    }

    /**
     * Adopts the transaction-awares held by another context.
     *
     * @param ctx source context; must be a {@code TephraTransactionContext}
     */
    @Override
    public void join(PhoenixTransactionContext ctx) {
        assert (ctx instanceof TephraTransactionContext);
        TephraTransactionContext other = (TephraTransactionContext) ctx;

        if (txContext != null) {
            for (TransactionAware txAware : other.getAwares()) {
                txContext.addTransactionAware(txAware);
            }
        } else {
            txAwares.addAll(other.getAwares());
        }
    }

    /**
     * Returns the locally held transaction if set, otherwise the transaction context's
     * current transaction, otherwise {@code null}.
     */
    private Transaction getCurrentTransaction() {
        if (tx != null) {
            return tx;
        }
        return txContext != null ? txContext.getCurrentTransaction() : null;
    }

    /** Returns {@code true} when a current transaction is available. */
    @Override
    public boolean isTransactionRunning() {
        return getCurrentTransaction() != null;
    }

    /** Drops the locally held transaction, the local awares and any recorded commit failure. */
    @Override
    public void reset() {
        tx = null;
        txAwares.clear();
        this.commitFailure = null;
    }

    /**
     * Returns the current transaction's id, or {@link HConstants#LATEST_TIMESTAMP} when there
     * is no transaction.
     */
    @Override
    public long getTransactionId() {
        Transaction current = getCurrentTransaction();
        // First write pointer - won't change with checkpointing
        return current == null ? HConstants.LATEST_TIMESTAMP : current.getTransactionId();
    }

    /** Returns the current transaction's read pointer, or {@code -1} when there is none. */
    @Override
    public long getReadPointer() {
        Transaction current = getCurrentTransaction();
        if (current == null) {
            return (-1);
        }
        return current.getReadPointer();
    }

    /**
     * For testing. Returns the current transaction's write pointer, or
     * {@link HConstants#LATEST_TIMESTAMP} when there is no transaction.
     */
    @Override
    public long getWritePointer() {
        Transaction current = getCurrentTransaction();
        return current == null ? HConstants.LATEST_TIMESTAMP : current.getWritePointer();
    }

    /**
     * Maps the Phoenix visibility level to its Tephra counterpart and applies it to the
     * current transaction. A current transaction is required (checked by assertion only).
     */
    @Override
    public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
        VisibilityLevel tephraVisibilityLevel = null;

        switch (visibilityLevel) {
        case SNAPSHOT:
            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT;
            break;
        case SNAPSHOT_EXCLUDE_CURRENT:
            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
            break;
        case SNAPSHOT_ALL:
            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
            break;
        default:
            assert (false);
        }

        Transaction current = getCurrentTransaction();
        assert (current != null);
        current.setVisibility(tephraVisibilityLevel);
    }

    /**
     * Returns the Phoenix equivalent of the current transaction's Tephra visibility level, or
     * {@code null} for a level with no Phoenix counterpart. A current transaction is required
     * (checked by assertion only).
     */
    @Override
    public PhoenixVisibilityLevel getVisibilityLevel() {
        Transaction current = getCurrentTransaction();
        assert (current != null);
        VisibilityLevel visibilityLevel = current.getVisibilityLevel();

        PhoenixVisibilityLevel phoenixVisibilityLevel;
        switch (visibilityLevel) {
        case SNAPSHOT:
            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT;
            break;
        case SNAPSHOT_EXCLUDE_CURRENT:
            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
            break;
        case SNAPSHOT_ALL:
            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL;
            // Without this break the case fell through to default and returned null.
            break;
        default:
            phoenixVisibilityLevel = null;
        }

        return phoenixVisibilityLevel;
    }

    /**
     * Serializes the current transaction with the Tephra codec. A current transaction is
     * required (checked by assertion only).
     *
     * @throws SQLException wrapping the {@link IOException} if encoding fails
     */
    @Override
    public byte[] encodeTransaction() throws SQLException {
        Transaction current = getCurrentTransaction();
        assert (current != null);

        try {
            return CODEC.encode(current);
        } catch (IOException e) {
            throw new SQLException(e);
        }
    }

    /**
     * Returns {@code TxConstants.MAX_TX_PER_MS}.
     * NOTE(review): the method name says "per second" while the constant name says "per ms" -
     * confirm which unit callers expect.
     */
    @Override
    public long getMaxTransactionsPerSecond() {
        return TxConstants.MAX_TX_PER_MS;
    }

    /** Delegates to {@link TxUtils#isPreExistingVersion(long)}. */
    @Override
    public boolean isPreExistingVersion(long version) {
        return TxUtils.isPreExistingVersion(version);
    }

    /** Returns a new Tephra {@link TransactionProcessor} coprocessor instance. */
    @Override
    public BaseRegionObserver getCoProcessor() {
        return new TransactionProcessor();
    }

    /** Returns Tephra's family delete qualifier. */
    @Override
    public byte[] getFamilyDeleteMarker() {
        return TxConstants.FAMILY_DELETE_QUALIFIER;
    }

    /**
     * Writes the Tephra settings used by this class into {@code config}: persistence off,
     * a single client attempt, a random bind port, the given snapshot directory and
     * transaction timeout, no HDFS user, and a snapshot interval of 5.
     *
     * @param config                   configuration to mutate
     * @param tmpFolder                value for the snapshot directory setting
     * @param defaultTxnTimeoutSeconds value for the transaction timeout setting
     */
    @Override
    public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds)
            throws IOException {
        config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
        config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
        config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder);
        config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds);
        config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
        config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
    }

    /**
     * Starts the shared ZooKeeper client, transaction manager and transaction service.
     * Returns immediately when a transaction service is already set.
     *
     * @param config configuration passed to the Tephra components
     * @param url    connection URL from which the ZooKeeper connection string is derived
     */
    @Override
    public void setupTxManager(Configuration config, String url) throws SQLException {
        if (txService != null) {
            return;
        }

        ConnectionInfo connInfo = ConnectionInfo.create(url);
        zkClient = ZKClientServices.delegate(
            ZKClients.reWatchOnExpire(
                ZKClients.retryOnFailure(
                    ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
                        .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
                            HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
                        .build(),
                    RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
                )
            )
        );
        zkClient.startAndWait();

        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
        txManager = new TransactionManager(config,
            new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config),
                new TxMetricsCollector()),
            new TxMetricsCollector());
        txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
        txService.startAndWait();
    }

    /**
     * Stops the shared transaction service and ZooKeeper client, then clears the shared
     * handles. The handles are cleared even if stopping either service throws.
     */
    @Override
    public void tearDownTxManager() {
        try {
            if (txService != null) {
                txService.stopAndWait();
            }
        } finally {
            try {
                if (zkClient != null) {
                    zkClient.stopAndWait();
                }
            } finally {
                txService = null;
                zkClient = null;
                txManager = null;
            }
        }
    }

    /*
     * TephraTransactionContext specific functions
     */

    /** Returns the current transaction, or {@code null} when there is none. */
    Transaction getTransaction() {
        return this.getCurrentTransaction();
    }

    /** Returns the underlying Tephra context, or {@code null} in detached mode. */
    TransactionContext getContext() {
        return this.txContext;
    }

    /** Returns the live (not copied) list of locally held transaction-awares. */
    List<TransactionAware> getAwares() {
        return txAwares;
    }

    /**
     * Registers a transaction-aware. In context mode it is handed to the Tephra context; in
     * detached mode it is recorded locally and started on the held transaction. Nothing
     * happens in detached mode when no transaction is held.
     */
    void addTransactionAware(TransactionAware txAware) {
        if (this.txContext != null) {
            txContext.addTransactionAware(txAware);
        } else if (this.tx != null) {
            txAwares.add(txAware);
            txAware.startTx(tx);
        }
    }
}