blob: 597c88662b8e799ab4f2379f84d1a4dd62840a5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.util;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides 2 things over the java {@link ScheduledExecutorService}.
*
* 1. It takes {@link SafeRunnable} objects instead of plain Runnable objects.
* This means that exceptions in scheduled tasks won't go unnoticed and will be
* logged.
*
* 2. It supports submitting tasks with an ordering key, so that tasks submitted
* with the same key will always be executed in order, but tasks across
* different keys can be unordered. This retains parallelism while retaining the
* basic amount of ordering we want (e.g., per ledger handle). Ordering is
* achieved by hashing the key objects to threads by their
* {@link Object#hashCode()} method.
*
*/
public class OrderedSafeExecutor {
    /** One single-threaded executor per slot; a key always maps to the same slot. */
    final ExecutorService[] threads;
    /** Id of the worker thread backing {@code threads[i]}, captured at construction. */
    final long[] threadIds;
    /** Source of randomness for picking a slot when no ordering key is given. */
    final Random rand = new Random();

    /**
     * Constructs Safe executor.
     *
     * <p>Each worker is started eagerly by running a small task on it that
     * records the worker's thread id, so the constructor blocks until all
     * workers are up.
     *
     * @param numThreads
     *            - number of threads, must be positive
     * @param threadName
     *            - name of the thread; a default is used when blank
     * @throws IllegalArgumentException
     *             if {@code numThreads} is not positive
     * @throws RuntimeException
     *             if a worker thread could not be started, or the calling
     *             thread was interrupted while waiting for it
     */
    public OrderedSafeExecutor(int numThreads, String threadName) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException(
                    "numThreads must be positive, got " + numThreads);
        }
        if (StringUtils.isBlank(threadName)) {
            // sets default name
            threadName = "OrderedSafeExecutor";
        }
        threads = new ExecutorService[numThreads];
        threadIds = new long[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // "<name>-<slot>-%d": the trailing %d is filled in by the thread factory.
            ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
                    .setNameFormat(threadName + "-" + i + "-%d");
            threads[i] = Executors.newSingleThreadExecutor(tfb.build());
            final int tid = i;
            try {
                // Run a task on the worker and wait for it, so the worker's
                // thread id is known before the constructor returns.
                threads[i].submit(new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        threadIds[tid] = Thread.currentThread().getId();
                    }
                }).get();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and do not leave
                // the already-started worker threads behind.
                Thread.currentThread().interrupt();
                shutdownNowFirst(i + 1);
                throw new RuntimeException("Couldn't start thread " + i, e);
            } catch (ExecutionException e) {
                shutdownNowFirst(i + 1);
                throw new RuntimeException("Couldn't start thread " + i, e);
            }
        }
    }

    /** Stops the first {@code count} executors; used to clean up a failed construction. */
    private void shutdownNowFirst(int count) {
        for (int i = 0; i < count; i++) {
            threads[i].shutdownNow();
        }
    }

    /**
     * Maps an ordering key to a slot index shared by {@link #threads} and
     * {@link #threadIds}. The key is not touched when there is only one slot.
     */
    private int indexFor(Object orderingKey) {
        // skip hashcode generation in this special case
        if (threads.length == 1) {
            return 0;
        }
        return MathUtils.signSafeMod(orderingKey.hashCode(), threads.length);
    }

    /** Picks an arbitrary executor for tasks that need no ordering. */
    ExecutorService chooseThread() {
        // skip random # generation in this special case
        if (threads.length == 1) {
            return threads[0];
        }
        return threads[rand.nextInt(threads.length)];
    }

    /** Picks the executor that all tasks with the given ordering key run on. */
    ExecutorService chooseThread(Object orderingKey) {
        return threads[indexFor(orderingKey)];
    }

    /**
     * schedules a one time action to execute
     *
     * @param r the task to run on a randomly chosen thread
     */
    public void submit(SafeRunnable r) {
        chooseThread().submit(r);
    }

    /**
     * schedules a one time action to execute with an ordering guarantee on the key
     *
     * @param orderingKey key whose hash code selects the thread
     * @param r the task to run
     */
    public void submitOrdered(Object orderingKey, SafeRunnable r) {
        chooseThread(orderingKey).submit(r);
    }

    /** Returns the id of the worker thread that serves the given ordering key. */
    private long getThreadID(Object orderingKey) {
        return threadIds[indexFor(orderingKey)];
    }

    /** Initiates an orderly shutdown of every underlying executor. */
    public void shutdown() {
        for (ExecutorService thread : threads) {
            thread.shutdown();
        }
    }

    /**
     * Waits for all underlying executors to terminate.
     *
     * <p>The timeout bounds the total wait across all executors: each one is
     * given only the time remaining until a single shared deadline.
     *
     * @param timeout maximum total time to wait
     * @param unit unit of {@code timeout}
     * @return {@code true} if every executor terminated, {@code false} if the
     *         timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        for (ExecutorService thread : threads) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (!thread.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Generic callback implementation which will run the
     * callback in the thread which matches the ordering key
     */
    public static abstract class OrderedSafeGenericCallback<T>
            implements GenericCallback<T> {
        // Shared by all callbacks: avoids a logger lookup per callback instance.
        private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class);

        private final OrderedSafeExecutor executor;
        private final Object orderingKey;

        /**
         * @param executor The executor on which to run the callback
         * @param orderingKey Key used to decide which thread the callback
         * should run on.
         */
        public OrderedSafeGenericCallback(OrderedSafeExecutor executor, Object orderingKey) {
            this.executor = executor;
            this.orderingKey = orderingKey;
        }

        /**
         * Runs {@link #safeOperationComplete(int, Object)} on the thread that
         * owns the ordering key: inline when already on that thread, otherwise
         * by submitting to the executor. A rejected submission is logged and
         * the callback is dropped.
         */
        @Override
        public final void operationComplete(final int rc, final T result) {
            // during closing, callbacks that are error out might try to submit to
            // the scheduler again. if the submission will go to same thread, we
            // don't need to submit to executor again. this is also an optimization for
            // callback submission
            if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) {
                safeOperationComplete(rc, result);
                return;
            }
            try {
                executor.submitOrdered(orderingKey, new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        safeOperationComplete(rc, result);
                    }
                });
            } catch (RejectedExecutionException re) {
                LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
            }
        }

        /**
         * Invoked on the thread matching the ordering key.
         *
         * @param rc result code passed to {@link #operationComplete(int, Object)}
         * @param result result value passed to {@link #operationComplete(int, Object)}
         */
        public abstract void safeOperationComplete(int rc, T result);
    }
}