// blob: 1861bd63f550f51c4b3e2340efca51b0a17c49ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
 * A {@link ListeningExecutorService} that forwards work to a wrapped {@link ThreadPoolExecutor} and wraps every
 * submitted task in a {@link PrioritizedListenableFutureTask}.
 *
 * <p>The wrapper carries the task's priority and, when {@code config.isFifo()} is true, a monotonically decreasing
 * queue position. {@link PrioritizedListenableFutureTask} is {@link Comparable} on those two values, so a pool backed
 * by a priority queue (as built by {@link #create}) can order pending tasks by them.
 */
public class PrioritizedExecutorService extends AbstractExecutorService implements ListeningExecutorService
{
  /**
   * Builds a service on top of a new {@link ThreadPoolExecutor} and ties its shutdown to {@code lifecycle}.
   *
   * <p>The pool uses {@code config.getNumThreads()} for both core and maximum size, a zero keep-alive, a
   * {@link PriorityBlockingQueue} as work queue, and daemon threads named with {@code config.getFormatString()}.
   *
   * @param lifecycle lifecycle that receives a handler whose {@code stop()} calls {@link #shutdownNow()}
   * @param config    source of the thread count, thread name format and FIFO setting
   * @return the newly created service
   */
  public static PrioritizedExecutorService create(Lifecycle lifecycle, DruidProcessingConfig config)
  {
    final int coreThreads = config.getNumThreads();
    final int maxThreads = config.getNumThreads();
    final PriorityBlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();

    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        coreThreads,
        maxThreads,
        0L,
        TimeUnit.MILLISECONDS,
        workQueue,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build()
    );
    final PrioritizedExecutorService service = new PrioritizedExecutorService(pool, config);

    // Nothing to do on start; on stop the pool is shut down immediately.
    final Lifecycle.Handler shutdownOnStop = new Lifecycle.Handler()
    {
      @Override
      public void start()
      {
      }

      @Override
      public void stop()
      {
        service.shutdownNow();
      }
    };
    lifecycle.addHandler(shutdownOnStop);

    return service;
  }

  // Counts down from Long.MAX_VALUE; only consumed when config.isFifo() is true.
  private final AtomicLong queuePosition = new AtomicLong(Long.MAX_VALUE);
  private final ListeningExecutorService delegate;
  private final BlockingQueue<Runnable> delegateQueue;
  private final boolean allowRegularTasks;
  private final int defaultPriority;
  private final DruidProcessingConfig config;
  final ThreadPoolExecutor threadPoolExecutor; // Used in unit tests

  /**
   * Creates a service that rejects non-prioritized tasks and uses {@code 0} as the default priority.
   *
   * @param threadPoolExecutor pool that runs the tasks; must not be null
   * @param config             consulted for {@code isFifo()} each time a task is wrapped
   */
  public PrioritizedExecutorService(
      ThreadPoolExecutor threadPoolExecutor,
      DruidProcessingConfig config
  )
  {
    this(threadPoolExecutor, false, 0, config);
  }

  /**
   * Creates a service around the given pool.
   *
   * @param threadPoolExecutor pool that runs the tasks; must not be null
   * @param allowRegularTasks  whether tasks that are not {@link PrioritizedRunnable} / {@link PrioritizedCallable}
   *                           are accepted by the {@code newTaskFor} methods
   * @param defaultPriority    priority given to accepted tasks that do not carry their own priority
   * @param config             consulted for {@code isFifo()} each time a task is wrapped
   */
  public PrioritizedExecutorService(
      ThreadPoolExecutor threadPoolExecutor,
      boolean allowRegularTasks,
      int defaultPriority,
      DruidProcessingConfig config
  )
  {
    final ThreadPoolExecutor checkedPool = Preconditions.checkNotNull(threadPoolExecutor);
    this.threadPoolExecutor = threadPoolExecutor;
    this.delegate = MoreExecutors.listeningDecorator(checkedPool);
    this.delegateQueue = threadPoolExecutor.getQueue();
    this.allowRegularTasks = allowRegularTasks;
    this.defaultPriority = defaultPriority;
    this.config = config;
  }

  /**
   * Wraps {@code runnable} in a prioritized, listenable future task.
   *
   * @param runnable task to wrap
   * @param value    result the future yields on successful completion
   * @return wrapper carrying the task's priority (or the default priority) and its queue position
   * @throws IllegalArgumentException if regular tasks are not allowed and {@code runnable} is not a
   *                                  {@link PrioritizedRunnable}
   */
  @Override
  protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Runnable runnable, T value)
  {
    final boolean prioritized = runnable instanceof PrioritizedRunnable;
    Preconditions.checkArgument(
        allowRegularTasks || prioritized,
        "task does not implement PrioritizedRunnable"
    );

    final ListenableFutureTask<T> futureTask = ListenableFutureTask.create(runnable, value);
    final int priority;
    if (prioritized) {
      priority = ((PrioritizedRunnable) runnable).getPriority();
    } else {
      priority = defaultPriority;
    }
    return PrioritizedListenableFutureTask.create(futureTask, priority, nextQueuePosition());
  }

  /**
   * Wraps {@code callable} in a prioritized, listenable future task.
   *
   * @param callable task to wrap
   * @return wrapper carrying the task's priority (or the default priority) and its queue position
   * @throws IllegalArgumentException if regular tasks are not allowed and {@code callable} is not a
   *                                  {@link PrioritizedCallable}
   */
  @Override
  protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Callable<T> callable)
  {
    final boolean prioritized = callable instanceof PrioritizedCallable;
    Preconditions.checkArgument(
        allowRegularTasks || prioritized,
        "task does not implement PrioritizedCallable"
    );

    final ListenableFutureTask<T> futureTask = ListenableFutureTask.create(callable);
    final int priority;
    if (prioritized) {
      priority = ((PrioritizedCallable) callable).getPriority();
    } else {
      priority = defaultPriority;
    }
    return PrioritizedListenableFutureTask.create(futureTask, priority, nextQueuePosition());
  }

  /**
   * Returns the queue position for the next wrapped task: the next lower counter value in FIFO mode, otherwise
   * {@code 0} without touching the counter.
   */
  private long nextQueuePosition()
  {
    if (config.isFifo()) {
      return queuePosition.decrementAndGet();
    }
    return 0;
  }

  /**
   * Submits {@code task}; the returned future is the wrapper produced by {@code newTaskFor}.
   */
  @Override
  public ListenableFuture<?> submit(Runnable task)
  {
    return (ListenableFuture<?>) super.submit(task);
  }

  /**
   * Submits {@code task} with a fixed {@code result}; the returned future is the wrapper produced by
   * {@code newTaskFor}.
   */
  @Override
  public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result)
  {
    return (ListenableFuture<T>) super.submit(task, result);
  }

  /**
   * Submits {@code task}; the returned future is the wrapper produced by {@code newTaskFor}.
   */
  @Override
  public <T> ListenableFuture<T> submit(Callable<T> task)
  {
    return (ListenableFuture<T>) super.submit(task);
  }

  // ---- Lifecycle and execution calls below are forwarded unchanged to the decorated pool. ----

  @Override
  public void shutdown()
  {
    delegate.shutdown();
  }

  @Override
  public List<Runnable> shutdownNow()
  {
    return delegate.shutdownNow();
  }

  @Override
  public boolean isShutdown()
  {
    return delegate.isShutdown();
  }

  @Override
  public boolean isTerminated()
  {
    return delegate.isTerminated();
  }

  @Override
  public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
  {
    return delegate.awaitTermination(l, timeUnit);
  }

  /**
   * Hands {@code runnable} to the decorated pool as-is; no wrapping or priority check happens here.
   */
  @Override
  public void execute(final Runnable runnable)
  {
    delegate.execute(runnable);
  }

  /**
   * @return the current number of elements in the wrapped pool's work queue
   */
  public int getQueueSize()
  {
    return delegateQueue.size();
  }
}
/**
 * A listenable future task that also carries a priority and an insertion place, and is {@link Comparable} on them.
 *
 * <p>All future, run and listener operations are forwarded to the wrapped {@link ListenableFutureTask}. Ordering is
 * by priority first (larger value sorts earlier) and then by insertion place (larger value sorts earlier).
 */
