blob: 414a519bcc12e5861e188a48fd7833bec86f6366 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.transaction;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hbase.client.Table;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
import org.apache.omid.transaction.HBaseCellId;
import org.apache.omid.transaction.HBaseTransaction;
import org.apache.omid.transaction.HBaseTransactionManager;
import org.apache.omid.transaction.RollbackException;
import org.apache.omid.transaction.Transaction;
import org.apache.omid.transaction.Transaction.Status;
import org.apache.omid.transaction.TransactionException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.InvalidProtocolBufferException;
public class OmidTransactionContext implements PhoenixTransactionContext {
private static final Logger LOGGER = LoggerFactory.getLogger(OmidTransactionContext.class);
private HBaseTransactionManager tm;
private HBaseTransaction tx;
public OmidTransactionContext() {
this.tx = null;
this.tm = null;
}
public OmidTransactionContext(PhoenixConnection connection) throws SQLException {
PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider());
assert (client instanceof OmidTransactionProvider.OmidTransactionClient);
this.tm = ((OmidTransactionProvider.OmidTransactionClient)client).getTransactionClient();
this.tx = null;
}
public OmidTransactionContext(byte[] txnBytes) throws InvalidProtocolBufferException {
this();
if (txnBytes != null && txnBytes.length > 0) {
TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(txnBytes);
tx = new HBaseTransaction(transaction.getTimestamp(), transaction.getEpoch(), new HashSet<HBaseCellId>(),
new HashSet<HBaseCellId>(), null, tm.isLowLatency());
} else {
tx = null;
}
}
public OmidTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
assert (ctx instanceof OmidTransactionContext);
OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx;
this.tm = omidTransactionContext.tm;
if (subTask) {
if (omidTransactionContext.isTransactionRunning()) {
Transaction transaction = omidTransactionContext.getTransaction();
this.tx = new HBaseTransaction(transaction.getTransactionId(), transaction.getEpoch(),
new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), this.tm,
transaction.getReadTimestamp(), transaction.getWriteTimestamp(), tm.isLowLatency());
} else {
this.tx = null;
}
this.tm = null;
} else {
this.tx = omidTransactionContext.getTransaction();
}
}
@Override
public void begin() throws SQLException {
if (tm == null) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
.buildException();
}
try {
tx = (HBaseTransaction) tm.begin();
} catch (TransactionException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
}
@Override
public void commit() throws SQLException {
if (tx == null || tm == null)
return;
try {
tm.commit(tx);
} catch (TransactionException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
} catch (RollbackException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
}
@Override
public void abort() throws SQLException {
if (tx == null || tm == null || tx.getStatus() != Status.RUNNING) {
return;
}
try {
tm.rollback(tx);
} catch (TransactionException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
}
@Override
public void checkpoint(boolean hasUncommittedData) throws SQLException {
try {
tx.checkpoint();
} catch (TransactionException e) {
throw new SQLException(e);
}
tx.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
}
@Override
public void commitDDLFence(PTable dataTable) throws SQLException {
try {
tx = (HBaseTransaction) tm.fence(dataTable.getName().getBytes());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Added write fence at ~"
+ tx.getReadTimestamp());
}
} catch (TransactionException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
.setSchemaName(dataTable.getSchemaName().getString())
.setTableName(dataTable.getTableName().getString()).build()
.buildException();
}
}
@Override
public void join(PhoenixTransactionContext ctx) {
if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
return;
}
assert (ctx instanceof OmidTransactionContext);
OmidTransactionContext omidContext = (OmidTransactionContext) ctx;
HBaseTransaction transaction = omidContext.getTransaction();
if (transaction == null || tx == null) return;
Set<HBaseCellId> writeSet = transaction.getWriteSet();
for (HBaseCellId cell : writeSet) {
tx.addWriteSetElement(cell);
}
}
@Override
public boolean isTransactionRunning() {
return (tx != null);
}
@Override
public void reset() {
tx = null;
}
@Override
public long getTransactionId() {
return tx.getTransactionId();
}
@Override
public long getReadPointer() {
return tx.getReadTimestamp();
}
@Override
public long getWritePointer() {
return tx.getWriteTimestamp();
}
@Override
public PhoenixVisibilityLevel getVisibilityLevel() {
VisibilityLevel visibilityLevel = null;
assert(tx != null);
visibilityLevel = tx.getVisibilityLevel();
PhoenixVisibilityLevel phoenixVisibilityLevel;
switch (visibilityLevel) {
case SNAPSHOT:
phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT;
break;
case SNAPSHOT_EXCLUDE_CURRENT:
phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
break;
case SNAPSHOT_ALL:
phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL;
default:
phoenixVisibilityLevel = null;
}
return phoenixVisibilityLevel;
}
@Override
public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
VisibilityLevel omidVisibilityLevel = null;
switch (visibilityLevel) {
case SNAPSHOT:
omidVisibilityLevel = VisibilityLevel.SNAPSHOT;
break;
case SNAPSHOT_EXCLUDE_CURRENT:
omidVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
break;
case SNAPSHOT_ALL:
omidVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
break;
default:
assert (false);
}
assert(tx != null);
tx.setVisibilityLevel(omidVisibilityLevel);
}
@Override
public byte[] encodeTransaction() throws SQLException {
assert(tx != null);
TSOProto.Transaction.Builder transactionBuilder = TSOProto.Transaction.newBuilder();
transactionBuilder.setTimestamp(tx.getTransactionId());
transactionBuilder.setEpoch(tx.getEpoch());
byte[] encodedTxBytes = transactionBuilder.build().toByteArray();
// Add code of TransactionProvider at end of byte array
encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1);
encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode();
return encodedTxBytes;
}
@Override
public Provider getProvider() {
return TransactionFactory.Provider.OMID;
}
@Override
public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
return new OmidTransactionContext(context, subTask);
}
@Override
public void markDMLFence(PTable dataTable) {
}
/**
* OmidTransactionContext specific functions
*/
public HBaseTransaction getTransaction() {
return tx;
}
@Override
public Table getTransactionalTable(Table htable, boolean isConflictFree) throws SQLException {
return new OmidTransactionTable(this, htable, isConflictFree);
}
@Override
public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException {
// When we're getting a table for writing, if the table being written to is an index,
// write the shadow cells immediately since the only time we write to an index is
// when we initially populate it synchronously.
return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex, isIndex);
}
}