package org.apache.carbondata.spark.testsuite.compaction

import java.io.{BufferedWriter, File, FileWriter}

import scala.collection.mutable.ListBuffer

import au.com.bytecode.opencsv.CSVWriter
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties


/**
 * Tests major compaction on a table whose segments were loaded with different
 * sort scopes (`local_sort` and `no_sort`).
 *
 * Each test creates a fresh table sorted on `(state, age)`, loads generated CSV
 * data, runs `ALTER TABLE ... COMPACT 'major'` and then checks the order of the
 * rows returned by a plain `SELECT` (no `ORDER BY`), so the assertions depend on
 * the physical row order produced by the compaction.
 */
class TestHybridCompaction extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {

  /** Project root, resolved relative to the test-classes resource directory. */
  val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath

  /** CSV file used by [[loadUnsortedData]]; generated in `beforeAll`. */
  val csvPath1 =
    s"$rootPath/integration/spark-common-test/src/test/resources/compaction/hybridCompaction1.csv"

  /** CSV file used by [[loadSortedData]]; generated in `beforeAll`. */
  val csvPath2 =
    s"$rootPath/integration/spark-common-test/src/test/resources/compaction/hybridCompaction2.csv"

  /** Name of the table that is re-created around every test. */
  val tableName = "t1"


