blob: ca8593ee3d5d542cfc1e88dd2d30811691bf9b3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
/**
* An RPC server that hosts protobuf described Services.
*
*/
@InterfaceAudience.Private
public abstract class RpcServer implements RpcServerInterface,
ConfigurationObserver {
// LOG is being used in CallRunner and the log level is being changed in tests
public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
/**
* Shared, pre-allocated exception instance.
* NOTE(review): it is not thrown anywhere in this file; presumably used by callers when the call
* queue is over its limit -- confirm against the users of this constant.
*/
protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
= new CallQueueTooBigException();
// Keys under which logResponse() reports the per-type action counts of a MultiRequest.
private static final String MULTI_GETS = "multi.gets";
private static final String MULTI_MUTATIONS = "multi.mutations";
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
// Value of HADOOP_SECURITY_AUTHORIZATION read in the constructor; gates authorize() and the
// auth-manager refresh done by onConfigurationChange().
private final boolean authorize;
// Value of HConstants.SLOW_LOG_BUFFER_ENABLED_KEY read in the constructor; gates the hand-off of
// slow/large responses to namedQueueRecorder in call().
private final boolean isOnlineLogProviderEnabled;
// Set in the constructor from userProvider.isHBaseSecurityEnabled().
protected boolean isSecurityEnabled;
// NOTE(review): not referenced in this file; presumably the RPC wire version -- confirm.
public static final byte CURRENT_VERSION = 0;
/**
* Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
*/
public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
"hbase.ipc.server.fallback-to-simple-auth-allowed";
/**
* How many calls/handler are allowed in the queue.
*/
protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
// Built from the server configuration in the constructor.
protected final CellBlockBuilder cellBlockBuilder;
// Message prefixes; NOTE(review): not used in this file, presumably for AUDITLOG entries.
protected static final String AUTH_FAILED_FOR = "Auth failed for ";
protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
// Logger named "SecurityLogger." followed by the fully qualified name of Server.
protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
+ Server.class.getName());
// Read and replaced through getSecretManager()/setSecretManager().
protected SecretManager<TokenIdentifier> secretManager;
// SASL properties built from "hbase.rpc.protection" when security is enabled, else an empty map.
protected final Map<String, String> saslProps;
// Never assigned in this file, but dereferenced by refreshAuthManager() and (when authorization
// is on) by authorize(); it has to be set elsewhere before those run.
protected ServiceAuthorizationManager authManager;
/** This is set to Call object before Handler invokes an RPC and reset
* after the call returns.
*/
protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
/** Keeps MonitoredRPCHandler per handler thread. */
protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
// Address handed to the constructor.
protected final InetSocketAddress bindAddress;
// Metrics sink created in the constructor; updated by call() and channelRead().
protected MetricsHBaseServer metrics;
// Configuration handed to the constructor; exposed through getConf().
protected final Configuration conf;
/**
* Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
* this size, then we will reject the call (after parsing it though). It will go back to the
* client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
* call queue size gets incremented after we parse a call and before we add it to the queue of
* calls for the scheduler to use. It gets decremented after we have 'run' the Call. The current
* size is kept in {@link #callQueueSizeInBytes}.
* @see #callQueueSizeInBytes
* @see #DEFAULT_MAX_CALLQUEUE_SIZE
*/
protected final long maxQueueSizeInBytes;
/** Default for {@link #maxQueueSizeInBytes}: 1024 * 1024 * 1024 bytes (1 GiB). */
protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
/**
* This is a running count of the size in bytes of all outstanding calls whether currently
* executing or queued waiting to be run. It is adjusted through {@link #addCallSize(long)}.
*/
protected final LongAdder callQueueSizeInBytes = new LongAdder();
// Both TCP flags are read in the constructor ("hbase.ipc.server.tcpnodelay" and
// "hbase.ipc.server.tcpkeepalive") and default to true.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
/**
* This flag is used to indicate to sub threads when they should go down. When we call
* {@link #start()}, all threads started will consult this flag on whether they should
* keep going. It is set to false when {@link #stop()} is called.
*/
volatile boolean running = true;
/**
* This flag is set to true after all threads are up and 'running' and the server is then opened
* for business by the call to {@link #start()}. Reported by {@link #isStarted()}.
*/
volatile boolean started = false;
// Not assigned in this file. NOTE(review): presumably populated from createSecretManager() by
// subclasses -- confirm.
protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
// Read and replaced through getErrorHandler()/setErrorHandler(); null until one is set.
protected HBaseRPCErrorHandler errorHandler = null;
/** Configuration key for {@link #maxRequestSize}. */
public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
// Shared, pre-allocated exception instance; not thrown anywhere in this file.
protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
new RequestTooBigException();
// Configuration keys for the slow/large response thresholds checked in call(). A threshold of
// -1 or lower disables the corresponding check there.
protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
/**
* Minimum allowable timeout (in milliseconds) in rpc request's header. This
* configuration exists to prevent the rpc service regarding this request as timeout immediately.
*/
protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
/** Default value for above params */
public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
// Key and default for the parameter length limit applied by truncateTraceLog() when TRACE
// logging is enabled.
protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
// Suffix appended by truncateTraceLog() to a shortened parameter string.
protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
// Serializer used by logResponse() to render the response-info map as JSON.
protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();
// The four limits below are read once from the configuration in the constructor.
protected final int maxRequestSize;
protected final int warnResponseTime;
protected final int warnResponseSize;
protected final int minClientRequestTimeout;
// Hosting server handed to the constructor; null-checked by createSecretManager(), call() and
// logResponse().
protected final Server server;
// Services handed to the constructor; searched by name in authorize().
protected final List<BlockingServiceAndInterface> services;
// Scheduler handed to the constructor; exposed through getScheduler().
protected final RpcScheduler scheduler;
// Created in the constructor via UserProvider.instantiate(conf).
protected UserProvider userProvider;
// Created in the constructor via ByteBuffAllocator.create(conf, reservoirEnabled).
protected final ByteBuffAllocator bbAllocator;
// Current value of FALLBACK_TO_INSECURE_CLIENT_AUTH; (re)loaded by initReconfigurable().
protected volatile boolean allowFallbackToSimpleAuth;
/**
* Used to get details for scan with a scanner_id<br/>
* TODO try to figure out a better way and remove reference from regionserver package later.
*/
private RSRpcServices rsRpcServices;
/**
* Used to add online slowlog responses. May be null; call() skips recording in that case.
*/
private NamedQueueRecorder namedQueueRecorder;
/**
* Callback type with a single no-argument, no-result {@link #run()} method.
* NOTE(review): not referenced anywhere else in this file; presumably used by subclasses to
* release per-call resources once a call is done -- confirm against the implementations.
*/
@FunctionalInterface
protected interface CallCleanup {
/** Runs the callback. */
void run();
}
/**
 * Immutable holder pairing a {@link BlockingService} with the class of the protobuf service
 * interface that goes with it. For example, a server fielding the client protobuf service
 * passes its client blocking service implementation together with
 * ClientService.BlockingInterface.class. The pair is consulted during connection setup.
 */
