blob: d72a7ac9086142538328c1622b1b5e205e341a0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.lookup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Some;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
/**
*
*/
public class TestKafkaExtractionCluster
{
  private static final Logger log = new Logger(TestKafkaExtractionCluster.class);
  private static final String TOPIC_NAME = "testTopic";

  // Extra config shared by the broker, producer, and consumer property sets.
  // Treated as read-only at runtime: helpers must copy it, never mutate it,
  // because producer-only keys would otherwise leak into consumer/broker configs.
  private static final Map<String, String> KAFKA_PROPERTIES = new HashMap<>();

  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  // Closes registered resources in reverse registration order on tearDown.
  private final Closer closer = Closer.create();

  private TestingCluster zkServer;
  private KafkaServer kafkaServer;
  private Injector injector;
  private ObjectMapper mapper;
  private KafkaLookupExtractorFactory factory;

  /**
   * Seed records published to the topic before the lookup factory starts,
   * so the factory has at least one message to consume on startup.
   */
  private static List<ProducerRecord<byte[], byte[]>> generateRecords()
  {
    return ImmutableList.of(
        new ProducerRecord<>(
            TOPIC_NAME,
            0,
            StringUtils.toUtf8("abcdefg"),
            StringUtils.toUtf8("abcdefg")
        )
    );
  }

