/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.test;

import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA;
import static org.apache.drill.exec.util.StoragePluginTestUtils.TMP_SCHEMA;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.util.StoragePluginTestUtils;
import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import org.apache.drill.test.DrillTestWrapper.TestServices;
import org.apache.hadoop.fs.FileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * deprecated Use {@link ClusterTest} instead.
 *
 * But, not marked as deprecated because it is still widely used.
 */
//@Deprecated
public class BaseTestQuery extends ExecTest {
  private static final Logger logger = LoggerFactory.getLogger(BaseTestQuery.class);

  private static final int MAX_WIDTH_PER_NODE = 2;

  @SuppressWarnings("serial")
  private static final Properties TEST_CONFIGURATIONS = new Properties() {
    {
      put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
      put(ExecConstants.HTTP_ENABLE, "false");
      // Increasing retry attempts for testing
      put(ExecConstants.UDF_RETRY_ATTEMPTS, "10");
      put(ExecConstants.SSL_USE_HADOOP_CONF, "false");
    }
  };

  protected static DrillClient client;
  protected static Drillbit[] bits;
  protected static RemoteServiceSet serviceSet;
  protected static DrillConfig config;
  protected static BufferAllocator allocator;

  /**
   * Number of Drillbits in test cluster. Default is 1.
   *
   * Tests can update the cluster size through {@link #updateTestCluster(int, DrillConfig)}
   */
  private static int drillbitCount = 1;

  private int[] columnWidths = new int[] { 8 };

  private static ScanResult classpathScan;

  @BeforeClass
  public static void setupDefaultTestCluster() throws Exception {
    config = DrillConfig.create(cloneDefaultTestConfigProperties());
    classpathScan = ClassPathScanner.fromPrescan(config);
    openClient();
    // turns on the verbose errors in tests
    // sever side stacktraces are added to the message before sending back to the client
    test("ALTER SESSION SET `exec.errors.verbose` = true");
  }

  protected static void updateTestCluster(int newDrillbitCount, DrillConfig newConfig) {
    updateTestCluster(newDrillbitCount, newConfig, cloneDefaultTestConfigProperties());
  }

  protected static void updateTestCluster(int newDrillbitCount, DrillConfig newConfig, Properties properties) {
    Preconditions.checkArgument(newDrillbitCount > 0, "Number of Drillbits must be at least one");
    if (drillbitCount != newDrillbitCount || config != null) {
      // TODO: Currently we have to shutdown the existing Drillbit cluster before starting a new one with the given
      // Drillbit count. Revisit later to avoid stopping the cluster.
      try {
        closeClient();
        drillbitCount = newDrillbitCount;
        if (newConfig != null) {
          // For next test class, updated DrillConfig will be replaced by default DrillConfig in BaseTestQuery as part
          // of the @BeforeClass method of test class.
          config = newConfig;
        }
        openClient(properties);
      } catch (Exception e) {
        throw new RuntimeException("Failure while updating the test Drillbit cluster.", e);
      }
    }
  }

  /**
   * Useful for tests that require a DrillbitContext to get/add storage plugins, options etc.
   *
   * @return DrillbitContext of first Drillbit in the cluster.
   */
  protected static DrillbitContext getDrillbitContext() {
    Preconditions.checkState(bits != null && bits[0] != null, "Drillbits are not setup.");
    return bits[0].getContext();
  }

  protected static Properties cloneDefaultTestConfigProperties() {
    Properties props = new Properties();

    for (String propName : TEST_CONFIGURATIONS.stringPropertyNames()) {
      props.put(propName, TEST_CONFIGURATIONS.getProperty(propName));
    }

    props.setProperty(ExecConstants.DRILL_TMP_DIR, dirTestWatcher.getTmpDir().getAbsolutePath());
    props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
    props.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, dirTestWatcher.getHomeDir().getAbsolutePath());
    props.setProperty(ExecConstants.UDF_DIRECTORY_FS, FileSystem.DEFAULT_FS);

