blob: afece1f3ad94d6c8a13246945316f4e38781c04b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.spi.communication.tcp;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.Metric;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.SEPARATOR;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_DESC;
import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_NAME;
import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_DESC;
import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_DESC;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_NAME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_DESC;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_NAME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_METRIC_DESC;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_METRIC_NAME;
/**
* Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
*/
class TcpCommunicationMetricsListener {
/** Metrics manager; used to obtain, iterate and remove metric registries. */
private final GridMetricManager mmgr;
/** Metrics registry obtained for {@code COMMUNICATION_METRICS_GROUP_NAME}. */
private final org.apache.ignite.internal.processors.metric.MetricRegistry mreg;
/** Current ignite instance. */
private final Ignite ignite;
/** All {@link ThreadMetrics} instances created so far by {@link #threadMetrics}. */
private final Set<ThreadMetrics> allMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
/** Thread-local metrics; every newly created instance is also added to {@link #allMetrics}. */
private final ThreadLocal<ThreadMetrics> threadMetrics = ThreadLocal.withInitial(() -> {
ThreadMetrics metrics = new ThreadMetrics();
allMetrics.add(metrics);
return metrics;
});
/** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code ThreadMetrics#sentMsgsMetricsByType}. */
private final Function<Short, LongAdderMetric> sentMsgsCntByTypeMetricFactory;
/** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code ThreadMetrics#rcvdMsgsMetricsByType}. */
private final Function<Short, LongAdderMetric> rcvdMsgsCntByTypeMetricFactory;
/** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code ThreadMetrics#sentMsgsMetricsByConsistentId}. */
private final Function<Object, LongAdderMetric> sentMsgsCntByConsistentIdMetricFactory;
/** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code ThreadMetrics#rcvdMsgsMetricsByConsistentId}. */
private final Function<Object, LongAdderMetric> rcvdMsgsCntByConsistentIdMetricFactory;
/** Sent bytes count metric. */
private final LongAdderMetric sentBytesMetric;
/** Received bytes count metric. */
private final LongAdderMetric rcvdBytesMetric;
/** Sent messages count metric. */
private final LongAdderMetric sentMsgsMetric;
/** Received messages count metric. */
private final LongAdderMetric rcvdMsgsMetric;
/** Mutex to synchronize updates of the message type map. */
private final Object msgTypMapMux = new Object();
/**
 * Message type map: message direct type id to message class name.
 * {@code null} until the first message is observed; updates replace the whole map with a new copy
 * (see {@code updateMessageTypeMap}).
 */
private volatile Map<Short, String> msgTypeMap;
/**
 * Obtains the communication metrics registry, registers the total counters in it and prepares the
 * factories used to lazily resolve per-type and per-node counters.
 *
 * @param mmgr Metrics manager.
 * @param ignite Ignite instance, used to map consistent ids to node ids.
 */
public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
    this.mmgr = mmgr;
    this.ignite = ignite;

    mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME);

    // Per-type counters are registered in the common communication registry.
    sentMsgsCntByTypeMetricFactory = typeId ->
        mreg.longAdderMetric(sentMessagesByTypeMetricName(typeId), SENT_MESSAGES_BY_TYPE_METRIC_DESC);

    rcvdMsgsCntByTypeMetricFactory = typeId ->
        mreg.longAdderMetric(receivedMessagesByTypeMetricName(typeId), RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC);

    // Per-node counters are looked up in a registry named after the node's consistent id;
    // presumably they are put there by the registry creation listener installed below.
    sentMsgsCntByConsistentIdMetricFactory = nodeConsId -> {
        String nodeRegName = metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeConsId.toString());

        return mmgr.registry(nodeRegName).findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
    };

    rcvdMsgsCntByConsistentIdMetricFactory = nodeConsId -> {
        String nodeRegName = metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeConsId.toString());

        return mmgr.registry(nodeRegName).findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
    };

    sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC);
    rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC);

    sentMsgsMetric = mreg.longAdderMetric(SENT_MESSAGES_METRIC_NAME, SENT_MESSAGES_METRIC_DESC);
    rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME, RECEIVED_MESSAGES_METRIC_DESC);

    mmgr.addMetricRegistryCreationListener(createdReg -> {
        // Only registries of specific nodes (group name followed by the separator) get the per-node counters.
        if (createdReg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) {
            MetricRegistry nodeReg = (MetricRegistry)createdReg;

            nodeReg.longAdderMetric(
                SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
                SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
            );

            nodeReg.longAdderMetric(
                RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
                RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
            );
        }
    });
}
/**
 * Gets the registry that holds the communication metrics of this listener.
 *
 * @return Metrics registry.
 */
