| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.common.threadpool.manager; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.extension.ExtensionLoader; |
| import org.apache.dubbo.common.logger.Logger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.threadpool.ThreadPool; |
| import org.apache.dubbo.common.utils.NamedThreadFactory; |
| |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadPoolExecutor; |
| |
| import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE; |
| import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY; |
| import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY; |
| |
| /** |
| * Consider implementing {@code Licycle} to enable executors shutdown when the process stops. |
| */ |
| public class DefaultExecutorRepository implements ExecutorRepository { |
| private static final Logger logger = LoggerFactory.getLogger(DefaultExecutorRepository.class); |
| |
| private int DEFAULT_SCHEDULER_SIZE = Runtime.getRuntime().availableProcessors(); |
| |
| private final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); |
| |
| private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>(); |
| |
| private ScheduledExecutorService serviceExporterExecutor; |
| |
| public ScheduledExecutorService registryNotificationExecutor; |
| |
| private ScheduledExecutorService reconnectScheduledExecutor; |
| |
| private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>(); |
| |
| public DefaultExecutorRepository() { |
| // for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) { |
| // ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-framework-scheduler")); |
| // scheduledExecutors.addItem(scheduler); |
| // } |
| // |
| // reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler")); |
| serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler")); |
| registryNotificationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-registry-notification")); |
| } |
| |
| /** |
| * Get called when the server or client instance initiating. |
| * |
| * @param url |
| * @return |
| */ |
| public synchronized ExecutorService createExecutorIfAbsent(URL url) { |
| String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; |
| if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { |
| componentKey = CONSUMER_SIDE; |
| } |
| Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>()); |
| Integer portKey = url.getPort(); |
| ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url)); |
| // If executor has been shut down, create a new one |
| if (executor.isShutdown() || executor.isTerminated()) { |
| executors.remove(portKey); |
| executor = createExecutor(url); |
| executors.put(portKey, executor); |
| } |
| return executor; |
| } |
| |
| public ExecutorService getExecutor(URL url) { |
| String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; |
| if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { |
| componentKey = CONSUMER_SIDE; |
| } |
| Map<Integer, ExecutorService> executors = data.get(componentKey); |
| |
| /** |
| * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already |
| * have Executor instances generated and stored. |
| */ |
| if (executors == null) { |
| logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " + |
| "before coming to here."); |
| return null; |
| } |
| |
| Integer portKey = url.getPort(); |
| ExecutorService executor = executors.get(portKey); |
| if (executor != null) { |
| if (executor.isShutdown() || executor.isTerminated()) { |
| executors.remove(portKey); |
| executor = createExecutor(url); |
| executors.put(portKey, executor); |
| } |
| } |
| return executor; |
| } |
| |
| @Override |
| public void updateThreadpool(URL url, ExecutorService executor) { |
| try { |
| if (url.hasParameter(THREADS_KEY) |
| && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { |
| ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; |
| int threads = url.getParameter(THREADS_KEY, 0); |
| int max = threadPoolExecutor.getMaximumPoolSize(); |
| int core = threadPoolExecutor.getCorePoolSize(); |
| if (threads > 0 && (threads != max || threads != core)) { |
| if (threads < core) { |
| threadPoolExecutor.setCorePoolSize(threads); |
| if (core == max) { |
| threadPoolExecutor.setMaximumPoolSize(threads); |
| } |
| } else { |
| threadPoolExecutor.setMaximumPoolSize(threads); |
| if (core == max) { |
| threadPoolExecutor.setCorePoolSize(threads); |
| } |
| } |
| } |
| } |
| } catch (Throwable t) { |
| logger.error(t.getMessage(), t); |
| } |
| } |
| |
| @Override |
| public ScheduledExecutorService nextScheduledExecutor() { |
| return scheduledExecutors.pollItem(); |
| } |
| |
| @Override |
| public ScheduledExecutorService getServiceExporterExecutor() { |
| return serviceExporterExecutor; |
| } |
| |
| @Override |
| public ScheduledExecutorService getRegistryNotificationExecutor() { |
| return registryNotificationExecutor; |
| } |
| |
| @Override |
| public ExecutorService getSharedExecutor() { |
| return SHARED_EXECUTOR; |
| } |
| |
| private ExecutorService createExecutor(URL url) { |
| return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); |
| } |
| |
| } |