PIP-62: Remove tests and examples related to pulsar-adapters (#8511)
related to #8480
remove remaining modules related to pulsar-adapters:
- examples/flink
- examples/spark
- tests/pulsar-kafka-compat-client-test
- tests/pulsar-spark-test
- tests/pulsar-storm-test
diff --git a/tests/pom.xml b/tests/pom.xml
index b960ade..ef0dfa0 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -34,9 +34,6 @@
<modules>
<module>docker-images</module>
<module>integration</module>
- <module>pulsar-kafka-compat-client-test</module>
- <module>pulsar-storm-test</module>
- <module>pulsar-spark-test</module>
<module>bc_2_0_0</module>
<module>bc_2_0_1</module>
</modules>
diff --git a/tests/pulsar-kafka-compat-client-test/pom.xml b/tests/pulsar-kafka-compat-client-test/pom.xml
deleted file mode 100644
index e77cb8d..0000000
--- a/tests/pulsar-kafka-compat-client-test/pom.xml
+++ /dev/null
@@ -1,127 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>tests-parent</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-kafka-compat-client-test</artifactId>
- <packaging>jar</packaging>
- <name>Apache Pulsar :: Tests :: Pulsar Kafka Compat Client Tests</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>integration</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>testcontainers</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-admin</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-kafka</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <!-- only run tests when -DintegrationTests is specified //-->
- <skipTests>true</skipTests>
- <systemPropertyVariables>
- <currentVersion>${project.version}</currentVersion>
- <maven.buildDirectory>${project.build.directory}</maven.buildDirectory>
- </systemPropertyVariables>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>integrationTests</id>
- <activation>
- <property>
- <name>integrationTests</name>
- </property>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <properties>
- <property>
- <name>listener</name>
- <value>org.apache.pulsar.tests.PulsarTestListener</value>
- </property>
- </properties>
- <argLine>-Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
- </argLine>
- <skipTests>false</skipTests>
- <forkCount>1</forkCount>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-</project>
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
deleted file mode 100644
index fafb1bc..0000000
--- a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ /dev/null
@@ -1,885 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.compat.kafka;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import lombok.Cleanup;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.StringSchema;
-import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
-import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-@Slf4j
-public class KafkaApiTest extends PulsarStandaloneTestSuite {
-
- @Data
- public static class Foo {
- @Nullable
- private String field1;
- @Nullable
- private String field2;
- private int field3;
- }
-
- @Data
- public static class Bar {
- private boolean field1;
- }
-
- private static String getPlainTextServiceUrl() {
- return container.getPlainTextServiceUrl();
- }
-
- private static String getHttpServiceUrl() {
- return container.getHttpServiceUrl();
- }
-
- @Test(timeOut = 30000)
- public void testSimpleProducerConsumer() throws Exception {
- String topic = "persistent://public/default/testSimpleProducerConsumer";
-
- Properties producerProperties = new Properties();
- producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
- producerProperties.put("key.serializer", IntegerSerializer.class.getName());
- producerProperties.put("value.serializer", StringSerializer.class.getName());
- Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
-
- Properties consumerProperties = new Properties();
- consumerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
- consumerProperties.put("group.id", "my-subscription-name");
- consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
- consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
- consumerProperties.put("enable.auto.commit", "true");
- Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties);
- consumer.subscribe(Arrays.asList(topic));
-
- List<Long> offsets = new ArrayList<>();
-
- for (int i = 0; i < 10; i++) {
- RecordMetadata md = producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i)).get();
- offsets.add(md.offset());
- log.info("Published message at {}", Long.toHexString(md.offset()));
- }
-
- producer.flush();
- producer.close();
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<Integer, String> records = consumer.poll(100);
- records.forEach(record -> {
- assertEquals(record.key().intValue(), received.get());
- assertEquals(record.value(), "hello-" + received.get());
- assertEquals(record.offset(), offsets.get(received.get()).longValue());
-
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
-
- consumer.close();
- }
-
- @Test
- public void testSimpleConsumer() throws Exception {
- String topic = "testSimpleConsumer";
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- @Cleanup
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-
- @Cleanup
- org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
- for (int i = 0; i < 10; i++) {
- pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
- }
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- if (!records.isEmpty()) {
- records.forEach(record -> {
- String key = Integer.toString(received.get());
- String value = "hello-" + received.get();
- log.info("Receive record : key = {}, value = {}, topic = {}, ptn = {}",
- key, value, record.topic(), record.partition());
- assertEquals(record.key(), key);
- assertEquals(record.value(), value);
-
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
- }
- }
-
- @Test
- public void testConsumerAutoCommit() throws Exception {
- String topic = "testConsumerAutoCommit";
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "true");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
- for (int i = 0; i < 10; i++) {
- pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
- }
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- records.forEach(record -> {
- assertEquals(record.key(), Integer.toString(received.get()));
- assertEquals(record.value(), "hello-" + received.get());
- received.incrementAndGet();
- });
- }
-
- consumer.close();
-
- // Re-open consumer and verify every message was acknowledged
- Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
- consumer2.subscribe(Arrays.asList(topic));
-
- ConsumerRecords<String, String> records = consumer2.poll(100);
- assertEquals(records.count(), 0);
- consumer2.close();
- }
-
- @Test
- public void testConsumerManualOffsetCommit() throws Exception {
- String topic = "testConsumerManualOffsetCommit";
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
- for (int i = 0; i < 10; i++) {
- pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
- }
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- records.forEach(record -> {
- assertEquals(record.key(), Integer.toString(received.get()));
- assertEquals(record.value(), "hello-" + received.get());
-
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(new TopicPartition(record.topic(), record.partition()),
- new OffsetAndMetadata(record.offset()));
- consumer.commitSync(offsets);
-
- received.incrementAndGet();
- });
- }
-
- consumer.close();
-
- // Re-open consumer and verify every message was acknowledged
- Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
- consumer2.subscribe(Arrays.asList(topic));
-
- ConsumerRecords<String, String> records = consumer2.poll(100);
- assertEquals(records.count(), 0);
- consumer2.close();
- }
-
- @Test
- public void testPartitions() throws Exception {
- String topic = "testPartitions";
-
- // Create 8 partitions in topic
- @Cleanup
- PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
- admin.topics().createPartitionedTopic(topic, 8);
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "true");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic)
- .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition).create();
-
- // Create 2 Kakfa consumer and verify each gets half of the messages
- List<Consumer<String, String>> consumers = new ArrayList<>();
- for (int c = 0; c < 2; c++) {
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
- consumers.add(consumer);
- }
-
- int N = 8 * 3;
-
- for (int i = 0; i < N; i++) {
- pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
- }
-
- consumers.forEach(consumer -> {
- int expectedMessaged = N / consumers.size();
-
- for (int i = 0; i < expectedMessaged;) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- i += records.count();
- }
-
- // No more messages for this consumer
- ConsumerRecords<String, String> records = consumer.poll(100);
- assertEquals(records.count(), 0);
- });
-
- consumers.forEach(Consumer::close);
- }
-
- @Test
- public void testExplicitPartitions() throws Exception {
- String topic = "testExplicitPartitions";
-
- // Create 8 partitions in topic
- @Cleanup
- PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
- admin.topics().createPartitionedTopic(topic, 8);
-
- Properties producerProperties = new Properties();
- producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
- producerProperties.put("key.serializer", IntegerSerializer.class.getName());
- producerProperties.put("value.serializer", StringSerializer.class.getName());
-
- @Cleanup
- Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "true");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- // Create Kakfa consumer and verify all messages came from intended partition
- @Cleanup
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- int N = 8 * 3;
-
- final int choosenPartition = 5;
-
- for (int i = 0; i < N; i++) {
- producer.send(new ProducerRecord<>(topic, choosenPartition, i, "hello-" + i));
- }
-
- producer.flush();
-
- for (int i = 0; i < N;) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- i += records.count();
-
- records.forEach(record -> {
- assertEquals(record.partition(), choosenPartition);
- });
- }
-
- // No more messages for this consumer
- ConsumerRecords<String, String> records = consumer.poll(100);
- assertEquals(records.count(), 0);
- }
-
- public static class MyCustomPartitioner implements Partitioner {
-
- static int USED_PARTITION = 3;
-
- @Override
- public void configure(Map<String, ?> conf) {
- // Do nothing
- }
-
- @Override
- public void close() {
- // Do nothing
- }
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // Dummy implementation that always return same partition
- return USED_PARTITION;
- }
- }
-
- @Test
- public void testCustomRouter() throws Exception {
- String topic = "testCustomRouter";
-
- // Create 8 partitions in topic
- @Cleanup
- PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
- admin.topics().createPartitionedTopic(topic, 8);
-
- Properties producerProperties = new Properties();
- producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
- producerProperties.put("key.serializer", IntegerSerializer.class.getName());
- producerProperties.put("value.serializer", StringSerializer.class.getName());
- producerProperties.put("partitioner.class", MyCustomPartitioner.class.getName());
-
- @Cleanup
- Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "true");
- props.put("key.deserializer", IntegerDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- // Create Kakfa consumer and verify all messages came from intended partition
- @Cleanup
- Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- int N = 8 * 3;
-
- for (int i = 0; i < N; i++) {
- producer.send(new ProducerRecord<>(topic, i, "hello-" + i));
- }
-
- producer.flush();
-
- for (int i = 0; i < N;) {
- ConsumerRecords<Integer, String> records = consumer.poll(100);
- i += records.count();
-
- records.forEach(record -> {
- assertEquals(record.partition(), MyCustomPartitioner.USED_PARTITION);
- });
- }
-
- // No more messages for this consumer
- ConsumerRecords<Integer, String> records = consumer.poll(100);
- assertEquals(records.count(), 0);
- }
-
- @Test
- public void testConsumerSeek() throws Exception {
- String topic = "testConsumerSeek";
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
- props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
-
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
- for (int i = 0; i < 10; i++) {
- pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
- }
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- records.forEach(record -> {
- assertEquals(record.key(), Integer.toString(received.get()));
- assertEquals(record.value(), "hello-" + received.get());
-
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
-
- consumer.seekToBeginning(Collections.emptyList());
-
- Thread.sleep(500);
-
- // Messages should be available again
- received.set(0);
- while (received.get() < 10) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- records.forEach(record -> {
- assertEquals(record.key(), Integer.toString(received.get()));
- assertEquals(record.value(), "hello-" + received.get());
-
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
-
- consumer.close();
- }
-
- @Test
- public void testConsumerSeekToEnd() throws Exception {
- String topic = "testConsumerSeekToEnd";
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
- props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
-
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
- for (int i = 0; i < 10; i++) {
- pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
- }
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- records.forEach(record -> {
- assertEquals(record.key(), Integer.toString(received.get()));
- assertEquals(record.value(), "hello-" + received.get());
-
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
-
- consumer.seekToEnd(Collections.emptyList());
- Thread.sleep(500);
-
- consumer.close();
-
- // Recreate the consumer
- consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topic));
-
- ConsumerRecords<String, String> records = consumer.poll(100);
- // Since we are at the end of the topic, there should be no messages
- assertEquals(records.count(), 0);
-
- consumer.close();
- }
-
- @Test
- public void testSimpleProducer() throws Exception {
- String topic = "testSimpleProducer";
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
- .subscriptionName("my-subscription")
- .subscribe();
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
-
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- Producer<Integer, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
- }
-
- producer.flush();
- producer.close();
-
- for (int i = 0; i < 10; i++) {
- Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
- assertEquals(new String(msg.getData()), "hello-" + i);
- pulsarConsumer.acknowledge(msg);
- }
- }
-
- @Test(timeOut = 10000)
- public void testProducerCallback() throws Exception {
- String topic = "testProducerCallback";
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("my-subscription")
- .subscribe();
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
-
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- Producer<Integer, String> producer = new KafkaProducer<>(props);
-
- CountDownLatch counter = new CountDownLatch(10);
-
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i), (metadata, exception) -> {
- assertEquals(metadata.topic(), topic);
- assertNull(exception);
-
- counter.countDown();
- });
- }
-
- counter.await();
-
- for (int i = 0; i < 10; i++) {
- Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
- assertEquals(new String(msg.getData()), "hello-" + i);
- pulsarConsumer.acknowledge(msg);
- }
-
- producer.close();
- }
-
- @Test
- public void testProducerAvroSchemaWithPulsarKafkaClient() throws Exception {
- String topic = "testProducerAvroSchemaWithPulsarKafkaClient";
- AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
- AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer =
- pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("my-subscription")
- .subscribe();
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
- for (int i = 0; i < 10; i++) {
- Bar bar = new Bar();
- bar.setField1(true);
-
- Foo foo = new Foo();
- foo.setField1("field1");
- foo.setField2("field2");
- foo.setField3(i);
- producer.send(new ProducerRecord<Bar, Foo>(topic, bar, foo));
- }
- producer.flush();
- producer.close();
-
- for (int i = 0; i < 10; i++) {
- Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
- Foo value = fooSchema.decode(msg.getValue());
- Assert.assertEquals(value.getField1(), "field1");
- Assert.assertEquals(value.getField2(), "field2");
- Assert.assertEquals(value.getField3(), i);
- pulsarConsumer.acknowledge(msg);
- }
- }
-
- @Test
- public void testConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
- String topic = "testConsumerAvroSchemaWithPulsarKafkaClient";
-
- StringSchema stringSchema = new StringSchema();
- AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- @Cleanup
- Consumer<String, Foo> consumer = new KafkaConsumer<String, Foo>(props, new StringSchema(), fooSchema);
- consumer.subscribe(Arrays.asList(topic));
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
- org.apache.pulsar.client.api.Producer<Foo> pulsarProducer = pulsarClient.newProducer(fooSchema).topic(topic).create();
-
- for (int i = 0; i < 10; i++) {
- Foo foo = new Foo();
- foo.setField1("field1");
- foo.setField2("field2");
- foo.setField3(i);
- pulsarProducer.newMessage().keyBytes(stringSchema.encode(Integer.toString(i))).value(foo).send();
- }
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<String, Foo> records = consumer.poll(100);
- if (!records.isEmpty()) {
- records.forEach(record -> {
- Assert.assertEquals(record.key(), Integer.toString(received.get()));
- Foo value = record.value();
- Assert.assertEquals(value.getField1(), "field1");
- Assert.assertEquals(value.getField2(), "field2");
- Assert.assertEquals(value.getField3(), received.get());
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
- }
- }
-
- @Test
- public void testProducerConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
- String topic = "testProducerConsumerAvroSchemaWithPulsarKafkaClient";
-
- AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
- AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- @Cleanup
- Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
- consumer.subscribe(Arrays.asList(topic));
-
- Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
-
- for (int i = 0; i < 10; i++) {
- Bar bar = new Bar();
- bar.setField1(true);
-
- Foo foo = new Foo();
- foo.setField1("field1");
- foo.setField2("field2");
- foo.setField3(i);
- producer.send(new ProducerRecord<>(topic, bar, foo));
- }
- producer.flush();
- producer.close();
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<Bar, Foo> records = consumer.poll(100);
- if (!records.isEmpty()) {
- records.forEach(record -> {
- Bar key = record.key();
- Assert.assertTrue(key.isField1());
- Foo value = record.value();
- Assert.assertEquals(value.getField1(), "field1");
- Assert.assertEquals(value.getField2(), "field2");
- Assert.assertEquals(value.getField3(), received.get());
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
- }
- }
-
- @Test
- public void testProducerConsumerJsonSchemaWithPulsarKafkaClient() throws Exception {
- String topic = "testProducerConsumerJsonSchemaWithPulsarKafkaClient";
-
- JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
- JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- @Cleanup
- Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
- consumer.subscribe(Arrays.asList(topic));
-
- Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
-
- for (int i = 0; i < 10; i++) {
- Bar bar = new Bar();
- bar.setField1(true);
-
- Foo foo = new Foo();
- foo.setField1("field1");
- foo.setField2("field2");
- foo.setField3(i);
- producer.send(new ProducerRecord<>(topic, bar, foo));
- }
- producer.flush();
- producer.close();
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<Bar, Foo> records = consumer.poll(100);
- if (!records.isEmpty()) {
- records.forEach(record -> {
- Bar key = record.key();
- Assert.assertTrue(key.isField1());
- Foo value = record.value();
- Assert.assertEquals(value.getField1(), "field1");
- Assert.assertEquals(value.getField2(), "field2");
- Assert.assertEquals(value.getField3(), received.get());
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
- }
- }
-
- @Test
- public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
- String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";
-
- Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
- JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
- Properties props = new Properties();
- props.put("bootstrap.servers", getPlainTextServiceUrl());
- props.put("group.id", "my-subscription-name");
- props.put("enable.auto.commit", "false");
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- @Cleanup
- Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
- consumer.subscribe(Arrays.asList(topic));
-
- Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);
-
- for (int i = 0; i < 10; i++) {
- Foo foo = new Foo();
- foo.setField1("field1");
- foo.setField2("field2");
- foo.setField3(i);
- producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
- }
- producer.flush();
- producer.close();
-
- AtomicInteger received = new AtomicInteger();
- while (received.get() < 10) {
- ConsumerRecords<String, Foo> records = consumer.poll(100);
- if (!records.isEmpty()) {
- records.forEach(record -> {
- String key = record.key();
- Assert.assertEquals(key, "hello" + received.get());
- Foo value = record.value();
- Assert.assertEquals(value.getField1(), "field1");
- Assert.assertEquals(value.getField2(), "field2");
- Assert.assertEquals(value.getField3(), received.get());
- received.incrementAndGet();
- });
-
- consumer.commitSync();
- }
- }
- }
-}
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java
deleted file mode 100644
index e6e183a..0000000
--- a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.compat.kafka;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.Test;
-import java.util.Properties;
-/**
- * A test that tests if {@link PulsarKafkaProducer} is thread safe.
- */
-public class PulsarKafkaProducerThreadSafeTest extends PulsarStandaloneTestSuite {
- private Producer producer;
-
- private static String getPlainTextServiceUrl() {
- return container.getPlainTextServiceUrl();
- }
-
- @BeforeTest
- private void setup() {
- Properties producerProperties = new Properties();
- producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
- producerProperties.put("key.serializer", IntegerSerializer.class.getName());
- producerProperties.put("value.serializer", StringSerializer.class.getName());
- producer = new KafkaProducer<>(producerProperties);
- }
-
- /**
- * This test run 10 times in threadPool witch size is 5.
- * Different threads have same producer and different topics witch is based on thread time.
- * This test will be failed when producer failed to send if PulsarKafkaProducer is not thread safe.
- */
- @Test(threadPoolSize = 5, invocationCount = 10)
- public void testPulsarKafkaProducerThreadSafe() {
- String topic1 = "persistent://public/default/topic-" + System.currentTimeMillis();
- ProducerRecord<String, String> record = new ProducerRecord<>(topic1, "Hello");
- producer.send(record);
- }
-}
diff --git a/tests/pulsar-spark-test/pom.xml b/tests/pulsar-spark-test/pom.xml
deleted file mode 100644
index 8737713..0000000
--- a/tests/pulsar-spark-test/pom.xml
+++ /dev/null
@@ -1,119 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>tests-parent</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-spark-test</artifactId>
- <packaging>jar</packaging>
- <name>Spark Streaming Pulsar Receivers Tests</name>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-spark</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>integration</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>mysql</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <!-- only run tests when -DintegrationTests is specified //-->
- <skipTests>true</skipTests>
- <systemPropertyVariables>
- <currentVersion>${project.version}</currentVersion>
- <maven.buildDirectory>${project.build.directory}</maven.buildDirectory>
- </systemPropertyVariables>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>integrationTests</id>
- <activation>
- <property>
- <name>integrationTests</name>
- </property>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <properties>
- <property>
- <name>testRetryCount</name>
- <value>0</value>
- </property>
- <property>
- <name>listener</name>
- <value>org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener</value>
- </property>
- </properties>
- <argLine>-Xmx2G -XX:MaxDirectMemorySize=8G
- -Dio.netty.leakDetectionLevel=advanced
- </argLine>
- <skipTests>false</skipTests>
- <forkCount>1</forkCount>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-</project>
diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
deleted file mode 100644
index bd619a5..0000000
--- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.spark;
-
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
-import org.apache.spark.storage.StorageLevel;
-import org.mockito.ArgumentCaptor;
-import org.testng.annotations.Test;
-
-public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
- private static final String TOPIC = "persistent://p1/c1/ns1/topic1";
- private static final String SUBS = "sub1";
- private static final String EXPECTED_MESSAGE = "pulsar-spark test message";
-
- @Test(dataProvider = "ServiceUrls")
- public void testReceivedMessage(String serviceUrl) throws Exception {
- ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
-
- Set<String> set = new HashSet<>();
- set.add(TOPIC);
- consConf.setTopicNames(set);
- consConf.setSubscriptionName(SUBS);
-
- MessageListener msgListener = spy(new MessageListener() {
- @Override
- public void received(Consumer consumer, Message msg) {
- return;
- }
- });
- final ArgumentCaptor<Consumer> consCaptor = ArgumentCaptor.forClass(Consumer.class);
- final ArgumentCaptor<Message> msgCaptor = ArgumentCaptor.forClass(Message.class);
- doNothing().when(msgListener).received(consCaptor.capture(), msgCaptor.capture());
- consConf.setMessageListener(msgListener);
-
- SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
- serviceUrl,
- consConf,
- new AuthenticationDisabled());
-
- receiver.onStart();
- waitForTransmission();
-
- PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
- Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
- producer.send(EXPECTED_MESSAGE.getBytes());
-
- waitForTransmission();
- receiver.onStop();
- assertEquals(new String(msgCaptor.getValue().getData()), EXPECTED_MESSAGE);
- }
-
- @Test(dataProvider = "ServiceUrls")
- public void testDefaultSettingsOfReceiver(String serviceUrl) {
- ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
-
- Set<String> set = new HashSet<>();
- set.add(TOPIC);
- consConf.setTopicNames(set);
- consConf.setSubscriptionName(SUBS);
-
- SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
- serviceUrl,
- consConf,
- new AuthenticationDisabled());
-
- assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2());
- assertNotNull(consConf.getMessageListener());
- }
-
- @Test(dataProvider = "ServiceUrls")
- public void testSharedSubscription(String serviceUrl) throws Exception {
- ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
-
- Set<String> set = new HashSet<>();
- set.add(TOPIC);
- consConf.setTopicNames(set);
- consConf.setSubscriptionName(SUBS);
- consConf.setSubscriptionType(SubscriptionType.Shared);
- consConf.setReceiverQueueSize(1);
-
- Map<String, MutableInt> receveidCounts = new HashMap<>();
-
- consConf.setMessageListener((consumer, msg) -> {
- receveidCounts.computeIfAbsent(consumer.getConsumerName(), x -> new MutableInt(0)).increment();
- });
-
- SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
- serviceUrl,
- consConf,
- new AuthenticationDisabled());
-
- SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
- serviceUrl,
- consConf,
- new AuthenticationDisabled());
-
- receiver1.onStart();
- receiver2.onStart();
- waitForTransmission();
-
- PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
- Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
- for (int i = 0; i < 10; i++) {
- producer.send(EXPECTED_MESSAGE.getBytes());
- }
-
- waitForTransmission();
- receiver1.onStop();
- receiver2.onStop();
-
- assertEquals(receveidCounts.size(), 2);
- }
-
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
- dataProvider = "ServiceUrls")
- public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
- new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
- }
-
- private static void waitForTransmission() {
- try {
- Thread.sleep(1_000);
- } catch (InterruptedException e) {
- }
- }
-}
diff --git a/tests/pulsar-storm-test/pom.xml b/tests/pulsar-storm-test/pom.xml
deleted file mode 100644
index 3cf562d..0000000
--- a/tests/pulsar-storm-test/pom.xml
+++ /dev/null
@@ -1,119 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>tests-parent</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-storm-test</artifactId>
- <packaging>jar</packaging>
- <name>Pulsar Storm adapter Tests</name>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-storm</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
- <version>${storm.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-broker</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-broker</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>testmocks</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.asynchttpclient</groupId>
- <artifactId>async-http-client</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${storm.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-</project>
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
deleted file mode 100644
index 4355ad6..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.storm.task.IOutputCollector;
-import org.apache.storm.tuple.Tuple;
-
-public class MockOutputCollector implements IOutputCollector {
-
- private boolean acked = false;
- private boolean failed = false;
- private Throwable lastError = null;
- private Tuple ackedTuple = null;
- private int numTuplesAcked = 0;
-
- @Override
- public void reportError(Throwable error) {
- lastError = error;
- }
-
- @Override
- public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- return null;
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- }
-
- @Override
- public void ack(Tuple input) {
- acked = true;
- failed = false;
- ackedTuple = input;
- ++numTuplesAcked;
- }
-
- @Override
- public void fail(Tuple input) {
- failed = true;
- acked = false;
- }
-
- @Override
- public void resetTimeout(Tuple tuple) {
-
- }
-
- public boolean acked() {
- return acked;
- }
-
- public boolean failed() {
- return failed;
- }
-
- public Throwable getLastError() {
- return lastError;
- }
-
- public Tuple getAckedTuple() {
- return ackedTuple;
- }
-
- public int getNumTuplesAcked() {
- return numTuplesAcked;
- }
-
- public void reset() {
- acked = false;
- failed = false;
- lastError = null;
- ackedTuple = null;
- numTuplesAcked = 0;
- }
-
- @Override
- public void flush() {
- // Nothing to flush from buffer
- }
-
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
deleted file mode 100644
index 98c8d20..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pulsar.client.api.Message;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-public class MockSpoutOutputCollector implements ISpoutOutputCollector {
-
- private boolean emitted = false;
- private Message lastMessage = null;
- private String data = null;
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- emitted = true;
- data = (String) tuple.get(0);
- lastMessage = (Message) messageId;
- return new ArrayList<Integer>();
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
- emitted = true;
- data = (String) tuple.get(0);
- lastMessage = (Message) messageId;
- }
-
- @Override
- public long getPendingCount() {
- return 0;
- }
-
- @Override
- public void reportError(Throwable error) {
- }
-
- public boolean emitted() {
- return emitted;
- }
-
- public String getTupleData() {
- return data;
- }
-
- public Message getLastMessage() {
- return lastMessage;
- }
-
- public void reset() {
- emitted = false;
- data = null;
- lastMessage = null;
- }
-
- @Override
- public void flush() {
- // Nothing to flush from buffer
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
deleted file mode 100644
index f74f066..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.fail;
-
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarBoltTest extends ProducerConsumerBase {
-
- private static final int NO_OF_RETRIES = 10;
-
- public String serviceUrl;
- public final String topic = "persistent://my-property/my-ns/my-topic1";
- public final String subscriptionName = "my-subscriber-name";
-
- protected PulsarBoltConfiguration pulsarBoltConf;
- protected PulsarBolt bolt;
- protected MockOutputCollector mockCollector;
- protected Consumer consumer;
-
- @Override
- @BeforeMethod
- public void beforeMethod(Method m) throws Exception {
- super.beforeMethod(m);
- setup();
- }
-
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
-
- serviceUrl = pulsar.getWebServiceAddress();
-
- pulsarBoltConf = new PulsarBoltConfiguration();
- pulsarBoltConf.setServiceUrl(serviceUrl);
- pulsarBoltConf.setTopic(topic);
- pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
- pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
- bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- mockCollector = new MockOutputCollector();
- OutputCollector collector = new OutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- bolt.prepare(Maps.newHashMap(), context, collector);
- consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe();
- }
-
- @AfterMethod
- public void cleanup() throws Exception {
- bolt.close();
- consumer.close();
- super.internalCleanup();
- }
-
- @SuppressWarnings("serial")
- static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
- @Override
- public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
- if ("message to be dropped".equals(new String(tuple.getBinary(0)))) {
- return null;
- }
- if ("throw exception".equals(new String(tuple.getBinary(0)))) {
- throw new RuntimeException();
- }
- return msgBuilder.value(tuple.getBinary(0));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
- };
-
- private Tuple getMockTuple(String msgContent) {
- Tuple mockTuple = mock(Tuple.class);
- when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes());
- when(mockTuple.getSourceComponent()).thenReturn("");
- when(mockTuple.getSourceStreamId()).thenReturn("");
- return mockTuple;
- }
-
- @Test
- public void testBasic() throws Exception {
- String msgContent = "hello world";
- Tuple tuple = getMockTuple(msgContent);
- bolt.execute(tuple);
- for (int i = 0; i < NO_OF_RETRIES; i++) {
- Thread.sleep(1000);
- if (mockCollector.acked()) {
- break;
- }
- }
- Assert.assertTrue(mockCollector.acked());
- Assert.assertFalse(mockCollector.failed());
- Assert.assertNull(mockCollector.getLastError());
- Assert.assertEquals(tuple, mockCollector.getAckedTuple());
- Message msg = consumer.receive(5, TimeUnit.SECONDS);
- consumer.acknowledge(msg);
- Assert.assertEquals(msgContent, new String(msg.getData()));
- }
-
- @Test
- public void testExecuteFailure() throws Exception {
- String msgContent = "throw exception";
- Tuple tuple = getMockTuple(msgContent);
- bolt.execute(tuple);
- Assert.assertFalse(mockCollector.acked());
- Assert.assertTrue(mockCollector.failed());
- Assert.assertNotNull(mockCollector.getLastError());
- }
-
- @Test
- public void testNoMessageSend() throws Exception {
- String msgContent = "message to be dropped";
- Tuple tuple = getMockTuple(msgContent);
- bolt.execute(tuple);
- Assert.assertTrue(mockCollector.acked());
- Message msg = consumer.receive(5, TimeUnit.SECONDS);
- Assert.assertNull(msg);
- }
-
- @Test
- public void testMetrics() throws Exception {
- bolt.resetMetrics();
- String msgContent = "hello world";
- Tuple tuple = getMockTuple(msgContent);
- for (int i = 0; i < 10; i++) {
- bolt.execute(tuple);
- }
- for (int i = 0; i < NO_OF_RETRIES; i++) {
- Thread.sleep(1000);
- if (mockCollector.getNumTuplesAcked() == 10) {
- break;
- }
- }
- @SuppressWarnings("rawtypes")
- Map metrics = (Map) bolt.getValueAndReset();
- Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 10);
- Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(),
- 10.0 / pulsarBoltConf.getMetricsTimeIntervalInSecs());
- Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(),
- ((double) msgContent.getBytes().length * 10) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
- metrics = bolt.getMetrics();
- Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0);
- for (int i = 0; i < 10; i++) {
- Message msg = consumer.receive(5, TimeUnit.SECONDS);
- consumer.acknowledge(msg);
- }
- }
-
- @Test
- public void testSharedProducer() throws Exception {
- TopicStats topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.publishers.size(), 1);
- PulsarBolt otherBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- MockOutputCollector otherMockCollector = new MockOutputCollector();
- OutputCollector collector = new OutputCollector(otherMockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
- when(context.getThisTaskId()).thenReturn(1);
- otherBolt.prepare(Maps.newHashMap(), context, collector);
-
- topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.publishers.size(), 1);
-
- otherBolt.close();
-
- topicStats = admin.topics().getStats(topic);
- Assert.assertEquals(topicStats.publishers.size(), 1);
- }
-
- @Test
- public void testSerializability() throws Exception {
- // test serializability with no auth
- PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- TestUtil.testSerializability(boltWithNoAuth);
- }
-
- @Test
- public void testFailedProducer() {
- PulsarBoltConfiguration pulsarBoltConf = new PulsarBoltConfiguration();
- pulsarBoltConf.setServiceUrl(serviceUrl);
- pulsarBoltConf.setTopic("persistent://invalid");
- pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
- pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
- PulsarBolt bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
- MockOutputCollector mockCollector = new MockOutputCollector();
- OutputCollector collector = new OutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("new" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- try {
- bolt.prepare(Maps.newHashMap(), context, collector);
- fail("should have failed as producer creation failed");
- } catch (IllegalStateException ie) {
- // Ok.
- }
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
deleted file mode 100644
index 3247a5a..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarSpoutTest extends ProducerConsumerBase {
-
- public String serviceUrl;
- public final String topic = "persistent://my-property/my-ns/my-topic1";
- public final String subscriptionName = "my-subscriber-name";
-
- protected PulsarSpoutConfiguration pulsarSpoutConf;
- protected PulsarSpout spout;
- protected MockSpoutOutputCollector mockCollector;
- protected Producer producer;
-
- @Override
- @BeforeMethod
- public void beforeMethod(Method m) throws Exception {
- super.beforeMethod(m);
- setup();
- }
-
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
-
- serviceUrl = pulsar.getWebServiceAddress();
-
- pulsarSpoutConf = new PulsarSpoutConfiguration();
- pulsarSpoutConf.setServiceUrl(serviceUrl);
- pulsarSpoutConf.setTopic(topic);
- pulsarSpoutConf.setSubscriptionName(subscriptionName);
- pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
- pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
- pulsarSpoutConf.setMaxFailedRetries(2);
- pulsarSpoutConf.setSharedConsumerEnabled(true);
- pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
- pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
- spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- mockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- spout.open(Maps.newHashMap(), context, collector);
- producer = pulsarClient.newProducer().topic(topic).create();
- }
-
- @AfterMethod
- public void cleanup() throws Exception {
- producer.close();
- spout.close();
- super.internalCleanup();
- }
-
- @SuppressWarnings("serial")
- public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
- @Override
- public Values toValues(Message msg) {
- if ("message to be dropped".equals(new String(msg.getData()))) {
- return null;
- }
- return new Values(new String(msg.getData()));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
- };
-
- @Test
- public void testBasic() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.ack(mockCollector.getLastMessage());
- }
-
- @Test
- public void testRedeliverOnFail() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- spout.fail(mockCollector.getLastMessage());
- mockCollector.reset();
- Thread.sleep(150);
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.ack(mockCollector.getLastMessage());
- }
-
- @Test
- public void testNoRedeliverOnAck() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- spout.ack(mockCollector.getLastMessage());
- mockCollector.reset();
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @Test
- public void testLimitedRedeliveriesOnTimeout() throws Exception {
- String msgContent = "chuck norris";
- producer.send(msgContent.getBytes());
-
- long startTime = System.currentTimeMillis();
- while (startTime + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System
- .currentTimeMillis()) {
- mockCollector.reset();
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
- // wait to avoid backoff
- Thread.sleep(500);
- }
- spout.nextTuple();
- spout.fail(mockCollector.getLastMessage());
- mockCollector.reset();
- Thread.sleep(500);
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @Test
- public void testLimitedRedeliveriesOnCount() throws Exception {
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
-
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
-
- mockCollector.reset();
- Thread.sleep(150);
-
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
-
- mockCollector.reset();
- Thread.sleep(300);
-
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.fail(mockCollector.getLastMessage());
-
- mockCollector.reset();
- Thread.sleep(500);
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @Test
- public void testBackoffOnRetry() throws Exception {
- String msgContent = "chuck norris";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- spout.fail(mockCollector.getLastMessage());
- mockCollector.reset();
- // due to backoff we should not get the message again immediately
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- Thread.sleep(100);
- spout.nextTuple();
- assertTrue(mockCollector.emitted());
- assertEquals(mockCollector.getTupleData(), msgContent);
- spout.ack(mockCollector.getLastMessage());
- }
-
- @Test
- public void testMessageDrop() throws Exception {
- String msgContent = "message to be dropped";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- assertFalse(mockCollector.emitted());
- assertNull(mockCollector.getTupleData());
- }
-
- @SuppressWarnings({ "rawtypes" })
- @Test
- public void testMetrics() throws Exception {
- spout.resetMetrics();
- String msgContent = "hello world";
- producer.send(msgContent.getBytes());
- spout.nextTuple();
- Map metrics = spout.getMetrics();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
- assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(),
- 1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
- assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(),
- ((double) msgContent.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
- spout.fail(mockCollector.getLastMessage());
- metrics = spout.getMetrics();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
- Thread.sleep(150);
- spout.nextTuple();
- metrics = spout.getMetrics();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
- spout.ack(mockCollector.getLastMessage());
- metrics = (Map) spout.getValueAndReset();
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
- assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
- }
-
- @Test
- public void testSharedConsumer() throws Exception {
- TopicStats topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
- PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
- when(context.getThisTaskId()).thenReturn(1);
- otherSpout.open(Maps.newHashMap(), context, collector);
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-
- otherSpout.close();
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
- }
-
- @Test
- public void testNoSharedConsumer() throws Exception {
- TopicStats topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
- pulsarSpoutConf.setSharedConsumerEnabled(false);
- PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
- when(context.getThisTaskId()).thenReturn(1);
- otherSpout.open(Maps.newHashMap(), context, collector);
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 2);
-
- otherSpout.close();
-
- topicStats = admin.topics().getStats(topic);
- assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
- }
-
- @Test
- public void testSerializability() throws Exception {
- // test serializability with no auth
- PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- TestUtil.testSerializability(spoutWithNoAuth);
- }
-
- @Test
- public void testFailedConsumer() {
- PulsarSpoutConfiguration pulsarSpoutConf = new PulsarSpoutConfiguration();
- pulsarSpoutConf.setServiceUrl(serviceUrl);
- pulsarSpoutConf.setTopic("persistent://invalidTopic");
- pulsarSpoutConf.setSubscriptionName(subscriptionName);
- pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
- pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
- pulsarSpoutConf.setMaxFailedRetries(2);
- pulsarSpoutConf.setSharedConsumerEnabled(false);
- pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
- pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
- PulsarSpout spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
- MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
- SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
- TopologyContext context = mock(TopologyContext.class);
- when(context.getThisComponentId()).thenReturn("new-test" + methodName);
- when(context.getThisTaskId()).thenReturn(0);
- try {
- spout.open(Maps.newHashMap(), context, collector);
- fail("should have failed as consumer creation failed");
- } catch (IllegalStateException e) {
- // Ok
- }
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
deleted file mode 100644
index a71e088..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-
-import org.testng.Assert;
-
-public class TestUtil {
-
- public static void testSerializability(Object object) throws Exception {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(out);
- oos.writeObject(object);
- oos.close();
- Assert.assertTrue(out.toByteArray().length > 0);
- }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
deleted file mode 100644
index 93404ea..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm.example;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.storm.MessageToValuesMapper;
-import org.apache.pulsar.storm.PulsarBolt;
-import org.apache.pulsar.storm.PulsarBoltConfiguration;
-import org.apache.pulsar.storm.PulsarSpout;
-import org.apache.pulsar.storm.PulsarSpoutConfiguration;
-import org.apache.pulsar.storm.TupleToMessageMapper;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.task.IErrorReporter;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormExample {
- private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
- private static final String serviceUrl = "http://broker-pdev.messaging.corp.usw.example.com:8080";
-
- @SuppressWarnings("serial")
- static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
- @Override
- public Values toValues(Message msg) {
- return new Values(new String(msg.getData()));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // declare the output fields
- declarer.declare(new Fields("string"));
- }
- };
-
- @SuppressWarnings("serial")
- static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
- @Override
- public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
- String receivedMessage = tuple.getString(0);
- // message processing
- String processedMsg = receivedMessage + "-processed";
- return msgBuilder.value(processedMsg.getBytes());
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // declare the output fields
- }
- };
-
- public static void main(String[] args) throws Exception {
- // String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
- // String authParams = "key1:val1,key2:val2";
- // clientConf.setAuthentication(authPluginClassName, authParams);
-
- String topic1 = "persistent://my-property/use/my-ns/my-topic1";
- String topic2 = "persistent://my-property/use/my-ns/my-topic2";
- String subscriptionName1 = "my-subscriber-name1";
- String subscriptionName2 = "my-subscriber-name2";
-
- // create spout
- PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
- spoutConf.setServiceUrl(serviceUrl);
- spoutConf.setTopic(topic1);
- spoutConf.setSubscriptionName(subscriptionName1);
- spoutConf.setMessageToValuesMapper(messageToValuesMapper);
- PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder());
-
- // create bolt
- PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
- boltConf.setServiceUrl(serviceUrl);
- boltConf.setTopic(topic2);
- boltConf.setTupleToMessageMapper(tupleToMessageMapper);
- PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder());
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("testSpout", spout);
- builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout");
-
- Config conf = new Config();
- conf.setNumWorkers(2);
- conf.setDebug(true);
- conf.registerMetricsConsumer(PulsarMetricsConsumer.class);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
-
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
- // create a consumer on topic2 to receive messages from the bolt when the processing is done
- Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe();
- // create a producer on topic1 to send messages that will be received by the spout
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topic1).create();
-
- for (int i = 0; i < 10; i++) {
- String msg = "msg-" + i;
- producer.send(msg.getBytes());
- LOG.info("Message {} sent", msg);
- }
- Message<byte[]> msg = null;
- for (int i = 0; i < 10; i++) {
- msg = consumer.receive(1, TimeUnit.SECONDS);
- LOG.info("Message {} received", new String(msg.getData()));
- }
- cluster.killTopology("test");
- cluster.shutdown();
-
- }
-
- class PulsarMetricsConsumer implements IMetricsConsumer {
-
- @Override
- public void prepare(Map stormConf, Object registrationArgument, TopologyContext context,
- IErrorReporter errorReporter) {
- }
-
- @Override
- public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
- // The collection will contain metrics for all the spouts/bolts that register the metrics in the topology.
- // The name for the Pulsar Spout is "PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt
- // is
- // "PulsarBoltMetrics-{componentId}-{taskIndex}".
- }
-
- @Override
- public void cleanup() {
- }
-
- }
-}