| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.ipc; |
| |
| import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; |
| import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; |
| |
| import io.opentelemetry.api.trace.Span; |
| import io.opentelemetry.api.trace.StatusCode; |
| import io.opentelemetry.context.Scope; |
| import java.io.IOException; |
| import java.net.SocketAddress; |
| import java.util.Collection; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.client.MetricsConnection; |
| import org.apache.hadoop.hbase.codec.Codec; |
| import org.apache.hadoop.hbase.codec.KeyValueCodec; |
| import org.apache.hadoop.hbase.net.Address; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.security.UserProvider; |
| import org.apache.hadoop.hbase.trace.TraceUtil; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.PoolMap; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; |
| import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; |
| import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; |
| import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; |
| import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; |
| import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; |
| import org.apache.hbase.thirdparty.com.google.protobuf.Message; |
| import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; |
| import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; |
| import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; |
| import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; |
| import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; |
| |
| /** |
| * Provides the basics for a RpcClient implementation like configuration and Logging. |
| * <p> |
| * Locking schema of the current IPC implementation |
| * <ul> |
| * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating |
| * connection.</li> |
| * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li> |
| * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of |
| * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)} |
| * of how to deal with cancel.</li> |
| * <li>For connection implementation, the construction of a connection should be as fast as possible |
| * because the creation is protected under a lock. Connect to remote side when needed. There is no |
| * forced locking schema for a connection implementation.</li> |
| * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held |
| * at last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be execute |
| * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations |
| * of the callbacks are free to hold any lock.</li> |
| * </ul> |
| * @since 2.0.0 |
| */ |
| @InterfaceAudience.Private |
| public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient { |
| // Log level is being changed in tests |
| public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class); |
| |
| protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer( |
| new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true) |
| .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), |
| 10, TimeUnit.MILLISECONDS); |
| |
| private static final ScheduledExecutorService IDLE_CONN_SWEEPER = |
| Executors.newScheduledThreadPool(1, |
| new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true) |
| .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); |
| |
| private boolean running = true; // if client runs |
| |
| protected final Configuration conf; |
| protected final String clusterId; |
| protected final SocketAddress localAddr; |
| protected final MetricsConnection metrics; |
| |
| protected final UserProvider userProvider; |
| protected final CellBlockBuilder cellBlockBuilder; |
| |
| protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this |
| // time (in ms), it will be closed at any moment. |
| protected final int maxRetries; // the max. no. of retries for socket connections |
| protected final long failureSleep; // Time to sleep before retry on failure. |
| protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm |
| protected final boolean tcpKeepAlive; // if T then use keepalives |
| protected final Codec codec; |
| protected final CompressionCodec compressor; |
| protected final boolean fallbackAllowed; |
| |
| protected final FailedServers failedServers; |
| |
| protected final int connectTO; |
| protected final int readTO; |
| protected final int writeTO; |
| |
| private final PoolMap<ConnectionId, T> connections; |
| |
| private final AtomicInteger callIdCnt = new AtomicInteger(0); |
| |
| private final ScheduledFuture<?> cleanupIdleConnectionTask; |
| |
| private int maxConcurrentCallsPerServer; |
| |
| private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache = |
| CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS). |
| build(new CacheLoader<Address, AtomicInteger>() { |
| @Override public AtomicInteger load(Address key) throws Exception { |
| return new AtomicInteger(0); |
| } |
| }); |
| |
| /** |
| * Construct an IPC client for the cluster <code>clusterId</code> |
| * @param conf configuration |
| * @param clusterId the cluster id |
| * @param localAddr client socket bind address. |
| * @param metrics the connection metrics |
| */ |
| public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, |
| MetricsConnection metrics) { |
| this.userProvider = UserProvider.instantiate(conf); |
| this.localAddr = localAddr; |
| this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); |
| this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; |
| this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, |
| HConstants.DEFAULT_HBASE_CLIENT_PAUSE); |
| this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); |
| this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); |
| this.cellBlockBuilder = new CellBlockBuilder(conf); |
| |
| this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes |
| this.conf = conf; |
| this.codec = getCodec(); |
| this.compressor = getCompressor(conf); |
| this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, |
| IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); |
| this.failedServers = new FailedServers(conf); |
| this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); |
| this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); |
| this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); |
| this.metrics = metrics; |
| this.maxConcurrentCallsPerServer = conf.getInt( |
| HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, |
| HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD); |
| |
| this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); |
| |
| this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { |
| |
| @Override |
| public void run() { |
| cleanupIdleConnections(); |
| } |
| }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" |
| + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO |
| + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose=" |
| + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed=" |
| + this.fallbackAllowed + ", bind address=" |
| + (this.localAddr != null ? this.localAddr : "null")); |
| } |
| } |
| |
| private void cleanupIdleConnections() { |
| long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose; |
| synchronized (connections) { |
| for (T conn : connections.values()) { |
| // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the |
| // connection itself has already shutdown. The latter check is because we may still |
| // have some pending calls on connection so we should not shutdown the connection outside. |
| // The connection itself will disconnect if there is no pending call for maxIdleTime. |
| if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress()); |
| } |
| connections.remove(conn.remoteId(), conn); |
| conn.cleanupConnection(); |
| } |
| } |
| } |
| } |
| |
| public static String getDefaultCodec(final Configuration c) { |
| // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because |
| // Configuration will complain -- then no default codec (and we'll pb everything). Else |
| // default is KeyValueCodec |
| return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName()); |
| } |
| |
| /** |
| * Encapsulate the ugly casting and RuntimeException conversion in private method. |
| * @return Codec to use on this client. |
| */ |
| Codec getCodec() { |
| // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND |
| // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. |
| String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); |
| if (className == null || className.length() == 0) { |
| return null; |
| } |
| try { |
| return (Codec) Class.forName(className).getDeclaredConstructor().newInstance(); |
| } catch (Exception e) { |
| throw new RuntimeException("Failed getting codec " + className, e); |
| } |
| } |
| |
| @Override |
| public boolean hasCellBlockSupport() { |
| return this.codec != null; |
| } |
| |
| // for writing tests that want to throw exception when connecting. |
| boolean isTcpNoDelay() { |
| return tcpNoDelay; |
| } |
| |
| /** |
| * Encapsulate the ugly casting and RuntimeException conversion in private method. |
| * @param conf configuration |
| * @return The compressor to use on this client. |
| */ |
| private static CompressionCodec getCompressor(final Configuration conf) { |
| String className = conf.get("hbase.client.rpc.compressor", null); |
| if (className == null || className.isEmpty()) { |
| return null; |
| } |
| try { |
| return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance(); |
| } catch (Exception e) { |
| throw new RuntimeException("Failed getting compressor " + className, e); |
| } |
| } |
| |
| /** |
| * Return the pool type specified in the configuration, which must be set to either |
| * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or |
| * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the |
| * former. For applications with many user threads, use a small round-robin pool. For applications |
| * with few user threads, you may want to try using a thread-local pool. In any case, the number |
| * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating |
| * system's hard limit on the number of connections. |
| * @param config configuration |
| * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or |
| * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} |
| */ |
| private static PoolMap.PoolType getPoolType(Configuration config) { |
| return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), |
| PoolMap.PoolType.RoundRobin); |
| } |
| |
| /** |
| * Return the pool size specified in the configuration, which is applicable only if the pool type |
| * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. |
| * @param config configuration |
| * @return the maximum pool size |
| */ |
| private static int getPoolSize(Configuration config) { |
| int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); |
| |
| if (poolSize <= 0) { |
| LOG.warn("{} must be positive. Using default value: 1", HConstants.HBASE_CLIENT_IPC_POOL_SIZE); |
| return 1; |
| } else { |
| return poolSize; |
| } |
| } |
| |
| private int nextCallId() { |
| int id, next; |
| do { |
| id = callIdCnt.get(); |
| next = id < Integer.MAX_VALUE ? id + 1 : 0; |
| } while (!callIdCnt.compareAndSet(id, next)); |
| return id; |
| } |
| |
| /** |
| * Make a blocking call. Throws exceptions if there are network problems or if the remote code |
| * threw an exception. |
| * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. |
| * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a |
| * new Connection each time. |
| * @return A pair with the Message response and the Cell data (if any). |
| */ |
| private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, |
| Message param, Message returnType, final User ticket, final Address isa) |
| throws ServiceException { |
| BlockingRpcCallback<Message> done = new BlockingRpcCallback<>(); |
| callMethod(md, hrc, param, returnType, ticket, isa, done); |
| Message val; |
| try { |
| val = done.get(); |
| } catch (IOException e) { |
| throw new ServiceException(e); |
| } |
| if (hrc.failed()) { |
| throw new ServiceException(hrc.getFailed()); |
| } else { |
| return val; |
| } |
| } |
| |
| /** |
| * Get a connection from the pool, or create a new one and add it to the pool. Connections to a |
| * given host/port are reused. |
| */ |
| private T getConnection(ConnectionId remoteId) throws IOException { |
| if (failedServers.isFailedServer(remoteId.getAddress())) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Not trying to connect to " + remoteId.getAddress() |
| + " this server is in the failed servers list"); |
| } |
| throw new FailedServerException( |
| "This server is in the failed servers list: " + remoteId.getAddress()); |
| } |
| T conn; |
| synchronized (connections) { |
| if (!running) { |
| throw new StoppedRpcClientException(); |
| } |
| conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId)); |
| conn.setLastTouched(EnvironmentEdgeManager.currentTime()); |
| } |
| return conn; |
| } |
| |
| /** |
| * Not connected. |
| */ |
| protected abstract T createConnection(ConnectionId remoteId) throws IOException; |
| |
| private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, |
| RpcCallback<Message> callback) { |
| call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); |
| if (metrics != null) { |
| metrics.updateRpc(call.md, call.param, call.callStats); |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.trace( |
| "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms"); |
| } |
| if (call.error != null) { |
| if (call.error instanceof RemoteException) { |
| call.error.fillInStackTrace(); |
| hrc.setFailed(call.error); |
| } else { |
| hrc.setFailed(wrapException(addr, hrc.getRegionInfo(), call.error)); |
| } |
| callback.run(null); |
| } else { |
| hrc.setDone(call.cells); |
| callback.run(call.response); |
| } |
| } |
| |
| private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, |
| final Message param, Message returnType, final User ticket, final Address addr, |
| final RpcCallback<Message> callback) { |
| Span span = TraceUtil.createClientSpan("RpcClient.callMethod") |
| .setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName()) |
| .setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName()) |
| .setAttribute(TraceUtil.REMOTE_HOST_KEY, addr.getHostName()) |
| .setAttribute(TraceUtil.REMOTE_PORT_KEY, addr.getPort()); |
| try (Scope scope = span.makeCurrent()) { |
| final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); |
| cs.setStartTime(EnvironmentEdgeManager.currentTime()); |
| |
| if (param instanceof ClientProtos.MultiRequest) { |
| ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; |
| int numActions = 0; |
| for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { |
| numActions += regionAction.getActionCount(); |
| } |
| |
| cs.setNumActionsPerServer(numActions); |
| } |
| |
| final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); |
| Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, |
| hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() { |
| @Override |
| public void run(Call call) { |
| try (Scope scope = call.span.makeCurrent()) { |
| counter.decrementAndGet(); |
| onCallFinished(call, hrc, addr, callback); |
| } finally { |
| if (hrc.failed()) { |
| TraceUtil.setError(span, hrc.getFailed()); |
| } else { |
| span.setStatus(StatusCode.OK); |
| } |
| span.end(); |
| } |
| } |
| }, cs); |
| ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); |
| int count = counter.incrementAndGet(); |
| try { |
| if (count > maxConcurrentCallsPerServer) { |
| throw new ServerTooBusyException(addr, count); |
| } |
| cs.setConcurrentCallsPerServer(count); |
| T connection = getConnection(remoteId); |
| connection.sendRequest(call, hrc); |
| } catch (Exception e) { |
| call.setException(toIOE(e)); |
| span.end(); |
| } |
| return call; |
| } |
| } |
| |
| private static Address createAddr(ServerName sn) { |
| return Address.fromParts(sn.getHostname(), sn.getPort()); |
| } |
| |
| /** |
| * Interrupt the connections to the given ip:port server. This should be called if the server is |
| * known as actually dead. This will not prevent current operation to be retried, and, depending |
| * on their own behavior, they may retry on the same server. This can be a feature, for example at |
| * startup. In any case, they're likely to get connection refused (if the process died) or no |
| * route to host: i.e. their next retries should be faster and with a safe exception. |
| */ |
| @Override |
| public void cancelConnections(ServerName sn) { |
| synchronized (connections) { |
| for (T connection : connections.values()) { |
| ConnectionId remoteId = connection.remoteId(); |
| if (remoteId.getAddress().getPort() == sn.getPort() |
| && remoteId.getAddress().getHostName().equals(sn.getHostname())) { |
| LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " |
| + connection.remoteId); |
| connections.remove(remoteId, connection); |
| connection.shutdown(); |
| connection.cleanupConnection(); |
| } |
| } |
| } |
| } |
| /** |
| * Configure an hbase rpccontroller |
| * @param controller to configure |
| * @param channelOperationTimeout timeout for operation |
| * @return configured controller |
| */ |
| static HBaseRpcController configureHBaseRpcController( |
| RpcController controller, int channelOperationTimeout) { |
| HBaseRpcController hrc; |
| if (controller != null && controller instanceof HBaseRpcController) { |
| hrc = (HBaseRpcController) controller; |
| if (!hrc.hasCallTimeout()) { |
| hrc.setCallTimeout(channelOperationTimeout); |
| } |
| } else { |
| hrc = new HBaseRpcControllerImpl(); |
| hrc.setCallTimeout(channelOperationTimeout); |
| } |
| return hrc; |
| } |
| |
| protected abstract void closeInternal(); |
| |
| @Override |
| public void close() { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Stopping rpc client"); |
| } |
| Collection<T> connToClose; |
| synchronized (connections) { |
| if (!running) { |
| return; |
| } |
| running = false; |
| connToClose = connections.values(); |
| connections.clear(); |
| } |
| cleanupIdleConnectionTask.cancel(true); |
| for (T conn : connToClose) { |
| conn.shutdown(); |
| } |
| closeInternal(); |
| for (T conn : connToClose) { |
| conn.cleanupConnection(); |
| } |
| } |
| |
| @Override |
| public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, |
| int rpcTimeout) { |
| return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout); |
| } |
| |
| @Override |
| public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) { |
| return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); |
| } |
| |
| private static class AbstractRpcChannel { |
| |
| protected final Address addr; |
| |
| protected final AbstractRpcClient<?> rpcClient; |
| |
| protected final User ticket; |
| |
| protected final int rpcTimeout; |
| |
| protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr, |
| User ticket, int rpcTimeout) { |
| this.addr = addr; |
| this.rpcClient = rpcClient; |
| this.ticket = ticket; |
| this.rpcTimeout = rpcTimeout; |
| } |
| |
| /** |
| * Configure an rpc controller |
| * @param controller to configure |
| * @return configured rpc controller |
| */ |
| protected HBaseRpcController configureRpcController(RpcController controller) { |
| HBaseRpcController hrc; |
| // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client |
| // side. And now we may use ServerRpcController. |
| if (controller != null && controller instanceof HBaseRpcController) { |
| hrc = (HBaseRpcController) controller; |
| if (!hrc.hasCallTimeout()) { |
| hrc.setCallTimeout(rpcTimeout); |
| } |
| } else { |
| hrc = new HBaseRpcControllerImpl(); |
| hrc.setCallTimeout(rpcTimeout); |
| } |
| return hrc; |
| } |
| } |
| |
| /** |
| * Blocking rpc channel that goes via hbase rpc. |
| */ |
| public static class BlockingRpcChannelImplementation extends AbstractRpcChannel |
| implements BlockingRpcChannel { |
| |
| protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, |
| Address addr, User ticket, int rpcTimeout) { |
| super(rpcClient, addr, ticket, rpcTimeout); |
| } |
| |
| @Override |
| public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, |
| Message param, Message returnType) throws ServiceException { |
| return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType, |
| ticket, addr); |
| } |
| } |
| |
| /** |
| * Async rpc channel that goes via hbase rpc. |
| */ |
| public static class RpcChannelImplementation extends AbstractRpcChannel implements |
| RpcChannel { |
| |
| protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr, |
| User ticket, int rpcTimeout) { |
| super(rpcClient, addr, ticket, rpcTimeout); |
| } |
| |
| @Override |
| public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, |
| Message returnType, RpcCallback<Message> done) { |
| HBaseRpcController configuredController = configureRpcController( |
| Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call")); |
| // This method does not throw any exceptions, so the caller must provide a |
| // HBaseRpcController which is used to pass the exceptions. |
| this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done); |
| } |
| } |
| } |