/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.api;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Test(groups = "flaky")
public class DeadLetterTopicTest extends ProducerConsumerBase {

    private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class);

    /**
     * Runs before every test method (also when earlier configuration failed, via {@code alwaysRun}):
     * delegates to the base class's {@code internalSetup()} followed by {@code producerBaseSetup()}.
     *
     * @throws Exception if either base-class setup step fails
     */
    @BeforeMethod(alwaysRun = true)
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Runs after every test method (also when the test failed, via {@code alwaysRun}):
     * delegates to the base class's {@code internalCleanup()}.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Exercises the default dead-letter flow on a {@code Shared} subscription.
     *
     * <p>The main consumer never acknowledges and uses a 1 second ack timeout together with a dead-letter
     * policy of {@code maxRedeliveryCount}. The test expects:
     * <ol>
     *   <li>{@code sendMessages * (maxRedeliveryCount + 1)} deliveries on the main consumer,</li>
     *   <li>{@code sendMessages} messages on the {@code <topic>-<subscription>-DLQ} topic,</li>
     *   <li>no message for a fresh consumer attached to the original subscription.</li>
     * </ol>
     *
     * @throws Exception if any client operation fails
     */
    @Test(groups = "quarantine")
    public void testDeadLetterTopic() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";

        final int maxRedeliveryCount = 2;

        final int sendMessages = 100;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // The DLQ is read through a separate client connection.
        @Cleanup
        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
                .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .create();

        for (int i = 0; i < sendMessages; i++) {
            // Explicit charset: do not depend on the platform default encoding.
            producer.send(String.format("Hello Pulsar [%d]", i).getBytes(StandardCharsets.UTF_8));
        }

        producer.close();

        // Never acknowledge: every message must be delivered (maxRedeliveryCount + 1) times in total.
        int totalReceived = 0;
        do {
            Message<byte[]> message = consumer.receive();
            log.info("consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData(), StandardCharsets.UTF_8));
            totalReceived++;
        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

        // Every published message is expected to show up on the DLQ.
        int totalInDeadLetter = 0;
        do {
            Message<byte[]> message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData(), StandardCharsets.UTF_8));
            deadLetterConsumer.acknowledge(message);
            totalInDeadLetter++;
        } while (totalInDeadLetter < sendMessages);

        deadLetterConsumer.close();
        consumer.close();

        // A new consumer on the same subscription must not see any of the dead-lettered messages.
        Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", checkMessage.getMessageId(),
                    new String(checkMessage.getData(), StandardCharsets.UTF_8));
        }
        assertNull(checkMessage);

        checkConsumer.close();
    }

    /**
     * Checks that messages arriving on the dead-letter topic carry the original topic name and the
     * original message id as properties ({@link RetryMessageUtil#SYSTEM_PROPERTY_REAL_TOPIC} and
     * {@link RetryMessageUtil#SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID}).
     *
     * <p>Both consumers are managed with {@code @Cleanup} so they are closed even when an assertion
     * fails or the test times out. Cleanup runs in reverse declaration order: DLQ consumer, the extra
     * client, then the main consumer.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeOut = 20000)
    public void testDeadLetterTopicHasOriginalInfo() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";

        final int maxRedeliveryCount = 1;
        final int sendMessages = 10;

        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // The DLQ is read through a separate client connection.
        @Cleanup
        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
        @Cleanup
        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
                .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .create();
        // Remember the id of every published message to compare against the DLQ properties later.
        Set<String> messageIds = new HashSet<>();
        for (int i = 0; i < sendMessages; i++) {
            MessageId messageId = producer.send(
                    String.format("Hello Pulsar [%d]", i).getBytes(StandardCharsets.UTF_8));
            messageIds.add(messageId.toString());
        }
        producer.close();

        // Never acknowledge: every message must be delivered (maxRedeliveryCount + 1) times in total.
        int totalReceived = 0;
        do {
            consumer.receive();
            totalReceived++;
        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

        int totalInDeadLetter = 0;
        do {
            Message<byte[]> message = deadLetterConsumer.receive();
            // The original topic and message id must be present on the dead-lettered message.
            assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
            assertTrue(messageIds.contains(
                    message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
            deadLetterConsumer.acknowledge(message);
            totalInDeadLetter++;
        } while (totalInDeadLetter < sendMessages);
        assertEquals(totalInDeadLetter, sendMessages);
    }

    /**
     * Two-field value type that {@link #testAutoConsumeSchemaDeadLetter()} publishes with an AVRO
     * schema. Both fields are marked with Avro's {@code @Nullable}; Lombok's {@code @Data} is applied
     * to the class.
     */
    @Data
    public static class Foo {
        @Nullable
        private String field1;
        @Nullable
        private String field2;
    }

    /**
     * Variant of {@link Foo} with an additional {@code field3}. {@link #testAutoConsumeSchemaDeadLetter()}
     * publishes it alongside {@link Foo} and uses it as the schema type of the dead-letter consumer.
     * All fields are marked with Avro's {@code @Nullable}; Lombok's {@code @Data} is applied to the class.
     */
    @Data
    public static class FooV2 {
        @Nullable
        private String field1;
        @Nullable
        private String field2;
        @Nullable
        private String field3;
    }

    /**
     * Runs the dead-letter flow with an {@code AUTO_CONSUME} consumer on a topic that holds messages of
     * two AVRO types ({@link Foo} and {@link FooV2}).
     *
     * <p>The test expects every dead-lettered message to be readable as {@link FooV2} and to carry the
     * original topic and original message id as properties.
     *
     * <p>The extra client and both consumers are managed with {@code @Cleanup}, matching the other
     * tests in this class, so they are released even when an assertion fails. Cleanup runs in reverse
     * declaration order: main consumer, DLQ consumer, then the extra client.
     *
     * @throws Exception if any client or admin operation fails
     */
    @Test(timeOut = 20000)
    public void testAutoConsumeSchemaDeadLetter() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String subName = "my-subscription";
        final int maxRedeliveryCount = 1;
        final int sendMessages = 10;

        admin.topics().createNonPartitionedTopic(topic);
        // The DLQ is read through a separate client connection.
        @Cleanup
        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
        @Cleanup
        Consumer<FooV2> deadLetterConsumer = newPulsarClient.newConsumer(Schema.AVRO(FooV2.class))
                .topic(topic + "-" + subName + "-DLQ")
                .subscriptionName(subName)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
                .topic(topic)
                .create();
        Set<String> messageIds = new HashSet<>();
        // Alternate between the two types: even indexes publish Foo, odd indexes publish FooV2.
        for (int i = 0; i < sendMessages; i++) {
            if (i % 2 == 0) {
                Foo foo = new Foo();
                foo.field1 = i + "";
                foo.field2 = i + "";
                messageIds.add(producer.newMessage(Schema.AVRO(Foo.class)).value(foo).send().toString());
            } else {
                FooV2 foo = new FooV2();
                foo.field1 = i + "";
                foo.field2 = i + "";
                foo.field3 = i + "";
                messageIds.add(producer.newMessage(Schema.AVRO(FooV2.class)).value(foo).send().toString());
            }
        }
        @Cleanup
        Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                .topic(topic)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        producer.close();

        // Never acknowledge: every message must be delivered (maxRedeliveryCount + 1) times in total.
        int totalReceived = 0;
        do {
            consumer.receive();
            totalReceived++;
        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

        int totalInDeadLetter = 0;

        for (int i = 0; i < sendMessages; i++) {
            Message<FooV2> message = deadLetterConsumer.receive();
            FooV2 fooV2 = message.getValue();
            assertNotNull(fooV2.field1);
            assertEquals(fooV2.field2, fooV2.field1);
            // field3 is only set on messages that were published as FooV2.
            assertTrue(fooV2.field3 == null || fooV2.field1.equals(fooV2.field3));
            assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
            assertTrue(messageIds.contains(
                    message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
            deadLetterConsumer.acknowledge(message);
            totalInDeadLetter++;
        }
        assertEquals(totalInDeadLetter, sendMessages);
    }

    /**
     * Checks that several never-acknowledging consumers on one {@code Shared} subscription do not cause
     * a message to be written to the dead-letter topic more than once.
     *
     * <p>The consumers are subscribed in parallel on a thread pool. Each subscribe task is submitted as
     * a callable and its {@link Future} is resolved on the test thread, so that
     * <ul>
     *   <li>no collection is mutated concurrently by the pool threads, and</li>
     *   <li>a failed {@code subscribe()} surfaces as a test failure instead of being lost on a pool
     *       thread.</li>
     * </ul>
     *
     * @throws Exception if any client operation fails or a subscribe task fails
     */
    @Test(timeOut = 30000)
    public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
        final int maxRedeliveryCount = 1;
        final int messageCount = 10;
        final int consumerCount = 3;
        //1 start 3 parallel consumers
        final AtomicInteger totalReceived = new AtomicInteger(0);
        @Cleanup("shutdownNow")
        ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
        List<Future<Consumer<String>>> pendingConsumers = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            // Block lambda with an explicit return so that submit(Callable) is selected.
            Future<Consumer<String>> pending = executor.submit(() -> {
                return pulsarClient.newConsumer(Schema.STRING)
                        .topic(topic)
                        .subscriptionName("my-subscription-DuplicatedMessage")
                        .subscriptionType(SubscriptionType.Shared)
                        .ackTimeout(1001, TimeUnit.MILLISECONDS)
                        .deadLetterPolicy(DeadLetterPolicy.builder()
                                .maxRedeliverCount(maxRedeliveryCount)
                                .deadLetterTopic(topic + "-DLQ")
                                .build())
                        .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
                        .receiverQueueSize(100)
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .messageListener((MessageListener<String>) (consumer1, msg) -> {
                            totalReceived.getAndIncrement();
                            //never ack
                        })
                        .subscribe();
            });
            pendingConsumers.add(pending);
        }

        //2 send messages
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
        for (int i = 0; i < messageCount; i++) {
            producer.send(String.format("Message [%d]", i));
        }

        // Collect the consumers on the test thread; get() rethrows any subscribe failure here.
        List<Consumer<String>> consumers = new ArrayList<>();
        for (Future<Consumer<String>> pending : pendingConsumers) {
            consumers.add(pending.get());
        }

        //3 start a DLQ consumer
        Consumer<String> deadLetterConsumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic + "-DLQ")
                .subscriptionName("my-subscription-DuplicatedMessage-DLQ")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        int totalInDeadLetter = 0;
        // Drain the DLQ until it stays silent for 10 seconds.
        while (true) {
            Message<String> message = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            deadLetterConsumer.acknowledge(message);
            totalInDeadLetter++;
        }

        //4 The number of messages that consumers can consume should be equal to messageCount * (maxRedeliveryCount + 1)
        assertEquals(totalReceived.get(), messageCount * (maxRedeliveryCount + 1));

        //5 The message in DLQ should be equal to messageCount
        assertEquals(totalInDeadLetter, messageCount);

        //6 clean up
        producer.close();
        deadLetterConsumer.close();
        for (Consumer<String> consumer : consumers) {
            consumer.close();
        }
    }

    /**
     * Exercises the dead-letter flow for a single consumer subscribed to two topics, each with its own
     * {@code <topic>-<subscription>-DLQ} topic.
     *
     * <p>The test is disabled, see
     * <a href="https://github.com/apache/pulsar/issues/2647">apache/pulsar#2647</a>.
     *
     * @throws Exception if any client operation fails
     */
    @Test(enabled = false)
    public void testDeadLetterTopicWithMultiTopic() throws Exception {
        final String topic1 = "persistent://my-property/my-ns/dead-letter-topic-1";
        final String topic2 = "persistent://my-property/my-ns/dead-letter-topic-2";

        final int maxRedeliveryCount = 2;

        final int messagesPerTopic = 100;
        // Total number of messages published across both topics.
        final int sendMessages = messagesPerTopic * 2;

        // subscribe to the original topics before publish
        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic1, topic2)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        // subscribe to the DLQ topics before consuming original topics
        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ",
                        "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic1)
                .create();

        Producer<byte[]> producer2 = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic2)
                .create();

        for (int i = 0; i < messagesPerTopic; i++) {
            // Explicit charset: do not depend on the platform default encoding.
            byte[] payload = String.format("Hello Pulsar [%d]", i).getBytes(StandardCharsets.UTF_8);
            producer1.send(payload);
            producer2.send(payload);
        }

        producer1.close();
        producer2.close();

        // Never acknowledge: every message must be delivered (maxRedeliveryCount + 1) times in total.
        int totalReceived = 0;
        do {
            Message<byte[]> message = consumer.receive();
            // Count first so the loop's progress does not hide inside a logging argument.
            totalReceived++;
            log.info("consumer received message : {} {} - total = {}",
                message.getMessageId(), new String(message.getData(), StandardCharsets.UTF_8), totalReceived);
        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

        int totalInDeadLetter = 0;
        do {
            Message<byte[]> message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData(), StandardCharsets.UTF_8));
            deadLetterConsumer.acknowledge(message);
            totalInDeadLetter++;
        } while (totalInDeadLetter < sendMessages);

        deadLetterConsumer.close();
        consumer.close();

        // A new consumer on the same subscription must not see any of the dead-lettered messages.
        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic1, topic2)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", checkMessage.getMessageId(),
                    new String(checkMessage.getData(), StandardCharsets.UTF_8));
        }
        assertNull(checkMessage);

        checkConsumer.close();
    }

    /**
     * Exercises the dead-letter flow when the dead-letter topic name is set explicitly through
     * {@code DeadLetterPolicy.deadLetterTopic(...)} instead of using the default name.
     *
     * <p>The test expects {@code sendMessages * (maxRedeliveryCount + 1)} deliveries on the main
     * consumer, {@code sendMessages} messages on the custom dead-letter topic, and no message for a
     * fresh consumer attached to the original subscription.
     *
     * @throws Exception if any client operation fails
     */
    @Test(groups = "quarantine")
    public void testDeadLetterTopicByCustomTopicName() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        // Single definition so the policy and the DLQ consumer cannot drift apart.
        final String deadLetterTopic =
                "persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ";
        final int maxRedeliveryCount = 2;
        final int sendMessages = 100;

        // subscribe before publish
        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1, TimeUnit.SECONDS)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(maxRedeliveryCount)
                        .deadLetterTopic(deadLetterTopic)
                        .build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        // The DLQ is read through a separate client connection.
        @Cleanup
        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
                .topic(deadLetterTopic)
                .subscriptionName("my-subscription")
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .create();
        for (int i = 0; i < sendMessages; i++) {
            // Explicit charset: do not depend on the platform default encoding.
            producer.send(String.format("Hello Pulsar [%d]", i).getBytes(StandardCharsets.UTF_8));
        }
        producer.close();

        // Never acknowledge: every message must be delivered (maxRedeliveryCount + 1) times in total.
        int totalReceived = 0;
        do {
            Message<byte[]> message = consumer.receive();
            log.info("consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData(), StandardCharsets.UTF_8));
            totalReceived++;
        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
        int totalInDeadLetter = 0;
        do {
            Message<byte[]> message = deadLetterConsumer.receive();
            log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData(), StandardCharsets.UTF_8));
            deadLetterConsumer.acknowledge(message);
            totalInDeadLetter++;
        } while (totalInDeadLetter < sendMessages);
        deadLetterConsumer.close();
        consumer.close();
        // The final check uses yet another client connection.
        @Cleanup
        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
        Consumer<byte[]> checkConsumer = newPulsarClient1.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        // A new consumer on the same subscription must not see any of the dead-lettered messages.
        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", checkMessage.getMessageId(),
                    new String(checkMessage.getData(), StandardCharsets.UTF_8));
        }
        assertNull(checkMessage);
        checkConsumer.close();
    }

    /**
     * Regression test for <a href="https://github.com/apache/pulsar/issues/3077">apache/pulsar#3077</a>.
     *
     * <p>A consumer with a 1 second ack timeout and {@code maxRedeliverCount(1)} does not call
     * {@code receive()} for 5 seconds after a message was published. The test expects the message to
     * still be available to that consumer afterwards.
     *
     * <p>Consumer and producer are managed with {@code @Cleanup} so they are closed when the test ends,
     * whether it passes or fails.
     *
     * @throws PulsarClientException if a client operation fails
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test(timeOut = 200000)
    public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException, InterruptedException {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";

        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName("my-subscription")
                .ackTimeout(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
                .subscribe();

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .create();

        // Explicit charset: do not depend on the platform default encoding.
        producer.send("a message".getBytes(StandardCharsets.UTF_8));

        // Wait a while without receiving; the message should not be sent to the DLQ in the meantime.
        Thread.sleep(5000L);

        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
        assertNotNull(msg);
    }

    /**
     * Exercises the dead-letter flow for a {@code Key_Shared} subscription on a partitioned topic with
     * two partitions.
     *
     * <p>One DLQ consumer is attached per partition, on {@code <topic>-partition-N-<subscription>-DLQ}.
     * The test expects the combined number of dead-lettered messages to equal {@code sendMessages},
     * and no message for a fresh consumer attached to the original subscription.
     *
     * @throws Exception if any client or admin operation fails
     */
    @Test
    public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic";

        final int maxRedeliveryCount = 2;

        final int sendMessages = 1;

        int partitionCount = 2;

        admin.topics().createPartitionedTopic(topic, partitionCount);

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                .ackTimeout(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Consumer<byte[]> deadLetterConsumer0 = pulsarClient.newConsumer(Schema.BYTES)
                .topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-0-my-subscription-DLQ")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Consumer<byte[]> deadLetterConsumer1 = pulsarClient.newConsumer(Schema.BYTES)
                .topic("persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic-partition-1-my-subscription-DLQ")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .create();

        for (int i = 0; i < sendMessages; i++) {
            // Explicit charset: do not depend on the platform default encoding.
            producer.send(String.format("Hello Pulsar [%d]", i).getBytes(StandardCharsets.UTF_8));
        }

        producer.close();

        // Never acknowledge: every message must be delivered (maxRedeliveryCount + 1) times in total.
        int totalReceived = 0;
        do {
            Message<byte[]> message = consumer.receive();
            log.info("consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData(), StandardCharsets.UTF_8));
            totalReceived++;
        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

        // The dead-lettered messages may sit on either partition's DLQ, so both are drained in turn.
        int totalInDeadLetter = drainDeadLetters(deadLetterConsumer0, 0, sendMessages);
        totalInDeadLetter = drainDeadLetters(deadLetterConsumer1, totalInDeadLetter, sendMessages);

        assertEquals(totalInDeadLetter, sendMessages);
        deadLetterConsumer0.close();
        deadLetterConsumer1.close();
        consumer.close();

        // A new consumer on the same subscription must not see any of the dead-lettered messages.
        Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
        if (checkMessage != null) {
            log.info("check consumer received message : {} {}", checkMessage.getMessageId(),
                    new String(checkMessage.getData(), StandardCharsets.UTF_8));
        }
        assertNull(checkMessage);

        checkConsumer.close();
    }

    /**
     * Receives and acknowledges messages from one dead-letter consumer until either {@code expected}
     * messages have been counted in total or a {@code receive} call returns nothing within 3 seconds.
     *
     * <p>The consumer is always polled at least once, even when {@code alreadyReceived} has already
     * reached {@code expected}; an unexpected extra message therefore still increases the count.
     *
     * @param deadLetterConsumer consumer attached to a dead-letter topic
     * @param alreadyReceived number of dead-lettered messages counted so far
     * @param expected total number of dead-lettered messages at which polling stops
     * @return {@code alreadyReceived} plus the number of messages received by this call
     * @throws PulsarClientException if receiving or acknowledging fails
     */
    private static int drainDeadLetters(Consumer<byte[]> deadLetterConsumer, int alreadyReceived, int expected)
            throws PulsarClientException {
        int received = alreadyReceived;
        do {
            Message<byte[]> message = deadLetterConsumer.receive(3, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            log.info("dead letter consumer received message : {} {}", message.getMessageId(),
                    new String(message.getData(), StandardCharsets.UTF_8));
            deadLetterConsumer.acknowledge(message);
            received++;
        } while (received < expected);
        return received;
    }
}
