blob: 1257f2ebc51ab6a052c4272cc0df287c0bbe7943 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.common.util;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.stats.StatsLogger;
/**
 * This class provides two things over the java {@link ScheduledExecutorService}.
 *
 * <p>1. It takes {@link SafeRunnable} objects instead of plain Runnable objects.
 * This means that exceptions in scheduled tasks won't go unnoticed and will be
 * logged.
 *
 * <p>2. It supports submitting tasks with an ordering key, so that tasks submitted
 * with the same key will always be executed in order, but tasks across
 * different keys can be unordered. This retains parallelism while retaining the
 * basic amount of ordering we want (e.g., per ledger handle). Ordering is
 * achieved by hashing the key objects to threads by their {@link Object#hashCode()}
 * method.
 */
public class OrderedScheduler extends OrderedExecutor implements ScheduledExecutorService {
/**
 * Entry point for configuring and creating an {@link OrderedScheduler}.
 *
 * @return a fresh {@link SchedulerBuilder} instance
 */
public static SchedulerBuilder newSchedulerBuilder() {
    final SchedulerBuilder builder = new SchedulerBuilder();
    return builder;
}
/**
 * Builder that assembles an {@link OrderedScheduler} from the settings
 * inherited from {@link OrderedExecutor.AbstractBuilder}.
 */
public static class SchedulerBuilder extends OrderedExecutor.AbstractBuilder<OrderedScheduler> {

    /**
     * Builds the scheduler from the builder's current settings.
     *
     * <p>When no thread factory has been configured, a {@link DefaultThreadFactory}
     * created from the configured name is assigned to {@code threadFactory}
     * and then used for the new scheduler.
     *
     * @return the newly constructed ordered scheduler
     */
    @Override
    public OrderedScheduler build() {
        if (threadFactory == null) {
            // No factory supplied by the caller: fall back to a name-based default.
            threadFactory = new DefaultThreadFactory(name);
        }
        final OrderedScheduler scheduler = new OrderedScheduler(
                name, numThreads, threadFactory, statsLogger,
                traceTaskExecution, preserveMdcForTaskExecution,
                warnTimeMicroSec, maxTasksInQueue);
        return scheduler;
    }
}
/**
 * Creates an ordered scheduler. Every argument is forwarded unchanged to the
 * {@link OrderedExecutor} constructor, with busy-wait always disabled.
 *
 * @param baseName
 *            - base name of executor threads
 * @param numThreads
 *            - number of threads
 * @param threadFactory
 *            - for constructing threads
 * @param statsLogger
 *            - for reporting executor stats
 * @param traceTaskExecution
 *            - should we stat task execution
 * @param preserveMdcForTaskExecution
 *            - should we preserve MDC for task execution
 * @param warnTimeMicroSec
 *            - log long task exec warning after this interval
 * @param maxTasksInQueue
 *            - forwarded to the parent constructor (presumably the bound on
 *            queued tasks; confirm against OrderedExecutor)
 */
private OrderedScheduler(String baseName,
                         int numThreads,
                         ThreadFactory threadFactory,
                         StatsLogger statsLogger,
                         boolean traceTaskExecution,
                         boolean preserveMdcForTaskExecution,
                         long warnTimeMicroSec,
                         int maxTasksInQueue) {
    super(
        baseName,
        numThreads,
        threadFactory,
        statsLogger,
        traceTaskExecution,
        preserveMdcForTaskExecution,
        warnTimeMicroSec,
        maxTasksInQueue,
        false /* enableBusyWait */);
}
/**
 * Supplies the executor backing a single thread of this scheduler: a
 * {@link ScheduledThreadPoolExecutor} with a core pool size of one.
 *
 * @param factory factory handed to the new pool for creating its thread
 * @return a new scheduled pool with a core pool size of 1
 */
@Override
protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
    final int corePoolSize = 1;
    return new ScheduledThreadPoolExecutor(corePoolSize, factory);
}
/**
 * Wraps the given executor in a {@link BoundedScheduledExecutorService}
 * configured with this scheduler's {@code maxTasksInQueue}.
 *
 * @param executor executor to wrap; it is cast to {@link ScheduledThreadPoolExecutor},
 *                 so any other type fails with a {@link ClassCastException}
 * @return the bounded wrapper around {@code executor}
 */
@Override
protected ListeningScheduledExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
    final ScheduledThreadPoolExecutor scheduledPool = (ScheduledThreadPoolExecutor) executor;
    return new BoundedScheduledExecutorService(scheduledPool, maxTasksInQueue);
}
/**
 * Wraps the given executor in an {@link OrderedSchedulerDecoratedThread}.
 *
 * @param executor executor to decorate; it is cast to
 *                 {@link ListeningScheduledExecutorService}
 * @return the decorating wrapper around {@code executor}
 */