public MetricRegistry metricRegistry() {
    return this.mreg;
}
/**
 * Collects statistics for message sent by SPI. Only {@link GridIoMessage} instances are counted,
 * and the statistics are attributed to the message wrapped inside of it.
 *
 * @param msg Sent message.
 * @param consistentId Receiver node consistent id.
 */
public void onMessageSent(Message msg, Object consistentId) {
    assert msg != null;
    assert consistentId != null;

    if (!(msg instanceof GridIoMessage))
        return;

    Message payload = ((GridIoMessage)msg).message();

    updateMessageTypeMap(payload);

    sentMsgsMetric.increment();

    threadMetrics.get().onMessageSent(payload, consistentId);
}
/**
 * Collects statistics for message received by SPI. Only {@link GridIoMessage} instances are counted,
 * and the statistics are attributed to the message wrapped inside of it.
 *
 * @param msg Received message.
 * @param consistentId Sender node consistent id.
 */
public void onMessageReceived(Message msg, Object consistentId) {
    assert msg != null;
    assert consistentId != null;

    if (!(msg instanceof GridIoMessage))
        return;

    Message payload = ((GridIoMessage)msg).message();

    updateMessageTypeMap(payload);

    rcvdMsgsMetric.increment();

    threadMetrics.get().onMessageReceived(payload, consistentId);
}
/**
 * Gets sent messages count.
 * <p>
 * The underlying counter is a {@code long}; values that do not fit into an {@code int}
 * are reported as {@link Integer#MAX_VALUE}.
 *
 * @return Sent messages count, saturated at {@link Integer#MAX_VALUE}.
 */
public int sentMessagesCount() {
    long cnt = sentMsgsMetric.value();

    // A plain (int) cast keeps only the low 32 bits, so a counter such as 2^32 + 5 would be reported as 5.
    // Compare the long value itself instead. Negative values keep mapping to MAX_VALUE as before.
    return cnt < 0 || cnt > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)cnt;
}
/**
 * Gets the current value of the sent bytes metric.
 *
 * @return Sent bytes count.
 */
public long sentBytesCount() {
    return this.sentBytesMetric.value();
}
/**
 * Gets received messages count.
 * <p>
 * The underlying counter is a {@code long}; values that do not fit into an {@code int}
 * are reported as {@link Integer#MAX_VALUE}.
 *
 * @return Received messages count, saturated at {@link Integer#MAX_VALUE}.
 */
public int receivedMessagesCount() {
    long cnt = rcvdMsgsMetric.value();

    // A plain (int) cast keeps only the low 32 bits, so a counter such as 2^32 + 5 would be reported as 5.
    // Compare the long value itself instead. Negative values keep mapping to MAX_VALUE as before.
    return cnt < 0 || cnt > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)cnt;
}
/**
 * Gets the current value of the received bytes metric.
 *
 * @return Received bytes count.
 */
public long receivedBytesCount() {
    return this.rcvdBytesMetric.value();
}
/**
 * Gets received messages counts (grouped by type).
 *
 * @return Map from message type name to the number of received messages of that type.
 */
public Map<String, Long> receivedMessagesByType() {
    String typeMetricPrefix = RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME + SEPARATOR;

    return collectMessagesCountByType(typeMetricPrefix);
}
/**
 * Gets received messages counts (grouped by node).
 *
 * @return Map from sender node id to the number of messages received from that node.
 */
public Map<UUID, Long> receivedMessagesByNode() {
    final String perNodeMetric = RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;

    return collectMessagesCountByNodeId(perNodeMetric);
}
/**
 * Gets sent messages counts (grouped by type).
 *
 * @return Map from message type name to the number of sent messages of that type.
 */
public Map<String, Long> sentMessagesByType() {
    String typeMetricPrefix = SENT_MESSAGES_BY_TYPE_METRIC_NAME + SEPARATOR;

    return collectMessagesCountByType(typeMetricPrefix);
}
/**
 * Gets sent messages counts (grouped by node).
 *
 * @return Map from receiver node id to the number of messages sent to that node.
 */
