/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.core.view;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import com.google.gson.Gson;
import org.apache.log4j.Logger;

/**
 * It maintains all the mv schemas in it.
 */
@InterfaceAudience.Internal
public abstract class MVManager {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(MVManager.class.getName());

  private final MVProvider schemaProvider = new MVProvider();

  private volatile MVCatalog<?> catalog;

  private final Object lock = new Object();

  public MVManager() {

  }

  public abstract List<String> getDatabases();

  public abstract String getDatabaseLocation(String databaseName);

  public boolean hasSchemaOnTable(CarbonTable table) {
    return !table.getMVTablesMap().isEmpty();
  }

  public boolean isMVInSyncWithParentTables(MVSchema mvSchema) throws IOException {
    return schemaProvider.isViewCanBeEnabled(mvSchema, true);
  }

  /**
   * It gives all mv schemas of a given table.
   * For show mv command.
   */
  public List<MVSchema> getSchemasOnTable(CarbonTable table)
      throws IOException {
    return getSchemas(table.getMVTablesMap());
  }

  /**
   * It gives all mv schemas of a given table.
   * For show mv command.
   */
  public List<MVSchema> getSchemasOnTable(String databaseName,
                                          CarbonTable carbonTable) throws IOException {
    return schemaProvider.getSchemas(this, databaseName, carbonTable);
  }

  /**
   * It gives all mv schemas from store.
   */
  public List<MVSchema> getSchemas() throws IOException {
    List<MVSchema> schemas = new ArrayList<>();
    for (String database : this.getDatabases()) {
      try {
        schemas.addAll(this.getSchemas(database));
      } catch (IOException ex) {
        throw ex;
      } catch (Exception ex) {
        LOGGER.error("Exception Occurred: Skipping MV schemas from database: " + database);
        if (LOGGER.isDebugEnabled()) {
          LOGGER.debug(ex.getMessage());
        }
      }
    }
    return schemas;
  }

