// blob: 10720d63c25f08471da31809cf0f53c514a70fcf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.lookup;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Bytes;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Unit tests for {@link KafkaLookupExtractorFactory}.
 *
 * The class runs under {@link PowerMockRunner} with {@link NamespaceExtractionCacheManager} and
 * {@link CacheHandler} prepared for mocking. The packages listed in {@link PowerMockIgnore} are excluded from
 * PowerMock's class loading.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({
NamespaceExtractionCacheManager.class,
CacheHandler.class
})
@PowerMockIgnore({
"javax.management.*",
"javax.net.ssl.*",
"org.apache.logging.*",
"org.slf4j.*",
"com.sun.*",
"javax.script.*",
"jdk.*"
})
public class KafkaLookupExtractorFactoryTest
{
// Kafka topic name handed to the factories built by these tests.
private static final String TOPIC = "some_topic";
// Placeholder Kafka properties; note that they do not contain "bootstrap.servers".
private static final Map<String, String> DEFAULT_PROPERTIES = ImmutableMap.of(
"some.property", "some.value"
);
// Jackson mapper used for the serde tests; setUp() installs the injectable values on it.
private final ObjectMapper mapper = new DefaultObjectMapper();
// Strict mock passed to the factories under test and returned by the injectable-values lookup in setUp().
private final NamespaceExtractionCacheManager cacheManager = PowerMock.createStrictMock(
NamespaceExtractionCacheManager.class);
// Strict mock that the start/stop tests have cacheManager.createCache() return.
private final CacheHandler cacheHandler = PowerMock.createStrictMock(CacheHandler.class);
// Lets individual tests declare the exception message they expect to be thrown.
@Rule
public ExpectedException expectedException = ExpectedException.none();
/**
 * Installs an {@link InjectableValues} on the shared mapper that answers the
 * {@code NamespaceExtractionCacheManager} injection id with the mocked cache manager and every other id
 * with {@code null}.
 */
@Before
public void setUp()
{
  final InjectableValues injectables = new InjectableValues()
  {
    @Override
    public Object findInjectableValue(
        Object valueId,
        DeserializationContext ctxt,
        BeanProperty forProperty,
        Object beanInstance
    )
    {
      final boolean wantsCacheManager =
          "org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager".equals(valueId);
      return wantsCacheManager ? cacheManager : null;
    }
  };
  mapper.setInjectableValues(injectables);
}
/**
 * Round-trips a factory built with a {@code null} cache manager through JSON and checks that topic and
 * properties survive, that the deserialized copy carries the {@code cacheManager} mock, and that both
 * instances report zero completed events.
 */
@Test
public void testSimpleSerDe() throws Exception
{
  final KafkaLookupExtractorFactory original = new KafkaLookupExtractorFactory(null, TOPIC, DEFAULT_PROPERTIES);
  final String json = mapper.writeValueAsString(original);
  final KafkaLookupExtractorFactory roundTripped = mapper.readValue(json, KafkaLookupExtractorFactory.class);

  Assert.assertEquals(original.getKafkaTopic(), roundTripped.getKafkaTopic());
  Assert.assertEquals(original.getKafkaProperties(), roundTripped.getKafkaProperties());
  Assert.assertEquals(cacheManager, roundTripped.getCacheManager());
  Assert.assertEquals(0, original.getCompletedEventCount());
  Assert.assertEquals(0, roundTripped.getCompletedEventCount());
}
/**
 * Checks that one extractor instance reports a cache key that has not been seen before each time the
 * factory's event counter is incremented.
 */
@Test
public void testCacheKeyScramblesOnNewData()
{
  final int iterations = 1000;
  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );
  factory.getMapRef().set(ImmutableMap.of());
  final AtomicLong eventCounter = factory.getDoubleEventCount();
  final LookupExtractor extractor = factory.get();
  final Set<List<Byte>> seenKeys = Sets.newHashSetWithExpectedSize(iterations);

  int remaining = iterations;
  while (remaining-- > 0) {
    final List<Byte> key = Bytes.asList(extractor.getCacheKey());
    // Set.add returns false for a duplicate, so this fails exactly when the key was already present
    Assert.assertTrue(seenKeys.add(key));
    eventCounter.incrementAndGet();
  }

  Assert.assertEquals(iterations, seenKeys.size());
}
/**
 * Checks that a freshly obtained extractor reports a cache key that has not been seen before each time the
 * factory's event counter is incremented.
 */
