/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.execute;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import javax.naming.Context;
import javax.transaction.UserTransaction;

import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.logging.log4j.Logger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;


@RunWith(JUnitParamsRunner.class)
@SuppressWarnings("serial")
public class PRSetOperationJTADUnitTest extends JUnit4CacheTestCase {

  private static final Logger logger = LogService.getLogger();
  private static final String REGION_NAME = "region1";

  private Map<Long, String> testData;
  private Map<Long, String> modifiedData;

  private VM accessor = null;
  private VM dataStore1 = null;
  private VM dataStore2 = null;
  private VM dataStore3 = null;

  @Rule
  public DistributedRestoreSystemProperties restoreSystemProperties =
      new DistributedRestoreSystemProperties();

  public PRSetOperationJTADUnitTest() {
    super();
  }

  @Before
  public void setup() {
    testData = new HashMap<>();
    testData.put(1L, "value1");
    testData.put(2L, "value2");
    testData.put(3L, "duplicateValue");
    testData.put(4L, "duplicateValue");
    modifiedData = new HashMap<>();
    modifiedData.putAll(testData);
    modifiedData.put(5L, "newValue");
  }

  @Override
  public final void postSetUp() throws Exception {
    disconnectAllFromDS(); // isolate this test from others to avoid periodic CacheExistsExceptions
    Host host = Host.getHost(0);
    dataStore1 = host.getVM(0);
    dataStore2 = host.getVM(1);
    dataStore3 = host.getVM(2);
    accessor = host.getVM(3);
  }

  @Override
  public final void preTearDownCacheTestCase() throws Exception {
    Invoke.invokeInEveryVM(() -> verifyNoTxState());
  }

  @Test
  @Parameters({"true", "false"})
  public void testRegionKeysetWithJTA(boolean disableSetOpToStartJTA) throws Exception {
    setupAndLoadRegion(disableSetOpToStartJTA);
    verifyRegionKeysetWithJTA(disableSetOpToStartJTA);
  }

  @Test
  @Parameters({"true", "false"})
  public void testRegionValuesWithJTA(boolean disableSetOpToStartJTA) throws Exception {
    setupAndLoadRegion(disableSetOpToStartJTA);
    verifyRegionValuesWithJTA(disableSetOpToStartJTA);
  }

  @Test
  @Parameters({"true", "false"})
  public void testRegionEntriesWithJTA(boolean disableSetOpToStartJTA) throws Exception {
    setupAndLoadRegion(disableSetOpToStartJTA);
    verifyRegionEntriesWithJTA(disableSetOpToStartJTA);
  }

  private void setupAndLoadRegion(boolean disableSetOpToStartJTA) {
    createRegion(disableSetOpToStartJTA);
    dataStore1.invoke(() -> loadRegion());
  }

  private void createRegion(boolean disableSetOpToStartJTA) {
    accessor.invoke(() -> createCache(disableSetOpToStartJTA));
    dataStore1.invoke(() -> createCache(disableSetOpToStartJTA));
    dataStore2.invoke(() -> createCache(disableSetOpToStartJTA));
    dataStore3.invoke(() -> createCache(disableSetOpToStartJTA));

    accessor.invoke(() -> createPR(true));
    dataStore1.invoke(() -> createPR(false));
    dataStore2.invoke(() -> createPR(false));
    dataStore3.invoke(() -> createPR(false));
  }

  private void loadRegion() {
    Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
    testData.forEach((k, v) -> region.put(k, v));
  }

