| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.impl; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.NamedNode; |
| import org.apache.camel.ThreadPoolRejectedPolicy; |
| import org.apache.camel.model.OptionalIdentifiedDefinition; |
| import org.apache.camel.model.ProcessorDefinition; |
| import org.apache.camel.model.ProcessorDefinitionHelper; |
| import org.apache.camel.model.RouteDefinition; |
| import org.apache.camel.spi.ExecutorServiceManager; |
| import org.apache.camel.spi.LifecycleStrategy; |
| import org.apache.camel.spi.ThreadPoolFactory; |
| import org.apache.camel.spi.ThreadPoolProfile; |
| import org.apache.camel.support.ServiceSupport; |
| import org.apache.camel.util.ObjectHelper; |
| import org.apache.camel.util.StopWatch; |
| import org.apache.camel.util.TimeUtils; |
| import org.apache.camel.util.URISupport; |
| import org.apache.camel.util.concurrent.CamelThreadFactory; |
| import org.apache.camel.util.concurrent.SizedScheduledExecutorService; |
| import org.apache.camel.util.concurrent.ThreadHelper; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * @version |
| */ |
| public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager { |
| private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class); |
| |
| private final CamelContext camelContext; |
| private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory(); |
| private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>(); |
| private String threadNamePattern; |
| private long shutdownAwaitTermination = 10000; |
| private String defaultThreadPoolProfileId = "defaultThreadPoolProfile"; |
| private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>(); |
| private ThreadPoolProfile defaultProfile; |
| |
| public DefaultExecutorServiceManager(CamelContext camelContext) { |
| this.camelContext = camelContext; |
| |
| defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId); |
| defaultProfile.setDefaultProfile(true); |
| defaultProfile.setPoolSize(10); |
| defaultProfile.setMaxPoolSize(20); |
| defaultProfile.setKeepAliveTime(60L); |
| defaultProfile.setTimeUnit(TimeUnit.SECONDS); |
| defaultProfile.setMaxQueueSize(1000); |
| defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns); |
| |
| registerThreadPoolProfile(defaultProfile); |
| } |
| |
| @Override |
| public ThreadPoolFactory getThreadPoolFactory() { |
| return threadPoolFactory; |
| } |
| |
| @Override |
| public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) { |
| this.threadPoolFactory = threadPoolFactory; |
| } |
| |
| @Override |
| public void registerThreadPoolProfile(ThreadPoolProfile profile) { |
| ObjectHelper.notNull(profile, "profile"); |
| ObjectHelper.notEmpty(profile.getId(), "id", profile); |
| threadPoolProfiles.put(profile.getId(), profile); |
| } |
| |
| @Override |
| public ThreadPoolProfile getThreadPoolProfile(String id) { |
| return threadPoolProfiles.get(id); |
| } |
| |
| @Override |
| public ThreadPoolProfile getDefaultThreadPoolProfile() { |
| return getThreadPoolProfile(defaultThreadPoolProfileId); |
| } |
| |
| @Override |
| public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) { |
| threadPoolProfiles.remove(defaultThreadPoolProfileId); |
| defaultThreadPoolProfile.addDefaults(defaultProfile); |
| |
| LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile); |
| |
| this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId(); |
| defaultThreadPoolProfile.setDefaultProfile(true); |
| registerThreadPoolProfile(defaultThreadPoolProfile); |
| } |
| |
| @Override |
| public String getThreadNamePattern() { |
| return threadNamePattern; |
| } |
| |
| @Override |
| public void setThreadNamePattern(String threadNamePattern) { |
| // must set camel id here in the pattern and let the other placeholders be resolved on demand |
| String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName()); |
| this.threadNamePattern = name; |
| } |
| |
| @Override |
| public long getShutdownAwaitTermination() { |
| return shutdownAwaitTermination; |
| } |
| |
| @Override |
| public void setShutdownAwaitTermination(long shutdownAwaitTermination) { |
| this.shutdownAwaitTermination = shutdownAwaitTermination; |
| } |
| |
| @Override |
| public String resolveThreadName(String name) { |
| return ThreadHelper.resolveThreadName(threadNamePattern, name); |
| } |
| |
| @Override |
| public Thread newThread(String name, Runnable runnable) { |
| ThreadFactory factory = createThreadFactory(name, true); |
| return factory.newThread(runnable); |
| } |
| |
| @Override |
| public ExecutorService newDefaultThreadPool(Object source, String name) { |
| return newThreadPool(source, name, getDefaultThreadPoolProfile()); |
| } |
| |
| @Override |
| public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) { |
| return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile()); |
| } |
| |
| @Override |
| public ExecutorService newThreadPool(Object source, String name, String profileId) { |
| ThreadPoolProfile profile = getThreadPoolProfile(profileId); |
| if (profile != null) { |
| return newThreadPool(source, name, profile); |
| } else { |
| // no profile with that id |
| return null; |
| } |
| } |
| |
| @Override |
| public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) { |
| String sanitizedName = URISupport.sanitizeUri(name); |
| ObjectHelper.notNull(profile, "ThreadPoolProfile"); |
| |
| ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile(); |
| profile.addDefaults(defaultProfile); |
| |
| ThreadFactory threadFactory = createThreadFactory(sanitizedName, true); |
| ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory); |
| onThreadPoolCreated(executorService, source, profile.getId()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, executorService}); |
| } |
| |
| return executorService; |
| } |
| |
| @Override |
| public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) { |
| ThreadPoolProfile profile = new ThreadPoolProfile(name); |
| profile.setPoolSize(poolSize); |
| profile.setMaxPoolSize(maxPoolSize); |
| return newThreadPool(source, name, profile); |
| } |
| |
| @Override |
| public ExecutorService newSingleThreadExecutor(Object source, String name) { |
| return newFixedThreadPool(source, name, 1); |
| } |
| |
| @Override |
| public ExecutorService newCachedThreadPool(Object source, String name) { |
| String sanitizedName = URISupport.sanitizeUri(name); |
| ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true)); |
| onThreadPoolCreated(answer, source, null); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer}); |
| } |
| return answer; |
| } |
| |
| @Override |
| public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) { |
| ThreadPoolProfile profile = new ThreadPoolProfile(name); |
| profile.setPoolSize(poolSize); |
| profile.setMaxPoolSize(poolSize); |
| profile.setKeepAliveTime(0L); |
| return newThreadPool(source, name, profile); |
| } |
| |
| @Override |
| public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) { |
| return newScheduledThreadPool(source, name, 1); |
| } |
| |
| @Override |
| public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) { |
| String sanitizedName = URISupport.sanitizeUri(name); |
| profile.addDefaults(getDefaultThreadPoolProfile()); |
| ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true)); |
| onThreadPoolCreated(answer, source, null); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer}); |
| } |
| return answer; |
| } |
| |
| @Override |
| public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) { |
| ThreadPoolProfile profile = getThreadPoolProfile(profileId); |
| if (profile != null) { |
| return newScheduledThreadPool(source, name, profile); |
| } else { |
| // no profile with that id |
| return null; |
| } |
| } |
| |
| @Override |
| public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) { |
| ThreadPoolProfile profile = new ThreadPoolProfile(name); |
| profile.setPoolSize(poolSize); |
| return newScheduledThreadPool(source, name, profile); |
| } |
| |
| @Override |
| public void shutdown(ExecutorService executorService) { |
| doShutdown(executorService, 0, false); |
| } |
| |
| @Override |
| public void shutdownGraceful(ExecutorService executorService) { |
| doShutdown(executorService, getShutdownAwaitTermination(), false); |
| } |
| |
| @Override |
| public void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) { |
| doShutdown(executorService, shutdownAwaitTermination, false); |
| } |
| |
| private boolean doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean failSafe) { |
| if (executorService == null) { |
| return false; |
| } |
| |
| boolean warned = false; |
| |
| // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively |
| // and try shutting down again. In both cases we wait at most the given shutdown timeout value given |
| // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus |
| // we ought to shutdown much faster) |
| if (!executorService.isShutdown()) { |
| StopWatch watch = new StopWatch(); |
| |
| LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination); |
| executorService.shutdown(); |
| |
| if (shutdownAwaitTermination > 0) { |
| try { |
| if (!awaitTermination(executorService, shutdownAwaitTermination)) { |
| warned = true; |
| LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService); |
| executorService.shutdownNow(); |
| // we are now shutting down aggressively, so wait to see if we can completely shutdown or not |
| if (!awaitTermination(executorService, shutdownAwaitTermination)) { |
| LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); |
| } |
| } |
| } catch (InterruptedException e) { |
| warned = true; |
| LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); |
| // we were interrupted during shutdown, so force shutdown |
| executorService.shutdownNow(); |
| } |
| } |
| |
| // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log |
| if (warned) { |
| LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.", |
| new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())}); |
| } else if (LOG.isDebugEnabled()) { |
| LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.", |
| new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())}); |
| } |
| } |
| |
| // let lifecycle strategy be notified as well which can let it be managed in JMX as well |
| ThreadPoolExecutor threadPool = null; |
| if (executorService instanceof ThreadPoolExecutor) { |
| threadPool = (ThreadPoolExecutor) executorService; |
| } else if (executorService instanceof SizedScheduledExecutorService) { |
| threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); |
| } |
| if (threadPool != null) { |
| for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { |
| lifecycle.onThreadPoolRemove(camelContext, threadPool); |
| } |
| } |
| |
| // remove reference as its shutdown (do not remove if fail-safe) |
| if (!failSafe) { |
| executorServices.remove(executorService); |
| } |
| |
| return warned; |
| } |
| |
| @Override |
| public List<Runnable> shutdownNow(ExecutorService executorService) { |
| return doShutdownNow(executorService, false); |
| } |
| |
| private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) { |
| ObjectHelper.notNull(executorService, "executorService"); |
| |
| List<Runnable> answer = null; |
| if (!executorService.isShutdown()) { |
| if (failSafe) { |
| // log as warn, as we shutdown as fail-safe, so end user should see more details in the log. |
| LOG.warn("Forcing shutdown of ExecutorService: {}", executorService); |
| } else { |
| LOG.debug("Forcing shutdown of ExecutorService: {}", executorService); |
| } |
| answer = executorService.shutdownNow(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", |
| new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); |
| } |
| } |
| |
| // let lifecycle strategy be notified as well which can let it be managed in JMX as well |
| ThreadPoolExecutor threadPool = null; |
| if (executorService instanceof ThreadPoolExecutor) { |
| threadPool = (ThreadPoolExecutor) executorService; |
| } else if (executorService instanceof SizedScheduledExecutorService) { |
| threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); |
| } |
| if (threadPool != null) { |
| for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { |
| lifecycle.onThreadPoolRemove(camelContext, threadPool); |
| } |
| } |
| |
| // remove reference as its shutdown (do not remove if fail-safe) |
| if (!failSafe) { |
| executorServices.remove(executorService); |
| } |
| |
| return answer; |
| } |
| |
| @Override |
| public boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException { |
| // log progress every 2nd second so end user is aware of we are shutting down |
| StopWatch watch = new StopWatch(); |
| long interval = Math.min(2000, shutdownAwaitTermination); |
| boolean done = false; |
| while (!done && interval > 0) { |
| if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) { |
| done = true; |
| } else { |
| LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService); |
| // recalculate interval |
| interval = Math.min(2000, shutdownAwaitTermination - watch.taken()); |
| } |
| } |
| |
| return done; |
| } |
| |
| /** |
| * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created. |
| * |
| * @param executorService the created {@link java.util.concurrent.ExecutorService} |
| */ |
| protected void onNewExecutorService(ExecutorService executorService) { |
| // noop |
| } |
| |
| @Override |
| protected void doStart() throws Exception { |
| if (threadNamePattern == null) { |
| // set default name pattern which includes the camel context name |
| threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#"; |
| } |
| } |
| |
| @Override |
| protected void doStop() throws Exception { |
| // noop |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| // shutdown all remainder executor services by looping and doing this aggressively |
| // as by normal all threads pool should have been shutdown using proper lifecycle |
| // by their EIPs, components etc. This is acting as a fail-safe during shutdown |
| // of CamelContext itself. |
| Set<ExecutorService> forced = new LinkedHashSet<ExecutorService>(); |
| if (!executorServices.isEmpty()) { |
| // at first give a bit of time to shutdown nicely as the thread pool is most likely in the process of being shutdown also |
| LOG.debug("Giving time for {} ExecutorService's to shutdown properly (acting as fail-safe)", executorServices.size()); |
| for (ExecutorService executorService : executorServices) { |
| try { |
| boolean warned = doShutdown(executorService, getShutdownAwaitTermination(), true); |
| // remember the thread pools that was forced to shutdown (eg warned) |
| if (warned) { |
| forced.add(executorService); |
| } |
| } catch (Throwable e) { |
| // only log if something goes wrong as we want to shutdown them all |
| LOG.warn("Error occurred during shutdown of ExecutorService: " |
| + executorService + ". This exception will be ignored.", e); |
| } |
| } |
| } |
| |
| // log the thread pools which was forced to shutdown so it may help the user to identify a problem of his |
| if (!forced.isEmpty()) { |
| LOG.warn("Forced shutdown of {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", forced.size()); |
| for (ExecutorService executorService : forced) { |
| LOG.warn(" forced -> {}", executorService); |
| } |
| } |
| forced.clear(); |
| |
| // clear list |
| executorServices.clear(); |
| |
| // do not clear the default profile as we could potential be restarted |
| Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator(); |
| while (it.hasNext()) { |
| ThreadPoolProfile profile = it.next(); |
| if (!profile.isDefaultProfile()) { |
| it.remove(); |
| } |
| } |
| } |
| |
| /** |
| * Invoked when a new thread pool is created. |
| * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext, |
| * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method, |
| * which for example will enlist the thread pool in JMX management. |
| * |
| * @param executorService the thread pool |
| * @param source the source to use the thread pool |
| * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile |
| */ |
| private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) { |
| // add to internal list of thread pools |
| executorServices.add(executorService); |
| |
| String id; |
| String sourceId = null; |
| String routeId = null; |
| |
| // extract id from source |
| if (source instanceof NamedNode) { |
| id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory()); |
| // and let source be the short name of the pattern |
| sourceId = ((NamedNode) source).getShortName(); |
| } else if (source instanceof String) { |
| id = (String) source; |
| } else if (source != null) { |
| // fallback and use the simple class name with hashcode for the id so its unique for this given source |
| id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")"; |
| } else { |
| // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique |
| id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")"; |
| } |
| |
| // id is mandatory |
| ObjectHelper.notEmpty(id, "id for thread pool " + executorService); |
| |
| // extract route id if possible |
| if (source instanceof ProcessorDefinition) { |
| RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source); |
| if (route != null) { |
| routeId = route.idOrCreate(this.camelContext.getNodeIdFactory()); |
| } |
| } |
| |
| // let lifecycle strategy be notified as well which can let it be managed in JMX as well |
| ThreadPoolExecutor threadPool = null; |
| if (executorService instanceof ThreadPoolExecutor) { |
| threadPool = (ThreadPoolExecutor) executorService; |
| } else if (executorService instanceof SizedScheduledExecutorService) { |
| threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); |
| } |
| if (threadPool != null) { |
| for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { |
| lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); |
| } |
| } |
| |
| // now call strategy to allow custom logic |
| onNewExecutorService(executorService); |
| } |
| |
| private ThreadFactory createThreadFactory(String name, boolean isDaemon) { |
| ThreadFactory threadFactory = new CamelThreadFactory(threadNamePattern, name, isDaemon); |
| return threadFactory; |
| } |
| |
| } |