blob: 89e8a99e65413a0c0d7f0b42a30adce6bf0c6383 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.datasource;
import java.io.Serializable;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EventListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
/**
* AbstractPoolCache implements the ConnectionPoolCache interface. This is base class for the all
* connection pools. The class also implements the Serializable interface. The pool maintain a list
* for keeping the available connections(not assigned to user) and the active connections(assigned
* to user) This is a thread safe class.
*/
public abstract class AbstractPoolCache implements ConnectionPoolCache, Serializable {
private static final Logger logger = LogService.getLogger();
protected int INIT_LIMIT;
private int MAX_LIMIT;
protected transient Map availableCache;
protected transient Map activeCache;
protected EventListener connEventListner;
// private String error = "";
protected ConfiguredDataSourceProperties configProps;
// expirationTime is for the available connection which are expired in milliseconds
protected int expirationTime;
// timeOut is for the Active connection which are time out in milliseconds
protected int timeOut;
// Client Timeout in milliseconds
protected int loginTimeOut;
// private final boolean DEBUG = false;
public transient ConnectionCleanUpThread cleaner;
private int totalConnections = 0;
private int activeConnections = 0;
private List expiredConns = null;
protected long sleepTime = -1;
private Thread th = null;// cleaner thread
/**
* Constructor initializes the AbstractPoolCache properties.
*
* @param eventListner The event listner for the database connections.
* @param configs The ConfiguredDataSourceProperties object containing the configuration for the
* pool.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SC_START_IN_CTOR",
justification = "the thread started is a cleanup thread and is not active until there is a timeout tx")
public AbstractPoolCache(EventListener eventListner, ConfiguredDataSourceProperties configs)
throws PoolException {
availableCache = new HashMap();
activeCache = Collections.synchronizedMap(new LinkedHashMap());
connEventListner = eventListner;
expiredConns = Collections.synchronizedList(new ArrayList());
MAX_LIMIT = configs.getMaxPoolSize();
expirationTime = configs.getConnectionExpirationTime() * 1000;
timeOut = configs.getConnectionTimeOut() * 1000;
loginTimeOut = configs.getLoginTimeOut() * 1000;
INIT_LIMIT = Math.min(configs.getInitialPoolSize(), MAX_LIMIT);
configProps = configs;
cleaner = this.new ConnectionCleanUpThread();
th = new LoggingThread("ConnectionCleanUpThread", cleaner);
th.start();
}
protected void initializePool() {
if (INIT_LIMIT > 0) {
long currTime = System.currentTimeMillis();
for (int count = 0; count < INIT_LIMIT; count++) {
try {
availableCache.put(getNewPoolConnection(), Long.valueOf(currTime));
++totalConnections;
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("AbstractPoolCache::initializePool:Error in creating connection",
ex.getCause());
}
}
}
}
}
public abstract Object getNewPoolConnection() throws PoolException;
/**
* Returns the connection to the available pool Asif: When the connection is returned to the pool,
* notify any one thread waiting on availableCache so that it can go into connection seeking
* state.
*
* @param connectionObject PooledConnection object for return.
*
*/
@Override
public void returnPooledConnectionToPool(Object connectionObject) {
boolean returnedHappened = false;
if (connectionObject != null) {
// Asif: Take a lock on activeCache while removing the Connection
// from the map. It is possible that while the connection is
// being returned the cleaner thread migh have also removed
// it from the activeMap. If that is the case we don't do
// anything bcoz the cleaner thread will take care of decrementing
// the total & available connection. We wil take a lock on the
// individual connection rather than complete activeMap
// bcoz there is no point in blocking other threads from
// returning their own connections
synchronized (connectionObject) {
if (this.activeCache.containsKey(connectionObject)) {
// Asif: Remove the connection from activeCache
// Don't add it to availabel pool now. Add it when we have
// taken a lock on availableCache & then we will modify the
// count.
this.activeCache.remove(connectionObject);
returnedHappened = true;
}
}
}
if (returnedHappened) {
// Asif: take the lock on availableCache & decrement the
// count of active connections. Call notify on availableConnections
// so that any waiting client thread will go into connection
// seeking state
synchronized (this.availableCache) {
--this.activeConnections;
this.availableCache.put(connectionObject, Long.valueOf(System.currentTimeMillis()));
this.availableCache.notify();
}
}
}
/**
* Expires connection in the available pool.
*
*/
abstract void destroyPooledConnection(Object connectionObject);
/**
* Returns number of connections currently in use.
*
* @return int Number of active connections
*/
public int getActiveCacheSize() {
return this.activeConnections;
}
/**
* Returns number of connections available for use
*
* @return Size of available cache.
*/
public int getAvailableCacheSize() {
return (this.totalConnections - this.activeConnections);
}
/**
* @see org.apache.geode.internal.datasource.ConnectionPoolCache#expirePooledConnection(Object)
* @param connectionObject Asif: This function will set the timestamp associated with the
* Connection object such that it will get timed out by the cleaner thread. Normally when
* this function is called, the connection object will be present in activeCache as there
* is no way client can return the object. But it is possible that the cleaner thread may
* have already picked it up ,so we still have to check whether it is contained in the map
* or not
*
*/
@Override
public void expirePooledConnection(Object connectionObject) {
synchronized (connectionObject) {
if (this.activeCache.containsKey(connectionObject)) {
// Change the time stamp associated with the object
long prev = ((Long) this.activeCache.get(connectionObject)).longValue();
prev = prev - this.timeOut - 1000;
this.activeCache.put(connectionObject, Long.valueOf(prev));
}
}
}
/**
* Gets the connection from the pool. The specified user and password are used. If the available
* pool is not empty a connection is fetched from the available pool. If the available pool is
* empty and the total connections in the pool(available + active) are less then the Max limit, a
* new connection is created and returned to the client. If the Max limit is reached the client
* waits for the connection to be available.
*
* Asif: The client thread checks if the total number of active connections have exhausted the
* limit. If yes it waits on the availableCache. When another thread returns the connection to the
* pool, the waiting thread gets notified. If a thread experiences timeout while waiting , a
* SQLException will be thrown. Other case is that there are available connections in map. Now
* while getting connection from avaialbel map , we may or may not obtain connetion bcoz the
* connection in available map may have expired. But if the checkOutConnection() returns null ,
* this iteslf guarantees that atleast one connection expired so the current thread can safely
* demand one new connection.
*
* @return Object connection object from the pool.
*/
@Override
public Object getPooledConnectionFromPool() throws PoolException {
Object poolConn = null;
// checkCredentials(username, password);
/*
* if (availableCache == null) { synchronized (this) { if (availableCache == null) {
* initializePool(); } } }
*/
long now = System.currentTimeMillis();
synchronized (this.availableCache) {
while ((totalConnections - activeConnections) == 0 && totalConnections == MAX_LIMIT) {
try {
this.availableCache.wait(loginTimeOut);
long newtime = System.currentTimeMillis();
long duration = newtime - now;
if (duration > loginTimeOut)
throw new PoolException(
"AbstractPooledCache::getPooledConnectionFromPool:Login time-out exceeded");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// TODO add a cancellation check?
if (logger.isDebugEnabled()) {
logger.debug(
"AbstractPooledCache::getPooledConnectionFromPool:InterruptedException in waiting thread");
}
throw new PoolException(
"AbstractPooledCache::getPooledConnectionFromPool:InterruptedException in waiting thread");
}
}
if ((totalConnections - activeConnections) > 0) {
poolConn = checkOutConnection(now);
if (poolConn != null) {
activeCache.put(poolConn, Long.valueOf(now));
++activeConnections;
}
}
if (poolConn == null) {
poolConn = getNewPoolConnection();
activeCache.put(poolConn, Long.valueOf(now));
++totalConnections;
++activeConnections;
}
}
// Asif: Notify the cleaner thread if it is waiting
// on the active Cache as there is an entry now
synchronized (activeCache) {
activeCache.notify();
}
return poolConn;
}
/**
* Returns the max pool limit.
*
*/
public int getMaxLimit() {
return MAX_LIMIT;
}
/**
* Remove a connection from the pool. Modified by Asif : This function is called from a synch
* block where the lock is taken on this.availableCache
*
* @param now Current time in milliseconds
* @return Object Connection object from the pool.
*/
private Object checkOutConnection(long now) throws PoolException {
// boolean expiryCheck = false;
Object retConn = null;
Set entryset = availableCache.entrySet();
Iterator itr = entryset.iterator();
Map.Entry entry = null;
while (itr.hasNext()) {
entry = (Map.Entry) itr.next();
long time = ((Long) entry.getValue()).longValue();
if ((now - time) <= expirationTime) {
retConn = entry.getKey();
itr.remove();
break;
} else {
// Asif : Take a lock on expiredConns, so that clean up thread
// does not miss it while emptying.
synchronized (this.expiredConns) {
this.expiredConns.add(entry.getKey());
}
itr.remove();
// Asif :Reduce the total number of available connections
// We will not notify the cleaner thread. It will clean
// the list next time when the thread awakes. The cleaner thread
// will be either waiting on the activeCache or sleeping.
// It will get notified whenever there is an elemnt added
// is added in the activeCache. If there is atleast one
// object in the activeMap it will sleep rather than wait
--totalConnections;
}
}
return retConn;
}
// Code for Clean up thread
/**
* Asif: The Cleaner thread calls this function periodically to clear the expired connection list
* & also to add those connections to expiry list , which have timed out while being active. The
* thread first iterates over the map of active connections. It does by getting array of keys
* contained in the map so that there is no backing of Map. It collects all the actiev connections
* timeout & if there is atleast one such connection , it will issue a notify or notify all ( if
* more than one connections have timed out). After that it will just clear the list of expired
* connections *
*/
protected void cleanUp() {
// Asif Get an array of keys in the activeConnection map
// Since we are using LinkedHashMap the keys are guaranteed to be in order
// of oldest conn as the first element
// PooledConnection[] activeConnArr = (PooledConnection[])
// this.activeCache.keySet().toArray(new PooledConnection[0]);
int numConnTimedOut = 0;
long now = System.currentTimeMillis();
sleepTime = -1;
boolean toContinue = true;
// int len = activeConnArr.length;
// for (int i = 0; i < len; ++i) {
while (toContinue) {
Object conn = null;
Long associatedValue = null;
// Get the connection which is the oldest
synchronized (this.activeCache) {
Set set = this.activeCache.entrySet();
Iterator itr = set.iterator();
if (itr.hasNext()) {
Map.Entry entry = (Map.Entry) itr.next();
conn = entry.getKey();
associatedValue = (Long) entry.getValue();
} else {
toContinue = false;
continue;
}
}
synchronized (conn) {
if (this.activeCache.containsKey(conn)) {
// Asif: We need to again get the assocaited value & check
// if it is same as before. As it is possibel that by the time
// we reach here , the connection was returned to available map
// and again put in active map , but this time it will be
// at the extreme end !!! . So we cannot use it.
// If that is the case we need to skip the entry
// Check the timeout
Long associatedValueInWindow = (Long) this.activeCache.get(conn);
long then = associatedValueInWindow.longValue();
if (associatedValueInWindow.longValue() == associatedValue.longValue()) {
if ((now - then) > timeOut) {
// Asif :remove the connection from activeMap so
// that destroy can be called on it.
// We will first collect all the connections
// which have timed out & then expire them.
// In that gap even if the client genuinely closes
// the connection , it will not be returned to the
// pool as the active map no longer contains it
this.activeCache.remove(conn);
this.expiredConns.add(conn);
++numConnTimedOut;
} else {
// AsifTODO: Just keep a final expitry time
sleepTime = then + timeOut - now;
toContinue = false;
}
}
}
}
}
if (numConnTimedOut > 0) {
// Asif : In case only one connection timed out , take a lock
// on availableCache & call notify . If more than one timed out
// call notify all . Delete the total connections & number of active
// connections by the right amount
synchronized (this.availableCache) {
this.activeConnections -= numConnTimedOut;
this.totalConnections -= numConnTimedOut;
if (numConnTimedOut == 1)
this.availableCache.notify();
else
this.availableCache.notifyAll();
}
}
// Asif : Create a temp list which copies the connections in
// the expired list & releases the lock on expired list
// immediately . Then it is safe to clear the expiredConnList
List temp = new ArrayList();
synchronized (this.expiredConns) {
temp.addAll(this.expiredConns);
this.expiredConns.clear();
}
// Asif: destroy the connections contained in the temp list
// & clear it
int size = temp.size();
for (int i = 0; i < size; ++i) {
Object conn = temp.get(i);
this.destroyPooledConnection(conn);
}
temp.clear();
}
/**
* It will clean all the thread
*/
@Override
public void clearUp() {
cleaner.toContinueRunning = false;
try {
th.interrupt();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("AbstractPoolCache::clearUp: Exception in interrupting the thread", e);
}
}
// closing all the connection
try {
Iterator availableCacheItr = availableCache.keySet().iterator();
Iterator activeCacheItr = activeCache.keySet().iterator();
while (activeCacheItr.hasNext()) {
((Connection) activeCacheItr.next()).close();
}
while (availableCacheItr.hasNext()) {
((Connection) availableCacheItr.next()).close();
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(
"AbstractPoolCache::clearUp: Exception in closing connections. Ignoring this exception)");
}
}
/*
* try { th.join(); } catch (Exception e) { e.printStackTrace(); }
*/
}
/**
* Inner class for Clean up thread. Asif: The inner class implements Runnable
*/
class ConnectionCleanUpThread implements Runnable {
// private AbstractPoolCache poolCache;
// private int sleepTime;
protected volatile boolean toContinueRunning = true;
/*
* public ConnectionCleanUpThread(int time) { // poolCache = pool; sleepTime = time;
*/
@Override
public void run() {
while (toContinueRunning) {
SystemFailure.checkFailure();
try {
cleanUp();
if (sleepTime != -1)
Thread.sleep(sleepTime);
// Asif : The cleaner thread will wait on activeCache if it is
// empty . Else it will sleep for a while & again do the
// clean up. If the activeMap is empty cleaner thread will
// wait till some cleint obtains the connection. At this point
// the thread will awaken .It will sleep for stipulated time
// & then check on the connections
synchronized (activeCache) {
if (activeCache.isEmpty() && toContinueRunning) {
activeCache.wait();
}
} // synchronized
} catch (InterruptedException e) {
// No need to reset the bit, we'll exit.
if (toContinueRunning) {
logger.debug("ConnectionCleanupThread: interrupted", e);
}
break;
} catch (CancelException e) {
if (toContinueRunning) {
logger.debug("ConnectionCleanupThread: cancelled", e);
}
break;
} catch (Exception e) {
if (logger.isDebugEnabled() && toContinueRunning) {
logger.debug(
"ConnectionCleanUpThread::run: Thread encountered Exception. e={}. Ignoring the exception",
e.getMessage(), e);
}
}
} // while
} // method
} // CleanupThread
}