/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
/*
 * ClearRvvLockingDUnitTest.java
 *
 * Created on September 6, 2005, 2:57 PM
 */
package org.apache.geode.internal.cache;

import static org.apache.geode.test.dunit.Assert.fail;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.logging.log4j.Logger;
import org.assertj.core.api.JUnitSoftAssertions;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;

/**
 * Test class to verify proper RVV locking interaction between entry operations such as PUT/REMOVE
 * and the CLEAR region operation
 *
 * GEODE-599: After an operation completed, it would unlock the RVV. This was occurring before the
 * operation was distributed to other members which created a window in which another operation
 * could be performed prior to that operation being distributed.
 *
 * The fix for GEODE-599 was to not release the lock until after distributing the operation to the
 * other members.
 *
 */

@SuppressWarnings("serial")

public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase {

  // Soft assertions: failures are collected and reported together when the test finishes.
  // NOTE(review): presumably transient so the rule is skipped if the test instance is serialized
  // for a remote invocation - confirm.
  @Rule
  public transient JUnitSoftAssertions softly = new JUnitSoftAssertions();
  /*
   * The tests perform a single operation and a concurrent clear.
   *
   * opsVM determines where the single operation will be performed; null will perform the op on
   * the test VM (vm0). clearVM determines where the clear operation will be performed; null will
   * perform the clear on the test VM (vm0).
   *
   * Specifying NULL/NULL for opsVM and clearVM has the effect of performing both in the same
   * thread, whereas specifying vm0/vm0, for example, will run them both on the same VM but on
   * different threads. NULL/NULL is not tested here since the same thread performing a clear
   * prior to returning from a put is not possible using the public API.
   *
   * Each test is performed twice: once with the operation and the clear on the same VM, and once
   * on different VMs.
   *
   */
  VM vm0, vm1, opsVM, clearVM;

  // Per-JVM state: assigned inside each member by the create*Cache and create*Region helpers.
  static Cache cache;
  static LocalRegion region;
  // Member ids returned by createCache / createNoConserveSocketsCache for vm0 and vm1.
  DistributedMember vm0ID, vm1ID;

  // The hook most recently installed in this JVM by one of the set*Hook methods.
  static AbstractRegionMap.ARMLockTestHook theHook;

  // Key and value used by the single-entry operations; generateKeyValues() derives the bulk
  // entries from them.
  static final String THE_KEY = "theKey";
  static final String THE_VALUE = "theValue";

  private static final Logger logger = LogService.getLogger();

  // test methods

  /**
   * PUT performed through vm0 with the clear issued on the same VM (vm0).
   *
   * The basic hook installed on vm0 issues the clear from its afterRelease callback when it sees a
   * create event; afterwards the contents of vm0 and vm1 are compared.
   */
  @Test
  public void testPutOperationSameVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    // first arg is where to perform the operation, second arg where to perform the clear
    setOpAndClearVM(vm0, vm0);
    try {
      // The hook is told where to run the clear; here clearVM and opsVM are the same VM.
      opsVM.invoke(() -> setBasicHook(clearVM));
      runConsistencyTest(vm0, performPutOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * PUT performed through vm0 with the clear issued on a different VM (vm1).
   *
   * The basic hook installed on vm0 issues the clear on vm1 from its afterRelease callback when it
   * sees a create event; afterwards the contents of vm0 and vm1 are compared.
   */
  @Test
  public void testPutOperationDifferentVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    // first arg is where to perform the operation, second arg where to perform the clear
    setOpAndClearVM(vm0, vm1);
    try {
      opsVM.invoke(() -> setBasicHook(clearVM));
      runConsistencyTest(vm0, performPutOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * PUT and CLEAR on a DISTRIBUTED_NO_ACK region, coordinated with two latches.
   *
   * vm0 gets the local hook (targeting vm1) and vm1 gets the remote hook (targeting vm0). The
   * step1 latch is primed on vm0 and the step2 latch on vm1, each with a count of 1, before the
   * put and the clear are started on vm0 by performNoAckPutOperation.
   */
  @Test
  public void testPutOperationNoAck() {
    // Setup stays outside the try block: if it fails, vm0/vm1 may still be null and the cleanup
    // in finally would throw a NullPointerException that replaces the real failure.
    setupNoAckMembers();
    setOpAndClearVM(vm0, vm0);
    try {
      vm0.invoke(() -> setLocalNoAckHook(vm1));
      vm1.invoke(() -> setRemoteNoAckHook(vm0));
      vm0.invoke(() -> primeStep1(1));
      vm1.invoke(() -> primeStep2(1));
      runConsistencyTest(vm0, performNoAckPutOperation);
      checkForConsistencyErrors();
    } finally {
      // Nested so that a failure while resetting vm0 cannot leave the hook installed on vm1.
      try {
        vm0.invoke(() -> resetHook());
      } finally {
        vm1.invoke(() -> resetHook());
      }
    }
  }

  /**
   * REMOVE performed through vm0 with the clear issued on the same VM (vm0).
   *
   * The remove/invalidate hook installed on vm0 issues the clear from its afterRelease callback
   * when it sees a destroy or invalidate event; afterwards vm0 and vm1 are compared.
   */
  @Test
  public void testRemoveOperationSameVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm0);
    try {
      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
      runConsistencyTest(vm0, performRemoveOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * REMOVE performed through vm0 with the clear issued on a different VM (vm1).
   *
   * The remove/invalidate hook installed on vm0 issues the clear on vm1 from its afterRelease
   * callback when it sees a destroy or invalidate event; afterwards vm0 and vm1 are compared.
   */
  @Test
  public void testRemoveOperationDifferentVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm1);
    try {
      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
      runConsistencyTest(vm0, performRemoveOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * INVALIDATE performed through vm0 with the clear issued on the same VM (vm0).
   *
   * The remove/invalidate hook installed on vm0 issues the clear from its afterRelease callback
   * when it sees a destroy or invalidate event; afterwards vm0 and vm1 are compared.
   */
  @Test
  public void testInvalidateOperationSameVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm0);
    try {
      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
      runConsistencyTest(vm0, performInvalidateOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * INVALIDATE performed through vm0 with the clear issued on a different VM (vm1).
   *
   * The remove/invalidate hook installed on vm0 issues the clear on vm1 from its afterRelease
   * callback when it sees a destroy or invalidate event; afterwards vm0 and vm1 are compared.
   */
  @Test
  public void testInvalidateOperationDifferentVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm1);
    try {
      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
      runConsistencyTest(vm0, performInvalidateOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * PUTALL performed through vm0 with the clear issued on the same VM (vm0).
   *
   * The bulk hook installed on vm0 issues the clear from its afterBulkRelease callback;
   * afterwards the contents of vm0 and vm1 are compared.
   */
  @Test
  public void testPutAllOperationSameVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm0);
    try {
      // clearVM is vm0 here; passing clearVM keeps this test parallel to the other tests.
      opsVM.invoke(() -> setBulkHook(clearVM));
      runConsistencyTest(vm0, performPutAllOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * PUTALL performed through vm0 with the clear issued on a different VM (vm1).
   *
   * The bulk hook installed on vm0 issues the clear on vm1 from its afterBulkRelease callback;
   * afterwards the contents of vm0 and vm1 are compared.
   */
  @Test
  public void testPutAllOperationDifferentVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm1);
    try {
      // The hook must target clearVM (vm1). Passing vm0 made the clear run on the operation's own
      // VM, which turned this test into a duplicate of the same-VM variant.
      opsVM.invoke(() -> setBulkHook(clearVM));
      runConsistencyTest(vm0, performPutAllOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * REMOVEALL performed through vm0 with the clear issued on the same VM (vm0).
   *
   * The bulk hook installed on vm0 issues the clear from its afterBulkRelease callback;
   * afterwards the contents of vm0 and vm1 are compared.
   */
  @Test
  public void testRemoveAllOperationSameVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm0);
    try {
      // clearVM is vm0 here; passing clearVM keeps this test parallel to the other tests.
      opsVM.invoke(() -> setBulkHook(clearVM));
      runConsistencyTest(vm0, performRemoveAllOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }

  /**
   * REMOVEALL performed through vm0 with the clear issued on a different VM (vm1).
   *
   * The bulk hook installed on vm0 issues the clear on vm1 from its afterBulkRelease callback;
   * afterwards the contents of vm0 and vm1 are compared.
   */
  @Test
  public void testRemoveAllOperationDifferentVM() {
    // Setup stays outside the try block: if it fails, opsVM is still null and the cleanup in
    // finally would throw a NullPointerException that replaces the real failure.
    setupMembers();
    setOpAndClearVM(vm0, vm1);
    try {
      // The hook must target clearVM (vm1). Passing vm0 made the clear run on the operation's own
      // VM, which turned this test into a duplicate of the same-VM variant.
      opsVM.invoke(() -> setBulkHook(clearVM));
      runConsistencyTest(vm0, performRemoveAllOperation);
      checkForConsistencyErrors();
    } finally {
      opsVM.invoke(() -> resetHook());
    }
  }


  /**
   * Runs {@link #doPut()} in the given VM, or in the calling JVM when {@code whichVM} is null.
   */
  private void invokePut(VM whichVM) {
    if (whichVM != null) {
      whichVM.invoke(() -> doPut());
      return;
    }
    doPut();
  }

  /**
   * Runs {@link #doRemove()} in the given VM, or in the calling JVM when {@code whichVM} is null.
   */
  private void invokeRemove(VM whichVM) {
    if (whichVM != null) {
      whichVM.invoke(() -> doRemove());
      return;
    }
    doRemove();
  }

  /**
   * Runs {@link #doInvalidate()} in the given VM, or in the calling JVM when {@code whichVM} is
   * null.
   */
  private void invokeInvalidate(VM whichVM) {
    if (whichVM != null) {
      whichVM.invoke(() -> doInvalidate());
      return;
    }
    doInvalidate();
  }

  /**
   * Runs {@link #doPutAll()} in the given VM, or in the calling JVM when {@code whichVM} is null.
   */
  private void invokePutAll(VM whichVM) {
    if (whichVM != null) {
      whichVM.invoke(() -> doPutAll());
      return;
    }
    doPutAll();
  }

  /**
   * Runs {@link #doRemoveAll()} in the given VM, or in the calling JVM when {@code whichVM} is
   * null.
   */
  private void invokeRemoveAll(VM whichVM) {
    if (whichVM != null) {
      whichVM.invoke(() -> doRemoveAll());
      return;
    }
    doRemoveAll();
  }

  /**
   * Runs {@link #doClear()} in the given VM, or in the calling JVM when {@code whichVM} is null.
   * Static so the lock test hooks can call it.
   */
  private static void invokeClear(VM whichVM) {
    if (whichVM != null) {
      whichVM.invoke(() -> doClear());
      return;
    }
    doClear();
  }

  // remote test methods

  /**
   * Reports whether this JVM's region holds a region entry for the key.
   *
   * @param key the key to look up
   * @return true when {@code region.getRegionEntry(key)} is not null
   */
  private static boolean doesRegionEntryExist(String key) {
    return region.getRegionEntry(key) != null;
  }

  /** Puts THE_VALUE under THE_KEY into this JVM's region. */
  private static void doPut() {
    region.put(THE_KEY, THE_VALUE);
  }

  /** Removes THE_KEY from this JVM's region. */
  private static void doRemove() {
    region.remove(THE_KEY);
  }

  /** Invalidates THE_KEY in this JVM's region. */
  private static void doInvalidate() {
    region.invalidate(THE_KEY);
  }

  /**
   * Bulk-puts the entries from {@link #generateKeyValues()} into this JVM's region, passing
   * "putAllCallback" as the callback argument.
   */
  private static void doPutAll() {
    region.putAll(generateKeyValues(), "putAllCallback");
  }

  /**
   * Bulk-removes the keys from {@link #generateKeyValues()} from this JVM's region, passing
   * "removeAllCallback" as the callback argument.
   */
  private static void doRemoveAll() {
    region.removeAll(generateKeyValues().keySet(), "removeAllCallback");
  }

  /** Clears this JVM's region. */
  private static void doClear() {
    region.clear();
  }

  /**
   * Arms the step1 latch in this JVM; delegates to {@link #primeStep1Latch(int)}.
   *
   * @param cnt the count the new latch starts with
   */
  private static void primeStep1(int cnt) {
    primeStep1Latch(cnt);
  }

  /**
   * Arms the step2 latch in this JVM; delegates to {@link #primeStep2Latch(int)}.
   *
   * @param cnt the count the new latch starts with
   */
  private static void primeStep2(int cnt) {
    primeStep2Latch(cnt);
  }

  /** Counts the step1 latch in this JVM down by one; delegates to decrementStep1Latch(). */
  private static void releaseStep1() {
    decrementStep1Latch();
  }

  /**
   * Test body for the PUT tests: performs a single put via invokePut(opsVM) and reports any
   * exception as a test failure.
   */
  SerializableRunnable performPutOperation = new SerializableRunnable("perform PUT") {
    @Override
    public void run() {
      try {
        invokePut(opsVM);
      } catch (Exception e) {
        fail("while performing PUT", e);
      }
    }
  };

  /**
   * Test body for the NO_ACK test: runs a put and a clear on two separate threads of the JVM
   * that executes this runnable, then waits for both to finish.
   *
   * The clear thread blocks on the step1 latch before clearing, so the latch must have been
   * primed in this JVM beforehand. Both threads call
   * {@code DistributedSystem.setThreadsSocketPolicy(false)} before doing their work and always
   * release their sockets afterwards.
   */
  SerializableRunnable performNoAckPutOperation = new SerializableRunnable("perform NoAckPUT") {
    @Override
    public void run() throws InterruptedException {
      Runnable putTask = () -> {
        DistributedSystem.setThreadsSocketPolicy(false);
        try {
          doPut();
        } finally {
          // Release even when the put fails so the thread's sockets are not left behind.
          DistributedSystem.releaseThreadsSockets();
        }
      };

      Runnable clearTask = () -> {
        DistributedSystem.setThreadsSocketPolicy(false);
        try {
          awaitStep1Latch();
          doClear();
        } finally {
          // Release even when the clear fails so the thread's sockets are not left behind.
          DistributedSystem.releaseThreadsSockets();
        }
      };

      Thread putThread = new Thread(putTask);
      Thread clearThread = new Thread(clearTask);
      // The clear thread is started first; it parks on the step1 latch until it is released.
      clearThread.start();
      putThread.start();
      putThread.join();
      clearThread.join();
    }
  };

  /**
   * Test body for the REMOVE tests: puts the entry first so there is something to remove, then
   * removes it, both via opsVM. Any exception is reported as a test failure.
   */
  SerializableRunnable performRemoveOperation = new SerializableRunnable("perform REMOVE") {
    @Override
    public void run() {
      try {
        invokePut(opsVM);
        invokeRemove(opsVM);
      } catch (Exception e) {
        fail("while performing REMOVE", e);
      }
    }
  };

  /**
   * Test body for the INVALIDATE tests: puts the entry first, then invalidates it, both via
   * opsVM. Any exception is reported as a test failure.
   */
  SerializableRunnable performInvalidateOperation = new SerializableRunnable("perform INVALIDATE") {
    @Override
    public void run() {
      try {
        invokePut(opsVM);
        invokeInvalidate(opsVM);
      } catch (Exception e) {
        fail("while performing INVALIDATE", e);
      }
    }
  };

  /**
   * Test body for the PUTALL tests: performs one putAll via invokePutAll(opsVM) and reports any
   * exception as a test failure.
   */
  SerializableRunnable performPutAllOperation = new SerializableRunnable("perform PUTALL") {
    @Override
    public void run() {
      try {
        invokePutAll(opsVM);
      } catch (Exception e) {
        fail("while performing PUTALL", e);
      }
    }
  };

  /**
   * Test body for the REMOVEALL tests: bulk-puts the entries first, then bulk-removes them, both
   * via opsVM. Any exception is reported as a test failure.
   *
   * NOTE(review): the bulk hook has no operation filter, so it appears to fire for the putAll as
   * well as the removeAll - confirm this is intended.
   */
  SerializableRunnable performRemoveAllOperation = new SerializableRunnable("perform REMOVEALL") {
    @Override
    public void run() {
      try {
        invokePutAll(opsVM);
        invokeRemoveAll(opsVM);
      } catch (Exception e) {
        fail("while performing REMOVEALL", e);
      }
    }
  };

  // helper methods

  /**
   * Records which VMs the current test uses.
   *
   * @param opsTarget the VM that performs the entry operation
   * @param clearTarget the VM that performs the clear
   */
  private void setOpAndClearVM(VM opsTarget, VM clearTarget) {
    opsVM = opsTarget;
    clearVM = clearTarget;
  }

  /**
   * Creates a cache in vm0 and vm1 (recording each member id) and then a DISTRIBUTED_ACK region
   * named after the running test method in both.
   */
  private void setupMembers() {
    final Host dunitHost = Host.getHost(0);
    vm0 = dunitHost.getVM(0);
    vm1 = dunitHost.getVM(1);

    vm0ID = createCache(vm0);
    vm1ID = createCache(vm1);

    // Resolved here so the lambda below captures a plain String.
    final String regionName = getName();
    for (VM member : new VM[] {vm0, vm1}) {
      member.invoke(() -> createRegion(regionName));
    }
  }

  /**
   * Creates a cache with conserve-sockets=false in vm0 and vm1 (recording each member id) and
   * then a DISTRIBUTED_NO_ACK region named after the running test method in both.
   */
  private void setupNoAckMembers() {
    final Host dunitHost = Host.getHost(0);
    vm0 = dunitHost.getVM(0);
    vm1 = dunitHost.getVM(1);

    vm0ID = createNoConserveSocketsCache(vm0);
    vm1ID = createNoConserveSocketsCache(vm1);

    // Resolved here so the lambda below captures a plain String.
    final String regionName = getName();
    for (VM member : new VM[] {vm0, vm1}) {
      member.invoke(() -> createNOACKRegion(regionName));
    }
  }

  /**
   * Executes the given test body in the given VM.
   *
   * @param vm the VM in which to run the body
   * @param theTest the runnable to execute
   */
  private void runConsistencyTest(VM vm, SerializableRunnableIF theTest) {
    vm.invoke(theTest);
  }

  /**
   * Compares vm1 against vm0 and records every difference as a soft assertion failure.
   *
   * Checked are the value stored under THE_KEY, the existence of a region entry for THE_KEY, and
   * the two bulk keys ("-1" and "-2" suffixes): where vm0 has the key the values must match,
   * otherwise vm1 must not have it either.
   */
  private void checkForConsistencyErrors() {
    Map<Object, Object> vm0Snapshot = (Map<Object, Object>) vm0.invoke(() -> getRegionContents());
    Map<Object, Object> vm1Snapshot = (Map<Object, Object>) vm1.invoke(() -> getRegionContents());

    softly.assertThat(vm1Snapshot.get(THE_KEY))
        .as("region contents are not consistent for key %s", THE_KEY)
        .isEqualTo(vm0Snapshot.get(THE_KEY));

    // vm1 is queried before vm0, matching the evaluation order of the chained form.
    boolean entryInVm1 = checkRegionEntry(vm1, THE_KEY);
    boolean entryInVm0 = checkRegionEntry(vm0, THE_KEY);
    softly.assertThat(entryInVm1).as("region entries are not consistent for key %s", THE_KEY)
        .isEqualTo(entryInVm0);

    for (int suffix = 1; suffix <= 2; suffix++) {
      String bulkKey = THE_KEY + "-" + suffix;
      if (!vm0Snapshot.containsKey(bulkKey)) {
        softly.assertThat(vm1Snapshot).as("expected containsKey for %s to return false", bulkKey)
            .doesNotContainKey(bulkKey);
        continue;
      }
      softly.assertThat(vm1Snapshot.get(bulkKey))
          .as("region contents are not consistent for key %s", bulkKey)
          .isEqualTo(vm0Snapshot.get(bulkKey));
    }
  }

  /**
   * Removes any lock test hook from this JVM's region map by setting it to null. The static
   * theHook and theOtherVM fields are left untouched.
   */
  public void resetHook() {
    ((AbstractRegionMap) region.entries).setARMLockTestHook(null);
  }

  /**
   * Installs an {@link ArmBasicClearHook} on this JVM's region map.
   *
   * @param whichVM the VM in which the hook will perform the clear; stored in theOtherVM
   */
  public void setBasicHook(VM whichVM) {
    theOtherVM = whichVM;
    final ArmBasicClearHook hook = new ArmBasicClearHook();
    theHook = hook;
    final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
    regionMap.setARMLockTestHook(hook);
  }

  /**
   * Installs an {@link ArmRemoveAndInvalidateClearHook} on this JVM's region map.
   *
   * @param whichVM the VM in which the hook will perform the clear; stored in theOtherVM
   */
  public void setRemoveAndInvalidateHook(VM whichVM) {
    theOtherVM = whichVM;
    final ArmRemoveAndInvalidateClearHook hook = new ArmRemoveAndInvalidateClearHook();
    theHook = hook;
    final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
    regionMap.setARMLockTestHook(hook);
  }

  /**
   * Installs an {@link ArmNoAckRemoteHook} on this JVM's region map.
   *
   * @param whichVM the VM whose step1 latch the hook releases; stored in theOtherVM
   */
  public void setRemoteNoAckHook(VM whichVM) {
    theOtherVM = whichVM;
    final ArmNoAckRemoteHook hook = new ArmNoAckRemoteHook();
    theHook = hook;
    final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
    regionMap.setARMLockTestHook(hook);
  }

  /**
   * Installs an {@link ArmNoAckLocalHook} on this JVM's region map.
   *
   * @param whichVM the VM whose step2 latch the hook counts down; stored in theOtherVM
   */
  public void setLocalNoAckHook(VM whichVM) {
    theOtherVM = whichVM;
    final ArmNoAckLocalHook hook = new ArmNoAckLocalHook();
    theHook = hook;
    final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
    regionMap.setARMLockTestHook(hook);
  }

  /**
   * Installs an {@link ArmBulkClearHook} on this JVM's region map.
   *
   * @param whichVM the VM in which the hook will perform the clear; stored in theOtherVM
   */
  public void setBulkHook(VM whichVM) {
    theOtherVM = whichVM;
    final ArmBulkClearHook hook = new ArmBulkClearHook();
    theHook = hook;
    final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
    regionMap.setARMLockTestHook(hook);
  }

  /**
   * Creates the cache in the given VM with conserve-sockets=true and stores it in that JVM's
   * static {@code cache} field.
   *
   * @param vm the VM in which to create the cache
   * @return the distributed member id reported by that VM
   */
  private InternalDistributedMember createCache(VM vm) {
    SerializableCallable<Object> startCache = new SerializableCallable<Object>() {
      @Override
      public Object call() {
        cache = getCache(new CacheFactory().set("conserve-sockets", "true"));
        return getSystem().getDistributedMember();
      }
    };
    Object memberId = vm.invoke(startCache);
    return (InternalDistributedMember) memberId;
  }

  /**
   * Creates the cache in the given VM with conserve-sockets=false and stores it in that JVM's
   * static {@code cache} field.
   *
   * @param vm the VM in which to create the cache
   * @return the distributed member id reported by that VM
   */
  private InternalDistributedMember createNoConserveSocketsCache(VM vm) {
    SerializableCallable<Object> startCache = new SerializableCallable<Object>() {
      @Override
      public Object call() {
        cache = getCache(new CacheFactory().set("conserve-sockets", "false"));
        return getSystem().getDistributedMember();
      }
    };
    Object memberId = vm.invoke(startCache);
    return (InternalDistributedMember) memberId;
  }

  /**
   * Creates a REPLICATE region with concurrency checks enabled and DISTRIBUTED_ACK scope in this
   * JVM and stores it in the static {@code region} field.
   *
   * @param rgnName the name of the region to create
   */
  private static void createRegion(String rgnName) {
    Region<Object, Object> created =
        cache.<Object, Object>createRegionFactory(RegionShortcut.REPLICATE)
            .setConcurrencyChecksEnabled(true)
            .setScope(Scope.DISTRIBUTED_ACK)
            .create(rgnName);
    region = (LocalRegion) created;
  }

  /**
   * Creates a REPLICATE region with concurrency checks enabled and DISTRIBUTED_NO_ACK scope in
   * this JVM and stores it in the static {@code region} field.
   *
   * @param rgnName the name of the region to create
   */
  private static void createNOACKRegion(String rgnName) {
    Region<Object, Object> created =
        cache.<Object, Object>createRegionFactory(RegionShortcut.REPLICATE)
            .setConcurrencyChecksEnabled(true)
            .setScope(Scope.DISTRIBUTED_NO_ACK)
            .create(rgnName);
    region = (LocalRegion) created;
  }

  /**
   * Builds the three entries used by the bulk operations: THE_KEY -> THE_VALUE, plus the same
   * key and value with "-1" and "-2" appended.
   *
   * @return a new mutable map holding the three entries
   */
  private static Map<Object, Object> generateKeyValues() {
    Map<Object, Object> entries = new HashMap<>();
    entries.put(THE_KEY, THE_VALUE);
    for (int suffix = 1; suffix <= 2; suffix++) {
      entries.put(THE_KEY + "-" + suffix, THE_VALUE + "-" + suffix);
    }
    return entries;
  }

  /**
   * Copies every entry of this JVM's region into a plain HashMap so it can be returned to the
   * calling VM.
   *
   * @return a new map of key to value for each entry in the region's entry set
   */
  @SuppressWarnings("rawtypes")
  private static Map<Object, Object> getRegionContents() {
    Map<Object, Object> snapshot = new HashMap<>();
    for (Object item : region.entrySet()) {
      Region.Entry entry = (Region.Entry) item;
      snapshot.put(entry.getKey(), entry.getValue());
    }
    return snapshot;
  }

  /**
   * Asks the given VM whether its region holds a region entry for the key.
   *
   * @param vm the VM to query
   * @param key the key to look up
   * @return the result of doesRegionEntryExist(key) evaluated in that VM
   */
  private boolean checkRegionEntry(VM vm, String key) {
    return vm.invoke(() -> doesRegionEntryExist(key));
  }

  // The VM targeted by the hook installed in this JVM; assigned by the set*Hook methods.
  static VM theOtherVM;
  // Latches used by the NO_ACK test; each is created by its primeStepNLatch method.
  static transient CountDownLatch step1Latch, step2Latch;

  /**
   * Replaces the step1 latch in this JVM with a new one.
   *
   * @param waitCount the count the new latch starts with
   */
  public static void primeStep1Latch(int waitCount) {
    step1Latch = new CountDownLatch(waitCount);
  }

  /**
   * Blocks until the step1 latch in this JVM reaches zero.
   *
   * If the waiting thread is interrupted the method returns early with the thread's interrupt
   * status set again, so the interruption is not silently lost.
   */
  public static void awaitStep1Latch() {
    try {
      step1Latch.await();
    } catch (InterruptedException e) {
      // Restore the flag that await() cleared so callers further up can still observe it.
      Thread.currentThread().interrupt();
    }
  }

  /** Counts the step1 latch in this JVM down by one. */
  public static void decrementStep1Latch() {
    step1Latch.countDown();
  }

  /** Counts down the step1 latch that lives in theOtherVM, via a remote invocation. */
  public static void decrementRemoteStep1Latch() {
    theOtherVM.invoke(() -> decrementStep1Latch());
  }

  /**
   * Replaces the step2 latch in this JVM with a new one.
   *
   * @param waitCount the count the new latch starts with
   */
  public static void primeStep2Latch(int waitCount) {
    step2Latch = new CountDownLatch(waitCount);
  }

  /**
   * Blocks until the step2 latch in this JVM reaches zero.
   *
   * If the waiting thread is interrupted the method returns early with the thread's interrupt
   * status set again, so the interruption is not silently lost.
   */
  public static void awaitStep2Latch() {
    try {
      step2Latch.await();
    } catch (InterruptedException e) {
      // Restore the flag that await() cleared so callers further up can still observe it.
      Thread.currentThread().interrupt();
    }
  }

  /** Counts the step2 latch in this JVM down by one. */
  public static void decrementStep2Latch() {
    step2Latch.countDown();
  }

  /** Counts down the step2 latch that lives in theOtherVM, via a remote invocation. */
  public static void decrementRemoteStep2Latch() {
    theOtherVM.invoke(() -> decrementStep2Latch());
  }

  /**
   * Test callback class used to hook the rvv locking mechanism with basic operations.
   *
   * On afterRelease of a create event for a region whose name starts with "test", it performs a
   * clear in theOtherVM.
   */
  public static class ArmBasicClearHook extends ARMLockTestHookAdapter {
    @Override
    public void afterRelease(InternalRegion owner, CacheEvent event) {
      if (!event.getOperation().isCreate()) {
        return;
      }
      // Test regions are named after the test method, so they all start with "test".
      if (!owner.getName().startsWith("test")) {
        return;
      }
      invokeClear(theOtherVM);
    }
  }

  /**
   * Test callback class used to hook the rvv locking mechanism with remove and invalidate
   * operations.
   *
   * On afterRelease of a destroy or invalidate event for a region whose name starts with "test",
   * it performs a clear in theOtherVM.
   */
  public static class ArmRemoveAndInvalidateClearHook extends ARMLockTestHookAdapter {

    @Override
    public void afterRelease(InternalRegion owner, CacheEvent event) {
      boolean destroyOrInvalidate =
          event.getOperation().isDestroy() || event.getOperation().isInvalidate();
      if (!destroyOrInvalidate) {
        return;
      }
      // Test regions are named after the test method, so they all start with "test".
      if (!owner.getName().startsWith("test")) {
        return;
      }
      invokeClear(theOtherVM);
    }
  }

  /**
   * Test callback class used to hook the rvv locking mechanism for NOACK testing.
   *
   * On beforeLock of a remote-origin create event for a region whose name starts with "test", it
   * releases the step1 latch in theOtherVM and then blocks on this JVM's step2 latch.
   */
  public static class ArmNoAckRemoteHook extends ARMLockTestHookAdapter {
    @Override
    public void beforeLock(InternalRegion owner, CacheEvent event) {
      if (!event.isOriginRemote()) {
        return;
      }
      if (!event.getOperation().isCreate()) {
        return;
      }
      if (!owner.getName().startsWith("test")) {
        return;
      }
      theOtherVM.invoke(() -> releaseStep1()); // start clear
      awaitStep2Latch(); // wait for clear to complete
    }
  }

  /*
   * Test callback class used on the member that performs the clear in NOACK testing: on
   * beforeStateFlushWait it counts down the step2 latch in theOtherVM, which is the latch that
   * ArmNoAckRemoteHook waits on.
   */
  public static class ArmNoAckLocalHook extends ARMLockTestHookAdapter {
    @Override
    public void beforeStateFlushWait() {
      decrementRemoteStep2Latch();
    }
  }

  /*
   * Test callback class used to hook the rvv locking mechanism with bulk operations.
   *
   * On afterBulkRelease it performs a clear in theOtherVM. Unlike the single-entry hooks it does
   * not filter on the operation type or on the region name.
   */
  public static class ArmBulkClearHook extends ARMLockTestHookAdapter {
    @Override
    public void afterBulkRelease(InternalRegion region) {
      invokeClear(theOtherVM);
    }
  }
}