  private void verifyRegionKeysetWithJTA(boolean disableSetOpToStartJTA) {
    accessor.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, true));
    dataStore1.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, false));
    dataStore2.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, false));
    dataStore3.invoke(() -> verifyRegionKeysetWithJTA(disableSetOpToStartJTA, false));
  }

  private void verifyRegionValuesWithJTA(boolean disableSetOpToStartJTA) {
    accessor.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, true));
    dataStore1.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, false));
    dataStore2.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, false));
    dataStore3.invoke(() -> verifyRegionValuesWithJTA(disableSetOpToStartJTA, false));
  }

  private void verifyRegionEntriesWithJTA(boolean disableSetOpToStartJTA) {
    accessor.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, true));
    dataStore1.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, false));
    dataStore2.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, false));
    dataStore3.invoke(() -> verifyRegionEntriesWithJTA(disableSetOpToStartJTA, false));
  }

  private void verifyRegionKeysetWithJTA(boolean disableSetOpToStartJTA, boolean isAccessor)
      throws Exception {
    Context ctx = basicGetCache().getJNDIContext();
    UserTransaction userTX = startUserTransaction(ctx);
    Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
    try {
      userTX.begin();
      Collection<Long> set = region.keySet();
      set.forEach((key) -> assertTrue(testData.keySet().contains(key)));
      testData.keySet().forEach((key) -> assertTrue(set.contains(key)));
    } finally {
      validateTXManager(disableSetOpToStartJTA, isAccessor);
      if (!disableSetOpToStartJTA && !isAccessor) {
        userTX.rollback();
      }
    }
  }

  private void verifyRegionValuesWithJTA(boolean disableSetOpToStartJTA, boolean isAccessor)
      throws Exception {
    Context ctx = basicGetCache().getJNDIContext();
    UserTransaction userTX = startUserTransaction(ctx);
    Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
    try {
      userTX.begin();
      Collection<String> set = region.values();
      set.forEach((value) -> assertTrue(testData.values().contains(value)));
      testData.values().forEach((value) -> assertTrue(set.contains(value)));
    } finally {
      validateTXManager(disableSetOpToStartJTA, isAccessor);
      if (!disableSetOpToStartJTA && !isAccessor) {
        userTX.rollback();
      }
    }
  }

  private void verifyRegionEntriesWithJTA(boolean disableSetOpToStartJTA, boolean isAccessor)
      throws Exception {
    Context ctx = basicGetCache().getJNDIContext();
    UserTransaction userTX = startUserTransaction(ctx);
    Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
    try {
      userTX.begin();
      Collection<Map.Entry<Long, String>> set = region.entrySet();
      set.forEach((entry) -> {
        assertTrue(testData.values().contains(entry.getValue()));
        assertTrue(testData.keySet().contains(entry.getKey()));
      });
      testData.entrySet().forEach((entry) -> assertTrue(set.contains(entry)));
    } finally {
      validateTXManager(disableSetOpToStartJTA, isAccessor);
      if (!disableSetOpToStartJTA && !isAccessor) {
        userTX.rollback();
      }
    }
  }

  private void validateTXManager(boolean disableSetOpToStartTx, boolean isAccessor) {
    if (disableSetOpToStartTx) {
      assertNull(TXManagerImpl.getCurrentTXState());
    } else {
      assertNotNull(TXManagerImpl.getCurrentTXState());
      if (!isAccessor) {
        assertTrue(((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).hasRealDeal());
      }
    }
  }

  private UserTransaction startUserTransaction(Context ctx) throws Exception {
    return (UserTransaction) ctx.lookup("java:/UserTransaction");
  }

  private void verifyNoTxState() {
    TXManagerImpl mgr = getCache().getTxManager();
    assertEquals(0, mgr.hostedTransactionsInProgressForTest());
  }

  final String restoreSetOperationTransactionBehavior = "restoreSetOperationTransactionBehavior";
  final String RESTORE_SET_OPERATION_PROPERTY =
      (System.currentTimeMillis() % 2 == 0 ? DistributionConfig.GEMFIRE_PREFIX : "geode.")
          + restoreSetOperationTransactionBehavior;

  private void createCache(boolean disableSetOpToStartJTA) {
    if (disableSetOpToStartJTA) {
      logger.info("setting system property {} to true ", RESTORE_SET_OPERATION_PROPERTY);
      System.setProperty(RESTORE_SET_OPERATION_PROPERTY, "true");
    }
    getCache();
  }

  private void createPR(boolean isAccessor) {
    basicGetCache().createRegionFactory(RegionShortcut.PARTITION)
        .setPartitionAttributes(new PartitionAttributesFactory<Long, String>().setTotalNumBuckets(3)
            .setLocalMaxMemory(isAccessor ? 0 : 1).create())
        .create(REGION_NAME);
  }

  @Test
  public void testRegionValuesWithPutWhenSetOperationStartsJTA() throws Exception {
    boolean disableSetOpToStartJTA = false;
    setupRegion(disableSetOpToStartJTA);

    accessor.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA());
    dataStore1.invoke(() -> verifyRegionValuesWhenSetOperationStartsJTA());
  }

  private void setupRegion(boolean disableSetOpToStartJTA) {
    accessor.invoke(() -> createCache(disableSetOpToStartJTA));
    dataStore1.invoke(() -> createCache(disableSetOpToStartJTA));
    accessor.invoke(() -> createPR(true));
    dataStore1.invoke(() -> createPR(false));
    dataStore1.invoke(() -> loadRegion());
  }


  private void verifyRegionValuesWhenSetOperationStartsJTA() throws Exception {
    Context ctx = cache.getJNDIContext();
    UserTransaction userTX = startUserTransaction(ctx);
    Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
    try {
      userTX.begin();
      Collection<String> set = region.values();
      set.forEach((value) -> assertTrue(testData.values().contains(value)));
      testData.values().forEach((value) -> assertTrue(set.contains(value)));
      assertEquals(testData.size(), set.size());
      region.put(5L, "newValue");
      set.forEach((value) -> assertTrue(modifiedData.values().contains(value)));
      modifiedData.values().forEach((value) -> assertTrue(set.contains(value)));
      assertEquals(modifiedData.size(), set.size());
    } finally {
      userTX.rollback();
    }
  }

  @Test
  public void testRegionValuesWithPutWhenSetOperationDoesNotStartJTA() throws Exception {
    boolean disableSetOpToStartJTA = true;
    setupRegion(disableSetOpToStartJTA);

    accessor.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA());
    dataStore1.invoke(() -> verifyRegionValuesWhenSetOperationDoesNotStartJTA());
  }


  private void verifyRegionValuesWhenSetOperationDoesNotStartJTA() throws Exception {
    Context ctx = cache.getJNDIContext();
    UserTransaction userTX = startUserTransaction(ctx);
    Region<Long, String> region = basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
    try {
      userTX.begin();
      Collection<String> set = region.values();
      set.forEach((value) -> assertTrue(testData.values().contains(value)));
      testData.values().forEach((value) -> assertTrue(set.contains(value)));
      assertEquals(testData.size(), set.size());
      region.put(5L, "newValue");
      assertThatThrownBy(() -> set.contains("newValue")).isInstanceOf(IllegalStateException.class)
          .hasMessageContaining(
              "The Region collection is not transactional but is being used in a transaction");
    } finally {
      userTX.rollback();
    }
  }


  @Test
  public void testTxFunctionOnMemberWhenSetOperationDoesNotStartJTA() {
    doTestTxFunction(true);
  }

  @Test
  public void testTxFunctionOnMemberWhenSetOperationStartsJTA() {
    doTestTxFunction(false);
  }

  private void doTestTxFunction(boolean disableSetOpToStartJTA) {
    setupAndLoadRegion(disableSetOpToStartJTA);

    accessor.invoke(() -> registerFunction());
    dataStore1.invoke(() -> registerFunction());
    dataStore2.invoke(() -> registerFunction());
    dataStore3.invoke(() -> registerFunction());

    accessor.invoke(() -> doTxFunction(disableSetOpToStartJTA));
    dataStore1.invoke(() -> doTxFunction(disableSetOpToStartJTA));
    dataStore2.invoke(() -> doTxFunction(disableSetOpToStartJTA));
    dataStore3.invoke(() -> doTxFunction(disableSetOpToStartJTA));
  }

  private void registerFunction() {
    FunctionService.registerFunction(new TXFunctionSetOpDoesNoStartJTA());
    FunctionService.registerFunction(new TXFunctionSetOpStartsJTA());
  }

  class TXFunctionSetOpStartsJTA implements Function {
    static final String id = "TXFunctionSetOpStartsJTA";

    @Override
    public void execute(FunctionContext context) {
      Region r = null;
      try {
        verifyRegionValuesWhenSetOperationStartsJTA();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      context.getResultSender().lastResult(Boolean.TRUE);
    }

    @Override
    public String getId() {
      return id;
    }

    @Override
    public boolean hasResult() {
      return true;
    }

    @Override
    public boolean optimizeForWrite() {
      return true;
    }

    @Override
    public boolean isHA() {
      return false;
    }
  }

  class TXFunctionSetOpDoesNoStartJTA implements Function {
    static final String id = "TXFunctionSetOpDoesNotStartJTA";

    @Override
    public void execute(FunctionContext context) {
      Region r = null;
      try {
        verifyRegionValuesWhenSetOperationDoesNotStartJTA();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      context.getResultSender().lastResult(Boolean.TRUE);
    }

    @Override
    public String getId() {
      return id;
    }

    @Override
    public boolean hasResult() {
      return true;
    }

    @Override
    public boolean optimizeForWrite() {
      return true;
    }

    @Override
    public boolean isHA() {
      return false;
    }
  }

  private void doTxFunction(boolean disableSetOpToStartJTA) {
    PartitionedRegion region =
        (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + REGION_NAME);
    DistributedMember owner = region.getOwnerForKey(region.getKeyInfo(5L));
    if (disableSetOpToStartJTA) {
      FunctionService.onMember(owner).execute(TXFunctionSetOpDoesNoStartJTA.id).getResult();
    } else {
      FunctionService.onMember(owner).execute(TXFunctionSetOpStartsJTA.id).getResult();
    }
  }
}
