| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.storm.kafka.spout.metrics; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.function.Supplier; |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.errors.RetriableException; |
| import org.apache.storm.kafka.spout.internal.OffsetManager; |
| import org.apache.storm.metric.api.IMetric; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class is used compute the partition and topic level offset metrics. |
| * <p> |
| * Partition level metrics are: |
| * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition |
| * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition |
| * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout |
| * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout |
| * topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset |
| * topicName/partition_{number}/recordsInPartition // total number of records in the partition |
| * </p> |
| * <p> |
| * Topic level metrics are: |
| * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout |
| * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout |
| * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout |
| * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout |
| * topicName/spoutLag // total spout lag of all the associated partitions of this spout |
| * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout |
| * </p> |
| */ |
| public class KafkaOffsetMetric<K, V> implements IMetric { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); |
| private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier; |
| private final Supplier<Consumer<K,V>> consumerSupplier; |
| |
| public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, |
| Supplier<Consumer<K, V>> consumerSupplier) { |
| this.offsetManagerSupplier = offsetManagerSupplier; |
| this.consumerSupplier = consumerSupplier; |
| } |
| |
| @Override |
| public Object getValueAndReset() { |
| |
| Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get(); |
| Consumer<K, V> consumer = consumerSupplier.get(); |
| |
| if (offsetManagers == null || offsetManagers.isEmpty() || consumer == null) { |
| LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null."); |
| return null; |
| } |
| |
| Map<String,TopicMetrics> topicMetricsMap = new HashMap<>(); |
| Set<TopicPartition> topicPartitions = offsetManagers.keySet(); |
| |
| Map<TopicPartition, Long> beginningOffsets; |
| Map<TopicPartition, Long> endOffsets; |
| |
| try { |
| beginningOffsets = consumer.beginningOffsets(topicPartitions); |
| endOffsets = consumer.endOffsets(topicPartitions); |
| } catch (RetriableException e) { |
| LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e); |
| return null; |
| } |
| |
| //map to hold partition level and topic level metrics |
| Map<String, Long> result = new HashMap<>(); |
| |
| for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) { |
| TopicPartition topicPartition = entry.getKey(); |
| OffsetManager offsetManager = entry.getValue(); |
| |
| long latestTimeOffset = endOffsets.get(topicPartition); |
| long earliestTimeOffset = beginningOffsets.get(topicPartition); |
| |
| long latestEmittedOffset = offsetManager.getLatestEmittedOffset(); |
| long latestCompletedOffset = offsetManager.getCommittedOffset(); |
| long spoutLag = latestTimeOffset - latestCompletedOffset; |
| long recordsInPartition = latestTimeOffset - earliestTimeOffset; |
| |
| String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); |
| result.put(metricPath + "/" + "spoutLag", spoutLag); |
| result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); |
| result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); |
| result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); |
| result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); |
| result.put(metricPath + "/" + "recordsInPartition", recordsInPartition); |
| |
| TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic()); |
| if (topicMetrics == null) { |
| topicMetrics = new TopicMetrics(); |
| topicMetricsMap.put(topicPartition.topic(), topicMetrics); |
| } |
| |
| topicMetrics.totalSpoutLag += spoutLag; |
| topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; |
| topicMetrics.totalLatestTimeOffset += latestTimeOffset; |
| topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; |
| topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; |
| topicMetrics.totalRecordsInPartitions += recordsInPartition; |
| } |
| |
| for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { |
| String topic = e.getKey(); |
| TopicMetrics topicMetrics = e.getValue(); |
| result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); |
| result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); |
| result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); |
| result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); |
| result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset); |
| result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions); |
| } |
| |
| LOG.debug("Metrics Tick: value : {}", result); |
| return result; |
| } |
| |
| private class TopicMetrics { |
| long totalSpoutLag = 0; |
| long totalEarliestTimeOffset = 0; |
| long totalLatestTimeOffset = 0; |
| long totalLatestEmittedOffset = 0; |
| long totalLatestCompletedOffset = 0; |
| long totalRecordsInPartitions = 0; |
| } |
| } |