/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.resourcegroup;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Checks that traffic produced to and consumed from a topic is reflected both in the broker's per-topic
 * statistics and in the cumulative usage that {@link ResourceGroupService} reports for the resource group
 * to which the topic's tenant and namespace are registered.
 *
 * <p>The same scenario is run against a persistent and a non-persistent topic.
 */
@Slf4j
public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
    // Value handed to the broker configuration as the resource-usage transport publish interval.
    private static final int PUBLISH_INTERVAL_SECS = 300;

    // Service under test; created in setup() with a no-op quota calculator.
    ResourceGroupService rgs;
    // Resource group currently being exercised; the usage publisher/consumer callbacks delegate to it.
    ResourceGroup activeRG;
    // Configuration (publish/dispatch limits) used when creating the resource group.
    final org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
            new org.apache.pulsar.common.policies.data.ResourceGroup();
    final String activeRgName = "runProduceConsume";
    // Counters bumped by the usage callbacks registered with the resource group.
    int numRgUsageListenerCallbacks = 0;
    int numRgFillUsageCallbacks = 0;

    final String TenantName = "pulsar-test";
    final String NsName = "test";
    final String TenantAndNsName = TenantName + "/" + NsName;
    final String TestProduceConsumeTopicName = "/test/prod-cons-topic";
    final String PRODUCE_CONSUME_PERSISTENT_TOPIC = "persistent://" + TenantAndNsName + TestProduceConsumeTopicName;
    final String PRODUCE_CONSUME_NON_PERSISTENT_TOPIC =
            "non-persistent://" + TenantAndNsName + TestProduceConsumeTopicName;

    /**
     * Starts the test broker, creates the cluster/tenant/namespace used by the test, and builds the
     * {@link ResourceGroupService} under test.
     *
     * @throws Exception if broker start-up or any admin call fails
     */
    @BeforeClass
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        this.prepareData();

        // Quota calculator stub: never asks for a usage report and always computes a local quota of zero.
        ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() {
            @Override
            public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
                                                  long currentMessagesUsed, long lastReportedMessages,
                                                  long lastReportTimeMSecsSinceEpoch) {
                return false;
            }

            @Override
            public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
                return 0;
            }
        };

        ResourceUsageTopicTransportManager transportMgr = new ResourceUsageTopicTransportManager(pulsar);
        this.rgs = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, transportMgr, dummyQuotaCalc);
    }

    /**
     * Tears down the test broker.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @AfterClass(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Runs the produce/consume usage scenario on a persistent topic and then on a non-persistent topic.
     *
     * @throws Exception if either scenario fails
     */
    @Test
    public void testProduceConsumeUsageOnRG() throws Exception {
        testProduceConsumeUsageOnRG(PRODUCE_CONSUME_PERSISTENT_TOPIC);
        testProduceConsumeUsageOnRG(PRODUCE_CONSUME_NON_PERSISTENT_TOPIC);
    }

    /**
     * Creates the resource group, registers the topic's tenant and namespace with it, sends a fixed number
     * of messages, receives them all back, and verifies the stats after the produce phase and again after
     * the consume phase. The resource group and its registrations are removed at the end so that the
     * scenario can be repeated for another topic.
     *
     * @param topicString fully qualified name of the topic to produce to and consume from
     * @throws Exception if any resource-group, client or verification step fails
     */
    private void testProduceConsumeUsageOnRG(String topicString) throws Exception {
        // Both callbacks delegate to the active resource group and count how often they are invoked.
        ResourceUsagePublisher ruP = new ResourceUsagePublisher() {
            @Override
            public String getID() {
                return activeRG.getID();
            }

            @Override
            public void fillResourceUsage(ResourceUsage resourceUsage) {
                activeRG.rgFillResourceUsage(resourceUsage);
                numRgFillUsageCallbacks++;
            }
        };

        ResourceUsageConsumer ruC = new ResourceUsageConsumer() {
            @Override
            public String getID() {
                return activeRG.getID();
            }

            @Override
            public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
                activeRG.rgResourceUsageListener(broker, resourceUsage);
                numRgUsageListenerCallbacks++;
            }
        };

        rgConfig.setPublishRateInBytes(1500);
        rgConfig.setPublishRateInMsgs(100);
        rgConfig.setDispatchRateInBytes(4000);
        // Dispatch message rate; previously this line re-set the publish message rate by mistake.
        rgConfig.setDispatchRateInMsgs(500);
        rgs.resourceGroupCreate(activeRgName, rgConfig, ruP, ruC);

        activeRG = rgs.resourceGroupGet(activeRgName);
        Assert.assertNotNull(activeRG);

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicString)
                .create();

        Consumer<byte[]> consumer = null;
        try {
            consumer = pulsarClient.newConsumer()
                    .topic(topicString)
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
        } catch (PulsarClientException p) {
            final String errMsg = String.format("Got exception while building consumer: ex=%s", p.getMessage());
            // Pass the exception along so that the failure report keeps the original stack trace.
            Assert.fail(errMsg, p);
        }

        final TopicName myTopic = TopicName.get(topicString);
        final String tenantString = myTopic.getTenant();
        final String nsString = myTopic.getNamespace();
        rgs.registerTenant(activeRgName, tenantString);
        rgs.registerNameSpace(activeRgName, NamespaceName.get(nsString));

        final int numMessagesToSend = 10;
        int sentNumBytes = 0;
        int sentNumMsgs = 0;
        int recvdNumBytes = 0;
        int recvdNumMsgs = 0;
        for (int ix = 0; ix < numMessagesToSend; ix++) {
            final byte[] mesg = String.format("Hi, ix=%s", ix).getBytes();
            try {
                producer.send(mesg);
                sentNumBytes += mesg.length;
                sentNumMsgs++;
            } catch (PulsarClientException p) {
                final String errMsg = String.format("Got exception while sending %s-th time: ex=%s", ix, p.getMessage());
                Assert.fail(errMsg, p);
            }
        }
        producer.close();

        // Nothing has been consumed yet, so only the produce-side counters are checked here.
        this.verifyStats(topicString, activeRgName, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs,
                true, false);

        while (recvdNumMsgs < sentNumMsgs) {
            try {
                final Message<byte[]> message = consumer.receive();
                recvdNumBytes += message.getValue().length;
            } catch (PulsarClientException p) {
                final String errMesg = String.format("Got exception in while receiving %s-th mesg at consumer: ex=%s",
                        recvdNumMsgs, p.getMessage());
                Assert.fail(errMesg, p);
            }
            recvdNumMsgs++;
        }

        this.verifyStats(topicString, activeRgName, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs,
                true, true);

        consumer.close();

        rgs.unRegisterTenant(activeRgName, tenantString);
        rgs.unRegisterNameSpace(activeRgName, NamespaceName.get(nsString));
        rgs.resourceGroupDelete(activeRgName);
    }

    /**
     * Verifies the application-level counts against what the broker service reports for the topic, and
     * against the cumulative usage of the resource group (which in turn internally derives its stats from
     * the broker service).
     *
     * <p>There appears to be a 45-byte message header which is accounted in the stats, additionally to what
     * the application-level sends/receives. Hence, the byte counts are a "&gt;=" check, instead of an
     * equality check.
     *
     * @param topicString   fully qualified topic name whose broker stats are inspected
     * @param rgName        name of the resource group whose usage is inspected
     * @param sentNumBytes  payload bytes sent by the test
     * @param sentNumMsgs   number of messages sent by the test
     * @param recvdNumBytes payload bytes received by the test
     * @param recvdNumMsgs  number of messages received by the test
     * @param checkProduce  whether to check the publish-side counters
     * @param checkConsume  whether to check the dispatch-side counters
     * @throws InterruptedException declared for callers; not thrown by the visible code paths
     * @throws PulsarAdminException if querying the resource-group usage fails
     */
    private void verifyStats(String topicString, String rgName,
                             int sentNumBytes, int sentNumMsgs,
                             int recvdNumBytes, int recvdNumMsgs,
                             boolean checkProduce, boolean checkConsume)
                                                                throws InterruptedException, PulsarAdminException {
        BrokerService bs = pulsar.getBrokerService();
        Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();

        // Fail loudly when the topic is missing; otherwise every check below would be skipped silently.
        TopicStatsImpl stats = topicStatsMap.get(topicString);
        Assert.assertNotNull(stats, "No broker stats found for topic " + topicString);

        if (checkProduce) {
            Assert.assertTrue(stats.bytesInCounter >= sentNumBytes);
            Assert.assertEquals(stats.msgInCounter, sentNumMsgs);
        }
        if (checkConsume) {
            Assert.assertTrue(stats.bytesOutCounter >= recvdNumBytes);
            Assert.assertEquals(stats.msgOutCounter, recvdNumMsgs);
        }

        if (sentNumMsgs <= 0 && recvdNumMsgs <= 0) {
            // No traffic was generated, so there is no resource-group usage to compare against.
            return;
        }

        rgs.aggregateResourceGroupLocalUsages();  // hack to ensure aggregator calculation without waiting
        BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
                ResourceGroupUsageStatsType.Cumulative);
        BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
                ResourceGroupUsageStatsType.Cumulative);

        // Re-do the getRGUsage.
        // The counts should be equal, since there wasn't any intervening traffic on the topic under test.
        BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish,
                ResourceGroupUsageStatsType.Cumulative);
        BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch,
                ResourceGroupUsageStatsType.Cumulative);

        Assert.assertEquals(prodCounts1.bytes, prodCounts.bytes);
        Assert.assertEquals(prodCounts1.messages, prodCounts.messages);
        Assert.assertEquals(consCounts1.bytes, consCounts.bytes);
        Assert.assertEquals(consCounts1.messages, consCounts.messages);

        if (checkProduce) {
            Assert.assertTrue(prodCounts.bytes >= sentNumBytes);
            Assert.assertEquals(prodCounts.messages, sentNumMsgs);
        }
        if (checkConsume) {
            Assert.assertTrue(consCounts.bytes >= recvdNumBytes);
            Assert.assertEquals(consCounts.messages, recvdNumMsgs);
        }
    }

    /**
     * Initial set up for transport manager and producer/consumer clusters/tenants/namespaces/topics.
     *
     * @throws PulsarAdminException if any of the admin calls fails
     */
    private void prepareData() throws PulsarAdminException {
        this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);

        this.conf.setAllowAutoTopicCreation(true);

        final String clusterName = "test";
        admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
        admin.tenants().createTenant(TenantName,
                new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName)));
        admin.namespaces().createNamespace(TenantAndNsName);
        admin.namespaces().setNamespaceReplicationClusters(TenantAndNsName, Sets.newHashSet(clusterName));
    }
}
