| /* |
| * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.contrib.kafka; |
| |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.DefaultPartition; |
| import com.datatorrent.api.InputOperator; |
| import com.datatorrent.api.Operator.ActivationListener; |
| import com.datatorrent.api.Operator.CheckpointListener; |
| import com.datatorrent.api.Partitioner; |
| import com.datatorrent.api.Stats; |
| import com.datatorrent.api.StatsListener; |
| import com.datatorrent.api.annotation.OperatorAnnotation; |
| import com.datatorrent.api.annotation.Stateless; |
| |
| import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions; |
| import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap; |
| |
| import com.datatorrent.lib.io.IdempotentStorageManager; |
| import com.esotericsoftware.kryo.Kryo; |
| import com.esotericsoftware.kryo.io.Input; |
| import com.esotericsoftware.kryo.io.Output; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.HashMultimap; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.SetMultimap; |
| import com.google.common.collect.Sets; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.lang.reflect.Array; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import javax.validation.Valid; |
| import javax.validation.constraints.Min; |
| import javax.validation.constraints.NotNull; |
| |
| import kafka.api.FetchRequest; |
| import kafka.api.FetchRequestBuilder; |
| import kafka.cluster.Broker; |
| import kafka.javaapi.FetchResponse; |
| import kafka.javaapi.PartitionMetadata; |
| import kafka.javaapi.consumer.SimpleConsumer; |
| import kafka.message.Message; |
| import kafka.message.MessageAndOffset; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.tuple.MutablePair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
 * This is a base implementation of a Kafka input operator, which consumes data from the Kafka message bus.
 * Subclasses should implement the method for emitting tuples to downstream operators.
 * It is dynamically partitioned based on the upstream kafka partitions.
| * <p> |
| * <b>Partition Strategy:</b> |
| * <p><b>1. ONE_TO_ONE partition</b> Each operator partition will consume from only one kafka partition </p> |
 * <p><b>2. ONE_TO_MANY partition</b> Each operator partition consumes from multiple kafka partitions, subject to a hard ingestion rate limit</p>
 * <p><b>3. ONE_TO_MANY_HEURISTIC partition</b>(Not implemented yet) Each operator partition consumes from multiple kafka partitions, and the number of operator partitions is decided by a heuristic function (real-time bottleneck)</p>
 * <p><b>Note:</b> The ONE_TO_MANY partition strategy only supports the simple kafka consumer because
 * <p> 1) the high-level consumer can only balance the number of brokers it consumes from rather than the actual load from each broker</p>
 * <p> 2) the high-level consumer cannot reset offsets once they are committed, so the tuples are not replayable </p>
| * <p></p> |
| * <br> |
| * <br> |
| * <b>Basic Algorithm:</b> |
 * <p>1. Pull the metadata (number of partitions) of the topic from the brokerList of {@link KafkaConsumer}</p>
 * <p>2. The cloneConsumer method is used to initialize the new {@link KafkaConsumer} instance for the new partition operator</p>
 * <p>3. The cloneOperator method is used to initialize the new {@link AbstractKafkaInputOperator} instance for the new partition operator</p>
 * <p>4. The ONE_TO_MANY strategy uses the first-fit decreasing algorithm (http://en.wikipedia.org/wiki/Bin_packing_problem) to minimize the number of operator partitions</p>
| * <br> |
| * <br> |
| * <b>Load balance:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer} <br> |
| * <b>Kafka partition failover:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer} |
| * <br> |
| * <br> |
| * <b>Self adjust to Kafka partition change:</b> |
 * <p><b>EACH</b> operator partition periodically checks for leader broker changes on the kafka partitions it consumes from and adjusts its connections without repartitioning</p>
 * <p><b>ONLY</b> the <b>APPMASTER</b> periodically checks the overall kafka partition layout and adds operator partitions when kafka partitions are added (kafka supports no partition deletion for now)</p>
| * <br> |
| * <br> |
| * </p> |
| * Properties:<br> |
 * <b>maxTuplesPerWindow</b>: Maximum number of tuples emitted in each streaming window<br>
 * <b>strategy</b>: Partition strategy, ONE_TO_ONE by default<br>
| * <br> |
| * Compile time checks:<br> |
 * Classes derived from this operator have to implement the abstract method emitTuple() <br>
| * <br> |
| * Run time checks:<br> |
| * None<br> |
| * <br> |
| * Benchmarks:<br> |
| * TBD<br> |
| * <br> |
| * |
| * Each operator can consume 1 topic from multiple partitions and clusters<br> |
| * </p> |
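 * <p>
 * A minimal configuration sketch; the concrete subclass and the property values below are
 * illustrative, not part of this class:
 * </p>
 * <pre>{@code
 * MyKafkaInputOperator input = dag.addOperator("MessageReader", new MyKafkaInputOperator());
 * input.setTopic("test_topic");
 * input.setZookeeper("localhost:2181");
 * input.setStrategy("ONE_TO_ONE");
 * }</pre>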
| * |
| * @displayName Abstract Kafka Input |
| * @category Messaging |
| * @tags input operator |
| * |
| * @since 0.3.2 |
| */ |
| |
| @OperatorAnnotation(partitionable = true) |
| public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener |
| { |
| private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class); |
| |
| @Min(1) |
| private int maxTuplesPerWindow = Integer.MAX_VALUE; |
| private transient int emitCount = 0; |
| protected IdempotentStorageManager idempotentStorageManager; |
| protected transient long currentWindowId; |
| protected transient int operatorId; |
| protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState; |
| protected transient Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>(); |
| private transient OperatorContext context = null; |
| private boolean idempotent = true; |
| // By default the partition policy is 1:1 |
| public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE; |
| |
| // default resource is unlimited in terms of msgs per second |
| private long msgRateUpperBound = Long.MAX_VALUE; |
| |
| // default resource is unlimited in terms of bytes per second |
| private long byteRateUpperBound = Long.MAX_VALUE; |
| |
| // Store the current operator partition topology |
| private transient List<PartitionInfo> currentPartitionInfo = Lists.newLinkedList(); |
| |
| // Store the current collected kafka consumer stats |
| private transient Map<Integer, List<KafkaConsumer.KafkaMeterStats>> kafkaStatsHolder = new HashMap<Integer, List<KafkaConsumer.KafkaMeterStats>>(); |
| |
| private OffsetManager offsetManager = null; |
| |
| // Minimal interval between 2 (re)partition actions |
| private long repartitionInterval = 30000L; |
| |
  // Minimal interval between checking the collected stats and deciding whether to repartition,
  // and also the minimal interval between 2 offset updates
| private long repartitionCheckInterval = 5000L; |
| |
| private transient long lastCheckTime = 0L; |
| |
| private transient long lastRepartitionTime = 0L; |
| |
  // A list storing the newly discovered kafka partitions
| private transient List<KafkaPartition> newWaitingPartition = new LinkedList<KafkaPartition>(); |
| |
| @Min(1) |
| private int initialPartitionCount = 1; |
| |
| @NotNull |
| @Valid |
| protected KafkaConsumer consumer = new SimpleKafkaConsumer(); |
| |
| public AbstractKafkaInputOperator() |
| { |
| idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); |
| currentWindowRecoveryState = new HashMap<KafkaPartition, MutablePair<Long, Integer>>(); |
| } |
| |
| /** |
| * Any concrete class derived from KafkaInputOperator has to implement this method to emit tuples to an output port. |
| * |
| */ |
| protected abstract void emitTuple(Message message); |
| |
| public int getMaxTuplesPerWindow() |
| { |
| return maxTuplesPerWindow; |
| } |
| |
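  /**
   * Cap the number of tuples emitted in a single streaming window; messages beyond the cap
   * stay buffered in the consumer and are emitted in subsequent windows.
   */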
| public void setMaxTuplesPerWindow(int maxTuplesPerWindow) |
| { |
| this.maxTuplesPerWindow = maxTuplesPerWindow; |
| } |
| |
| @Override |
| public void setup(OperatorContext context) |
| { |
| if(!(getConsumer() instanceof SimpleKafkaConsumer) || !idempotent) { |
| idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); |
| } |
| logger.debug("consumer {} topic {} cacheSize {}", consumer, consumer.getTopic(), consumer.getCacheSize()); |
| consumer.create(); |
| this.context = context; |
| operatorId = context.getId(); |
| idempotentStorageManager.setup(context); |
| } |
| |
| @Override |
| public void teardown() |
| { |
| idempotentStorageManager.teardown(); |
| consumer.teardown(); |
| } |
| |
| @Override |
| public void beginWindow(long windowId) |
| { |
| currentWindowId = windowId; |
| if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { |
| replay(windowId); |
| } |
| emitCount = 0; |
| } |
| |
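  /**
   * Replay the tuples of the given window from the state saved by the idempotent storage
   * manager: for each kafka partition it re-fetches the recorded number of messages starting
   * at the recorded offset, so the replayed window emits the same tuples as the original one.
   */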
| protected void replay(long windowId) |
| { |
| try { |
| @SuppressWarnings("unchecked") |
| Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>) idempotentStorageManager.load(operatorId, windowId); |
| if (recoveredData == null) { |
| return; |
| } |
| |
| Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic); |
| if (pms == null) { |
| return; |
| } |
| |
| SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer(); |
      // add all partition requests together in one Fetch request
| FetchRequestBuilder frb = new FetchRequestBuilder().clientId(cons.getClientId()); |
| for (Map.Entry<KafkaPartition, MutablePair<Long, Integer>> rc: recoveredData.entrySet()) { |
| KafkaPartition kp = rc.getKey(); |
| List<PartitionMetadata> pmsVal = pms.get(kp.getClusterId()); |
| |
| Iterator<PartitionMetadata> pmIterator = pmsVal.iterator(); |
| PartitionMetadata pm = pmIterator.next(); |
| while (pm.partitionId() != kp.getPartitionId()) { |
| if (!pmIterator.hasNext()) |
| break; |
| pm = pmIterator.next(); |
| } |
| if (pm.partitionId() != kp.getPartitionId()) |
| continue; |
| |
| Broker bk = pm.leader(); |
| |
| frb.addFetch(consumer.topic, rc.getKey().getPartitionId(), rc.getValue().left, cons.getBufferSize()); |
| FetchRequest req = frb.build(); |
| |
        SimpleConsumer ksc = new SimpleConsumer(bk.host(), bk.port(), cons.getTimeout(), cons.getBufferSize(), cons.getClientId());
        FetchResponse fetchResponse = ksc.fetch(req);
        int count = 0;
        for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kp.getPartitionId())) {
          emitTuple(msg.message());
          offsetStats.put(kp, msg.offset());
          count = count + 1;
          if (count == rc.getValue().right.intValue())
            break;
        }
        // release the ad-hoc consumer used for the replay fetch
        ksc.close();
| } |
| if(windowId == idempotentStorageManager.getLargestRecoveryWindow()) { |
| // Set the offset positions to the consumer |
| Map<KafkaPartition, Long> currentOffsets = new HashMap<KafkaPartition, Long>(cons.getCurrentOffsets()); |
| // Increment the offsets |
| for (Map.Entry<KafkaPartition, Long> e: offsetStats.entrySet()) { |
| currentOffsets.put(e.getKey(), e.getValue() + 1); |
| } |
| |
| cons.resetOffset(currentOffsets); |
| cons.start(); |
| } |
| } |
| |
| catch (IOException e) { |
| throw new RuntimeException("replay", e); |
| } |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { |
| try { |
| if((getConsumer() instanceof SimpleKafkaConsumer)) { |
| SimpleKafkaConsumer cons = (SimpleKafkaConsumer) getConsumer(); |
| context.setCounters(cons.getConsumerStats(offsetStats)); |
| } |
| idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); |
| } |
| catch (IOException e) { |
| throw new RuntimeException("saving recovery", e); |
| } |
| } |
| currentWindowRecoveryState.clear(); |
| } |
| |
| @Override |
| public void checkpointed(long windowId) |
| { |
| // commit the consumer offset |
| getConsumer().commitOffset(); |
| } |
| |
| @Override |
| public void committed(long windowId) |
| { |
| try { |
| idempotentStorageManager.deleteUpTo(operatorId, windowId); |
| } |
| catch (IOException e) { |
| throw new RuntimeException("deleting state", e); |
| } |
| } |
| |
| @Override |
| public void activate(OperatorContext ctx) |
| { |
| if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { |
| // If it is a replay state, don't start the consumer |
| return; |
| } |
| // Don't start thread here! |
| // # of kafka_consumer_threads depends on the type of kafka client and the message |
| // metadata(topic/partition/replica) layout |
| consumer.start(); |
| } |
| |
| @Override |
| public void deactivate() |
| { |
| consumer.stop(); |
| } |
| |
| @Override |
| public void emitTuples() |
| { |
| if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { |
| return; |
| } |
| int count = consumer.messageSize(); |
| if (maxTuplesPerWindow > 0) { |
| count = Math.min(count, maxTuplesPerWindow - emitCount); |
| } |
| for (int i = 0; i < count; i++) { |
| KafkaConsumer.KafkaMessage message = consumer.pollMessage(); |
| // Ignore the duplicate messages |
| if(offsetStats.containsKey(message.kafkaPart) && message.offSet <= offsetStats.get(message.kafkaPart)) |
| continue; |
| emitTuple(message.msg); |
| offsetStats.put(message.kafkaPart, message.offSet); |
| MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart); |
| if(offsetAndCount == null) { |
| currentWindowRecoveryState.put(message.kafkaPart, new MutablePair<Long, Integer>(message.offSet, 1)); |
| } else { |
| offsetAndCount.setRight(offsetAndCount.right+1); |
| } |
| } |
| emitCount += count; |
| } |
| |
| public void setConsumer(K consumer) |
| { |
| this.consumer = consumer; |
| } |
| |
| public KafkaConsumer getConsumer() |
| { |
| return consumer; |
| } |
| |
| // add topic as operator property |
| public void setTopic(String topic) |
| { |
| this.consumer.setTopic(topic); |
| } |
| |
| /** |
| * Set the ZooKeeper quorum of the Kafka cluster(s) you want to consume data from. |
| * The operator will discover the brokers that it needs to consume messages from. |
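   * <p>
   * A sketch of the accepted formats (the hostnames below are illustrative):
   * </p>
   * <pre>{@code
   * // a single cluster under the default cluster id
   * operator.setZookeeper("node1:2181,node2:2181");
   * // multiple clusters, each prefixed with its cluster id
   * operator.setZookeeper("cluster1::node1,node2:2181;cluster2::node3:2182");
   * }</pre>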
| */ |
| public void setZookeeper(String zookeeperString) |
| { |
| SetMultimap<String, String> theClusters = HashMultimap.create(); |
| for (String zk : zookeeperString.split(";")) { |
| String[] parts = zk.split("::"); |
| String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0]; |
| String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(","); |
| String portId = ""; |
| for (int idx = hostNames.length - 1; idx >= 0; idx--) { |
| String[] zkParts = hostNames[idx].split(":"); |
| if (zkParts.length == 2) { |
| portId = zkParts[1]; |
| } |
        if (!portId.isEmpty()) {
| theClusters.put(clusterId, zkParts[0] + ":" + portId); |
| } else { |
| throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeperString + "\n" |
| + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2"); |
| } |
| } |
| } |
| this.consumer.setZookeeper(theClusters); |
| } |
| |
| @Override |
| public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions) |
| { |
| // update the last repartition time |
| lastRepartitionTime = System.currentTimeMillis(); |
| } |
| |
| @Override |
| public Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions, Partitioner.PartitioningContext context) |
| { |
| // Initialize brokers from zookeepers |
| getConsumer().initBrokers(); |
| |
| // check if it's the initial partition |
    boolean isInitialPartition = partitions.iterator().next().getStats() == null;
| |
    // get the partition metadata for the topic.
    // Whether the operator uses the high-level or the simple kafka consumer, it always creates a temporary simple kafka consumer to get the metadata of the topic.
    // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata.
| Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic()); |
| |
| // Operator partitions |
| List<Partitioner.Partition<AbstractKafkaInputOperator<K>>> newPartitions = null; |
| |
    // initialize the offsets
    Map<KafkaPartition, Long> initOffset = null;
    if (isInitialPartition && offsetManager != null) {
      initOffset = offsetManager.loadInitialOffsets();
      logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
    }
| |
| Collection<IdempotentStorageManager> newManagers = Sets.newHashSet(); |
| Set<Integer> deletedOperators = Sets.newHashSet(); |
| |
| switch (strategy) { |
| |
      // For the 1 to 1 mapping, the framework creates as many operator partitions as there are kafka topic partitions;
      // each operator partition consumes from exactly one kafka partition
| case ONE_TO_ONE: |
| |
        if (isInitialPartition) {
| lastRepartitionTime = System.currentTimeMillis(); |
| logger.info("[ONE_TO_ONE]: Initializing partition(s)"); |
| |
          // initialize the number of operator partitions according to the number of kafka partitions
| |
| newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(); |
| for (Map.Entry<String, List<PartitionMetadata>> kp : kafkaPartitions.entrySet()) { |
| String clusterId = kp.getKey(); |
| for (PartitionMetadata pm : kp.getValue()) { |
| logger.info("[ONE_TO_ONE]: Create operator partition for cluster {}, topic {}, kafka partition {} ", clusterId, getConsumer().topic, pm.partitionId()); |
| newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, newManagers)); |
| } |
| } |
| |
| } |
| else if (newWaitingPartition.size() != 0) { |
          // add an operator partition for each new kafka partition
| for (KafkaPartition newPartition : newWaitingPartition) { |
| logger.info("[ONE_TO_ONE]: Add operator partition for cluster {}, topic {}, partition {}", newPartition.getClusterId(), getConsumer().topic, newPartition.getPartitionId()); |
| partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers)); |
| } |
| newWaitingPartition.clear(); |
| idempotentStorageManager.partitioned(newManagers, deletedOperators); |
| return partitions; |
| |
| } |
| break; |
      // For the 1 to N mapping, the initial partition count is defined by the stream application.
      // Afterwards, the framework dynamically adjusts the partitioning and allocates the consumers to as few operator partitions as it can,
      // while guaranteeing that the total intake rate of each operator partition stays below the threshold
| case ONE_TO_MANY: |
| |
| if (getConsumer() instanceof HighlevelKafkaConsumer) { |
| throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy."); |
| } |
| |
        if (isInitialPartition) {
| lastRepartitionTime = System.currentTimeMillis(); |
| logger.info("[ONE_TO_MANY]: Initializing partition(s)"); |
| int size = initialPartitionCount; |
| //Set<KafkaPartition>[] kps = new Set[size]; |
| @SuppressWarnings("unchecked") |
| Set<KafkaPartition>[] kps = (Set<KafkaPartition>[]) Array.newInstance((new HashSet<KafkaPartition>()).getClass(), size); |
| newPartitions = new ArrayList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(size); |
| int i = 0; |
| for (Map.Entry<String, List<PartitionMetadata>> en : kafkaPartitions.entrySet()) { |
| String clusterId = en.getKey(); |
| for (PartitionMetadata pm : en.getValue()) { |
| if (kps[i % size] == null) { |
| kps[i % size] = new HashSet<KafkaPartition>(); |
| } |
| kps[i % size].add(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())); |
| i++; |
| } |
| } |
| for (i = 0; i < kps.length; i++) { |
| logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", ")); |
| newPartitions.add(createPartition(kps[i], initOffset, newManagers)); |
| } |
| |
| } |
| else if (newWaitingPartition.size() != 0) { |
| |
| logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", ")); |
| partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers)); |
| idempotentStorageManager.partitioned(newManagers, deletedOperators); |
| return partitions; |
| } |
| else { |
| |
| logger.info("[ONE_TO_MANY]: Repartition the operator(s) under " + msgRateUpperBound + " msgs/s and " + byteRateUpperBound + " bytes/s hard limit"); |
| // size of the list depends on the load and capacity of each operator |
| newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(); |
| |
          // Use the first-fit decreasing algorithm to minimize the container count and roughly balance the load,
          // keeping each container's load under the threshold.
          // The partitioning is based on the latest 1-minute moving average
| Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>(); |
| // get the offset for all partitions of each consumer |
| Map<KafkaPartition, Long> offsetTrack = new HashMap<KafkaPartition, Long>(); |
| for (Partitioner.Partition<AbstractKafkaInputOperator<K>> partition : partitions) { |
| List<Stats.OperatorStats> opss = partition.getStats().getLastWindowedStats(); |
| if (opss == null || opss.size() == 0) { |
| continue; |
| } |
| offsetTrack.putAll(partition.getPartitionedInstance().consumer.getCurrentOffsets()); |
| // Get the latest stats |
| |
| Stats.OperatorStats stat = partition.getStats().getLastWindowedStats().get(partition.getStats().getLastWindowedStats().size() - 1); |
| if (stat.counters instanceof KafkaConsumer.KafkaMeterStats) { |
| KafkaConsumer.KafkaMeterStats kms = (KafkaConsumer.KafkaMeterStats) stat.counters; |
| kPIntakeRate.putAll(get_1minMovingAvgParMap(kms)); |
| } |
| } |
| |
| List<PartitionInfo> partitionInfos = firstFitDecreasingAlgo(kPIntakeRate); |
| |
          // Add the existing partition ids to the deleted operators
          for (Partitioner.Partition<AbstractKafkaInputOperator<K>> op : partitions) {
            deletedOperators.add(op.getPartitionedInstance().operatorId);
          }
| for (PartitionInfo r : partitionInfos) { |
| logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(r.kpids, ", ") + ", topic: " + this.getConsumer().topic); |
| newPartitions.add(createPartition(r.kpids, offsetTrack, newManagers)); |
| } |
| currentPartitionInfo.addAll(partitionInfos); |
| } |
| break; |
| |
| case ONE_TO_MANY_HEURISTIC: |
| throw new UnsupportedOperationException("[ONE_TO_MANY_HEURISTIC]: Not implemented yet"); |
| default: |
| break; |
| } |
| |
| idempotentStorageManager.partitioned(newManagers, deletedOperators); |
| return newPartitions; |
| } |
| |
| // Create a new partition with the partition Ids and initial offset positions |
| protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<IdempotentStorageManager> newManagers) |
| { |
| Kryo kryo = new Kryo(); |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| Output output = new Output(bos); |
| kryo.writeObject(output, this); |
| output.close(); |
| Input lInput = new Input(bos.toByteArray()); |
| @SuppressWarnings("unchecked") |
| Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass())); |
| p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets); |
| newManagers.add(p.getPartitionedInstance().idempotentStorageManager); |
| |
| PartitionInfo pif = new PartitionInfo(); |
| pif.kpids = pIds; |
| currentPartitionInfo.add(pif); |
| return p; |
| } |
| |
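  // A small worked example of the first-fit decreasing packing below, assuming
  // msgRateUpperBound = 100, an unlimited byteRateUpperBound, and 1-minute average intake
  // rates (msgs/s) of {P0: 60, P1: 50, P2: 30}: after the descending sort the order is
  // P0, P1, P2. P0 opens operator partition A (40 msgs/s headroom left), P1 does not fit
  // into A and opens partition B (50 left), and P2 fits back into A (10 left).
  // The result is two operator partitions: {P0, P2} and {P1}.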
| private List<PartitionInfo> firstFitDecreasingAlgo(final Map<KafkaPartition, long[]> kPIntakeRate) |
| { |
| // (Decreasing) Sort the map by msgs/s and bytes/s in descending order |
| List<Map.Entry<KafkaPartition, long[]>> sortedMapEntry = new LinkedList<Map.Entry<KafkaPartition, long[]>>(kPIntakeRate.entrySet()); |
| Collections.sort(sortedMapEntry, new Comparator<Map.Entry<KafkaPartition, long[]>>() |
| { |
| @Override |
| public int compare(Map.Entry<KafkaPartition, long[]> firstEntry, Map.Entry<KafkaPartition, long[]> secondEntry) |
| { |
        long[] firstPair = firstEntry.getValue();
        long[] secondPair = secondEntry.getValue();
        // compare the long values explicitly; casting the difference to int can overflow
        if (msgRateUpperBound == Long.MAX_VALUE || firstPair[0] == secondPair[0]) {
          // sort by bytes/s in descending order
          return secondPair[1] < firstPair[1] ? -1 : (secondPair[1] > firstPair[1] ? 1 : 0);
        } else {
          // sort by msgs/s in descending order
          return secondPair[0] < firstPair[0] ? -1 : (secondPair[0] > firstPair[0] ? 1 : 0);
        }
| } |
| }); |
| |
    // (First-fit) Go over all the kafka partitions and assign each consumer to the first operator partition that fits.
    // Each record holds a set of kafka partition ids and the resource left in that operator partition after assigning the consumers for those partitions
| List<PartitionInfo> pif = new LinkedList<PartitionInfo>(); |
| outer: |
| for (Map.Entry<KafkaPartition, long[]> entry : sortedMapEntry) { |
| long[] resourceRequired = entry.getValue(); |
| for (PartitionInfo r : pif) { |
| if (r.msgRateLeft > resourceRequired[0] && r.byteRateLeft > resourceRequired[1]) { |
| // found first fit operator partition that has enough resource for this consumer |
| // add consumer to the operator partition |
| r.kpids.add(entry.getKey()); |
| // update the resource left in this partition |
| r.msgRateLeft -= r.msgRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[0]; |
| r.byteRateLeft -= r.byteRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[1]; |
| continue outer; |
| } |
| } |
      // didn't find an existing operator partition with enough headroom, so create a new one
| PartitionInfo nr = new PartitionInfo(); |
| nr.kpids = Sets.newHashSet(entry.getKey()); |
| nr.msgRateLeft = msgRateUpperBound == Long.MAX_VALUE ? msgRateUpperBound : msgRateUpperBound - resourceRequired[0]; |
| nr.byteRateLeft = byteRateUpperBound == Long.MAX_VALUE ? byteRateUpperBound : byteRateUpperBound - resourceRequired[1]; |
| pif.add(nr); |
| } |
| |
| return pif; |
| } |
| |
| @Override |
| public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) |
| { |
| StatsListener.Response resp = new StatsListener.Response(); |
| List<KafkaConsumer.KafkaMeterStats> kstats = extractKafkaStats(stats); |
| resp.repartitionRequired = isPartitionRequired(stats.getOperatorId(), kstats); |
| return resp; |
| } |
| |
| private void updateOffsets(List<KafkaConsumer.KafkaMeterStats> kstats) |
| { |
    // In every partition check interval, call the OffsetManager to update the offsets
| if (offsetManager != null) { |
| offsetManager.updateOffsets(getOffsetsForPartitions(kstats)); |
| } |
| } |
| |
| private List<KafkaConsumer.KafkaMeterStats> extractKafkaStats(StatsListener.BatchedOperatorStats stats) |
| { |
| //preprocess the stats |
| List<KafkaConsumer.KafkaMeterStats> kmsList = new LinkedList<KafkaConsumer.KafkaMeterStats>(); |
| for (Stats.OperatorStats os : stats.getLastWindowedStats()) { |
| if (os != null && os.counters instanceof KafkaConsumer.KafkaMeterStats) { |
| kmsList.add((KafkaConsumer.KafkaMeterStats) os.counters); |
| } |
| } |
| return kmsList; |
| } |
| |
| /** |
| * |
| * Check whether the operator needs repartition based on reported stats |
| * |
| * @return true if repartition is required |
| * false if repartition is not required |
| */ |
| private boolean isPartitionRequired(int opid, List<KafkaConsumer.KafkaMeterStats> kstats) |
| { |
| |
| long t = System.currentTimeMillis(); |
| |
| if (t - lastCheckTime < repartitionCheckInterval) { |
      // return false if it's within repartitionCheckInterval since the last stats check
| return false; |
| } |
| |
| logger.debug("Use OffsetManager to update offsets"); |
| updateOffsets(kstats); |
| |
| |
    if (repartitionInterval < 0) {
      // repartitioning is disabled
      return false;
    }
| |
    if (t - lastRepartitionTime < repartitionInterval) {
      // return false if it's still within repartitionInterval since the last (re)partition
| return false; |
| } |
| |
| |
| kafkaStatsHolder.put(opid, kstats); |
| |
| if (kafkaStatsHolder.size() != currentPartitionInfo.size() || currentPartitionInfo.size() == 0) { |
| // skip checking if the operator hasn't collected all the stats from all the current partitions |
| return false; |
| } |
| |
| try { |
| |
      // monitor whether new kafka partitions have been added
| { |
| Set<KafkaPartition> existingIds = new HashSet<KafkaPartition>(); |
| for (PartitionInfo pio : currentPartitionInfo) { |
| existingIds.addAll(pio.kpids); |
| } |
| |
| Map<String, List<PartitionMetadata>> partitionsMeta = KafkaMetadataUtil.getPartitionsForTopic(consumer.brokers, consumer.getTopic()); |
        if (partitionsMeta == null) {
          // the broker(s) have a temporary issue serving metadata
          return false;
        }
        for (Map.Entry<String, List<PartitionMetadata>> en : partitionsMeta.entrySet()) {
          if (en.getValue() == null) {
            // the broker(s) have a temporary issue serving metadata
            continue;
          }
          for (PartitionMetadata pm : en.getValue()) {
            KafkaPartition pa = new KafkaPartition(en.getKey(), consumer.topic, pm.partitionId());
            if (!existingIds.contains(pa)) {
              newWaitingPartition.add(pa);
            }
          }
        }
| if (newWaitingPartition.size() != 0) { |
| // found new kafka partition |
| lastRepartitionTime = t; |
| return true; |
| } |
| } |
| |
| if (strategy == PartitionStrategy.ONE_TO_ONE) { |
| return false; |
| } |
| |
      // This is the expensive part: every repartitionCheckInterval the listener re-examines the overall partitioning
      // to see whether there is a more optimal solution.
      // The decision is made by 2 constraints:
      // a hard constraint, the upper bound on the overall msgs/s or bytes/s,
      // and a soft constraint, the existence of a more optimal (smaller) partitioning
| |
| boolean b = breakHardConstraint(kstats) || breakSoftConstraint(); |
| if (b) { |
| currentPartitionInfo.clear(); |
| kafkaStatsHolder.clear(); |
| } |
| return b; |
| } finally { |
| // update last check time |
| lastCheckTime = System.currentTimeMillis(); |
| } |
| } |
| |
| /** |
| * Check to see if there is other more optimal(less partition) partition assignment based on current statistics |
| * |
| * @return True if all windowed stats indicate different partition size we need to adjust the partition. |
| */ |
| private boolean breakSoftConstraint() |
| { |
| if (kafkaStatsHolder.size() != currentPartitionInfo.size()) { |
| return false; |
| } |
| int length = kafkaStatsHolder.get(kafkaStatsHolder.keySet().iterator().next()).size(); |
| for (int j = 0; j < length; j++) { |
| Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>(); |
| for (Integer pid : kafkaStatsHolder.keySet()) { |
| if(kafkaStatsHolder.get(pid).size() <= j) |
| continue; |
| kPIntakeRate.putAll(get_1minMovingAvgParMap(kafkaStatsHolder.get(pid).get(j))); |
| } |
| if (kPIntakeRate.size() == 0) { |
| return false; |
| } |
| List<PartitionInfo> partitionInfo = firstFitDecreasingAlgo(kPIntakeRate); |
| if (partitionInfo.size() == 0 || partitionInfo.size() == currentPartitionInfo.size()) { |
| return false; |
| } |
| } |
| // if all windowed stats indicate different partition size we need to adjust the partition |
| return true; |
| } |
| |
| /** |
| * Check if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s |
| * |
| * @return True if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s. |
| */ |
| private boolean breakHardConstraint(List<KafkaConsumer.KafkaMeterStats> kmss) |
| { |
| // Only care about the KafkaMeterStats |
| |
    // if there are no kafka meter stats at all, don't repartition
| if (kmss == null || kmss.size() == 0) { |
| return false; |
| } |
| // if all the stats within the window have msgs/s above the upper bound threshold (hard limit) |
| boolean needRP = Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>() |
| { |
| @Override |
| public boolean apply(KafkaConsumer.KafkaMeterStats kms) |
| { |
        // If there is more than 1 kafka partition and the total msgs/s reaches the limit
| return kms.partitionStats.size() > 1 && kms.totalMsgPerSec > msgRateUpperBound; |
| } |
| }); |
| |
| // or all the stats within the window have bytes/s above the upper bound threshold (hard limit) |
| needRP = needRP || Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>() |
| { |
| @Override |
| public boolean apply(KafkaConsumer.KafkaMeterStats kms) |
| { |
        // If there is more than 1 kafka partition and the total bytes/s reaches the limit
| return kms.partitionStats.size() > 1 && kms.totalBytesPerSec > byteRateUpperBound; |
| } |
| }); |
| |
| return needRP; |
| |
| } |
| |
  public enum PartitionStrategy
| { |
| /** |
| * Each operator partition connect to only one kafka partition |
| */ |
| ONE_TO_ONE, |
| /** |
| * Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s |
| * For now it <b>only</b> support <b>simple kafka consumer</b> |
| */ |
| ONE_TO_MANY, |
| /** |
| * 1 to N partition based on the heuristic function |
| * <b>NOT</b> implemented yet |
| * TODO implement this later |
| */ |
| ONE_TO_MANY_HEURISTIC |
| } |
| |
| static class PartitionInfo |
| { |
| Set<KafkaPartition> kpids; |
| long msgRateLeft; |
| long byteRateLeft; |
| } |
| |
| public IdempotentStorageManager getIdempotentStorageManager() |
| { |
| return idempotentStorageManager; |
| } |
| |
| public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) |
| { |
| this.idempotentStorageManager = idempotentStorageManager; |
| } |
| |
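  /**
   * Set the initial number of operator partitions; this is only used by the ONE_TO_MANY
   * strategy (under ONE_TO_ONE the partition count follows the kafka partition count).
   */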
| public void setInitialPartitionCount(int partitionCount) |
| { |
| this.initialPartitionCount = partitionCount; |
| } |
| |
| public int getInitialPartitionCount() |
| { |
| return initialPartitionCount; |
| } |
| |
| public long getMsgRateUpperBound() |
| { |
| return msgRateUpperBound; |
| } |
| |
| public void setMsgRateUpperBound(long msgRateUpperBound) |
| { |
| this.msgRateUpperBound = msgRateUpperBound; |
| } |
| |
| public long getByteRateUpperBound() |
| { |
| return byteRateUpperBound; |
| } |
| |
| public void setByteRateUpperBound(long byteRateUpperBound) |
| { |
| this.byteRateUpperBound = byteRateUpperBound; |
| } |
| |
| public void setInitialOffset(String initialOffset) |
| { |
| this.consumer.initialOffset = initialOffset; |
| } |
| |
| public void setOffsetManager(OffsetManager offsetManager) |
| { |
| this.offsetManager = offsetManager; |
| } |
| |
| public void setRepartitionCheckInterval(long repartitionCheckInterval) |
| { |
| this.repartitionCheckInterval = repartitionCheckInterval; |
| } |
| |
| public long getRepartitionCheckInterval() |
| { |
| return repartitionCheckInterval; |
| } |
| |
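  /**
   * Set the minimal interval, in milliseconds, between 2 (re)partition actions;
   * a negative value disables dynamic repartitioning.
   */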
| public void setRepartitionInterval(long repartitionInterval) |
| { |
| this.repartitionInterval = repartitionInterval; |
| } |
| |
| public long getRepartitionInterval() |
| { |
| return repartitionInterval; |
| } |
| |
| //@Pattern(regexp="ONE_TO_ONE|ONE_TO_MANY|ONE_TO_MANY_HEURISTIC", flags={Flag.CASE_INSENSITIVE}) |
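  /**
   * Set the partition strategy by name. The value is case-insensitive and must be one of
   * ONE_TO_ONE, ONE_TO_MANY or ONE_TO_MANY_HEURISTIC, e.g. {@code setStrategy("one_to_many")}.
   */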
| public void setStrategy(String policy) |
| { |
| this.strategy = PartitionStrategy.valueOf(policy.toUpperCase()); |
| } |
| |
| public boolean isIdempotent() |
| { |
| return idempotent; |
| } |
| |
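  /**
   * Enable or disable idempotent (exactly-once) recovery; it is only effective when a
   * {@link SimpleKafkaConsumer} is used, otherwise a no-op storage manager is installed in setup().
   */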
| public void setIdempotent(boolean idempotent) |
| { |
| this.idempotent = idempotent; |
| } |
| } |