@Test
public void testCacheKeyScramblesDifferentStarts()
{
  final int iterations = 1000;
  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );
  factory.getMapRef().set(ImmutableMap.of());
  final AtomicLong eventCounter = factory.getDoubleEventCount();
  final Set<List<Byte>> seenKeys = Sets.newHashSetWithExpectedSize(iterations);

  int remaining = iterations;
  while (remaining-- > 0) {
    // A new extractor is requested on every pass, unlike the single-extractor variant of this test
    final List<Byte> key = Bytes.asList(factory.get().getCacheKey());
    // Set.add returns false for a duplicate, so this fails exactly when the key was already present
    Assert.assertTrue(seenKeys.add(key));
    eventCounter.incrementAndGet();
  }

  Assert.assertEquals(iterations, seenKeys.size());
}
/**
 * Checks that repeatedly obtained extractors keep returning the same cache key as long as nothing about the
 * factory is modified in between.
 */
@Test
public void testCacheKeySameOnNoChange()
{
  final int rounds = 1000;
  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );
  factory.getMapRef().set(ImmutableMap.of());

  final byte[] baseline = factory.get().getCacheKey();
  for (int round = 0; round < rounds; round++) {
    final byte[] current = factory.get().getCacheKey();
    Assert.assertArrayEquals(baseline, current);
  }
}
/**
 * Checks that two factories that differ only in their topic name produce different cache keys.
 */
@Test
public void testCacheKeyDifferentForTopics()
{
  final KafkaLookupExtractorFactory factory1 = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );
  factory1.getMapRef().set(ImmutableMap.of());

  //noinspection StringConcatenationMissingWhitespace
  final KafkaLookupExtractorFactory factory2 = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC + "b",
      DEFAULT_PROPERTIES
  );
  factory2.getMapRef().set(ImmutableMap.of());

  final byte[] firstKey = factory1.get().getCacheKey();
  final byte[] secondKey = factory2.get().getCacheKey();
  Assert.assertFalse(Arrays.equals(firstKey, secondKey));
}
/**
 * Exercises {@code replaces()}: it is expected to be true for {@code null}, for a non-Kafka factory, and for
 * Kafka factories that differ in topic, properties, the fourth constructor argument or the injective flag;
 * it is expected to be false for the factory itself and for an identically configured Kafka factory.
 */
@Test
public void testReplaces()
{
  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );

  // Nothing to compare against, or a different kind of factory
  Assert.assertTrue(factory.replaces(null));
  final MapLookupExtractorFactory mapFactory = new MapLookupExtractorFactory(ImmutableMap.of(), false);
  Assert.assertTrue(factory.replaces(mapFactory));

  // Same instance / same configuration
  Assert.assertFalse(factory.replaces(factory));
  final KafkaLookupExtractorFactory sameConfig = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );
  Assert.assertFalse(factory.replaces(sameConfig));

  // Different topic
  //noinspection StringConcatenationMissingWhitespace
  final KafkaLookupExtractorFactory otherTopic = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC + "b",
      DEFAULT_PROPERTIES
  );
  Assert.assertTrue(factory.replaces(otherTopic));

  // Same property key, different value
  final KafkaLookupExtractorFactory otherValue = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("some.property", "some.other.value")
  );
  Assert.assertTrue(factory.replaces(otherValue));

  // Different property key, same value
  final KafkaLookupExtractorFactory otherKey = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("some.other.property", "some.value")
  );
  Assert.assertTrue(factory.replaces(otherKey));

  // Five-argument constructor with 1 / false
  final KafkaLookupExtractorFactory withOneAndFalse = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES,
      1,
      false
  );
  Assert.assertTrue(factory.replaces(withOneAndFalse));

  // Five-argument constructor with 0 / true
  final KafkaLookupExtractorFactory withZeroAndTrue = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES,
      0,
      true
  );
  Assert.assertTrue(factory.replaces(withZeroAndTrue));
}
/**
 * Checks that {@code close()} reports success on a factory that was never started.
 */
@Test
public void testStopWithoutStart()
{
  final KafkaLookupExtractorFactory neverStarted = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );
  final boolean closed = neverStarted.close();
  Assert.assertTrue(closed);
}
/**
 * Starts a factory backed by a {@link MockConsumer}, closes it again, and checks that both calls succeed,
 * that the factory's future is done afterwards, and that the expectations recorded on the cache manager and
 * cache handler mocks were met.
 */
