blob: 3aef89ba10c9db4b200df2b3093af86ed02844b4 [file] [log] [blame]
/*
* Copyright © 2012-2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.tephra;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* Utility class that encapsulates the transaction life cycle over a given set of
* transaction-aware datasets. The executor can be reused across multiple invocations
* of the execute() method. However, it is not thread-safe for concurrent execution.
* <p>
* Transaction execution will be retries according to specified in constructor {@link RetryStrategy}.
* By default {@link RetryOnConflictStrategy} is used with max 20 retries and 100 ms between retries.
* </p>
*/
public class DefaultTransactionExecutor extends AbstractTransactionExecutor {
private final Collection<TransactionAware> txAwares;
private final TransactionSystemClient txClient;
private final RetryStrategy retryStrategy;
/**
* Convenience constructor, has same affect as {@link #DefaultTransactionExecutor(TransactionSystemClient, Iterable)}
*/
public DefaultTransactionExecutor(TransactionSystemClient txClient, TransactionAware... txAwares) {
this(txClient, Arrays.asList(txAwares));
}
public DefaultTransactionExecutor(TransactionSystemClient txClient,
Iterable<TransactionAware> txAwares,
RetryStrategy retryStrategy) {
super(MoreExecutors.sameThreadExecutor());
this.txAwares = ImmutableList.copyOf(txAwares);
this.txClient = txClient;
this.retryStrategy = retryStrategy;
}
/**
* Constructor for a transaction executor.
*/
@Inject
public DefaultTransactionExecutor(TransactionSystemClient txClient, @Assisted Iterable<TransactionAware> txAwares) {
this(txClient, txAwares, RetryStrategies.retryOnConflict(20, 100));
}
@Override
public <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException {
return executeWithRetry(function, input);
}
@Override
public <I> void execute(final Procedure<I> procedure, I input)
throws TransactionFailureException, InterruptedException {
execute(new Function<I, Void>() {
@Override
public Void apply(I input) throws Exception {
procedure.apply(input);
return null;
}
}, input);
}
@Override
public <O> O execute(final Callable<O> callable) throws TransactionFailureException, InterruptedException {
return execute(new Function<Void, O>() {
@Override
public O apply(Void input) throws Exception {
return callable.call();
}
}, null);
}
@Override
public void execute(final Subroutine subroutine) throws TransactionFailureException, InterruptedException {
execute(new Function<Void, Void>() {
@Override
public Void apply(Void input) throws Exception {
subroutine.apply();
return null;
}
}, null);
}
private <I, O> O executeWithRetry(Function<I, O> function, I input)
throws TransactionFailureException, InterruptedException {
int retries = 0;
while (true) {
try {
return executeOnce(function, input);
} catch (TransactionFailureException e) {
long delay = retryStrategy.nextRetry(e, ++retries);
if (delay < 0) {
throw e;
}
if (delay > 0) {
TimeUnit.MILLISECONDS.sleep(delay);
}
}
}
}
private <I, O> O executeOnce(Function<I, O> function, I input) throws TransactionFailureException {
TransactionContext txContext = new TransactionContext(txClient, txAwares);
txContext.start();
O o = null;
try {
o = function.apply(input);
} catch (Throwable e) {
txContext.abort(new TransactionFailureException("Transaction function failure for transaction. ", e));
// abort will throw
}
// will throw if smth goes wrong
txContext.finish();
return o;
}
}