    return props;
  }

  private static void openClient() throws Exception {
    openClient(null);
  }

  private static void openClient(Properties properties) throws Exception {
    if (properties == null) {
      properties = new Properties();
    }

    allocator = RootAllocatorFactory.newRoot(config);
    serviceSet = RemoteServiceSet.getLocalServiceSet();

    dirTestWatcher.newDfsTestTmpDir();

    bits = new Drillbit[drillbitCount];
    for (int i = 0; i < drillbitCount; i++) {
      bits[i] = new Drillbit(config, serviceSet, classpathScan);
      bits[i].run();

      StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage();
      StoragePluginTestUtils.configureFormatPlugins(pluginRegistry);

      StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry,
        dirTestWatcher.getDfsTestTmpDir(), TMP_SCHEMA);
      StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry,
        dirTestWatcher.getRootDir(), ROOT_SCHEMA);
      StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry,
        dirTestWatcher.getRootDir(), SchemaFactory.DEFAULT_WS_NAME);
    }

    if (!properties.containsKey(DrillProperties.DRILLBIT_CONNECTION)) {
      properties.setProperty(DrillProperties.DRILLBIT_CONNECTION,
          String.format("localhost:%s", bits[0].getUserPort()));
    }

    DrillConfig clientConfig = DrillConfig.forClient();
    client = QueryTestUtil.createClient(clientConfig,  serviceSet, MAX_WIDTH_PER_NODE, properties);
  }

  /**
   * Close the current <i>client</i> and open a new client using the given <i>properties</i>. All tests executed
   * after this method call use the new <i>client</i>.
   *
   * @param properties
   */
  public static void updateClient(Properties properties) throws Exception {
    Preconditions.checkState(bits != null && bits[0] != null, "Drillbits are not setup.");
    if (client != null) {
      client.close();
      client = null;
    }

    DrillConfig clientConfig = DrillConfig.forClient();
    client = QueryTestUtil.createClient(clientConfig, serviceSet, MAX_WIDTH_PER_NODE, properties);
  }

  /*
   * Close the current <i>client</i> and open a new client for the given user. All tests executed
   * after this method call use the new <i>client</i>.
   * @param user
   */
  public static void updateClient(String user) throws Exception {
    updateClient(user, null);
  }

  /*
   * Close the current <i>client</i> and open a new client for the given user and password credentials. Tests
   * executed after this method call use the new <i>client</i>.
   * @param user
   */
  public static void updateClient(String user, String password) throws Exception {
    Properties props = new Properties();
    props.setProperty(DrillProperties.USER, user);
    if (password != null) {
      props.setProperty(DrillProperties.PASSWORD, password);
    }
    updateClient(props);
  }

  protected static BufferAllocator getAllocator() {
    return allocator;
  }

  public static int getUserPort() {
    return bits[0].getUserPort();
  }

  public static TestBuilder newTest() {
    return testBuilder();
  }


  public static class ClassicTestServices implements TestServices {
    @Override
    public BufferAllocator allocator() {
      return allocator;
    }

    @Override
    public void test(String query) throws Exception {
      BaseTestQuery.test(query);
    }

    @Override
    public List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception {
      return BaseTestQuery.testRunAndReturn(type, query);
    }
  }

  public static TestBuilder testBuilder() {
    return new TestBuilder(new ClassicTestServices());
  }

  @AfterClass
  public static void closeClient() throws Exception {
    if (client != null) {
      client.close();
      client = null;
    }

    if (bits != null) {
      for (Drillbit bit : bits) {
        if (bit != null) {
          bit.close();
        }
      }
      bits = null;
    }

    if (serviceSet != null) {
      serviceSet.close();
      serviceSet = null;
    }
    if (allocator != null) {
      allocator.close();
      allocator = null;
    }
  }

  @AfterClass
  public static void resetDrillbitCount() {
    // some test classes assume this value to be 1 and will fail if run along other tests that increase it
    drillbitCount = 1;
  }

  protected static void runSQL(String sql) throws Exception {
    AwaitableUserResultsListener listener = new AwaitableUserResultsListener(new SilentListener());
    testWithListener(QueryType.SQL, sql, listener);
    listener.await();
  }

  protected static List<QueryDataBatch> testSqlWithResults(String sql) throws Exception{
    return testRunAndReturn(QueryType.SQL, sql);
  }

  protected static List<QueryDataBatch> testLogicalWithResults(String logical) throws Exception{
    return testRunAndReturn(QueryType.LOGICAL, logical);
  }

  protected static List<QueryDataBatch> testPhysicalWithResults(String physical) throws Exception{
    return testRunAndReturn(QueryType.PHYSICAL, physical);
  }

  public static List<QueryDataBatch>  testRunAndReturn(QueryType type, Object query) throws Exception{
    if (type == QueryType.PREPARED_STATEMENT) {
      Preconditions.checkArgument(query instanceof PreparedStatementHandle,
          "Expected an instance of PreparedStatement as input query");
      return testPreparedStatement((PreparedStatementHandle)query);
    } else {
      Preconditions.checkArgument(query instanceof String, "Expected a string as input query");
      query = QueryTestUtil.normalizeQuery((String)query);
      return client.runQuery(type, (String)query);
    }
  }

  public static List<QueryDataBatch> testPreparedStatement(PreparedStatementHandle handle) throws Exception {
    return client.executePreparedStatement(handle);
  }

  public static int testRunAndPrint(QueryType type, String query) throws Exception {
    return QueryTestUtil.testRunAndLog(client, type, query);
  }

  protected static void testWithListener(QueryType type, String query, UserResultsListener resultListener) {
    QueryTestUtil.testWithListener(client, type, query, resultListener);
  }

  public static void testNoResult(String query, Object... args) throws Exception {
    testNoResult(1, query, args);
  }

  public static void alterSession(String option, Object value) {
    String valueStr = ClusterFixture.stringify(value);
    try {
      test("ALTER SESSION SET `%s` = %s", option, valueStr);
    } catch (Exception e) {
      fail(String.format("Failed to set session option `%s` = %s, Error: %s",
          option, valueStr, e.toString()));
    }
  }

  public static void resetSessionOption(String option) {
    try {
      test("ALTER SESSION RESET `%s`", option);
    } catch (Exception e) {
      fail(String.format("Failed to reset session option `%s`, Error: %s",
          option, e.toString()));
    }
  }

  public static void resetAllSessionOptions() {
    try {
      test("ALTER SESSION RESET ALL");
    } catch (Exception e) {
      fail("Failed to reset all session options");
    }
  }

  protected static void testNoResult(int interation, String query, Object... args) throws Exception {
    query = String.format(query, args);
    logger.debug("Running query:\n--------------\n" + query);
    for (int i = 0; i < interation; i++) {
      List<QueryDataBatch> results = client.runQuery(QueryType.SQL, query);
      for (QueryDataBatch queryDataBatch : results) {
        queryDataBatch.release();
      }
    }
  }

  public static void test(String query, Object... args) throws Exception {
    QueryTestUtil.testRunAndLog(client, String.format(query, args));
  }

  public static void test(String query) throws Exception {
    QueryTestUtil.testRunAndLog(client, query);
  }

  protected static int testPhysical(String query) throws Exception{
    return testRunAndPrint(QueryType.PHYSICAL, query);
  }

  protected static int testSql(String query) throws Exception{
    return testRunAndPrint(QueryType.SQL, query);
  }

  protected static void testPhysicalFromFile(String file) throws Exception{
    testPhysical(getFile(file));
  }

  /**
   * Utility method which tests given query produces a {@link UserException} and the exception message contains
   * the given message.
   * @param testSqlQuery Test query
   * @param expectedErrorMsg Expected error message.
   */
  protected static void errorMsgTestHelper(String testSqlQuery, String expectedErrorMsg) throws Exception {
    try {
      test(testSqlQuery);
      fail("Expected a UserException when running " + testSqlQuery);
    } catch (UserException actualException) {
      try {
        assertThat("message of UserException when running " + testSqlQuery, actualException.getMessage(), containsString(expectedErrorMsg));
      } catch (AssertionError e) {
        e.addSuppressed(actualException);
        throw e;
      }
    }
  }

  /**
   * Utility method which tests given query produces a {@link UserException}
   * with {@link org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType} being DrillPBError.ErrorType.PARSE
   * the given message.
   * @param testSqlQuery Test query
   */
  protected static void parseErrorHelper(String testSqlQuery) throws Exception {
    errorMsgTestHelper(testSqlQuery, UserBitShared.DrillPBError.ErrorType.PARSE.name());
  }

  public static String getFile(String resource) throws IOException{
    URL url = Resources.getResource(resource);
    if (url == null) {
      throw new IOException(String.format("Unable to find path %s.", resource));
    }
    return Resources.toString(url, Charsets.UTF_8);
  }

  /**
   * Copy the resource (ex. file on classpath) to a physical file on FileSystem.
   * @param resource
   * @return the file path
   * @throws IOException
   */
  public static String getPhysicalFileFromResource(String resource) throws IOException {
    File file = File.createTempFile("tempfile", ".txt");
    file.deleteOnExit();
    PrintWriter printWriter = new PrintWriter(file);
    printWriter.write(BaseTestQuery.getFile(resource));
    printWriter.close();

    return file.getPath();
  }

  protected static void setSessionOption(String option, boolean value) {
    alterSession(option, value);
  }

  protected static void setSessionOption(String option, long value) {
    alterSession(option, value);
  }

  protected static void setSessionOption(String option, double value) {
    alterSession(option, value);
  }

  protected static void setSessionOption(String option, String value) {
    alterSession(option, value);
  }

  public static class SilentListener implements UserResultsListener {
    private final AtomicInteger count = new AtomicInteger();
    private QueryId queryId;

    @Override
    public void submissionFailed(UserException ex) {
      logger.debug("Query failed: " + ex.getMessage());
    }

    @Override
    public void queryCompleted(QueryState state) {
      logger.debug("Query completed successfully with row count: " + count.get());
    }

    @Override
    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
      int rows = result.getHeader().getRowCount();
      if (result.getData() != null) {
        count.addAndGet(rows);
      }
      result.release();
    }

    @Override
    public void queryIdArrived(QueryId queryId) {
      this.queryId = queryId;
    }

    public QueryId getQueryId() {
      return queryId;
    }

    public int getRowCount() {
      return count.get();
    }
  }

  protected void setColumnWidth(int columnWidth) {
    this.columnWidths = new int[] { columnWidth };
  }

  protected void setColumnWidths(int[] columnWidths) {
    this.columnWidths = columnWidths;
  }

  protected int logResult(List<QueryDataBatch> results) throws SchemaChangeException {
    int rowCount = 0;
    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
    for (QueryDataBatch result : results) {
      rowCount += result.getHeader().getRowCount();
      loader.load(result.getHeader().getDef(), result.getData());
      // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
      // SchemaChangeException, so check/clean throw clause above.
      VectorUtil.logVectorAccessibleContent(loader, columnWidths);
      loader.clear();
      result.release();
    }
    return rowCount;
  }

  protected int printResult(List<QueryDataBatch> results) throws SchemaChangeException {
    int result = PrintingUtils.printAndThrow(() -> logResult(results));
    return result;
  }

  protected static String getResultString(List<QueryDataBatch> results, String delimiter)
      throws SchemaChangeException {
    StringBuilder formattedResults = new StringBuilder();
    boolean includeHeader = true;
    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
    for (QueryDataBatch result : results) {
      loader.load(result.getHeader().getDef(), result.getData());
      if (loader.getRecordCount() <= 0) {
        continue;
      }
      VectorUtil.appendVectorAccessibleContent(loader, formattedResults, delimiter, includeHeader);
      if (!includeHeader) {
        includeHeader = false;
      }
      loader.clear();
      result.release();
    }

    return formattedResults.toString();
  }

  public class TestResultSet {

    private final List<List<String>> rows;

    public TestResultSet() {
      rows = new ArrayList<>();
    }

    public TestResultSet(List<QueryDataBatch> batches) throws SchemaChangeException {
      rows = new ArrayList<>();
      convert(batches);
    }

    public void addRow(String... cells) {
      List<String> newRow = Arrays.asList(cells);
      rows.add(newRow);
    }

    public int size() {
      return rows.size();
    }

    @Override public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (! (o instanceof TestResultSet)) {
        return false;
      }
      TestResultSet that = (TestResultSet) o;
      assertEquals(this.size(), that.size());
      for (int i = 0; i < this.rows.size(); i++) {
        assertEquals(this.rows.get(i).size(), that.rows.get(i).size());
        for (int j = 0; j < this.rows.get(i).size(); ++j) {
          assertEquals(this.rows.get(i).get(j), that.rows.get(i).get(j));
        }
      }
      return true;
    }

    private void convert(List<QueryDataBatch> batches) throws SchemaChangeException {
      RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
      for (QueryDataBatch batch : batches) {
        int rc = batch.getHeader().getRowCount();
        if (batch.getData() != null) {
          loader.load(batch.getHeader().getDef(), batch.getData());
          for (int i = 0; i < rc; ++i) {
            List<String> newRow = new ArrayList<>();
            rows.add(newRow);
            for (VectorWrapper<?> vw : loader) {
              ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
              Object o = accessor.getObject(i);
              newRow.add(o == null ? null : o.toString());
            }
          }
        }
        loader.clear();
        batch.release();
      }
    }
  }
}
