// blob: 3fcb2abb943e9e35190179e8dff830a2c5c9ead4 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.client.internal.pooling;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.ConnectionImpl;
import com.gemstone.gemfire.cache.client.internal.ConnectionStats;
import com.gemstone.gemfire.cache.client.internal.Endpoint;
import com.gemstone.gemfire.cache.client.internal.Op;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
/**
 * A connection managed by the connection manager. Wraps an underlying
 * {@link Connection} and keeps track of its current state: whether it is
 * active, when it was created and last accessed, and whether it has been
 * marked for destruction.
 *
 * <p>After construction, the {@code active}, {@code lastAccessed} and
 * {@code waitingToSwitch} fields are only read and written while
 * synchronized on this instance.
 *
 * @author dsmith
 * @since 5.7
 */
class PooledConnection implements Connection {

  /* connection is volatile because we may asynchronously
   * destroy the pooled connection while shutting down. */
  private volatile Connection connection;
  private volatile Endpoint endpoint;
  private volatile long birthDate;
  private long lastAccessed; // read & written while synchronized
  private boolean active = true; // read and write while synchronized on this
  private final AtomicBoolean shouldDestroy = new AtomicBoolean();
  private boolean waitingToSwitch = false; // read and write while synchronized on this

  /**
   * Creates a pooled connection wrapping the given connection. The new
   * pooled connection starts out active, with its birth date and last
   * access time set to the current {@link System#nanoTime()}.
   *
   * @param manager the owning connection manager (currently unused)
   * @param connection the underlying connection to wrap
   */
  public PooledConnection(ConnectionManagerImpl manager, Connection connection) {
    this.connection = connection;
    this.endpoint = connection.getEndpoint();
    this.birthDate = System.nanoTime();
    this.lastAccessed = this.birthDate;
  }

  /**
   * Returns the location of the endpoint this connection is attached to.
   */
  public ServerLocation getServer() {
    return getEndpoint().getLocation();
  }

  /**
   * Returns true if this connection is currently marked active.
   */
  public boolean isActive() {
    synchronized (this) {
      return this.active;
    }
  }

  /**
   * Destroys the underlying connection immediately: sets the destroy bit,
   * marks this connection inactive, wakes up any waiting threads and
   * destroys and clears the wrapped connection if it is still present.
   */
  public void internalDestroy() {
    this.shouldDestroy.set(true); // probably already set but make sure
    synchronized (this) {
      this.active = false;
      notifyAll();
      Connection myCon = this.connection;
      if (myCon != null) {
        myCon.destroy();
        this.connection = null;
      }
    }
  }

  /**
   * When a pooled connection is destroyed, it's not destroyed
   * right away, but when it is returned to the pool. This method
   * only sets the destroy bit.
   */
  public void destroy() {
    this.shouldDestroy.set(true);
  }

  /**
   * Closes the underlying connection (if any) and then destroys this
   * pooled connection, even if the close fails.
   *
   * @param keepAlive passed through to the underlying connection's close
   * @throws Exception whatever the underlying connection's close throws
   */
  public void internalClose(boolean keepAlive) throws Exception {
    try {
      Connection con = this.connection;
      if (con != null) {
        con.close(keepAlive);
      }
    } finally {
      internalDestroy();
    }
  }

  /**
   * Closes this connection by delegating to {@link #internalClose(boolean)}.
   * Pooled connections should normally only be closed by the connection
   * manager; this method is needed for junit tests.
   *
   * @param keepAlive passed through to the underlying connection's close
   * @throws Exception whatever the underlying connection's close throws
   */
  public void close(boolean keepAlive) throws Exception {
    internalClose(keepAlive);
  }

  /**
   * Performs an emergency close of the underlying connection, if any, and
   * clears the reference to it. Does not synchronize.
   */
  public void emergencyClose() {
    // Read the volatile field exactly once: a concurrent destroy may null
    // the field between the check and the call, so always use the snapshot.
    Connection con = this.connection;
    if (con != null) {
      con.emergencyClose();
    }
    this.connection = null;
  }

  /**
   * Returns the underlying connection.
   *
   * @throws ConnectionDestroyedException if the underlying connection has
   *         already been cleared
   */
  Connection getConnection() {
    Connection result = this.connection;
    if (result == null) {
      throw new ConnectionDestroyedException();
    }
    return result;
  }