class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>,
                                                    ListenableFuture<V>,
                                                    PrioritizedRunnable,
                                                    Comparable<PrioritizedListenableFutureTask>
{
  // NOTE: For priority HIGHER numeric value means more priority, so both comparisons below take (right, left)
  // rather than (left, right). Insertion place is only consulted when the priorities are equal.
  private static final Comparator<PrioritizedListenableFutureTask> PRIORITY_COMPARATOR =
      new Ordering<PrioritizedListenableFutureTask>()
      {
        @Override
        public int compare(PrioritizedListenableFutureTask left, PrioritizedListenableFutureTask right)
        {
          final int byPriority = Integer.compare(right.getPriority(), left.getPriority());
          if (byPriority != 0) {
            return byPriority;
          }
          return Long.compare(right.getInsertionPlace(), left.getInsertionPlace());
        }
      };

  private final ListenableFutureTask<V> delegate;
  private final int priority;
  private final long insertionPlace;

  /**
   * Static factory equivalent to the constructor.
   *
   * @param task     underlying task that does the actual work
   * @param priority priority of the task; a larger value sorts earlier
   * @param position insertion place used to break ties between equal priorities; a larger value sorts earlier
   * @return a new wrapper around {@code task}
   */
  public static <V> PrioritizedListenableFutureTask<V> create(ListenableFutureTask<V> task, int priority, long position)
  {
    return new PrioritizedListenableFutureTask<>(task, priority, position);
  }

  PrioritizedListenableFutureTask(ListenableFutureTask<V> delegate, int priority, long position)
  {
    this.delegate = delegate;
    this.priority = priority;
    // Long.MAX_VALUE will always be "highest", i.e. sort ahead of any other position at the same priority.
    this.insertionPlace = position;
  }

  // ---- RunnableFuture / ListenableFuture: everything is forwarded to the wrapped task. ----

  @Override
  public void run()
  {
    delegate.run();
  }

  @Override
  public boolean cancel(boolean mayInterruptIfRunning)
  {
    return delegate.cancel(mayInterruptIfRunning);
  }

  @Override
  public boolean isCancelled()
  {
    return delegate.isCancelled();
  }

  @Override
  public boolean isDone()
  {
    return delegate.isDone();
  }

  @Override
  public V get() throws InterruptedException, ExecutionException
  {
    return delegate.get();
  }

  @Override
  public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
  {
    return delegate.get(timeout, unit);
  }

  @Override
  public void addListener(Runnable listener, Executor executor)
  {
    delegate.addListener(listener, executor);
  }

  /**
   * @return the priority given at construction
   */
  @Override
  public int getPriority()
  {
    return priority;
  }

  /**
   * @return the insertion place given at construction
   */
  protected long getInsertionPlace()
  {
    return insertionPlace;
  }

  /**
   * Orders this task against {@code otherTask}: the task with the larger priority sorts first, and on equal
   * priority the task with the larger insertion place sorts first.
   */
  @Override
  public int compareTo(PrioritizedListenableFutureTask otherTask)
  {
    return PRIORITY_COMPARATOR.compare(this, otherTask);
  }
}