public static class BlockingServiceAndInterface {
  private final BlockingService service;
  private final Class<?> serviceInterface;

  /**
   * @param service the blocking service implementation
   * @param serviceInterface the protobuf service interface class associated with the service
   */
  public BlockingServiceAndInterface(final BlockingService service,
      final Class<?> serviceInterface) {
    this.serviceInterface = serviceInterface;
    this.service = service;
  }

  /** @return the blocking service given at construction */
  public BlockingService getBlockingService() {
    return service;
  }

  /** @return the service interface class given at construction */
  public Class<?> getServiceInterface() {
    return serviceInterface;
  }
}
/**
* Stores the collaborators handed in and reads the server's tunables from {@code conf}.
* @param server hosting instance of {@link Server}. May be null, in which case
* {@link #createSecretManager()} returns null.
* @param name Used keying this rpc servers' metrics (NOTE(review): the original doc also says
* it names the Listener thread; that use is not visible in this file).
* @param services A list of services.
* @param bindAddress Where to listen
* @param conf configuration the limits, TCP flags and security settings are read from
* @param scheduler scheduler stored for later use; returned by {@link #getScheduler()}
* @param reservoirEnabled Enable ByteBufferPool or not; passed to
* {@link ByteBuffAllocator#create}.
* @throws IOException declared by the signature. NOTE(review): no statement here throws it
* directly; presumably it comes from one of the factories called -- confirm.
*/
public RpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
this.server = server;
this.services = services;
this.bindAddress = bindAddress;
this.conf = conf;
// See declaration above for documentation on what this size is.
this.maxQueueSizeInBytes =
this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
// Note: 'this' is handed to the metrics wrapper before the remaining fields below are set.
this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
this.cellBlockBuilder = new CellBlockBuilder(conf);
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.userProvider = UserProvider.instantiate(conf);
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
// SASL properties are only needed when security is on; the protection level defaults to the
// lower-cased name of QualityOfProtection.AUTHENTICATION.
if (isSecurityEnabled) {
saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
} else {
saslProps = Collections.emptyMap();
}
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.scheduler = scheduler;
}
/**
 * Applies a changed configuration: reloads the reconfigurable settings of this server, forwards
 * the change to the scheduler when it is itself a {@link ConfigurationObserver}, and refreshes
 * the authorization manager when service authorization is enabled.
 * @param newConf the configuration to apply
 */