  /**
   * Starts an embedded ZK cluster and Kafka broker, publishes seed records,
   * then builds and starts a {@link KafkaLookupExtractorFactory} (round-tripped
   * through JSON to exercise serde). All started resources are registered with
   * {@link #closer} so {@link #tearDown()} releases them even if a later setup
   * step throws.
   */
  @Before
  public void setUp() throws Exception
  {
    zkServer = new TestingCluster(1);
    zkServer.start();
    // Ensure ZK is torn down even if broker startup or injector creation fails.
    closer.register(zkServer);

    kafkaServer = new KafkaServer(
        getBrokerProperties(),
        Time.SYSTEM,
        Some.apply(StringUtils.format("TestingBroker[%d]-", 1)),
        false
    );
    kafkaServer.startup();
    // Closer closes in LIFO order, so the broker shuts down before ZK.
    closer.register(() -> {
      kafkaServer.shutdown();
      kafkaServer.awaitShutdown();
    });
    log.info("---------------------------Started Kafka Broker ---------------------------");

    log.info("---------------------------Publish Messages to topic-----------------------");
    publishRecordsToKafka();

    System.setProperty("druid.extensions.searchCurrentClassloader", "false");

    injector = Initialization.makeInjectorWithModules(
        GuiceInjectors.makeStartupInjector(),
        ImmutableList.of(
            new Module()
            {
              @Override
              public void configure(Binder binder)
              {
                binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
                binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
                binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
              }
            },
            // These injections fail under IntelliJ but are required for maven
            new NamespaceExtractionModule(),
            new KafkaExtractionNamespaceModule()
        )
    );
    mapper = injector.getInstance(ObjectMapper.class);

    log.info("--------------------------- placed default item via producer ---------------------------");
    final Map<String, String> consumerProperties = getConsumerProperties();
    final KafkaLookupExtractorFactory kafkaLookupExtractorFactory = new KafkaLookupExtractorFactory(
        null,
        TOPIC_NAME,
        consumerProperties
    );

    // Round-trip through JSON to verify the factory serializes/deserializes cleanly.
    factory = (KafkaLookupExtractorFactory) mapper.readValue(
        mapper.writeValueAsString(kafkaLookupExtractorFactory),
        LookupExtractorFactory.class
    );
    Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaTopic(), factory.getKafkaTopic());
    Assert.assertEquals(kafkaLookupExtractorFactory.getKafkaProperties(), factory.getKafkaProperties());
    factory.start();
    closer.register(factory::close);
    log.info("--------------------------- started rename manager ---------------------------");
  }

  /**
   * Consumer config pointing at the embedded broker's actual (randomized) port.
   */
  @Nonnull
  private Map<String, String> getConsumerProperties()
  {
    final Map<String, String> props = new HashMap<>(KAFKA_PROPERTIES);
    int port = kafkaServer.socketServer().config().port();
    props.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
    return props;
  }

  private void publishRecordsToKafka()
  {
    final Properties kafkaProducerProperties = makeProducerProperties();
    try (final Producer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProducerProperties)) {
      generateRecords().forEach(producer::send);
    }
  }

  /**
   * Single-broker config: random listener port, auto topic creation, and
   * replication factor 1 throughout since there is only one broker.
   */
  @Nonnull
  private KafkaConfig getBrokerProperties() throws IOException
  {
    final Properties serverProperties = new Properties();
    serverProperties.putAll(KAFKA_PROPERTIES);
    serverProperties.put("broker.id", "0");
    serverProperties.put("zookeeper.connect", zkServer.getConnectString());
    // Random high port to avoid collisions with anything else on the host.
    serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));
    serverProperties.put("auto.create.topics.enable", "true");
    serverProperties.put("log.dir", temporaryFolder.newFolder().getAbsolutePath());
    serverProperties.put("num.partitions", "1");
    serverProperties.put("offsets.topic.replication.factor", "1");
    serverProperties.put("default.replication.factor", "1");
    serverProperties.put("log.cleaner.enable", "true");
    serverProperties.put("advertised.host.name", "localhost");
    serverProperties.put("zookeeper.session.timeout.ms", "30000");
    serverProperties.put("zookeeper.sync.time.ms", "200");
    return new KafkaConfig(serverProperties);
  }

  /**
   * Releases everything registered in {@link #setUp()}: lookup factory first,
   * then the Kafka broker, then the ZK cluster (reverse registration order).
   */
  @After
  public void tearDown() throws Exception
  {
    closer.close();
  }

  private Properties makeProducerProperties()
  {
    final Properties kafkaProducerProperties = new Properties();
    kafkaProducerProperties.putAll(KAFKA_PROPERTIES);
    int port = kafkaServer.socketServer().config().port();
    kafkaProducerProperties.put("bootstrap.servers", StringUtils.format("127.0.0.1:%d", port));
    kafkaProducerProperties.put("key.serializer", ByteArraySerializer.class.getName());
    kafkaProducerProperties.put("value.serializer", ByteArraySerializer.class.getName());
    // Wait for full acknowledgement so seed records are durable before the test proceeds.
    // NOTE: the legacy "request.required.acks" key is intentionally NOT written into the
    // shared KAFKA_PROPERTIES map here (as previously done) — mutating that static map
    // leaked a producer-only, obsolete key into consumer configs built afterwards.
    kafkaProducerProperties.put("acks", "all");
    return kafkaProducerProperties;
  }

  /**
   * Sanity check that the broker is up and has an active controller.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  private void checkServer() throws Exception
  {
    try (Admin adminClient = Admin.create((Map) getConsumerProperties())) {
      if (adminClient.describeCluster().controller().get() == null) {
        throw new ISE("server is not active!");
      }
    }
  }

  /**
   * Publishes two key/value updates and verifies the running lookup factory
   * picks each one up (forward and reverse lookups) within the timeout.
   */
  @Test(timeout = 60_000L)
  public void testSimpleLookup() throws Exception
  {
    try (final Producer<byte[], byte[]> producer = new KafkaProducer<>(makeProducerProperties())) {
      checkServer();

      assertUpdated(null, "foo");
      assertReverseUpdated(ImmutableList.of(), "foo");

      long events = factory.getCompletedEventCount();

      log.info("------------------------- Sending foo bar -------------------------------");
      producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));

      long start = System.currentTimeMillis();
      while (events == factory.getCompletedEventCount()) {
        Thread.sleep(100);
        if (System.currentTimeMillis() > start + 60_000) {
          throw new ISE("Took too long to update event");
        }
      }

      log.info("------------------------- Checking foo bar -------------------------------");
      assertUpdated("bar", "foo");
      assertReverseUpdated(Collections.singletonList("foo"), "bar");
      assertUpdated(null, "baz");

      checkServer();
      events = factory.getCompletedEventCount();

      log.info("------------------------- Sending baz bat -------------------------------");
      producer.send(new ProducerRecord<>(TOPIC_NAME, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));

      // Restart the timeout clock: previously this loop reused the first send's
      // start time, silently shortening the second wait window.
      start = System.currentTimeMillis();
      while (events == factory.getCompletedEventCount()) {
        Thread.sleep(10);
        if (System.currentTimeMillis() > start + 60_000) {
          throw new ISE("Took too long to update event");
        }
      }

      log.info("------------------------- Checking baz bat -------------------------------");
      Assert.assertEquals("bat", factory.get().apply("baz"));
      Assert.assertEquals(Collections.singletonList("baz"), factory.get().unapply("bat"));
    }
  }

  /**
   * Polls the lookup until {@code key} maps to {@code expected} (or to null when
   * {@code expected} is null). Relies on the enclosing test's timeout to bound the loop.
   */
  private void assertUpdated(
      String expected,
      String key
  )
      throws InterruptedException
  {
    final LookupExtractor extractor = factory.get();
    if (expected == null) {
      while (extractor.apply(key) != null) {
        Thread.sleep(100);
      }
    } else {
      while (!expected.equals(extractor.apply(key))) {
        Thread.sleep(100);
      }
    }
    Assert.assertEquals("update check", expected, extractor.apply(key));
  }

  /**
   * Polls the reverse lookup until {@code key} un-maps to {@code expected}.
   * Relies on the enclosing test's timeout to bound the loop.
   */
  private void assertReverseUpdated(
      List<String> expected,
      String key
  )
      throws InterruptedException
  {
    final LookupExtractor extractor = factory.get();
    while (!expected.equals(extractor.unapply(key))) {
      Thread.sleep(100);
    }
    Assert.assertEquals("update check", expected, extractor.unapply(key));
  }
}