/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.view;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import com.google.gson.Gson;
import org.apache.log4j.Logger;

/**
 * Entry point for working with materialized view (mv) schemas.
 * It reads, stores and drops mv schemas through an {@link MVProvider}, keeps a lazily
 * built {@link MVCatalog} of the registered schemas, and updates mv status.
 * Sub-classes supply the list of databases and their locations.
 */
@InterfaceAudience.Internal
public abstract class MVManager {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(MVManager.class.getName());

  private final MVProvider schemaProvider = new MVProvider();

  // Published without holding the lock, hence volatile; written only under 'lock'.
  private volatile MVCatalog<?> catalog;

  private final Object lock = new Object();

  public MVManager() {

  }

  /**
   * @return names of the databases whose mv schemas this manager should consider
   */
  public abstract List<String> getDatabases();

  /**
   * @param databaseName database name
   * @return location of the given database
   */
  public abstract String getDatabaseLocation(String databaseName);

  /**
   * Checks whether any mv schema, in any database, lists the given table among its
   * related tables. Database and table names are compared ignoring case.
   *
   * @param table table to look for
   * @return true if at least one mv schema relates to the table
   * @throws IOException if the schemas can not be read
   */
  public boolean hasSchemaOnTable(CarbonTable table) throws IOException {
    for (MVSchema schema : getSchemas()) {
      if (isRelatedTo(schema, table)) {
        return true;
      }
    }
    return false;
  }

  /**
   * It gives all mv schemas, across all databases, that list the given table among their
   * related tables. Database and table names are compared ignoring case.
   * For show mv command.
   *
   * @param table table to look for
   * @return mv schemas related to the table, empty if there is none
   * @throws IOException if the schemas can not be read
   */
  public List<MVSchema> getSchemasOnTable(CarbonTable table)
      throws IOException {
    List<MVSchema> schemasOnTable = new ArrayList<>();
    for (MVSchema schema : getSchemas()) {
      if (isRelatedTo(schema, table)) {
        schemasOnTable.add(schema);
      }
    }
    return schemasOnTable;
  }

  /**
   * Tells whether one of the related tables of the schema has the same database name and
   * table name (ignoring case) as the given table.
   */
  private static boolean isRelatedTo(MVSchema schema, CarbonTable table) {
    for (RelationIdentifier relatedTable : schema.getRelatedTables()) {
      if (relatedTable.getDatabaseName().equalsIgnoreCase(table.getDatabaseName()) &&
          relatedTable.getTableName().equalsIgnoreCase(table.getTableName())) {
        return true;
      }
    }
    return false;
  }

  /**
   * It asks the schema provider for the mv schemas of the given database and table.
   * For show mv command.
   *
   * @param databaseName database to look into
   * @param carbonTable  table passed on to the schema provider
   * @return mv schemas returned by the schema provider
   * @throws IOException if the schemas can not be read
   */
  public List<MVSchema> getSchemasOnTable(String databaseName,
                                          CarbonTable carbonTable) throws IOException {
    return schemaProvider.getSchemas(this, databaseName, carbonTable);
  }

  /**
   * It gives all mv schemas from store, collected over every database returned by
   * {@link #getDatabases()}.
   *
   * @throws IOException if the schemas can not be read
   */
  public List<MVSchema> getSchemas() throws IOException {
    List<MVSchema> schemas = new ArrayList<>();
    for (String database : this.getDatabases()) {
      schemas.addAll(this.getSchemas(database));
    }
    return schemas;
  }

  /**
   * It gives the mv schemas of one database from store.
   *
   * @param databaseName database name
   * @throws IOException if the schemas can not be read
   */
  public List<MVSchema> getSchemas(String databaseName) throws IOException {
    return schemaProvider.getSchemas(this, databaseName);
  }

  /**
   * It gives the schema of one mv from store.
   *
   * @param databaseName database name
   * @param viewName     mv name
   * @return the schema returned by the schema provider; callers in this class treat
   *         null as "no such mv"
   * @throws IOException if the schema can not be read
   */
  public MVSchema getSchema(String databaseName, String viewName) throws IOException {
    return schemaProvider.getSchema(this, databaseName, viewName);
  }

  /**
   * Saves the mv schema to storage
   *
   * @param databaseName database name
   * @param viewSchema   mv schema
   * @throws IOException if the schema can not be saved
   */
  public void createSchema(String databaseName, MVSchema viewSchema)
      throws IOException {
    schemaProvider.saveSchema(this, databaseName, viewSchema);
  }

  /**
   * Drops the mv schema from storage
   *
   * @param databaseName database name
   * @param viewName     mv name
   * @throws IOException if the schema can not be dropped
   */
  public void deleteSchema(String databaseName, String viewName) throws IOException {
    schemaProvider.dropSchema(this, databaseName, viewName);
  }

