/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
/*
 * Created on Oct 13, 2005
 *
 *
 */
package org.apache.geode.cache.query.internal;

import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_MANAGER;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.SystemFailure;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.query.NameResolutionException;
import org.apache.geode.cache.query.TypeMismatchException;
import org.apache.geode.cache.query.security.MethodInvocationAuthorizer;
import org.apache.geode.cache.query.security.RestrictedMethodAuthorizer;
import org.apache.geode.examples.SimpleSecurityManager;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.junit.categories.OQLQueryTest;
import org.apache.geode.test.junit.rules.ServerStarterRule;

@Category(OQLQueryTest.class)
public class ExecutionContextIntegrationTest {

  /**
   * Server rule configured with a REPLICATE region named "portfolio" and auto-start enabled. The
   * tests in this class obtain the cache for every {@link QueryExecutionContext} from this rule.
   */
  @Rule
  public ServerStarterRule server = new ServerStarterRule()
      .withRegion(RegionShortcut.REPLICATE, "portfolio")
      .withAutoStart();

  /**
   * Drains the given iterator and verifies that each {@link RuntimeIterator} it yields carries the
   * scope id expected for an iterator of that name.
   *
   * @param itr iterator whose elements are {@link RuntimeIterator} instances
   * @throws RuntimeException if an iterator with an unexpected name is encountered
   */
  private void assertIteratorScope(Iterator itr) {
    while (itr.hasNext()) {
      RuntimeIterator current = (RuntimeIterator) itr.next();
      String name = current.getName();
      final int expectedScopeId;
      final String description;

      // Pick the expectation for this iterator name; the actual check happens once, below.
      switch (name) {
        case "p":
          expectedScopeId = 1;
          description = "The scopeID of outer iterator is not 1";
          break;
        case "pf":
        case "pos":
          expectedScopeId = 2;
          description = "The scopeID of first inner level iterator is not 2";
          break;
        case "rtPos":
          expectedScopeId = 3;
          description = "The scopeID of second inner level iterator is not 3";
          break;
        case "y":
          expectedScopeId = 1;
          description = "The scopeID of outer level iterator is not 1";
          break;
        case "pf1":
          expectedScopeId = 5;
          description = "The scopeID of inner level iterator is not 5";
          break;
        case "pf2":
          expectedScopeId = 4;
          description = "The scopeID of inner level iterator is not 4";
          break;
        default:
          throw new RuntimeException(
              "No such iterator with name = " + name + " should be available");
      }

      assertThat(current.getScopeID()).as(description).isEqualTo(expectedScopeId);
    }
  }

  /**
   * Consumes exactly one element from the given iterator and verifies that the
   * {@link RuntimeIterator} carries the scope id expected for its name. Callers are responsible
   * for looping while the iterator has more elements.
   *
   * @param itr iterator positioned before a {@link RuntimeIterator} element
   * @throws RuntimeException if the iterator's name is not one of the expected names
   */
  private void assertIteratorScopeMultiThreaded(Iterator itr) {
    RuntimeIterator current = (RuntimeIterator) itr.next();
    String name = current.getName();
    final int expectedScopeId;
    final String description;

    switch (name) {
      case "p":
        expectedScopeId = 1;
        description = "The scopeID of outer iterator is not 1";
        break;
      case "pf":
      case "pos":
        expectedScopeId = 2;
        description = "The scopeID of first inner level iterator is not 2";
        break;
      case "rtPos":
        expectedScopeId = 3;
        description = "The scopeID of second inner level iterator is not 3";
        break;
      case "y":
        expectedScopeId = 1;
        description = "The scopeID of outer level iterator is not 1";
        break;
      default:
        throw new RuntimeException(
            "No such iterator with name = " + name + " should be available");
    }

    assertThat(current.getScopeID()).as(description).isEqualTo(expectedScopeId);
  }