@Test
public void testStartStop()
{
  final Consumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

  // Record: one cache creation, one cache lookup, then a close of the handler
  EasyMock.expect(cacheManager.createCache()).andReturn(cacheHandler).once();
  EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
  cacheHandler.close();
  EasyMock.expectLastCall();
  PowerMock.replay(cacheManager, cacheHandler);

  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("bootstrap.servers", "localhost"),
      10_000L,
      false
  )
  {
    @Override
    Consumer<String, String> getConsumer()
    {
      return mockConsumer;
    }
  };

  final boolean started = factory.start();
  Assert.assertTrue(started);
  final boolean closed = factory.close();
  Assert.assertTrue(closed);
  Assert.assertTrue(factory.getFuture().isDone());
  PowerMock.verify(cacheManager, cacheHandler);
}
/**
 * Checks that {@code start()} returns {@code false} when obtaining the consumer never completes: the factory
 * is built with a connect timeout of 1 and a {@code getConsumer()} override that blocks. Afterwards the
 * factory's future must be both done and cancelled, and the recorded mock expectations must have been met.
 */
@Test
public void testStartFailsFromTimeout()
{
  EasyMock.expect(cacheManager.createCache())
          .andReturn(cacheHandler)
          .once();
  EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
  cacheHandler.close();
  EasyMock.expectLastCall();
  PowerMock.replay(cacheManager, cacheHandler);
  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("bootstrap.servers", "localhost"),
      1,
      false
  )
  {
    // Parameterized like the other getConsumer() overrides in this class instead of using the raw type
    @Override
    Consumer<String, String> getConsumer()
    {
      // Lock up: a thread joining itself never returns normally, only an interrupt ends the wait
      try {
        Thread.currentThread().join();
      }
      catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      throw new RuntimeException("shouldn't make it here");
    }
  };
  Assert.assertFalse(factory.start());
  Assert.assertTrue(factory.getFuture().isDone());
  Assert.assertTrue(factory.getFuture().isCancelled());
  PowerMock.verify(cacheManager, cacheHandler);
}
/**
 * Checks that a factory which has been started and then closed refuses to start a second time: the first
 * {@code start()} and the {@code close()} succeed, the second {@code start()} returns {@code false}.
 */
@Test
public void testStartStopStart()
{
  final Consumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

  // Exactly one cache creation, one cache lookup and one close are expected across the whole sequence
  EasyMock.expect(cacheManager.createCache()).andReturn(cacheHandler).once();
  EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
  cacheHandler.close();
  EasyMock.expectLastCall().once();
  PowerMock.replay(cacheManager, cacheHandler);

  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("bootstrap.servers", "localhost")
  )
  {
    @Override
    Consumer<String, String> getConsumer()
    {
      return mockConsumer;
    }
  };

  final boolean firstStart = factory.start();
  Assert.assertTrue(firstStart);
  final boolean closed = factory.close();
  Assert.assertTrue(closed);
  final boolean secondStart = factory.start();
  Assert.assertFalse(secondStart);
  PowerMock.verify(cacheManager, cacheHandler);
}
/**
 * Checks that calling {@code start()} twice and then {@code close()} twice returns {@code true} every time
 * while the cache is created, fetched and closed only once according to the recorded expectations.
 */
@Test
public void testStartStartStopStop()
{
  final Consumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

  // Exactly one cache creation, one cache lookup and one close are expected despite the repeated calls
  EasyMock.expect(cacheManager.createCache()).andReturn(cacheHandler).once();
  EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
  cacheHandler.close();
  EasyMock.expectLastCall().once();
  PowerMock.replay(cacheManager, cacheHandler);

  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("bootstrap.servers", "localhost"),
      10_000L,
      false
  )
  {
    @Override
    Consumer<String, String> getConsumer()
    {
      return mockConsumer;
    }
  };

  final boolean firstStart = factory.start();
  Assert.assertTrue(firstStart);
  final boolean secondStart = factory.start();
  Assert.assertTrue(secondStart);
  final boolean firstClose = factory.close();
  Assert.assertTrue(firstClose);
  final boolean secondClose = factory.close();
  Assert.assertTrue(secondClose);
  PowerMock.verify(cacheManager, cacheHandler);
}
/**
 * Expects an exception whose message contains "bootstrap.servers required property" when a factory with
 * empty Kafka properties is started.
 */