  /**
   * Get the mv catalog built so far, without loading anything.
   *
   * @return the current catalog, or null if none has been built yet
   */
  public MVCatalog<?> getCatalog() {
    return catalog;
  }

  /**
   * Get the mv catalog, building it when there is none yet or when a reload is requested.
   * A new catalog is created from the factory and every schema from store is registered
   * in it; a schema that fails to register is logged and left out.
   *
   * @param catalogFactory factory used to create a new catalog
   * @param reload         true to rebuild the catalog even if one already exists
   * @return the catalog, never null
   * @throws IOException      if the schemas can not be read
   * @throws RuntimeException if the factory returns null
   */
  public MVCatalog<?> getCatalog(
      MVCatalogFactory<?> catalogFactory,
      boolean reload) throws IOException {
    MVCatalog<?> current = this.catalog;
    if (reload || current == null) {
      synchronized (lock) {
        // Read the field again: another thread may have built the catalog meanwhile.
        current = this.catalog;
        if (reload || current == null) {
          current = catalogFactory.newCatalog();
          // Fail before reading schemas from store when the factory gave nothing.
          if (null == current) {
            throw new RuntimeException("Internal Error.");
          }
          for (MVSchema schema : getSchemas()) {
            try {
              current.registerSchema(schema);
            } catch (Exception e) {
              // Ignore the schema
              LOGGER.error("Error while registering schema", e);
            }
          }
          // Publish only after all schemas were registered.
          this.catalog = current;
        }
      }
    }
    return current;
  }

  /**
   * In case of compaction on mv table,this method will merge the segment list of main table
   * and return updated segment mapping.
   * Only load details with status COMPACTED whose merged load name equals
   * (ignoring case) the given one are considered. The extra info of each of them is read
   * as a json map from table key to segment list. The first non-empty map is taken over
   * as it is; from every further map the segment lists of the related tables of the mv
   * are appended. Each load detail is merged exactly once, so a segment is not repeated
   * when the mv has more than one related table.
   *
   * @param mergedLoadName      to find which all segments are merged to new compacted segment
   * @param viewSchema       of mv table
   * @param viewLoadMetadataDetails of mv table
   * @return updated segment map after merging segment list, as json
   */
  @SuppressWarnings("unchecked")
  public static String getUpdatedSegmentMap(String mergedLoadName,
      MVSchema viewSchema,
      LoadMetadataDetails[] viewLoadMetadataDetails) {
    Gson gson = new Gson();
    Map<String, List<String>> segmentMapping = new HashMap<>();
    List<RelationIdentifier> relationIdentifiers = viewSchema.getRelatedTables();
    if (relationIdentifiers.isEmpty()) {
      // Nothing to merge for a schema without related tables.
      return gson.toJson(segmentMapping);
    }
    for (LoadMetadataDetails loadMetadataDetail : viewLoadMetadataDetails) {
      if (loadMetadataDetail.getSegmentStatus() != SegmentStatus.COMPACTED
          || !mergedLoadName.equalsIgnoreCase(loadMetadataDetail.getMergedLoadName())) {
        continue;
      }
      Map<String, List<String>> segmentMap =
          gson.fromJson(loadMetadataDetail.getExtraInfo(), Map.class);
      if (segmentMap == null) {
        // No extra info stored on this load, nothing to merge.
        continue;
      }
      if (segmentMapping.isEmpty()) {
        segmentMapping.putAll(segmentMap);
        continue;
      }
      for (RelationIdentifier relationIdentifier : relationIdentifiers) {
        String tableKey = getTableKey(relationIdentifier);
        List<String> segmentsToAdd = segmentMap.get(tableKey);
        if (segmentsToAdd == null) {
          continue;
        }
        List<String> mergedSegments = segmentMapping.get(tableKey);
        if (mergedSegments == null) {
          segmentMapping.put(tableKey, new ArrayList<>(segmentsToAdd));
        } else {
          mergedSegments.addAll(segmentsToAdd);
        }
      }
    }
    return gson.toJson(segmentMapping);
  }

  /**
   * Builds the key under which the segments of a related table are kept in the segment map:
   * database name, {@link CarbonCommonConstants#POINT}, table name.
   */
  private static String getTableKey(RelationIdentifier relationIdentifier) {
    return relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
        + relationIdentifier.getTableName();
  }

  /**
   * Get enabled mv status details of every database returned by {@link #getDatabases()}.
   *
   * @throws IOException if the status details can not be read
   */
  public List<MVStatusDetail> getEnabledStatusDetails() throws IOException {
    List<MVStatusDetail> statusDetails = new ArrayList<>();
    for (String database : this.getDatabases()) {
      statusDetails.addAll(this.getEnabledStatusDetails(database));
    }
    return statusDetails;
  }

