/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.*;

@Slf4j
@Test(groups = "broker")
public class AdminApiMaxUnackedMessages extends MockedPulsarServiceBaseTest {

    @BeforeMethod
    @Override
    public void setup() throws Exception {
        super.internalSetup();

        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
        admin.tenants().createTenant("max-unacked-messages", tenantInfo);
    }

    @AfterMethod(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        super.internalCleanup();
        resetConfig();
    }

    @Test(timeOut = 30000)
    public void testNamespacePolicy() throws Exception {
        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(3);
        admin.namespaces().createNamespace("max-unacked-messages/policy-on-consumers");
        final String namespace = "max-unacked-messages/policy-on-consumers";
        final String topic = "persistent://" + namespace + "/testNamespacePolicy";

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().receiverQueueSize(1)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName("sub").topic(topic).subscribe();
        //set namespace-level policy
        admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 1);
        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        Awaitility.await().untilAsserted(() ->
                assertEquals(persistentTopic.getSubscription("sub")
                        .getConsumers().get(0).getMaxUnackedMessages(), 1));
        //consumer-throttling should take effect
        for (int i = 0; i < 20; i++) {
            producer.send("msg".getBytes());
        }
        Message<byte[]> message = consumer.receive(500, TimeUnit.MILLISECONDS);
        assertNotNull(message);
        Message<byte[]> nullMsg = consumer.receive(500, TimeUnit.MILLISECONDS);
        assertNull(nullMsg);

        //disable limit check
        admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 0);
        Awaitility.await().untilAsserted(() ->
                assertEquals(persistentTopic.getSubscription("sub")
                        .getConsumers().get(0).getMaxUnackedMessages(), 0));
        consumer.acknowledge(message);
        message = consumer.receive(500, TimeUnit.MILLISECONDS);
        assertNotNull(message);
    }

    @Test
    public void testMaxUnackedMessagesOnConsumers() throws Exception {
        admin.namespaces().createNamespace("max-unacked-messages/default-on-consumers");
        String namespace = "max-unacked-messages/default-on-consumers";
        assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace));
        admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 2*50000);
        assertEquals(2*50000, admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue());
    }

    @Test
    public void testMaxUnackedMessagesOnSubscription() throws Exception {
        admin.namespaces().createNamespace("max-unacked-messages/default-on-subscription");
        String namespace = "max-unacked-messages/default-on-subscription";
        assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace));
        admin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, 2*200000);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
                -> assertEquals(2*200000, admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace).intValue()));

        admin.namespaces().removeMaxUnackedMessagesPerSubscription(namespace);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
                -> assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(namespace)));
    }

    @Test
    public void testMaxUnackedMessagesPerConsumerPriority() throws Exception {
        int brokerLevelPolicy = 3;
        int namespaceLevelPolicy = 2;
        int topicLevelPolicy = 1;
        cleanup();
        conf.setSystemTopicEnabled(true);
        conf.setTopicLevelPoliciesEnabled(true);
        conf.setMaxUnackedMessagesPerConsumer(brokerLevelPolicy);
        setup();
        final String namespace = "max-unacked-messages/priority-on-consumers";
        final String topic = "persistent://" + namespace + "/testMaxUnackedMessagesPerConsumerPriority";
        admin.namespaces().createNamespace(namespace);
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
        for (int i = 0; i < 50; i++) {
            producer.send("msg".getBytes());
        }
        Awaitility.await().until(()
                -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace));
        assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic));
        admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, namespaceLevelPolicy);
        admin.topics().setMaxUnackedMessagesOnConsumer(topic, topicLevelPolicy);

        Awaitility.await().untilAsserted(()
                -> assertNotNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace)));
        Awaitility.await().untilAsserted(()
                -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
        assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue(), namespaceLevelPolicy);
        assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), topicLevelPolicy);
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionName("sub").topic(topic).receiverQueueSize(1).subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        org.apache.pulsar.broker.service.Consumer serverConsumer =
                persistentTopic.getSubscription("sub").getConsumers().get(0);
        assertEquals(serverConsumer.getMaxUnackedMessages(), topicLevelPolicy);
        List<Message> msgs = consumeMsg(consumer, 3);
        assertEquals(msgs.size(), 1);
        //disable topic-level limiter
        admin.topics().setMaxUnackedMessagesOnConsumer(topic, 0);
        Awaitility.await().untilAsserted(()
                -> assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 0));
        ackMsgs(consumer, msgs);
        //remove topic-level policy, namespace-level should take effect
        admin.topics().removeMaxUnackedMessagesOnConsumer(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
        assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue(), namespaceLevelPolicy);
        Awaitility.await().untilAsserted(() ->
                assertEquals(serverConsumer.getMaxUnackedMessages(), namespaceLevelPolicy));
        msgs = consumeMsg(consumer, 5);
        assertEquals(msgs.size(), namespaceLevelPolicy);
        ackMsgs(consumer, msgs);
        //disable namespace-level limiter
        admin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 0);
        Awaitility.await().untilAsserted(()
                -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace).intValue(), 0));
        msgs = consumeMsg(consumer, 5);
        assertEquals(msgs.size(), 5);
        ackMsgs(consumer, msgs);
        //remove namespace-level policy, broker-level should take effect
        admin.namespaces().removeMaxUnackedMessagesPerConsumer(namespace);
        Awaitility.await().untilAsserted(()
                -> assertNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(namespace)));
        msgs = consumeMsg(consumer, 5);
        assertEquals(msgs.size(), brokerLevelPolicy);
        ackMsgs(consumer, msgs);

    }

    private List<Message> consumeMsg(Consumer<?> consumer, int msgNum) throws Exception {
        List<Message> list = new ArrayList<>();
        for (int i = 0; i <msgNum; i++) {
            Message message = consumer.receive(500, TimeUnit.MILLISECONDS);
            if (message == null) {
                break;
            }
            list.add(message);
        }
        return list;
    }

    private void ackMsgs(Consumer<?> consumer, List<Message> msgs) throws Exception {
        for (Message msg : msgs) {
            consumer.acknowledge(msg);
        }
    }
}
