blob: 9304bf1ba82ecca7cb14164ebf7d983a8099f525 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openejb.server;
import org.apache.openejb.loader.Options;
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.monitoring.Managed;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@Managed
public class ServicePool extends ServerServiceFilter {
private static final Logger log = Logger.getInstance(LogCategory.SERVICEPOOL, "org.apache.openejb.util.resources");
private final ThreadPoolExecutor threadPool;
private final AtomicBoolean stop = new AtomicBoolean();
public ServicePool(final ServerService next, final Properties properties) {
//Liberal defaults
this(next, new Options(properties).get("threadsCore", 2), new Options(properties).get("threads", 50), new Options(properties).get("queue", 50000), new Options(properties).get("block", false), new Options(properties).get("keepAliveTime", 1000 * 60 * 5));
}
public ServicePool(final ServerService next, final int threads) {
this(next, threads, threads, 50000, true, 1000 * 60 * 5);
}
public ServicePool(final ServerService next, int threads, int queue, final boolean block) {
this(next, threads, threads, queue, block, 1000 * 60 * 5);
}
public ServicePool(final ServerService next, int threadCore, int threads, int queue, final boolean block, long keepAliveTime) {
super(next);
if (keepAliveTime <= 0) {
keepAliveTime = 1000 * 60 * 5;
}
if (threadCore <= 0) {
threadCore = 100;
}
if (threads < threadCore) {
threads = threadCore;
}
if (queue < 1) {
queue = 1;
}
/**
This thread pool starts with 2 core threads and can grow to the limit defined by 'threads'.
If a pool thread is idle for more than 1 minute it will be discarded, unless the core size is reached.
It can accept upto the number of processes defined by 'queue'.
If the queue is full then an attempt is made to add the process to the queue for 10 seconds.
Failure to add to the queue in this time will either result in a logged rejection, or if 'block'
is true then a final attempt is made to run the process in the current thread (the service thread).
*/
threadPool = new ThreadPoolExecutor(threadCore, threads, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queue));
threadPool.setThreadFactory(new ThreadFactory() {
private final AtomicInteger i = new AtomicInteger(0);
@Override
public Thread newThread(final Runnable r) {
final Thread t = new Thread(r, "OpenEJB." + getName() + "." + i.incrementAndGet());
t.setDaemon(true);
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
log.error("Uncaught error in: " + t.getName(), e);
}
});
return t;
}
});
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
return;
}
if (log.isWarningEnabled()) {
log.warning("ServicePool at capicity for process: " + r);
}
boolean offer = false;
try {
offer = tpe.getQueue().offer(r, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//Ignore
}
if (!offer) {
log.error("ServicePool failed to run asynchronous process: " + r);
if (block) {
try {
//Last ditch effort to run the process in the current thread
r.run();
} catch (Throwable e) {
log.error("ServicePool failed to run synchronous process: " + r);
}
}
}
}
});
SystemInstance.get().setComponent(ServicePool.class, this);
}
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}
@Override
public void service(final InputStream in, final OutputStream out) throws ServiceException, IOException {
}
@Override
public void service(final Socket socket) throws ServiceException, IOException {
final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
final Runnable ctxCL = new Runnable() {
@Override
public void run() {
ClassLoader cl = null;
try {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(tccl);
if (stop.get()) {
return;
}
ServicePool.super.service(socket);
} catch (SecurityException e) {
final String msg = "ServicePool: Security error: " + e.getMessage();
if (log.isDebugEnabled()) {
log.error(msg, e);
} else {
log.error(msg + " - Debug for StackTrace");
}
} catch (IOException e) {
final String msg = "ServicePool: Unexpected IO error: " + e.getMessage();
if (log.isDebugEnabled()) {
log.debug(msg, e);
} else {
log.warning(msg + " - Debug for StackTrace");
}
} catch (Throwable e) {
final String msg = "ServicePool: Unexpected error: " + e.getMessage();
if (log.isDebugEnabled()) {
log.error(msg, e);
} else {
log.error(msg + " - Debug for StackTrace");
}
} finally {
//Ensure delegated socket is closed here
try {
if (socket != null) {
socket.close();
}
} catch (Throwable t) {
final String msg = "ServicePool: Error closing socket";
if (log.isDebugEnabled()) {
log.debug(msg, t);
} else {
log.warning(msg);
}
}
Thread.currentThread().setContextClassLoader(cl);
}
}
};
threadPool.execute(ctxCL);
}
@Managed
private final Pool pool = new Pool();
@Managed(append = true)
public class Pool {
@Managed
public boolean isShutdown() {
return threadPool.isShutdown();
}
@Managed
public boolean isTerminating() {
return threadPool.isTerminating();
}
@Managed
public boolean isTerminated() {
return threadPool.isTerminated();
}
@Managed
public int getPoolSize() {
return threadPool.getPoolSize();
}
@Managed
public int getCorePoolSize() {
return threadPool.getCorePoolSize();
}
@Managed
public int getMaximumPoolSize() {
return threadPool.getMaximumPoolSize();
}
@Managed
public long getKeepAliveTime(final TimeUnit unit) {
return threadPool.getKeepAliveTime(unit);
}
@Managed
public int getQueueSize() {
return threadPool.getQueue().size();
}
@Managed
public int getActiveCount() {
return threadPool.getActiveCount();
}
@Managed
public int getLargestPoolSize() {
return threadPool.getLargestPoolSize();
}
@Managed
public long getTaskCount() {
return threadPool.getTaskCount();
}
@Managed
public long getCompletedTaskCount() {
return threadPool.getCompletedTaskCount();
}
@Managed
public void setMaximumPoolSize(final int maximumPoolSize) {
threadPool.setMaximumPoolSize(maximumPoolSize);
}
@Managed
public void setCorePoolSize(final int corePoolSize) {
getThreadPool().setCorePoolSize(corePoolSize);
}
@Managed
public void allowCoreThreadTimeOut(final boolean value) {
getThreadPool().allowCoreThreadTimeOut(value);
}
@Managed(description = "Sets time in nanoseconds")
public void setKeepAliveTime(final long time) {
getThreadPool().setKeepAliveTime(time, TimeUnit.NANOSECONDS);
}
}
}