  /**
   * Get enabled mv status details of one database, i.e. those whose status is
   * {@link MVStatus#ENABLED}.
   *
   * @param databaseName database name
   * @throws IOException if the status details can not be read
   */
  List<MVStatusDetail> getEnabledStatusDetails(String databaseName)
      throws IOException {
    List<MVStatusDetail> statusDetails = schemaProvider.getStatusDetails(this, databaseName);
    List<MVStatusDetail> enabledStatusDetails = new ArrayList<>(statusDetails.size());
    for (MVStatusDetail statusDetail : statusDetails) {
      if (statusDetail.getStatus() == MVStatus.ENABLED) {
        enabledStatusDetails.add(statusDetail);
      }
    }
    return enabledStatusDetails;
  }

  /**
   * Updates the status of the mv with the given identifier.
   * Nothing happens when no schema is found for it.
   *
   * @param viewIdentifier identifier giving database name and mv name
   * @param viewStatus     status to set
   * @throws IOException if the schema can not be read or the status can not be updated
   */
  public void setStatus(RelationIdentifier viewIdentifier, MVStatus viewStatus)
      throws IOException {
    MVSchema schema = getSchema(
        viewIdentifier.getDatabaseName(), viewIdentifier.getTableName());
    if (schema != null) {
      schemaProvider.updateStatus(this, Collections.singletonList(schema), viewStatus);
    }
  }

  /**
   * Updates the status of all given mv schemas.
   * Nothing happens when the list is null or empty.
   *
   * @param viewSchemas mv schemas
   * @param viewStatus  status to set
   * @throws IOException if the status can not be updated
   */
  public void setStatus(List<MVSchema> viewSchemas, MVStatus viewStatus)
      throws IOException {
    if (viewSchemas != null && !viewSchemas.isEmpty()) {
      schemaProvider.updateStatus(this, viewSchemas, viewStatus);
    }
  }

  /**
   * Sets the status of the given mv to {@link MVStatus#DROPPED}.
   * Nothing happens when no schema is found for it.
   *
   * @param databaseName database name
   * @param viewName     mv name
   * @throws IOException if the schema can not be read or the status can not be updated
   */
  public void onDrop(String databaseName, String viewName)
      throws IOException {
    MVSchema viewSchema = getSchema(databaseName, viewName);
    if (viewSchema != null) {
      schemaProvider.updateStatus(
          this, Collections.singletonList(viewSchema), MVStatus.DROPPED);
    }
  }

  /**
   * This method will remove all segments of MV table in case of Insert-Overwrite/Update/Delete
   * operations on main table.
   * For every schema that is not refreshed manually the mv status is set to
   * {@link MVStatus#DISABLED} first. Then, for every schema, all segments of the mv table
   * are marked for delete in its table status file.
   *
   * @param schemas mv schemas
   * @throws IOException if the status or the table status file can not be updated
   */
  public void onTruncate(List<MVSchema> schemas)
      throws IOException {
    for (MVSchema schema : schemas) {
      RelationIdentifier relationIdentifier = schema.getIdentifier();
      if (!schema.isRefreshOnManual()) {
        setStatus(relationIdentifier, MVStatus.DISABLED);
      }
      markAllSegmentsForDelete(relationIdentifier);
    }
  }

  /**
   * Rewrites the table status file of the given mv table with every load marked as
   * {@link SegmentStatus#MARKED_FOR_DELETE}, under the table status lock.
   * When the lock can not be acquired an error is logged and the file is left untouched.
   */
  private void markAllSegmentsForDelete(RelationIdentifier relationIdentifier)
      throws IOException {
    String tableName =
        relationIdentifier.getDatabaseName() + "." + relationIdentifier.getTableName();
    String tablePath = relationIdentifier.getTablePath();
    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(
        AbsoluteTableIdentifier.from(
            tablePath,
            relationIdentifier.getDatabaseName(),
            relationIdentifier.getTableName()));
    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
    try {
      if (carbonLock.lockWithRetries()) {
        LOGGER.info("Acquired lock for table " + tableName + " for table status update");
        LoadMetadataDetails[] loadMetadataDetails =
            SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
        for (LoadMetadataDetails entry : loadMetadataDetails) {
          entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
        }
        SegmentStatusManager.writeLoadDetailsIntoFile(
            CarbonTablePath.getTableStatusFilePath(tablePath), loadMetadataDetails);
      } else {
        LOGGER.error(
            "Not able to acquire the lock for Table status update for table " + tableName);
      }
    } finally {
      // NOTE(review): unlock() is also called when the lock was not acquired, as before;
      // confirm that every ICarbonLock implementation tolerates this.
      if (carbonLock.unlock()) {
        LOGGER.info("Table unlocked successfully after table status update " + tableName);
      } else {
        LOGGER.error("Unable to unlock Table lock for table " + tableName
            + " during table status update");
      }
    }
  }

}
