| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.datasource; |
| |
| import java.io.Serializable; |
| import java.sql.Connection; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.EventListener; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * AbstractPoolCache implements the ConnectionPoolCache interface. This is base class for the all |
| * connection pools. The class also implements the Serializable interface. The pool maintain a list |
| * for keeping the available connections(not assigned to user) and the active connections(assigned |
| * to user) This is a thread safe class. |
| */ |
| public abstract class AbstractPoolCache implements ConnectionPoolCache, Serializable { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| protected int INIT_LIMIT; |
| private int MAX_LIMIT; |
| protected transient Map availableCache; |
| protected transient Map activeCache; |
| protected EventListener connEventListner; |
| // private String error = ""; |
| protected ConfiguredDataSourceProperties configProps; |
| // expirationTime is for the available connection which are expired in milliseconds |
| protected int expirationTime; |
| // timeOut is for the Active connection which are time out in milliseconds |
| protected int timeOut; |
| // Client Timeout in milliseconds |
| protected int loginTimeOut; |
| // private final boolean DEBUG = false; |
| public transient ConnectionCleanUpThread cleaner; |
| private int totalConnections = 0; |
| private int activeConnections = 0; |
| private List expiredConns = null; |
| protected long sleepTime = -1; |
| private Thread th = null;// cleaner thread |
| |
| /** |
| * Constructor initializes the AbstractPoolCache properties. |
| * |
| * @param eventListner The event listner for the database connections. |
| * @param configs The ConfiguredDataSourceProperties object containing the configuration for the |
| * pool. |
| */ |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SC_START_IN_CTOR", |
| justification = "the thread started is a cleanup thread and is not active until there is a timeout tx") |
| public AbstractPoolCache(EventListener eventListner, ConfiguredDataSourceProperties configs) |
| throws PoolException { |
| availableCache = new HashMap(); |
| activeCache = Collections.synchronizedMap(new LinkedHashMap()); |
| connEventListner = eventListner; |
| expiredConns = Collections.synchronizedList(new ArrayList()); |
| MAX_LIMIT = configs.getMaxPoolSize(); |
| expirationTime = configs.getConnectionExpirationTime() * 1000; |
| timeOut = configs.getConnectionTimeOut() * 1000; |
| loginTimeOut = configs.getLoginTimeOut() * 1000; |
| INIT_LIMIT = Math.min(configs.getInitialPoolSize(), MAX_LIMIT); |
| configProps = configs; |
| cleaner = this.new ConnectionCleanUpThread(); |
| th = new LoggingThread("ConnectionCleanUpThread", cleaner); |
| th.start(); |
| } |
| |
| protected void initializePool() { |
| if (INIT_LIMIT > 0) { |
| long currTime = System.currentTimeMillis(); |
| for (int count = 0; count < INIT_LIMIT; count++) { |
| try { |
| availableCache.put(getNewPoolConnection(), Long.valueOf(currTime)); |
| ++totalConnections; |
| } catch (Exception ex) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("AbstractPoolCache::initializePool:Error in creating connection", |
| ex.getCause()); |
| } |
| } |
| } |
| } |
| } |
| |
| public abstract Object getNewPoolConnection() throws PoolException; |
| |
| /** |
| * Returns the connection to the available pool Asif: When the connection is returned to the pool, |
| * notify any one thread waiting on availableCache so that it can go into connection seeking |
| * state. |
| * |
| * @param connectionObject PooledConnection object for return. |
| * |
| */ |
| @Override |
| public void returnPooledConnectionToPool(Object connectionObject) { |
| boolean returnedHappened = false; |
| if (connectionObject != null) { |
| // Asif: Take a lock on activeCache while removing the Connection |
| // from the map. It is possible that while the connection is |
| // being returned the cleaner thread migh have also removed |
| // it from the activeMap. If that is the case we don't do |
| // anything bcoz the cleaner thread will take care of decrementing |
| // the total & available connection. We wil take a lock on the |
| // individual connection rather than complete activeMap |
| // bcoz there is no point in blocking other threads from |
| // returning their own connections |
| synchronized (connectionObject) { |
| if (this.activeCache.containsKey(connectionObject)) { |
| // Asif: Remove the connection from activeCache |
| // Don't add it to availabel pool now. Add it when we have |
| // taken a lock on availableCache & then we will modify the |
| // count. |
| this.activeCache.remove(connectionObject); |
| returnedHappened = true; |
| } |
| } |
| } |
| if (returnedHappened) { |
| // Asif: take the lock on availableCache & decrement the |
| // count of active connections. Call notify on availableConnections |
| // so that any waiting client thread will go into connection |
| // seeking state |
| synchronized (this.availableCache) { |
| --this.activeConnections; |
| this.availableCache.put(connectionObject, Long.valueOf(System.currentTimeMillis())); |
| this.availableCache.notify(); |
| } |
| } |
| } |
| |
| /** |
| * Expires connection in the available pool. |
| * |
| */ |
| abstract void destroyPooledConnection(Object connectionObject); |
| |
| /** |
| * Returns number of connections currently in use. |
| * |
| * @return int Number of active connections |
| */ |
| public int getActiveCacheSize() { |
| return this.activeConnections; |
| } |
| |
| /** |
| * Returns number of connections available for use |
| * |
| * @return Size of available cache. |
| */ |
| public int getAvailableCacheSize() { |
| return (this.totalConnections - this.activeConnections); |
| } |
| |
| /** |
| * @see org.apache.geode.internal.datasource.ConnectionPoolCache#expirePooledConnection(Object) |
| * @param connectionObject Asif: This function will set the timestamp associated with the |
| * Connection object such that it will get timed out by the cleaner thread. Normally when |
| * this function is called, the connection object will be present in activeCache as there |
| * is no way client can return the object. But it is possible that the cleaner thread may |
| * have already picked it up ,so we still have to check whether it is contained in the map |
| * or not |
| * |
| */ |
| @Override |
| public void expirePooledConnection(Object connectionObject) { |
| synchronized (connectionObject) { |
| if (this.activeCache.containsKey(connectionObject)) { |
| // Change the time stamp associated with the object |
| long prev = ((Long) this.activeCache.get(connectionObject)).longValue(); |
| prev = prev - this.timeOut - 1000; |
| this.activeCache.put(connectionObject, Long.valueOf(prev)); |
| } |
| } |
| } |
| |
| /** |
| * Gets the connection from the pool. The specified user and password are used. If the available |
| * pool is not empty a connection is fetched from the available pool. If the available pool is |
| * empty and the total connections in the pool(available + active) are less then the Max limit, a |
| * new connection is created and returned to the client. If the Max limit is reached the client |
| * waits for the connection to be available. |
| * |
| * Asif: The client thread checks if the total number of active connections have exhausted the |
| * limit. If yes it waits on the availableCache. When another thread returns the connection to the |
| * pool, the waiting thread gets notified. If a thread experiences timeout while waiting , a |
| * SQLException will be thrown. Other case is that there are available connections in map. Now |
| * while getting connection from avaialbel map , we may or may not obtain connetion bcoz the |
| * connection in available map may have expired. But if the checkOutConnection() returns null , |
| * this iteslf guarantees that atleast one connection expired so the current thread can safely |
| * demand one new connection. |
| * |
| * @return Object connection object from the pool. |
| */ |
| @Override |
| public Object getPooledConnectionFromPool() throws PoolException { |
| Object poolConn = null; |
| // checkCredentials(username, password); |
| /* |
| * if (availableCache == null) { synchronized (this) { if (availableCache == null) { |
| * initializePool(); } } } |
| */ |
| long now = System.currentTimeMillis(); |
| synchronized (this.availableCache) { |
| while ((totalConnections - activeConnections) == 0 && totalConnections == MAX_LIMIT) { |
| try { |
| this.availableCache.wait(loginTimeOut); |
| long newtime = System.currentTimeMillis(); |
| long duration = newtime - now; |
| if (duration > loginTimeOut) |
| throw new PoolException( |
| "AbstractPooledCache::getPooledConnectionFromPool:Login time-out exceeded"); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| // TODO add a cancellation check? |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "AbstractPooledCache::getPooledConnectionFromPool:InterruptedException in waiting thread"); |
| } |
| throw new PoolException( |
| "AbstractPooledCache::getPooledConnectionFromPool:InterruptedException in waiting thread"); |
| } |
| } |
| if ((totalConnections - activeConnections) > 0) { |
| poolConn = checkOutConnection(now); |
| if (poolConn != null) { |
| activeCache.put(poolConn, Long.valueOf(now)); |
| ++activeConnections; |
| } |
| } |
| if (poolConn == null) { |
| poolConn = getNewPoolConnection(); |
| activeCache.put(poolConn, Long.valueOf(now)); |
| ++totalConnections; |
| ++activeConnections; |
| } |
| } |
| // Asif: Notify the cleaner thread if it is waiting |
| // on the active Cache as there is an entry now |
| synchronized (activeCache) { |
| activeCache.notify(); |
| } |
| return poolConn; |
| } |
| |
| /** |
| * Returns the max pool limit. |
| * |
| */ |
| public int getMaxLimit() { |
| return MAX_LIMIT; |
| } |
| |
| /** |
| * Remove a connection from the pool. Modified by Asif : This function is called from a synch |
| * block where the lock is taken on this.availableCache |
| * |
| * @param now Current time in milliseconds |
| * @return Object Connection object from the pool. |
| */ |
| private Object checkOutConnection(long now) throws PoolException { |
| // boolean expiryCheck = false; |
| Object retConn = null; |
| Set entryset = availableCache.entrySet(); |
| Iterator itr = entryset.iterator(); |
| Map.Entry entry = null; |
| while (itr.hasNext()) { |
| entry = (Map.Entry) itr.next(); |
| long time = ((Long) entry.getValue()).longValue(); |
| if ((now - time) <= expirationTime) { |
| retConn = entry.getKey(); |
| itr.remove(); |
| break; |
| } else { |
| // Asif : Take a lock on expiredConns, so that clean up thread |
| // does not miss it while emptying. |
| synchronized (this.expiredConns) { |
| this.expiredConns.add(entry.getKey()); |
| } |
| itr.remove(); |
| // Asif :Reduce the total number of available connections |
| // We will not notify the cleaner thread. It will clean |
| // the list next time when the thread awakes. The cleaner thread |
| // will be either waiting on the activeCache or sleeping. |
| // It will get notified whenever there is an elemnt added |
| // is added in the activeCache. If there is atleast one |
| // object in the activeMap it will sleep rather than wait |
| --totalConnections; |
| } |
| } |
| return retConn; |
| } |
| |
| // Code for Clean up thread |
| /** |
| * Asif: The Cleaner thread calls this function periodically to clear the expired connection list |
| * & also to add those connections to expiry list , which have timed out while being active. The |
| * thread first iterates over the map of active connections. It does by getting array of keys |
| * contained in the map so that there is no backing of Map. It collects all the actiev connections |
| * timeout & if there is atleast one such connection , it will issue a notify or notify all ( if |
| * more than one connections have timed out). After that it will just clear the list of expired |
| * connections * |
| */ |
| protected void cleanUp() { |
| // Asif Get an array of keys in the activeConnection map |
| // Since we are using LinkedHashMap the keys are guaranteed to be in order |
| // of oldest conn as the first element |
| // PooledConnection[] activeConnArr = (PooledConnection[]) |
| // this.activeCache.keySet().toArray(new PooledConnection[0]); |
| int numConnTimedOut = 0; |
| long now = System.currentTimeMillis(); |
| sleepTime = -1; |
| boolean toContinue = true; |
| // int len = activeConnArr.length; |
| // for (int i = 0; i < len; ++i) { |
| while (toContinue) { |
| Object conn = null; |
| Long associatedValue = null; |
| // Get the connection which is the oldest |
| synchronized (this.activeCache) { |
| Set set = this.activeCache.entrySet(); |
| Iterator itr = set.iterator(); |
| if (itr.hasNext()) { |
| Map.Entry entry = (Map.Entry) itr.next(); |
| conn = entry.getKey(); |
| associatedValue = (Long) entry.getValue(); |
| } else { |
| toContinue = false; |
| continue; |
| } |
| } |
| synchronized (conn) { |
| if (this.activeCache.containsKey(conn)) { |
| // Asif: We need to again get the assocaited value & check |
| // if it is same as before. As it is possibel that by the time |
| // we reach here , the connection was returned to available map |
| // and again put in active map , but this time it will be |
| // at the extreme end !!! . So we cannot use it. |
| // If that is the case we need to skip the entry |
| // Check the timeout |
| Long associatedValueInWindow = (Long) this.activeCache.get(conn); |
| long then = associatedValueInWindow.longValue(); |
| if (associatedValueInWindow.longValue() == associatedValue.longValue()) { |
| if ((now - then) > timeOut) { |
| // Asif :remove the connection from activeMap so |
| // that destroy can be called on it. |
| // We will first collect all the connections |
| // which have timed out & then expire them. |
| // In that gap even if the client genuinely closes |
| // the connection , it will not be returned to the |
| // pool as the active map no longer contains it |
| this.activeCache.remove(conn); |
| this.expiredConns.add(conn); |
| ++numConnTimedOut; |
| } else { |
| // AsifTODO: Just keep a final expitry time |
| sleepTime = then + timeOut - now; |
| toContinue = false; |
| } |
| } |
| } |
| } |
| } |
| if (numConnTimedOut > 0) { |
| // Asif : In case only one connection timed out , take a lock |
| // on availableCache & call notify . If more than one timed out |
| // call notify all . Delete the total connections & number of active |
| // connections by the right amount |
| synchronized (this.availableCache) { |
| this.activeConnections -= numConnTimedOut; |
| this.totalConnections -= numConnTimedOut; |
| if (numConnTimedOut == 1) |
| this.availableCache.notify(); |
| else |
| this.availableCache.notifyAll(); |
| } |
| } |
| // Asif : Create a temp list which copies the connections in |
| // the expired list & releases the lock on expired list |
| // immediately . Then it is safe to clear the expiredConnList |
| List temp; |
| synchronized (this.expiredConns) { |
| temp = new ArrayList(this.expiredConns); |
| this.expiredConns.clear(); |
| } |
| // Asif: destroy the connections contained in the temp list |
| // & clear it |
| int size = temp.size(); |
| for (int i = 0; i < size; ++i) { |
| Object conn = temp.get(i); |
| this.destroyPooledConnection(conn); |
| } |
| temp.clear(); |
| } |
| |
| /** |
| * It will clean all the thread |
| */ |
| @Override |
| public void clearUp() { |
| cleaner.toContinueRunning = false; |
| try { |
| th.interrupt(); |
| } catch (Exception e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("AbstractPoolCache::clearUp: Exception in interrupting the thread", e); |
| } |
| } |
| // closing all the connection |
| try { |
| Iterator availableCacheItr = availableCache.keySet().iterator(); |
| Iterator activeCacheItr = activeCache.keySet().iterator(); |
| while (activeCacheItr.hasNext()) { |
| ((Connection) activeCacheItr.next()).close(); |
| } |
| while (availableCacheItr.hasNext()) { |
| ((Connection) availableCacheItr.next()).close(); |
| } |
| } catch (Exception e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "AbstractPoolCache::clearUp: Exception in closing connections. Ignoring this exception)"); |
| } |
| } |
| /* |
| * try { th.join(); } catch (Exception e) { e.printStackTrace(); } |
| */ |
| } |
| |
| /** |
| * Inner class for Clean up thread. Asif: The inner class implements Runnable |
| */ |
| class ConnectionCleanUpThread implements Runnable { |
| |
| // private AbstractPoolCache poolCache; |
| // private int sleepTime; |
| protected volatile boolean toContinueRunning = true; |
| |
| /* |
| * public ConnectionCleanUpThread(int time) { // poolCache = pool; sleepTime = time; |
| */ |
| @Override |
| public void run() { |
| while (toContinueRunning) { |
| SystemFailure.checkFailure(); |
| try { |
| cleanUp(); |
| if (sleepTime != -1) |
| Thread.sleep(sleepTime); |
| // Asif : The cleaner thread will wait on activeCache if it is |
| // empty . Else it will sleep for a while & again do the |
| // clean up. If the activeMap is empty cleaner thread will |
| // wait till some cleint obtains the connection. At this point |
| // the thread will awaken .It will sleep for stipulated time |
| // & then check on the connections |
| synchronized (activeCache) { |
| if (activeCache.isEmpty() && toContinueRunning) { |
| activeCache.wait(); |
| } |
| } // synchronized |
| } catch (InterruptedException e) { |
| // No need to reset the bit, we'll exit. |
| if (toContinueRunning) { |
| logger.debug("ConnectionCleanupThread: interrupted", e); |
| } |
| break; |
| } catch (CancelException e) { |
| if (toContinueRunning) { |
| logger.debug("ConnectionCleanupThread: cancelled", e); |
| } |
| break; |
| } catch (Exception e) { |
| if (logger.isDebugEnabled() && toContinueRunning) { |
| logger.debug( |
| "ConnectionCleanUpThread::run: Thread encountered Exception. e={}. Ignoring the exception", |
| e.getMessage(), e); |
| } |
| } |
| } // while |
| } // method |
| |
| } // CleanupThread |
| } |