| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cxf.workqueue; |
| |
| import java.beans.PropertyChangeEvent; |
| import java.beans.PropertyChangeListener; |
| import java.lang.reflect.Field; |
| import java.lang.reflect.Method; |
| import java.security.AccessController; |
| import java.security.PrivilegedAction; |
| import java.text.NumberFormat; |
| import java.util.ArrayList; |
| import java.util.Dictionary; |
| import java.util.Hashtable; |
| import java.util.List; |
| import java.util.concurrent.DelayQueue; |
| import java.util.concurrent.Delayed; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import org.apache.cxf.common.classloader.ClassLoaderUtils; |
| import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder; |
| import org.apache.cxf.common.injection.NoJSR250Annotations; |
| import org.apache.cxf.common.logging.LogUtils; |
| import org.apache.cxf.common.util.ReflectionUtil; |
| |
| @NoJSR250Annotations |
| public class AutomaticWorkQueueImpl implements AutomaticWorkQueue { |
| public static final String PROPERTY_NAME = "name"; |
| static final int DEFAULT_MAX_QUEUE_SIZE = 256; |
| private static final Logger LOG = |
| LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class); |
| |
| |
| String name = "default"; |
| int maxQueueSize; |
| int initialThreads; |
| int lowWaterMark; |
| int highWaterMark; |
| long dequeueTimeout; |
| AtomicInteger approxThreadCount = new AtomicInteger(); |
| |
| ThreadPoolExecutor executor; |
| Method addWorkerMethod; |
| Object[] addWorkerArgs; |
| |
| AWQThreadFactory threadFactory; |
| ReentrantLock mainLock; |
| final ReentrantLock addThreadLock = new ReentrantLock(); |
| |
| DelayQueue<DelayedTaskWrapper> delayQueue; |
| WatchDog watchDog; |
| |
| boolean shared; |
| int sharedCount; |
| |
| private List<PropertyChangeListener> changeListenerList; |
| |
| public AutomaticWorkQueueImpl() { |
| this(DEFAULT_MAX_QUEUE_SIZE); |
| } |
| public AutomaticWorkQueueImpl(String name) { |
| this(DEFAULT_MAX_QUEUE_SIZE, name); |
| } |
| public AutomaticWorkQueueImpl(int max) { |
| this(max, "default"); |
| } |
| public AutomaticWorkQueueImpl(int max, String name) { |
| this(max, |
| 0, |
| 25, |
| 5, |
| 2 * 60 * 1000L, |
| name); |
| } |
| public AutomaticWorkQueueImpl(int mqs, |
| int initialThreads, |
| int highWaterMark, |
| int lowWaterMark, |
| long dequeueTimeout) { |
| this(mqs, initialThreads, highWaterMark, lowWaterMark, dequeueTimeout, "default"); |
| } |
| public AutomaticWorkQueueImpl(int mqs, |
| int initialThreads, |
| int highWaterMark, |
| int lowWaterMark, |
| long dequeueTimeout, |
| String name) { |
| this.maxQueueSize = mqs == -1 ? DEFAULT_MAX_QUEUE_SIZE : mqs; |
| this.initialThreads = initialThreads; |
| this.highWaterMark = -1 == highWaterMark ? Integer.MAX_VALUE : highWaterMark; |
| this.lowWaterMark = -1 == lowWaterMark ? Integer.MAX_VALUE : lowWaterMark; |
| this.dequeueTimeout = dequeueTimeout; |
| this.name = name; |
| this.changeListenerList = new ArrayList<>(); |
| } |
| |
| public void addChangeListener(PropertyChangeListener listener) { |
| this.changeListenerList.add(listener); |
| } |
| |
| public void removeChangeListener(PropertyChangeListener listener) { |
| this.changeListenerList.remove(listener); |
| } |
| |
| public void notifyChangeListeners(PropertyChangeEvent event) { |
| for (PropertyChangeListener listener : changeListenerList) { |
| listener.propertyChange(event); |
| } |
| } |
| |
| public void setShared(boolean shared) { |
| this.shared = shared; |
| } |
| public boolean isShared() { |
| return shared; |
| } |
| public void addSharedUser() { |
| sharedCount++; |
| } |
| public void removeSharedUser() { |
| sharedCount--; |
| } |
| public int getShareCount() { |
| return sharedCount; |
| } |
| |
| protected synchronized ThreadPoolExecutor getExecutor() { |
| if (executor == null) { |
| threadFactory = createThreadFactory(name); |
| executor = new ThreadPoolExecutor(lowWaterMark, |
| highWaterMark, |
| TimeUnit.MILLISECONDS.toMillis(dequeueTimeout), |
| TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<Runnable>(maxQueueSize), |
| threadFactory) { |
| @Override |
| protected void terminated() { |
| ThreadFactory f = executor.getThreadFactory(); |
| if (f instanceof AWQThreadFactory) { |
| ((AWQThreadFactory)f).shutdown(); |
| } |
| if (watchDog != null) { |
| watchDog.shutdown(); |
| } |
| } |
| }; |
| |
| |
| if (LOG.isLoggable(Level.FINE)) { |
| StringBuilder buf = new StringBuilder(128).append("Constructing automatic work queue with:\n") |
| .append("max queue size: ").append(maxQueueSize).append('\n') |
| .append("initialThreads: ").append(initialThreads).append('\n') |
| .append("lowWaterMark: ").append(lowWaterMark).append('\n') |
| .append("highWaterMark: ").append(highWaterMark).append('\n'); |
| LOG.fine(buf.toString()); |
| } |
| |
| if (initialThreads > highWaterMark) { |
| initialThreads = highWaterMark; |
| } |
| |
| // as we cannot prestart more core than corePoolSize initial threads, we temporarily |
| // change the corePoolSize to the number of initial threads |
| // this is important as otherwise these threads will be created only when |
| // the queue has filled up, |
| // potentially causing problems with starting up under heavy load |
| if (initialThreads < Integer.MAX_VALUE && initialThreads > 0) { |
| executor.setCorePoolSize(initialThreads); |
| int started = executor.prestartAllCoreThreads(); |
| if (started < initialThreads) { |
| LOG.log(Level.WARNING, "THREAD_START_FAILURE_MSG", |
| new Object[] {started, initialThreads}); |
| } |
| executor.setCorePoolSize(lowWaterMark); |
| } |
| |
| ReentrantLock l; |
| try { |
| Field f = ThreadPoolExecutor.class.getDeclaredField("mainLock"); |
| ReflectionUtil.setAccessible(f); |
| l = (ReentrantLock)f.get(executor); |
| } catch (Throwable t) { |
| l = new ReentrantLock(); |
| } |
| mainLock = l; |
| |
| |
| try { |
| //java 7 |
| addWorkerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addWorker", |
| Runnable.class, Boolean.TYPE); |
| addWorkerArgs = new Object[] {null, Boolean.FALSE}; |
| } catch (Throwable t2) { |
| //nothing we cando |
| } |
| } |
| return executor; |
| } |
| private AWQThreadFactory createThreadFactory(final String nm) { |
| ThreadGroup group; |
| try { |
| //Try and find the highest level ThreadGroup that we're allowed to use. |
| //That SHOULD allow the default classloader and thread locals and such |
| //to be the least likely to cause issues down the road. |
| group = AccessController.doPrivileged( |
| new PrivilegedAction<ThreadGroup>() { |
| public ThreadGroup run() { |
| ThreadGroup group = Thread.currentThread().getThreadGroup(); |
| ThreadGroup parent = group; |
| try { |
| while (parent != null) { |
| group = parent; |
| parent = parent.getParent(); |
| } |
| } catch (SecurityException se) { |
| //ignore - if we get here, the "group" is as high as |
| //the security manager will allow us to go. Use that one. |
| } |
| return new ThreadGroup(group, nm + "-workqueue"); |
| } |
| } |
| ); |
| } catch (SecurityException e) { |
| group = new ThreadGroup(nm + "-workqueue"); |
| } |
| return new AWQThreadFactory(group, nm); |
| } |
| |
| static class DelayedTaskWrapper implements Delayed, Runnable { |
| long trigger; |
| Runnable work; |
| |
| DelayedTaskWrapper(Runnable work, long delay) { |
| this.work = work; |
| trigger = System.currentTimeMillis() + delay; |
| } |
| |
| public long getDelay(TimeUnit unit) { |
| long n = trigger - System.currentTimeMillis(); |
| return unit.convert(n, TimeUnit.MILLISECONDS); |
| } |
| |
| public int compareTo(Delayed delayed) { |
| long other = ((DelayedTaskWrapper)delayed).trigger; |
| int returnValue; |
| if (this.trigger < other) { |
| returnValue = -1; |
| } else if (this.trigger > other) { |
| returnValue = 1; |
| } else { |
| returnValue = 0; |
| } |
| return returnValue; |
| } |
| |
| public void run() { |
| work.run(); |
| } |
| |
| } |
| |
| class WatchDog extends Thread { |
| DelayQueue<DelayedTaskWrapper> delayQueue; |
| AtomicBoolean shutdown = new AtomicBoolean(false); |
| |
| WatchDog(DelayQueue<DelayedTaskWrapper> queue) { |
| delayQueue = queue; |
| } |
| |
| public void shutdown() { |
| shutdown.set(true); |
| // to exit the waiting thread |
| interrupt(); |
| } |
| |
| public void run() { |
| DelayedTaskWrapper task; |
| try { |
| while (!shutdown.get()) { |
| task = delayQueue.take(); |
| if (task != null) { |
| try { |
| execute(task); |
| } catch (Exception ex) { |
| LOG.warning("Executing the task from DelayQueue with exception: " + ex); |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| if (LOG.isLoggable(Level.FINE)) { |
| LOG.finer("The DelayQueue watchdog Task is stopping"); |
| } |
| } |
| |
| } |
| |
| } |
| class AWQThreadFactory implements ThreadFactory { |
| final AtomicInteger threadNumber = new AtomicInteger(1); |
| ThreadGroup group; |
| String name; |
| ClassLoader loader; |
| |
| AWQThreadFactory(ThreadGroup gp, String nm) { |
| group = gp; |
| name = nm; |
| //force the loader to be the loader of CXF, not the application loader |
| loader = AutomaticWorkQueueImpl.class.getClassLoader(); |
| } |
| |
| public Thread newThread(final Runnable r) { |
| if (group.isDestroyed()) { |
| group = new ThreadGroup(group.getParent(), name + "-workqueue"); |
| } |
| Runnable wrapped = new Runnable() { |
| public void run() { |
| approxThreadCount.incrementAndGet(); |
| try { |
| r.run(); |
| } finally { |
| approxThreadCount.decrementAndGet(); |
| } |
| } |
| }; |
| final Thread t = new Thread(group, |
| wrapped, |
| name + "-workqueue-" + threadNumber.getAndIncrement(), |
| 0); |
| AccessController.doPrivileged(new PrivilegedAction<Boolean>() { |
| public Boolean run() { |
| t.setContextClassLoader(loader); |
| return true; |
| } |
| }); |
| t.setDaemon(true); |
| if (t.getPriority() != Thread.NORM_PRIORITY) { |
| t.setPriority(Thread.NORM_PRIORITY); |
| } |
| return t; |
| } |
| public void setName(String s) { |
| name = s; |
| } |
| public void shutdown() { |
| if (!group.isDestroyed()) { |
| try { |
| group.destroy(); |
| group.setDaemon(true); |
| } catch (Throwable t) { |
| //ignore |
| } |
| } |
| } |
| } |
| |
| public void setName(String s) { |
| name = s; |
| if (threadFactory != null) { |
| threadFactory.setName(s); |
| } |
| } |
| public String getName() { |
| return name; |
| } |
| |
| public String toString() { |
| return new StringBuilder(super.toString()) |
| .append(" [queue size: ").append(getSize()) |
| .append(", max size: ").append(maxQueueSize) |
| .append(", threads: ").append(getPoolSize()) |
| .append(", active threads: ").append(getActiveCount()) |
| .append(", low water mark: ").append(getLowWaterMark()) |
| .append(", high water mark: ").append(getHighWaterMark()) |
| .append(']').toString(); |
| } |
| |
| public void execute(final Runnable command) { |
| //Grab the context classloader of this thread. We'll make sure we use that |
| //on the thread the runnable actually runs on. |
| |
| final ClassLoader loader = Thread.currentThread().getContextClassLoader(); |
| Runnable r = new Runnable() { |
| public void run() { |
| ClassLoaderHolder orig = ClassLoaderUtils.setThreadContextClassloader(loader); |
| try { |
| command.run(); |
| } finally { |
| if (orig != null) { |
| orig.reset(); |
| } |
| } |
| } |
| }; |
| //The ThreadPoolExecutor in the JDK doesn't expand the number |
| //of threads until the queue is full. However, we would |
| //prefer the number of threads to expand immediately and |
| //only uses the queue if we've reached the maximum number |
| //of threads. |
| ThreadPoolExecutor ex = getExecutor(); |
| ex.execute(r); |
| if (addWorkerMethod != null |
| && !ex.getQueue().isEmpty() |
| && this.approxThreadCount.get() < highWaterMark |
| && addThreadLock.tryLock()) { |
| try { |
| mainLock.lock(); |
| try { |
| int ps = this.getPoolSize(); |
| int sz = executor.getQueue().size(); |
| int sz2 = this.getActiveCount(); |
| |
| if ((sz + sz2) > ps) { |
| // Needs --add-opens java.base/java.util.concurrent=ALL-UNNAMED for JDK16+ |
| ReflectionUtil.setAccessible(addWorkerMethod).invoke(executor, addWorkerArgs); |
| } |
| } catch (Exception exc) { |
| //ignore |
| } finally { |
| mainLock.unlock(); |
| } |
| } finally { |
| addThreadLock.unlock(); |
| } |
| } |
| } |
| |
| // WorkQueue interface |
| public void execute(Runnable work, long timeout) { |
| try { |
| execute(work); |
| } catch (RejectedExecutionException ree) { |
| try { |
| if (!getExecutor().getQueue().offer(work, timeout, TimeUnit.MILLISECONDS)) { |
| throw ree; |
| } |
| } catch (InterruptedException ie) { |
| throw ree; |
| } |
| } |
| } |
| |
| public synchronized void schedule(final Runnable work, final long delay) { |
| if (delayQueue == null) { |
| delayQueue = new DelayQueue<>(); |
| watchDog = new WatchDog(delayQueue); |
| watchDog.setDaemon(true); |
| watchDog.start(); |
| } |
| delayQueue.put(new DelayedTaskWrapper(work, delay)); |
| } |
| |
| // AutomaticWorkQueue interface |
| |
| public void shutdown(boolean processRemainingWorkItems) { |
| if (executor != null) { |
| if (!processRemainingWorkItems) { |
| executor.getQueue().clear(); |
| } |
| executor.shutdown(); |
| } |
| } |
| |
| |
| /** |
| * Gets the maximum size (capacity) of the backing queue. |
| * @return the maximum size (capacity) of the backing queue. |
| */ |
| public long getMaxSize() { |
| return maxQueueSize; |
| } |
| |
| /** |
| * Gets the current size of the backing queue. |
| * @return the current size of the backing queue. |
| */ |
| public long getSize() { |
| return executor == null ? 0 : executor.getQueue().size(); |
| } |
| |
| |
| public boolean isEmpty() { |
| return executor == null || executor.getQueue().isEmpty(); |
| } |
| |
| public boolean isFull() { |
| return executor != null && executor.getQueue().remainingCapacity() == 0; |
| } |
| |
| public int getHighWaterMark() { |
| int hwm = executor == null ? highWaterMark : executor.getMaximumPoolSize(); |
| return hwm == Integer.MAX_VALUE ? -1 : hwm; |
| } |
| |
| public int getLowWaterMark() { |
| int lwm = executor == null ? lowWaterMark : executor.getCorePoolSize(); |
| return lwm == Integer.MAX_VALUE ? -1 : lwm; |
| } |
| |
| public int getInitialSize() { |
| return this.initialThreads; |
| } |
| |
| public void setHighWaterMark(int hwm) { |
| highWaterMark = hwm < 0 ? Integer.MAX_VALUE : hwm; |
| if (executor != null) { |
| notifyChangeListeners(new PropertyChangeEvent(this, "highWaterMark", |
| this.executor.getMaximumPoolSize(), hwm)); |
| executor.setMaximumPoolSize(highWaterMark); |
| } |
| } |
| |
| public void setLowWaterMark(int lwm) { |
| lowWaterMark = lwm < 0 ? 0 : lwm; |
| if (executor != null) { |
| notifyChangeListeners(new PropertyChangeEvent(this, "lowWaterMark", |
| this.executor.getCorePoolSize(), lwm)); |
| executor.setCorePoolSize(lowWaterMark); |
| } |
| } |
| |
| public void setInitialSize(int initialSize) { |
| notifyChangeListeners(new PropertyChangeEvent(this, "initialSize", this.initialThreads, initialSize)); |
| this.initialThreads = initialSize; |
| } |
| |
| public void setQueueSize(int size) { |
| notifyChangeListeners(new PropertyChangeEvent(this, "queueSize", this.maxQueueSize, size)); |
| this.maxQueueSize = size; |
| } |
| |
| public void setDequeueTimeout(long l) { |
| notifyChangeListeners(new PropertyChangeEvent(this, "dequeueTimeout", this.dequeueTimeout, l)); |
| this.dequeueTimeout = l; |
| } |
| |
| public boolean isShutdown() { |
| if (executor == null) { |
| return false; |
| } |
| return executor.isShutdown(); |
| } |
| public int getLargestPoolSize() { |
| if (executor == null) { |
| return 0; |
| } |
| return executor.getLargestPoolSize(); |
| } |
| public int getPoolSize() { |
| if (executor == null) { |
| return 0; |
| } |
| return executor.getPoolSize(); |
| } |
| public int getActiveCount() { |
| if (executor == null) { |
| return 0; |
| } |
| return executor.getActiveCount(); |
| } |
| public void update(Dictionary<String, String> config) { |
| String s = config.get("highWaterMark"); |
| if (s != null) { |
| this.highWaterMark = Integer.parseInt(s); |
| } |
| s = config.get("lowWaterMark"); |
| if (s != null) { |
| this.lowWaterMark = Integer.parseInt(s); |
| } |
| s = config.get("initialSize"); |
| if (s != null) { |
| this.initialThreads = Integer.parseInt(s); |
| } |
| s = config.get("dequeueTimeout"); |
| if (s != null) { |
| this.dequeueTimeout = Long.parseLong(s); |
| } |
| s = config.get("queueSize"); |
| if (s != null) { |
| this.maxQueueSize = Integer.parseInt(s); |
| } |
| } |
| public Dictionary<String, String> getProperties() { |
| Dictionary<String, String> properties = new Hashtable<>(); |
| NumberFormat nf = NumberFormat.getIntegerInstance(); |
| properties.put("name", nf.format(getName())); |
| properties.put("highWaterMark", nf.format(getHighWaterMark())); |
| properties.put("lowWaterMark", nf.format(getLowWaterMark())); |
| properties.put("initialSize", nf.format(getLowWaterMark())); |
| properties.put("dequeueTimeout", nf.format(getLowWaterMark())); |
| properties.put("queueSize", nf.format(getLowWaterMark())); |
| return properties; |
| } |
| } |