blob: 4325ffa9e344ac7986278814bf17fdb8857cbd3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.threadpool.manager;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
/**
* Consider implementing {@code Licycle} to enable executors shutdown when the process stops.
*/
public class DefaultExecutorRepository implements ExecutorRepository {
private static final Logger logger = LoggerFactory.getLogger(DefaultExecutorRepository.class);
private int DEFAULT_SCHEDULER_SIZE = Runtime.getRuntime().availableProcessors();
private final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();
private ScheduledExecutorService serviceExporterExecutor;
private ScheduledExecutorService reconnectScheduledExecutor;
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
public DefaultExecutorRepository() {
for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-framework-scheduler"));
scheduledExecutors.addItem(scheduler);
}
//
// reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
}
/**
* Get called when the server or client instance initiating.
*
* @param url
* @return
*/
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
Map<Integer, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
// If executor has been shut down, create a new one
if (executor.isShutdown() || executor.isTerminated()) {
executors.remove(portKey);
executor = createExecutor(url);
executors.put(portKey, executor);
}
return executor;
}
public ExecutorService getExecutor(URL url) {
Map<Integer, ExecutorService> executors = data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
/**
* It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
* have Executor instances generated and stored.
*/
if (executors == null) {
logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
"before coming to here.");
return null;
}
//issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
ExecutorService executor = executors.get(portKey);
if (executor != null && (executor.isShutdown() || executor.isTerminated())) {
executors.remove(portKey);
// Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
executor = null;
logger.info("Executor for " + url + " is shutdown.");
}
if (executor == null) {
return SHARED_EXECUTOR;
} else {
return executor;
}
}
@Override
public void updateThreadpool(URL url, ExecutorService executor) {
try {
if (url.hasParameter(THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {
if (threads < core) {
threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {
threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {
threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {
threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
@Override
public ScheduledExecutorService nextScheduledExecutor() {
return scheduledExecutors.pollItem();
}
@Override
public ScheduledExecutorService getServiceExporterExecutor() {
return serviceExporterExecutor;
}
@Override
public ExecutorService getSharedExecutor() {
return SHARED_EXECUTOR;
}
@Override
public void destroyAll() {
data.values().forEach(executors -> {
if (executors != null) {
executors.values().forEach(executor -> {
if (executor != null && !executor.isShutdown()) {
ExecutorUtil.shutdownNow(executor, 100);
}
});
}
});
}
private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
}