@Override
public void onConfigurationChange(Configuration newConf) {
  initReconfigurable(newConf);

  final boolean schedulerObserves = scheduler instanceof ConfigurationObserver;
  if (schedulerObserves) {
    ConfigurationObserver observer = (ConfigurationObserver) scheduler;
    observer.onConfigurationChange(newConf);
  }

  if (!authorize) {
    return;
  }
  refreshAuthManager(newConf, new HBasePolicyProvider());
}
/**
 * Loads the settings that may change at runtime. Currently that is only
 * {@link #FALLBACK_TO_INSECURE_CLIENT_AUTH} (default false). When security is enabled and the
 * fallback is allowed, a warning banner is written to the log.
 * @param confToLoad configuration to read the settings from
 */
protected void initReconfigurable(Configuration confToLoad) {
  this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
  if (!isSecurityEnabled || !allowFallbackToSimpleAuth) {
    return;
  }
  // One WARN entry per banner line, in this order.
  final String[] banner = {
    "********* WARNING! *********",
    "This server is configured to allow connections from INSECURE clients",
    "(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).",
    "While this option is enabled, client identities cannot be secured, and user",
    "impersonation is possible!",
    "For secure operation, please disable SIMPLE authentication as soon as possible,",
    "by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml",
    "****************************"
  };
  for (String line : banner) {
    LOG.warn(line);
  }
}
/** @return the configuration this server was constructed with */
Configuration getConf() {
  return this.conf;
}
/** @return the current value of the {@code started} flag */
@Override
public boolean isStarted() {
  return started;
}
/**
 * Refreshes the service authorization manager and the super/proxy user configuration.
 * The "hadoop.policy.file" system property is pointed at hbase-policy.xml before the
 * authorization manager is refreshed.
 * @param conf configuration to refresh from
 * @param pp policy provider handed to the authorization manager
 */
