/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.management

import java.io.{File, IOException}
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CompactionModel}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.AlterTableUtil

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, StreamHandoffRDD}
import org.apache.carbondata.streaming.segment.StreamSegment

/**
 * Command for the compaction in alter table command
 */
case class CarbonAlterTableCompactionCommand(
    alterTableModel: AlterTableModel,
    tableInfoOp: Option[TableInfo] = None,
    val operationContext: OperationContext = new OperationContext ) extends AtomicRunnableCommand {

  var table: CarbonTable = _

  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    val tableName = alterTableModel.tableName.toLowerCase
    val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
    setAuditTable(dbName, tableName)
    if (alterTableModel.customSegmentIds.nonEmpty) {
      setAuditInfo(Map("segmentIds" -> alterTableModel.customSegmentIds.get.mkString(", ")))
    }
    table = if (tableInfoOp.isDefined) {
      CarbonTable.buildFromTableInfo(tableInfoOp.get)
    } else {
      val relation = CarbonEnv.getInstance(sparkSession).carbonMetaStore
        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
      if (relation == null) {
        throw new NoSuchTableException(dbName, tableName)
      }
      if (null == relation.carbonTable) {
        LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
        throw new NoSuchTableException(dbName, tableName)
      }
      relation.carbonTable
    }
    if (!table.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }
    if (CarbonUtil.hasAggregationDataMap(table) ||
        (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
      // If the compaction request is of 'streaming' type then we need to generate loadCommands
      // for all the child datamaps in the LoadMetadataEvent. Therefore setting isCompaction=false.
      // If set to true then only loadCommands for compaction will be created.
      val loadMetadataEvent =
        if (alterTableModel.compactionType.equalsIgnoreCase(CompactionType.STREAMING.name())) {
          new LoadMetadataEvent(table, false, Map.empty[String, String].asJava)
        } else {
          new LoadMetadataEvent(table, true, Map.empty[String, String].asJava)
        }
      OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
    }
    Seq.empty
  }

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    if (SegmentStatusManager.isOverwriteInProgressInTable(table)) {
      throw new ConcurrentOperationException(table, "insert overwrite", "compaction")
    }
    var compactionType: CompactionType = null
    try {
      compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
    } catch {
      case _: Exception =>
        throw new MalformedCarbonCommandException(
          "Unsupported alter operation on carbon table")
    }
    if (compactionType == CompactionType.UPGRADE_SEGMENT) {
      val tableStatusLock = CarbonLockFactory
        .getCarbonLockObj(table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
      try {
        if (tableStatusLock.lockWithRetries()) {
          val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
            .getTableStatusFilePath(table.getTablePath))
          loadMetaDataDetails.foreach { loadMetaDataDetail =>
            // "0" check is added to reproduce a scenario similar to 1.1 store where the size
            // would be null. For test case in the new version it would be set to 0.
            if (loadMetaDataDetail.getIndexSize == null || loadMetaDataDetail.getDataSize == null
            || loadMetaDataDetail.getIndexSize == "0" || loadMetaDataDetail.getDataSize == "0") {
              CarbonLoaderUtil
                .addDataIndexSizeIntoMetaEntry(loadMetaDataDetail, loadMetaDataDetail.getLoadName,
                  table)
            }
          }
          SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath
            .getTableStatusFilePath(table.getTablePath), loadMetaDataDetails)
        } else {
          throw new ConcurrentOperationException(table.getDatabaseName,
            table.getTableName, "table status updation", "upgrade segments")
        }
      } finally {
        tableStatusLock.unlock()
      }
      Seq.empty
    } else if (compactionType == CompactionType.SEGMENT_INDEX) {
      if (table.isStreamingSink) {
        throw new MalformedCarbonCommandException(
          "Unsupported alter operation on carbon table: Merge index is not supported on streaming" +
          " table")
      }
      val version = CarbonUtil.getFormatVersion(table)
      val isOlderVersion = version == ColumnarFormatVersion.V1 ||
                           version == ColumnarFormatVersion.V2
      if (isOlderVersion) {
        throw new MalformedCarbonCommandException(
          "Unsupported alter operation on carbon table: Merge index is not supported on V1 V2 " +
          "store segments")
      }

      val alterTableMergeIndexEvent: AlterTableMergeIndexEvent =
        AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
      OperationListenerBus.getInstance
        .fireEvent(alterTableMergeIndexEvent, operationContext)
      Seq.empty
    } else {

      if (compactionType != CompactionType.CUSTOM &&
        alterTableModel.customSegmentIds.isDefined) {
        throw new MalformedCarbonCommandException(
          s"Custom segments not supported when doing ${compactionType.toString} compaction")
      }
      if (compactionType == CompactionType.CUSTOM &&
        alterTableModel.customSegmentIds.isEmpty) {
        throw new MalformedCarbonCommandException(
          s"Segment ids should not be empty when doing ${compactionType.toString} compaction")
      }

      val carbonLoadModel = new CarbonLoadModel()
      carbonLoadModel.setTableName(table.getTableName)
      val dataLoadSchema = new CarbonDataLoadSchema(table)
      // Need to fill dimension relation
      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
      carbonLoadModel.setTableName(table.getTableName)
      carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
      carbonLoadModel.setDatabaseName(table.getDatabaseName)
      carbonLoadModel.setTablePath(table.getTablePath)
      val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
        .getOrElse(CarbonCommonConstants.COMPRESSOR,
          CompressorFactory.getInstance().getCompressor.getName)
      carbonLoadModel.setColumnCompressor(columnCompressor)

      var storeLocation = System.getProperty("java.io.tmpdir")
      storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
      // trigger event for compaction
      val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
        AlterTableCompactionPreEvent(sparkSession, table, null, null)
      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
      val compactedSegments: java.util.List[String] = new util.ArrayList[String]()
      try {
        alterTableForCompaction(
          sparkSession.sqlContext,
          alterTableModel,
          carbonLoadModel,
          storeLocation,
          compactedSegments,
          operationContext)
      } catch {
        case e: Exception =>
          if (null != e.getMessage) {
            CarbonException.analysisException(
              s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
          } else {
            CarbonException.analysisException(
              "Exception in compaction. Please check logs for more info.")
          }
      }
      // trigger event for compaction
      val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
        AlterTableCompactionPostEvent(sparkSession, table, null, compactedSegments)
      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
      Seq.empty
    }
  }

  private def alterTableForCompaction(sqlContext: SQLContext,
      alterTableModel: AlterTableModel,
      carbonLoadModel: CarbonLoadModel,
      storeLocation: String,
      compactedSegments: java.util.List[String],
      operationContext: OperationContext): Unit = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
    val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
    if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
      if (alterTableModel.segmentUpdateStatusManager.isDefined) {
        carbonLoadModel.setSegmentUpdateStatusManager(
          alterTableModel.segmentUpdateStatusManager.get)
        carbonLoadModel.setLoadMetadataDetails(
          alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
      }
    }

    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable

    if (null == carbonLoadModel.getLoadMetadataDetails) {
      carbonLoadModel.readAndSetLoadMetadataDetails()
    }

    if (compactionType == CompactionType.STREAMING) {
      StreamHandoffRDD.startStreamingHandoffThread(
        carbonLoadModel,
        operationContext,
        sqlContext.sparkSession, true)
      return
    }

    if (compactionType == CompactionType.CLOSE_STREAMING) {
      closeStreamingTable(
        carbonLoadModel,
        operationContext,
        sqlContext.sparkSession)
      return
    }

    // reading the start time of data load.
    val loadStartTime: Long =
      if (alterTableModel.factTimeStamp.isEmpty) {
        CarbonUpdateUtil.readCurrentTime
      } else {
        alterTableModel.factTimeStamp.get
      }
    carbonLoadModel.setFactTimeStamp(loadStartTime)

    val isCompactionTriggerByDDl = true
    val segmentIds: Option[List[String]] = if (compactionType == CompactionType.CUSTOM &&
      alterTableModel.customSegmentIds.isDefined) {
      val ids = alterTableModel.customSegmentIds
      ids match {
        case Some(x) =>
          val loadMetadataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
          val otherLoadDetails = loadMetadataDetails
            .exists(p => x.exists(a => a.equalsIgnoreCase(p.getLoadName)) && !p.isCarbonFormat)
          if (otherLoadDetails) {
            throw new AnalysisException(s"Custom compaction does not support other format segment")
          }
        case None =>
      }
      ids
    } else {
      None
    }
    val compactionModel = CompactionModel(compactionSize,
      compactionType,
      carbonTable,
      isCompactionTriggerByDDl,
      CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
      TableIdentifier(carbonTable.getTableName,
      Some(carbonTable.getDatabaseName))),
      segmentIds
    )

    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
      )
      .equalsIgnoreCase("true")

    // if system level compaction is enabled then only one compaction can run in the system
    // if any other request comes at this time then it will create a compaction request file.
    // so that this will be taken up by the compaction process which is executing.
    if (!isConcurrentCompactionAllowed) {
      LOGGER.info("System level compaction lock is enabled.")
      CarbonDataRDDFactory.handleCompactionForSystemLocking(
        sqlContext,
        carbonLoadModel,
        storeLocation,
        compactionType,
        carbonTable,
        compactedSegments,
        compactionModel,
        operationContext
      )
    } else {
      // normal flow of compaction
      val lock = CarbonLockFactory.getCarbonLockObj(
        carbonTable.getAbsoluteTableIdentifier,
        LockUsage.COMPACTION_LOCK)
      val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable
        .getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
      try {
        // COMPACTION_LOCK and UPDATE_LOCK are already locked when start to execute update sql,
        // so it don't need to require locks again when compactionType is IUD_UPDDEL_DELTA.
        if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
          if (!updateLock.lockWithRetries(3, 3)) {
            throw new ConcurrentOperationException(carbonTable, "update", "compaction")
          }
          if (!lock.lockWithRetries()) {
            LOGGER.error(s"Not able to acquire the compaction lock for table " +
                         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
            CarbonException.analysisException(
              "Table is already locked for compaction. Please try after some time.")
          } else {
            LOGGER.info("Acquired the compaction lock for table " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
          }
        }
        CarbonDataRDDFactory.startCompactionThreads(
          sqlContext,
          carbonLoadModel,
          storeLocation,
          compactionModel,
          lock,
          compactedSegments,
          operationContext
        )
      } catch {
        case e: Exception =>
          LOGGER.error(s"Exception in start compaction thread.", e)
          if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
            lock.unlock()
          }
          throw e
      } finally {
        if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
          updateLock.unlock()
        }
        DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
      }
    }
  }

  def closeStreamingTable(
      carbonLoadModel: CarbonLoadModel,
      operationContext: OperationContext,
      sparkSession: SparkSession
  ): Unit = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    // 1. delete the lock of streaming.lock, forcing the stream to be closed
    val streamingLock = CarbonLockFactory.getCarbonLockObj(
      carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier,
      LockUsage.STREAMING_LOCK)
    val lockFile =
      FileFactory.getCarbonFile(streamingLock.getLockFilePath, FileFactory.getConfiguration)
    if (lockFile.exists()) {
      if (!lockFile.delete()) {
        LOGGER.warn("failed to delete lock file: " + streamingLock.getLockFilePath)
      }
    }
    try {
      if (streamingLock.lockWithRetries()) {
        // 2. convert segment status from "streaming" to "streaming finish"
        StreamSegment.finishStreaming(carbonTable)
        // 3. iterate to handoff all streaming segment to batch segment
        StreamHandoffRDD.iterateStreamingHandoff(carbonLoadModel, operationContext, sparkSession)
        val tableIdentifier =
          new TableIdentifier(carbonTable.getTableName, Option(carbonTable.getDatabaseName))
        // 4. modify table to normal table
        AlterTableUtil.modifyTableProperties(
          tableIdentifier,
          Map("streaming" -> "false"),
          Seq.empty,
          true)(sparkSession,
          sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
        // 5. remove checkpoint
        FileFactory.deleteAllFilesOfDir(
          new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))
        FileFactory.deleteAllFilesOfDir(
          new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
      } else {
        val msg = "Failed to close streaming table, because streaming is locked for table " +
                  carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
        LOGGER.error(msg)
        throw new IOException(msg)
      }
    } finally {
      if (streamingLock.unlock()) {
        LOGGER.info("Table unlocked successfully after streaming finished" +
                    carbonTable.getDatabaseName() + "." + carbonTable.getTableName())
      } else {
        LOGGER.error("Unable to unlock Table lock for table " +
                     carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
                     " during streaming finished")
      }
    }
  }

  override protected def opName: String = {
    s"ALTER TABLE COMPACTION ${alterTableModel.compactionType.toUpperCase}"
  }
}