  /**
   * Registers the dependencies of the given iterator definition with the context, adds the
   * definition to the independent runtime iterator map, binds its runtime iterator and asserts
   * that the iterator's index internal id equals {@code "index_iter" + (i + 1)}.
   *
   * @param context execution context the iterator is registered with
   * @param i number of iterators processed before this one
   * @param iterDef compiled iterator definition to process
   * @return {@code i + 1}, i.e. the number of iterators processed so far
   */
  private int computeEvaluateAndAssertIterator(ExecutionContext context, int i,
      CompiledIteratorDef iterDef) throws TypeMismatchException, NameResolutionException {
    final int position = i + 1;

    @SuppressWarnings("unchecked")
    Set<RuntimeIterator> dependencySet = iterDef.computeDependencies(context);
    context.addDependencies(new CompiledID("dummy"), dependencySet);

    // The definition is added to the independent iterator map before its iterator is bound.
    RuntimeIterator boundIterator = iterDef.getRuntimeIterator(context);
    context.addToIndependentRuntimeItrMap(iterDef);
    context.bindIterator(boundIterator);

    String expectedId = "index_iter" + position;
    assertThat(boundIterator.getIndexInternalID())
        .as("The index_internal_id is not set as per expectation of index_iter'n'")
        .isEqualTo(expectedId);

    return position;
  }

  /**
   * Verifies which {@link MethodInvocationAuthorizer} a new {@link QueryExecutionContext} picks
   * up: a lambda declared in {@code DefaultQueryService} when the server runs without a security
   * manager, and a {@link RestrictedMethodAuthorizer} once the server is restarted with
   * {@link SimpleSecurityManager} configured.
   */
  @Test
  public void constructorShouldUseConfiguredMethodAuthorizer() {
    ExecutionContext unsecuredContext = new QueryExecutionContext(null, server.getCache());
    MethodInvocationAuthorizer noOpAuthorizer = unsecuredContext.getMethodInvocationAuthorizer();

    // No security, no-op authorizer.
    assertThat(noOpAuthorizer).isNotNull();
    // Use getName() rather than getCanonicalName(): lambda classes have no canonical name on JVMs
    // that define them as hidden classes, so getCanonicalName() returns null there. The text
    // following "$$Lambda" is JVM-generated and differs between JDK releases, hence only the
    // stable prefix is checked.
    assertThat(noOpAuthorizer.getClass().getName())
        .startsWith("org.apache.geode.cache.query.internal.DefaultQueryService$$Lambda");

    server.stopMember();

    // Security Enabled -> RestrictedMethodAuthorizer
    server.withProperty(SECURITY_MANAGER, SimpleSecurityManager.class.getName())
        .withProperty("security-username", "cluster")
        .withProperty("security-password", "cluster")
        .startServer();
    ExecutionContext securedContext = new QueryExecutionContext(null, server.getCache());
    MethodInvocationAuthorizer authorizer = securedContext.getMethodInvocationAuthorizer();
    assertThat(authorizer).isNotNull();
    assertThat(authorizer).isInstanceOf(RestrictedMethodAuthorizer.class);
  }

  /**
   * With no index defined, binds every iterator of the from clause
   * {@code /portfolio p, p.positions} and checks that each runtime iterator's index internal id
   * equals its own internal id.
   */
  @Test
  public void addToIndependentRuntimeItrMapShouldCorrectlySetTheIndexInternalIdUsedToIdentifyAvailableIndexes()
      throws Exception {
    List<?> iteratorDefs = new QCompiler().compileFromClause("/portfolio p, p.positions");
    ExecutionContext context = new QueryExecutionContext(null, server.getCache());
    context.newScope(context.associateScopeID());

    Iterator<?> remaining = iteratorDefs.iterator();
    while (remaining.hasNext()) {
      CompiledIteratorDef definition = (CompiledIteratorDef) remaining.next();

      @SuppressWarnings("unchecked")
      Set<RuntimeIterator> dependencySet = definition.computeDependencies(context);
      context.addDependencies(new CompiledID("dummy"), dependencySet);

      // Here the iterator is bound first and only then added to the independent iterator map.
      RuntimeIterator bound = definition.getRuntimeIterator(context);
      context.bindIterator(bound);
      context.addToIndependentRuntimeItrMap(definition);

      String expectedId = bound.getIndexInternalID();
      assertThat(bound.getInternalId())
          .as(" The index_internal_id is not set as per expectation of iter'n'")
          .isEqualTo(expectedId);
    }
  }

  /**
   * Creates the index "myindex" on {@code /portfolio pf, pf.positions pos} and then processes
   * every iterator of the from clause {@code /portfolio p, p.positions} through
   * {@code computeEvaluateAndAssertIterator}, which asserts the expected "index_iter'n'" id for
   * each of them.
   */
  @Test
  public void testFunctionalAddToIndependentRuntimeItrMapWithIndex() throws Exception {
    QCompiler fromClauseCompiler = new QCompiler();
    DefaultQueryService queryService = new DefaultQueryService(server.getCache());
    queryService.createIndex("myindex", "pf.id", "/portfolio pf, pf.positions pos");

    List<?> iteratorDefs = fromClauseCompiler.compileFromClause("/portfolio p, p.positions");
    ExecutionContext context = new QueryExecutionContext(null, server.getCache());
    context.newScope(context.associateScopeID());

    int processed = 0;
    for (Object definition : iteratorDefs) {
      processed =
          computeEvaluateAndAssertIterator(context, processed, (CompiledIteratorDef) definition);
    }
  }