@Override
public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
  final String policyFileProperty = "hadoop.policy.file";
  final String policyFileName = "hbase-policy.xml";
  System.setProperty(policyFileProperty, policyFileName);

  authManager.refresh(conf, pp);
  LOG.info("Refreshed hbase-policy.xml successfully");

  ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
  LOG.info("Refreshed super and proxy users successfully");
}
/**
 * Builds an {@link AuthenticationTokenSecretManager} from the hosting server's configuration,
 * ZooKeeper handle and server name.
 * @return a new secret manager, or null when security is disabled or there is no hosting server
 */
protected AuthenticationTokenSecretManager createSecretManager() {
  if (!isSecurityEnabled || server == null) {
    return null;
  }
  final Configuration serverConf = server.getConfiguration();
  // Defaults: keys are rolled daily, tokens live for at most seven days (both in ms).
  final long keyUpdateInterval =
    serverConf.getLong("hbase.auth.key.update.interval", 24 * 60 * 60 * 1000);
  final long maxAge =
    serverConf.getLong("hbase.auth.token.max.lifetime", 7 * 24 * 60 * 60 * 1000);
  final String serverName = server.getServerName().toString();
  return new AuthenticationTokenSecretManager(serverConf, server.getZooKeeper(),
    serverName, keyUpdateInterval, maxAge);
}
/** @return the secret manager currently set on this server, possibly null */
public SecretManager<? extends TokenIdentifier> getSecretManager() {
  return secretManager;
}
/**
 * Replaces the secret manager of this server.
 * @param secretManager the secret manager to use; stored after an unchecked cast to
 *   {@code SecretManager<TokenIdentifier>}
 */
@SuppressWarnings("unchecked")
public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
  final SecretManager<TokenIdentifier> narrowed =
    (SecretManager<TokenIdentifier>) secretManager;
  this.secretManager = narrowed;
}
/**
 * This is a server side method, which is invoked over RPC. It runs the requested method on the
 * call's service, updates the request metrics, and logs responses that were slower or larger
 * than the configured warning thresholds (optionally also handing them to the named queue
 * recorder).
 * @param call the call to run
 * @param status monitored handler that is updated with the state of the call
 * @return the protobuf response paired with the cell scanner of the controller used for the call
 * @throws IOException on failure: an {@link IOException} raised by the service is rethrown
 *   as-is (after stripping a {@link ServiceException} wrapper), a {@link LinkageError} is wrapped
 *   in a {@link DoNotRetryIOException}, and any other throwable is wrapped in an IOException
 */