  /**
   * Set the destroy bit if it is not already set.
   * @return true if we were able to set the bit; false if someone else already did
   */
  public boolean setShouldDestroy() {
    return this.shouldDestroy.compareAndSet(false, true);
  }

  /**
   * Returns true if the destroy bit has been set.
   */
  public boolean shouldDestroy() {
    return this.shouldDestroy.get();
  }

  /**
   * Returns true if the underlying connection has been cleared.
   */
  public boolean isDestroyed() {
    return this.connection == null;
  }

  /**
   * Marks this connection as no longer active and wakes up any waiting
   * threads. Does nothing if the connection is already destroyed.
   *
   * @param accessed if true, the last accessed time is updated to now
   * @throws InternalGemFireException if the connection is not active
   */
  public void passivate(final boolean accessed) {
    long now = 0L;
    if (accessed) {
      // do this outside the sync
      now = System.nanoTime();
    }
    synchronized (this) {
      if (isDestroyed()) {
        return;
      }
      if (!this.active) {
        throw new InternalGemFireException("Connection not active");
      }
      this.active = false;
      notifyAll();
      if (accessed) {
        this.lastAccessed = now; // do this while synchronized
      }
    }
  }

  /**
   * Replaces the underlying connection with {@code newCon}. If this
   * connection is currently active, waits until it is passivated or marked
   * for destruction. On success the endpoint and birth date are updated to
   * match the new connection and the old connection is closed.
   *
   * @param newCon the connection to switch to
   * @return true if the switch happened; false if this connection has been
   *         marked for destruction
   * @throws InterruptedException if interrupted while waiting for this
   *         connection to become inactive
   */
  public boolean switchConnection(Connection newCon)
      throws InterruptedException {
    Connection oldCon = null;
    // Only this block holds the monitor. The method itself is deliberately
    // not synchronized so that closing the old connection below really
    // happens without the lock held.
    synchronized (this) {
      if (shouldDestroy()) {
        return false;
      }
      if (this.active && !shouldDestroy()) {
        this.waitingToSwitch = true;
        try {
          while (this.active && !shouldDestroy()) {
            wait();
          }
        } finally {
          this.waitingToSwitch = false;
          notifyAll(); // wake up threads blocked in activate()
        }
      }
      if (shouldDestroy()) {
        return false;
      }
      assert !this.active;
      final long now = System.nanoTime();
      oldCon = this.connection;
      this.connection = newCon;
      this.endpoint = newCon.getEndpoint();
      this.birthDate = now;
    }
    if (oldCon != null) {
      try {
        // do this outside of sync
        oldCon.close(false);
      } catch (Exception ignored) {
        // best effort: the old connection is being discarded anyway
      }
    }
    return true;
  }

