package org.apache.helix.metaclient.impl.zk;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.api.DirectChildChangeListener;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE;
import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;


public class TestZkMetaClient extends ZkMetaClientTestBase{

  private static final String TRANSACTION_TEST_PARENT_PATH = "/transactionOpTestPath";
  private static final String TEST_INVALID_PATH = "/_invalid/a/b/c";
  private static final int DEFAULT_LISTENER_WAIT_TIMEOUT = 5000;

  private final Object _syncObject = new Object();

  @Test
  public void testCreate() {
    final String key = "/TestZkMetaClient_testCreate";
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      zkMetaClient.create(key, ENTRY_STRING_VALUE);
      Assert.assertNotNull(zkMetaClient.exists(key));

      try {
        zkMetaClient.create("a/b/c", "invalid_path");
        Assert.fail("Should have failed with incorrect path.");
      } catch (Exception ignored) {
      }
    }
  }

  @Test
  public void testCreateContainer() {
    final String key = "/TestZkMetaClient_testCreateContainer";
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      zkMetaClient.create(key, ENTRY_STRING_VALUE, CONTAINER);
      Assert.assertNotNull(zkMetaClient.exists(key));
    }
  }

  @Test
  public void testCreateTTL() {
    final String key = "/TestZkMetaClient_testTTL";
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      zkMetaClient.createWithTTL(key, ENTRY_STRING_VALUE, 1000);
      Assert.assertNotNull(zkMetaClient.exists(key));
    }
  }

  @Test
  public void testRenewTTL() {
    final String key = "/TestZkMetaClient_testRenewTTL_1";
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      zkMetaClient.createWithTTL(key, ENTRY_STRING_VALUE, 10000);
      Assert.assertNotNull(zkMetaClient.exists(key));
      MetaClientInterface.Stat stat = zkMetaClient.exists(key);
      zkMetaClient.renewTTLNode(key);
      // Renewing a ttl node changes the nodes modified_time. Should be different
      // from the time the node was created.
      Assert.assertNotSame(stat.getCreationTime(), stat.getModifiedTime());
      try {
        zkMetaClient.renewTTLNode(TEST_INVALID_PATH);
      } catch (MetaClientNoNodeException ignored) {
      }
    }
  }

  @Test
  public void testGet() {
    final String key = "/TestZkMetaClient_testGet";
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      String value;
      zkMetaClient.create(key, ENTRY_STRING_VALUE);
      String dataValue = zkMetaClient.get(key);
      Assert.assertEquals(dataValue, ENTRY_STRING_VALUE);

      value = zkMetaClient.get(key + "/a/b/c");
      Assert.assertNull(value);

      zkMetaClient.delete(key);

      value = zkMetaClient.get(key);
      Assert.assertNull(value);
    }
  }

  @Test
  public void testSet() {
    final String key = "/TestZkMetaClient_testSet";
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      zkMetaClient.create(key, ENTRY_STRING_VALUE);
      String testValueV1 = ENTRY_STRING_VALUE + "-v1";
      String testValueV2 = ENTRY_STRING_VALUE + "-v2";

      // test set() with no expected version and validate result.
      zkMetaClient.set(key, testValueV1, -1);
      Assert.assertEquals(zkMetaClient.get(key), testValueV1);
      MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
      Assert.assertEquals(entryStat.getVersion(), 1);
      Assert.assertEquals(entryStat.getEntryType().name(), PERSISTENT.name());

      // test set() with expected version and validate result and new version number
      zkMetaClient.set(key, testValueV2, 1);
      entryStat = zkMetaClient.exists(key);
      Assert.assertEquals(zkMetaClient.get(key), testValueV2);
      Assert.assertEquals(entryStat.getVersion(), 2);

      // test set() with a wrong version
      try {
        zkMetaClient.set(key, "test-node-changed", 10);
        Assert.fail("No reach.");
      } catch (MetaClientException ex) {
        Assert.assertEquals(ex.getClass().getName(),
            "org.apache.helix.metaclient.exception.MetaClientBadVersionException");
      }
      zkMetaClient.delete(key);
    }
  }

  @Test
  public void testUpdate() {
    final String key = "/TestZkMetaClient_testUpdate";
    ZkMetaClientConfig config =
        new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
    try (ZkMetaClient<Integer> zkMetaClient = new ZkMetaClient<>(config)) {
      zkMetaClient.connect();
      int initValue = 3;
      zkMetaClient.create(key, initValue);
      MetaClientInterface.Stat entryStat = zkMetaClient.exists(key);
      Assert.assertEquals(entryStat.getVersion(), 0);

      // test update() and validate entry value and version
      Integer newData = zkMetaClient.update(key, new DataUpdater<Integer>() {
        @Override
        public Integer update(Integer currentData) {
          return currentData + 1;
        }
      });
      Assert.assertEquals((int) newData, (int) initValue + 1);

      entryStat = zkMetaClient.exists(key);
      Assert.assertEquals(entryStat.getVersion(), 1);

      newData = zkMetaClient.update(key, new DataUpdater<Integer>() {

        @Override
        public Integer update(Integer currentData) {
          return currentData + 1;
        }
      });

      entryStat = zkMetaClient.exists(key);
      Assert.assertEquals(entryStat.getVersion(), 2);
      Assert.assertEquals((int) newData, (int) initValue + 2);
      zkMetaClient.delete(key);
    }
  }

  @Test
  public void testGetAndCountChildrenAndRecursiveDelete() {
    final String key = "/TestZkMetaClient_testGetAndCountChildren";
    List<String> childrenNames = Arrays.asList("/c1", "/c2", "/c3");

    // create child nodes and validate retrieved children count and names
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      zkMetaClient.create(key, ENTRY_STRING_VALUE);
      Assert.assertEquals(zkMetaClient.countDirectChildren(key), 0);
      for (String str : childrenNames) {
        zkMetaClient.create(key + str, ENTRY_STRING_VALUE);
      }

      List<String> retrievedChildrenNames = zkMetaClient.getDirectChildrenKeys(key);
      Assert.assertEquals(retrievedChildrenNames.size(), childrenNames.size());
      Set<String> childrenNameSet = new HashSet<>(childrenNames);
      for (String str : retrievedChildrenNames) {
        Assert.assertTrue(childrenNameSet.contains("/" + str));
      }

      // recursive delete and validate
      Assert.assertEquals(zkMetaClient.countDirectChildren(key), childrenNames.size());
      Assert.assertNotNull(zkMetaClient.exists(key));
      zkMetaClient.recursiveDelete(key);
      Assert.assertNull(zkMetaClient.exists(key));
    }
  }

  @Test
  public void testDataChangeListenerTriggerWithZkWatcher() throws Exception {
    final String path = "/TestZkMetaClient_testTriggerWithZkWatcher";
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      MockDataChangeListener listener = new MockDataChangeListener();
      zkMetaClient.subscribeDataChange(path, listener, false);
      zkMetaClient.create(path, "test-node");
      int expectedCallCount = 0;
      synchronized (_syncObject) {
        while (listener.getTriggeredCount() == expectedCallCount) {
          _syncObject.wait(DEFAULT_TIMEOUT_MS);
        }
        expectedCallCount++;
        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
        Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_CREATED);
      }
      zkMetaClient.set(path, "test-node-changed", -1);
      synchronized (_syncObject) {
        while (listener.getTriggeredCount() == expectedCallCount) {
          _syncObject.wait(DEFAULT_TIMEOUT_MS);
        }
        expectedCallCount++;
        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
        Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_UPDATE);
      }
      zkMetaClient.delete(path);
      synchronized (_syncObject) {
        while (listener.getTriggeredCount() == expectedCallCount) {
          _syncObject.wait(DEFAULT_TIMEOUT_MS);
        }
        expectedCallCount++;
        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
        Assert.assertEquals(listener.getLastEventType(), DataChangeListener.ChangeType.ENTRY_DELETED);
      }
      // unregister listener, expect no more call
      zkMetaClient.unsubscribeDataChange(path, listener);
      zkMetaClient.create(path, "test-node");
      synchronized (_syncObject) {
        _syncObject.wait(DEFAULT_TIMEOUT_MS);
        Assert.assertEquals(listener.getTriggeredCount(), expectedCallCount);
      }
      // register a new non-persistent listener
      try {
        zkMetaClient.subscribeOneTimeDataChange(path, new MockDataChangeListener(), false);
        Assert.fail("One-time listener is not supported, NotImplementedException should be thrown.");
      } catch (NotImplementedException e) {
        // expected
      }
    }
  }

  @Test(dependsOnMethods = "testDataChangeListenerTriggerWithZkWatcher")
  public void testMultipleDataChangeListeners() throws Exception {
    final String basePath = "/TestZkMetaClient_testMultipleDataChangeListeners";
    final int count = 5;
    final String testData = "test-data";
    final AtomicBoolean dataExpected = new AtomicBoolean(true);
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      Map<String, Set<DataChangeListener>> listeners = new HashMap<>();
      CountDownLatch countDownLatch = new CountDownLatch(count);
      zkMetaClient.create(basePath + "_1", testData);
      // create paths
      for (int i = 0; i < 2; i++) {
        String path = basePath + "_" + i;
        listeners.put(path, new HashSet<>());
        // 5 listeners for each path
        for (int j = 0; j < count; j++) {
          DataChangeListener listener = new DataChangeListener() {
            @Override
            public void handleDataChange(String key, Object data, ChangeType changeType) {
              countDownLatch.countDown();
              dataExpected.set(dataExpected.get() && testData.equals(data));
            }
          };
          listeners.get(path).add(listener);
          zkMetaClient.subscribeDataChange(path, listener, false);
        }
      }
      zkMetaClient.set(basePath + "_1", testData, -1);
      Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
      Assert.assertTrue(dataExpected.get());
    }
  }

  @Test
  public void testDirectChildChangeListener() throws Exception {
    final String basePath = "/TestZkMetaClient_testDirectChildChangeListener";
    final int count = 1000;
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      CountDownLatch countDownLatch = new CountDownLatch(count);
      DirectChildChangeListener listener = new DirectChildChangeListener() {
        @Override
        public void handleDirectChildChange(String key) throws Exception {
          countDownLatch.countDown();
        }
      };
      zkMetaClient.create(basePath, "");
      Assert.assertTrue(
          zkMetaClient.subscribeDirectChildChange(basePath, listener, false)
              .isRegistered());
      for(int i=0; i<1000; ++i){
        zkMetaClient.create(basePath + "/child_" +i, "test-data");
      }
      // Verify no one time watcher is registered. Only one persist listener is registered.
      Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
      Assert.assertEquals(watchers.get("persistentWatches").size(), 1);
      Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
      Assert.assertEquals(watchers.get("childWatches").size(), 0);
      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
      Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));

      zkMetaClient.unsubscribeDirectChildChange(basePath, listener);
      // verify that no listener is registered on any path
      watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
      Assert.assertEquals(watchers.get("childWatches").size(), 0);
      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
    }
  }

  @Test
  public void testDataChangeListener() throws Exception {
    final String basePath = "/TestZkMetaClient_testDataChangeListener";
    final int count = 200;
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      CountDownLatch countDownLatch = new CountDownLatch(count);
      DataChangeListener listener = new DataChangeListener() {

        @Override
        public void handleDataChange(String key, Object data, ChangeType changeType)
            throws Exception {
          if(changeType == ENTRY_UPDATE) {
            countDownLatch.countDown();
          }
        }
      };
      zkMetaClient.create(basePath, "");
      Assert.assertTrue(
          zkMetaClient.subscribeDataChange(basePath, listener, false)
      );
      // Verify no one time watcher is registered. Only one persist listener is registered.
      Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
      Assert.assertEquals(watchers.get("persistentWatches").size(), 1);
      Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
      Assert.assertEquals(watchers.get("childWatches").size(), 0);
      Assert.assertEquals(watchers.get("dataWatches").size(), 0);

      for (int i=0; i<200; ++i) {
        zkMetaClient.set(basePath, "data7" + i, -1);
      }
      Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));


      zkMetaClient.unsubscribeDataChange(basePath, listener);
      // verify that no listener is registered on any path
      watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
      Assert.assertEquals(watchers.get("childWatches").size(), 0);
      Assert.assertEquals(watchers.get("dataWatches").size(), 0);

    }
  }

  @Test
  public void testChildChangeListener() throws Exception {
    final String basePath = "/TestZkMetaClient_testChildChangeListener";
    final int count = 100;
    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      CountDownLatch countDownLatch = new CountDownLatch(count*4);
      ChildChangeListener listener = new ChildChangeListener() {

        @Override
        public void handleChildChange(String changedPath, ChangeType changeType) throws Exception {
          countDownLatch.countDown();

        }
      };
      zkMetaClient.create(basePath, "");
      Assert.assertTrue(
          zkMetaClient.subscribeChildChanges(basePath, listener, false)
      );

      DataChangeListener dummyDataListener = new DataChangeListener() {
        @Override
        public void handleDataChange(String key, Object data, ChangeType changeType)
            throws Exception {
        }
      };
      try {
        zkMetaClient.subscribeDataChange(basePath, dummyDataListener, false);
        Assert.fail("subscribeDataChange should throw exception");
      } catch (UnsupportedOperationException ex) {
        // we are expecting a UnsupportedOperationException, continue with test.
      }

      DirectChildChangeListener dummyCldListener = new DirectChildChangeListener() {
        @Override
        public void handleDirectChildChange(String key) throws Exception {

        }
      };
      try {
        zkMetaClient.subscribeDirectChildChange(basePath, dummyCldListener, false);
      } catch ( Exception ex) {
        Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
      }

      // Verify no one time watcher is registered. Only one persist listener is registered.
      Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
      Assert.assertEquals(watchers.get("persistentRecursiveWatches").size(), 1);
      Assert.assertEquals(watchers.get("persistentRecursiveWatches").get(0), basePath);
      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
      Assert.assertEquals(watchers.get("childWatches").size(), 0);
      Assert.assertEquals(watchers.get("dataWatches").size(), 0);

      for (int i=0; i<count; ++i) {
        zkMetaClient.set(basePath, "data7" + i, -1);
        zkMetaClient.create(basePath+"/c1_" +i , "datat");
        zkMetaClient.create(basePath+"/c1_" +i + "/c2", "datat");
        zkMetaClient.delete(basePath+"/c1_" +i + "/c2");
      }
      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));

      zkMetaClient.unsubscribeChildChanges(basePath, listener);
      // verify that no listener is registered on any path
      watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
      Assert.assertEquals(watchers.get("persistentRecursiveWatches").size(), 0);
      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
      Assert.assertEquals(watchers.get("childWatches").size(), 0);
      Assert.assertEquals(watchers.get("dataWatches").size(), 0);

    }
  }

  /**
   * Transactional op calls zk.multi() with a set of ops (operations)
   * and the return values are converted into metaclient opResults.
   * This test checks whether each op was run by verifying its opResult and
   * the created/deleted/set path in zk.
   */
  @Test
  public void testTransactionOps() {
    String test_name = "/test_transaction_ops";

    try(ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();

      //Create Nodes
      List<Op> ops = Arrays.asList(
          Op.create(TRANSACTION_TEST_PARENT_PATH, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
          Op.create(TRANSACTION_TEST_PARENT_PATH + test_name, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
          Op.delete(TRANSACTION_TEST_PARENT_PATH + test_name, -1),
          Op.create(TRANSACTION_TEST_PARENT_PATH + test_name, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
          Op.set(TRANSACTION_TEST_PARENT_PATH + test_name, new byte[0], -1));

      //Execute transactional support on operations
      List<OpResult> opResults = zkMetaClient.transactionOP(ops);

      //Verify opResults types
      Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
      Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
      Assert.assertTrue(opResults.get(2) instanceof OpResult.DeleteResult);
      Assert.assertTrue(opResults.get(4) instanceof OpResult.SetDataResult);

      //Verify paths have been created
      MetaClientInterface.Stat entryStat = zkMetaClient.exists(TRANSACTION_TEST_PARENT_PATH + test_name);
      Assert.assertNotNull(entryStat, "Path should have been created.");

      //Cleanup
      zkMetaClient.recursiveDelete(TRANSACTION_TEST_PARENT_PATH);
      if (zkMetaClient.exists(TRANSACTION_TEST_PARENT_PATH) != null) {
        Assert.fail("Parent Path should have been removed.");
      }
    }
  }

  /**
   * This test calls transactionOp on an invalid path.
   * It checks that the invalid path has not been created to verify the
   * "all or nothing" behavior of transactionOp.
   * @throws KeeperException
   */
  @Test(dependsOnMethods = "testTransactionOps")
  public void testTransactionFail() {
    String test_name = "/test_transaction_fail";
    try(ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
      zkMetaClient.connect();
      //Create Nodes
      List<Op> ops = Arrays.asList(
          Op.create(TRANSACTION_TEST_PARENT_PATH, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
          Op.create(TRANSACTION_TEST_PARENT_PATH + test_name, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
          Op.create(TEST_INVALID_PATH, new byte[0], MetaClientInterface.EntryMode.PERSISTENT));

      try {
        zkMetaClient.transactionOP(ops);
        Assert.fail(
            "Should have thrown an exception. Cannot run transactional create OP on incorrect path.");
      } catch (Exception e) {
        MetaClientInterface.Stat entryStat = zkMetaClient.exists(TRANSACTION_TEST_PARENT_PATH);
        Assert.assertNull(entryStat);
      }
    }
  }

  private class MockDataChangeListener implements DataChangeListener {
    private final AtomicInteger _triggeredCount = new AtomicInteger(0);
    private volatile ChangeType _lastEventType;

    @Override
    public void handleDataChange(String key, Object data, ChangeType changeType) {
      _triggeredCount.getAndIncrement();
      _lastEventType = changeType;
      synchronized (_syncObject) {
        _syncObject.notifyAll();
      }
    }

    int getTriggeredCount() {
      return _triggeredCount.get();
    }

    ChangeType getLastEventType() {
      return _lastEventType;
    }
  }
}