@Override
public Pair<Message, CellScanner> call(RpcCall call,
    MonitoredRPCHandler status) throws IOException {
  try {
    MethodDescriptor md = call.getMethod();
    Message param = call.getParam();
    status.setRPC(md.getName(), new Object[]{param},
      call.getReceiveTime());
    // TODO: Review after we add in encoded data blocks.
    status.setRPCPacket(param);
    status.resume("Servicing call");
    //get an instance of the method arg type
    HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
    controller.setCallTimeout(call.getTimeout());
    Message result = call.getService().callBlockingMethod(md, controller, param);
    long receiveTime = call.getReceiveTime();
    long startTime = call.getStartTime();
    long endTime = System.currentTimeMillis();
    int processingTime = (int) (endTime - startTime);
    int qTime = (int) (startTime - receiveTime);
    int totalTime = (int) (endTime - receiveTime);
    if (LOG.isTraceEnabled()) {
      // Log the call we were handed. The CurCall thread-local may be unset (direct invocation,
      // or after unsetCurrentCall()), and dereferencing it here would fail the whole RPC.
      LOG.trace(call.toString() +
          ", response " + TextFormat.shortDebugString(result) +
          " queueTime: " + qTime +
          " processingTime: " + processingTime +
          " totalTime: " + totalTime);
    }
    // Use the raw request call size for now.
    long requestSize = call.getSize();
    long responseSize = result.getSerializedSize();
    if (call.isClientCellBlockSupported()) {
      // Include the payload size in HBaseRpcController
      responseSize += call.getResponseCellSize();
    }

    metrics.dequeuedCall(qTime);
    metrics.processedCall(processingTime);
    metrics.totalCall(totalTime);
    metrics.receivedRequest(requestSize);
    metrics.sentResponse(responseSize);
    // log any RPC responses that are slower than the configured warn
    // response time or larger than configured warning size.
    // A threshold of -1 or lower switches the respective check off.
    boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
    boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
    if (tooSlow || tooLarge) {
      final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
      // when tagging, we let TooLarge trump TooSlow to keep output simple
      // note that large responses will often also be slow.
      logResponse(param,
          md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
          tooLarge, tooSlow,
          status.getClient(), startTime, processingTime, qTime,
          responseSize, userName);
      if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
        // send logs to ring buffer owned by slowLogRecorder
        final String className =
          server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
        this.namedQueueRecorder.addRecord(
          new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
            tooLarge));
      }
    }
    return new Pair<>(result, controller.cellScanner());
  } catch (Throwable e) {
    // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
    // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
    // need to pass it over the wire.
    if (e instanceof ServiceException) {
      if (e.getCause() == null) {
        LOG.debug("Caught a ServiceException with null cause", e);
      } else {
        e = e.getCause();
      }
    }

    // increment the number of requests that were exceptions.
    metrics.exception(e);

    if (e instanceof LinkageError) {
      throw new DoNotRetryIOException(e);
    }
    if (e instanceof IOException) {
      throw (IOException) e;
    }
    LOG.error("Unexpected throwable object ", e);
    throw new IOException(e.getMessage(), e);
  }
}
/**
 * Writes one WARN entry describing a slow and/or large RPC response. The details are collected
 * in a map and rendered as a JSON object; scan requests additionally carry scan details (when
 * an RSRpcServices instance is set) and multi requests carry per-type action counts.
 * @param param The parameters received in the call.
 * @param methodName The name of the method invoked
 * @param call The string representation of the call
 * @param tooLarge To indicate if the event is tooLarge
 * @param tooSlow To indicate if the event is tooSlow
 * @param clientAddress The address of the client who made this call.
 * @param startTime The time that the call was initiated, in ms.
 * @param processingTime The duration that the call took to run, in ms.
 * @param qTime The duration that the call spent on the queue
 *   prior to being initiated, in ms.
 * @param responseSize The size in bytes of the response buffer.
 * @param userName UserName of the current RPC Call; accepted but not part of the logged output
 */
void logResponse(Message param, String methodName, String call, boolean tooLarge,
    boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
    long responseSize, String userName) {
  String className = StringUtils.EMPTY;
  if (server != null) {
    className = server.getClass().getSimpleName();
  }

  // Fields reported for every kind of call.
  final Map<String, Object> info = new HashMap<>();
  info.put("starttimems", startTime);
  info.put("processingtimems", processingTime);
  info.put("queuetimems", qTime);
  info.put("responsesize", responseSize);
  info.put("client", clientAddress);
  info.put("class", className);
  info.put("method", methodName);
  info.put("call", call);

  // Parameters can be huge; anything longer than 150 chars is cut down (to 150 chars, or to the
  // configured trace length when TRACE is on).
  final String paramText = ProtobufUtil.getShortTextFormat(param);
  info.put("param", paramText.length() > 150 ? truncateTraceLog(paramText) : paramText);

  if (rsRpcServices != null && param instanceof ClientProtos.ScanRequest) {
    final ClientProtos.ScanRequest scan = (ClientProtos.ScanRequest) param;
    final String scanDetails = scan.hasScannerId()
      ? rsRpcServices.getScanDetailsWithId(scan.getScannerId())
      : rsRpcServices.getScanDetailsWithRequest(scan);
    if (scanDetails != null) {
      info.put("scandetails", scanDetails);
    }
  }

  if (param instanceof ClientProtos.MultiRequest) {
    final ClientProtos.MultiRequest multiRequest = (ClientProtos.MultiRequest) param;
    int mutations = 0;
    int gets = 0;
    int serviceCalls = 0;
    for (ClientProtos.RegionAction perRegion : multiRequest.getRegionActionList()) {
      for (ClientProtos.Action single : perRegion.getActionList()) {
        mutations += single.hasMutation() ? 1 : 0;
        gets += single.hasGet() ? 1 : 0;
        serviceCalls += single.hasServiceCall() ? 1 : 0;
      }
    }
    info.put(MULTI_GETS, gets);
    info.put(MULTI_MUTATIONS, mutations);
    info.put(MULTI_SERVICE_CALLS, serviceCalls);
  }

  final String tag;
  if (tooSlow) {
    tag = tooLarge ? "TooLarge & TooSlow" : "TooSlow";
  } else {
    tag = "TooLarge";
  }
  LOG.warn("(response" + tag + "): " + GSON.toJson(info));
}
/**
 * Shortens a stringified request parameter for logging. With TRACE enabled the limit is the
 * value of hbase.ipc.trace.log.max.length (default {@link #DEFAULT_TRACE_LOG_MAX_LENGTH});
 * otherwise it is 150 chars. Refer to Jira HBASE-20826 and HBASE-20942.
 * <p>
 * A string that already fits within the limit is returned unchanged, so the method is safe to
 * call with short input. A negative configured limit is treated as 0.
 * @param strParam stringifiedParam to be truncated
 * @return the first {@code limit} chars of {@code strParam} followed by
 *   {@link #KEY_WORD_TRUNCATED}, or {@code strParam} itself when nothing had to be cut
 */
