/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.phoenix;

import static org.junit.Assert.assertFalse;

import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.exec.store.StoragePluginRegistry;
import com.google.common.collect.Maps;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.LoggerFactory;

import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;

public class PhoenixBaseTest extends ClusterTest {

  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBaseTest.class);

  public final static String U_U_I_D = UUID.randomUUID().toString();
  private final static AtomicInteger initCount = new AtomicInteger(0);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    PhoenixTestSuite.initPhoenix();
    if (PhoenixTestSuite.isRunningSuite()) {
      PhoenixBasicsIT.testCatalogs();
    }
    startDrillCluster();
    if (initCount.incrementAndGet() == 1) {
      createSchema(PhoenixBasicsIT.CONN_STRING);
      createTables(PhoenixBasicsIT.CONN_STRING);
      createSampleData(PhoenixBasicsIT.CONN_STRING);
    }
  }

  @AfterClass
  public static void tearDownCluster() throws Exception {
    if (!PhoenixTestSuite.isRunningSuite()) {
      PhoenixTestSuite.tearDownCluster();
    }
  }

  public static void startDrillCluster() throws Exception {
    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
    startCluster(builder);
    Map<String, Object> props = Maps.newHashMap();
    props.put("phoenix.query.timeoutMs", 90000);
    props.put("phoenix.query.keepAliveMs", "30000");
    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, 0, null, null, null,
      PhoenixBasicsIT.CONN_STRING, null, props);
    config.setEnabled(true);
    registry.put(PhoenixStoragePluginConfig.NAME + "123", config);
    dirTestWatcher.copyResourceToRoot(Paths.get(""));
  }

  public static void createSchema(String connString) throws Exception {
    try (final Connection connection = DriverManager.getConnection(connString)) {
      logger.debug("Phoenix connection established with the specified url : {}", connString);
      assertFalse(connection.isClosed());
      connection.setAutoCommit(true);
      try (final Statement stmt = connection.createStatement()) {
        assertFalse(stmt.execute("CREATE SCHEMA IF NOT EXISTS V1"));
      }
    }
  }

  public static void createTables(String connString) throws Exception {
    try (final Connection connection = DriverManager.getConnection(connString)) {
      assertFalse(connection.isClosed());
      connection.setAutoCommit(true);
      try (final Statement stmt = connection.createStatement()) {
        String region_sql = " CREATE TABLE V1.REGION "
            + "("
            + "    R_REGIONKEY BIGINT not null,"
            + "    R_NAME      VARCHAR,"
            + "    R_COMMENT   VARCHAR"
            + "    CONSTRAINT  REGION_PK PRIMARY KEY (R_REGIONKEY)"
            + ")";
        String nation_sql = " CREATE TABLE V1.NATION "
            + "("
            + "    N_NATIONKEY BIGINT not null primary key,"
            + "    N_NAME      VARCHAR(100),"
            + "    N_REGIONKEY BIGINT,"
            + "    N_COMMENT   VARCHAR(255)"
            + ")";
        String datatype_sql = " CREATE TABLE V1.DATATYPE "
            + "("
            + "    T_UUID      VARCHAR not null primary key,"
            + "    T_VARCHAR   VARCHAR,"
            + "    T_CHAR      CHAR(5),"
            + "    T_BIGINT    BIGINT,"
            + "    T_INTEGER   INTEGER,"
            + "    T_SMALLINT  SMALLINT,"
            + "    T_TINYINT   TINYINT,"
            + "    T_DOUBLE    DOUBLE,"
            + "    T_FLOAT     FLOAT,"
            + "    T_DECIMAL   DECIMAL(4,2),"
            + "    T_DATE      DATE,"
            + "    T_TIME      TIME,"
            + "    T_TIMESTAMP TIMESTAMP,"
            + "    T_BINARY    BINARY(10),"
            + "    T_VARBINARY VARBINARY,"
            + "    T_BOOLEAN   BOOLEAN"
            + ")";
        String arrytype_sql = " CREATE TABLE V1.ARRAYTYPE "
            + "("
            + "    T_UUID      VARCHAR not null primary key,"
            + "    T_VARCHAR   VARCHAR ARRAY,"
            + "    T_CHAR      CHAR(5) ARRAY,"
            + "    T_BIGINT    BIGINT  ARRAY,"
            + "    T_INTEGER   INTEGER ARRAY,"
            + "    T_DOUBLE    DOUBLE  ARRAY,"
            + "    T_SMALLINT  SMALLINT ARRAY,"
            + "    T_TINYINT   TINYINT  ARRAY,"
            + "    T_BOOLEAN   BOOLEAN  ARRAY"
            + ")";
        assertFalse(stmt.execute(region_sql));
        assertFalse(stmt.execute(nation_sql));
        assertFalse(stmt.execute(datatype_sql));
        assertFalse(stmt.execute(arrytype_sql));
      }
    }
  }

  public static void createSampleData(String connString) throws Exception {
    final String[] paths = new String[] { "data/region.tbl", "data/nation.tbl" };
    final String[] sqls = new String[] {
        "UPSERT INTO V1.REGION VALUES(?,?,?)",
        "UPSERT INTO V1.NATION VALUES(?,?,?,?)",
        "UPSERT INTO V1.DATATYPE VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
        "UPSERT INTO V1.ARRAYTYPE VALUES(?,?,ARRAY['a','b','c'],?,?,?,?,?,?)",
    };

    Path region_path = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), paths[0]);
    Path nation_path = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), paths[1]);
    logger.info("Loading the .tbl file : " + Arrays.toString(paths));

    List<String[]> allRows = parseTblFile(String.valueOf(region_path));
    try (final Connection connection = DriverManager.getConnection(connString)) {
      assertFalse(connection.isClosed());
      connection.setAutoCommit(false);
      // region table
      try (final PreparedStatement pstmt = connection.prepareStatement(sqls[0])) {
        for (String[] row : allRows) {
          pstmt.setLong(1, Long.valueOf(row[0]));
          pstmt.setString(2, row[1]);
          pstmt.setString(3, row[2]);
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
      connection.commit();
      // nation table
      allRows = parseTblFile(String.valueOf(nation_path));
      try (final PreparedStatement pstmt = connection.prepareStatement(sqls[1])) {
        for (String[] row : allRows) {
          pstmt.setLong(1, Long.valueOf(row[0]));
          pstmt.setString(2, row[1]);
          pstmt.setLong(3, Long.valueOf(row[2]));
          pstmt.setString(4, row[3]);
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
      connection.commit();
      // datatype table
      try (final PreparedStatement pstmt = connection.prepareStatement(sqls[2])) {
        pstmt.setString(1, U_U_I_D);
        pstmt.setString(2, "apache");
        pstmt.setString(3, "drill");
        pstmt.setLong(4, Long.MAX_VALUE);
        pstmt.setInt(5, Integer.MAX_VALUE);
        pstmt.setShort(6, Short.MAX_VALUE);
        pstmt.setByte(7, Byte.MAX_VALUE);
        pstmt.setDouble(8, Double.MAX_VALUE);
        pstmt.setFloat(9, Float.MAX_VALUE);
        pstmt.setBigDecimal(10, BigDecimal.valueOf(10.11));
        pstmt.setDate(11, java.sql.Date.valueOf("2021-12-12"));
        pstmt.setTime(12, java.sql.Time.valueOf("12:12:12"));
        pstmt.setTimestamp(13, java.sql.Timestamp.valueOf("2021-12-12 12:12:12"));
        pstmt.setBytes(14, "a_b_c_d_e_".getBytes(StandardCharsets.UTF_8));
        pstmt.setBytes(15, "12345".getBytes(StandardCharsets.UTF_8));
        pstmt.setBoolean(16, Boolean.TRUE);
        pstmt.addBatch();
        pstmt.executeBatch();
      }
      connection.commit();
      // arraytype table
      try (final PreparedStatement pstmt = connection.prepareStatement(sqls[3])) {
        Array t_varchar = connection.createArrayOf("VARCHAR", new String[] { "apache", "drill", "1.20" });
        @SuppressWarnings("unused")
        Array t_char = connection.createArrayOf("CHAR", new String[] { "a", "b", "c" }); // PHOENIX-6607
        Array t_bigint = connection.createArrayOf("BIGINT", new Long[] { Long.MIN_VALUE, Long.MAX_VALUE });
        Array t_integer = connection.createArrayOf("INTEGER", new Integer[] { Integer.MIN_VALUE, Integer.MAX_VALUE });
        Array t_double = connection.createArrayOf("DOUBLE", new Double[] { Double.MIN_VALUE, Double.MAX_VALUE });
        @SuppressWarnings("unused")
        Array t_float = connection.createArrayOf("FLOAT", new Float[] { Float.MIN_VALUE, Float.MAX_VALUE }); // PHOENIX-6606
        Array t_smallint = connection.createArrayOf("SMALLINT", new Short[] { Short.MIN_VALUE, Short.MAX_VALUE });
        Array t_tinyint = connection.createArrayOf("TINYINT", new Byte[] { Byte.MIN_VALUE, Byte.MAX_VALUE });
        Array t_boolean = connection.createArrayOf("BOOLEAN", new Boolean[] { Boolean.TRUE, Boolean.FALSE });
        pstmt.setString(1, U_U_I_D);
        pstmt.setArray(2, t_varchar);
        pstmt.setArray(3, t_bigint);
        pstmt.setArray(4, t_integer);
        pstmt.setArray(5, t_double);
        pstmt.setArray(6, t_smallint);
        pstmt.setArray(7, t_tinyint);
        pstmt.setArray(8, t_boolean);
        pstmt.addBatch();
        pstmt.executeBatch();
      }
      connection.commit();
      logger.info("Loaded {} rows.", allRows.size());
    }
  }

  private static List<String[]> parseTblFile(String path) throws Exception {
    CsvParserSettings settings = new CsvParserSettings();
    settings.getFormat().setDelimiter("|");
    settings.getFormat().setLineSeparator("\n");
    CsvParser parser = new CsvParser(settings);

    return parser.parseAll(getReader(path));
  }

  private static Reader getReader(String path) throws Exception {
    return new InputStreamReader(new FileInputStream(path), "UTF-8");
  }
}
