| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation.router; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.net.SocketFactory; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.apache.hadoop.ipc.AlignmentContext; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.io.retry.RetryUtils; |
| import org.apache.hadoop.ipc.ProtobufRpcEngine2; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.RefreshUserMappingsProtocol; |
| import org.apache.hadoop.security.SaslRpcServer; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; |
| import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; |
| import org.apache.hadoop.tools.GetUserMappingsProtocol; |
| import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; |
| import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; |
| import org.apache.hadoop.util.Time; |
| import org.eclipse.jetty.util.ajax.JSON; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Maintains a pool of connections for each User (including tokens) + NN. The |
| * RPC client maintains a single socket, to achieve throughput similar to a NN, |
| * each request is multiplexed across multiple sockets/connections from a |
| * pool. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class ConnectionPool { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(ConnectionPool.class); |
| |
| /** Configuration settings for the connection pool. */ |
| private final Configuration conf; |
| |
| /** Identifier for this connection pool. */ |
| private final ConnectionPoolId connectionPoolId; |
| /** Namenode this pool connects to. */ |
| private final String namenodeAddress; |
| /** User for this connections. */ |
| private final UserGroupInformation ugi; |
| /** Class of the protocol. */ |
| private final Class<?> protocol; |
| |
| /** Pool of connections. We mimic a COW array. */ |
| private volatile List<ConnectionContext> connections = new ArrayList<>(); |
| /** Connection index for round-robin. */ |
| private final AtomicInteger clientIndex = new AtomicInteger(0); |
| /** Underlying socket index. **/ |
| private final AtomicInteger socketIndex = new AtomicInteger(0); |
| |
| /** Min number of connections per user. */ |
| private final int minSize; |
| /** Max number of connections per user. */ |
| private final int maxSize; |
| /** Min ratio of active connections per user. */ |
| private final float minActiveRatio; |
| |
| /** The last time a connection was active. */ |
| private volatile long lastActiveTime = 0; |
| |
| /** Enable using multiple physical socket or not. **/ |
| private final boolean enableMultiSocket; |
| /** StateID alignment context. */ |
| private final PoolAlignmentContext alignmentContext; |
| |
| /** Map for the protocols and their protobuf implementations. */ |
| private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>(); |
| static { |
| PROTO_MAP.put(ClientProtocol.class, |
| new ProtoImpl(ClientNamenodeProtocolPB.class, |
| ClientNamenodeProtocolTranslatorPB.class)); |
| PROTO_MAP.put(NamenodeProtocol.class, new ProtoImpl( |
| NamenodeProtocolPB.class, NamenodeProtocolTranslatorPB.class)); |
| PROTO_MAP.put(RefreshUserMappingsProtocol.class, |
| new ProtoImpl(RefreshUserMappingsProtocolPB.class, |
| RefreshUserMappingsProtocolClientSideTranslatorPB.class)); |
| PROTO_MAP.put(GetUserMappingsProtocol.class, |
| new ProtoImpl(GetUserMappingsProtocolPB.class, |
| GetUserMappingsProtocolClientSideTranslatorPB.class)); |
| } |
| |
| /** Class to store the protocol implementation. */ |
| private static class ProtoImpl { |
| private final Class<?> protoPb; |
| private final Class<?> protoClientPb; |
| |
| ProtoImpl(Class<?> pPb, Class<?> pClientPb) { |
| this.protoPb = pPb; |
| this.protoClientPb = pClientPb; |
| } |
| } |
| |
| protected ConnectionPool(Configuration config, String address, |
| UserGroupInformation user, int minPoolSize, int maxPoolSize, |
| float minActiveRatio, Class<?> proto, PoolAlignmentContext alignmentContext) |
| throws IOException { |
| |
| this.conf = config; |
| |
| // Connection pool target |
| this.ugi = user; |
| this.namenodeAddress = address; |
| this.protocol = proto; |
| this.connectionPoolId = |
| new ConnectionPoolId(this.ugi, this.namenodeAddress, this.protocol); |
| |
| // Set configuration parameters for the pool |
| this.minSize = minPoolSize; |
| this.maxSize = maxPoolSize; |
| this.minActiveRatio = minActiveRatio; |
| this.enableMultiSocket = conf.getBoolean( |
| RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY, |
| RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT); |
| |
| this.alignmentContext = alignmentContext; |
| |
| // Add minimum connections to the pool |
| for (int i = 0; i < this.minSize; i++) { |
| ConnectionContext newConnection = newConnection(); |
| this.connections.add(newConnection); |
| } |
| LOG.debug("Created connection pool \"{}\" with {} connections", |
| this.connectionPoolId, this.minSize); |
| } |
| |
| /** |
| * Get the maximum number of connections allowed in this pool. |
| * |
| * @return Maximum number of connections. |
| */ |
| protected int getMaxSize() { |
| return this.maxSize; |
| } |
| |
| /** |
| * Get the minimum number of connections in this pool. |
| * |
| * @return Minimum number of connections. |
| */ |
| protected int getMinSize() { |
| return this.minSize; |
| } |
| |
| /** |
| * Get the minimum ratio of active connections in this pool. |
| * |
| * @return Minimum ratio of active connections. |
| */ |
| protected float getMinActiveRatio() { |
| return this.minActiveRatio; |
| } |
| |
| /** |
| * Get the connection pool identifier. |
| * |
| * @return Connection pool identifier. |
| */ |
| protected ConnectionPoolId getConnectionPoolId() { |
| return this.connectionPoolId; |
| } |
| |
| /** |
| * Get the clientIndex used to calculate index for lookup. |
| * @return Client index. |
| */ |
| @VisibleForTesting |
| public AtomicInteger getClientIndex() { |
| return this.clientIndex; |
| } |
| |
| /** |
| * Get the alignment context for this pool. |
| * @return Alignment context |
| */ |
| public PoolAlignmentContext getPoolAlignmentContext() { |
| return this.alignmentContext; |
| } |
| |
| /** |
| * Return the next connection round-robin. |
| * |
| * @return Connection context. |
| */ |
| protected ConnectionContext getConnection() { |
| this.lastActiveTime = Time.now(); |
| List<ConnectionContext> tmpConnections = this.connections; |
| for (ConnectionContext tmpConnection : tmpConnections) { |
| if (tmpConnection != null && tmpConnection.isUsable()) { |
| return tmpConnection; |
| } |
| } |
| |
| ConnectionContext conn = null; |
| // We return a connection even if it's busy |
| int size = tmpConnections.size(); |
| if (size > 0) { |
| // Get a connection from the pool following round-robin |
| // Inc and mask off sign bit, lookup index should be non-negative int |
| int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF; |
| conn = tmpConnections.get(threadIndex % size); |
| } |
| return conn; |
| } |
| |
| /** |
| * Add a connection to the current pool. It uses a Copy-On-Write approach. |
| * |
| * @param conn New connection to add to the pool. |
| */ |
| public synchronized void addConnection(ConnectionContext conn) { |
| List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections); |
| tmpConnections.add(conn); |
| this.connections = tmpConnections; |
| |
| this.lastActiveTime = Time.now(); |
| } |
| |
| /** |
| * Remove connections from the current pool. |
| * |
| * @param num Number of connections to remove. |
| * @return Removed connections. |
| */ |
| public synchronized List<ConnectionContext> removeConnections(int num) { |
| List<ConnectionContext> removed = new LinkedList<>(); |
| if (this.connections.size() > this.minSize) { |
| int targetCount = Math.min(num, this.connections.size() - this.minSize); |
| // Remove and close targetCount of connections |
| List<ConnectionContext> tmpConnections = new ArrayList<>(); |
| for (ConnectionContext conn : this.connections) { |
| // Only pick idle connections to close |
| if (removed.size() < targetCount && conn.isIdle()) { |
| removed.add(conn); |
| } else { |
| tmpConnections.add(conn); |
| } |
| } |
| this.connections = tmpConnections; |
| } |
| LOG.debug("Expected to remove {} connection and actually removed {} connections " |
| + "for connectionPool: {}", num, removed.size(), connectionPoolId); |
| return removed; |
| } |
| |
| /** |
| * Close the connection pool. |
| */ |
| protected synchronized void close() { |
| long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds( |
| Time.now() - getLastActiveTime()); |
| LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago", |
| this.connectionPoolId, timeSinceLastActive); |
| |
| for (ConnectionContext connection : this.connections) { |
| connection.close(true); |
| } |
| this.connections.clear(); |
| } |
| |
| /** |
| * Number of connections in the pool. |
| * |
| * @return Number of connections. |
| */ |
| protected int getNumConnections() { |
| return this.connections.size(); |
| } |
| |
| /** |
| * Number of active connections in the pool. |
| * |
| * @return Number of active connections. |
| */ |
| protected int getNumActiveConnections() { |
| int ret = 0; |
| List<ConnectionContext> tmpConnections = this.connections; |
| for (ConnectionContext conn : tmpConnections) { |
| if (conn.isActive()) { |
| ret++; |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Number of usable i.e. no active thread connections. |
| * |
| * @return Number of idle connections |
| */ |
| protected int getNumIdleConnections() { |
| int ret = 0; |
| List<ConnectionContext> tmpConnections = this.connections; |
| for (ConnectionContext conn : tmpConnections) { |
| if (conn.isIdle()) { |
| ret++; |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Number of active connections recently in the pool. |
| * |
| * @return Number of active connections recently. |
| */ |
| protected int getNumActiveConnectionsRecently() { |
| int ret = 0; |
| List<ConnectionContext> tmpConnections = this.connections; |
| for (ConnectionContext conn : tmpConnections) { |
| if (conn.isActiveRecently()) { |
| ret++; |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Get the last time the connection pool was used. |
| * |
| * @return Last time the connection pool was used. |
| */ |
| protected long getLastActiveTime() { |
| return this.lastActiveTime; |
| } |
| |
| @Override |
| public String toString() { |
| return this.connectionPoolId.toString(); |
| } |
| |
| /** |
| * JSON representation of the connection pool. |
| * |
| * @return String representation of the JSON. |
| */ |
| public String getJSON() { |
| final Map<String, String> info = new LinkedHashMap<>(); |
| info.put("active", Integer.toString(getNumActiveConnections())); |
| info.put("recent_active", |
| Integer.toString(getNumActiveConnectionsRecently())); |
| info.put("idle", Integer.toString(getNumIdleConnections())); |
| info.put("total", Integer.toString(getNumConnections())); |
| if (LOG.isDebugEnabled()) { |
| List<ConnectionContext> tmpConnections = this.connections; |
| for (int i=0; i<tmpConnections.size(); i++) { |
| ConnectionContext connection = tmpConnections.get(i); |
| info.put(i + " active", Boolean.toString(connection.isActive())); |
| info.put(i + " recent_active", |
| Integer.toString(getNumActiveConnectionsRecently())); |
| info.put(i + " idle", Boolean.toString(connection.isUsable())); |
| info.put(i + " closed", Boolean.toString(connection.isClosed())); |
| } |
| } |
| return JSON.toString(info); |
| } |
| |
| /** |
| * Create a new proxy wrapper for a client NN connection. |
| * @return Proxy for the target ClientProtocol that contains the user's |
| * security context. |
| * @throws IOException If it cannot get a new connection. |
| */ |
| public ConnectionContext newConnection() throws IOException { |
| return newConnection(this.conf, this.namenodeAddress, |
| this.ugi, this.protocol, this.enableMultiSocket, |
| this.socketIndex.incrementAndGet(), alignmentContext); |
| } |
| |
| /** |
| * Creates a proxy wrapper for a client NN connection. Each proxy contains |
| * context for a single user/security context. To maximize throughput it is |
| * recommended to use multiple connection per user+server, allowing multiple |
| * writes and reads to be dispatched in parallel. |
| * |
| * @param conf Configuration for the connection. |
| * @param nnAddress Address of server supporting the ClientProtocol. |
| * @param ugi User context. |
| * @param proto Interface of the protocol. |
| * @param enableMultiSocket Enable multiple socket or not. |
| * @param socketIndex Index for FederationConnectionId. |
| * @param alignmentContext Client alignment context. |
| * @param <T> Input type T. |
| * @return proto for the target ClientProtocol that contains the user's |
| * security context. |
| * @throws IOException If it cannot be created. |
| */ |
| protected static <T> ConnectionContext newConnection(Configuration conf, |
| String nnAddress, UserGroupInformation ugi, Class<T> proto, |
| boolean enableMultiSocket, int socketIndex, |
| AlignmentContext alignmentContext) throws IOException { |
| if (!PROTO_MAP.containsKey(proto)) { |
| String msg = "Unsupported protocol for connection to NameNode: " |
| + ((proto != null) ? proto.getName() : "null"); |
| LOG.error(msg); |
| throw new IllegalStateException(msg); |
| } |
| ProtoImpl classes = PROTO_MAP.get(proto); |
| RPC.setProtocolEngine(conf, classes.protoPb, ProtobufRpcEngine2.class); |
| |
| final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, |
| HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, |
| HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, |
| HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, |
| HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, |
| HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); |
| |
| SocketFactory factory = SocketFactory.getDefault(); |
| if (UserGroupInformation.isSecurityEnabled()) { |
| SaslRpcServer.init(conf); |
| } |
| InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); |
| final long version = RPC.getProtocolVersion(classes.protoPb); |
| Object proxy; |
| if (enableMultiSocket) { |
| FederationConnectionId connectionId = new FederationConnectionId( |
| socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf), |
| defaultPolicy, conf, socketIndex); |
| proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId, |
| conf, factory, alignmentContext).getProxy(); |
| } else { |
| proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, |
| conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null, |
| alignmentContext).getProxy(); |
| } |
| |
| T client = newProtoClient(proto, classes, proxy); |
| Text dtService = SecurityUtil.buildTokenService(socket); |
| |
| ProxyAndInfo<T> clientProxy = new ProxyAndInfo<T>(client, dtService, socket); |
| return new ConnectionContext(clientProxy, conf); |
| } |
| |
| private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes, |
| Object proxy) { |
| try { |
| Constructor<?> constructor = |
| classes.protoClientPb.getConstructor(classes.protoPb); |
| Object o = constructor.newInstance(proxy); |
| if (proto.isAssignableFrom(o.getClass())) { |
| @SuppressWarnings("unchecked") |
| T client = (T) o; |
| return client; |
| } |
| } catch (Exception e) { |
| LOG.error(e.getMessage()); |
| } |
| return null; |
| } |
| } |