/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.kafka;

import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.kafka.record.KafkaInputFormat;
import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
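
/**
 * Test suite that starts an embedded Kafka broker (via {@link KafkaBrokerTestHarness}) and a
 * local, in-process MapReduce {@link Configuration} once for all of the listed suite classes.
 * Tests that run outside the suite get the same setup through {@link #startTest()} and
 * {@link #endTest()}.
 */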
@RunWith(Suite.class)
@Suite.SuiteClasses({
// org.apache.crunch.kafka.record
org.apache.crunch.kafka.record.KafkaSourceIT.class,
org.apache.crunch.kafka.record.KafkaRecordsIterableIT.class,
org.apache.crunch.kafka.record.KafkaDataIT.class
})
public class ClusterTest {
private static TemporaryFolder folder = new TemporaryFolder();
private static KafkaBrokerTestHarness kafka;
private static boolean runAsSuite = false;
private static Configuration conf;
private static FileSystem fs;
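
/**
 * Starts the shared Kafka broker and file system once for the whole suite and flags the run as
 * suite-managed so the per-test hooks below become no-ops.
 */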
@BeforeClass
public static void startSuite() throws Exception {
runAsSuite = true;
startKafka();
setupFileSystem();
}
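
/** Tears down the shared Kafka broker after the last suite class has run. */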
@AfterClass
public static void endSuite() throws Exception {
stopKafka();
}
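
/**
 * Called from each test class's {@code @BeforeClass}; starts Kafka and the file system only when
 * the class is running on its own rather than as part of this suite.
 */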
public static void startTest() throws Exception {
if (!runAsSuite) {
startKafka();
setupFileSystem();
}
}
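
/** Counterpart to {@link #startTest()}; stops Kafka only when the class ran on its own. */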
public static void endTest() throws Exception {
if (!runAsSuite) {
stopKafka();
}
}
private static void stopKafka() throws IOException {
kafka.tearDown();
}
private static void startKafka() throws IOException {
Properties props = new Properties();
props.setProperty("auto.create.topics.enable", Boolean.TRUE.toString());
kafka = new KafkaBrokerTestHarness(props);
kafka.setUp();
}
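
/** Creates a temporary Crunch working directory and configures MapReduce to run in-process. */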
private static void setupFileSystem() throws IOException {
folder.create();
conf = new Configuration();
conf.set(RuntimeParameters.TMP_DIR, folder.getRoot().getAbsolutePath());
// Run MapReduce jobs in-process.
conf.set("mapreduce.jobtracker.address", "local");
// Cache a handle to the local file system backed by the test configuration.
fs = FileSystem.get(conf);
}
public static Configuration getConf() {
// Clone the configuration so it doesn't get modified for other tests.
return new Configuration(conf);
}
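
/**
 * Builds consumer {@link Properties} for the embedded broker, mapping the harness's
 * {@code metadata.broker.list} value onto {@code bootstrap.servers} for the new consumer API.
 */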
public static Properties getConsumerProperties() {
Properties props = new Properties();
props.putAll(kafka.getProps());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
// Set bootstrap.servers explicitly because some consumer APIs still require it.
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));
// Remove request.timeout.ms; when set it causes problems initializing the consumer.
props.remove(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
return props;
}
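
/** Builds producer {@link Properties} pointing at the embedded broker. */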
public static Properties getProducerProperties() {
Properties props = new Properties();
props.putAll(kafka.getProps());
// Set bootstrap.servers explicitly because some producer APIs still require it.
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
return props;
}
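
/**
 * Returns a Hadoop {@link Configuration} carrying the consumer properties, tagged the way
 * {@link KafkaInputFormat} expects so they can be recovered on the task side.
 */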
public static Configuration getConsumerConfig() {
Configuration kafkaConfig = new Configuration(conf);
KafkaUtils.addKafkaConnectionProperties(KafkaInputFormat.tagExistingKafkaConnectionProperties(
getConsumerProperties()), kafkaConfig);
return kafkaConfig;
}
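
/**
 * Writes {@code loops * numValuesPerLoop} string key/value pairs to {@code topic} and returns the
 * generated keys. Use a distinct {@code batch} per invocation to keep data sets distinguishable,
 * since keys are built as {@code "key" + batch + i + j}.
 */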
public static List<String> writeData(Properties props, String topic, String batch, int loops, int numValuesPerLoop) {
Properties producerProps = new Properties();
producerProps.putAll(props);
producerProps.setProperty("value.serializer", StringSerDe.class.getName());
producerProps.setProperty("key.serializer", StringSerDe.class.getName());
// Default to snappy compression and wait for leader acknowledgement. The new KafkaProducer
// ignores the legacy "compression.codec" / "request.required.acks" names, so use the
// current config keys.
producerProps.setProperty("compression.type", "snappy");
producerProps.setProperty("acks", "1");
Producer<String, String> producer = new KafkaProducer<>(producerProps);
List<String> keys = new LinkedList<>();
try {
for (int i = 0; i < loops; i++) {
for (int j = 0; j < numValuesPerLoop; j++) {
String key = "key" + batch + i + j;
String value = "value" + batch + i + j;
keys.add(key);
producer.send(new ProducerRecord<>(topic, key, value));
}
}
} finally {
producer.close();
}
return keys;
}
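
/** Simple UTF-8 string serializer/deserializer usable on both the producer and consumer side. */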
public static class StringSerDe implements Serializer<String>, Deserializer<String> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No configuration needed.
}
@Override
public byte[] serialize(String topic, String value) {
// Use an explicit charset so results do not depend on the platform default; pass nulls through.
return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
}
@Override
public String deserialize(String topic, byte[] bytes) {
return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
}
@Override
public void close() {
}
}
}