blob: 496e1d8127bfcca89ae854d80a8c71e908087505 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is used compute the partition and topic level offset metrics.
* <p>
* Partition level metrics are:
* topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition
* topicName/partition_{number}/latestTimeOffset //gives end offset of the partition
* topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout
* topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout
* topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset
* topicName/partition_{number}/recordsInPartition // total number of records in the partition
* </p>
* <p>
* Topic level metrics are:
* topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout
* topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout
* topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout
* topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout
* topicName/spoutLag // total spout lag of all the associated partitions of this spout
* topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
* </p>
*/
public class KafkaOffsetMetric<K, V> implements IMetric {
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
private final Supplier<Consumer<K,V>> consumerSupplier;
public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
Supplier<Consumer<K, V>> consumerSupplier) {
this.offsetManagerSupplier = offsetManagerSupplier;
this.consumerSupplier = consumerSupplier;
}
@Override
public Object getValueAndReset() {
Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
Consumer<K, V> consumer = consumerSupplier.get();
if (offsetManagers == null || offsetManagers.isEmpty() || consumer == null) {
LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
return null;
}
Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
Set<TopicPartition> topicPartitions = offsetManagers.keySet();
Map<TopicPartition, Long> beginningOffsets;
Map<TopicPartition, Long> endOffsets;
try {
beginningOffsets = consumer.beginningOffsets(topicPartitions);
endOffsets = consumer.endOffsets(topicPartitions);
} catch (RetriableException e) {
LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
return null;
}
//map to hold partition level and topic level metrics
Map<String, Long> result = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetManager offsetManager = entry.getValue();
long latestTimeOffset = endOffsets.get(topicPartition);
long earliestTimeOffset = beginningOffsets.get(topicPartition);
long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
long latestCompletedOffset = offsetManager.getCommittedOffset();
long spoutLag = latestTimeOffset - latestCompletedOffset;
long recordsInPartition = latestTimeOffset - earliestTimeOffset;
String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
result.put(metricPath + "/" + "spoutLag", spoutLag);
result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
result.put(metricPath + "/" + "recordsInPartition", recordsInPartition);
TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
if (topicMetrics == null) {
topicMetrics = new TopicMetrics();
topicMetricsMap.put(topicPartition.topic(), topicMetrics);
}
topicMetrics.totalSpoutLag += spoutLag;
topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
topicMetrics.totalLatestTimeOffset += latestTimeOffset;
topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
topicMetrics.totalRecordsInPartitions += recordsInPartition;
}
for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
String topic = e.getKey();
TopicMetrics topicMetrics = e.getValue();
result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
}
LOG.debug("Metrics Tick: value : {}", result);
return result;
}
private class TopicMetrics {
long totalSpoutLag = 0;
long totalEarliestTimeOffset = 0;
long totalLatestTimeOffset = 0;
long totalLatestEmittedOffset = 0;
long totalLatestCompletedOffset = 0;
long totalRecordsInPartitions = 0;
}
}