/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.nio.file.Paths;
import java.util.List;

import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryBuilder.QuerySummary;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.categories.PlannerTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

/**
 * Class to test different planning use cases (separate from query execution)
 *
 */
@Category({SlowTest.class, PlannerTest.class})
public class DrillSeparatePlanningTest extends ClusterTest {

  @BeforeClass
  public static void setupFiles() {
    dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "json"));
    dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "csv"));
  }

  @Before
  public void testSetup() throws Exception {
    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
        .clusterSize(2);
    startCluster(builder);
  }

  @Test(timeout=60_000)
  public void testSingleFragmentQuery() throws Exception {
    final String query = "SELECT * FROM cp.`employee.json` where employee_id > 1 and employee_id < 1000";

    QueryPlanFragments planFragments = getFragmentsHelper(query);

    assertNotNull(planFragments);

    assertEquals(1, planFragments.getFragmentsCount());
    assertTrue(planFragments.getFragments(0).getLeafFragment());

    QuerySummary summary = client.queryBuilder().plan(planFragments.getFragmentsList()).run();
    assertEquals(997, summary.recordCount());
  }

  @Test(timeout=60_000)
  public void testMultiMinorFragmentSimpleQuery() throws Exception {
    final String query = "SELECT o_orderkey FROM dfs.`multilevel/json`";

    QueryPlanFragments planFragments = getFragmentsHelper(query);

    assertNotNull(planFragments);
    assertTrue((planFragments.getFragmentsCount() > 1));

    for (PlanFragment planFragment : planFragments.getFragmentsList()) {
      assertTrue(planFragment.getLeafFragment());
    }

    int rowCount = getResultsHelper(planFragments);
    assertEquals(120, rowCount);
  }

  @Test(timeout=60_000)
  public void testMultiMinorFragmentComplexQuery() throws Exception {
    final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0";

    QueryPlanFragments planFragments = getFragmentsHelper(query);

    assertNotNull(planFragments);
    assertTrue((planFragments.getFragmentsCount() > 1));

    for ( PlanFragment planFragment : planFragments.getFragmentsList()) {
      assertTrue(planFragment.getLeafFragment());
    }

    int rowCount = getResultsHelper(planFragments);
    assertEquals(8, rowCount);
  }

  @Test(timeout=60_000)
  public void testPlanningNoSplit() throws Exception {
    final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0";

    client.alterSession("planner.slice_target", 1);
    try {
      final QueryPlanFragments planFragments = client.planQuery(query);

      assertNotNull(planFragments);
      assertTrue((planFragments.getFragmentsCount() > 1));

      PlanFragment rootFragment = planFragments.getFragments(0);
      assertFalse(rootFragment.getLeafFragment());

      QuerySummary summary = client.queryBuilder().plan(planFragments.getFragmentsList()).run();
      assertEquals(3, summary.recordCount());
    }
    finally {
      client.resetSession("planner.slice_target");
    }
  }

  @Test(timeout=60_000)
  public void testPlanningNegative() throws Exception {
    final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0";

    // LOGICAL is not supported
    final QueryPlanFragments planFragments = client.planQuery(QueryType.LOGICAL, query, false);

    assertNotNull(planFragments);
    assertNotNull(planFragments.getError());
    assertTrue(planFragments.getFragmentsCount()==0);
  }

  @Test(timeout=60_000)
  public void testPlanning() throws Exception {
    final String query = "SELECT dir0, columns[3] FROM dfs.`multilevel/csv` order by dir0";

    client.alterSession("planner.slice_target", 1);
    try {
      // Original version, but no reason to dump output to test results.
//      long rows = client.queryBuilder().sql(query).print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
      QuerySummary summary = client.queryBuilder().sql(query).run();
      assertEquals(120, summary.recordCount());
    }
    finally {
      client.resetSession("planner.slice_target");
    }
  }

  private QueryPlanFragments getFragmentsHelper(final String query) {
    client.alterSession("planner.slice_target", 1);
    try {
      QueryPlanFragments planFragments = client.planQuery(QueryType.SQL, query, true);
      return planFragments;
    }
    finally {
      client.resetSession("planner.slice_target");
    }
  }

  private int getResultsHelper(final QueryPlanFragments planFragments) throws Exception {
    int totalRows = 0;
    for (PlanFragment fragment : planFragments.getFragmentsList()) {
      DrillbitEndpoint assignedNode = fragment.getAssignment();
      ClientFixture fragmentClient = cluster.client(assignedNode.getAddress(), assignedNode.getUserPort());

      RowSet rowSet = fragmentClient.queryBuilder().sql("select hostname, user_port from sys.drillbits where `current`=true").rowSet();
      assertEquals(1, rowSet.rowCount());
      RowSetReader reader = rowSet.reader();
      assertTrue(reader.next());
      String host = reader.scalar("hostname").getString();
      int port = reader.scalar("user_port").getInt();
      rowSet.clear();

      assertEquals(assignedNode.getAddress(), host);
      assertEquals(assignedNode.getUserPort(), port);

      List<PlanFragment> fragmentList = Lists.newArrayList();
      fragmentList.add(fragment);
      QuerySummary summary = fragmentClient.queryBuilder().plan(fragmentList).run();
      totalRows += summary.recordCount();
      fragmentClient.close();
    }
    return totalRows;
  }
}
