/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
/**
* A Kafka client that consumes records from a Kafka cluster.
 * <p>
 * The consumer is <i>thread safe</i> and should generally be shared among all threads for best performance.
 * <p>
 * Internally, the consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it
* needs to communicate with. Failure to close the consumer after use will leak these resources.
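 * <p>
 * A minimal sketch of ensuring the consumer is always closed (assuming {@code props} holds a valid configuration, as
 * shown in the examples below):
 * <pre>
 * {@code
 * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
 * try {
 * consumer.subscribe("foo");
 * // poll records and process them here
 * } finally {
 * consumer.close();
 * }
 * }
 * </pre>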
* <h3>Usage Examples</h3>
 * The consumer APIs offer flexibility to cover a variety of consumption use cases. The following examples demonstrate the correct use of
 * the available APIs. Each example assumes the presence of a user-implemented process() method that processes a given batch of messages
* and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as
* a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
* <pre>
* {@code
 * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
* Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
 * for(Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
* List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
* for(int i = 0;i < recordsPerTopic.size();i++) {
* ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
* // process record
* try {
 * processedOffsets.put(record.topicAndPartition(), record.offset());
* } catch (Exception e) {
* e.printStackTrace();
* }
* }
* }
* return processedOffsets;
* }
* }
* </pre>
* <p>
* This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
* balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically,
 * as controlled by the auto.commit.interval.ms config.
* <pre>
* {@code
* Properties props = new Properties();
* props.put("metadata.broker.list", "localhost:9092");
* props.put("group.id", "test");
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "true");
* props.put("auto.commit.interval.ms", "10000");
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* consumer.subscribe("foo", "bar");
* boolean isRunning = true;
* while(isRunning) {
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* process(records);
* }
* consumer.close();
* }
* </pre>
* This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
* balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using
* the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
* messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets
* of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances.
* <pre>
* {@code
* Properties props = new Properties();
* props.put("metadata.broker.list", "localhost:9092");
* props.put("group.id", "test");
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "false");
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* consumer.subscribe("foo", "bar");
* int commitInterval = 100;
* int numRecords = 0;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* try {
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* numRecords += records.size();
 * // commit offsets for all partitions of topics foo, bar owned by this consumer instance
* if(numRecords % commitInterval == 0)
* consumer.commit(false);
* } catch(Exception e) {
* try {
* // rewind consumer's offsets for failed partitions
* // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
* List<TopicPartition> failedPartitions = failedPartitions();
* Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
* for(TopicPartition failedPartition : failedPartitions) {
* // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
* // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
* offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
* }
* // seek to new offsets only for partitions that failed the last process()
* consumer.seek(offsetsToRewindTo);
 * } catch(Exception e2) { break; } // rewind failed
* }
* }
* consumer.close();
* }
* </pre>
* <p>
* This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's
* group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in
* Kafka. If group management is used, the right place to systematically rewind offsets for <i>every</i> consumer instance is inside the
* ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance
* <i>and</i> before the consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. It
 * is recommended that if you foresee the requirement to ever reset the consumer's offsets in the presence of group management, you
 * always configure the consumer to use a ConsumerRebalanceCallback with a flag that controls whether the offset rewind logic is used.
 * This method of rewinding offsets is useful if you notice an issue with your message processing after successful consumption and offset commit,
 * and you would like to rewind the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case,
* you would configure each of your consumer instances with the offset rewind configuration flag turned on and bounce each consumer instance
* in a rolling restart fashion. Each restart will trigger a rebalance and eventually all consumer instances would have rewound the offsets for
* the partitions they own, effectively rewinding the offsets for the entire consumer group.
* <pre>
* {@code
* Properties props = new Properties();
* props.put("metadata.broker.list", "localhost:9092");
* props.put("group.id", "test");
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "false");
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
* props,
* new ConsumerRebalanceCallback() {
* boolean rewindOffsets = true; // should be retrieved from external application config
* public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
* Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
 * if(rewindOffsets) {
 * Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
 * consumer.seek(newOffsets);
 * }
* }
* public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
* consumer.commit(true);
* }
* // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
* private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
* long numberOfMessagesToRewindBackTo) {
* Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
* for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet())
* newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
* return newOffsets;
* }
* });
* consumer.subscribe("foo", "bar");
* int commitInterval = 100;
* int numRecords = 0;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* numRecords += records.size();
* // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
* if(numRecords % commitInterval == 0)
* consumer.commit(consumedOffsets, true);
* }
* consumer.commit(true);
* consumer.close();
* }
* </pre>
* This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
* In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
 * plug in logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
* callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
* before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
* <p>
 * Similarly, the user would also be required to plug in logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked
* callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place
* to commit the offsets for the current set of partitions owned by the consumer.
* <pre>
* {@code
* Properties props = new Properties();
* props.put("metadata.broker.list", "localhost:9092");
* props.put("group.id", "test");
* props.put("session.timeout.ms", "1000");
* props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
* props,
* new ConsumerRebalanceCallback() {
* public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
* Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
* consumer.seek(lastCommittedOffsets);
* }
* public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
* Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
* commitOffsetsToCustomStore(offsets);
* }
* // following APIs should be implemented by the user for custom offset management
* private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
* return null;
* }
* private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; }
* private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
* });
* consumer.subscribe("foo", "bar");
* int commitInterval = 100;
* int numRecords = 0;
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* numRecords += records.size();
* // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
* if(numRecords % commitInterval == 0)
* commitOffsetsToCustomStore(consumedOffsets);
* }
 * commitOffsetsToCustomStore(consumedOffsets);
* consumer.close();
* }
* </pre>
 * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
* available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
* the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
* This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
* based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group
* management is used.
* <pre>
* {@code
* Properties props = new Properties();
* props.put("metadata.broker.list", "localhost:9092");
* props.put("group.id", "test");
* props.put("enable.auto.commit", "true");
* props.put("auto.commit.interval.ms", "10000");
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* // subscribe to some partitions of topic foo
* TopicPartition partition0 = new TopicPartition("foo", 0);
* TopicPartition partition1 = new TopicPartition("foo", 1);
* TopicPartition[] partitions = new TopicPartition[2];
* partitions[0] = partition0;
* partitions[1] = partition1;
* consumer.subscribe(partitions);
* // find the last committed offsets for partitions 0,1 of topic foo
* Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(Arrays.asList(partitions));
* // seek to the last committed offsets to avoid duplicates
* consumer.seek(lastCommittedOffsets);
* // find the offsets of the latest available messages to know where to stop consumption
 * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-1, Arrays.asList(partitions));
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
 * isRunning = false;
 * for(TopicPartition partition : partitions) {
 * Long consumed = consumedOffsets.get(partition);
 * if(consumed == null || consumed < latestAvailableOffsets.get(partition))
 * isRunning = true;
 * }
* }
* consumer.commit(true);
* consumer.close();
* }
* </pre>
 * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
* available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
* the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
* This example assumes that the user chooses to use custom offset storage.
* <pre>
* {@code
* Properties props = new Properties();
* props.put("metadata.broker.list", "localhost:9092");
* KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
* // subscribe to some partitions of topic foo
* TopicPartition partition0 = new TopicPartition("foo", 0);
* TopicPartition partition1 = new TopicPartition("foo", 1);
* TopicPartition[] partitions = new TopicPartition[2];
* partitions[0] = partition0;
* partitions[1] = partition1;
* consumer.subscribe(partitions);
* Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
* // seek to the last committed offsets to avoid duplicates
* consumer.seek(lastCommittedOffsets);
* // find the offsets of the latest available messages to know where to stop consumption
 * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-1, Arrays.asList(partitions));
* boolean isRunning = true;
* Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
* while(isRunning) {
* Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
* Map<TopicPartition, Long> lastConsumedOffsets = process(records);
* consumedOffsets.putAll(lastConsumedOffsets);
* // commit offsets for partitions 0,1 for topic foo to custom store
* commitOffsetsToCustomStore(consumedOffsets);
 * isRunning = false;
 * for(TopicPartition partition : partitions) {
 * Long consumed = consumedOffsets.get(partition);
 * if(consumed == null || consumed < latestAvailableOffsets.get(partition))
 * isRunning = true;
 * }
* }
* commitOffsetsToCustomStore(consumedOffsets);
* consumer.close();
* }
* </pre>
*/
public class KafkaConsumer<K,V> implements Consumer<K,V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private final long metadataFetchTimeoutMs;
private final long totalMemorySize;
private final Metrics metrics;
private final Set<String> subscribedTopics;
private final Set<TopicPartition> subscribedPartitions;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs">here</a>. Values can be
* either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
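     * <p>
     * For example, a minimal sketch of building such a config map (the broker address and group id below are
     * placeholders for your own values):
     * <pre>
     * {@code
     * Map<String, Object> config = new HashMap<String, Object>();
     * config.put("bootstrap.servers", "localhost:9092");
     * config.put("group.id", "test");
     * config.put("session.timeout.ms", 30000); // the integer 30000 and the string "30000" are equivalent
     * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(config);
     * }
     * </pre>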
* @param configs The consumer configs
*/
public KafkaConsumer(Map<String, Object> configs) {
this(configs, null);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback}
* implementation
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
* @param configs The consumer configs
* @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
* every rebalance operation.
*/
public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
this(configs, callback, null, null);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback}
* implementation, a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
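     * <p>
     * A minimal sketch of passing the deserializers directly (this assumes a {@code ByteArrayDeserializer}
     * implementation of {@link Deserializer} is available; substitute your own implementations otherwise):
     * <pre>
     * {@code
     * Map<String, Object> config = new HashMap<String, Object>();
     * config.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
     * config.put("group.id", "test");
     * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
     *     config, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     * }
     * </pre>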
* @param configs The consumer configs
* @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
* every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
* be called when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called when the deserializer is passed in directly.
*/
public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
callback, keyDeserializer, valueDeserializer);
}
private static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
Map<String, Object> newConfigs = new HashMap<String, Object>();
newConfigs.putAll(configs);
if (keyDeserializer != null)
newConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
        if (valueDeserializer != null)
newConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
return newConfigs;
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
* Valid configuration strings are documented at {@link ConsumerConfig}
*/
public KafkaConsumer(Properties properties) {
this(properties, null);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
* {@link ConsumerRebalanceCallback} implementation.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
* @param properties The consumer configuration properties
* @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
* every rebalance operation.
*/
public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
this(properties, callback, null, null);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
* {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
* @param properties The consumer configuration properties
* @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
* every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
* be called when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called when the deserializer is passed in directly.
*/
public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
callback, keyDeserializer, valueDeserializer);
}
private static Properties addDeserializerToConfig(Properties properties,
Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keyDeserializer != null)
newProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
        if (valueDeserializer != null)
newProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
return newProperties;
}
private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
log.trace("Starting the Kafka consumer");
subscribedTopics = new HashSet<String>();
subscribedPartitions = new HashSet<TopicPartition>();
this.metrics = new Metrics(new MetricConfig(),
Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")),
new SystemTime());
this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (keyDeserializer == null)
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
else
this.keyDeserializer = keyDeserializer;
if (valueDeserializer == null)
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
else
this.valueDeserializer = valueDeserializer;
config.logUnused();
log.debug("Kafka consumer started");
}
/**
* Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
* <p>
     * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and
     * will trigger a rebalance operation if any one of the following events occurs:
     * <ul>
     * <li> The number of partitions changes for any of the subscribed topics
     * <li> A topic is created or deleted
* <li> An existing member of the consumer group dies
* <li> A new member is added to an existing consumer group via the join API
* </ul>
* @param topics A variable list of topics that the consumer wants to subscribe to
*/
@Override
public void subscribe(String... topics) {
if(subscribedPartitions.size() > 0)
throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
for(String topic:topics)
subscribedTopics.add(topic);
// TODO: trigger a rebalance operation
}
/**
     * Incrementally subscribes to specific topic partitions and does not use the consumer's group management functionality. As such,
* there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
* <p>
* @param partitions Partitions to incrementally subscribe to
*/
@Override
public void subscribe(TopicPartition... partitions) {
if(subscribedTopics.size() > 0)
throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
for(TopicPartition partition:partitions)
subscribedPartitions.add(partition);
}
/**
     * Unsubscribes from the specified topics. This will trigger a rebalance operation and messages for these topics will not be returned
     * from the next {@link #poll(long) poll()} onwards
* @param topics Topics to unsubscribe from
*/
public void unsubscribe(String... topics) {
// throw an exception if the topic was never subscribed to
for(String topic:topics) {
if(!subscribedTopics.contains(topic))
throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
" to unsubscribe(" + topic + ")");
subscribedTopics.remove(topic);
}
// TODO trigger a rebalance operation
}
/**
     * Unsubscribes from the specified topic partitions. Messages for these partitions will not be returned from the next
* {@link #poll(long) poll()} onwards
* @param partitions Partitions to unsubscribe from
*/
public void unsubscribe(TopicPartition... partitions) {
// throw an exception if the partition was never subscribed to
for(TopicPartition partition:partitions) {
if(!subscribedPartitions.contains(partition))
throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
partition.topic() + "," + partition.partition() + ") should be called prior" +
" to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
subscribedPartitions.remove(partition);
}
// trigger a rebalance operation
}
/**
* Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to
* any topics or partitions before polling for data.
* <p>
* The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)}
* is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and
* on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset
* using {@link #commit(Map, boolean) commit(offsets, sync)}
* for the subscribed list of partitions.
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*/
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
/**
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
* <p>
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
* and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
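     * <p>
     * For example, a sketch of synchronously committing a single partition's offset (assuming {@code consumer} is an
     * instance of this class; the topic, partition and offset below are illustrative only):
     * <pre>
     * {@code
     * Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
     * offsets.put(new TopicPartition("foo", 0), 42L);
     * consumer.commit(offsets, true);
     * }
     * </pre>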
* @param offsets The list of offsets per partition that should be committed to Kafka.
* @param sync If true, commit will block until the consumer receives an acknowledgment
* @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
* if the sync flag is set to false.
*/
@Override
public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
throw new UnsupportedOperationException();
}
/**
* Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and
* partitions.
* <p>
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
* and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
* @param sync If true, commit will block until the consumer receives an acknowledgment
* @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
* if the sync flag is set to false.
*/
@Override
public OffsetMetadata commit(boolean sync) {
throw new UnsupportedOperationException();
}
/**
* Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API is invoked
     * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is
     * arbitrarily used in the middle of consumption to reset the fetch offsets
     * @param offsets The fetch offsets to use per partition on the next {@link #poll(long) poll()}
*/
@Override
public void seek(Map<TopicPartition, Long> offsets) {
}
/**
     * Returns the fetch position of the <i>next message</i> for each of the specified topic partitions, to be used on the next {@link #poll(long) poll()}
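     * <p>
     * A minimal sketch (assuming {@code consumer} is an instance of this class; the topic and partition below are
     * illustrative only):
     * <pre>
     * {@code
     * TopicPartition partition0 = new TopicPartition("foo", 0);
     * consumer.subscribe(partition0);
     * consumer.poll(100);
     * Map<TopicPartition, Long> fetchPositions = consumer.position(Collections.singletonList(partition0));
     * }
     * </pre>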
* @param partitions Partitions for which the fetch position will be returned
* @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()}
*/
public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
return null;
}
/**
* Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset
* storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data.
* @param partitions The list of partitions to return the last committed offset for
* @return The list of offsets committed on the last {@link #commit(boolean) commit(sync)}
*/
@Override
public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException();
}
/**
* Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact
* message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages
* returned by the consumer.
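     * <p>
     * For example, a sketch of rewinding the consumer to the offsets from roughly one hour ago (assuming {@code consumer}
     * is an instance of this class and {@code partitions} holds the partitions it is subscribed to):
     * <pre>
     * {@code
     * long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
     * Map<TopicPartition, Long> offsetsOneHourAgo = consumer.offsetsBeforeTime(oneHourAgo, partitions);
     * consumer.seek(offsetsOneHourAgo);
     * }
     * </pre>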
* @param partitions The list of partitions for which the offsets are returned
     * @param timestamp The unix timestamp in milliseconds. The value -1 indicates the latest available timestamp; the value -2 indicates the earliest available timestamp.
* @return The offsets per partition before the specified timestamp.
*/
public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions) {
return null;
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
}
@Override
public void close() {
log.trace("Closing the Kafka consumer.");
subscribedTopics.clear();
subscribedPartitions.clear();
this.metrics.close();
log.debug("The Kafka consumer has closed.");
}
}