@Override
protected ListeningScheduledExecutorService addExecutorDecorators(ExecutorService executor) {
    final ListeningScheduledExecutorService scheduledExecutor =
            (ListeningScheduledExecutorService) executor;
    return new OrderedSchedulerDecoratedThread(scheduledExecutor);
}
/**
 * Narrows the result of the parent's {@code chooseThread()} to
 * {@link ListeningScheduledExecutorService}.
 *
 * @return the executor picked by the parent class, cast to the scheduled type
 */
@Override
public ListeningScheduledExecutorService chooseThread() {
    final ListeningScheduledExecutorService picked =
            (ListeningScheduledExecutorService) super.chooseThread();
    return picked;
}
/**
 * Narrows the result of the parent's {@code chooseThread(Object)} to
 * {@link ListeningScheduledExecutorService}.
 *
 * @param orderingKey key passed through to the parent to select the executor
 * @return the executor picked by the parent class, cast to the scheduled type
 */
@Override
public ListeningScheduledExecutorService chooseThread(Object orderingKey) {
    final ListeningScheduledExecutorService picked =
            (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
    return picked;
}
/**
 * Narrows the result of the parent's {@code chooseThread(long)} to
 * {@link ListeningScheduledExecutorService}.
 *
 * @param orderingKey numeric key passed through to the parent to select the executor
 * @return the executor picked by the parent class, cast to the scheduled type
 */
@Override
public ListeningScheduledExecutorService chooseThread(long orderingKey) {
    final ListeningScheduledExecutorService picked =
            (ListeningScheduledExecutorService) super.chooseThread(orderingKey);
    return picked;
}
/**
 * Submits a one-time task to the thread selected for the given ordering key.
 *
 * @param orderingKey - the key used to pick the executing thread
 * @param callable - the task to run
 * @param <T> - the task's result type
 * @return a future for the task's result
 */
public <T> ListenableFuture<T> submitOrdered(Object orderingKey,
                                             Callable<T> callable) {
    final ListeningScheduledExecutorService keyedThread = chooseThread(orderingKey);
    return keyedThread.submit(callable);
}
/**
 * Schedules a one-shot {@link SafeRunnable} to run after the given delay on the
 * thread returned by {@link #chooseThread()}.
 *
 * <p>NOTE(review): the command is wrapped with {@code timedRunnable} here, unlike
 * {@code scheduleOrdered}; if {@code chooseThread()} yields an
 * {@code OrderedSchedulerDecoratedThread} the task would be wrapped a second
 * time - confirm this is intended.
 *
 * @param command - the SafeRunnable to execute
 * @param delay - the time from now to delay execution
 * @param unit - the time unit of the delay parameter
 * @return a ScheduledFuture representing pending completion of the task and whose get() method
 *      will return null upon completion
 */
public ScheduledFuture<?> schedule(SafeRunnable command, long delay, TimeUnit unit) {
    final ListeningScheduledExecutorService target = chooseThread();
    return target.schedule(timedRunnable(command), delay, unit);
}
/**
 * Schedules a one-shot {@link SafeRunnable} to run after the given delay on the
 * thread selected for the ordering key.
 *
 * @param orderingKey - the key used for ordering
 * @param command - the SafeRunnable to execute
 * @param delay - the time from now to delay execution
 * @param unit - the time unit of the delay parameter
 * @return a ScheduledFuture representing pending completion of the task and whose get() method
 *      will return null upon completion
 */
public ScheduledFuture<?> scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) {
    final ListeningScheduledExecutorService keyedThread = chooseThread(orderingKey);
    return keyedThread.schedule(command, delay, unit);
}
/**
 * Schedules a periodic {@link SafeRunnable} on the thread returned by
 * {@link #chooseThread()}: first run after the initial delay, then repeating
 * with the given period.
 *
 * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
 *
 * <p>NOTE(review): the command is wrapped with {@code timedRunnable} here, unlike
 * {@code scheduleAtFixedRateOrdered}; if {@code chooseThread()} yields an
 * {@code OrderedSchedulerDecoratedThread} the task would be wrapped a second
 * time - confirm this is intended.
 *
 * @param command - the SafeRunnable to execute
 * @param initialDelay - the time to delay first execution
 * @param period - the period between successive executions
 * @param unit - the time unit of the initialDelay and period parameters
 * @return a ScheduledFuture representing pending completion of the task, and whose get()
 *      method will throw an exception upon cancellation
 */
public ScheduledFuture<?> scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) {
    final ListeningScheduledExecutorService target = chooseThread();
    return target.scheduleAtFixedRate(timedRunnable(command), initialDelay, period, unit);
}
/**
 * Schedules a periodic {@link SafeRunnable} on the thread selected for the
 * ordering key: first run after the initial delay, then repeating with the
 * given period.
 *
 * <p>For more details check {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
 *
 * @param orderingKey - the key used for ordering
 * @param command - the SafeRunnable to execute
 * @param initialDelay - the time to delay first execution
 * @param period - the period between successive executions
 * @param unit - the time unit of the initialDelay and period parameters
 * @return a ScheduledFuture representing pending completion of the task, and whose get() method
 *      will throw an exception upon cancellation
 */
