/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.manager.oracle.util;

import java.net.URL;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.sqoop.manager.oracle.OraOopTestCase;
import org.apache.sqoop.manager.oracle.OracleUtils;

/**
 * Class to load an Oracle table with data based on configuration file.
 */
public final class OracleData {
  private OracleData() {
  }

  enum KeyType {
    PRIMARY, UNIQUE
  }

  private static ClassLoader classLoader;
  static {
    classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      classLoader = OraOopTestCase.class.getClassLoader();
    }
  }

  private static String getColumnList(OracleTableDefinition tableDefinition) {
    StringBuilder result = new StringBuilder();
    String delim = "";
    for (OracleDataDefinition column : tableDefinition.getColumnList()) {
      result.append(delim).append(OracleUtils.escapeIdentifier(column.getColumnName())).append(" ")
          .append(
          column.getDataType());
      delim = ",\n";
    }
    return result.toString();
  }

  private static String
      getDataExpression(List<OracleDataDefinition> columnList) {
    StringBuilder result = new StringBuilder();
    for (OracleDataDefinition column : columnList) {
      result.append("l_ret_rec.").append(OracleUtils.escapeIdentifier(column.getColumnName())).append(" := ")
          .append(column.getDataExpression()).append(";\n");
    }
    return result.toString();
  }

  private static void createPackageSpec(Connection conn,
      OracleTableDefinition tableDefinition) throws Exception {
    String pkgSql =
        IOUtils.toString(classLoader.getResource(
            "oraoop/pkg_tst_product_gen.psk").openStream());
    pkgSql =
        pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition));
    pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
    PreparedStatement stmt = conn.prepareStatement(pkgSql);
    stmt.execute();
  }

  private static void createPackageBody(Connection conn,
      OracleTableDefinition tableDefinition) throws Exception {
    String pkgSql =
        IOUtils.toString(classLoader.getResource(
            "oraoop/pkg_tst_product_gen.pbk").openStream());
    String columnList = getColumnList(tableDefinition);
    if (tableDefinition.isIndexOrganizedTable()) {
      columnList += "\n," + getKeyString(KeyType.PRIMARY, tableDefinition);
    }
    pkgSql =
        pkgSql.replaceAll("\\$COLUMN_LIST", columnList);
    pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
    pkgSql =
        pkgSql.replaceAll("\\$DATA_EXPRESSION_LIST",
            getDataExpression(tableDefinition.getColumnList()));

    pkgSql =
        pkgSql.replaceAll("\\$TABLE_ORGANIZATION_CLAUSE",
            tableDefinition.isIndexOrganizedTable()
                ? "ORGANIZATION INDEX OVERFLOW NOLOGGING" : "");

    pkgSql =
        pkgSql.replaceAll("\\$PARTITION_CLAUSE", tableDefinition
            .getPartitionClause());
    PreparedStatement stmt = conn.prepareStatement(pkgSql);
    stmt.execute();
  }

  private static String getKeyColumns(KeyType keyType,
      OracleTableDefinition tableDefinition) {
    String result = null;
    List<String> columns = null;
    switch (keyType) {
      case PRIMARY:
        columns = tableDefinition.getPrimaryKeyColumns();
        break;
      case UNIQUE:
        columns = tableDefinition.getUniqueKeyColumns();
        break;
      default:
        throw new RuntimeException("Missing key type.");
    }
    if (columns != null && columns.size() > 0) {
      StringBuilder keyColumnList = new StringBuilder();
      String delim = "";
      for (String column : columns) {
        keyColumnList.append(delim).append(OracleUtils.escapeIdentifier(column));
        delim = ",";
      }
      result = keyColumnList.toString();
    }
    return result;
  }

  private static String getKeyString(KeyType keyType,
      OracleTableDefinition tableDefinition) {
    String keySql = null;
    String keyColumnList = getKeyColumns(keyType, tableDefinition);
    if (keyColumnList!=null) {
      keySql = "constraint \"$TABLE_NAME_"
              + ((keyType == KeyType.PRIMARY) ? "PK\" primary key"
                  : "UK\" unique") + "($PK_COLUMN_LIST) ";

      keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList);
      keySql =
          keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
    }
    return keySql;
  }

  private static void createKey(Connection conn, KeyType keyType,
      OracleTableDefinition tableDefinition) throws Exception {
    String keySql = getKeyString(keyType, tableDefinition);
    String keyColumnList = getKeyColumns(keyType, tableDefinition);
    if (keySql!=null) {
      keySql = "alter table \"$TABLE_NAME\" add " + keySql
             + " using index (create unique index \"$TABLE_NAME_"
             + ((keyType == KeyType.PRIMARY) ? "PK\"" : "UK\"")
             + " on \"$TABLE_NAME\"($PK_COLUMN_LIST) " + "parallel nologging)";
      keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList);
      keySql =
          keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());

      PreparedStatement stmt = conn.prepareStatement(keySql);
      stmt.execute();
    }
  }

  public static int getParallelProcesses(Connection conn) throws Exception {
    PreparedStatement stmt =
        conn.prepareStatement("SELECT cc.value value"
            + "\n"
            + "FROM"
            + "\n"
            + "  (SELECT to_number(value) value"
            + "\n"
            + "  FROM v$parameter"
            + "\n"
            + "  WHERE name='parallel_max_servers'"
            + "\n"
            + "  ) pms,"
            + "\n"
            + "  (SELECT to_number(value) value"
            + "\n"
            + "  FROM v$parameter"
            + "\n"
            + "  WHERE name='parallel_threads_per_cpu'"
            + "\n"
            + "  ) ptpc,"
            + "\n"
            + "  (SELECT to_number(value) value FROM v$parameter "
            + "   WHERE name='cpu_count'"
            + "\n" + "  ) cc");
    ResultSet res = stmt.executeQuery();
    res.next();
    return res.getInt(1);
  }

  public static void createTable(Connection conn,
      OracleTableDefinition tableDefinition, int parallelDegree,
      int rowsPerSlave) throws Exception {
    createPackageSpec(conn, tableDefinition);
    createPackageBody(conn, tableDefinition);

    CallableStatement procStmt =
        conn.prepareCall("begin \"PKG_ODG_" + tableDefinition.getTableName()
            + "\".prc_load_table(?,?); end;");
    procStmt.setInt(1, parallelDegree);
    procStmt.setInt(2, rowsPerSlave);
    procStmt.execute();

    if (!tableDefinition.isIndexOrganizedTable()) {
      createKey(conn, KeyType.PRIMARY, tableDefinition);
    }
    createKey(conn, KeyType.UNIQUE, tableDefinition);
  }

  public static void createTable(Connection conn, String fileName,
      int parallelDegree, int rowsPerSlave) throws Exception {
    createTable(conn, fileName, parallelDegree, rowsPerSlave, false);
  }

  public static void createTable(Connection conn, String fileName,
      int parallelDegree, int rowsPerSlave,
      boolean dropTableIfExists) throws Exception {
    URL file = classLoader.getResource("oraoop/" + fileName);
    OracleTableDefinition tableDefinition = new OracleTableDefinition(file);
    if (dropTableIfExists) {
      dropTableIfExists(conn, tableDefinition.getTableName());
    }
    createTable(conn, tableDefinition, parallelDegree, rowsPerSlave);
  }

  private static void dropTableIfExists(Connection conn, String tableName) throws Exception {
    try (Statement stmt = conn.createStatement()) {
      stmt.execute("BEGIN EXECUTE IMMEDIATE 'DROP TABLE " + tableName
          + "'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END;");
    }
  }

}
