| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.felix.httplite.server; |
| |
| import java.net.SocketTimeoutException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.felix.httplite.osgi.Logger; |
| |
| /** |
| * This class implements a simple thread pool for servicing HTTP connections. |
| * The thread pool does not create any threads initially, but waits for |
| * connections to be added to create threads. As connections are added, threads |
| * are only created if they are needed up until the thread limit. If threads |
| * are inactive for a period of time, then the threads terminate; the default |
| * is 60000 milliseconds. |
| **/ |
| public class ThreadPool |
| { |
| public static final int DEFAULT_THREAD_TIMEOUT = 60000; |
| private int m_threadTimeout; |
| |
| private ThreadGroup m_group = new ThreadGroup("ThreadPoolGroup"); |
| private int m_state; |
| private ThreadGate m_shutdownGate; |
| private int m_threadName = 0; |
| private int m_threadLimit = 0; |
| private int m_threadCount = 0; |
| private int m_threadAvailable = 0; |
| private List m_connectionList = new ArrayList(); |
| private final Logger m_logger; |
| |
| /** |
| * Constructs a thread pool with the specified thread limit and with |
| * the default inactivity timeout. |
| * @param threadLimit The maximum number of threads in the pool. |
| **/ |
| public ThreadPool(final int threadLimit, final Logger logger) |
| { |
| this(threadLimit, DEFAULT_THREAD_TIMEOUT, logger); |
| } |
| |
| /** |
| * Constructs a thread pool with the specified thread limit and inactivity |
| * timeout. |
| * @param threadLimit The maximum number of threads in the pool. |
| * @param threadTimeout The inactivity timeout for threads in milliseconds. |
| **/ |
| public ThreadPool(int threadLimit, int threadTimeout, Logger logger) |
| { |
| m_threadLimit = threadLimit; |
| m_threadTimeout = threadTimeout; |
| m_logger = logger; |
| m_state = Server.INACTIVE_STATE; |
| } |
| |
| /** |
| * This method returns the current state of the thread pool, which is one |
| * of the following values: |
| * <ul> |
| * <li><tt>ThreadPool.INACTIVE_STATE</tt> - the thread pool is currently |
| * not active. |
| * </li> |
| * <li><tt>ThreadPool.ACTIVE_STATE</tt> - the thread pool is active and |
| * servicing connections. |
| * </li> |
| * <li><tt>ThreadPool.STOPPING_STATE</tt> - the thread pool is in the |
| * process of shutting down. |
| * </li> |
| * </li> |
| * @return The current state of the thread pool. |
| **/ |
| public synchronized int getState() |
| { |
| return m_state; |
| } |
| |
| /** |
| * Starts the thread pool if it is not already active, allowing it to |
| * service connections. |
| * @throws java.lang.IllegalStateException If the thread pool is in the |
| * <tt>ThreadPool.STOPPING_STATE</tt> state. |
| **/ |
| public synchronized void start() |
| { |
| if (m_state != Server.STOPPING_STATE) |
| { |
| m_state = Server.ACTIVE_STATE; |
| } |
| else |
| { |
| throw new IllegalStateException("Thread pool is in process of stopping."); |
| } |
| } |
| |
| /** |
| * This method stops the thread pool if it is currently active. This method |
| * will block the calling thread until the thread pool is completely stopped. |
| * This can potentially take a long time, since it allows all existing |
| * connections to be processed before shutting down. Subsequent calls to |
| * this method will also block the caller. If a blocked thread is interrupted, |
| * the method will release the blocked thread by throwing an interrupted |
| * exception. In such a case, the thread pool will still continue its |
| * shutdown process. |
| * @throws java.lang.InterruptedException If the calling thread is interrupted. |
| **/ |
| public void stop() throws InterruptedException |
| { |
| ThreadGate gate = null; |
| |
| synchronized (this) |
| { |
| if (m_state != Server.INACTIVE_STATE) |
| { |
| // If there is no shutdown gate, create one and save its |
| // reference both in the field and locally. All threads |
| // that call stop() while the server is stopping will wait |
| // on this gate. |
| if ((m_shutdownGate == null) && (m_threadCount > 0)) |
| { |
| m_shutdownGate = new ThreadGate(); |
| } |
| gate = m_shutdownGate; |
| m_state = Server.STOPPING_STATE; |
| // Interrupt all threads that have been created by the |
| // thread pool. |
| m_group.interrupt(); |
| } |
| } |
| |
| // Wait on gate for thread pool shutdown to complete. |
| if (gate != null) |
| { |
| gate.await(); |
| } |
| } |
| |
| /** |
| * This method adds an HTTP connection to the thread pool for servicing. |
| * @param connection |
| * @throws java.lang.IllegalStateException If the thread pool is not in the |
| * <tt>ThreadPool.ACTIVE_STATE</tt> state. |
| **/ |
| public synchronized void addConnection(final Connection connection) |
| { |
| if (m_state == Server.ACTIVE_STATE) |
| { |
| // Add the new connection to the connection list. |
| m_connectionList.add(connection); |
| notify(); |
| |
| // If there are not enough available threads to handle all outstanding |
| // connections and we still haven't reached our thread limit, then |
| // add another thread. |
| if ((m_threadAvailable < m_connectionList.size()) |
| && (m_threadCount < m_threadLimit)) |
| { |
| // Increase our thread count, but not number of available threads, |
| // since the new thread will be used to service the new connection |
| // and thus is not available. |
| m_threadCount++; |
| // Use simple integer for thread name for logging purposes. |
| if (m_threadName == Integer.MAX_VALUE) |
| { |
| m_threadName = 1; |
| } |
| else |
| { |
| m_threadName++; |
| } |
| // Create and start thread into our thread group. |
| new Thread(m_group, new Runnable() |
| { |
| public void run() |
| { |
| processConnections(); |
| } |
| }, Integer.toString(m_threadName)).start(); |
| m_logger.log(Logger.LOG_DEBUG, "Created new thread for pool; count = " |
| + m_threadCount + ", max = " + m_threadLimit + "."); |
| } |
| } |
| else |
| { |
| throw new IllegalStateException("The thread pool is not active."); |
| } |
| } |
| |
| /** |
| * This method is the main loop for all threads servicing connections. |
| **/ |
| private void processConnections() |
| { |
| Connection connection; |
| while (true) |
| { |
| synchronized (this) |
| { |
| // Any new threads entering this region are now available to |
| // process a connection, so increment the available count. |
| m_threadAvailable++; |
| |
| try |
| { |
| // Keep track of when we start to wait so that we |
| // know if our timeout expires. |
| long start = System.currentTimeMillis(); |
| long current = start; |
| // Wait until there is a connection to service or until |
| // the timeout expires; if the timeout is zero, then there |
| // is no timeout. |
| while (m_state == Server.ACTIVE_STATE |
| && (m_connectionList.size() == 0) |
| && ((m_threadTimeout == 0) || ((current - start) < m_threadTimeout))) |
| { |
| // Try to wait for another connection, but our timeout |
| // expires then commit suicide. |
| wait(m_threadTimeout - (current - start)); |
| current = System.currentTimeMillis(); |
| } |
| } |
| catch (InterruptedException ex) |
| { |
| // This generally happens when we are shutting down. |
| Thread.currentThread().interrupt(); |
| } |
| |
| // Set connection to null if we are going to commit suicide; |
| // otherwise get the first available connection for servicing. |
| if (m_connectionList.size() == 0) |
| { |
| connection = null; |
| } |
| else |
| { |
| connection = (Connection) m_connectionList.remove(0); |
| } |
| |
| // Decrement number of available threads, since we will either |
| // start to service a connection at this point or we will commit |
| // suicide. |
| m_threadAvailable--; |
| |
| // If we do not have a connection, then we are committing |
| // suicide due to inactivity or because we were interrupted |
| // and are stopping the thread pool. |
| if (connection == null) |
| { |
| // One less thread in use. |
| m_threadCount--; |
| if (Thread.interrupted()) |
| { |
| m_logger.log(Logger.LOG_DEBUG, |
| "Pool thread dying due to interrupt."); |
| } |
| else |
| { |
| m_logger.log(Logger.LOG_DEBUG, |
| "Pool thread dying due to inactivity."); |
| } |
| // If we are stopping and the last thread is dead, then |
| // open the shutdown gate to release all threads waiting |
| // for us to stop. |
| if ((m_state == Server.STOPPING_STATE) && (m_threadCount == 0)) |
| { |
| m_shutdownGate.open(); |
| m_shutdownGate = null; |
| m_state = Server.INACTIVE_STATE; |
| } |
| // Return to kill the thread by exiting our run method. |
| return; |
| } |
| } |
| |
| // Otherwise, we have a connection so process it. |
| // Note, we might have outstanding connections to |
| // process even if we are stopping, so we cleaning |
| // service those remaining connections before stopping. |
| try |
| { |
| connection.process(); |
| m_logger.log(Logger.LOG_DEBUG, "Connection closed normally."); |
| } |
| catch (SocketTimeoutException ex) |
| { |
| m_logger.log(Logger.LOG_INFO, "Connection closed due to inactivity."); |
| } |
| catch (Exception ex) |
| { |
| m_logger.log(Logger.LOG_ERROR, "Connection close due to unknown reason.", |
| ex); |
| } |
| } |
| } |
| } |