String truncateTraceLog(String strParam) {
  final int defaultLimit = 150;
  final int limit = LOG.isTraceEnabled()
    // Clamp so that a misconfigured negative length cannot make substring() throw.
    ? Math.max(0, getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH))
    : defaultLimit;
  if (strParam.length() <= limit) {
    // Nothing is dropped, so no truncation marker either.
    return strParam;
  }
  return strParam.substring(0, limit) + KEY_WORD_TRUNCATED;
}
/**
 * Installs the handler that is called out to from RPC for error conditions.
 * @param handler the handler implementation
 */
@Override
public void setErrorHandler(HBaseRPCErrorHandler handler) {
  errorHandler = handler;
}
/** @return the error handler currently installed, or null when none has been set */
@Override
public HBaseRPCErrorHandler getErrorHandler() {
  return errorHandler;
}
/**
 * @return the metrics instance used for reporting RPC call statistics
 */
@Override
public MetricsHBaseServer getMetrics() {
  return this.metrics;
}
/**
 * Adjusts the running total of outstanding call bytes.
 * @param diff number of bytes to add; the value is handed to {@code LongAdder.add} unchanged
 */
@Override
public void addCallSize(final long diff) {
  callQueueSizeInBytes.add(diff);
}
/**
 * Authorizes the incoming client connection. Does nothing when service authorization is
 * disabled.
 * @param user client user
 * @param connection incoming connection; its service name selects the protocol class to check
 * @param addr InetAddress of incoming connection
 * @throws AuthorizationException when the client isn't authorized to talk the protocol
 */
public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
    InetAddress addr) throws AuthorizationException {
  if (!authorize) {
    return;
  }
  final String serviceName = connection.getServiceName();
  final Class<?> protocol = getServiceInterface(services, serviceName);
  authManager.authorize(user, protocol, getConf(), addr);
}
/**
* When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would
* be smaller.
*/
protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
/**
 * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
 * If the buffer has more than {@link #NIO_BUFFER_LIMIT} bytes remaining, the read is done in
 * chunks of at most that size. This is to avoid jdk from creating many direct buffers as the
 * size of ByteBuffer increases. A positive byte count is also reported to the metrics.
 *
 * @param channel readable byte channel to read from
 * @param buffer buffer to read into
 * @return number of bytes read, or the non-positive value returned by the underlying read
 * @throws java.io.IOException e
 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
 */
