blob: ae7bf5e2f36b7cc320ac7b2710c438134d1d2ea5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal.pooling;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ConnectionImpl;
import org.apache.geode.cache.client.internal.ConnectionStats;
import org.apache.geode.cache.client.internal.Endpoint;
import org.apache.geode.cache.client.internal.Op;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
/**
* A connection managed by the connection manager. Keeps track of the current state of the
* connection.
*
* @since GemFire 5.7
*
*/
public class PooledConnection implements Connection {
/*
* connection is volatile because we may asynchronously destroy the pooled connection while
* shutting down.
*/
private volatile Connection connection;
private volatile Endpoint endpoint;
private volatile long birthDate;
private long lastAccessed; // read & written while synchronized
private boolean active = true; // read and write while synchronized on this
private final AtomicBoolean shouldDestroy = new AtomicBoolean();
private boolean waitingToSwitch = false;
public PooledConnection(ConnectionManagerImpl manager, Connection connection) {
this.connection = connection;
this.endpoint = connection.getEndpoint();
this.birthDate = System.nanoTime();
this.lastAccessed = this.birthDate;
}
@Override
public ServerLocation getServer() {
return getEndpoint().getLocation();
}
public boolean isActive() {
synchronized (this) {
return this.active;
}
}
/**
* @return true if internal connection was destroyed by this call; false if already destroyed
*/
public boolean internalDestroy() {
boolean result = false;
this.shouldDestroy.set(true); // probably already set but make sure
synchronized (this) {
this.active = false;
notifyAll();
Connection myCon = connection;
if (myCon != null) {
myCon.destroy();
connection = null;
result = true;
}
}
return result;
}
/**
* When a pooled connection is destroyed, it's not destroyed right away, but when it is returned
* to the pool.
*/
@Override
public void destroy() {
this.shouldDestroy.set(true);
}
public void internalClose(boolean keepAlive) throws Exception {
try {
Connection con = this.connection;
if (con != null) {
con.close(keepAlive);
}
} finally {
internalDestroy();
}
}
@Override
public void close(boolean keepAlive) throws Exception {
internalClose(keepAlive);
}
@Override
public void emergencyClose() {
Connection con = this.connection;
if (con != null) {
this.connection.emergencyClose();
}
this.connection = null;
}
Connection getConnection() {
Connection result = this.connection;
if (result == null) {
throw new ConnectionDestroyedException();
}
return result;
}
@Override
public Connection getWrappedConnection() {
return getConnection();
}
/**
* Set the destroy bit if it is not already set.
*
* @return true if we were able to set to bit; false if someone else already did
*/
public boolean setShouldDestroy() {
return this.shouldDestroy.compareAndSet(false, true);
}
public boolean shouldDestroy() {
return this.shouldDestroy.get();
}
@Override
public boolean isDestroyed() {
return connection == null;
}
@Override
public void passivate(final boolean accessed) {
long now = 0L;
if (accessed) {
// do this outside the sync
now = System.nanoTime();
}
synchronized (this) {
if (isDestroyed()) {
return;
}
if (!this.active) {
throw new InternalGemFireException("Connection not active");
}
this.active = false;
if (this.waitingToSwitch) {
notifyAll();
}
if (accessed) {
this.lastAccessed = now; // do this while synchronized
}
}
}
public synchronized boolean switchConnection(Connection newCon) throws InterruptedException {
Connection oldCon = null;
synchronized (this) {
if (shouldDestroy())
return false;
if (this.active && !shouldDestroy()) {
this.waitingToSwitch = true;
try {
while (this.active && !shouldDestroy()) {
wait();
}
} finally {
this.waitingToSwitch = false;
notifyAll();
}
}
if (shouldDestroy())
return false;
assert !this.active;
final long now = System.nanoTime();
oldCon = this.connection;
this.connection = newCon;
this.endpoint = newCon.getEndpoint();
this.birthDate = now;
}
if (oldCon != null) {
try {
// do this outside of sync
oldCon.close(false);
} catch (Exception e) {
// ignore
}
}
return true;
}
@Override
public boolean activate() {
synchronized (this) {
try {
while (this.waitingToSwitch) {
wait();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (isDestroyed() || shouldDestroy()) {
return false;
}
if (active) {
throw new InternalGemFireException("Connection already active");
}
active = true;
return true;
}
}
private synchronized long getLastAccessed() {
return lastAccessed;
}
public long getBirthDate() {
return this.birthDate;
}
public void setBirthDate(long ts) {
this.birthDate = ts;
}
/**
* Returns the number of nanos remaining is this connection's life.
*/
public long remainingLife(long now, long timeoutNanos) {
return (getBirthDate() - now) + timeoutNanos;
}
private long remainingIdle(long now, long timeoutNanos) {
return (getLastAccessed() - now) + timeoutNanos;
}
/**
* If we were able to idle timeout this connection then return -1. If this connection has already
* been destroyed return 0. Otherwise return the amount of idle time remaining. If the connection
* is active we can't time it out now and a hint is returned as when we should check again.
*/
public long doIdleTimeout(long now, long timeoutNanos) {
if (shouldDestroy())
return 0;
synchronized (this) {
if (isActive()) {
// this is a reasonable value to return since odds are that
// when the connection goes inactive it will be resetting its access time.
return timeoutNanos;
} else {
long idleRemaining = remainingIdle(now, timeoutNanos);
if (idleRemaining <= 0) {
if (setShouldDestroy()) {
// we were able to set the destroy bit
return -1;
} else {
// someone else already destroyed it
return 0;
}
} else {
return idleRemaining;
}
}
}
}
/**
* Return true if the connection has been idle long enough to expire.
*/
public boolean hasIdleExpired(long now, long timeoutNanos) {
synchronized (this) {
if (isActive()) {
return false;
} else {
return remainingIdle(now, timeoutNanos) <= 0;
}
}
}
@Override
public ByteBuffer getCommBuffer() throws SocketException {
return getConnection().getCommBuffer();
}
@Override
public Socket getSocket() {
return getConnection().getSocket();
}
@Override
public OutputStream getOutputStream() {
return getConnection().getOutputStream();
}
@Override
public InputStream getInputStream() {
return getConnection().getInputStream();
}
@Override
public ConnectionStats getStats() {
return getEndpoint().getStats();
}
@Override
public Endpoint getEndpoint() {
return this.endpoint;
}
@Override
public ServerQueueStatus getQueueStatus() {
return getConnection().getQueueStatus();
}
@Override
public String toString() {
Connection myCon = connection;
if (myCon != null) {
return "Pooled Connection to " + this.endpoint + ": " + myCon.toString();
} else {
return "Pooled Connection to " + this.endpoint + ": Connection[DESTROYED]";
}
}
@Override
public Object execute(Op op) throws Exception {
return getConnection().execute(op);
}
public static void loadEmergencyClasses() {
ConnectionImpl.loadEmergencyClasses();
}
@Override
public short getWanSiteVersion() {
return getConnection().getWanSiteVersion();
}
@Override
public int getDistributedSystemId() {
return getConnection().getDistributedSystemId();
}
@Override
public void setWanSiteVersion(short wanSiteVersion) {
getConnection().setWanSiteVersion(wanSiteVersion);
}
public void setConnection(Connection newConnection) {
this.connection = newConnection;
}
@Override
public void setConnectionID(long id) {
this.connection.setConnectionID(id);
}
@Override
public long getConnectionID() {
return this.connection.getConnectionID();
}
}