blob: b1b9a62de47168e84248b4fe6aa340beaebb7cc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.transaction;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.InMemoryCommitTable;
import org.apache.omid.transaction.HBaseOmidClientConfiguration;
import org.apache.omid.transaction.HBaseTransactionManager;
import org.apache.omid.transaction.TTable;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.TSOServerConfig.WAIT_STRATEGY;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.omid.tso.client.TSOClient;
import org.apache.phoenix.coprocessor.OmidGCProcessor;
import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.apache.phoenix.util.TransactionUtil;
import com.google.inject.Guice;
import com.google.inject.Injector;
public class OmidTransactionProvider implements PhoenixTransactionProvider {
private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
public static final String OMID_TSO_PORT = "phoenix.omid.tso.port";
public static final String OMID_TSO_CONFLICT_MAP_SIZE = "phoenix.omid.tso.conflict.map.size";
public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type";
public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
private HBaseTransactionManager transactionManager = null;
private volatile CommitTable.Client commitTableClient = null;
private CommitTable.Writer commitTableWriter = null;
public static final OmidTransactionProvider getInstance() {
return INSTANCE;
}
private OmidTransactionProvider() {
}
@Override
public String toString() {
return getProvider().toString();
}
@Override
public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
// Remove last byte (which is used to identify transaction provider)
return new OmidTransactionContext(Arrays.copyOf(txnBytes,txnBytes.length-1));
}
@Override
public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) throws SQLException {
return new OmidTransactionContext(connection);
}
@Override
public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException{
if (transactionManager == null) {
try {
HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
transactionManager = (HBaseTransactionManager) HBaseTransactionManager.newInstance(clientConf);
} catch (IOException | InterruptedException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
}
return new OmidTransactionClient(transactionManager);
}
static class OmidTransactionClient implements PhoenixTransactionClient {
private final HBaseTransactionManager transactionManager;
public OmidTransactionClient(HBaseTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public HBaseTransactionManager getTransactionClient() {
return transactionManager;
}
@Override
public void close() throws IOException {}
}
// For testing only
public CommitTable.Client getCommitTableClient() {
return commitTableClient;
}
@Override
public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws SQLException{
TSOServerConfig tsoConfig = new TSOServerConfig();
TSOServer tso;
tsoConfig.setPort(port);
tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
tsoConfig.setWaitStrategy(WAIT_STRATEGY.LOW_CPU.toString());
Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
tso = injector.getInstance(TSOServer.class);
tso.startAndWait();
OmidClientConfiguration clientConfig = new OmidClientConfiguration();
clientConfig.setConnectionString("localhost:" + port);
clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class);
try {
// Create the associated Handler
TSOClient client = TSOClient.newInstance(clientConfig);
HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
clientConf.setConnectionString("localhost:" + port);
clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
clientConf.setHBaseConfiguration(config);
commitTableClient = commitTable.getClient();
commitTableWriter = commitTable.getWriter();
transactionManager = HBaseTransactionManager.builder(clientConf)
.commitTableClient(commitTableClient)
.commitTableWriter(commitTableWriter)
.tsoClient(client).build();
} catch (IOException | InterruptedException e) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build()
.buildException();
}
return new OmidTransactionService(tso, transactionManager);
}
static class OmidTransactionService implements PhoenixTransactionService {
private final HBaseTransactionManager transactionManager;
private TSOServer tso;
public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) {
this.tso = tso;
this.transactionManager = transactionManager;
}
public void start() {
}
@Override
public void close() throws IOException {
if (transactionManager != null) {
transactionManager.close();
}
if (tso != null) {
tso.stopAndWait();
}
}
}
@Override
public Class<? extends RegionObserver> getCoprocessor() {
return OmidTransactionalProcessor.class;
}
@Override
public Class<? extends RegionObserver> getGCCoprocessor() {
return OmidGCProcessor.class;
}
@Override
public Provider getProvider() {
return TransactionFactory.Provider.OMID;
}
@Override
public boolean isUnsupported(Feature feature) {
return true;
}
@Override
public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
return TTable.markPutAsCommitted(put, timestamp, timestamp);
}
}