protected int channelRead(ReadableByteChannel channel,
    ByteBuffer buffer) throws IOException {
  final int bytesRead;
  if (buffer.remaining() > NIO_BUFFER_LIMIT) {
    bytesRead = channelIO(channel, null, buffer);
  } else {
    bytesRead = channel.read(buffer);
  }
  if (bytesRead > 0) {
    metrics.receivedBytes(bytesRead);
  }
  return bytesRead;
}
/**
 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
 * Transfers the remaining bytes of the buffer in chunks of at most {@link #NIO_BUFFER_LIMIT}
 * bytes and stops early as soon as a chunk is only partially transferred.
 * Only one of readCh or writeCh should be non-null; when readCh is null, writeCh is used.
 *
 * @param readCh read channel
 * @param writeCh write channel
 * @param buf buffer to read or write into/out of
 * @return total bytes read or written; when nothing was transferred, the result of the last
 *   channel operation
 * @throws java.io.IOException e
 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
 */
private static int channelIO(ReadableByteChannel readCh,
    WritableByteChannel writeCh,
    ByteBuffer buf) throws IOException {
  final int savedLimit = buf.limit();
  final int remainingAtStart = buf.remaining();
  int lastResult = 0;

  while (buf.hasRemaining()) {
    try {
      final int chunk = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
      // Narrow the window so the channel sees at most one chunk.
      buf.limit(buf.position() + chunk);
      if (readCh == null) {
        lastResult = writeCh.write(buf);
      } else {
        lastResult = readCh.read(buf);
      }
      if (lastResult < chunk) {
        break;
      }
    } finally {
      // Always restore the caller's limit, also on a short transfer or an exception.
      buf.limit(savedLimit);
    }
  }

  final int transferred = remainingAtStart - buf.remaining();
  if (transferred > 0) {
    return transferred;
  }
  return lastResult;
}
/**
 * Needed for features such as delayed calls. We need to be able to store the current call
 * so that we can complete it later or ask questions of what is supported by the current ongoing
 * call.
 * @return the call held by this thread's {@code CurCall} thread local, or empty if none is set
 */
public static Optional<RpcCall> getCurrentCall() {
  final RpcCall current = CurCall.get();
  return Optional.ofNullable(current);
}
/** @return true when a call is set in this thread's {@code CurCall} thread local */
public static boolean isInRpcCallContext() {
  return getCurrentCall().isPresent();
}
/**
 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
 * master's rpc call, it may generate new procedure and mutate the region which store procedure.
 * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
 * call to avoid the rpc check.
 * @return the rpc call that was set on this thread before it was cleared, or empty if none
 */
public static Optional<RpcCall> unsetCurrentCall() {
  final Optional<RpcCall> previous = Optional.ofNullable(CurCall.get());
  CurCall.set(null);
  return previous;
}
/**
 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
 * rpc call back after mutate region.
 * @param rpcCall the call to store in this thread's {@code CurCall} thread local
 */
public static void setCurrentCall(RpcCall rpcCall) {
  RpcServer.CurCall.set(rpcCall);
}
/**
 * Returns the user credentials associated with the current RPC request.
 * @return the request user of the current call; empty when there is no current call (or when
 *   the call itself reports no user)
 */
public static Optional<User> getRequestUser() {
  final Optional<RpcCall> current = getCurrentCall();
  if (!current.isPresent()) {
    return Optional.empty();
  }
  return current.get().getRequestUser();
}
/**
* The number of open RPC connections. Left to the concrete server implementation.
* @return the number of open rpc connections
*/
abstract public int getNumOpenConnections();
/**
 * @return the short name of the user associated with the current RPC request, or empty if no
 *   user is set
 */
public static Optional<String> getRequestUserName() {
  final Optional<User> requestUser = getRequestUser();
  return requestUser.map(user -> user.getShortName());
}
/**
 * @return Address of remote client if a request is ongoing, else an empty Optional
 */