  /**
   * It gives all mv schemas from given databases in the store
   */
  public List<MVSchema> getSchemas(Map<String, List<String>> mvTablesMap) throws IOException {
    List<MVSchema> schemas = new ArrayList<>();
    for (Map.Entry<String, List<String>> databaseEntry : mvTablesMap.entrySet()) {
      String database = databaseEntry.getKey();
      List<String> mvTables = databaseEntry.getValue();
      for (String mvTable : mvTables) {
        try {
          schemas.add(this.getSchema(database, mvTable));
        } catch (IOException ex) {
          LOGGER.error("Error while fetching MV schema " + mvTable + " from database: " + database);
          throw ex;
        } catch (Exception ex) {
          LOGGER.error(
              "Exception Occurred: Skipping MV schema " + mvTable + " from database: " + database);
          if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(ex.getMessage());
          }
        }
      }
    }
    return schemas;
  }

  /**
   * It gives all mv schemas from store.
   */
  public List<MVSchema> getSchemas(String databaseName) throws IOException {
    return schemaProvider.getSchemas(this, databaseName);
  }

  public MVSchema getSchema(String databaseName, String viewName) throws IOException {
    return schemaProvider.getSchema(this, databaseName, viewName, false);
  }

  public MVSchema getSchema(String databaseName, String viewName, boolean isRegisterMV)
      throws IOException {
    return schemaProvider.getSchema(this, databaseName, viewName, isRegisterMV);
  }

  /**
   * Saves the mv schema to storage
   *
   * @param viewSchema mv schema
   */
  public void createSchema(String databaseName, MVSchema viewSchema)
      throws IOException {
    schemaProvider.saveSchema(this, databaseName, viewSchema);
  }

  /**
   * Drops the mv schema from storage
   *
   * @param viewName mv name
   */
  public void deleteSchema(String databaseName, String viewName) throws IOException {
    schemaProvider.dropSchema(this, databaseName, viewName);
  }

  /**
   * Get the mv catalog.
   */
  public MVCatalog<?> getCatalog() {
    return catalog;
  }

  /**
   * Get the mv catalog.
   */
  public MVCatalog<?> getCatalog(
      MVCatalogFactory<?> catalogFactory,
      List<MVSchema> currSchemas) throws IOException {
    MVCatalog<?> catalog = this.catalog;
    synchronized (lock) {
      catalog = this.catalog;
      if (catalog == null) {
        catalog = catalogFactory.newCatalog();
      }
      List<MVSchema> schemas = getSchemas();
      if (schemas.size() == currSchemas.size() && currSchemas.containsAll(schemas)) {
        return catalog;
      }
      for (MVSchema schema : schemas) {
        try {
          // register the schemas that are not already present in catalog.
          if (!currSchemas.contains(schema)) {
            catalog.registerSchema(schema);
          }
        } catch (Exception e) {
          // Ignore the schema
          LOGGER.error(
              "Error while registering schema for mv: " + schema.getIdentifier().getTableName());
          if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(e.getMessage());
          }
        }
      }
      for (MVSchema currSchema : currSchemas) {
        try {
          // deregister the schemas from catalog if not present in the path.
          if (!schemas.contains(currSchema)) {
            catalog.deregisterSchema(currSchema.getIdentifier());
          }
        } catch (Exception e) {
          // Ignore the schema
          LOGGER.error("Error while deregistering schema for mv: " + currSchema.getIdentifier()
              .getTableName());
          if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(e.getMessage());
          }
        }
      }
      this.catalog = catalog;
    }
    return catalog;
  }

  /**
   * In case of compaction on mv table,this method will merge the segment list of main table
   * and return updated segment mapping
   *
   * @param mergedLoadName      to find which all segments are merged to new compacted segment
   * @param viewSchema       of mv table
   * @param viewLoadMetadataDetails of mv table
   * @return updated segment map after merging segment list
   */
  @SuppressWarnings("unchecked")
  public static String getUpdatedSegmentMap(String mergedLoadName,
      MVSchema viewSchema,
      LoadMetadataDetails[] viewLoadMetadataDetails) {
    Map<String, List<String>> segmentMapping = new HashMap<>();
    List<RelationIdentifier> relationIdentifiers = viewSchema.getRelatedTables();
    for (RelationIdentifier relationIdentifier : relationIdentifiers) {
      for (LoadMetadataDetails loadMetadataDetail : viewLoadMetadataDetails) {
        if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.COMPACTED) {
          if (mergedLoadName.equalsIgnoreCase(loadMetadataDetail.getMergedLoadName())) {
            Map segmentMap = new Gson().fromJson(loadMetadataDetail.getExtraInfo(), Map.class);
            if (segmentMapping.isEmpty()) {
              segmentMapping.putAll(segmentMap);
            } else {
              segmentMapping.get(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
                  + relationIdentifier.getTableName()).addAll(
                  (List<String>) segmentMap.get(
                      relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
                          + relationIdentifier.getTableName()));
            }
          }
        }
      }
    }
    Gson gson = new Gson();
    return gson.toJson(segmentMapping);
  }

  /**
   * Get enabled mv status details
   */
  public List<MVStatusDetail> getEnabledStatusDetails() throws IOException {
    List<MVStatusDetail> statusDetails = new ArrayList<>();
    for (String database : this.getDatabases()) {
      try {
        statusDetails.addAll(this.getEnabledStatusDetails(database));
      } catch (IOException ex) {
        throw ex;
      } catch (Exception ex) {
        if (LOGGER.isDebugEnabled()) {
          LOGGER.debug(ex.getMessage());
        }
      }
    }
    return statusDetails;
  }

  /**
   * Get enabled mv status details
   */
  List<MVStatusDetail> getEnabledStatusDetails(String databaseName)
      throws IOException {
    List<MVStatusDetail> statusDetails = schemaProvider.getStatusDetails(this, databaseName);
    List<MVStatusDetail> enabledStatusDetails = new ArrayList<>(statusDetails.size());
    for (MVStatusDetail statusDetail : statusDetails) {
      if (statusDetail.getStatus() == MVStatus.ENABLED) {
        enabledStatusDetails.add(statusDetail);
      }
    }
    return enabledStatusDetails;
  }

  public void setStatus(RelationIdentifier viewIdentifier, MVStatus viewStatus)
      throws IOException {
    MVSchema schema = getSchema(
        viewIdentifier.getDatabaseName(), viewIdentifier.getTableName());
    if (schema != null) {
      schemaProvider.updateStatus(this, Collections.singletonList(schema), viewStatus);
    }
  }

  public void setStatus(List<MVSchema> viewSchemas, MVStatus viewStatus)
      throws IOException {
    if (viewSchemas != null && !viewSchemas.isEmpty()) {
      schemaProvider.updateStatus(this, viewSchemas, viewStatus);
    }
  }

  public void onDrop(String databaseName, String viewName)
      throws IOException {
    MVSchema viewSchema = getSchema(databaseName, viewName);
    if (viewSchema != null) {
      schemaProvider.updateStatus(
          this, Collections.singletonList(viewSchema), MVStatus.DROPPED);
    }
  }

  /**
   * This method will remove all segments of MV table in case of Insert-Overwrite/Update/Delete
   * operations on main table
   *
   * @param schemas mv schemas
   */
  public void onTruncate(List<MVSchema> schemas)
      throws IOException {
    for (MVSchema schema : schemas) {
      if (!schema.isRefreshOnManual()) {
        setStatus(schema.identifier, MVStatus.DISABLED);
      }
      RelationIdentifier relationIdentifier = schema.getIdentifier();
      CarbonTable carbonTable = CarbonMetadata.getInstance()
          .getCarbonTable(relationIdentifier.getDatabaseName(), relationIdentifier.getTableName());
      String tblStatusVersion = "";
      if (null != carbonTable) {
        tblStatusVersion = carbonTable.getTableStatusVersion();
      }
      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(AbsoluteTableIdentifier
          .from(relationIdentifier.getTablePath(),
              relationIdentifier.getDatabaseName(),
              relationIdentifier.getTableName()), tblStatusVersion);
      ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
      try {
        if (carbonLock.lockWithRetries()) {
          LOGGER.info("Acquired lock for table" + relationIdentifier.getDatabaseName() + "."
              + relationIdentifier.getTableName() + " for table status update");
          String metaDataPath =
              CarbonTablePath.getMetadataPath(relationIdentifier.getTablePath());
          LoadMetadataDetails[] loadMetadataDetails =
              SegmentStatusManager.readLoadMetadata(metaDataPath, tblStatusVersion);
          for (LoadMetadataDetails entry : loadMetadataDetails) {
            entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
          }
          SegmentStatusManager.writeLoadDetailsIntoFile(
              CarbonTablePath.getTableStatusFilePath(relationIdentifier.getTablePath(),
                  tblStatusVersion), loadMetadataDetails);
        } else {
          LOGGER.error("Not able to acquire the lock for Table status update for table "
              + relationIdentifier.getDatabaseName() + "." + relationIdentifier
              .getTableName());
        }
      } finally {
        if (carbonLock.unlock()) {
          LOGGER.info(
              "Table unlocked successfully after table status update" + relationIdentifier
                  .getDatabaseName() + "." + relationIdentifier.getTableName());
        } else {
          LOGGER.error(
              "Unable to unlock Table lock for table" + relationIdentifier.getDatabaseName()
                  + "." + relationIdentifier.getTableName()
                  + " during table status update");
        }
      }
    }
  }

}
