blob: 707f1b36f7e036dab7e31120c5dc2cb607dccf9a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.kafka;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import org.apache.crunch.kafka.ClusterTest;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import scala.Option;
import scala.collection.JavaConversions;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
public class KafkaUtilsIT {
@Rule
public TestName testName = new TestName();
private String topic;
private static Broker broker;
@BeforeClass
public static void startup() throws Exception {
ClusterTest.startTest();
Properties props = ClusterTest.getConsumerProperties();
String brokerHostPorts = props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
String brokerHostPortString = brokerHostPorts.split(",")[0];
String[] brokerHostPort = brokerHostPortString.split(":");
String brokerHost = brokerHostPort[0];
int brokerPort = Integer.parseInt(brokerHostPort[1]);
EndPoint endPoint = new EndPoint(brokerHost, brokerPort,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
broker = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
}
@AfterClass
public static void shutdown() throws Exception {
ClusterTest.endTest();
}
@Before
public void setup() throws IOException {
topic = "topic-" + testName.getMethodName();
}
@Test
public void getKafkaProperties() {
Configuration config = new Configuration(false);
String propertyKey = "fake.kafka.property";
String propertyValue = testName.getMethodName();
config.set(propertyKey, propertyValue);
Properties props = KafkaUtils.getKafkaConnectionProperties(config);
assertThat(props.get(propertyKey), is((Object) propertyValue));
}
@Test
public void addKafkaProperties() {
String propertyKey = "fake.kafka.property";
String propertyValue = testName.getMethodName();
Properties props = new Properties();
props.setProperty(propertyKey, propertyValue);
Configuration config = new Configuration(false);
KafkaUtils.addKafkaConnectionProperties(props, config);
assertThat(config.get(propertyKey), is(propertyValue));
}
@Test(expected = IllegalArgumentException.class)
public void getBrokerOffsetsKafkaNullProperties() throws IOException {
KafkaUtils.getBrokerOffsets((Properties) null, kafka.api.OffsetRequest.LatestTime(), topic);
}
@Test(expected = IllegalArgumentException.class)
public void getBrokerOffsetsKafkaNullTopics() throws IOException {
KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime(), (String[]) null);
}
@Test(expected = IllegalArgumentException.class)
public void getBrokerOffsetsKafkaEmptyTopics() throws IOException {
KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime());
}
@Test(timeout = 10000)
public void getLatestBrokerOffsetsKafka() throws IOException, InterruptedException {
ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
while (true) {
Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
kafka.api.OffsetRequest.LatestTime(), topic);
assertNotNull(offsets);
assertThat(offsets.size(), is(4));
boolean allMatch = true;
for (int i = 0; i < 4; i++) {
TopicPartition tp = new TopicPartition(topic, i);
assertThat(offsets.keySet(), hasItem(tp));
allMatch &= (offsets.get(tp) == 1L);
}
if (allMatch) {
break;
}
Thread.sleep(100L);
}
}
@Test
public void getEarliestBrokerOffsetsKafka() throws IOException {
ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 1);
Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
kafka.api.OffsetRequest.EarliestTime(), topic);
assertNotNull(offsets);
//default create 4 topics
assertThat(offsets.size(), is(4));
for (int i = 0; i < 4; i++) {
assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
}
}
@Test
public void getBrokerOffsetsKafkaWithTimeBeforeTopicExists() throws IOException {
ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
// A time of 1L (1 ms after epoch) should be before the topic was created
Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), 1L, topic);
assertNotNull(offsets);
//default create 4 topics
assertThat(offsets.size(), is(4));
for (int i = 0; i < 4; i++) {
assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
}
}
@Test(expected = IllegalStateException.class)
public void getBrokerOffsetsNoHostAvailable() throws IOException {
Properties testProperties = ClusterTest.getConsumerProperties();
testProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
testProperties.setProperty("metadata.broker.list", "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
assertNotNull(KafkaUtils.getBrokerOffsets(testProperties, kafka.api.OffsetRequest.LatestTime(), topic));
}
@Test
public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
EndPoint endPoint = new EndPoint("dummyBrokerHost1", 0,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
final Broker bad = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
}
}