  /**
   * Marks this connection as active. Waits first while a connection switch
   * is pending; if interrupted during that wait, the interrupt status is
   * restored and activation proceeds.
   *
   * @throws ConnectionDestroyedException if the underlying connection has
   *         been cleared or the destroy bit is set
   * @throws InternalGemFireException if the connection is already active
   */
  public void activate() {
    synchronized (this) {
      try {
        while (this.waitingToSwitch) {
          wait();
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
      getConnection(); // it checks if we are destroyed
      if (this.active) {
        throw new InternalGemFireException("Connection already active");
      }
      if (shouldDestroy()) {
        throw new ConnectionDestroyedException();
      }
      this.active = true;
    }
  }

  /** Returns the last accessed time, read while synchronized. */
  private synchronized long getLastAccessed() {
    return this.lastAccessed;
  }

  /**
   * Returns the {@link System#nanoTime()} based time stamp recorded when
   * the current underlying connection was installed.
   */
  public long getBirthDate() {
    return this.birthDate;
  }

  /**
   * Overrides the recorded birth date.
   *
   * @param ts the new birth date
   */
  public void setBirthDate(long ts) {
    this.birthDate = ts;
  }

  /**
   * Returns the number of nanos remaining in this connection's life.
   *
   * @param now the current time
   * @param timeoutNanos the maximum lifetime in nanos
   * @return birth date plus timeout minus now; zero or negative once the
   *         lifetime has elapsed
   */
  public long remainingLife(long now, long timeoutNanos) {
    return (getBirthDate() - now) + timeoutNanos;
  }

  /**
   * Returns the number of nanos remaining before this connection has been
   * idle for {@code timeoutNanos}, measured from the last accessed time.
   */
  private long remainingIdle(long now, long timeoutNanos) {
    return (getLastAccessed() - now) + timeoutNanos;
  }

  /**
   * If we were able to idle timeout this connection then return
   * -1.
   * If this connection has already been marked for destruction return 0.
   * Otherwise return the amount of idle time it has remaining.
   * If it is active we can't time it out now and a hint is returned
   * as to when we should check it next.
   *
   * @param now the current time
   * @param timeoutNanos the idle timeout in nanos
   * @return -1 if this call set the destroy bit, 0 if the destroy bit was
   *         already set, {@code timeoutNanos} if the connection is active,
   *         otherwise the remaining idle time in nanos
   */
  public long doIdleTimeout(long now, long timeoutNanos) {
    if (shouldDestroy()) {
      return 0;
    }
    synchronized (this) {
      if (isActive()) {
        // this is a reasonable value to return since odds are that
        // when he goes inactive he will be resetting his access time.
        return timeoutNanos;
      }
      long idleRemaining = remainingIdle(now, timeoutNanos);
      if (idleRemaining > 0) {
        return idleRemaining;
      }
      // -1: we were able to set the destroy bit
      //  0: someone else already destroyed it
      return setShouldDestroy() ? -1 : 0;
    }
  }

  /**
   * Return true if the connection has been idle long enough to expire.
   * An active connection never counts as expired.
   */
  public boolean hasIdleExpired(long now, long timeoutNanos) {
    synchronized (this) {
      if (isActive()) {
        return false;
      }
      return remainingIdle(now, timeoutNanos) <= 0;
    }
  }

  /** Delegates to the underlying connection. */
  public ByteBuffer getCommBuffer() throws SocketException {
    return getConnection().getCommBuffer();
  }

  /** Delegates to the underlying connection. */
  public Socket getSocket() {
    return getConnection().getSocket();
  }

  /** Delegates to the underlying connection. */
  public OutputStream getOutputStream() {
    return getConnection().getOutputStream();
  }

  /** Delegates to the underlying connection. */
  public InputStream getInputStream() {
    return getConnection().getInputStream();
  }

  /** Returns the statistics of the current endpoint. */
  public ConnectionStats getStats() {
    return getEndpoint().getStats();
  }

  /** Returns the endpoint of the current underlying connection. */
  public Endpoint getEndpoint() {
    return this.endpoint;
  }

  /** Delegates to the underlying connection. */
  public ServerQueueStatus getQueueStatus() {
    return getConnection().getQueueStatus();
  }

  @Override
  public String toString() {
    Connection myCon = this.connection;
    if (myCon != null) {
      return "Pooled Connection to " + this.endpoint + ": " + myCon.toString();
    } else {
      return "Pooled Connection to " + this.endpoint + ": Connection[DESTROYED]";
    }
  }

  /**
   * Executes the given operation on the underlying connection.
   *
   * @throws ConnectionDestroyedException if the underlying connection has
   *         been cleared
   */
  public Object execute(Op op) throws Exception {
    return getConnection().execute(op);
  }

  /** Delegates to {@link ConnectionImpl#loadEmergencyClasses()}. */
  public static void loadEmergencyClasses() {
    ConnectionImpl.loadEmergencyClasses();
  }

  /** Delegates to the underlying connection. */
  public short getWanSiteVersion() {
    return getConnection().getWanSiteVersion();
  }

  /** Delegates to the underlying connection. */
  public int getDistributedSystemId() {
    return getConnection().getDistributedSystemId();
  }

  /** Delegates to the underlying connection. */
  public void setWanSiteVersion(short wanSiteVersion) {
    getConnection().setWanSiteVersion(wanSiteVersion);
  }

  /**
   * Delegates to the underlying connection.
   *
   * @throws ConnectionDestroyedException if the underlying connection has
   *         been cleared
   */
  public void setConnectionID(long id) {
    // go through getConnection() so a destroyed connection is reported the
    // same way as in every other delegating method
    getConnection().setConnectionID(id);
  }

  /**
   * Delegates to the underlying connection.
   *
   * @throws ConnectionDestroyedException if the underlying connection has
   *         been cleared
   */
  public long getConnectionID() {
    return getConnection().getConnectionID();
  }
}