| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.geode.kafka; |
| |
| import static org.awaitility.Awaitility.await; |
| import static org.junit.Assert.assertEquals; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.Collections; |
| import java.util.Properties; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import kafka.admin.RackAwareMode; |
| import kafka.zk.AdminZkClient; |
| import kafka.zk.KafkaZkClient; |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.apache.kafka.clients.producer.KafkaProducer; |
| import org.apache.kafka.clients.producer.Producer; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.kafka.common.serialization.StringDeserializer; |
| import org.apache.kafka.common.serialization.StringSerializer; |
| import org.apache.kafka.common.utils.Time; |
| import org.apache.kafka.connect.runtime.WorkerConfig; |
| import org.apache.zookeeper.server.quorum.QuorumPeerConfig; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.client.ClientCache; |
| import org.apache.geode.cache.client.ClientCacheFactory; |
| import org.apache.geode.cache.client.ClientRegionShortcut; |
| |
| public class GeodeKafkaTestCluster { |
| |
| @ClassRule |
| public static TemporaryFolder temporaryFolder = new TemporaryFolder(); |
| private static boolean debug = true; |
| |
| public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]"; |
| public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]"; |
| |
| public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource"; |
| public static String TEST_REGION_FOR_SOURCE = "someRegionForSource"; |
| public static String TEST_TOPIC_FOR_SINK = "someTopicForSink"; |
| public static String TEST_REGION_FOR_SINK = "someRegionForSink"; |
| |
| private static ZooKeeperLocalCluster zooKeeperLocalCluster; |
| private static KafkaLocalCluster kafkaLocalCluster; |
| private static GeodeLocalCluster geodeLocalCluster; |
| private static WorkerAndHerderCluster workerAndHerderCluster; |
| private static Consumer<String, String> consumer; |
| |
| @BeforeClass |
| public static void setup() |
| throws IOException, QuorumPeerConfig.ConfigException, InterruptedException { |
| startZooKeeper(); |
| startKafka(); |
| startGeode(); |
| } |
| |
| |
| @AfterClass |
| public static void shutdown() { |
| workerAndHerderCluster.stop(); |
| KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, |
| 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); |
| // AdminZkClient adminZkClient = new AdminZkClient(zkClient); |
| // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE); |
| // adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK); |
| zkClient.close(); |
| kafkaLocalCluster.stop(); |
| geodeLocalCluster.stop(); |
| } |
| |
| |
| private static void startWorker(int maxTasks) throws IOException, InterruptedException { |
| workerAndHerderCluster = new WorkerAndHerderCluster(); |
| workerAndHerderCluster.start(String.valueOf(maxTasks)); |
| Thread.sleep(20000); |
| } |
| |
| private static void createTopic(String topicName, int numPartitions, int replicationFactor) { |
| KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, |
| 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); |
| |
| Properties topicProperties = new Properties(); |
| topicProperties.put("flush.messages", "1"); |
| AdminZkClient adminZkClient = new AdminZkClient(zkClient); |
| adminZkClient.createTopic(topicName, numPartitions, replicationFactor, topicProperties, |
| RackAwareMode.Disabled$.MODULE$); |
| } |
| |
| private static void deleteTopic(String topicName) { |
| KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200000, |
| 15000, 10, Time.SYSTEM, "myGroup", "myMetricType", null); |
| AdminZkClient adminZkClient = new AdminZkClient(zkClient); |
| adminZkClient.deleteTopic(topicName); |
| } |
| |
| private ClientCache createGeodeClient() { |
| return new ClientCacheFactory().addPoolLocator("localhost", 10334).create(); |
| } |
| |
| private static void startZooKeeper() throws IOException, QuorumPeerConfig.ConfigException { |
| zooKeeperLocalCluster = new ZooKeeperLocalCluster(getZooKeeperProperties()); |
| zooKeeperLocalCluster.start(); |
| } |
| |
| private static void startKafka() |
| throws IOException, InterruptedException, QuorumPeerConfig.ConfigException { |
| kafkaLocalCluster = new KafkaLocalCluster(getKafkaConfig()); |
| kafkaLocalCluster.start(); |
| } |
| |
| private static void startGeode() throws IOException, InterruptedException { |
| geodeLocalCluster = new GeodeLocalCluster(); |
| geodeLocalCluster.start(); |
| } |
| |
| private static Properties getZooKeeperProperties() throws IOException { |
| Properties properties = new Properties(); |
| properties.setProperty("dataDir", |
| (debug) ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath()); |
| properties.setProperty("clientPort", "2181"); |
| properties.setProperty("tickTime", "2000"); |
| return properties; |
| } |
| |
| |
| private static Properties getKafkaConfig() throws IOException { |
| int BROKER_PORT = 9092; |
| Properties props = new Properties(); |
| |
| props.put("broker.id", "0"); |
| props.put("zookeeper.connect", "localhost:2181"); |
| props.put("host.name", "localHost"); |
| props.put("port", BROKER_PORT); |
| props.put("offsets.topic.replication.factor", "1"); |
| |
| // Specifically GeodeKafka connector configs |
| return props; |
| } |
| |
| |
| // consumer props, less important, just for testing? |
| public static Consumer<String, String> createConsumer() { |
| final Properties props = new Properties(); |
| props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
| props.put(ConsumerConfig.GROUP_ID_CONFIG, |
| "myGroup"); |
| props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, |
| StringDeserializer.class.getName()); |
| props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, |
| StringDeserializer.class.getName()); |
| props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
| |
| // Create the consumer using props. |
| final Consumer<String, String> consumer = |
| new KafkaConsumer<>(props); |
| // Subscribe to the topic. |
| consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE)); |
| return consumer; |
| } |
| |
| // consumer props, less important, just for testing? |
| public static Producer<String, String> createProducer() { |
| final Properties props = new Properties(); |
| props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
| props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, |
| StringSerializer.class.getName()); |
| props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, |
| StringSerializer.class.getName()); |
| |
| // Create the producer using props. |
| final Producer<String, String> producer = |
| new KafkaProducer<>(props); |
| return producer; |
| } |
| |
| @Test |
| public void endToEndSourceTest() throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SOURCE, 1, 1); |
| startWorker(1); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .create(TEST_REGION_FOR_SOURCE); |
| |
| for (int i = 0; i < 10; i++) { |
| region.put("KEY" + i, "VALUE" + i); |
| } |
| |
| AtomicInteger valueReceived = new AtomicInteger(0); |
| await().atMost(10, TimeUnit.SECONDS).until(() -> { |
| ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); |
| for (ConsumerRecord<String, String> record : records) { |
| valueReceived.incrementAndGet(); |
| } |
| return valueReceived.get() == 10; |
| }); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SOURCE); |
| } |
| } |
| |
| |
| @Test |
| public void endToEndSourceSingleRegionMultiTaskMultiPartitionTest() throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1); |
| startWorker(1); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .create(TEST_REGION_FOR_SOURCE); |
| |
| for (int i = 0; i < 10; i++) { |
| region.put("KEY" + i, "VALUE" + i); |
| } |
| |
| AtomicInteger valueReceived = new AtomicInteger(0); |
| await().atMost(10, TimeUnit.SECONDS).until(() -> { |
| ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); |
| for (ConsumerRecord<String, String> record : records) { |
| valueReceived.incrementAndGet(); |
| } |
| return valueReceived.get() == 10; |
| }); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SOURCE); |
| } |
| } |
| |
| @Test |
| public void endToEndSourceSingleRegionMultiTaskMultiPartitionWithMoreTasksThanPartitionsTest() |
| throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SOURCE, 2, 1); |
| startWorker(5); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY) |
| .create(TEST_REGION_FOR_SOURCE); |
| |
| for (int i = 0; i < 10; i++) { |
| region.put("KEY" + i, "VALUE" + i); |
| } |
| |
| AtomicInteger valueReceived = new AtomicInteger(0); |
| await().atMost(10, TimeUnit.SECONDS).until(() -> { |
| ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); |
| for (ConsumerRecord<String, String> record : records) { |
| valueReceived.incrementAndGet(); |
| } |
| return valueReceived.get() == 10; |
| }); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SOURCE); |
| } |
| } |
| |
| @Test |
| public void endToEndSinkTest() throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SINK, 1, 1); |
| startWorker(1); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = |
| client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); |
| |
| Producer<String, String> producer = createProducer(); |
| for (int i = 0; i < 10; i++) { |
| producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); |
| } |
| |
| int i = 0; |
| await().atMost(10, TimeUnit.SECONDS) |
| .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SINK); |
| } |
| } |
| |
| |
| @Test |
| public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicSinkTest() |
| throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SINK, 10, 1); |
| startWorker(5); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = |
| client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); |
| |
| Producer<String, String> producer = createProducer(); |
| for (int i = 0; i < 10; i++) { |
| producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); |
| } |
| |
| int i = 0; |
| await().atMost(10, TimeUnit.SECONDS) |
| .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SINK); |
| } |
| } |
| |
| @Test |
| public void endToEndWithOneTaskForASingleBindingAgainstAMultiPartitionTopicWithMoreWorkersSinkTest() |
| throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SINK, 10, 1); |
| startWorker(15); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = |
| client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); |
| |
| Producer<String, String> producer = createProducer(); |
| for (int i = 0; i < 10; i++) { |
| producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); |
| } |
| |
| int i = 0; |
| await().atMost(10, TimeUnit.SECONDS) |
| .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SINK); |
| } |
| } |
| |
| @Test |
| public void endToEndWithOneTaskForASingleBindingLessTasksThanPartitions() throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SINK, 10, 1); |
| startWorker(5); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = |
| client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); |
| |
| Producer<String, String> producer = createProducer(); |
| for (int i = 0; i < 10; i++) { |
| producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); |
| } |
| |
| int i = 0; |
| await().atMost(10, TimeUnit.SECONDS) |
| .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SINK); |
| } |
| } |
| |
| @Test |
| public void endToEndWithOneTaskForASingleBindingMoreTasksThanPartitions() throws Exception { |
| try { |
| createTopic(TEST_TOPIC_FOR_SINK, 10, 1); |
| startWorker(5); |
| consumer = createConsumer(); |
| |
| ClientCache client = createGeodeClient(); |
| Region region = |
| client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); |
| |
| Producer<String, String> producer = createProducer(); |
| for (int i = 0; i < 10; i++) { |
| producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, i, UUID.randomUUID().toString(), |
| UUID.randomUUID().toString())); |
| } |
| |
| int i = 0; |
| await().atMost(10, TimeUnit.SECONDS) |
| .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); |
| } finally { |
| deleteTopic(TEST_TOPIC_FOR_SINK); |
| } |
| } |
| |
| } |