PIP-62: Remove tests and examples related to pulsar-adapters (#8511)

related to #8480

remove remaining modules related to pulsar-adapters:
- examples/flink
- examples/spark
- tests/pulsar-kafka-compat-client-test
- tests/pulsar-spark-test
- tests/pulsar-storm-test
diff --git a/tests/pom.xml b/tests/pom.xml
index b960ade..ef0dfa0 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -34,9 +34,6 @@
   <modules>
     <module>docker-images</module>
     <module>integration</module>
-    <module>pulsar-kafka-compat-client-test</module>
-    <module>pulsar-storm-test</module>
-    <module>pulsar-spark-test</module>
     <module>bc_2_0_0</module>
     <module>bc_2_0_1</module>
   </modules>
diff --git a/tests/pulsar-kafka-compat-client-test/pom.xml b/tests/pulsar-kafka-compat-client-test/pom.xml
deleted file mode 100644
index e77cb8d..0000000
--- a/tests/pulsar-kafka-compat-client-test/pom.xml
+++ /dev/null
@@ -1,127 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.pulsar.tests</groupId>
-    <artifactId>tests-parent</artifactId>
-    <version>2.7.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-kafka-compat-client-test</artifactId>
-  <packaging>jar</packaging>
-  <name>Apache Pulsar :: Tests :: Pulsar Kafka Compat Client Tests</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.pulsar.tests</groupId>
-      <artifactId>integration</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.kafka</groupId>
-          <artifactId>kafka-clients</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.testcontainers</groupId>
-      <artifactId>testcontainers</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client-admin</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client-kafka</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.kafka</groupId>
-          <artifactId>kafka-clients</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <!-- only run tests when -DintegrationTests is specified //-->
-          <skipTests>true</skipTests>
-          <systemPropertyVariables>
-            <currentVersion>${project.version}</currentVersion>
-            <maven.buildDirectory>${project.build.directory}</maven.buildDirectory>
-          </systemPropertyVariables>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-  <profiles>
-    <profile>
-      <id>integrationTests</id>
-      <activation>
-        <property>
-          <name>integrationTests</name>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <properties>
-                <property>
-                  <name>listener</name>
-                  <value>org.apache.pulsar.tests.PulsarTestListener</value>
-                </property>
-              </properties>
-              <argLine>-Xmx2G -XX:MaxDirectMemorySize=8G
-              -Dio.netty.leakDetectionLevel=advanced
-              </argLine>
-              <skipTests>false</skipTests>
-              <forkCount>1</forkCount>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-</project>
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
deleted file mode 100644
index fafb1bc..0000000
--- a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ /dev/null
@@ -1,885 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.compat.kafka;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import lombok.Cleanup;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.StringSchema;
-import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
-import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-@Slf4j
-public class KafkaApiTest extends PulsarStandaloneTestSuite {
-
-    @Data
-    public static class Foo {
-        @Nullable
-        private String field1;
-        @Nullable
-        private String field2;
-        private int field3;
-    }
-
-    @Data
-    public static class Bar {
-        private boolean field1;
-    }
-
-    private static String getPlainTextServiceUrl() {
-        return container.getPlainTextServiceUrl();
-    }
-
-    private static String getHttpServiceUrl() {
-        return container.getHttpServiceUrl();
-    }
-
-    @Test(timeOut = 30000)
-    public void testSimpleProducerConsumer() throws Exception {
-        String topic = "persistent://public/default/testSimpleProducerConsumer";
-
-        Properties producerProperties = new Properties();
-        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
-        producerProperties.put("key.serializer", IntegerSerializer.class.getName());
-        producerProperties.put("value.serializer", StringSerializer.class.getName());
-        Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
-
-        Properties consumerProperties = new Properties();
-        consumerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
-        consumerProperties.put("group.id", "my-subscription-name");
-        consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
-        consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
-        consumerProperties.put("enable.auto.commit", "true");
-        Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProperties);
-        consumer.subscribe(Arrays.asList(topic));
-
-        List<Long> offsets = new ArrayList<>();
-
-        for (int i = 0; i < 10; i++) {
-            RecordMetadata md = producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i)).get();
-            offsets.add(md.offset());
-            log.info("Published message at {}", Long.toHexString(md.offset()));
-        }
-
-        producer.flush();
-        producer.close();
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<Integer, String> records = consumer.poll(100);
-            records.forEach(record -> {
-                assertEquals(record.key().intValue(), received.get());
-                assertEquals(record.value(), "hello-" + received.get());
-                assertEquals(record.offset(), offsets.get(received.get()).longValue());
-
-                received.incrementAndGet();
-            });
-
-            consumer.commitSync();
-        }
-
-        consumer.close();
-    }
-
-    @Test
-    public void testSimpleConsumer() throws Exception {
-        String topic = "testSimpleConsumer";
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        @Cleanup
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-
-        @Cleanup
-        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
-        for (int i = 0; i < 10; i++) {
-            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
-        }
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            if (!records.isEmpty()) {
-                records.forEach(record -> {
-                    String key = Integer.toString(received.get());
-                    String value = "hello-" + received.get();
-                    log.info("Receive record : key = {}, value = {}, topic = {}, ptn = {}",
-                        key, value, record.topic(), record.partition());
-                    assertEquals(record.key(), key);
-                    assertEquals(record.value(), value);
-
-                    received.incrementAndGet();
-                });
-
-                consumer.commitSync();
-            }
-        }
-    }
-
-    @Test
-    public void testConsumerAutoCommit() throws Exception {
-        String topic = "testConsumerAutoCommit";
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "true");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
-        for (int i = 0; i < 10; i++) {
-            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
-        }
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(received.get()));
-                assertEquals(record.value(), "hello-" + received.get());
-                received.incrementAndGet();
-            });
-        }
-
-        consumer.close();
-
-        // Re-open consumer and verify every message was acknowledged
-        Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
-        consumer2.subscribe(Arrays.asList(topic));
-
-        ConsumerRecords<String, String> records = consumer2.poll(100);
-        assertEquals(records.count(), 0);
-        consumer2.close();
-    }
-
-    @Test
-    public void testConsumerManualOffsetCommit() throws Exception {
-        String topic = "testConsumerManualOffsetCommit";
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
-        for (int i = 0; i < 10; i++) {
-            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
-        }
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(received.get()));
-                assertEquals(record.value(), "hello-" + received.get());
-
-                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-                offsets.put(new TopicPartition(record.topic(), record.partition()),
-                        new OffsetAndMetadata(record.offset()));
-                consumer.commitSync(offsets);
-
-                received.incrementAndGet();
-            });
-        }
-
-        consumer.close();
-
-        // Re-open consumer and verify every message was acknowledged
-        Consumer<String, String> consumer2 = new KafkaConsumer<>(props);
-        consumer2.subscribe(Arrays.asList(topic));
-
-        ConsumerRecords<String, String> records = consumer2.poll(100);
-        assertEquals(records.count(), 0);
-        consumer2.close();
-    }
-
-    @Test
-    public void testPartitions() throws Exception {
-        String topic = "testPartitions";
-
-        // Create 8 partitions in topic
-        @Cleanup
-        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
-        admin.topics().createPartitionedTopic(topic, 8);
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "true");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic)
-                .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition).create();
-
-        // Create 2 Kakfa consumer and verify each gets half of the messages
-        List<Consumer<String, String>> consumers = new ArrayList<>();
-        for (int c = 0; c < 2; c++) {
-            Consumer<String, String> consumer = new KafkaConsumer<>(props);
-            consumer.subscribe(Arrays.asList(topic));
-            consumers.add(consumer);
-        }
-
-        int N = 8 * 3;
-
-        for (int i = 0; i < N; i++) {
-            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
-        }
-
-        consumers.forEach(consumer -> {
-            int expectedMessaged = N / consumers.size();
-
-            for (int i = 0; i < expectedMessaged;) {
-                ConsumerRecords<String, String> records = consumer.poll(100);
-                i += records.count();
-            }
-
-            // No more messages for this consumer
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            assertEquals(records.count(), 0);
-        });
-
-        consumers.forEach(Consumer::close);
-    }
-
-    @Test
-    public void testExplicitPartitions() throws Exception {
-        String topic = "testExplicitPartitions";
-
-        // Create 8 partitions in topic
-        @Cleanup
-        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
-        admin.topics().createPartitionedTopic(topic, 8);
-
-        Properties producerProperties = new Properties();
-        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
-        producerProperties.put("key.serializer", IntegerSerializer.class.getName());
-        producerProperties.put("value.serializer", StringSerializer.class.getName());
-
-        @Cleanup
-        Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "true");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        // Create Kakfa consumer and verify all messages came from intended partition
-        @Cleanup
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        int N = 8 * 3;
-
-        final int choosenPartition = 5;
-
-        for (int i = 0; i < N; i++) {
-            producer.send(new ProducerRecord<>(topic, choosenPartition, i, "hello-" + i));
-        }
-
-        producer.flush();
-
-        for (int i = 0; i < N;) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            i += records.count();
-
-            records.forEach(record -> {
-                assertEquals(record.partition(), choosenPartition);
-            });
-        }
-
-        // No more messages for this consumer
-        ConsumerRecords<String, String> records = consumer.poll(100);
-        assertEquals(records.count(), 0);
-    }
-
-    public static class MyCustomPartitioner implements Partitioner {
-
-        static int USED_PARTITION = 3;
-
-        @Override
-        public void configure(Map<String, ?> conf) {
-            // Do nothing
-        }
-
-        @Override
-        public void close() {
-            // Do nothing
-        }
-
-        @Override
-        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-            // Dummy implementation that always return same partition
-            return USED_PARTITION;
-        }
-    }
-
-    @Test
-    public void testCustomRouter() throws Exception {
-        String topic = "testCustomRouter";
-
-        // Create 8 partitions in topic
-        @Cleanup
-        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
-        admin.topics().createPartitionedTopic(topic, 8);
-
-        Properties producerProperties = new Properties();
-        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
-        producerProperties.put("key.serializer", IntegerSerializer.class.getName());
-        producerProperties.put("value.serializer", StringSerializer.class.getName());
-        producerProperties.put("partitioner.class", MyCustomPartitioner.class.getName());
-
-        @Cleanup
-        Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "true");
-        props.put("key.deserializer", IntegerDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        // Create Kakfa consumer and verify all messages came from intended partition
-        @Cleanup
-        Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        int N = 8 * 3;
-
-        for (int i = 0; i < N; i++) {
-            producer.send(new ProducerRecord<>(topic, i, "hello-" + i));
-        }
-
-        producer.flush();
-
-        for (int i = 0; i < N;) {
-            ConsumerRecords<Integer, String> records = consumer.poll(100);
-            i += records.count();
-
-            records.forEach(record -> {
-                assertEquals(record.partition(), MyCustomPartitioner.USED_PARTITION);
-            });
-        }
-
-        // No more messages for this consumer
-        ConsumerRecords<Integer, String> records = consumer.poll(100);
-        assertEquals(records.count(), 0);
-    }
-
-    @Test
-    public void testConsumerSeek() throws Exception {
-        String topic = "testConsumerSeek";
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-        props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
-
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
-        for (int i = 0; i < 10; i++) {
-            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
-        }
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(received.get()));
-                assertEquals(record.value(), "hello-" + received.get());
-
-                received.incrementAndGet();
-            });
-
-            consumer.commitSync();
-        }
-
-        consumer.seekToBeginning(Collections.emptyList());
-
-        Thread.sleep(500);
-
-        // Messages should be available again
-        received.set(0);
-        while (received.get() < 10) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(received.get()));
-                assertEquals(record.value(), "hello-" + received.get());
-
-                received.incrementAndGet();
-            });
-
-            consumer.commitSync();
-        }
-
-        consumer.close();
-    }
-
-    @Test
-    public void testConsumerSeekToEnd() throws Exception {
-        String topic = "testConsumerSeekToEnd";
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-        props.put("pulsar.consumer.acknowledgments.group.time.millis", "0");
-
-        Consumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
-
-        for (int i = 0; i < 10; i++) {
-            pulsarProducer.newMessage().key(Integer.toString(i)).value(("hello-" + i).getBytes()).send();
-        }
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(received.get()));
-                assertEquals(record.value(), "hello-" + received.get());
-
-                received.incrementAndGet();
-            });
-
-            consumer.commitSync();
-        }
-
-        consumer.seekToEnd(Collections.emptyList());
-        Thread.sleep(500);
-
-        consumer.close();
-
-        // Recreate the consumer
-        consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topic));
-
-        ConsumerRecords<String, String> records = consumer.poll(100);
-        // Since we are at the end of the topic, there should be no messages
-        assertEquals(records.count(), 0);
-
-        consumer.close();
-    }
-
-    @Test
-    public void testSimpleProducer() throws Exception {
-        String topic = "testSimpleProducer";
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
-                .subscriptionName("my-subscription")
-                .subscribe();
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-
-        Producer<Integer, String> producer = new KafkaProducer<>(props);
-
-        for (int i = 0; i < 10; i++) {
-            producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
-        }
-
-        producer.flush();
-        producer.close();
-
-        for (int i = 0; i < 10; i++) {
-            Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
-            assertEquals(new String(msg.getData()), "hello-" + i);
-            pulsarConsumer.acknowledge(msg);
-        }
-    }
-
-    @Test(timeOut = 10000)
-    public void testProducerCallback() throws Exception {
-        String topic = "testProducerCallback";
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("my-subscription")
-                .subscribe();
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-
-        Producer<Integer, String> producer = new KafkaProducer<>(props);
-
-        CountDownLatch counter = new CountDownLatch(10);
-
-        for (int i = 0; i < 10; i++) {
-            producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i), (metadata, exception) -> {
-                assertEquals(metadata.topic(), topic);
-                assertNull(exception);
-
-                counter.countDown();
-            });
-        }
-
-        counter.await();
-
-        for (int i = 0; i < 10; i++) {
-            Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
-            assertEquals(new String(msg.getData()), "hello-" + i);
-            pulsarConsumer.acknowledge(msg);
-        }
-
-        producer.close();
-    }
-
-    @Test
-    public void testProducerAvroSchemaWithPulsarKafkaClient() throws Exception {
-        String topic = "testProducerAvroSchemaWithPulsarKafkaClient";
-        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
-        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer =
-                pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("my-subscription")
-                .subscribe();
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-
-        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
-        for (int i = 0; i < 10; i++) {
-            Bar bar = new Bar();
-            bar.setField1(true);
-
-            Foo foo = new Foo();
-            foo.setField1("field1");
-            foo.setField2("field2");
-            foo.setField3(i);
-            producer.send(new ProducerRecord<Bar, Foo>(topic, bar, foo));
-        }
-        producer.flush();
-        producer.close();
-
-        for (int i = 0; i < 10; i++) {
-            Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
-            Foo value = fooSchema.decode(msg.getValue());
-            Assert.assertEquals(value.getField1(), "field1");
-            Assert.assertEquals(value.getField2(), "field2");
-            Assert.assertEquals(value.getField3(), i);
-            pulsarConsumer.acknowledge(msg);
-        }
-    }
-
-    @Test
-    public void testConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
-        String topic = "testConsumerAvroSchemaWithPulsarKafkaClient";
-
-        StringSchema stringSchema = new StringSchema();
-        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        @Cleanup
-        Consumer<String, Foo> consumer = new KafkaConsumer<String, Foo>(props, new StringSchema(), fooSchema);
-        consumer.subscribe(Arrays.asList(topic));
-
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
-        org.apache.pulsar.client.api.Producer<Foo> pulsarProducer = pulsarClient.newProducer(fooSchema).topic(topic).create();
-
-        for (int i = 0; i < 10; i++) {
-            Foo foo = new Foo();
-            foo.setField1("field1");
-            foo.setField2("field2");
-            foo.setField3(i);
-            pulsarProducer.newMessage().keyBytes(stringSchema.encode(Integer.toString(i))).value(foo).send();
-        }
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<String, Foo> records = consumer.poll(100);
-            if (!records.isEmpty()) {
-                records.forEach(record -> {
-                    Assert.assertEquals(record.key(), Integer.toString(received.get()));
-                    Foo value = record.value();
-                    Assert.assertEquals(value.getField1(), "field1");
-                    Assert.assertEquals(value.getField2(), "field2");
-                    Assert.assertEquals(value.getField3(), received.get());
-                    received.incrementAndGet();
-                });
-
-                consumer.commitSync();
-            }
-        }
-    }
-
-    @Test
-    public void testProducerConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
-        String topic = "testProducerConsumerAvroSchemaWithPulsarKafkaClient";
-
-        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
-        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        @Cleanup
-        Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
-        consumer.subscribe(Arrays.asList(topic));
-
-        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
-
-        for (int i = 0; i < 10; i++) {
-            Bar bar = new Bar();
-            bar.setField1(true);
-
-            Foo foo = new Foo();
-            foo.setField1("field1");
-            foo.setField2("field2");
-            foo.setField3(i);
-            producer.send(new ProducerRecord<>(topic, bar, foo));
-        }
-        producer.flush();
-        producer.close();
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<Bar, Foo> records = consumer.poll(100);
-            if (!records.isEmpty()) {
-                records.forEach(record -> {
-                    Bar key = record.key();
-                    Assert.assertTrue(key.isField1());
-                    Foo value = record.value();
-                    Assert.assertEquals(value.getField1(), "field1");
-                    Assert.assertEquals(value.getField2(), "field2");
-                    Assert.assertEquals(value.getField3(), received.get());
-                    received.incrementAndGet();
-                });
-
-                consumer.commitSync();
-            }
-        }
-    }
-
-    @Test
-    public void testProducerConsumerJsonSchemaWithPulsarKafkaClient() throws Exception {
-        String topic = "testProducerConsumerJsonSchemaWithPulsarKafkaClient";
-
-        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
-        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        @Cleanup
-        Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
-        consumer.subscribe(Arrays.asList(topic));
-
-        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
-
-        for (int i = 0; i < 10; i++) {
-            Bar bar = new Bar();
-            bar.setField1(true);
-
-            Foo foo = new Foo();
-            foo.setField1("field1");
-            foo.setField2("field2");
-            foo.setField3(i);
-            producer.send(new ProducerRecord<>(topic, bar, foo));
-        }
-        producer.flush();
-        producer.close();
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<Bar, Foo> records = consumer.poll(100);
-            if (!records.isEmpty()) {
-                records.forEach(record -> {
-                    Bar key = record.key();
-                    Assert.assertTrue(key.isField1());
-                    Foo value = record.value();
-                    Assert.assertEquals(value.getField1(), "field1");
-                    Assert.assertEquals(value.getField2(), "field2");
-                    Assert.assertEquals(value.getField3(), received.get());
-                    received.incrementAndGet();
-                });
-
-                consumer.commitSync();
-            }
-        }
-    }
-
-    @Test
-    public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
-        String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";
-
-        Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
-        JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getPlainTextServiceUrl());
-        props.put("group.id", "my-subscription-name");
-        props.put("enable.auto.commit", "false");
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-
-        @Cleanup
-        Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
-        consumer.subscribe(Arrays.asList(topic));
-
-        Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);
-
-        for (int i = 0; i < 10; i++) {
-            Foo foo = new Foo();
-            foo.setField1("field1");
-            foo.setField2("field2");
-            foo.setField3(i);
-            producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
-        }
-        producer.flush();
-        producer.close();
-
-        AtomicInteger received = new AtomicInteger();
-        while (received.get() < 10) {
-            ConsumerRecords<String, Foo> records = consumer.poll(100);
-            if (!records.isEmpty()) {
-                records.forEach(record -> {
-                    String key = record.key();
-                    Assert.assertEquals(key, "hello" + received.get());
-                    Foo value = record.value();
-                    Assert.assertEquals(value.getField1(), "field1");
-                    Assert.assertEquals(value.getField2(), "field2");
-                    Assert.assertEquals(value.getField3(), received.get());
-                    received.incrementAndGet();
-                });
-
-                consumer.commitSync();
-            }
-        }
-    }
-}
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java
deleted file mode 100644
index e6e183a..0000000
--- a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.compat.kafka;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.Test;
-import java.util.Properties;
-/**
- * A test that tests if {@link PulsarKafkaProducer} is thread safe.
- */
-public class PulsarKafkaProducerThreadSafeTest extends PulsarStandaloneTestSuite {
-    private Producer producer;
-
-    private static String getPlainTextServiceUrl() {
-        return container.getPlainTextServiceUrl();
-    }
-
-    @BeforeTest
-    private void setup() {
-        Properties producerProperties = new Properties();
-        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
-        producerProperties.put("key.serializer", IntegerSerializer.class.getName());
-        producerProperties.put("value.serializer", StringSerializer.class.getName());
-        producer = new KafkaProducer<>(producerProperties);
-    }
-
-    /**
-     * This test run 10 times in threadPool witch size is 5.
-     * Different threads have same producer and different topics witch is based on thread time.
-     * This test will be failed when producer failed to send if PulsarKafkaProducer is not thread safe.
-     */
-    @Test(threadPoolSize = 5, invocationCount = 10)
-    public void testPulsarKafkaProducerThreadSafe() {
-        String topic1 = "persistent://public/default/topic-" + System.currentTimeMillis();
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic1, "Hello");
-        producer.send(record);
-    }
-}
diff --git a/tests/pulsar-spark-test/pom.xml b/tests/pulsar-spark-test/pom.xml
deleted file mode 100644
index 8737713..0000000
--- a/tests/pulsar-spark-test/pom.xml
+++ /dev/null
@@ -1,119 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.pulsar.tests</groupId>
-        <artifactId>tests-parent</artifactId>
-        <version>2.7.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>pulsar-spark-test</artifactId>
-    <packaging>jar</packaging>
-    <name>Spark Streaming Pulsar Receivers Tests</name>
-
-    <dependencies>
-
-        <dependency>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-spark</artifactId>
-          <version>${project.version}</version>
-          <scope>test</scope>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.pulsar.tests</groupId>
-          <artifactId>integration</artifactId>
-          <version>${project.version}</version>
-          <type>test-jar</type>
-          <scope>test</scope>
-        </dependency>
-
-        <dependency>
-          <groupId>org.testcontainers</groupId>
-          <artifactId>mysql</artifactId>
-          <scope>test</scope>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-streaming_2.10</artifactId>
-          <scope>test</scope>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-      <plugins>
-        <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <!-- only run tests when -DintegrationTests is specified //-->
-          <skipTests>true</skipTests>
-          <systemPropertyVariables>
-            <currentVersion>${project.version}</currentVersion>
-            <maven.buildDirectory>${project.build.directory}</maven.buildDirectory>
-          </systemPropertyVariables>
-        </configuration>
-      </plugin>
-      </plugins>
-    </build>
-
-    <profiles>
-    <profile>
-      <id>integrationTests</id>
-      <activation>
-        <property>
-          <name>integrationTests</name>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <properties>
-                <property>
-                  <name>testRetryCount</name>
-                  <value>0</value>
-                </property>
-                <property>
-                  <name>listener</name>
-                  <value>org.apache.pulsar.tests.PulsarTestListener,org.apache.pulsar.tests.AnnotationListener</value>
-                </property>
-              </properties>
-              <argLine>-Xmx2G -XX:MaxDirectMemorySize=8G
-              -Dio.netty.leakDetectionLevel=advanced
-              </argLine>
-              <skipTests>false</skipTests>
-              <forkCount>1</forkCount>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-</project>
diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
deleted file mode 100644
index bd619a5..0000000
--- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.spark;
-
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
-import org.apache.spark.storage.StorageLevel;
-import org.mockito.ArgumentCaptor;
-import org.testng.annotations.Test;
-
-public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
-    private static final String TOPIC = "persistent://p1/c1/ns1/topic1";
-    private static final String SUBS = "sub1";
-    private static final String EXPECTED_MESSAGE = "pulsar-spark test message";
-
-    @Test(dataProvider = "ServiceUrls")
-    public void testReceivedMessage(String serviceUrl) throws Exception {
-        ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
-
-        Set<String> set = new HashSet<>();
-        set.add(TOPIC);
-        consConf.setTopicNames(set);
-        consConf.setSubscriptionName(SUBS);
-
-        MessageListener msgListener = spy(new MessageListener() {
-            @Override
-            public void received(Consumer consumer, Message msg) {
-                return;
-            }
-        });
-        final ArgumentCaptor<Consumer> consCaptor = ArgumentCaptor.forClass(Consumer.class);
-        final ArgumentCaptor<Message> msgCaptor = ArgumentCaptor.forClass(Message.class);
-        doNothing().when(msgListener).received(consCaptor.capture(), msgCaptor.capture());
-        consConf.setMessageListener(msgListener);
-
-        SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
-            serviceUrl,
-            consConf,
-            new AuthenticationDisabled());
-
-        receiver.onStart();
-        waitForTransmission();
-
-        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-        Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
-        producer.send(EXPECTED_MESSAGE.getBytes());
-
-        waitForTransmission();
-        receiver.onStop();
-        assertEquals(new String(msgCaptor.getValue().getData()), EXPECTED_MESSAGE);
-    }
-
-    @Test(dataProvider = "ServiceUrls")
-    public void testDefaultSettingsOfReceiver(String serviceUrl) {
-        ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
-
-        Set<String> set = new HashSet<>();
-        set.add(TOPIC);
-        consConf.setTopicNames(set);
-        consConf.setSubscriptionName(SUBS);
-
-        SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
-            serviceUrl,
-            consConf,
-            new AuthenticationDisabled());
-
-        assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2());
-        assertNotNull(consConf.getMessageListener());
-    }
-
-    @Test(dataProvider = "ServiceUrls")
-    public void testSharedSubscription(String serviceUrl) throws Exception {
-        ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
-
-        Set<String> set = new HashSet<>();
-        set.add(TOPIC);
-        consConf.setTopicNames(set);
-        consConf.setSubscriptionName(SUBS);
-        consConf.setSubscriptionType(SubscriptionType.Shared);
-        consConf.setReceiverQueueSize(1);
-
-        Map<String, MutableInt> receveidCounts = new HashMap<>();
-
-        consConf.setMessageListener((consumer, msg) -> {
-            receveidCounts.computeIfAbsent(consumer.getConsumerName(), x -> new MutableInt(0)).increment();
-        });
-
-        SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
-            serviceUrl,
-            consConf,
-            new AuthenticationDisabled());
-
-        SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
-                serviceUrl,
-                consConf,
-                new AuthenticationDisabled());
-
-        receiver1.onStart();
-        receiver2.onStart();
-        waitForTransmission();
-
-        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-        Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
-        for (int i = 0; i < 10; i++) {
-            producer.send(EXPECTED_MESSAGE.getBytes());
-        }
-
-        waitForTransmission();
-        receiver1.onStop();
-        receiver2.onStop();
-
-        assertEquals(receveidCounts.size(), 2);
-    }
-
-    @Test(expectedExceptions = NullPointerException.class,
-            expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
-            dataProvider = "ServiceUrls")
-    public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
-        new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
-    }
-
-    private static void waitForTransmission() {
-        try {
-            Thread.sleep(1_000);
-        } catch (InterruptedException e) {
-        }
-    }
-}
diff --git a/tests/pulsar-storm-test/pom.xml b/tests/pulsar-storm-test/pom.xml
deleted file mode 100644
index 3cf562d..0000000
--- a/tests/pulsar-storm-test/pom.xml
+++ /dev/null
@@ -1,119 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.pulsar.tests</groupId>
-    <artifactId>tests-parent</artifactId>
-    <version>2.7.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-storm-test</artifactId>
-  <packaging>jar</packaging>
-  <name>Pulsar Storm adapter Tests</name>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-storm</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-client</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-     <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-server</artifactId>
-      <version>${storm.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-        </exclusion>
-        <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-broker</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>testmocks</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.asynchttpclient</groupId>
-      <artifactId>async-http-client</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-core</artifactId>
-      <version>${storm.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>ch.qos.logback</groupId>
-          <artifactId>logback-classic</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>log4j-over-slf4j</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-  </dependencies>
-</project>
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
deleted file mode 100644
index 4355ad6..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.storm.task.IOutputCollector;
-import org.apache.storm.tuple.Tuple;
-
-public class MockOutputCollector implements IOutputCollector {
-
-    private boolean acked = false;
-    private boolean failed = false;
-    private Throwable lastError = null;
-    private Tuple ackedTuple = null;
-    private int numTuplesAcked = 0;
-
-    @Override
-    public void reportError(Throwable error) {
-        lastError = error;
-    }
-
-    @Override
-    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-        return null;
-    }
-
-    @Override
-    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-    }
-
-    @Override
-    public void ack(Tuple input) {
-        acked = true;
-        failed = false;
-        ackedTuple = input;
-        ++numTuplesAcked;
-    }
-
-    @Override
-    public void fail(Tuple input) {
-        failed = true;
-        acked = false;
-    }
-
-    @Override
-    public void resetTimeout(Tuple tuple) {
-
-    }
-
-    public boolean acked() {
-        return acked;
-    }
-
-    public boolean failed() {
-        return failed;
-    }
-
-    public Throwable getLastError() {
-        return lastError;
-    }
-
-    public Tuple getAckedTuple() {
-        return ackedTuple;
-    }
-
-    public int getNumTuplesAcked() {
-        return numTuplesAcked;
-    }
-
-    public void reset() {
-        acked = false;
-        failed = false;
-        lastError = null;
-        ackedTuple = null;
-        numTuplesAcked = 0;
-    }
-
-    @Override
-    public void flush() {
-        // Nothing to flush from buffer
-    }
-
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
deleted file mode 100644
index 98c8d20..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pulsar.client.api.Message;
-
-import org.apache.storm.spout.ISpoutOutputCollector;
-
-public class MockSpoutOutputCollector implements ISpoutOutputCollector {
-
-    private boolean emitted = false;
-    private Message lastMessage = null;
-    private String data = null;
-
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
-        emitted = true;
-        data = (String) tuple.get(0);
-        lastMessage = (Message) messageId;
-        return new ArrayList<Integer>();
-    }
-
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
-        emitted = true;
-        data = (String) tuple.get(0);
-        lastMessage = (Message) messageId;
-    }
-
-    @Override
-    public long getPendingCount() {
-        return 0;
-    }
-
-    @Override
-    public void reportError(Throwable error) {
-    }
-
-    public boolean emitted() {
-        return emitted;
-    }
-
-    public String getTupleData() {
-        return data;
-    }
-
-    public Message getLastMessage() {
-        return lastMessage;
-    }
-
-    public void reset() {
-        emitted = false;
-        data = null;
-        lastMessage = null;
-    }
-    
-    @Override
-    public void flush() {
-        // Nothing to flush from buffer
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
deleted file mode 100644
index f74f066..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.fail;
-
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarBoltTest extends ProducerConsumerBase {
-
-    private static final int NO_OF_RETRIES = 10;
-
-    public String serviceUrl;
-    public final String topic = "persistent://my-property/my-ns/my-topic1";
-    public final String subscriptionName = "my-subscriber-name";
-
-    protected PulsarBoltConfiguration pulsarBoltConf;
-    protected PulsarBolt bolt;
-    protected MockOutputCollector mockCollector;
-    protected Consumer consumer;
-
-    @Override
-    @BeforeMethod
-    public void beforeMethod(Method m) throws Exception {
-        super.beforeMethod(m);
-        setup();
-    }
-
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-
-        serviceUrl = pulsar.getWebServiceAddress();
-
-        pulsarBoltConf = new PulsarBoltConfiguration();
-        pulsarBoltConf.setServiceUrl(serviceUrl);
-        pulsarBoltConf.setTopic(topic);
-        pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
-        pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
-        bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        mockCollector = new MockOutputCollector();
-        OutputCollector collector = new OutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        bolt.prepare(Maps.newHashMap(), context, collector);
-        consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName).subscribe();
-    }
-
-    @AfterMethod
-    public void cleanup() throws Exception {
-        bolt.close();
-        consumer.close();
-        super.internalCleanup();
-    }
-
-    @SuppressWarnings("serial")
-    static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
-        @Override
-        public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
-            if ("message to be dropped".equals(new String(tuple.getBinary(0)))) {
-                return null;
-            }
-            if ("throw exception".equals(new String(tuple.getBinary(0)))) {
-                throw new RuntimeException();
-            }
-            return msgBuilder.value(tuple.getBinary(0));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-    };
-
-    private Tuple getMockTuple(String msgContent) {
-        Tuple mockTuple = mock(Tuple.class);
-        when(mockTuple.getBinary(0)).thenReturn(msgContent.getBytes());
-        when(mockTuple.getSourceComponent()).thenReturn("");
-        when(mockTuple.getSourceStreamId()).thenReturn("");
-        return mockTuple;
-    }
-
-    @Test
-    public void testBasic() throws Exception {
-        String msgContent = "hello world";
-        Tuple tuple = getMockTuple(msgContent);
-        bolt.execute(tuple);
-        for (int i = 0; i < NO_OF_RETRIES; i++) {
-            Thread.sleep(1000);
-            if (mockCollector.acked()) {
-                break;
-            }
-        }
-        Assert.assertTrue(mockCollector.acked());
-        Assert.assertFalse(mockCollector.failed());
-        Assert.assertNull(mockCollector.getLastError());
-        Assert.assertEquals(tuple, mockCollector.getAckedTuple());
-        Message msg = consumer.receive(5, TimeUnit.SECONDS);
-        consumer.acknowledge(msg);
-        Assert.assertEquals(msgContent, new String(msg.getData()));
-    }
-
-    @Test
-    public void testExecuteFailure() throws Exception {
-        String msgContent = "throw exception";
-        Tuple tuple = getMockTuple(msgContent);
-        bolt.execute(tuple);
-        Assert.assertFalse(mockCollector.acked());
-        Assert.assertTrue(mockCollector.failed());
-        Assert.assertNotNull(mockCollector.getLastError());
-    }
-
-    @Test
-    public void testNoMessageSend() throws Exception {
-        String msgContent = "message to be dropped";
-        Tuple tuple = getMockTuple(msgContent);
-        bolt.execute(tuple);
-        Assert.assertTrue(mockCollector.acked());
-        Message msg = consumer.receive(5, TimeUnit.SECONDS);
-        Assert.assertNull(msg);
-    }
-
-    @Test
-    public void testMetrics() throws Exception {
-        bolt.resetMetrics();
-        String msgContent = "hello world";
-        Tuple tuple = getMockTuple(msgContent);
-        for (int i = 0; i < 10; i++) {
-            bolt.execute(tuple);
-        }
-        for (int i = 0; i < NO_OF_RETRIES; i++) {
-            Thread.sleep(1000);
-            if (mockCollector.getNumTuplesAcked() == 10) {
-                break;
-            }
-        }
-        @SuppressWarnings("rawtypes")
-        Map metrics = (Map) bolt.getValueAndReset();
-        Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 10);
-        Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_RATE)).doubleValue(),
-                10.0 / pulsarBoltConf.getMetricsTimeIntervalInSecs());
-        Assert.assertEquals(((Double) metrics.get(PulsarBolt.PRODUCER_THROUGHPUT_BYTES)).doubleValue(),
-                ((double) msgContent.getBytes().length * 10) / pulsarBoltConf.getMetricsTimeIntervalInSecs());
-        metrics = bolt.getMetrics();
-        Assert.assertEquals(((Long) metrics.get(PulsarBolt.NO_OF_MESSAGES_SENT)).longValue(), 0);
-        for (int i = 0; i < 10; i++) {
-            Message msg = consumer.receive(5, TimeUnit.SECONDS);
-            consumer.acknowledge(msg);
-        }
-    }
-
-    @Test
-    public void testSharedProducer() throws Exception {
-        TopicStats topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.publishers.size(), 1);
-        PulsarBolt otherBolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        MockOutputCollector otherMockCollector = new MockOutputCollector();
-        OutputCollector collector = new OutputCollector(otherMockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-bolt-" + methodName);
-        when(context.getThisTaskId()).thenReturn(1);
-        otherBolt.prepare(Maps.newHashMap(), context, collector);
-
-        topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.publishers.size(), 1);
-
-        otherBolt.close();
-
-        topicStats = admin.topics().getStats(topic);
-        Assert.assertEquals(topicStats.publishers.size(), 1);
-    }
-
-    @Test
-    public void testSerializability() throws Exception {
-        // test serializability with no auth
-        PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        TestUtil.testSerializability(boltWithNoAuth);
-    }
-
-    @Test
-    public void testFailedProducer() {
-        PulsarBoltConfiguration pulsarBoltConf = new PulsarBoltConfiguration();
-        pulsarBoltConf.setServiceUrl(serviceUrl);
-        pulsarBoltConf.setTopic("persistent://invalid");
-        pulsarBoltConf.setTupleToMessageMapper(tupleToMessageMapper);
-        pulsarBoltConf.setMetricsTimeIntervalInSecs(60);
-        PulsarBolt bolt = new PulsarBolt(pulsarBoltConf, PulsarClient.builder());
-        MockOutputCollector mockCollector = new MockOutputCollector();
-        OutputCollector collector = new OutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("new" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        try {
-            bolt.prepare(Maps.newHashMap(), context, collector);
-            fail("should have failed as producer creation failed");
-        } catch (IllegalStateException ie) {
-            // Ok.
-        }
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
deleted file mode 100644
index 3247a5a..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.testng.collections.Maps;
-
-public class PulsarSpoutTest extends ProducerConsumerBase {
-
-    public String serviceUrl;
-    public final String topic = "persistent://my-property/my-ns/my-topic1";
-    public final String subscriptionName = "my-subscriber-name";
-
-    protected PulsarSpoutConfiguration pulsarSpoutConf;
-    protected PulsarSpout spout;
-    protected MockSpoutOutputCollector mockCollector;
-    protected Producer producer;
-
-    @Override
-    @BeforeMethod
-    public void beforeMethod(Method m) throws Exception {
-        super.beforeMethod(m);
-        setup();
-    }
-
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-
-        serviceUrl = pulsar.getWebServiceAddress();
-
-        pulsarSpoutConf = new PulsarSpoutConfiguration();
-        pulsarSpoutConf.setServiceUrl(serviceUrl);
-        pulsarSpoutConf.setTopic(topic);
-        pulsarSpoutConf.setSubscriptionName(subscriptionName);
-        pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
-        pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
-        pulsarSpoutConf.setMaxFailedRetries(2);
-        pulsarSpoutConf.setSharedConsumerEnabled(true);
-        pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
-        pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
-        spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        mockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        spout.open(Maps.newHashMap(), context, collector);
-        producer = pulsarClient.newProducer().topic(topic).create();
-    }
-
-    @AfterMethod
-    public void cleanup() throws Exception {
-        producer.close();
-        spout.close();
-        super.internalCleanup();
-    }
-
-    @SuppressWarnings("serial")
-    public static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
-        @Override
-        public Values toValues(Message msg) {
-            if ("message to be dropped".equals(new String(msg.getData()))) {
-                return null;
-            }
-            return new Values(new String(msg.getData()));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-    };
-
-    @Test
-    public void testBasic() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.ack(mockCollector.getLastMessage());
-    }
-
-    @Test
-    public void testRedeliverOnFail() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        spout.fail(mockCollector.getLastMessage());
-        mockCollector.reset();
-        Thread.sleep(150);
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.ack(mockCollector.getLastMessage());
-    }
-
-    @Test
-    public void testNoRedeliverOnAck() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        spout.ack(mockCollector.getLastMessage());
-        mockCollector.reset();
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @Test
-    public void testLimitedRedeliveriesOnTimeout() throws Exception {
-        String msgContent = "chuck norris";
-        producer.send(msgContent.getBytes());
-
-        long startTime = System.currentTimeMillis();
-        while (startTime + pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.MILLISECONDS) > System
-                .currentTimeMillis()) {
-            mockCollector.reset();
-            spout.nextTuple();
-            assertTrue(mockCollector.emitted());
-            assertEquals(mockCollector.getTupleData(), msgContent);
-            spout.fail(mockCollector.getLastMessage());
-            // wait to avoid backoff
-            Thread.sleep(500);
-        }
-        spout.nextTuple();
-        spout.fail(mockCollector.getLastMessage());
-        mockCollector.reset();
-        Thread.sleep(500);
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @Test
-    public void testLimitedRedeliveriesOnCount() throws Exception {
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.fail(mockCollector.getLastMessage());
-
-        mockCollector.reset();
-        Thread.sleep(150);
-
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.fail(mockCollector.getLastMessage());
-
-        mockCollector.reset();
-        Thread.sleep(300);
-
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.fail(mockCollector.getLastMessage());
-
-        mockCollector.reset();
-        Thread.sleep(500);
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @Test
-    public void testBackoffOnRetry() throws Exception {
-        String msgContent = "chuck norris";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        spout.fail(mockCollector.getLastMessage());
-        mockCollector.reset();
-        // due to backoff we should not get the message again immediately
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-        Thread.sleep(100);
-        spout.nextTuple();
-        assertTrue(mockCollector.emitted());
-        assertEquals(mockCollector.getTupleData(), msgContent);
-        spout.ack(mockCollector.getLastMessage());
-    }
-
-    @Test
-    public void testMessageDrop() throws Exception {
-        String msgContent = "message to be dropped";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        assertFalse(mockCollector.emitted());
-        assertNull(mockCollector.getTupleData());
-    }
-
-    @SuppressWarnings({ "rawtypes" })
-    @Test
-    public void testMetrics() throws Exception {
-        spout.resetMetrics();
-        String msgContent = "hello world";
-        producer.send(msgContent.getBytes());
-        spout.nextTuple();
-        Map metrics = spout.getMetrics();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
-        assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_RATE)).doubleValue(),
-                1.0 / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
-        assertEquals(((Double) metrics.get(PulsarSpout.CONSUMER_THROUGHPUT_BYTES)).doubleValue(),
-                ((double) msgContent.getBytes().length) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
-        spout.fail(mockCollector.getLastMessage());
-        metrics = spout.getMetrics();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
-        Thread.sleep(150);
-        spout.nextTuple();
-        metrics = spout.getMetrics();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 1);
-        spout.ack(mockCollector.getLastMessage());
-        metrics = (Map) spout.getValueAndReset();
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_RECEIVED)).longValue(), 1);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_MESSAGES_EMITTED)).longValue(), 2);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_FAILED_MESSAGES)).longValue(), 0);
-        assertEquals(((Long) metrics.get(PulsarSpout.NO_OF_PENDING_ACKS)).longValue(), 0);
-    }
-
-    @Test
-    public void testSharedConsumer() throws Exception {
-        TopicStats topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-        PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
-        when(context.getThisTaskId()).thenReturn(1);
-        otherSpout.open(Maps.newHashMap(), context, collector);
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-
-        otherSpout.close();
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-    }
-
-    @Test
-    public void testNoSharedConsumer() throws Exception {
-        TopicStats topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-        pulsarSpoutConf.setSharedConsumerEnabled(false);
-        PulsarSpout otherSpout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        MockSpoutOutputCollector otherMockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(otherMockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("test-spout-" + methodName);
-        when(context.getThisTaskId()).thenReturn(1);
-        otherSpout.open(Maps.newHashMap(), context, collector);
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 2);
-
-        otherSpout.close();
-
-        topicStats = admin.topics().getStats(topic);
-        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
-    }
-
-    @Test
-    public void testSerializability() throws Exception {
-        // test serializability with no auth
-        PulsarSpout spoutWithNoAuth = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        TestUtil.testSerializability(spoutWithNoAuth);
-    }
-
-    @Test
-    public void testFailedConsumer() {
-        PulsarSpoutConfiguration pulsarSpoutConf = new PulsarSpoutConfiguration();
-        pulsarSpoutConf.setServiceUrl(serviceUrl);
-        pulsarSpoutConf.setTopic("persistent://invalidTopic");
-        pulsarSpoutConf.setSubscriptionName(subscriptionName);
-        pulsarSpoutConf.setMessageToValuesMapper(messageToValuesMapper);
-        pulsarSpoutConf.setFailedRetriesTimeout(1, TimeUnit.SECONDS);
-        pulsarSpoutConf.setMaxFailedRetries(2);
-        pulsarSpoutConf.setSharedConsumerEnabled(false);
-        pulsarSpoutConf.setMetricsTimeIntervalInSecs(60);
-        pulsarSpoutConf.setSubscriptionType(SubscriptionType.Shared);
-        PulsarSpout spout = new PulsarSpout(pulsarSpoutConf, PulsarClient.builder());
-        MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
-        TopologyContext context = mock(TopologyContext.class);
-        when(context.getThisComponentId()).thenReturn("new-test" + methodName);
-        when(context.getThisTaskId()).thenReturn(0);
-        try {
-            spout.open(Maps.newHashMap(), context, collector);
-            fail("should have failed as consumer creation failed");
-        } catch (IllegalStateException e) {
-            // Ok
-        }
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
deleted file mode 100644
index a71e088..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/TestUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-
-import org.testng.Assert;
-
-public class TestUtil {
-
-    public static void testSerializability(Object object) throws Exception {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(out);
-        oos.writeObject(object);
-        oos.close();
-        Assert.assertTrue(out.toByteArray().length > 0);
-    }
-}
diff --git a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java b/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
deleted file mode 100644
index 93404ea..0000000
--- a/tests/pulsar-storm-test/src/test/java/org/apache/pulsar/storm/example/StormExample.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.storm.example;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.storm.MessageToValuesMapper;
-import org.apache.pulsar.storm.PulsarBolt;
-import org.apache.pulsar.storm.PulsarBoltConfiguration;
-import org.apache.pulsar.storm.PulsarSpout;
-import org.apache.pulsar.storm.PulsarSpoutConfiguration;
-import org.apache.pulsar.storm.TupleToMessageMapper;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.metric.api.IMetricsConsumer;
-import org.apache.storm.task.IErrorReporter;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormExample {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
-    private static final String serviceUrl = "http://broker-pdev.messaging.corp.usw.example.com:8080";
-
-    @SuppressWarnings("serial")
-    static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {
-
-        @Override
-        public Values toValues(Message msg) {
-            return new Values(new String(msg.getData()));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            // declare the output fields
-            declarer.declare(new Fields("string"));
-        }
-    };
-
-    @SuppressWarnings("serial")
-    static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {
-
-        @Override
-        public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
-            String receivedMessage = tuple.getString(0);
-            // message processing
-            String processedMsg = receivedMessage + "-processed";
-            return msgBuilder.value(processedMsg.getBytes());
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            // declare the output fields
-        }
-    };
-
-    public static void main(String[] args) throws Exception {
-        // String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
-        // String authParams = "key1:val1,key2:val2";
-        // clientConf.setAuthentication(authPluginClassName, authParams);
-
-        String topic1 = "persistent://my-property/use/my-ns/my-topic1";
-        String topic2 = "persistent://my-property/use/my-ns/my-topic2";
-        String subscriptionName1 = "my-subscriber-name1";
-        String subscriptionName2 = "my-subscriber-name2";
-
-        // create spout
-        PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
-        spoutConf.setServiceUrl(serviceUrl);
-        spoutConf.setTopic(topic1);
-        spoutConf.setSubscriptionName(subscriptionName1);
-        spoutConf.setMessageToValuesMapper(messageToValuesMapper);
-        PulsarSpout spout = new PulsarSpout(spoutConf, PulsarClient.builder());
-
-        // create bolt
-        PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
-        boltConf.setServiceUrl(serviceUrl);
-        boltConf.setTopic(topic2);
-        boltConf.setTupleToMessageMapper(tupleToMessageMapper);
-        PulsarBolt bolt = new PulsarBolt(boltConf, PulsarClient.builder());
-
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("testSpout", spout);
-        builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout");
-
-        Config conf = new Config();
-        conf.setNumWorkers(2);
-        conf.setDebug(true);
-        conf.registerMetricsConsumer(PulsarMetricsConsumer.class);
-
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("test", conf, builder.createTopology());
-        Utils.sleep(10000);
-
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
-        // create a consumer on topic2 to receive messages from the bolt when the processing is done
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic2).subscriptionName(subscriptionName2).subscribe();
-        // create a producer on topic1 to send messages that will be received by the spout
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic1).create();
-
-        for (int i = 0; i < 10; i++) {
-            String msg = "msg-" + i;
-            producer.send(msg.getBytes());
-            LOG.info("Message {} sent", msg);
-        }
-        Message<byte[]> msg = null;
-        for (int i = 0; i < 10; i++) {
-            msg = consumer.receive(1, TimeUnit.SECONDS);
-            LOG.info("Message {} received", new String(msg.getData()));
-        }
-        cluster.killTopology("test");
-        cluster.shutdown();
-
-    }
-
-    class PulsarMetricsConsumer implements IMetricsConsumer {
-
-        @Override
-        public void prepare(Map stormConf, Object registrationArgument, TopologyContext context,
-                IErrorReporter errorReporter) {
-        }
-
-        @Override
-        public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-            // The collection will contain metrics for all the spouts/bolts that register the metrics in the topology.
-            // The name for the Pulsar Spout is "PulsarSpoutMetrics-{componentId}-{taskIndex}" and for the Pulsar Bolt
-            // is
-            // "PulsarBoltMetrics-{componentId}-{taskIndex}".
-        }
-
-        @Override
-        public void cleanup() {
-        }
-
-    }
-}