public static Optional<InetAddress> getRemoteAddress() {
  final Optional<RpcCall> current = getCurrentCall();
  return current.map(ongoing -> ongoing.getRemoteAddress());
}
/**
 * Looks up a service by the name of its protobuf descriptor.
 * @param serviceName Some arbitrary string that represents a 'service'.
 * @param services Available service instances
 * @return the first BlockingServiceAndInterface pair whose service descriptor name equals
 *   <code>serviceName</code>, or null when there is none
 */
protected static BlockingServiceAndInterface getServiceAndInterface(
    final List<BlockingServiceAndInterface> services, final String serviceName) {
  BlockingServiceAndInterface match = null;
  for (BlockingServiceAndInterface candidate : services) {
    final String candidateName =
      candidate.getBlockingService().getDescriptorForType().getName();
    if (candidateName.equals(serviceName)) {
      match = candidate;
      break;
    }
  }
  return match;
}
/**
 * @param serviceName Some arbitrary string that represents a 'service'.
 * @param services Available services and their service interfaces.
 * @return Service interface class for <code>serviceName</code>, or null when no service with
 *   that name is registered
 */
protected static Class<?> getServiceInterface(
    final List<BlockingServiceAndInterface> services,
    final String serviceName) {
  final BlockingServiceAndInterface found = getServiceAndInterface(services, serviceName);
  if (found == null) {
    return null;
  }
  return found.getServiceInterface();
}
/**
 * @param serviceName Some arbitrary string that represents a 'service'.
 * @param services Available services and their service interfaces.
 * @return BlockingService that goes with the passed <code>serviceName</code>, or null when no
 *   service with that name is registered
 */
protected static BlockingService getService(
    final List<BlockingServiceAndInterface> services,
    final String serviceName) {
  final BlockingServiceAndInterface found = getServiceAndInterface(services, serviceName);
  if (found == null) {
    return null;
  }
  return found.getBlockingService();
}
/**
 * Returns the {@link MonitoredRPCHandler} of the calling thread. On first use for a thread, a
 * status named after the thread is created through the {@link TaskMonitor}, put into the
 * "Waiting for a call" paused state and remembered in the {@code MONITORED_RPC} thread local.
 * @return the status object bound to the current thread
 */
protected static MonitoredRPCHandler getStatus() {
  // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
  MonitoredRPCHandler handlerStatus = MONITORED_RPC.get();
  if (handlerStatus == null) {
    final String threadName = Thread.currentThread().getName();
    handlerStatus = TaskMonitor.get().createRPCStatus(threadName);
    handlerStatus.pause("Waiting for a call");
    MONITORED_RPC.set(handlerStatus);
  }
  return handlerStatus;
}
/**
 * Returns the remote side ip address when invoked inside an RPC.
 * @return the remote address of the call set on the current thread, or null when no call is set
 */
public static InetAddress getRemoteIp() {
  final RpcCall current = CurCall.get();
  return current == null ? null : current.getRemoteAddress();
}
/** @return the scheduler this server was constructed with */
@Override
public RpcScheduler getScheduler() {
  return this.scheduler;
}
/** @return the allocator created for this server at construction time */
@Override
public ByteBuffAllocator getByteBuffAllocator() {
  return bbAllocator;
}
/**
 * Sets the region server RPC services that logResponse() consults for scan details.
 * @param rsRpcServices the services instance, or null to leave scan details out of the log
 */
@Override
public void setRsRpcServices(RSRpcServices rsRpcServices) {
  final RSRpcServices provided = rsRpcServices;
  this.rsRpcServices = provided;
}
/**
 * Sets the recorder that call() hands slow or large responses to.
 * @param namedQueueRecorder the recorder, or null to turn the hand-off off
 */
@Override
public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
  final NamedQueueRecorder provided = namedQueueRecorder;
  this.namedQueueRecorder = provided;
}
}