public ScheduledFuture<?> scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
                                                     long period, TimeUnit unit) {
    final ListeningScheduledExecutorService keyedThread = chooseThread(orderingKey);
    return keyedThread.scheduleAtFixedRate(command, initialDelay, period, unit);
}
/**
 * Schedules a periodic {@link SafeRunnable} on the thread returned by
 * {@link #chooseThread()}: first run after the initial delay, then again after
 * the given delay measured from the end of one execution to the start of the next.
 *
 * <p>For more details check
 * {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
 *
 * <p>NOTE(review): the command is wrapped with {@code timedRunnable} here, unlike
 * {@code scheduleWithFixedDelayOrdered}; if {@code chooseThread()} yields an
 * {@code OrderedSchedulerDecoratedThread} the task would be wrapped a second
 * time - confirm this is intended.
 *
 * @param command - the SafeRunnable to execute
 * @param initialDelay - the time to delay first execution
 * @param delay - the delay between the termination of one execution and the commencement of the next
 * @param unit - the time unit of the initialDelay and delay parameters
 * @return a ScheduledFuture representing pending completion of the task, and whose get() method
 *      will throw an exception upon cancellation
 */
public ScheduledFuture<?> scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay,
                                                 TimeUnit unit) {
    final ListeningScheduledExecutorService target = chooseThread();
    return target.scheduleWithFixedDelay(timedRunnable(command), initialDelay, delay, unit);
}
/**
 * Schedules a periodic {@link SafeRunnable} on the thread selected for the
 * ordering key: first run after the initial delay, then again after the given
 * delay measured from the end of one execution to the start of the next.
 *
 * <p>For more details check
 * {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
 *
 * @param orderingKey - the key used for ordering
 * @param command - the SafeRunnable to execute
 * @param initialDelay - the time to delay first execution
 * @param delay - the delay between the termination of one execution and the commencement of the next
 * @param unit - the time unit of the initialDelay and delay parameters
 * @return a ScheduledFuture representing pending completion of the task, and whose get() method
 *      will throw an exception upon cancellation
 */
public ScheduledFuture<?> scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay,
                                                        long delay, TimeUnit unit) {
    final ListeningScheduledExecutorService keyedThread = chooseThread(orderingKey);
    return keyedThread.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
//
// Methods for implementing {@link ScheduledExecutorService}
//
/**
 * {@inheritDoc}
 *
 * <p>The command is wrapped with {@code timedRunnable} and scheduled on the
 * thread returned by {@link #chooseThread()}.
 */
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    final ListeningScheduledExecutorService target = chooseThread();
    return target.schedule(timedRunnable(command), delay, unit);
}
/**
 * {@inheritDoc}
 *
 * <p>The callable is wrapped with {@code timedCallable} and scheduled on the
 * thread returned by {@link #chooseThread()}.
 */
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    final ListeningScheduledExecutorService target = chooseThread();
    return target.schedule(timedCallable(callable), delay, unit);
}
/**
 * {@inheritDoc}
 *
 * <p>The command is wrapped with {@code timedRunnable} and scheduled on the
 * thread returned by {@link #chooseThread()}.
 */
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay, long period, TimeUnit unit) {
    final ListeningScheduledExecutorService target = chooseThread();
    return target.scheduleAtFixedRate(timedRunnable(command), initialDelay, period, unit);
}
/**
 * {@inheritDoc}
 *
 * <p>The command is wrapped with {@code timedRunnable} and scheduled on the
 * thread returned by {@link #chooseThread()}.
 */
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay, long delay, TimeUnit unit) {
    final ListeningScheduledExecutorService target = chooseThread();
    return target.scheduleWithFixedDelay(timedRunnable(command), initialDelay, delay, unit);
}
class OrderedSchedulerDecoratedThread extends ForwardingListeningExecutorService
implements ListeningScheduledExecutorService {
private final ListeningScheduledExecutorService delegate;
private OrderedSchedulerDecoratedThread(ListeningScheduledExecutorService delegate) {
this.delegate = delegate;
}
@Override
protected ListeningExecutorService delegate() {
return delegate;
}
@Override
public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate.schedule(timedRunnable(command), delay, unit);
}
@Override
public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate.schedule(timedCallable(callable), delay, unit);
}
@Override
public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit) {
return delegate.scheduleAtFixedRate(timedRunnable(command), initialDelay, period, unit);
}
@Override
public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay, long delay, TimeUnit unit) {
return delegate.scheduleAtFixedRate(timedRunnable(command), initialDelay, delay, unit);
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
return super.submit(timedCallable(task));
}
@Override
public ListenableFuture<?> submit(Runnable task) {
return super.submit(timedRunnable(task));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return super.invokeAll(timedCallables(tasks));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException {
return super.invokeAll(timedCallables(tasks), timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return super.invokeAny(timedCallables(tasks));
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return super.invokeAny(timedCallables(tasks), timeout, unit);
}
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
return super.submit(timedRunnable(task), result);
}
@Override
public void execute(Runnable command) {
super.execute(timedRunnable(command));
}
}
}