| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.cache.client.internal.pooling; |
| |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.nio.ByteBuffer; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import com.gemstone.gemfire.InternalGemFireException; |
| import com.gemstone.gemfire.cache.client.internal.Connection; |
| import com.gemstone.gemfire.cache.client.internal.ConnectionImpl; |
| import com.gemstone.gemfire.cache.client.internal.ConnectionStats; |
| import com.gemstone.gemfire.cache.client.internal.Endpoint; |
| import com.gemstone.gemfire.cache.client.internal.Op; |
| import com.gemstone.gemfire.distributed.internal.ServerLocation; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus; |
| |
| /** |
| * A connection managed by the connection manager. Keeps track |
| * of the current state of the connection. |
| * @author dsmith |
| * @since 5.7 |
| * |
| */ |
| class PooledConnection implements Connection { |
| |
| /* connection is volatile because we may asynchronously |
| * destroy the pooled connection while shutting down. */ |
| private volatile Connection connection; |
| private volatile Endpoint endpoint; |
| private volatile long birthDate; |
| private long lastAccessed; // read & written while synchronized |
| private boolean active = true; // read and write while synchronized on this |
| private final AtomicBoolean shouldDestroy = new AtomicBoolean(); |
| private boolean waitingToSwitch = false; |
| // private final ConnectionManagerImpl manager; |
| |
| public PooledConnection(ConnectionManagerImpl manager, Connection connection) { |
| // this.manager = manager; |
| this.connection = connection; |
| this.endpoint = connection.getEndpoint(); |
| this.birthDate = System.nanoTime(); |
| this.lastAccessed = this.birthDate; |
| } |
| |
| public ServerLocation getServer() { |
| return getEndpoint().getLocation(); |
| } |
| |
| public boolean isActive() { |
| synchronized (this) { |
| return this.active; |
| } |
| } |
| |
| public void internalDestroy() { |
| this.shouldDestroy.set(true); // probably already set but make sure |
| synchronized (this) { |
| this.active = false; |
| notifyAll(); |
| Connection myCon = connection; |
| if (myCon != null) { |
| myCon.destroy(); |
| connection = null; |
| } |
| } |
| } |
| |
| /** When a pooled connection is destroyed, it's not destroyed |
| * right away, but when it is returned to the pool. |
| */ |
| public void destroy() { |
| this.shouldDestroy.set(true); |
| } |
| |
| public void internalClose(boolean keepAlive) throws Exception { |
| try { |
| Connection con = this.connection; |
| if (con != null) { |
| con.close(keepAlive); |
| } |
| } finally { |
| internalDestroy(); |
| } |
| } |
| |
| public void close(boolean keepAlive) throws Exception { |
| // needed to junit test |
| internalClose(keepAlive); |
| // throw new UnsupportedOperationException( |
| // "Pooled connections should only be closed by the connection manager"); |
| } |
| |
| public void emergencyClose() { |
| Connection con = this.connection; |
| if (con != null) { |
| this.connection.emergencyClose(); |
| } |
| this.connection = null; |
| |
| } |
| |
| Connection getConnection() { |
| Connection result = this.connection; |
| if (result == null) { |
| throw new ConnectionDestroyedException(); |
| } |
| return result; |
| } |
| |
| /** |
| * Set the destroy bit if it is not already set. |
| * @return true if we were able to set to bit; false if someone else already did |
| */ |
| public boolean setShouldDestroy() { |
| return this.shouldDestroy.compareAndSet(false, true); |
| } |
| |
| public boolean shouldDestroy() { |
| return this.shouldDestroy.get(); |
| } |
| |
| public boolean isDestroyed() { |
| return connection == null; |
| } |
| |
| public void passivate(final boolean accessed) { |
| long now = 0L; |
| if (accessed) { |
| // do this outside the sync |
| now = System.nanoTime(); |
| } |
| synchronized (this) { |
| if(isDestroyed()) { |
| return; |
| } |
| if(!this.active) { |
| throw new InternalGemFireException("Connection not active"); |
| } |
| this.active = false; |
| notifyAll(); |
| if (accessed) { |
| this.lastAccessed = now; // do this while synchronized |
| } |
| } |
| } |
| |
| |
| public synchronized boolean switchConnection(Connection newCon) |
| throws InterruptedException { |
| Connection oldCon = null; |
| synchronized (this) { |
| if (shouldDestroy()) return false; |
| |
| if (this.active && !shouldDestroy()) { |
| this.waitingToSwitch = true; |
| try { |
| while (this.active && !shouldDestroy()) { |
| wait(); |
| } |
| } finally { |
| this.waitingToSwitch = false; |
| notifyAll(); |
| } |
| } |
| if (shouldDestroy()) return false; |
| assert !this.active; |
| final long now = System.nanoTime(); |
| oldCon = this.connection; |
| this.connection = newCon; |
| this.endpoint = newCon.getEndpoint(); |
| this.birthDate = now; |
| } |
| if (oldCon != null) { |
| try { |
| // do this outside of sync |
| oldCon.close(false); |
| } catch (Exception e) { |
| // ignore |
| } |
| } |
| return true; |
| } |
| |
| public void activate() { |
| synchronized (this) { |
| try { |
| while (this.waitingToSwitch) { |
| wait(); |
| } |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| } |
| getConnection(); // it checks if we are destroyed |
| if(active) { |
| throw new InternalGemFireException("Connection already active"); |
| } |
| if(shouldDestroy()) { |
| throw new ConnectionDestroyedException(); |
| } |
| active = true; |
| } |
| } |
| |
| private synchronized long getLastAccessed() { |
| return lastAccessed; |
| } |
| |
| public long getBirthDate() { |
| return this.birthDate; |
| } |
| |
| public void setBirthDate(long ts) { |
| this.birthDate = ts; |
| } |
| |
| /** |
| * Returns the number of nanos remaining is this guys life. |
| */ |
| public long remainingLife(long now, long timeoutNanos) { |
| return (getBirthDate() - now) + timeoutNanos; |
| } |
| |
| private long remainingIdle(long now, long timeoutNanos) { |
| return (getLastAccessed() - now) + timeoutNanos; |
| } |
| |
| /** |
| * If we were able to idle timeout this connection then return |
| * -1. |
| * If this connection has already been destroyed return 0. |
| * Otherwise return the amount of idle time he has remaining. |
| * If he is active we can't time him out now and a hint is returned |
| * as when we should check him next. |
| |
| * |
| */ |
| public long doIdleTimeout(long now, long timeoutNanos) { |
| if (shouldDestroy()) return 0; |
| synchronized (this) { |
| if (isActive()) { |
| // this is a reasonable value to return since odds are that |
| // when he goes inactive he will be resetting his access time. |
| return timeoutNanos; |
| } else { |
| long idleRemaining = remainingIdle(now, timeoutNanos); |
| if (idleRemaining <= 0) { |
| if (setShouldDestroy()) { |
| // we were able to set the destroy bit |
| return -1; |
| } else { |
| // someone else already destroyed it |
| return 0; |
| } |
| } else { |
| return idleRemaining; |
| } |
| } |
| } |
| } |
| /** |
| * Return true if the connection has been idle long enough to expire. |
| */ |
| public boolean hasIdleExpired(long now, long timeoutNanos) { |
| synchronized (this) { |
| if (isActive()) { |
| return false; |
| } else { |
| return remainingIdle(now, timeoutNanos) <= 0; |
| } |
| } |
| } |
| |
| public ByteBuffer getCommBuffer() throws SocketException { |
| return getConnection().getCommBuffer(); |
| } |
| |
| public Socket getSocket() { |
| return getConnection().getSocket(); |
| } |
| |
| public OutputStream getOutputStream() { |
| return getConnection().getOutputStream(); |
| } |
| |
| public InputStream getInputStream() { |
| return getConnection().getInputStream(); |
| } |
| |
| public ConnectionStats getStats() { |
| return getEndpoint().getStats(); |
| } |
| |
| public Endpoint getEndpoint() { |
| return this.endpoint; |
| } |
| public ServerQueueStatus getQueueStatus() { |
| return getConnection().getQueueStatus(); |
| } |
| |
| @Override |
| public String toString() { |
| Connection myCon = connection; |
| if (myCon != null) { |
| return "Pooled Connection to " + this.endpoint + ": " + myCon.toString(); |
| } else { |
| return "Pooled Connection to " + this.endpoint + ": Connection[DESTROYED]"; |
| } |
| } |
| |
| public Object execute(Op op) throws Exception { |
| return getConnection().execute(op); |
| } |
| |
| public static void loadEmergencyClasses() { |
| ConnectionImpl.loadEmergencyClasses(); |
| } |
| public short getWanSiteVersion(){ |
| return getConnection().getWanSiteVersion(); |
| } |
| |
| public int getDistributedSystemId() { |
| return getConnection().getDistributedSystemId(); |
| } |
| |
| public void setWanSiteVersion(short wanSiteVersion){ |
| getConnection().setWanSiteVersion(wanSiteVersion); |
| } |
| |
| public void setConnectionID(long id) { |
| this.connection.setConnectionID(id); |
| } |
| |
| public long getConnectionID() { |
| return this.connection.getConnectionID(); |
| } |
| } |