blob: caf7f48ab758d552677a5fc5acda3818ba6294dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.pd.util;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hugegraph.pd.common.HgAssert;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class HgExecutorUtil {
private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>();
private static final Executor COMMON_EXECUTOR
= new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
newThreadFactory("pd-common"));
public static void execute(Runnable command) {
if (command == null) {
return;
}
COMMON_EXECUTOR.execute(command);
}
public static ThreadFactory newThreadFactory(String namePrefix, int priority) {
HgAssert.isArgumentNotNull(namePrefix, "namePrefix");
return new HgThreadFactory(namePrefix, priority);
}
public static ThreadFactory newThreadFactory(String namePrefix) {
HgAssert.isArgumentNotNull(namePrefix, "namePrefix");
return new HgDefaultThreadFactory(namePrefix);
}
public static ThreadPoolExecutor getThreadPoolExecutor(String name) {
if (name == null) {
return null;
}
return EXECUTOR_MAP.get(name);
}
/**
* @see HgExecutorUtil:createExecutor(String , int , int , int )
*/
@Deprecated
public static Executor createExecutor(String name, int coreThreads, int maxThreads) {
/* ThreadPoolExecutor res =
new ThreadPoolExecutor(coreThreads, maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
newThreadFactory(name));
if (threadPoolMap.containsKey(name)) {
threadPoolMap.put(name + "-1", res);
} else {
threadPoolMap.put(name, res);
}*/
return createExecutor(name, coreThreads, maxThreads, Integer.MAX_VALUE);
}
public static ThreadPoolExecutor createExecutor(String name, int coreThreads, int maxThreads,
int queueSize) {
ThreadPoolExecutor res = EXECUTOR_MAP.get(name);
if (res != null) {
return res;
}
synchronized (EXECUTOR_MAP) {
res = EXECUTOR_MAP.get(name);
if (res != null) {
return res;
}
BlockingQueue queue = null;
if (queueSize <= 0) {
queue = new SynchronousQueue();
} else {
queue = new LinkedBlockingQueue<>(queueSize);
}
res = new ThreadPoolExecutor(
coreThreads,
maxThreads,
60L, TimeUnit.SECONDS,
queue,
newThreadFactory(name)
);
EXECUTOR_MAP.put(name, res);
}
return res;
}
/**
* The default thread factory
*/
static class HgThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final int priority;
HgThreadFactory(String namePrefix, int priority) {
this.namePrefix = namePrefix;
this.priority = priority;
SecurityManager s = System.getSecurityManager();
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(null, r,
namePrefix + "-" + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
return t;
}
}
/**
* The default thread factory, which added threadNamePrefix in construction method.
*/
static class HgDefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
HgDefaultThreadFactory(String threadNamePrefix) {
SecurityManager s = System.getSecurityManager();
this.namePrefix = threadNamePrefix + "-" +
POOL_NUMBER.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(null, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}