/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.stream.kafka.connect;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.stream.kafka.TestKafkaBroker;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.FutureCallback;
import org.junit.Test;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;

/**
 * Tests for {@link IgniteSourceConnector}.
 */
public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
    /** Number of input messages. */
    private static final int EVENT_CNT = 100;

    /** Cache name. */
    private static final String CACHE_NAME = "testCache";

    /** Test topics created by connector. */
    private static final String[] TOPICS = {"src-test1", "src-test2"};

    /** Worker id. */
    private static final String WORKER_ID = "workerId";

    /** Test Kafka broker. */
    private TestKafkaBroker kafkaBroker;

    /** Worker to run tasks. */
    private Worker worker;

    /** Workers' herder. */
    private Herder herder;

    /** Ignite server node shared among tests. */
    private static Ignite grid;

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        IgniteConfiguration cfg = loadConfiguration("example-ignite.xml");

        cfg.setClientMode(false);

        grid = startGrid("igniteServerNode", cfg);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        kafkaBroker = new TestKafkaBroker();

        Map<String, String> props = makeWorkerProps();
        WorkerConfig workerCfg = new StandaloneConfig(props);

        MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
        offBackingStore.configure(workerCfg);
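
        // The worker runs the connector tasks in-process; the standalone herder manages their lifecycle.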
        worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
        worker.start();

        herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
        herder.start();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        herder.stop();
        worker.stop();

        kafkaBroker.shutdown();

        grid.cache(CACHE_NAME).clear();

        // IgniteSourceTask keeps the cache name in a static field; reset it via reflection
        // so the next test's task configuration takes effect.
        Field field = IgniteSourceTask.class.getDeclaredField("cacheName");
        field.setAccessible(true);
        field.set(IgniteSourceTask.class, null);
    }

    /**
     * Tests the data flow: data injected into the grid is transferred to the Kafka cluster
     * without a user-specified filter.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception {
        Map<String, String> srcProps = makeSourceProps(Utils.join(TOPICS, ","));

        srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS);

        doTest(srcProps, false);
    }

    /**
     * Tests the data flow: data injected into the grid is transferred to the Kafka cluster
     * with the user-specified filter enabled.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testEventsInjectedIntoKafka() throws Exception {
        doTest(makeSourceProps(Utils.join(TOPICS, ",")), true);
    }

    /**
     * Tests the source with the specified source configurations.
     *
     * @param srcProps Source properties.
     * @param conditioned Flag indicating whether the user-specified filter is enabled.
     * @throws Exception If failed.
     */
    private void doTest(Map<String, String> srcProps, boolean conditioned) throws Exception {
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
            @Override public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
                if (error != null)
                    throw new RuntimeException("Failed to create a job!", error);
            }
        });

        herder.putConnectorConfig(
            srcProps.get(ConnectorConfig.NAME_CONFIG),
            srcProps, true, cb);

        cb.get();

        // Ugh! To be sure Kafka Connect's worker thread is properly started...
        Thread.sleep(5000);
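
        // Count down the latch once per cache PUT event so the test can wait
        // until all injected entries have been stored.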
        final CountDownLatch latch = new CountDownLatch(EVENT_CNT);

        final IgnitePredicate<CacheEvent> locLsnr = new IgnitePredicate<CacheEvent>() {
            @Override public boolean apply(CacheEvent evt) {
                assert evt != null;

                latch.countDown();

                return true;
            }
        };

        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(locLsnr, EVT_CACHE_OBJECT_PUT);

        IgniteCache<String, String> cache = grid.cache(CACHE_NAME);

        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
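
        // Inject EVENT_CNT entries; sendData() prefixes half of the keys with
        // "conditioned_" so the user-specified filter can match them.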
        Map<String, String> keyValMap = new HashMap<>(sendData());

        // Checks all events are processed.
        assertTrue(latch.await(10, TimeUnit.SECONDS));

        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(locLsnr);

        assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY));

        // Checks the events were transferred to the Kafka broker: each event goes to every topic,
        // and with the filter enabled only the "conditioned_" half of the keys passes through.
        if (conditioned)
            checkDataDelivered(EVENT_CNT * TOPICS.length / 2);
        else
            checkDataDelivered(EVENT_CNT * TOPICS.length);
    }

    /**
     * Sends messages to the grid.
     *
     * @return Map of key value messages.
     * @throws IOException If failed.
     */
    private Map<String, String> sendData() throws IOException {
        Map<String, String> keyValMap = new HashMap<>();

        for (int evt = 0; evt < EVENT_CNT; evt++) {
            long runtime = System.currentTimeMillis();

            String key = "test_" + evt;
            String msg = runtime + String.valueOf(evt);
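
            // Prefix the second half of the keys so the user-specified filter has events to match.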
            if (evt >= EVENT_CNT / 2)
                key = "conditioned_" + key;

            grid.cache(CACHE_NAME).put(key, msg);

            keyValMap.put(key, msg);
        }

        return keyValMap;
    }

    /**
     * Checks if events were delivered to the Kafka server.
     *
     * @param expectedEventsCnt Expected events count.
     * @throws Exception If failed.
     */
    private void checkDataDelivered(final int expectedEventsCnt) throws Exception {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
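        // Keys are plain strings; values are CacheEvent objects and need Ignite's deserializer.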
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.ignite.stream.kafka.connect.serialization.CacheEventDeserializer");

        final KafkaConsumer<String, CacheEvent> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(TOPICS));

        final AtomicInteger evtCnt = new AtomicInteger();

        try {
            // Wait for the expected events count.
            GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    ConsumerRecords<String, CacheEvent> records = consumer.poll(10);

                    for (ConsumerRecord<String, CacheEvent> record : records) {
                        info("Record: " + record);

                        evtCnt.getAndIncrement();
                    }

                    return evtCnt.get() >= expectedEventsCnt;
                }
            }, 20_000);

            info("Waiting for unexpected records for 5 secs.");

            assertFalse(GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    ConsumerRecords<String, CacheEvent> records = consumer.poll(10);

                    for (ConsumerRecord<String, CacheEvent> record : records) {
                        error("Unexpected record: " + record);

                        evtCnt.getAndIncrement();
                    }

                    return evtCnt.get() > expectedEventsCnt;
                }
            }, 5_000));
        }
        catch (WakeupException ignored) {
            // Ignore for shutdown.
        }
        finally {
            consumer.close();

            assertEquals(expectedEventsCnt, evtCnt.get());
        }
    }

    /**
     * Creates properties for the test source connector.
     *
     * @param topics Topics.
     * @return Test source connector properties.
     */
    private Map<String, String> makeSourceProps(String topics) {
        Map<String, String> props = new HashMap<>();

        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
        props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector");
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName());
        props.put(IgniteSourceConstants.CACHE_NAME, CACHE_NAME);
        props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml");
        props.put(IgniteSourceConstants.TOPIC_NAMES, topics);
        props.put(IgniteSourceConstants.CACHE_EVENTS, "put");
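        // Only cache events that pass the user-specified filter are forwarded to Kafka.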
        props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName());
        props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000");

        return props;
    }

    /**
     * Creates properties for Kafka Connect workers.
     *
     * @return Worker configurations.
     * @throws IOException If failed.
     */
    private Map<String, String> makeWorkerProps() throws IOException {
        Map<String, String> props = new HashMap<>();

        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
        props.put("internal.key.converter.schemas.enable", "false");
        props.put("internal.value.converter.schemas.enable", "false");
        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter");
        props.put("key.converter.schemas.enable", "false");
        props.put("value.converter.schemas.enable", "false");
        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
        props.put("offset.storage.file.filename", "/tmp/connect.offsets");

        // Fast flushing for testing.
        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");

        return props;
    }
}