/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.zookeeper;

import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.google.common.util.concurrent.AtomicDouble;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class ZooKeeperClientAspectJTest {

    private ZookeeperServerTest localZkS;
    private ZooKeeper localZkc;
    private final long ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 2000;
    private final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    static {
        // load agent with aspectjweaver-Agent for testing
        // maven-test waves advice on build-goal so, maven doesn't need explicit loading
        // uncomment it while testing on eclipse:
        // AgentLoader.loadAgentClass(Agent.class.getName(), null);
    }

    @Test
    public void testZkConnected() throws Exception {
        OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
        try {
            ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl(executor);
            CompletableFuture<ZooKeeper> zkFuture = zkf.create("127.0.0.1:" + localZkS.getZookeeperPort(),
                    SessionType.ReadWrite,
                    (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            assertTrue(localZkc.getState().isConnected());
            assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);
        }finally{
            if (localZkc != null) {
                localZkc.close();
            }

            executor.shutdown();
        }
    }

    @Test
    public void testInitZk() throws Exception {
        try {
            ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
            CompletableFuture<ZooKeeper> zkFuture = zkfactory.create("127.0.0.1:" + localZkS.getZookeeperPort(),
                    SessionType.ReadWrite,
                    (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            assertTrue(localZkc.getState().isConnected());
            assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);

            String connection = "127.0.0.1:" + localZkS.getZookeeperPort() + "/prefix";
            ZooKeeper chrootZkc = PulsarClusterMetadataSetup.initZk(connection, (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
            assertTrue(chrootZkc.getState().isConnected());
            assertNotEquals(chrootZkc.getState(), States.CONNECTEDREADONLY);
            chrootZkc.close();

            assertNotNull(localZkc.exists("/prefix", false));
        } finally {
            if (localZkc != null) {
                localZkc.close();
            }
        }
    }

    @BeforeMethod
    void setup() throws Exception {
        localZkS = new ZookeeperServerTest(0);
        localZkS.start();
    }

    @AfterMethod(alwaysRun = true)
    void teardown() throws Exception {
        localZkS.close();
    }

    /**
     * Verifies that aspect-advice calculates the latency of of zk-operation
     *
     * @throws Exception
     */
    @Test(enabled = false, timeOut = 7000)
    void testZkClientAspectJTrigger() throws Exception {
        OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
        ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl(executor);
        CompletableFuture<ZooKeeper> zkFuture = zkf.create("127.0.0.1:" + localZkS.getZookeeperPort(),
                SessionType.ReadWrite,
                (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
        localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        try {
            assertTrue(localZkc.getState().isConnected());
            assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);

            final AtomicInteger writeCount = new AtomicInteger(0);
            final AtomicInteger readCount = new AtomicInteger(0);
            EventListner listener = new EventListner() {
                @Override
                public void recordLatency(EventType eventType, long latencyMiliSecond) {
                    if (eventType.equals(EventType.write)) {
                        writeCount.incrementAndGet();
                    } else if (eventType.equals(EventType.read)) {
                        readCount.incrementAndGet();
                    }
                }
            };
            ClientCnxnAspect.addListener(listener);
            CountDownLatch createLatch = new CountDownLatch(1);
            CountDownLatch deleteLatch = new CountDownLatch(1);
            CountDownLatch readLatch = new CountDownLatch(1);
            CountDownLatch existLatch = new CountDownLatch(1);
            localZkc.create("/createTest", "data".getBytes(), Acl, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
                createLatch.countDown();
            }, "create");
            localZkc.delete("/deleteTest", -1, (rc, path, ctx) -> {
                deleteLatch.countDown();
            }, "delete");
            localZkc.exists("/createTest", null, (int rc, String path, Object ctx, Stat stat) -> {
                existLatch.countDown();
            }, null);
            localZkc.getData("/createTest", null, (int rc, String path, Object ctx, byte data[], Stat stat) -> {
                readLatch.countDown();
            }, null);
            createLatch.await();
            deleteLatch.await();
            existLatch.await();
            readLatch.await();
            Thread.sleep(500);
            Assert.assertEquals(writeCount.get(), 2);
            Assert.assertEquals(readCount.get(), 2);
            ClientCnxnAspect.removeListener(listener);
        } finally {
            if (localZkc != null) {
                localZkc.close();
            }

            executor.shutdown();
        }
    }

    /**
     * Verifies that aspect-advice calculates the latency of of zk-operation and updates PulsarStats
     *
     * @throws Exception
     */
    @Test(enabled = false, timeOut = 7000)
    public void testZkOpStatsMetrics() throws Exception {
        OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
        ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl(executor);
        CompletableFuture<ZooKeeper> zkFuture = zkf.create("127.0.0.1:" + localZkS.getZookeeperPort(),
                SessionType.ReadWrite,
                (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
        localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        MockPulsar mockPulsar = new MockPulsar(localZkc);
        mockPulsar.setup();
        try {
            PulsarClient pulsarClient = mockPulsar.getClient();
            PulsarService pulsar = mockPulsar.getPulsar();

            pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
            Metrics zkOpMetric = getMetric(pulsar, "zk_write_latency");
            Assert.assertNotNull(zkOpMetric);
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_rate_s"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_95percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_99_99_percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_99_9_percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_99_percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_mean_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_write_time_median_ms"));

            zkOpMetric = getMetric(pulsar, "zk_read_latency");
            Assert.assertNotNull(zkOpMetric);
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_rate_s"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_95percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_99_99_percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_99_9_percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_99_percentile_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_mean_ms"));
            Assert.assertTrue(zkOpMetric.getMetrics().containsKey("brk_zk_read_time_median_ms"));

            CountDownLatch createLatch = new CountDownLatch(1);
            CountDownLatch deleteLatch = new CountDownLatch(1);
            CountDownLatch readLatch = new CountDownLatch(1);
            CountDownLatch existLatch = new CountDownLatch(1);
            localZkc.create("/createTest", "data".getBytes(), Acl, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
                createLatch.countDown();
            }, "create");
            localZkc.delete("/deleteTest", -1, (rc, path, ctx) -> {
                deleteLatch.countDown();
            }, "delete");
            localZkc.exists("/createTest", null, (int rc, String path, Object ctx, Stat stat) -> {
                existLatch.countDown();
            }, null);
            localZkc.getData("/createTest", null, (int rc, String path, Object ctx, byte data[], Stat stat) -> {
                readLatch.countDown();
            }, null);
            createLatch.await();
            deleteLatch.await();
            existLatch.await();
            readLatch.await();
            Thread.sleep(10);

            BrokerService brokerService = pulsar.getBrokerService();
            brokerService.updateRates();
            List<Metrics> metrics = brokerService.getTopicMetrics();
            AtomicDouble writeRate = new AtomicDouble();
            AtomicDouble readRate = new AtomicDouble();
            metrics.forEach(m -> {
                if ("zk_write_latency".equalsIgnoreCase(m.getDimension("metric"))) {
                    writeRate.set((double) m.getMetrics().get("brk_zk_write_latency_rate_s"));
                } else if ("zk_read_latency".equalsIgnoreCase(m.getDimension("metric"))) {
                    readRate.set((double) m.getMetrics().get("brk_zk_read_latency_rate_s"));
                }
            });
            Assert.assertTrue(readRate.get() > 0);
            Assert.assertTrue(writeRate.get() > 0);
        } finally {
            mockPulsar.cleanup();
            if (localZkc != null) {
                localZkc.close();
            }

            executor.shutdown();
        }
    }

    private Metrics getMetric(PulsarService pulsar, String dimension) {
        BrokerService brokerService = pulsar.getBrokerService();
        brokerService.updateRates();
        for (Metrics metric : brokerService.getTopicMetrics()) {
            if (dimension.equalsIgnoreCase(metric.getDimension("metric"))) {
                return metric;
            }
        }
        return null;
    }

    static class ZookeeperServerTest implements Closeable {
        private final File zkTmpDir;
        private ZooKeeperServer zks;
        private NIOServerCnxnFactory serverFactory;
        private final int zkPort;
        private final String hostPort;

        public ZookeeperServerTest(int zkPort) throws IOException {
            this.zkPort = zkPort;
            this.hostPort = "127.0.0.1:" + zkPort;
            this.zkTmpDir = File.createTempFile("zookeeper", "test");
            log.info("**** Start GZK on {} ****", zkTmpDir);
            if (!zkTmpDir.delete() || !zkTmpDir.mkdir()) {
                throw new IOException("Couldn't create zk directory " + zkTmpDir);
            }
        }

        public void start() throws IOException {
            try {
                zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
                zks.setMaxSessionTimeout(20000);
                serverFactory = new NIOServerCnxnFactory();
                serverFactory.configure(new InetSocketAddress(zkPort), 1000);
                serverFactory.startup(zks);
            } catch (Exception e) {
                log.error("Exception while instantiating ZooKeeper", e);
            }

            LocalBookkeeperEnsemble.waitForServerUp(hostPort, 30000);
            log.info("ZooKeeper started at {}", hostPort);
        }

        public void stop() throws IOException {
            zks.shutdown();
            serverFactory.shutdown();
            log.info("Stoppend ZK server at {}", hostPort);
        }

        @Override
        public void close() throws IOException {
            zks.shutdown();
            serverFactory.shutdown();
            zkTmpDir.delete();
        }

        public int getZookeeperPort() {
            return serverFactory.getLocalPort();
        }

        private final Logger log = LoggerFactory.getLogger(ZookeeperServerTest.class);
    }

    class MockPulsar extends BrokerTestBase {

        private final ZooKeeper zk;

        public MockPulsar(ZooKeeper zk) {
            this.zk = zk;
        }

        @Override
        protected void setup() throws Exception {
            super.baseSetup();
            doReturn(new ZooKeeperClientFactory() {
                @Override
                public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
                        int zkSessionTimeoutMillis) {
                    return CompletableFuture.completedFuture(zk);
                }
            }).when(pulsar).getZooKeeperClientFactory();
        }

        @Override
        protected void cleanup() throws Exception {
            super.internalCleanup();
        }

        public PulsarService getPulsar() {
            return pulsar;
        }

        public PulsarClient getClient() {
            return pulsarClient;
        }

    }
}