@Test
public void testStartFailsOnMissingConnect()
{
  expectedException.expectMessage("bootstrap.servers required property");

  EasyMock.expect(cacheManager.createCache()).andReturn(cacheHandler).once();
  EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
  cacheHandler.close();
  // NOTE(review): only cacheManager is replayed and verified, so the cacheHandler calls recorded above stay
  // in record state - confirm this is intended
  PowerMock.replay(cacheManager);

  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of()
  );

  final boolean started = factory.start();
  Assert.assertTrue(started);
  final boolean closed = factory.close();
  Assert.assertTrue(closed);
  PowerMock.verify(cacheManager);
}
/**
 * Expects an exception complaining that {@code group.id} cannot be set when a factory whose Kafka properties
 * contain {@code group.id} is started.
 */
@Test
public void testStartFailsOnGroupID()
{
  expectedException.expectMessage(
      "Cannot set kafka property [group.id]. Property is randomly generated for you. Found"
  );

  EasyMock.expect(cacheManager.createCache()).andReturn(cacheHandler).once();
  EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>());
  cacheHandler.close();
  PowerMock.replay(cacheManager, cacheHandler);

  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("group.id", "make me fail")
  );

  final boolean started = factory.start();
  Assert.assertTrue(started);
  final boolean closed = factory.close();
  Assert.assertTrue(closed);
  PowerMock.verify(cacheManager);
}
/**
 * Expects an exception complaining that {@code auto.offset.reset} cannot be set when a factory whose Kafka
 * properties contain {@code auto.offset.reset} is started.
 */
@Test
public void testStartFailsOnAutoOffset()
{
  expectedException.expectMessage(
      "Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found "
  );

  EasyMock.expect(cacheManager.createCache()).andReturn(cacheHandler).once();
  EasyMock.expect(cacheHandler.getCache()).andReturn(new ConcurrentHashMap<>()).once();
  cacheHandler.close();
  EasyMock.expectLastCall();
  // NOTE(review): only cacheManager is replayed and verified, so the cacheHandler calls recorded above stay
  // in record state - confirm this is intended
  PowerMock.replay(cacheManager);

  final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      ImmutableMap.of("auto.offset.reset", "make me fail")
  );

  final boolean started = factory.start();
  Assert.assertTrue(started);
  final boolean closed = factory.close();
  Assert.assertTrue(closed);
  PowerMock.verify(cacheManager);
}
/**
 * Expects an exception whose message contains "Not started" when {@code get()} is called on a factory that
 * has only been constructed.
 */
@Test
public void testFailsGetNotStarted()
{
  expectedException.expectMessage("Not started");
  final KafkaLookupExtractorFactory unstarted = new KafkaLookupExtractorFactory(
      cacheManager,
      TOPIC,
      DEFAULT_PROPERTIES
  );
  unstarted.get();
}
/**
 * Serializes a factory built with a connect timeout of 999 and {@code injective = true}, reads it back, and
 * checks that topic, properties, connect timeout and the injective flag all survive the round trip.
 */
@Test
public void testSerDe() throws Exception
{
  // Separate strict mock for this test, named so that it does not hide the class-level cacheManager field
  final NamespaceExtractionCacheManager localCacheManager =
      PowerMock.createStrictMock(NamespaceExtractionCacheManager.class);
  final String topic = "some_topic";
  final Map<String, String> properties = ImmutableMap.of("some_key", "some_value");
  final long timeout = 999;
  final boolean isInjective = true;

  final KafkaLookupExtractorFactory source = new KafkaLookupExtractorFactory(
      localCacheManager,
      topic,
      properties,
      timeout,
      isInjective
  );
  final String json = mapper.writeValueAsString(source);
  final KafkaLookupExtractorFactory restored = mapper.readValue(json, KafkaLookupExtractorFactory.class);

  Assert.assertEquals(topic, restored.getKafkaTopic());
  Assert.assertEquals(properties, restored.getKafkaProperties());
  Assert.assertEquals(timeout, restored.getConnectTimeout());
  Assert.assertEquals(isInjective, restored.isInjective());
}
/**
 * Builds an EasyMock answer that sleeps for 60000 ms and then fails the test. The trailing
 * {@code return false} only satisfies the compiler, since {@code Assert.fail} always throws.
 *
 * @return an answer that blocks for 60 seconds before failing
 */
private IAnswer<Boolean> getBlockingAnswer()
{
  final long blockMillis = 60000;
  final IAnswer<Boolean> blocker = () -> {
    Thread.sleep(blockMillis);
    Assert.fail("Test failed to complete within 60000ms");
    return false;
  };
  return blocker;
}
}