blob: 761dba63b3fba664a02782c3304f3ad74e4ccf92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.thread;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * An {@link ExecutorService} that spreads submitted tasks over a fixed array of independent
 * fixed-size thread pools ("stripes").
 * <p>
 * The stripe for a task is derived from the task object's {@link Object#hashCode()}, so two submissions
 * whose task objects have the same hash code are always handed to the same underlying pool.
 * <p>
 * The timed {@code invokeAll} and both {@code invokeAny} variants are not supported and throw
 * {@link UnsupportedOperationException}.
 */
public class IgniteStripedThreadPoolExecutor implements ExecutorService {
    /** Number of threads in each stripe's fixed thread pool. */
    public static final int DFLT_SEG_POOL_SIZE = 8;

    /** Number of stripes (underlying executors) created by the default constructor. */
    public static final int DFLT_CONCUR_LVL = 16;

    /** Underlying executors, one per stripe. */
    private final ExecutorService[] execs;

    /** Right shift applied to the spread hash so that its highest bits select the stripe. */
    private final int segShift;

    /** Mask applied after shifting; equals the power-of-two stripe count minus one. */
    private final int segMask;

    /**
     * Creates {@link #DFLT_CONCUR_LVL} fixed thread pools of {@link #DFLT_SEG_POOL_SIZE} threads each.
     * All pools share a single thread factory instance.
     */
    public IgniteStripedThreadPoolExecutor() {
        execs = new ExecutorService[DFLT_CONCUR_LVL];

        // NOTE(review): the null argument is presumably the grid name - confirm against IgniteThreadFactory.
        ThreadFactory factory = new IgniteThreadFactory(null);

        for (int i = 0; i < DFLT_CONCUR_LVL; i++)
            execs[i] = Executors.newFixedThreadPool(DFLT_SEG_POOL_SIZE, factory);

        // Find the smallest power of two that is not less than the concurrency level.
        int sshift = 0;
        int ssize = 1;

        while (ssize < DFLT_CONCUR_LVL) {
            ++sshift;

            ssize <<= 1;
        }

        segShift = 32 - sshift;
        segMask = ssize - 1;
    }

    /** {@inheritDoc} */
    @Override public void shutdown() {
        for (ExecutorService exec : execs)
            exec.shutdown();
    }

    /**
     * {@inheritDoc}
     * <p>
     * The returned list is the concatenation of the lists returned by every stripe.
     */
    @Override public List<Runnable> shutdownNow() {
        List<Runnable> res = new ArrayList<>();

        for (ExecutorService exec : execs)
            res.addAll(exec.shutdownNow());

        return res;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code true} only when every stripe reports that it has been shut down.
     */
    @Override public boolean isShutdown() {
        for (ExecutorService exec : execs) {
            if (!exec.isShutdown())
                return false;
        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code true} only when every stripe reports that it has terminated.
     */
    @Override public boolean isTerminated() {
        for (ExecutorService exec : execs) {
            if (!exec.isTerminated())
                return false;
        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The timeout bounds the total wait across all stripes: a single deadline is computed up front and
     * each stripe is given only the time that is still left.
     */
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        // One deadline shared by all stripes; otherwise the call could block for up to
        // (number of stripes * timeout).
        long deadline = System.nanoTime() + unit.toNanos(timeout);

        boolean res = true;

        for (ExecutorService exec : execs) {
            // Subtraction form stays correct even if nanoTime() or the deadline wraps around.
            long remaining = deadline - System.nanoTime();

            // With nothing left this degrades to a non-blocking termination check of the stripe.
            res &= exec.awaitTermination(Math.max(remaining, 0L), TimeUnit.NANOSECONDS);
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override public <T> Future<T> submit(Callable<T> task) {
        return execForTask(task).submit(task);
    }

    /** {@inheritDoc} */
    @Override public <T> Future<T> submit(Runnable task, T result) {
        return execForTask(task).submit(task, result);
    }

    /** {@inheritDoc} */
    @Override public Future<?> submit(Runnable task) {
        return execForTask(task).submit(task);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Each task is submitted to the stripe selected by its own hash code. If the calling thread is
     * interrupted while waiting, or a submission fails, all futures created so far are cancelled
     * and the exception is propagated to the caller.
     */
    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        List<Future<T>> futs = new ArrayList<>(tasks.size());

        boolean done = false;

        try {
            // Submission is inside the try block so that a failure half-way through
            // (e.g. rejection by a stripe) cancels the tasks that were already accepted.
            for (Callable<T> task : tasks)
                futs.add(execForTask(task).submit(task));

            for (Future<T> fut : futs) {
                if (fut.isDone())
                    continue;

                try {
                    fut.get();
                }
                catch (ExecutionException | CancellationException ignored) {
                    // The task has completed (exceptionally or by cancellation);
                    // the caller inspects the outcome through the returned future.
                }
            }

            done = true;

            return futs;
        }
        finally {
            // Reached with done == false only on interruption or a submission failure.
            if (!done) {
                for (Future<T> fut : futs)
                    fut.cancel(true);
            }
        }
    }

    /**
     * Not supported by this executor.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
        TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("Not implemented.");
    }

    /**
     * Not supported by this executor.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
        ExecutionException {
        throw new UnsupportedOperationException("Not implemented.");
    }

    /**
     * Not supported by this executor.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        throw new UnsupportedOperationException("Not implemented.");
    }

    /** {@inheritDoc} */
    @Override public void execute(Runnable cmd) {
        execForTask(cmd).execute(cmd);
    }

    /**
     * Applies a supplemental hash function to a given hash code to defend against poor quality
     * hash functions. The stripe index is taken from the highest bits of the result, so hash codes
     * that differ only in a few bits would otherwise tend to select the same stripe.
     *
     * @param h Hash code.
     * @return Enhanced hash code.
     */
    private int hash(int h) {
        // Spread bits using a variant of the single-word Wang/Jenkins hash.
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);

        return h ^ (h >>> 16);
    }

    /**
     * Selects the stripe that must run the given task.
     *
     * @param cmd Task object ({@link Runnable} or {@link Callable}); must not be {@code null}.
     * @return Executor of the stripe chosen from the spread hash code of {@code cmd}.
     */
    private <T> ExecutorService execForTask(T cmd) {
        assert cmd != null;

        // The top bits of the spread hash pick the stripe; the mask keeps the index within bounds.
        return execs[(hash(cmd.hashCode()) >>> segShift) & segMask];
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgniteStripedThreadPoolExecutor.class, this);
    }
}