public Map<UUID, Long> sentMessagesByNode() {
    final String perNodeMetric = SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME;

    return collectMessagesCountByNodeId(perNodeMetric);
}
/**
 * Collects the values of the per-type metrics found in the communication registry.
 * Metrics are selected by name prefix; the rest of the name is parsed as the message direct type id
 * and translated to a type name through the message type map. Types without a known name are skipped.
 *
 * @param prefix Metric name prefix (without the group name), including the trailing separator.
 * @return Map from message type name to the metric value.
 */
protected Map<String, Long> collectMessagesCountByType(String prefix) {
    String qualifiedPrefix = metricName(COMMUNICATION_METRICS_GROUP_NAME, prefix);

    Map<String, Long> cntByTypeName = new HashMap<>();

    for (Metric m : mreg) {
        String name = m.name();

        if (!name.startsWith(qualifiedPrefix))
            continue;

        short typeId = Short.parseShort(name.substring(qualifiedPrefix.length()));

        // The map reference is re-read for every metric, it may be replaced concurrently.
        Map<Short, String> typeNames = msgTypeMap;

        if (typeNames == null)
            continue;

        String typeName = typeNames.get(typeId);

        if (typeName != null)
            cntByTypeName.put(typeName, ((LongMetric)m).value());
    }

    return cntByTypeName;
}
/**
 * Collects the values of the given per-node metric from every node-specific communication registry.
 * The registry name suffix is treated as the string form of a node consistent id and is mapped to the
 * id of a node currently present in the cluster; registries without such a node are skipped.
 *
 * @param metricName Name of the per-node metric to read from each registry.
 * @return Map from node id to the metric value.
 */
protected Map<UUID, Long> collectMessagesCountByNodeId(String metricName) {
    Map<String, UUID> nodeIdByConsistentId = ignite.cluster().nodes().stream()
        .collect(toMap(node -> node.consistentId().toString(), ClusterNode::id));

    String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;

    Map<UUID, Long> res = new HashMap<>();

    for (ReadOnlyMetricRegistry nodeReg : mmgr) {
        String regName = nodeReg.name();

        if (!regName.startsWith(mregPrefix))
            continue;

        UUID nodeId = nodeIdByConsistentId.get(regName.substring(mregPrefix.length()));

        if (nodeId == null)
            continue;

        LongMetric metric = nodeReg.<LongMetric>findMetric(metricName);

        // NOTE(review): presumably findMetric yields null for a metric that is not registered; skip such
        // a registry instead of failing the whole collection with a NullPointerException.
        if (metric != null)
            res.put(nodeId, metric.value());
    }

    return res;
}
/**
 * Resets metrics for this instance: the total counters, the per-type counters of the communication
 * registry and the per-node counters of every node-specific communication registry.
 */
public void resetMetrics() {
    rcvdMsgsMetric.reset();
    sentMsgsMetric.reset();

    sentBytesMetric.reset();
    rcvdBytesMetric.reset();

    // collectMessagesCountByType() selects the per-type metrics of the same registry by their
    // group-qualified name, while this method used to match the bare name only, so one of the two
    // could never match. Accept both forms here so that per-type counters are reset either way.
    String qualifiedSentPrefix = metricName(COMMUNICATION_METRICS_GROUP_NAME, SENT_MESSAGES_BY_TYPE_METRIC_NAME);
    String qualifiedRcvdPrefix = metricName(COMMUNICATION_METRICS_GROUP_NAME, RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME);

    for (Metric metric : mreg) {
        String name = metric.name();

        boolean byType = name.startsWith(SENT_MESSAGES_BY_TYPE_METRIC_NAME)
            || name.startsWith(RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME)
            || name.startsWith(qualifiedSentPrefix)
            || name.startsWith(qualifiedRcvdPrefix);

        if (byType)
            metric.reset();
    }

    String nodeRegPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;

    for (ReadOnlyMetricRegistry nodeReg : mmgr) {
        if (!nodeReg.name().startsWith(nodeRegPrefix))
            continue;

        // NOTE(review): presumably findMetric yields null for a metric that is not registered;
        // such a registry is skipped instead of aborting the reset with a NullPointerException.
        Metric sentByNode = nodeReg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);

        if (sentByNode != null)
            sentByNode.reset();

        Metric rcvdByNode = nodeReg.findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);

        if (rcvdByNode != null)
            rcvdByNode.reset();
    }
}
/**
 * Drops the per-node metric caches of all threads and removes the metric registry of the given node.
 *
 * @param consistentId Consistent id of the node.
 */
