blob: 035d46f144a57ece6a0878b5387b1332eae60bd2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Context to track a connection in a {@link ConnectionPool}. When a client uses
* a connection, it increments a counter to mark it as active. Once the client
* is done with the connection, it decreases the counter. It also takes care of
* closing the connection once is not active.
*
* The protocols currently used are:
* <ul>
* <li>{@link org.apache.hadoop.hdfs.protocol.ClientProtocol}
* <li>{@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol}
* </ul>
*/
public class ConnectionContext {
private static final Logger LOG =
LoggerFactory.getLogger(ConnectionContext.class);
/** Client for the connection. */
private final ProxyAndInfo<?> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;
/** Last timestamp the connection was active. */
private long lastActiveTs = 0;
/** The connection's active status would expire after this window. */
private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
/** The maximum number of requests that this connection can handle concurrently. **/
private final int maxConcurrencyPerConn;
public ConnectionContext(ProxyAndInfo<?> connection, Configuration conf) {
this.client = connection;
this.maxConcurrencyPerConn = conf.getInt(
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
}
/**
* Check if the connection is active.
*
* @return True if the connection is active.
*/
public synchronized boolean isActive() {
return this.numThreads > 0;
}
/**
* Check if the connection is/was active recently.
*
* @return True if the connection is active or
* was active in the past period of time.
*/
public synchronized boolean isActiveRecently() {
return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME;
}
/**
* Check if the connection is closed.
*
* @return If the connection is closed.
*/
public synchronized boolean isClosed() {
return this.closed;
}
/**
* Check if the connection can be used. It checks if the connection is used by
* another thread or already closed.
*
* @return True if the connection can be used.
*/
public synchronized boolean isUsable() {
return hasAvailableConcurrency() && !isClosed();
}
/**
* Return true if this connection context still has available concurrency,
* else return false.
*/
private synchronized boolean hasAvailableConcurrency() {
return this.numThreads < maxConcurrencyPerConn;
}
/**
* Check if the connection is idle. It checks if the connection is not used
* by another thread.
* @return True if the connection is not used by another thread.
*/
public synchronized boolean isIdle() {
return !isActive() && !isClosed();
}
/**
* Get the connection client.
*
* @return Connection client.
*/
public synchronized ProxyAndInfo<?> getClient() {
this.numThreads++;
this.lastActiveTs = Time.monotonicNow();
return this.client;
}
/**
* Release this connection.
*/
public synchronized void release() {
if (this.numThreads > 0) {
this.numThreads--;
}
}
/**
* Close a connection. Only idle connections can be closed since
* the RPC proxy would be shut down immediately.
*
* @param force whether the connection should be closed anyway.
*/
public synchronized void close(boolean force) {
if (!force && this.numThreads > 0) {
// this is an erroneous case, but we have to close the connection
// anyway since there will be connection leak if we don't do so
// the connection has been moved out of the pool
LOG.error("Active connection with {} handlers will be closed, ConnectionContext is {}",
this.numThreads, this);
}
this.closed = true;
Object proxy = this.client.getProxy();
// Nobody should be using this anymore, so it should close right away
RPC.stopProxy(proxy);
}
public synchronized void close() {
close(false);
}
@Override
public String toString() {
InetSocketAddress addr = this.client.getAddress();
Object proxy = this.client.getProxy();
Class<?> clazz = proxy.getClass();
StringBuilder sb = new StringBuilder();
sb.append("hashcode:")
.append(hashCode())
.append(" ")
.append(clazz.getSimpleName())
.append("@")
.append(addr)
.append("x")
.append(numThreads);
if (closed) {
sb.append("[CLOSED]");
}
return sb.toString();
}
}