  /**
   * Creates the index "myindex", processes every iterator of the from clause
   * {@code /portfolio p, p.positions}, and then checks that the region path reported for the
   * ultimate (independent) dependency of the last iterator definition is {@code /portfolio}.
   */
  @Test
  public void addToIndependentRuntimeItrMapShouldCorrectlySetTheRuntimeIteratorRegionPath()
      throws Exception {
    QCompiler compiler = new QCompiler();
    DefaultQueryService qs = new DefaultQueryService(server.getCache());
    qs.createIndex("myindex", "pf.id", "/portfolio pf, pf.positions pos");

    @SuppressWarnings("unchecked")
    List<CompiledIteratorDef> list =
        (List<CompiledIteratorDef>) compiler.compileFromClause("/portfolio p, p.positions");
    ExecutionContext context = new QueryExecutionContext(null, server.getCache());
    context.newScope(context.associateScopeID());

    int i = 0;
    CompiledIteratorDef iterDef = null;
    for (CompiledIteratorDef current : list) {
      iterDef = current;
      i = computeEvaluateAndAssertIterator(context, i, iterDef);
    }

    Set<RuntimeIterator> temp = new HashSet<>();
    context.computeUltimateDependencies(iterDef, temp);
    // Fail with a readable assertion instead of a NoSuchElementException from next() below.
    assertThat(temp)
        .as("Ultimate dependencies of the last iterator definition")
        .isNotEmpty();
    String regionPath = context.getRegionPathForIndependentRuntimeIterator(temp.iterator().next());

    // Compare the value itself so a null or wrong path is reported as an assertion failure that
    // shows the actual value, rather than as a NullPointerException or a bare "expected true".
    assertThat(regionPath)
        .as("Region path " + regionPath + " should be equal to /portfolio.")
        .isEqualTo("/portfolio");
  }

  /**
   * Binds the six iterators of a from clause spanning the "portfolio" and "dummy" regions and
   * checks that {@code getCurrScopeDpndntItrsBasedOnSingleIndpndntItr}, asked about the first
   * iterator, returns exactly the first four runtime iterators in order.
   */
  @Test
  public void testCurrScopeDpndntItrsBasedOnSingleIndpndntItr() throws Exception {
    server.getCache().createRegionFactory(RegionShortcut.REPLICATE).create("dummy");
    // The compiled from clause is iterated as a list of CompiledIteratorDef elements.
    QCompiler fromClauseCompiler = new QCompiler();
    List iteratorDefs = fromClauseCompiler.compileFromClause(
        "/portfolio p, p.positions, p.addreses addrs, addrs.collection1 coll1, /dummy d1, d1.collection2 d2");
    RuntimeIterator independentIterator = null;
    ExecutionContext context = new QueryExecutionContext(null, server.getCache());
    context.newScope(context.associateScopeID());
    List<RuntimeIterator> expectedIterators = new ArrayList<>();

    int position = 0;
    for (Object element : iteratorDefs) {
      CompiledIteratorDef definition = (CompiledIteratorDef) element;
      @SuppressWarnings("unchecked")
      Set<RuntimeIterator> dependencySet = definition.computeDependencies(context);
      context.addDependencies(new CompiledID("test"), dependencySet);
      RuntimeIterator current = definition.getRuntimeIterator(context);

      // The first iterator is the one queried below; the first four form the expected result.
      if (position == 0) {
        independentIterator = current;
      }
      if (position < 4) {
        expectedIterators.add(current);
      }
      position++;

      context.bindIterator(current);
      context.addToIndependentRuntimeItrMap(definition);

      String expectedId = current.getInternalId();
      assertThat(current.getIndexInternalID())
          .as("The index_internal_id is not set as per expectation of iter'n'")
          .isEqualTo(expectedId);
    }

    List dependents = context.getCurrScopeDpndntItrsBasedOnSingleIndpndntItr(independentIterator);
    assertThat(dependents.size())
        .as("The dependency set returned incorrect result with size =" + dependents.size())
        .isEqualTo(4);

    assertThat(dependents).isEqualTo(expectedIterators);
  }