public void onNodeLeft(Object consistentId) {
    // The per-thread maps are not thread-safe, so a single entry cannot be removed from here without
    // forcing communication worker threads to synchronize. Each map is replaced by an empty one instead;
    // the owning threads fill them again lazily.
    allMetrics.forEach(perThread -> {
        perThread.sentMsgsMetricsByConsistentId = new HashMap<>();
        perThread.rcvdMsgsMetricsByConsistentId = new HashMap<>();
    });

    String nodeRegName = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());

    mmgr.remove(nodeRegName);
}
/**
 * Makes sure the message type map contains the class name for the direct type of the given message.
 * The published map is never modified: a copy with the new entry is built and stored instead.
 *
 * @param msg Message.
 */
private void updateMessageTypeMap(Message msg) {
    short typeId = msg.directType();

    Map<Short, String> snapshot = msgTypeMap;

    // Common case: the type is already known, no locking needed.
    if (snapshot != null && snapshot.containsKey(typeId))
        return;

    synchronized (msgTypMapMux) {
        Map<Short, String> cur = msgTypeMap;

        // Another thread may have added the type while this one was waiting for the mutex.
        if (cur != null && cur.containsKey(typeId))
            return;

        Map<Short, String> updated = cur == null ? new HashMap<>() : new HashMap<>(cur);

        updated.put(typeId, msg.getClass().getName());

        msgTypeMap = updated;
    }
}
/**
 * Generates the name of the sent messages metric for the given message direct type id.
 *
 * @param directType Message direct type id.
 * @return Metric name built from the sent-by-type metric name and the type id.
 */
public static String sentMessagesByTypeMetricName(Short directType) {
    String typeSuffix = directType.toString();

    return metricName(SENT_MESSAGES_BY_TYPE_METRIC_NAME, typeSuffix);
}
/**
 * Generates the name of the received messages metric for the given message direct type id.
 *
 * @param directType Message direct type id.
 * @return Metric name built from the received-by-type metric name and the type id.
 */
public static String receivedMessagesByTypeMetricName(Short directType) {
    String typeSuffix = directType.toString();

    return metricName(RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME, typeSuffix);
}
/**
 * Per-thread cache of metric instances, so that a thread resolves each per-type and per-node
 * counter through the enclosing listener's factories only once.
 */
private class ThreadMetrics {
    /** Sent messages count metrics grouped by message type. */
    private final Map<Short, LongAdderMetric> sentMsgsMetricsByType = new HashMap<>();

    /** Received messages count metrics grouped by message type. */
    private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new HashMap<>();

    /**
     * Sent messages count metrics grouped by node consistent id.
     * The whole map is replaced by an empty one when a node leaves.
     */
    public volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();

    /**
     * Received messages count metrics grouped by node consistent id.
     * The whole map is replaced by an empty one when a node leaves.
     */
    public volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();

    /**
     * Increments the per-type and per-node counters for a sent message.
     *
     * @param msg Sent message.
     * @param consistentId Receiver node consistent id.
     */
    private void onMessageSent(Message msg, Object consistentId) {
        LongAdderMetric byType =
            sentMsgsMetricsByType.computeIfAbsent(msg.directType(), sentMsgsCntByTypeMetricFactory);

        byType.increment();

        LongAdderMetric byNode =
            sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, sentMsgsCntByConsistentIdMetricFactory);

        byNode.increment();
    }

    /**
     * Increments the per-type and per-node counters for a received message.
     *
     * @param msg Received message.
     * @param consistentId Sender node consistent id.
     */
    private void onMessageReceived(Message msg, Object consistentId) {
        LongAdderMetric byType =
            rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), rcvdMsgsCntByTypeMetricFactory);

        byType.increment();

        LongAdderMetric byNode =
            rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, rcvdMsgsCntByConsistentIdMetricFactory);

        byNode.increment();
    }
}
}