| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tephra.distributed; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Throwables; |
| import com.google.inject.Guice; |
| import com.google.inject.Inject; |
| import com.google.inject.Injector; |
| import com.google.inject.name.Named; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.tephra.InvalidTruncateTimeException; |
| import org.apache.tephra.Transaction; |
| import org.apache.tephra.TransactionCouldNotTakeSnapshotException; |
| import org.apache.tephra.TransactionFailureException; |
| import org.apache.tephra.TransactionNotInProgressException; |
| import org.apache.tephra.TransactionSizeException; |
| import org.apache.tephra.TransactionSystemClient; |
| import org.apache.tephra.TxConstants; |
| import org.apache.tephra.runtime.ConfigModule; |
| import org.apache.tephra.runtime.DiscoveryModules; |
| import org.apache.tephra.runtime.TransactionClientModule; |
| import org.apache.tephra.runtime.TransactionModules; |
| import org.apache.tephra.runtime.ZKModule; |
| import org.apache.tephra.util.ConfigurationFactory; |
| import org.apache.thrift.TException; |
| import org.apache.twill.zookeeper.ZKClientService; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.InputStream; |
| import java.lang.management.ManagementFactory; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Set; |
| |
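/**
 * A {@link TransactionSystemClient} implementation that sends every call to the transaction service
 * through a thrift client, retrying on thrift errors according to the configured retry strategy.
 */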
| public class TransactionServiceClient implements TransactionSystemClient { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TransactionServiceClient.class); |
| |
| // we will use this to provide every call with an tx client |
| private ThriftClientProvider clientProvider; |
| |
| // the retry strategy we will use |
| private final RetryStrategyProvider retryStrategyProvider; |
| |
| // client id that is used to identify the transactions |
| private final String clientId; |
| |
| private final int changeSetCountLimit; |
| private final int changeSetCountThreshold; |
| private final long changeSetSizeLimit; |
| private final long changeSetSizeThreshold; |
| |
| /** |
| * Utility to be used for basic verification of transaction system availability and functioning |
| * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx |
| * @throws Exception |
| */ |
| public static void main(String[] args) throws Exception { |
| if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) { |
| System.out.println("USAGE: TransactionServiceClient [-v]"); |
| } |
| |
| boolean verbose = false; |
| if (args.length == 1 && "-v".equals(args[0])) { |
| verbose = true; |
| } |
| doMain(verbose, new ConfigurationFactory().get()); |
| } |
| |
| @VisibleForTesting |
| public static void doMain(boolean verbose, Configuration conf) throws Exception { |
| LOG.info("Starting tx server client test."); |
| Injector injector = Guice.createInjector( |
| new ConfigModule(conf), |
| new ZKModule(), |
| new DiscoveryModules().getDistributedModules(), |
| new TransactionModules().getDistributedModules(), |
| new TransactionClientModule() |
| ); |
| |
| ZKClientService zkClient = injector.getInstance(ZKClientService.class); |
| zkClient.startAndWait(); |
| |
| try { |
| TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class); |
| LOG.info("Starting tx..."); |
| Transaction tx = client.startShort(); |
| if (verbose) { |
| LOG.info("Started tx details: " + tx.toString()); |
| } else { |
| LOG.info("Started tx: " + tx.getTransactionId() + |
| ", readPointer: " + tx.getReadPointer() + |
| ", invalids: " + tx.getInvalids().length + |
| ", inProgress: " + tx.getInProgress().length); |
| } |
| LOG.info("Checking if canCommit tx..."); |
| boolean canCommit = client.canCommitOrThrow(tx, Collections.<byte[]>emptyList()); |
| LOG.info("canCommit: " + canCommit); |
| if (canCommit) { |
| LOG.info("Committing tx..."); |
| boolean committed = client.commit(tx); |
| LOG.info("Committed tx: " + committed); |
| if (!committed) { |
| LOG.info("Aborting tx..."); |
| client.abort(tx); |
| LOG.info("Aborted tx..."); |
| } |
| } else { |
| LOG.info("Aborting tx..."); |
| client.abort(tx); |
| LOG.info("Aborted tx..."); |
| } |
| } finally { |
| zkClient.stopAndWait(); |
| } |
| } |
| |
/**
 * Creates a client whose client id is the name reported by the JVM's runtime MXBean.
 * This constructor can be used if Guice dependency injection is not used.
 *
 * @param config the configuration, handed on to the three-argument constructor
 * @param clientProvider the provider of thrift clients, handed on to the three-argument constructor
 */
public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider) {
  this(
    config,
    clientProvider,
    ManagementFactory.getRuntimeMXBean().getName());
}
| |
| /** |
| * Create from a configuration. This will first attempt to find a zookeeper |
| * for service discovery. Otherwise it will look for the port in the |
| * config and use localhost. |
| * @param config a configuration containing the zookeeper properties |
| * @param clientId id of the client that identifies it when it starts a transaction |
| */ |
| @Inject |
| public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider, |
| @Named(TxConstants.CLIENT_ID) String clientId) { |
| |
| // initialize the retry logic |
| String retryStrat = config.get( |
| TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, |
| TxConstants.Service.DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY); |
| if ("backoff".equals(retryStrat)) { |
| this.retryStrategyProvider = new RetryWithBackoff.Provider(); |
| } else if ("n-times".equals(retryStrat)) { |
| this.retryStrategyProvider = new RetryNTimes.Provider(); |
| } else { |
| try { |
| this.retryStrategyProvider = (RetryStrategyProvider) Class.forName(retryStrat).newInstance(); |
| } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { |
| throw new IllegalArgumentException( |
| String.format("Unable to instantiate RetryStrategyProvider '%s'", retryStrat), e); |
| } |
| } |
| this.retryStrategyProvider.configure(config); |
| LOG.debug("Retry strategy is " + this.retryStrategyProvider); |
| |
| this.clientProvider = clientProvider; |
| this.clientId = clientId; |
| |
| changeSetCountLimit = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT, |
| TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_LIMIT); |
| changeSetCountThreshold = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD, |
| TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD); |
| changeSetSizeLimit = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT, |
| TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_LIMIT); |
| changeSetSizeThreshold = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD, |
| TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD); |
| } |
| |
| /** |
| * This is an abstract class that encapsulates an operation. It provides a |
| * method to attempt the actual operation, and it can throw an operation |
| * exception. |
| * @param <T> The return type of the operation |
| */ |
| abstract static class Operation<T> { |
| |
| /** the name of the operation. */ |
| String name; |
| |
| /** constructor with name of operation. */ |
| Operation(String name) { |
| this.name = name; |
| } |
| |
| /** return the name of the operation. */ |
| String getName() { |
| return name; |
| } |
| |
| /** execute the operation, given an tx client. */ |
| abstract T execute(TransactionServiceThriftClient client) |
| throws Exception; |
| } |
| |
| /** see execute(operation, client). */ |
| private <T> T execute(Operation<T> operation) throws Exception { |
| return execute(operation, null); |
| } |
| |
| /** |
| * This is a generic method implementing the somewhat complex execution |
| * and retry logic for operations, to avoid repetitive code. |
| * |
| * Attempts to execute one operation, by obtaining an tx client from |
| * the client provider and passing the operation to the client. If the |
| * call fails with a Thrift exception, apply the retry strategy. If no |
| * more retries are to be made according to the strategy, call the |
| * operation's error method to obtain a value to return. Note that error() |
| * may also throw an exception. Note also that the retry logic is only |
| * applied for thrift exceptions. |
| * |
| * @param operation The operation to be executed |
| * @param provider An tx client provider. If null, then a client will be |
| * obtained using the client provider |
| * @param <T> The return type of the operation |
| * @return the result of the operation, or a value returned by error() |
| */ |
| private <T> T execute(Operation<T> operation, ThriftClientProvider provider) throws Exception { |
| RetryStrategy retryStrategy = retryStrategyProvider.newRetryStrategy(); |
| while (true) { |
| // did we get a custom client provider or do we use the default? |
| if (provider == null) { |
| provider = this.clientProvider; |
| } |
| // this will throw a TException if it cannot get a client |
| try (CloseableThriftClient closeable = provider.getCloseableClient()) { |
| // note that this can throw exceptions other than TException |
| return operation.execute(closeable.getThriftClient()); |
| |
| } catch (TException te) { |
| // determine whether we should retry |
| boolean retry = retryStrategy.failOnce(); |
| if (!retry) { |
| // retry strategy is exceeded, throw an operation exception |
| String message = |
| "Thrift error for " + operation + ": " + te.getMessage(); |
| LOG.error(message); |
| LOG.debug(message, te); |
| throw te; |
| } else { |
| // call retry strategy before retrying |
| retryStrategy.beforeRetry(); |
| String msg = "Retrying " + operation.getName() + " after Thrift error: " + te.getMessage(); |
| LOG.info(msg); |
| LOG.debug(msg, te); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public Transaction startLong() { |
| try { |
| return execute( |
| new Operation<Transaction>("startLong") { |
| @Override |
| public Transaction execute(TransactionServiceThriftClient client) |
| throws TException { |
| return client.startLong(clientId); |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public Transaction startShort() { |
| try { |
| return execute( |
| new Operation<Transaction>("startShort") { |
| @Override |
| public Transaction execute(TransactionServiceThriftClient client) |
| throws TException { |
| return client.startShort(clientId); |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public Transaction startShort(final int timeout) { |
| try { |
| return execute( |
| new Operation<Transaction>("startShort") { |
| @Override |
| public Transaction execute(TransactionServiceThriftClient client) |
| throws TException { |
| return client.startShort(clientId, timeout); |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds) |
| throws TransactionNotInProgressException { |
| |
| try { |
| return execute( |
| new Operation<Boolean>("canCommit") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) |
| throws Exception { |
| return client.canCommit(tx, changeIds); |
| } |
| }); |
| } catch (TransactionNotInProgressException e) { |
| throw e; |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public boolean canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds) |
| throws TransactionFailureException { |
| |
| // we want to validate the size of the change set here before sending it over the wire. |
| // if the change set is large, it can cause memory issues on the server side. |
| if (changeIds.size() > changeSetCountLimit) { |
| throw new TransactionSizeException(String.format( |
| "Change set for transaction %d has %d entries and exceeds the limit of %d", |
| tx.getTransactionId(), changeIds.size(), changeSetCountLimit)); |
| } else if (changeIds.size() > changeSetCountThreshold) { |
| LOG.warn("Change set for transaction {} has {} entries. " + |
| "It is recommended to limit the number of changes to {}, or to use a long-running transaction. ", |
| tx.getTransactionId(), changeIds.size(), changeSetCountThreshold); |
| } |
| long byteCount = 0L; |
| for (byte[] change : changeIds) { |
| byteCount += change.length; |
| } |
| if (byteCount > changeSetSizeLimit) { |
| throw new TransactionSizeException(String.format( |
| "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes", |
| tx.getTransactionId(), byteCount, changeSetSizeLimit)); |
| } else if (byteCount > changeSetSizeThreshold) { |
| LOG.warn("Change set for transaction {} has total size of {} bytes. " + |
| "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ", |
| tx.getTransactionId(), byteCount, changeSetSizeThreshold); |
| } |
| |
| try { |
| return execute( |
| new Operation<Boolean>("canCommit") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) |
| throws Exception { |
| return client.canCommitOrThrow(tx, changeIds); |
| } |
| }); |
| } catch (TransactionNotInProgressException | TransactionSizeException e) { |
| throw e; |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public boolean commit(final Transaction tx) throws TransactionNotInProgressException { |
| try { |
| return this.execute( |
| new Operation<Boolean>("commit") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) |
| throws Exception { |
| return client.commit(tx); |
| } |
| }); |
| } catch (TransactionNotInProgressException e) { |
| throw e; |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public void abort(final Transaction tx) { |
| try { |
| this.execute( |
| new Operation<Boolean>("abort") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) |
| throws TException { |
| client.abort(tx); |
| return true; |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public boolean invalidate(final long tx) { |
| try { |
| return this.execute( |
| new Operation<Boolean>("invalidate") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) |
| throws TException { |
| return client.invalidate(tx); |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public Transaction checkpoint(final Transaction tx) throws TransactionNotInProgressException { |
| try { |
| return this.execute( |
| new Operation<Transaction>("checkpoint") { |
| @Override |
| Transaction execute(TransactionServiceThriftClient client) throws Exception { |
| return client.checkpoint(tx); |
| } |
| } |
| ); |
| } catch (TransactionNotInProgressException e) { |
| throw e; |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { |
| try { |
| return this.execute( |
| new Operation<InputStream>("takeSnapshot") { |
| @Override |
| public InputStream execute(TransactionServiceThriftClient client) |
| throws Exception { |
| return client.getSnapshotStream(); |
| } |
| }); |
| } catch (TransactionCouldNotTakeSnapshotException e) { |
| throw e; |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public String status() { |
| try { |
| return this.execute( |
| new Operation<String>("status") { |
| @Override |
| public String execute(TransactionServiceThriftClient client) throws Exception { |
| return client.status(); |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public void resetState() { |
| try { |
| this.execute( |
| new Operation<Boolean>("resetState") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) |
| throws TException { |
| client.resetState(); |
| return true; |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public boolean truncateInvalidTx(final Set<Long> invalidTxIds) { |
| try { |
| return this.execute( |
| new Operation<Boolean>("truncateInvalidTx") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) throws TException { |
| return client.truncateInvalidTx(invalidTxIds); |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public boolean truncateInvalidTxBefore(final long time) throws InvalidTruncateTimeException { |
| try { |
| return this.execute( |
| new Operation<Boolean>("truncateInvalidTxBefore") { |
| @Override |
| public Boolean execute(TransactionServiceThriftClient client) throws Exception { |
| return client.truncateInvalidTxBefore(time); |
| } |
| }); |
| } catch (InvalidTruncateTimeException e) { |
| throw e; |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public int getInvalidSize() { |
| try { |
| return this.execute( |
| new Operation<Integer>("getInvalidSize") { |
| @Override |
| public Integer execute(TransactionServiceThriftClient client) throws TException { |
| return client.getInvalidSize(); |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| @Override |
| public void pruneNow() { |
| try { |
| this.execute( |
| new Operation<Void>("pruneNow") { |
| @Override |
| public Void execute(TransactionServiceThriftClient client) |
| throws TException { |
| client.pruneNow(); |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| |
| } |