blob: c3337418182182495b8d63efba7ba17850168497 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A nioSelector interface for doing non-blocking multi-connection network I/O.
* <p>
* This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and
* responses.
* <p>
* A connection can be added to the nioSelector associated with an integer id by doing
*
* <pre>
* nioSelector.connect(&quot;42&quot;, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
* </pre>
*
* The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
* the connection. The successful invocation of this method does not mean a valid connection has been established.
*
* Sending requests, receiving responses, processing connection completions, and disconnections on the existing
* connections are all done using the <code>poll()</code> call.
*
* <pre>
* nioSelector.send(new NetworkSend(myDestination, myBytes));
* nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
* nioSelector.poll(TIMEOUT_MS);
* </pre>
*
* The nioSelector maintains several lists that are reset by each call to <code>poll()</code> which are available via
* various getters. These are reset by each call to <code>poll()</code>.
*
* This class is not thread safe!
*/
public class Selector implements Selectable {
private static final Logger log = LoggerFactory.getLogger(Selector.class);
private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
private final List<Send> completedSends;
private final List<NetworkReceive> completedReceives;
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final List<String> disconnected;
private final List<String> connected;
private final List<String> failedSends;
private final Time time;
private final SelectorMetrics sensors;
private final String metricGrpPrefix;
private final Map<String, String> metricTags;
private final ChannelBuilder channelBuilder;
private final Map<String, Long> lruConnections;
private final long connectionsMaxIdleNanos;
private final int maxReceiveSize;
private final boolean metricsPerConnection;
private long currentTimeNanos;
private long nextIdleCloseCheckTime;
/**
* Create a new nioSelector
*/
public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
try {
this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
throw new KafkaException(e);
}
this.maxReceiveSize = maxReceiveSize;
this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
this.time = time;
this.metricGrpPrefix = metricGrpPrefix;
this.metricTags = metricTags;
this.channels = new HashMap<>();
this.completedSends = new ArrayList<>();
this.completedReceives = new ArrayList<>();
this.stagedReceives = new HashMap<>();
this.immediatelyConnectedKeys = new HashSet<>();
this.connected = new ArrayList<>();
this.disconnected = new ArrayList<>();
this.failedSends = new ArrayList<>();
this.sensors = new SelectorMetrics(metrics);
this.channelBuilder = channelBuilder;
// initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
this.lruConnections = new LinkedHashMap<>(16, .75F, true);
currentTimeNanos = time.nanoseconds();
nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
this.metricsPerConnection = metricsPerConnection;
}
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
}
/**
* Begin connecting to the given address and add the connection to this nioSelector associated with the given id
* number.
* <p>
* Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
* call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
* @param id The id for the new connection
* @param address The address to connect to
* @param sendBufferSize The send buffer for the new connection
* @param receiveBufferSize The receive buffer for the new connection
* @throws IllegalStateException if there is already a connection for that id
* @throws IOException if DNS resolution fails on the hostname or if the broker is down
*/
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
if (this.channels.containsKey(id))
throw new IllegalStateException("There is already a connection for id " + id);
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
boolean connected;
try {
connected = socketChannel.connect(address);
} catch (UnresolvedAddressException e) {
socketChannel.close();
throw new IOException("Can't resolve address: " + address, e);
} catch (IOException e) {
socketChannel.close();
throw e;
}
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
key.attach(channel);
this.channels.put(id, channel);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", channel.id());
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
}
/**
* Register the nioSelector with an existing channel
* Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
* Note that we are not checking if the connection id is valid - since the connection already exists
*/
public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
key.attach(channel);
this.channels.put(id, channel);
}
/**
* Interrupt the nioSelector if it is blocked waiting to do I/O.
*/
@Override
public void wakeup() {
this.nioSelector.wakeup();
}
/**
* Close this selector and all associated connections
*/
@Override
public void close() {
List<String> connections = new ArrayList<>(channels.keySet());
for (String id : connections)
close(id);
try {
this.nioSelector.close();
} catch (IOException | SecurityException e) {
log.error("Exception closing nioSelector:", e);
}
sensors.close();
channelBuilder.close();
}
/**
* Queue the given request for sending in the subsequent {@link #poll(long)} calls
* @param send The request to send
*/
public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination());
try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(send.destination());
close(channel);
}
}
/**
* Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
* disconnections, initiating new sends, or making progress on in-progress sends or receives.
*
* When this call is completed the user can check for completed sends, receives, connections or disconnects using
* {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
* lists will be cleared at the beginning of each `poll` call and repopulated by the call if there is
* any completed I/O.
*
* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
* we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
* This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
* we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
* application buffer size. This means we might be reading additional bytes than the requested size.
* If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
* in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
* reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
* the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
* and pop response and add to the completedReceives.
*
* @param timeout The amount of time to wait, in milliseconds, which must be non-negative
* @throws IllegalArgumentException If `timeout` is negative
* @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
* already an in-progress send
*/
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false);
pollSelectionKeys(immediatelyConnectedKeys, true);
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
} else
continue;
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
channel.prepare();
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
@Override
public List<Send> completedSends() {
return this.completedSends;
}
@Override
public List<NetworkReceive> completedReceives() {
return this.completedReceives;
}
@Override
public List<String> disconnected() {
return this.disconnected;
}
@Override
public List<String> connected() {
return this.connected;
}
@Override
public void mute(String id) {
KafkaChannel channel = channelOrFail(id);
mute(channel);
}
private void mute(KafkaChannel channel) {
channel.mute();
}
@Override
public void unmute(String id) {
KafkaChannel channel = channelOrFail(id);
unmute(channel);
}
private void unmute(KafkaChannel channel) {
channel.unmute();
}
@Override
public void muteAll() {
for (KafkaChannel channel : this.channels.values())
mute(channel);
}
@Override
public void unmuteAll() {
for (KafkaChannel channel : this.channels.values())
unmute(channel);
}
private void maybeCloseOldestConnection() {
if (currentTimeNanos > nextIdleCloseCheckTime) {
if (lruConnections.isEmpty()) {
nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
} else {
Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
Long connectionLastActiveTime = oldestConnectionEntry.getValue();
nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
if (currentTimeNanos > nextIdleCloseCheckTime) {
String connectionId = oldestConnectionEntry.getKey();
if (log.isTraceEnabled())
log.trace("About to close the idle connection from " + connectionId
+ " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
disconnected.add(connectionId);
close(connectionId);
}
}
}
}
/**
* Clear the results from the prior poll
*/
private void clear() {
this.completedSends.clear();
this.completedReceives.clear();
this.connected.clear();
this.disconnected.clear();
this.disconnected.addAll(this.failedSends);
this.failedSends.clear();
}
/**
* Check for data, waiting up to the given timeout.
*
* @param ms Length of time to wait, in milliseconds, which must be non-negative
* @return The number of keys ready
* @throws IllegalArgumentException
* @throws IOException
*/
private int select(long ms) throws IOException {
if (ms < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (ms == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(ms);
}
/**
* Close the connection identified by the given id
*/
public void close(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel != null)
close(channel);
}
/**
* Begin closing this connection
*/
private void close(KafkaChannel channel) {
try {
channel.close();
} catch (IOException e) {
log.error("Exception closing connection to node {}:", channel.id(), e);
}
this.stagedReceives.remove(channel);
this.channels.remove(channel.id());
this.lruConnections.remove(channel.id());
this.sensors.connectionClosed.record();
}
/**
* check if channel is ready
*/
@Override
public boolean isChannelReady(String id) {
KafkaChannel channel = this.channels.get(id);
return channel != null && channel.ready();
}
private KafkaChannel channelOrFail(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel == null)
throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
return channel;
}
/**
* Return the selector channels.
*/
public List<KafkaChannel> channels() {
return new ArrayList<>(channels.values());
}
/**
* Return the channel associated with this connection or `null` if there is no channel associated with the
* connection.
*/
public KafkaChannel channel(String id) {
return this.channels.get(id);
}
/**
* Get the channel associated with selectionKey
*/
private KafkaChannel channel(SelectionKey key) {
return (KafkaChannel) key.attachment();
}
/**
* Check if given channel has a staged receive
*/
private boolean hasStagedReceive(KafkaChannel channel) {
return stagedReceives.containsKey(channel);
}
/**
* check if stagedReceives have unmuted channel
*/
private boolean hasStagedReceives() {
for (KafkaChannel channel : this.stagedReceives.keySet()) {
if (!channel.isMute())
return true;
}
return false;
}
/**
* adds a receive to staged receives
*/
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
Deque<NetworkReceive> deque = stagedReceives.get(channel);
deque.add(receive);
}
/**
* checks if there are any staged receives and adds to completedReceives
*/
private void addToCompletedReceives() {
if (!this.stagedReceives.isEmpty()) {
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!channel.isMute()) {
Deque<NetworkReceive> deque = entry.getValue();
NetworkReceive networkReceive = deque.poll();
this.completedReceives.add(networkReceive);
this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
if (deque.isEmpty())
iter.remove();
}
}
}
}
private class SelectorMetrics {
private final Metrics metrics;
public final Sensor connectionClosed;
public final Sensor connectionCreated;
public final Sensor bytesTransferred;
public final Sensor bytesSent;
public final Sensor bytesReceived;
public final Sensor selectTime;
public final Sensor ioTime;
/* Names of metrics that are not registered through sensors */
private final List<MetricName> topLevelMetricNames = new ArrayList<>();
private final List<Sensor> sensors = new ArrayList<>();
public SelectorMetrics(Metrics metrics) {
this.metrics = metrics;
String metricGrpName = metricGrpPrefix + "-metrics";
StringBuilder tagsSuffix = new StringBuilder();
for (Map.Entry<String, String> tag: metricTags.entrySet()) {
tagsSuffix.append(tag.getKey());
tagsSuffix.append("-");
tagsSuffix.append(tag.getValue());
}
this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
this.connectionClosed.add(metricName, new Rate());
this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
this.connectionCreated.add(metricName, new Rate());
this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
bytesTransferred.add(metricName, new Rate(new Count()));
this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
this.bytesSent.add(metricName, new Rate());
metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
this.bytesSent.add(metricName, new Rate(new Count()));
metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
this.bytesSent.add(metricName, new Avg());
metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
this.bytesSent.add(metricName, new Max());
this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
this.bytesReceived.add(metricName, new Rate());
metricName = metrics.metricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
this.bytesReceived.add(metricName, new Rate(new Count()));
this.selectTime = sensor("select-time:" + tagsSuffix.toString());
metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
this.selectTime.add(metricName, new Rate(new Count()));
metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
this.selectTime.add(metricName, new Avg());
metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
this.ioTime = sensor("io-time:" + tagsSuffix.toString());
metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
this.ioTime.add(metricName, new Avg());
metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
topLevelMetricNames.add(metricName);
this.metrics.addMetric(metricName, new Measurable() {
public double measure(MetricConfig config, long now) {
return channels.size();
}
});
}
private Sensor sensor(String name, Sensor... parents) {
Sensor sensor = metrics.sensor(name, parents);
sensors.add(sensor);
return sensor;
}
public void maybeRegisterConnectionMetrics(String connectionId) {
if (!connectionId.isEmpty() && metricsPerConnection) {
// if one sensor of the metrics has been registered for the connection,
// then all other sensors should have been registered; and vice versa
String nodeRequestName = "node-" + connectionId + ".bytes-sent";
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
if (nodeRequest == null) {
String metricGrpName = metricGrpPrefix + "-node-metrics";
Map<String, String> tags = new LinkedHashMap<>(metricTags);
tags.put("node-id", "node-" + connectionId);
nodeRequest = sensor(nodeRequestName);
MetricName metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, tags);
nodeRequest.add(metricName, new Rate());
metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
nodeRequest.add(metricName, new Rate(new Count()));
metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
nodeRequest.add(metricName, new Avg());
metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
nodeRequest.add(metricName, new Max());
String nodeResponseName = "node-" + connectionId + ".bytes-received";
Sensor nodeResponse = sensor(nodeResponseName);
metricName = metrics.metricName("incoming-byte-rate", metricGrpName, tags);
nodeResponse.add(metricName, new Rate());
metricName = metrics.metricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
nodeResponse.add(metricName, new Rate(new Count()));
String nodeTimeName = "node-" + connectionId + ".latency";
Sensor nodeRequestTime = sensor(nodeTimeName);
metricName = metrics.metricName("request-latency-avg", metricGrpName, tags);
nodeRequestTime.add(metricName, new Avg());
metricName = metrics.metricName("request-latency-max", metricGrpName, tags);
nodeRequestTime.add(metricName, new Max());
}
}
}
public void recordBytesSent(String connectionId, long bytes) {
long now = time.milliseconds();
this.bytesSent.record(bytes, now);
if (!connectionId.isEmpty()) {
String nodeRequestName = "node-" + connectionId + ".bytes-sent";
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
if (nodeRequest != null)
nodeRequest.record(bytes, now);
}
}
public void recordBytesReceived(String connection, int bytes) {
long now = time.milliseconds();
this.bytesReceived.record(bytes, now);
if (!connection.isEmpty()) {
String nodeRequestName = "node-" + connection + ".bytes-received";
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
if (nodeRequest != null)
nodeRequest.record(bytes, now);
}
}
public void close() {
for (MetricName metricName : topLevelMetricNames)
metrics.removeMetric(metricName);
for (Sensor sensor : sensors)
metrics.removeSensor(sensor.name());
}
}
}