/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.query.dunit;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.Serializable;
import java.util.concurrent.Callable;

import org.apache.logging.log4j.Logger;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.data.Position;
import org.apache.geode.cache.query.internal.QueryObserverHolder;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.OQLIndexTest;

/**
 * This tests the data inconsistency during update on an index and querying the same UNLOCKED index.
 */
@Category({OQLIndexTest.class})
public class QueryDataInconsistencyDUnitTest implements Serializable {

  private static final Logger logger = LogService.getLogger();

  private static final int cnt = 0;
  private static final int cntDest = 10;
  private static VM server = null;
  private static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name
  private static String repRegionName = "TestRepRegion"; // default name
  private static volatile boolean hooked = false;

  @Rule
  public DistributedRule distributedRule = new DistributedRule(1);

  @Rule
  public CacheRule cacheRule = new CacheRule(1);

  @Before
  public void setUp() {
    server = getVM(0);
    server.invoke(() -> cacheRule.createCache());
  }

  @After
  public void tearDown() {
    Invoke.invokeInEveryVM(QueryObserverHolder::reset);
  }

  @Test
  public void testCompactRangeIndex() {
    // Create caches
    server.invoke("create indexes", () -> {
      Cache cache = cacheRule.getCache();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);

      // Create common Portflios and NewPortfolios
      for (int j = cnt; j < cntDest; j++) {
        region.put(new Integer(j), new Portfolio(j));
      }

      QueryService queryService = cache.getQueryService();
      try {
        Index index = queryService.createIndex("idIndex", "ID", "/" + repRegionName);
        assertEquals(10, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        logger.error(e);
        fail("Index creation failed");
      }
    });
    // Invoke update from client and stop in updateIndex
    // firesultSett before updating the RegionEntry and second after updating
    // the RegionEntry.
    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
      Region repRegion = cacheRule.getCache().getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
      // above call must be hooked in BEFORE_UPDATE_OP call.
    });
    server.invoke("query on server", () -> {
      QueryService queryService = cacheRule.getCache().getQueryService();
      await().until(() -> hooked);
      Object resultSet = null;
      try {
        resultSet = queryService
            .newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
      } catch (Exception e) {
        logger.error(e);
        fail("Query execution failed on server.");
        IndexManager.testHook = null;
      }
      assertTrue(resultSet instanceof SelectResults);
      assertEquals(1, ((SelectResults) resultSet).size());
      Portfolio p1 = (Portfolio) ((SelectResults) resultSet).asList().get(0);
      if (p1.getID() != 1) {
        fail("Query thread did not verify index results even when RE is under update");
        IndexManager.testHook = null;
      }
      hooked = false;// Let client put go further.
    });

    // Client put is again hooked in AFTER_UPDATE_OP call in updateIndex.
    server.invoke("query on server", () -> {
      QueryService queryService = cacheRule.getCache().getQueryService();
      await().until(() -> hooked);
      Object resultSet = null;
      try {
        resultSet = queryService
            .newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
      } catch (Exception e) {
        logger.error(e);
        fail("Query execution failed on server." + e.getMessage());
      } finally {
        IndexManager.testHook = null;
      }
      assertTrue(resultSet instanceof SelectResults);
      if (((SelectResults) resultSet).size() > 0) {
        Portfolio p1 = (Portfolio) ((SelectResults) resultSet).iterator().next();
        if (p1.getID() != 1) {
          fail("Query thread did not verify index results even when RE is under update and "
              + "RegionEntry value has been modified before releasing the lock");
          IndexManager.testHook = null;
        }
      }
      hooked = false;// Let client put go further.
    });
    await().until(joinThread(putThread));
  }

  @Test
  public void testRangeIndex() {
    // Create caches
    server.invoke("create indexes", () -> {
      Cache cache = cacheRule.getCache();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
      IndexManager.testHook = null;
      // Create common Portfolios and NewPortfolios
      Position.cnt = 0;
      for (int j = cnt; j < cntDest; j++) {
        Portfolio p = new Portfolio(j);
        cache.getLogger().fine("Shobhit: portfolio " + j + " : " + p);
        region.put(j, p);
      }

      QueryService queryService = cache.getQueryService();
      try {
        Index index = queryService.createIndex("posIndex", "pos.secId",
            "/" + repRegionName + " p, p.positions.values pos");
        assertEquals(12, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        logger.error(e);
        fail("Index creation failed");
      }
    });
    // Invoke update from client and stop in updateIndex
    // firesultSett before updating the RegionEntry and second after updating
    // the RegionEntry.
    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
      Cache cache = cacheRule.getCache();
      Region repRegion = cache.getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      Portfolio newPort = new Portfolio(cntDest + 1);
      cache.getLogger().fine("Shobhit: New Portfolio" + newPort);
      repRegion.put(new Integer("1"), newPort);
      // above call must be hooked in BEFORE_UPDATE_OP call.
    });

    server.invoke("query on server", () -> {
      Cache cache = cacheRule.getCache();
      QueryService queryService = cache.getQueryService();
      Position pos1 = null;
      await().until(() -> hooked);
      try {
        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        cache.getLogger().fine("Shobhit: " + resultSet);
        assertTrue(resultSet instanceof SelectResults);
        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
        if (!pos1.secId.equals("APPL")) {
          fail("Query thread did not verify index results even when RE is under update");
          IndexManager.testHook = null;
        }
      } catch (Exception e) {
        logger.error(e);
        fail("Query execution failed on server.", e);
        IndexManager.testHook = null;
      } finally {
        hooked = false;// Let client put go further.
      }
      await().until(() -> hooked);
      try {
        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        cache.getLogger().fine("Shobhit: " + resultSet);
        assertTrue(resultSet instanceof SelectResults);
        if (((SelectResults) resultSet).size() > 0) {
          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
          if (pos2.equals(pos1)) {
            fail("Query thread did not verify index results even when RE is under update and "
                + "RegionEntry value has been modified before releasing the lock");
          }
        }
      } catch (Exception e) {
        logger.error(e);
        fail("Query execution failed on server.");
      } finally {
        hooked = false;// Let client put go further.
        IndexManager.testHook = null;
      }
    });
    await().until(joinThread(putThread));
  }

  @Test
  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() {
    // Create caches
    server.invoke("create indexes", () -> {
      Cache cache = cacheRule.getCache();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
      IndexManager.testHook = null;
      // Create common Portfolios and NewPortfolios
      Position.cnt = 0;
      for (int j = cnt; j < cntDest; j++) {
        region.put(j, new Portfolio(j));
      }

      QueryService queryService = cache.getQueryService();
      try {
        Index index = queryService.createIndex("posIndex", "pos.secId",
            "/" + repRegionName + " p, p.collectionHolderMap.values coll, p.positions.values pos");
        assertEquals(12, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        logger.error(e);
        fail("Index creation failed");
      }
    });
    // Invoke update from client and stop in updateIndex
    // firesultSett before updating the RegionEntry and second after updating
    // the RegionEntry.
    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
      Region repRegion = cacheRule.getCache().getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      // This portfolio with same ID must have different positions.
      repRegion.put(new Integer("1"), new Portfolio(1));
      // above call must be hooked in BEFORE_UPDATE_OP call.
    });

    server.invoke("query on server", () -> {
      Cache cache = cacheRule.getCache();
      QueryService queryService = cache.getQueryService();
      Position pos1 = null;
      await().until(() -> hooked);
      try {
        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        cache.getLogger().fine("Shobhit: " + resultSet);
        assertTrue(resultSet instanceof SelectResults);
        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
        if (!pos1.secId.equals("APPL")) {
          fail("Query thread did not verify index results even when RE is under update");
          IndexManager.testHook = null;
        }
      } catch (Exception e) {
        logger.error(e);
        fail("Query execution failed on server.");
        IndexManager.testHook = null;
      } finally {
        hooked = false;// Let client put go further.
      }
      await().until(() -> hooked);
      try {
        Object resultSet = queryService.newQuery("select pos from /" + repRegionName
            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        assertTrue(resultSet instanceof SelectResults);
        if (((SelectResults) resultSet).size() > 0) {
          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
          if (pos2.equals(pos1)) {
            fail("Query thread did not verify index results even when RE is under update and "
                + "RegionEntry value has been modified before releasing the lock");
          }
        }
      } catch (Exception e) {
        logger.error(e);
        fail("Query execution failed on server.");
      } finally {
        hooked = false;// Let client put go further.
        IndexManager.testHook = null;
      }
    });
    await().until(joinThread(putThread));
  }

  @Test
  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
    // Create caches
    server.invoke("create indexes", () -> {
      Cache cache = cacheRule.getCache();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
      IndexManager.testHook = null;
      // Create common Portfolios and NewPortfolios
      Position.cnt = 0;
      for (int j = cnt; j < cntDest; j++) {
        region.put(new Integer(j), new Portfolio(j));
      }

      QueryService queryService = cache.getQueryService();
      try {
        Index index = queryService.createIndex("posIndex", "pos.secId",
            "/" + repRegionName + " p, p.positions.values pos");
        assertEquals(12, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        logger.error(e);
        fail("Index creation failed");
      }
    });
    // Invoke update from client and stop in updateIndex
    // firesultSett before updating the RegionEntry and second after updating
    // the RegionEntry.
    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
      Cache cache = cacheRule.getCache();
      Region repRegion = cache.getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      // This portfolio with same ID must have different positions.
      repRegion.put(new Integer("1"), new Portfolio(1));
      // above call must be hooked in BEFORE_UPDATE_OP call.
    });

    server.invoke("query on server", () -> {
      Cache cache = cacheRule.getCache();
      QueryService queryService = cache.getQueryService();
      Position pos1 = null;
      await().until(() -> hooked);
      try {
        Object resultSet = queryService
            .newQuery("<trace> select pos from /" + repRegionName
                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
            .execute();
        cache.getLogger().fine("Shobhit: " + resultSet);
        assertTrue(resultSet instanceof SelectResults);
        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
        if (!pos1.secId.equals("APPL")) {
          fail("Query thread did not verify index results even when RE is under update");
          IndexManager.testHook = null;
        }
      } catch (Exception e) {
        logger.error(e);
        Assertions.fail("Query execution failed on server.", e);
        IndexManager.testHook = null;
      } finally {
        hooked = false;// Let client put go further.
      }
      await().until(() -> hooked);

      try {
        Object resultSet = queryService
            .newQuery("select pos from /" + repRegionName
                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
            .execute();
        assertTrue(resultSet instanceof SelectResults);
        if (((SelectResults) resultSet).size() > 0) {
          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
          if (pos2.equals(pos1)) {
            fail("Query thread did not verify index results even when RE is under update and "
                + "RegionEntry value has been modified before releasing the lock");
          }
        }
      } catch (Exception e) {
        logger.error(e);
        fail("Query execution failed on server.");
      } finally {
        IndexManager.testHook = null;
        hooked = false;// Let client put go further.
      }
    });
    await().until(joinThread(putThread));
  }

  private Callable<Boolean> joinThread(AsyncInvocation thread) {
    return () -> {
      try {
        thread.join(100L);
      } catch (InterruptedException e) {
        return false;
      }
      if (thread.isAlive()) {
        return false;
      }
      return true;
    };
  }

  private void createProxyRegs() {
    ClientCache cache = (ClientCache) cacheRule.getCache();
    cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName);
  }

  public void createPR() {
    PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver();
    Cache cache = cacheRule.getCache();
    int numOfBuckets = 20;
    cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(numOfBuckets)
            .setPartitionResolver(testKeyBasedResolver).create())
        .create(PartitionedRegionName1);
  }

  public class IndexManagerTestHook
      implements org.apache.geode.cache.query.internal.index.IndexManager.TestHook {
    @Override
    public void hook(final int spot) throws RuntimeException {
      switch (spot) {
        case 9: // Before Index update and after region entry lock.
          hooked = true;
          logger
              .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry.");
          await().until(() -> !hooked);
          break;
        case 10: // Before Region update and after Index Remove call.
          hooked = true;
          logger
              .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry.");
          await().until(() -> !hooked);
          break;
        default:
          break;
      }
    }
  }
}