  /**
   * Compiles a query with nested sub-selects and checks the scope ids of the runtime iterators in
   * the query's dependency set twice: once after computing dependencies and once more after
   * evaluating the query.
   */
  @Test
  public void runtimeIteratorScopeShouldBeCorrectlySetAfterCompilingQueryAndEvaluatingDependencies()
      throws Exception {
    server.getCache().createRegionFactory(RegionShortcut.REPLICATE).create("positions");
    // The whole query (not just a from clause) is compiled into a single CompiledValue.
    String queryString =
        "select distinct p.pf, ELEMENT(select distinct pf1 from /portfolio pf1 where pf1.getID = p.pf.getID )  from (select distinct pf, pos from /portfolio pf, pf.positions.values pos) p, (select distinct * from /positions rtPos where rtPos.secId = p.pos.secId) as y "
            + "where ( select distinct pf2 from /portfolio pf2 ).size() <> 0 ";

    CompiledValue compiledQuery = new QCompiler().compileQuery(queryString);
    ExecutionContext context = new QueryExecutionContext(null, server.getCache());

    // First pass: scopes as assigned while computing dependencies.
    compiledQuery.computeDependencies(context);
    Set dependenciesAfterCompute = context.getDependencySet(compiledQuery, true);
    assertIteratorScope(dependenciesAfterCompute.iterator());

    // Second pass: scopes must be the same once the query has been evaluated.
    compiledQuery.evaluate(context);
    Set dependenciesAfterEvaluate = context.getDependencySet(compiledQuery, true);
    assertIteratorScope(dependenciesAfterEvaluate.iterator());
  }

  /**
   * Shares one compiled query between 80 threads, each using its own
   * {@link QueryExecutionContext}, and checks the scope ids of the runtime iterators in the
   * dependency set both after computing dependencies and after evaluating the query.
   *
   * <p>
   * Anything thrown inside a worker (including assertion failures) is collected and asserted on
   * the test thread after all workers have been joined; otherwise it would only terminate the
   * worker thread and the test would still pass.
   */
  @Test
  public void runtimeIteratorScopeShouldBeCorrectlySetAfterCompilingQueryAndEvaluatingDependenciesConcurrently() {
    server.getCache().createRegionFactory(RegionShortcut.REPLICATE).create("positions");
    // The whole query is compiled once and the resulting CompiledValue is shared by all threads.
    String qry =
        "select distinct p.pf from (select distinct pf, pos from /portfolio pf, pf.positions.values pos) p, (select distinct * from /positions rtPos where rtPos.secId = p.pos.secId) as y";
    final int TOTAL_THREADS = 80;
    QCompiler compiler = new QCompiler();
    final CompiledValue query = compiler.compileQuery(qry);
    final CountDownLatch latch = new CountDownLatch(TOTAL_THREADS);
    // Written by the workers, read by the test thread once every worker has been joined.
    final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());

    Runnable runnable = () -> {
      try {
        // Count down and wait so that all workers begin the real work at roughly the same time.
        latch.countDown();
        latch.await();
        ExecutionContext context = new QueryExecutionContext(null, server.getCache());
        query.computeDependencies(context);
        Set runtimeIterators = context.getDependencySet(query, true);
        Iterator runtimeIterator = runtimeIterators.iterator();

        while (runtimeIterator.hasNext()) {
          assertIteratorScopeMultiThreaded(runtimeIterator);
          Thread.yield();
        }

        query.evaluate(context);
        runtimeIterators = context.getDependencySet(query, true);
        runtimeIterator = runtimeIterators.iterator();
        while (runtimeIterator.hasNext()) {
          assertIteratorScopeMultiThreaded(runtimeIterator);
        }
      } catch (VirtualMachineError e) {
        SystemFailure.initiateFailure(e);
        throw e;
      } catch (Throwable th) {
        // Record instead of rethrowing: an exception escaping run() never reaches the test thread.
        failures.add(th);
      }
    };

    Thread[] th = new Thread[TOTAL_THREADS];
    for (int i = 0; i < th.length; ++i) {
      th[i] = new Thread(runnable);
    }

    for (Thread thread : th) {
      thread.start();
    }

    for (Thread thread : th) {
      ThreadUtils.join(thread, 30 * 1000);
    }

    assertThat(failures)
        .as("Failures raised by worker threads")
        .isEmpty();
  }
}
