/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.kafka;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.EndPoint;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.commons.lang.StringUtils;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConversions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
/**
* Simple utilities for retrieving offsets and other Kafka cluster information to assist in setting up and configuring a
* {@link KafkaSource} instance.
*/
public class KafkaUtils {
private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
private static final String CLIENT_ID = "crunch-kafka-client";
private static final Random RANDOM = new Random();
/**
* Configuration property for the number of retry attempts that will be made against Kafka.
*/
public static final String KAFKA_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.attempts";
/**
* Default number of retry attempts.
*/
public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 120;
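/**
* String representation of {@link #KAFKA_RETRY_ATTEMPTS_DEFAULT}.
*/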
public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
/**
* Configuration property for the number of retry attempts that will be made against Kafka in the event of receiving
* empty responses.
*/
public static final String KAFKA_EMPTY_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.empty.attempts";
/**
* Default number of empty retry attempts.
*/
public static final int KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT = 10;
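/**
* String representation of {@link #KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT}.
*/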
public static final String KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT_STRING =
Integer.toString(KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
/**
* Converts the provided {@code config} into a {@link Properties} object for connecting to Kafka.
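* <p>
* A minimal usage sketch (the broker address is illustrative):
* </p>
* <pre>{@code
* Configuration conf = new Configuration();
* conf.set("bootstrap.servers", "broker1.example.com:9092");
* Properties props = KafkaUtils.getKafkaConnectionProperties(conf);
* }</pre>
*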
* @param config the config to read properties from
* @return a properties instance populated with all of the values inside the provided {@code config}
*/
public static Properties getKafkaConnectionProperties(Configuration config) {
Properties props = new Properties();
for (Map.Entry<String, String> value : config) {
props.setProperty(value.getKey(), value.getValue());
}
return props;
}
/**
* Adds the {@code properties} to the provided {@code config} instance.
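* <p>
* A minimal usage sketch (the property value is illustrative):
* </p>
* <pre>{@code
* Properties props = new Properties();
* props.setProperty("bootstrap.servers", "broker1.example.com:9092");
* Configuration conf = KafkaUtils.addKafkaConnectionProperties(props, new Configuration());
* }</pre>
*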
* @param properties the properties to add to the config.
* @param config the configuration instance to be modified.
* @return the config instance populated with the provided properties
*/
public static Configuration addKafkaConnectionProperties(Properties properties, Configuration config) {
for (String name : properties.stringPropertyNames()) {
config.set(name, properties.getProperty(name));
}
return config;
}
/**
* Returns a {@link TopicMetadataRequest} for the given topics.
*
* @param topics an array of topics to retrieve metadata for
* @return a {@link TopicMetadataRequest} for the given topics
* @throws IllegalArgumentException if topics is {@code null} or empty, or if any of the topics is {@code null}, empty or blank
*/
private static TopicMetadataRequest getTopicMetadataRequest(String... topics) {
if (topics == null)
throw new IllegalArgumentException("topics cannot be null");
if (topics.length == 0)
throw new IllegalArgumentException("topics cannot be empty");
for (String topic : topics)
if (StringUtils.isBlank(topic))
throw new IllegalArgumentException("No topic can be null, empty or blank");
return new TopicMetadataRequest(Arrays.asList(topics));
}
/**
* <p>
* Retrieves the offset values for an array of topics at the specified time.
* </p>
* <p>
* If the Kafka cluster does not have the logs for a partition at the specified time, or if the topic did not exist
* at that time, this will instead return the earliest offset for that partition.
* </p>
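* <p>
* A sketch of the non-deprecated approach (assumes a configured
* {@link org.apache.kafka.clients.consumer.KafkaConsumer} named {@code consumer}; topic name is illustrative):
* </p>
* <pre>{@code
* List<TopicPartition> partitions = Arrays.asList(new TopicPartition("my-topic", 0));
* Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
* Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
* }</pre>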
*
* @param properties the properties containing the configuration for Kafka
* @param time the time at which we want to know what the offset values were
* @param topics the topics we want to know the offset values of
* @return the offset values for an array of topics at the specified time
* @throws IllegalArgumentException if properties is {@code null} or if topics is {@code null} or empty or if any of
* the topics are {@code null}, empty or blank, or if there is an error parsing the
* properties.
* @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
* @deprecated As of 1.0. Use beginning/end offset APIs on {@link org.apache.kafka.clients.consumer.Consumer}
*/
@Deprecated
public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
if (properties == null)
throw new IllegalArgumentException("properties cannot be null");
final List<Broker> brokers = getBrokers(properties);
Collections.shuffle(brokers, RANDOM);
return getBrokerOffsets(brokers, time, topics);
}
// Visible for testing
static Map<TopicPartition, Long> getBrokerOffsets(List<Broker> brokers, long time, String... topics) {
if (topics == null)
throw new IllegalArgumentException("topics cannot be null");
if (topics.length == 0)
throw new IllegalArgumentException("topics cannot be empty");
for (String topic : topics)
if (StringUtils.isBlank(topic))
throw new IllegalArgumentException("No topic can be null, empty or blank");
TopicMetadataResponse topicMetadataResponse = null;
final TopicMetadataRequest topicMetadataRequest = getTopicMetadataRequest(topics);
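// Try each broker in turn until one of them successfully returns the topic metadata.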
for (final Broker broker : brokers) {
final SimpleConsumer consumer = getSimpleConsumer(broker);
try {
topicMetadataResponse = consumer.send(topicMetadataRequest);
break;
} catch (Exception err) {
EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
Arrays.toString(topics), endpoint.host()), err);
} finally {
consumer.close();
}
}
if (topicMetadataResponse == null) {
throw new IllegalStateException(String.format("Fetching topic metadata for topic(s) '%s' from broker(s) '%s' failed",
Arrays.toString(topics), Arrays.toString(brokers.toArray())));
}
// From the topic metadata, build a PartitionOffsetRequestInfo for each partition of each topic. Note that
// only the leader Broker has the partition offset information [1], so save the leader Broker so that we
// can send the request to it.
// [1] - https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetAPI
Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequests = new HashMap<>();
for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
for (PartitionMetadata partition : metadata.partitionsMetadata()) {
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
BrokerEndPoint brokerEndPoint = partition.leader();
if (brokerEndPoint == null) {
throw new CrunchRuntimeException("Unable to find leader for topic: " + metadata.topic()
+ " partition: " + partition.partitionId());
}
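// Reconstruct the leader as a Broker instance; the node id is a placeholder since only the
// host/port of the endpoint are used to connect.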
EndPoint endPoint = new EndPoint(brokerEndPoint.host(), brokerEndPoint.port(),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
Broker leader = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
Option.<String>empty());
if (brokerRequests.containsKey(leader))
requestInfo = brokerRequests.get(leader);
requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo(
time, 1));
brokerRequests.put(leader, requestInfo);
}
}
Map<TopicPartition, Long> topicPartitionToOffset = new HashMap<>();
// Send the offset request to the leader broker
for (Map.Entry<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequest : brokerRequests.entrySet()) {
SimpleConsumer simpleConsumer = getSimpleConsumer(brokerRequest.getKey());
OffsetResponse offsetResponse = null;
try {
OffsetRequest offsetRequest = new OffsetRequest(brokerRequest.getValue(), kafka.api.OffsetRequest.CurrentVersion(),
CLIENT_ID);
offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
} finally {
simpleConsumer.close();
}
Map<TopicPartition, Long> earliestOffsets = null;
// Retrieve/parse the results
for (Map.Entry<TopicAndPartition, PartitionOffsetRequestInfo> entry : brokerRequest.getValue().entrySet()) {
TopicAndPartition topicAndPartition = entry.getKey();
TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
long offset;
// The Kafka API will return no value if it is given a time for which no log contains messages
// (i.e. before the topic existed or in a log that was rolled/cleaned)
if (offsets.length > 0) {
offset = offsets[0];
} else {
LOG.info("Kafka did not have an offset for topic/partition [{}]. Returning earliest known offset instead",
topicAndPartition);
// This shouldn't happen, but if Kafka's API did not provide a value even though we asked for the
// earliest time, there is no safe fallback, so fail
if (time == kafka.api.OffsetRequest.EarliestTime())
throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic()
+ "] but Kafka returned no values");
// Load the earliest offsets for the topic if it hasn't been loaded already
if (earliestOffsets == null)
earliestOffsets = getBrokerOffsets(Arrays.asList(brokerRequest.getKey()),
kafka.api.OffsetRequest.EarliestTime(), topicAndPartition.topic());
offset = earliestOffsets.get(topicPartition);
}
topicPartitionToOffset.put(topicPartition, offset);
}
}
return topicPartitionToOffset;
}
/**
* Returns a {@link SimpleConsumer} connected to the given {@link Broker}.
*/
private static SimpleConsumer getSimpleConsumer(final Broker broker) {
// host, port, socket timeout (ms), buffer size (bytes), client id
EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
}
/**
* Returns a {@link Broker} list parsed from the given {@link Properties}.
*
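* The broker list is expected as a comma-separated list of {@code host:port} pairs, e.g.
* {@code metadata.broker.list=broker1.example.com:9092,broker2.example.com:9092} (hostnames illustrative).
*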
* @param properties the {@link Properties} with configuration to connect to a Kafka broker
* @return the list of brokers parsed from the {@code metadata.broker.list} property
*/
private static List<Broker> getBrokers(final Properties properties) {
if (properties == null)
throw new IllegalArgumentException("props cannot be null");
String commaDelimitedBrokerList = properties.getProperty("metadata.broker.list");
if (commaDelimitedBrokerList == null)
throw new IllegalArgumentException("Unable to find 'metadata.broker.list' in given properties");
// Split broker list into host/port pairs
String[] brokerPortList = commaDelimitedBrokerList.split(",");
if (brokerPortList.length < 1)
throw new IllegalArgumentException("Unable to parse broker list : [" + Arrays.toString(brokerPortList) + "]");
final List<Broker> brokers = new ArrayList<>(brokerPortList.length);
for (final String brokerHostPortString : brokerPortList) {
// Split host/port
String[] brokerHostPort = brokerHostPortString.split(":");
if (brokerHostPort.length != 2)
throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
+ Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
try {
EndPoint endPoint = new EndPoint(brokerHostPort[0], Integer.parseInt(brokerHostPort[1]),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
brokers.add(new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
Option.<String>empty()));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e);
}
}
return brokers;
}
}