/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.netty.util.Timeout;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {

    enum ConsumerType {
        PARTITIONED, NON_PARTITIONED
    }

    protected final String subscription;
    protected final ConsumerConfigurationData<T> conf;
    protected final String consumerName;
    protected final CompletableFuture<Consumer<T>> subscribeFuture;
    protected final MessageListener<T> listener;
    protected final ConsumerEventListener consumerEventListener;
    protected final ExecutorService listenerExecutor;
    final BlockingQueue<Message<T>> incomingMessages;
    protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunckedMessageIdSequenceMap;
    protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
    protected int maxReceiverQueueSize;
    protected final Schema<T> schema;
    protected final ConsumerInterceptors<T> interceptors;
    protected final BatchReceivePolicy batchReceivePolicy;
    protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
    private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
            .newUpdater(ConsumerBase.class, "incomingMessagesSize");
    protected volatile long incomingMessagesSize = 0;
    protected volatile Timeout batchReceiveTimeout = null;
    protected final Lock reentrantLock = new ReentrantLock();

    protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                           int receiverQueueSize, ExecutorService listenerExecutor,
                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
        super(client, topic);
        this.maxReceiverQueueSize = receiverQueueSize;
        this.subscription = conf.getSubscriptionName();
        this.conf = conf;
        this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
        this.subscribeFuture = subscribeFuture;
        this.listener = conf.getMessageListener();
        this.consumerEventListener = conf.getConsumerEventListener();
        // Always use growable queue since items can exceed the advertised size
        this.incomingMessages = new GrowableArrayBlockingQueue<>();
        this.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();

        this.listenerExecutor = listenerExecutor;
        this.pendingReceives = Queues.newConcurrentLinkedQueue();
        this.schema = schema;
        this.interceptors = interceptors;
        if (conf.getBatchReceivePolicy() != null) {
            BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
            if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
                this.batchReceivePolicy = BatchReceivePolicy.builder()
                        .maxNumMessages(this.maxReceiverQueueSize)
                        .maxNumBytes(userBatchReceivePolicy.getMaxNumBytes())
                        .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
                        .build();
                log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, " +
                        "reset to maxReceiverQueueSize. batchReceivePolicy: {}",
                        userBatchReceivePolicy.getMaxNumMessages(), this.maxReceiverQueueSize,
                        this.batchReceivePolicy.toString());
            } else if (userBatchReceivePolicy.getMaxNumMessages() <= 0 && userBatchReceivePolicy.getMaxNumBytes() <= 0) {
                this.batchReceivePolicy = BatchReceivePolicy.builder()
                        .maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages())
                        .maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes())
                        .timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
                        .build();
                log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. " +
                        "Reset to DEFAULT_POLICY. batchReceivePolicy: {}", userBatchReceivePolicy.getMaxNumMessages(),
                        userBatchReceivePolicy.getMaxNumBytes(), this.batchReceivePolicy.toString());
            } else {
                this.batchReceivePolicy = conf.getBatchReceivePolicy();
            }
        } else {
            this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
        }

        if (batchReceivePolicy.getTimeoutMs() > 0) {
            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public Message<T> receive() throws PulsarClientException {
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set");
        }
        verifyConsumerState();
        return internalReceive();
    }

    @Override
    public CompletableFuture<Message<T>> receiveAsync() {
        if (listener != null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set"));
        }
        try {
            verifyConsumerState();
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
        return internalReceiveAsync();
    }

    protected abstract Message<T> internalReceive() throws PulsarClientException;

    protected abstract CompletableFuture<Message<T>> internalReceiveAsync();

    @Override
    public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
        if (conf.getReceiverQueueSize() == 0) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Can't use receive with timeout, if the queue size is 0");
        }
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set");
        }

        verifyConsumerState();
        return internalReceive(timeout, unit);
    }

    protected abstract Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;

    @Override
    public Messages<T> batchReceive() throws PulsarClientException {
        verifyBatchReceive();
        verifyConsumerState();
        return internalBatchReceive();
    }

    @Override
    public CompletableFuture<Messages<T>> batchReceiveAsync() {
        try {
            verifyBatchReceive();
            verifyConsumerState();
            return internalBatchReceiveAsync();
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    protected CompletableFuture<Message<T>> peekPendingReceive() {
        CompletableFuture<Message<T>> receivedFuture = null;
        while (receivedFuture == null) {
            receivedFuture = pendingReceives.peek();
            if (receivedFuture == null) {
                break;
            }
            // skip done futures (cancelling a future could mark it done)
            if (receivedFuture.isDone()) {
                CompletableFuture<Message<T>> removed = pendingReceives.poll();
                if (removed != receivedFuture) {
                    log.error("Bug! Removed future wasn't the expected one. expected={} removed={}", receivedFuture, removed);
                }
                receivedFuture = null;
            }
        }
        return receivedFuture;
    }

    protected CompletableFuture<Message<T>> pollPendingReceive() {
        CompletableFuture<Message<T>> receivedFuture;
        while (true) {
            receivedFuture = pendingReceives.poll();
            // skip done futures (cancelling a future could mark it done)
            if (receivedFuture == null || !receivedFuture.isDone()) {
                break;
            }
        }
        return receivedFuture;
    }

    protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
        listenerExecutor.execute(() -> {
            if (!receivedFuture.complete(message)) {
                log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}",
                        receivedFuture.isCancelled(), message);
            }
        });
    }

    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
        while (!pendingReceives.isEmpty()) {
            CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
            if (receiveFuture == null) {
                break;
            }
            if (!receiveFuture.isDone()) {
                receiveFuture.completeExceptionally(
                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                "was already closed when cleaning and closing the consumers", topic, subscription)));
            }
        }
    }

    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
        while (!pendingBatchReceives.isEmpty()) {
            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
            if (opBatchReceive == null || opBatchReceive.future == null) {
                break;
            }
            if (!opBatchReceive.future.isDone()) {
                opBatchReceive.future.completeExceptionally(
                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                "was already closed when cleaning and closing the consumers", topic, subscription)));
            }
        }
    }

    abstract protected Messages<T> internalBatchReceive() throws PulsarClientException;

    abstract protected CompletableFuture<Messages<T>> internalBatchReceiveAsync();

    @Override
    public void acknowledge(Message<?> message) throws PulsarClientException {
        try {
            acknowledge(message.getMessageId());
        } catch (NullPointerException npe) {
            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
        }
    }

    @Override
    public void acknowledge(MessageId messageId) throws PulsarClientException {
        try {
            acknowledgeAsync(messageId).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public void acknowledge(Messages<?> messages) throws PulsarClientException {
        try {
            acknowledgeAsync(messages).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
        if (conf.isRetryEnable() == false) {
            throw new PulsarClientException("reconsumeLater method not support!");
        }
        try {
            reconsumeLaterAsync(message, delayTime, unit).get();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException) t;
            } else {
                throw new PulsarClientException(t);
            }
        }
    }

    @Override
    public void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException {
        try {
            reconsumeLaterAsync(messages, delayTime, unit).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
        try {
            acknowledgeCumulative(message.getMessageId());
        } catch (NullPointerException npe) {
            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
        }
    }

    @Override
    public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
        try {
            acknowledgeCumulativeAsync(messageId).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
        try {
            reconsumeLaterCumulativeAsync(message, delayTime, unit).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
        try {
            return acknowledgeAsync(message.getMessageId());
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
        try {
            messages.forEach(this::acknowledgeAsync);
            return CompletableFuture.completedFuture(null);
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
    public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
        if (conf.isRetryEnable() == false) {
            return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
        }
        try {
            return doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
    public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) {
        try {
            messages.forEach(message -> reconsumeLaterAsync(message,delayTime, unit));
            return CompletableFuture.completedFuture(null);
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
        try {
            return acknowledgeCumulativeAsync(message.getMessageId());
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
    public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) {
        if (conf.isRetryEnable() == false) {
            return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
        }
        if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                    "Cannot use cumulative acks on a non-exclusive subscription"));
        }
        return doReconsumeLater(message, AckType.Cumulative, Collections.emptyMap(), delayTime, unit);
    }

    @Override
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
        return acknowledgeAsync(messageId, null);
    }

    // TODO: expose this method to consumer interface when the transaction feature is completed
    // @Override
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId,
                                                    Transaction txn) {
        TransactionImpl txnImpl = null;
        if (null != txn) {
            checkArgument(txn instanceof TransactionImpl);
            txnImpl = (TransactionImpl) txn;
        }
        return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl);
    }

    @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
        return acknowledgeCumulativeAsync(messageId, null);
    }

    // TODO: expose this method to consumer interface when the transaction feature is completed
    // @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn) {
        if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                    "Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
        }

        TransactionImpl txnImpl = null;
        if (null != txn) {
            checkArgument(txn instanceof TransactionImpl);
            txnImpl = (TransactionImpl) txn;
        }
        return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl);
    }

    @Override
    public void negativeAcknowledge(Message<?> message) {
        negativeAcknowledge(message.getMessageId());
    }

    protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, AckType ackType,
                                                           Map<String,Long> properties,
                                                           TransactionImpl txn) {
        CompletableFuture<Void> ackFuture = doAcknowledge(messageId, ackType, properties, txn);
        if (txn != null) {
            // it is okay that we register acked topic after sending the acknowledgements. because
            // the transactional ack will not be visiable for consumers until the transaction is
            // committed
            txn.registerAckedTopic(getTopic());
            // register the ackFuture as part of the transaction
            return txn.registerAckOp(ackFuture);
        } else {
            return ackFuture;
        }
    }

    protected abstract CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
                                                             Map<String,Long> properties,
                                                             TransactionImpl txn);

    protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
                                                                Map<String,Long> properties,
                                                                long delayTime,
                                                                TimeUnit unit);

    @Override
    public void negativeAcknowledge(Messages<?> messages) {
        messages.forEach(this::negativeAcknowledge);
    }


    @Override
    public void unsubscribe() throws PulsarClientException {
        try {
            unsubscribeAsync().get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public abstract CompletableFuture<Void> unsubscribeAsync();

    @Override
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public abstract CompletableFuture<Void> closeAsync();


    @Override
    public MessageId getLastMessageId() throws PulsarClientException {
        try {
            return getLastMessageIdAsync().get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    @Override
    public abstract CompletableFuture<MessageId> getLastMessageIdAsync();

    private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
        return SubscriptionType.Shared != type && SubscriptionType.Key_Shared != type;
    }

    protected SubType getSubType() {
        SubscriptionType type = conf.getSubscriptionType();
        switch (type) {
        case Exclusive:
            return SubType.Exclusive;

        case Shared:
            return SubType.Shared;

        case Failover:
            return SubType.Failover;

        case Key_Shared:
            return SubType.Key_Shared;
        }

        // Should not happen since we cover all cases above
        return null;
    }

    public abstract int getAvailablePermits();

    public abstract int numMessagesInQueue();

    public CompletableFuture<Consumer<T>> subscribeFuture() {
        return subscribeFuture;
    }

    @Override
    public String getTopic() {
        return topic;
    }

    @Override
    public String getSubscription() {
        return subscription;
    }

    @Override
    public String getConsumerName() {
        return this.consumerName;
    }

    /**
     * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
     * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
     * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
     * breaks, the messages are redelivered after reconnect.
     */
    protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> messageIds);

    @Override
    public String toString() {
        return "ConsumerBase{" +
                "subscription='" + subscription + '\'' +
                ", consumerName='" + consumerName + '\'' +
                ", topic='" + topic + '\'' +
                '}';
    }

    protected void setMaxReceiverQueueSize(int newSize) {
        this.maxReceiverQueueSize = newSize;
    }

    protected Message<T> beforeConsume(Message<T> message) {
        if (interceptors != null) {
            return interceptors.beforeConsume(this, message);
        } else {
            return message;
        }
    }

    protected void onAcknowledge(MessageId messageId, Throwable exception) {
        if (interceptors != null) {
            interceptors.onAcknowledge(this, messageId, exception);
        }
    }

    protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) {
        if (interceptors != null) {
            interceptors.onAcknowledgeCumulative(this, messageId, exception);
        }
    }

    protected void onNegativeAcksSend(Set<MessageId> messageIds) {
        if (interceptors != null) {
            interceptors.onNegativeAcksSend(this, messageIds);
        }
    }

    protected void onAckTimeoutSend(Set<MessageId> messageIds) {
        if (interceptors != null) {
            interceptors. onAckTimeoutSend(this, messageIds);
        }
    }

    protected boolean canEnqueueMessage(Message<T> message) {
        // Default behavior, can be overridden in subclasses
        return true;
    }

    protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
        if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
            increaseIncomingMessageSize(message);
        }
        return hasEnoughMessagesForBatchReceive();
    }

    protected boolean hasEnoughMessagesForBatchReceive() {
        if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
            return false;
        }
        return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
                || (batchReceivePolicy.getMaxNumBytes() > 0 && getIncomingMessageSize() >= batchReceivePolicy.getMaxNumBytes());
    }

    private void verifyConsumerState() throws PulsarClientException {
        switch (getState()) {
            case Ready:
            case Connecting:
                break; // Ok
            case Closing:
            case Closed:
                throw  new PulsarClientException.AlreadyClosedException("Consumer already closed");
            case Terminated:
                throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
            case Failed:
            case Uninitialized:
                throw new PulsarClientException.NotConnectedException();
            default:
                break;
        }
    }

    private void verifyBatchReceive() throws PulsarClientException {
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                "Cannot use receive() when a listener has been set");
        }
        if (conf.getReceiverQueueSize() == 0) {
            throw new PulsarClientException.InvalidConfigurationException(
                "Can't use batch receive, if the queue size is 0");
        }
    }

    protected static final class OpBatchReceive<T> {

        final CompletableFuture<Messages<T>> future;
        final long createdAt;

        private OpBatchReceive(CompletableFuture<Messages<T>> future) {
            this.future = future;
            this.createdAt = System.nanoTime();
        }

        static <T> OpBatchReceive<T> of(CompletableFuture<Messages<T>> future) {
            return new OpBatchReceive<>(future);
        }
    }

    protected void notifyPendingBatchReceivedCallBack() {
        OpBatchReceive<T> opBatchReceive = pollNextBatchReceive();
        if (opBatchReceive == null) {
            return;
        }
        try {
            reentrantLock.lock();
            notifyPendingBatchReceivedCallBack(opBatchReceive);
        } finally {
            reentrantLock.unlock();
        }
    }

    private OpBatchReceive<T> peekNextBatchReceive() {
        OpBatchReceive<T> opBatchReceive = null;
        while (opBatchReceive == null) {
            opBatchReceive = pendingBatchReceives.peek();
            // no entry available
            if (opBatchReceive == null) {
                return null;
            }
            // remove entries where future is null or has been completed (cancel / timeout)
            if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
                OpBatchReceive<T> removed = pendingBatchReceives.poll();
                if (removed != opBatchReceive) {
                    log.error("Bug: Removed entry wasn't the expected one. expected={}, removed={}", opBatchReceive, removed);
                }
                opBatchReceive = null;
            }
        }
        return opBatchReceive;
    }


    private OpBatchReceive<T> pollNextBatchReceive() {
        OpBatchReceive<T> opBatchReceive = null;
        while (opBatchReceive == null) {
            opBatchReceive = pendingBatchReceives.poll();
            // no entry available
            if (opBatchReceive == null) {
                return null;
            }
            // skip entries where future is null or has been completed (cancel / timeout)
            if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
                opBatchReceive = null;
            }
        }
        return opBatchReceive;
    }

    protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
        MessagesImpl<T> messages = getNewMessagesImpl();
        Message<T> msgPeeked = incomingMessages.peek();
        while (msgPeeked != null && messages.canAdd(msgPeeked)) {
            Message<T> msg = incomingMessages.poll();
            if (msg != null) {
                messageProcessed(msg);
                Message<T> interceptMsg = beforeConsume(msg);
                messages.add(interceptMsg);
            }
            msgPeeked = incomingMessages.peek();
        }
        completePendingBatchReceive(opBatchReceive.future, messages);
    }

    protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
        if (!future.complete(messages)) {
            log.warn("Race condition detected. batch receive future was already completed (cancelled={}) and messages were dropped. messages={}",
                    future.isCancelled(), messages);
        }
    }

    protected abstract void messageProcessed(Message<?> msg);


    private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
        if (timeout.isCancelled()) {
            return;
        }

        long timeToWaitMs;

        synchronized (this) {
            // If it's closing/closed we need to ignore this timeout and not schedule next timeout.
            if (getState() == State.Closing || getState() == State.Closed) {
                return;
            }
            if (pendingBatchReceives == null) {
                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
            }
            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
            timeToWaitMs = batchReceivePolicy.getTimeoutMs();

            while (firstOpBatchReceive != null) {
                // If there is at least one batch receive, calculate the diff between the batch receive timeout
                // and the elapsed time since the operation was created.
                long diff = batchReceivePolicy.getTimeoutMs()
                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
                if (diff <= 0) {
                    // The diff is less than or equal to zero, meaning that the batch receive has been timed out.
                    // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
                    OpBatchReceive<T> op = pollNextBatchReceive();
                    if (op != null) {
                        completeOpBatchReceive(op);
                    }
                    firstOpBatchReceive = peekNextBatchReceive();
                } else {
                    // The diff is greater than zero, set the timeout to the diff value
                    timeToWaitMs = diff;
                    break;
                }
            }
            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, timeToWaitMs, TimeUnit.MILLISECONDS);
        }
    }

    protected MessagesImpl<T> getNewMessagesImpl() {
        return new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(),
                batchReceivePolicy.getMaxNumBytes());
    }

    protected boolean hasPendingBatchReceive() {
        return pendingBatchReceives != null && peekNextBatchReceive() != null;
    }

    protected void increaseIncomingMessageSize(final Message<?> message) {
        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
                this, message.getData() == null ? 0 : message.getData().length);
    }

    protected void resetIncomingMessageSize() {
        INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
    }

    protected void decreaseIncomingMessageSize(final Message<?> message) {
        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
                (message.getData() != null) ? -message.getData().length : 0);
    }

    public long getIncomingMessageSize() {
        return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
    }

    protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

    private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
}