  /** Generates the CSV fixtures and switches the global date format to `MM/dd/yyyy`. */
  override def beforeAll: Unit = {
    generateCSVFiles()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "MM/dd/yyyy")
  }


  /**
   * Removes the CSV fixtures and resets the global date format.
   *
   * CarbonProperties is a process-wide singleton, so the date format set in
   * `beforeAll` is put back to the default even when the file clean-up fails;
   * otherwise it would leak into suites that run later in the same JVM.
   */
  override def afterAll: Unit = {
    try {
      deleteCSVFiles()
    } finally {
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
          CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    }
  }


  /** Starts every test from an empty, freshly created table. */
  override def beforeEach(): Unit = {
    dropTable()
    createTable()
  }


  /** Drops the table so that no state survives the test. */
  override def afterEach(): Unit = {
    dropTable()
  }


  /**
   * Writes the two CSV fixtures (header plus five rows each) to
   * [[csvPath1]] and [[csvPath2]].
   */
  def generateCSVFiles(): Unit = {
    val rows1 = new ListBuffer[Array[String]]
    rows1 += Array("seq", "first", "last", "age", "city", "state", "date")
    rows1 += Array("1", "Augusta", "Nichols", "20", "Varasdo", "WA", "07/05/2003")
    rows1 += Array("2", "Luis", "Barnes", "39", "Oroaklim", "MT", "04/05/2048")
    rows1 += Array("3", "Leah", "Guzman", "54", "Culeosa", "KS", "02/23/1983")
    rows1 += Array("4", "Ian", "Ford", "61", "Rufado", "AL", "03/02/1995")
    rows1 += Array("5", "Fanny", "Horton", "37", "Rorlihbem", "CT", "05/12/1987")
    createCSV(rows1, csvPath1)

    val rows2 = new ListBuffer[Array[String]]
    rows2 += Array("seq", "first", "last", "age", "city", "state", "date")
    rows2 += Array("11", "Claudia", "Sullivan", "42", "Dilwuani", "ND", "09/01/2003")
    rows2 += Array("12", "Kate", "Adkins", "54", "Fokafrid", "WA", "10/13/2013")
    rows2 += Array("13", "Eliza", "Lynch", "23", "Bonpige", "ME", "05/02/2015")
    rows2 += Array("14", "Sarah", "Fleming", "60", "Duvugove", "IA", "04/15/2036")
    rows2 += Array("15", "Maude", "Bass", "44", "Ukozedka", "CT", "11/08/1988")
    createCSV(rows2, csvPath2)
  }


  /**
   * Writes `rows` as a CSV file at `csvPath`, replacing any existing file.
   *
   * @param rows    rows to write, in order; each array is one CSV record
   * @param csvPath destination file path
   */
  def createCSV(rows: ListBuffer[Array[String]], csvPath: String): Unit = {
    val writer: CSVWriter = new CSVWriter(new BufferedWriter(new FileWriter(csvPath)))
    try {
      rows.foreach(row => writer.writeNext(row))
    } finally {
      // Closing the outermost writer flushes and closes the wrapped writers too;
      // doing it in `finally` releases the file handle even if a write fails.
      writer.close()
    }
  }


  /**
   * Deletes both generated CSV files.
   *
   * Both deletions are attempted even if the first one fails; the test run is
   * failed afterwards with the collected error messages.
   */
  def deleteCSVFiles(): Unit = {
    val failures = Seq(csvPath1, csvPath2).flatMap { path =>
      try {
        FileUtils.forceDelete(new File(path))
        None
      } catch {
        case e: Exception =>
          e.printStackTrace()
          Some(s"$path: ${e.getMessage}")
      }
    }
    if (failures.nonEmpty) {
      Assert.fail(failures.mkString("; "))
    }
  }


  /** Creates the test table with `local_sort` scope and sort columns `state, age`. */
  def createTable(): Unit = {
    sql(
      s"""
         | CREATE TABLE $tableName(seq int, first string, last string,
         |   age int, city string, state string, date date)
         | STORED BY 'carbondata'
         | TBLPROPERTIES(
         |   'sort_scope'='local_sort',
         |   'sort_columns'='state, age',
         |   'dateformat'='MM/dd/yyyy')
      """.stripMargin)
  }


  /**
   * Loads [[csvPath1]] into the table `n` times with `sort_scope=no_sort`.
   *
   * @param n number of loads to perform
   */
  def loadUnsortedData(n : Int = 1): Unit = {
    for(_ <- 1 to n) {
      sql(
        s"""
           | LOAD DATA INPATH '$csvPath1' INTO TABLE $tableName
           | OPTIONS (
           |   'sort_scope'='no_sort')""".stripMargin)
    }
  }


  /**
   * Loads [[csvPath2]] into the table `n` times with `sort_scope=local_sort`.
   *
   * @param n number of loads to perform
   */
  def loadSortedData(n : Int = 1): Unit = {
    for(_ <- 1 to n) {
      sql(
        s"""
           | LOAD DATA INPATH '$csvPath2' INTO TABLE $tableName
           | OPTIONS (
           |   'sort_scope'='local_sort')""".stripMargin)
    }
  }


  /** Drops the test table if it exists. */
  def dropTable(): Unit = {
    sql(s"DROP TABLE IF EXISTS $tableName")
  }


  // Two local_sort loads of the same file: every state is expected twice, in sorted order.
  test("SORTED LOADS") {
    loadSortedData(2)
    sql(s"ALTER TABLE $tableName COMPACT 'major'")

    val out = sql(s"SELECT state, age FROM $tableName").collect()
    out.map(_.get(0).toString) should
      equal(Array("CT", "CT", "IA", "IA", "ME", "ME", "ND", "ND", "WA", "WA"))
  }


  // Two no_sort loads of the same file: every state is expected twice, in sorted order.
  test("UNSORTED LOADS") {
    loadUnsortedData(2)
    sql(s"ALTER TABLE $tableName COMPACT 'major'")

    val out = sql(s"SELECT state, age FROM $tableName").collect()
    out.map(_.get(0).toString) should
      equal(Array("AL", "AL", "CT", "CT", "KS", "KS", "MT", "MT", "WA", "WA"))
  }


  // One local_sort load plus one no_sort load: rows are expected ordered by (state, age).
  test("MIXED LOADS") {
    loadSortedData()
    loadUnsortedData()
    sql(s"ALTER TABLE $tableName COMPACT 'major'")
    val out = sql(s"SELECT state, age FROM $tableName").collect()
    out.map(_.get(0).toString) should
      equal(Array("AL", "CT", "CT", "IA", "KS", "ME", "MT", "ND", "WA", "WA"))
    out.map(_.get(1).toString) should
      equal(Array("61", "37", "44", "60", "54", "23", "39", "42", "20", "54"))
  }


  // A row added through INSERT INTO is expected in its sorted position ("ZZ" last).
  test("INSERT") {
    loadSortedData()
    loadUnsortedData()
    sql(
      s"""
         | INSERT INTO $tableName
         | VALUES('20', 'Naman', 'Rastogi', '23', 'Bengaluru', 'ZZ', '12/28/2018')
      """.stripMargin)
    sql(s"ALTER TABLE $tableName COMPACT 'major'")

    val out = sql(s"SELECT state FROM $tableName").collect()
    out.map(_.get(0).toString) should equal(
      Array("AL", "CT", "CT", "IA", "KS", "ME", "MT", "ND", "WA", "WA", "ZZ"))
  }


  // The value written by UPDATE must still be visible after compaction.
  test("UPDATE") {
    loadSortedData()
    loadUnsortedData()
    sql(s"UPDATE  $tableName SET (state)=('CT') WHERE seq='13'").collect()
    sql(s"ALTER TABLE $tableName COMPACT 'major'")

    val out = sql(s"SELECT state FROM $tableName WHERE seq='13'").collect()
    out.map(_.get(0).toString) should equal(Array("CT"))
  }

  // The row removed by DELETE (seq 13, state "ME") must stay absent after compaction.
  test("DELETE") {
    loadSortedData()
    loadUnsortedData()
    sql(s"DELETE FROM $tableName WHERE seq='13'").collect()
    sql(s"ALTER TABLE $tableName COMPACT 'major'")

    val out = sql(s"SELECT state FROM $tableName").collect()
    out.map(_.get(0).toString) should equal(
      Array("AL", "CT", "CT", "IA", "KS", "MT", "ND", "WA", "WA"))
  }


  // Dropping a non-sort column: ages are expected in the same (state, age) order as before.
  test("RESTRUCTURE TABLE REMOVE COLUMN NOT IN SORT_COLUMNS") {
    loadSortedData()
    loadUnsortedData()
    sql(s"ALTER TABLE $tableName DROP COLUMNS(city)")
    sql(s"ALTER TABLE $tableName COMPACT 'major'")

    val out = sql(s"SELECT age FROM $tableName").collect()
    out.map(_.get(0).toString) should equal(
      Array("61", "37", "44", "60", "54", "23", "39", "42", "20", "54"))
  }


  // Dropping the sort column `state`: ages are expected in ascending order.
  test("RESTRUCTURE TABLE REMOVE COLUMN IN SORT_COLUMNS") {
    loadSortedData()
    loadUnsortedData()
    sql(s"ALTER TABLE $tableName DROP COLUMNS(state)")
    sql(s"ALTER TABLE $tableName COMPACT 'major'")

    val out = sql(s"SELECT age FROM $tableName").collect()
    out.map(_.get(0).toString) should equal(
      Array("20", "23", "37", "39", "42", "44", "54", "54", "60", "61"))
  }

}
