/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import com.google.common.base.Optional;
import net.minidev.json.JSONObject;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

/**
 * Tablespace manages the functions of storing and reading data.
 * Tablespace is an abstract class.
 * To support a storage such as HDFS or HBase, a specific Tablespace should be implemented
 * by inheriting this class.
 */
public abstract class Tablespace {

  /** The name of this tablespace. */
  protected final String name;
  /** The URI of this tablespace. */
  protected final URI uri;
  /** The configuration object given at construction time. */
  protected final JSONObject config;
  /** this space is visible or not. */
  protected boolean visible = true;

  /** Copy of the configuration passed to {@link #init(TajoConf)}; {@code null} until init is called. */
  protected TajoConf conf;

  /**
   * Creates a tablespace. {@link #init(TajoConf)} should be called before using it.
   *
   * @param name The tablespace name
   * @param uri The tablespace URI
   * @param config The tablespace configuration
   */
  public Tablespace(String name, URI uri, JSONObject config) {
    this.name = name;
    this.uri = uri;
    this.config = config;
  }

  /**
   * @return The configuration object given at construction time
   */
  public JSONObject getConfig() {
    return config;
  }

  /**
   * Sets whether this space is visible or not.
   *
   * @param visible true if this space should be visible
   */
  public void setVisible(boolean visible) {
    this.visible = visible;
  }

  /**
   * Returns the dependencies of this tablespace. The default implementation returns an empty set;
   * the meaning of each entry is defined by the overriding implementation.
   *
   * @return The set of dependencies, empty by default
   */
  public Set<String> getDependencies() {
    return Collections.emptySet();
  }

  /**
   * Initialize storage manager. It is invoked by {@link #init(TajoConf)} after {@link #conf} is set.
   *
   * @throws java.io.IOException
   */
  protected abstract void storageInit() throws IOException;

  /**
   * @return The tablespace name
   */
  public String getName() {
    return name;
  }

  /**
   * @return The tablespace URI
   */
  public URI getUri() {
    return uri;
  }

  /**
   * @return true if this space is visible
   */
  public boolean isVisible() {
    return visible;
  }

  /**
   * @return A string in the form of {@code name=uri}
   */
  @Override
  public String toString() {
    return name + "=" + uri;
  }

  /**
   * Returns the volume of the given table.
   * NOTE(review): the unit is presumably bytes - confirm against the implementations.
   *
   * @param table The table description
   * @param filter An optional filter condition
   * @return The table volume
   * @throws UnsupportedException
   */
  public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;

  /**
   * if {@link StorageProperty#isArbitraryPathAllowed} is true,
   * the storage allows arbitrary path accesses. In this case, the storage must provide the root URI.
   * The default implementation always throws a {@link TajoRuntimeException}
   * wrapping an {@link UnsupportedException}.
   *
   * @see StorageProperty#isArbitraryPathAllowed
   * @return Root URI
   */
  public URI getRootUri() {
    throw new TajoRuntimeException(new UnsupportedException(String.format("arbitrary path '%s'", uri)));
  }

  /**
   * Get Table URI
   *
   * @param databaseName Database name
   * @param tableName Table name
   * @return Table URI
   */
  public abstract URI getTableUri(String databaseName, String tableName);

  /**
   * Returns the splits that will serve as input for the scan tasks. The
   * number of splits matches the number of regions in a table.
   * @param inputSourceId Input source identifier, which can be either relation name or execution block id
   * @param tableDesc The table description for the target data.
   * @param filterCondition filter condition which can prune splits if possible
   * @return The list of input fragments.
   * @throws java.io.IOException
   * @throws TajoException
   */
  public abstract List<Fragment> getSplits(String inputSourceId,
                                           TableDesc tableDesc,
                                           @Nullable EvalNode filterCondition) throws IOException, TajoException;

  /**
   * It returns the storage property.
   * @return The storage property
   */
  public abstract StorageProperty getProperty();

  /**
   * It returns the format property for the given table meta.
   * @param meta The table meta
   * @return The format property
   */
  public abstract FormatProperty getFormatProperty(TableMeta meta);

  /**
   * Release storage manager resource
   */
  public abstract void close();


  /**
   * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
   * In general Repartitioner determines the partition range using previous output statistics data.
   * In the special cases, such as HBase Repartitioner uses the result of this method.
   *
   * @param queryContext The current query context which contains query properties.
   * @param tableDesc The table description for the target data.
   * @param inputSchema The input schema
   * @param sortSpecs The sort specification that contains the sort column and sort order.
   * @param dataRange The tuple range of the data (presumably the overall range to be split; confirm with
   *                  the implementations)
   * @return The list of sort ranges.
   * @throws java.io.IOException
   */
  public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
                                                   Schema inputSchema, SortSpec [] sortSpecs,
                                                   TupleRange dataRange) throws IOException;

  /**
   * Initialize Tablespace instance. It should be called before using.
   * A copy of the given configuration is kept in {@link #conf}, and then {@link #storageInit()} is invoked.
   *
   * @param tajoConf The configuration to be copied
   * @throws java.io.IOException
   */
  public void init(TajoConf tajoConf) throws IOException {
    this.conf = new TajoConf(tajoConf);
    storageInit();
  }

  /**
   * Returns Scanner instance.
   * If the fragment is empty, a {@link NullScanner} is returned. Otherwise, the scanner class
   * registered for the data format of the table meta is instantiated.
   *
   * @param meta The table meta
   * @param schema The input schema
   * @param fragment The fragment for scanning
   * @param target The output schema. If null, the input schema is used.
   * @return Scanner instance
   * @throws java.io.IOException
   */
  public Scanner getScanner(TableMeta meta,
                            Schema schema,
                            Fragment fragment,
                            @Nullable Schema target) throws IOException {
    Schema outputSchema = (target == null) ? schema : target;

    Scanner scanner;
    if (fragment.isEmpty()) {
      scanner = new NullScanner(conf, schema, meta, fragment);
    } else {
      Class<? extends Scanner> scannerClass = getScannerClass(meta.getDataFormat());
      scanner = OldStorageManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
    }
    scanner.setTarget(outputSchema.toArray());

    return scanner;
  }

  /**
   * Returns Appender instance for inserting rows.
   * The default implementation delegates to
   * {@link #getAppender(OverridableConf, TaskAttemptId, TableMeta, Schema, Path)}.
   *
   * @param queryContext Query property.
   * @param taskAttemptId Task id.
   * @param meta Table meta data.
   * @param schema Output schema.
   * @param workDir Working directory
   * @return Appender instance
   * @throws java.io.IOException
   */
  public Appender getAppenderForInsertRow(OverridableConf queryContext,
                                          TaskAttemptId taskAttemptId,
                                          TableMeta meta,
                                          Schema schema,
                                          Path workDir) throws IOException {
    return getAppender(queryContext, taskAttemptId, meta, schema, workDir);
  }

  /**
   * Returns SeekableScanner instance.
   * The fragment proto is converted into a {@link Fragment}, and the scanner obtained from
   * {@link #getScanner(TableMeta, Schema, Fragment, Schema)} is cast to {@link SeekableScanner}.
   *
   * @param meta The table meta
   * @param schema The input schema
   * @param fragment The serialized fragment for scanning
   * @param target The output schema
   * @return SeekableScanner instance
   * @throws IOException
   */
  public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment,
                                                         Schema target) throws IOException {
    return (SeekableScanner)this.getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
  }

  /**
   * Returns Appender instance.
   * The appender class is looked up in the handler cache first, and then in the configuration
   * with the key {@code tajo.storage.appender-handler.<format>.class}.
   *
   * @param queryContext Query property.
   * @param taskAttemptId Task id.
   * @param meta Table meta data.
   * @param schema Output schema.
   * @param workDir Working directory
   * @return Appender instance
   * @throws java.io.IOException if no appender class is registered for the data format
   */
  public Appender getAppender(OverridableConf queryContext,
                              TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
      throws IOException {
    // Locale.ROOT keeps the configuration key independent of the default locale (e.g. Turkish 'I').
    String handlerName = meta.getDataFormat().toLowerCase(Locale.ROOT);

    Class<? extends Appender> appenderClass = OldStorageManager.APPENDER_HANDLER_CACHE.get(handlerName);
    if (appenderClass == null) {
      appenderClass = conf.getClass(
          String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);

      // Check before caching: an unknown format must not be stored as a null entry,
      // and the cache may be a map which rejects null values.
      if (appenderClass == null) {
        throw new IOException("Unknown Storage Type: " + meta.getDataFormat());
      }
      OldStorageManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
    }

    return OldStorageManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
  }

  /**
   * Return the Scanner class for the DataFormat that is defined in storage-default.xml.
   * The scanner class is looked up in the handler cache first, and then in the configuration
   * with the key {@code tajo.storage.scanner-handler.<format>.class}.
   *
   * @param dataFormat store type
   * @return The Scanner class
   * @throws java.io.IOException if no scanner class is registered for the data format
   */
  public Class<? extends Scanner> getScannerClass(String dataFormat) throws IOException {
    // Locale.ROOT keeps the configuration key independent of the default locale (e.g. Turkish 'I').
    String handlerName = dataFormat.toLowerCase(Locale.ROOT);

    Class<? extends Scanner> scannerClass = OldStorageManager.SCANNER_HANDLER_CACHE.get(handlerName);
    if (scannerClass == null) {
      scannerClass = conf.getClass(
          String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);

      // Check before caching: an unknown format must not be stored as a null entry,
      // and the cache may be a map which rejects null values.
      if (scannerClass == null) {
        throw new IOException("Unknown Storage Type: " + dataFormat);
      }
      OldStorageManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
    }

    return scannerClass;
  }

  /**
   * It is called after making logical plan. Storage manager should verify the schema for inserting.
   *
   * @param tableDesc The table description of insert target.
   * @param outSchema  The output schema of select query for inserting.
   * @throws TajoException
   */
  public abstract void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws TajoException;

  /**
   * Rewrite the logical plan. It is assumed that the final plan will be given in this method.
   * The default implementation does nothing.
   *
   * @param context The query context
   * @param plan The logical plan to be rewritten
   * @throws TajoException
   */
  public void rewritePlan(OverridableConf context, LogicalPlan plan) throws TajoException {
    // nothing to do by default
  }

  ////////////////////////////////////////////////////////////////////////////
  // Table Lifecycle Section
  ////////////////////////////////////////////////////////////////////////////

  /**
   * This method is called after executing "CREATE TABLE" statement.
   * If a storage is a file based storage, a storage manager may create directory.
   *
   * @param tableDesc Table description which is created.
   * @param ifNotExists Creates the table only when the table does not exist.
   * @throws TajoException
   * @throws java.io.IOException
   */
  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws TajoException, IOException;

  /**
   * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
   * which is the option to delete all the data.
   *
   * @param tableDesc The description of the table to be purged
   * @throws java.io.IOException
   * @throws TajoException
   */
  public abstract void purgeTable(TableDesc tableDesc) throws IOException, TajoException;

  /**
   * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
   * In general Tajo creates the target table after finishing the final sub-query of CTAS.
   * But in the special cases, such as HBase, INSERT or CTAS query uses the target table information.
   * That kind of the storage should implement the logic related to creating table in this method.
   *
   * @param node The child node of the root node.
   * @throws java.io.IOException
   * @throws TajoException
   */
  public abstract void prepareTable(LogicalNode node) throws IOException, TajoException;

  /**
   * Finalizes result data. Tajo stores result data in the staging directory.
   * If the query fails, clean up the staging directory.
   * Otherwise the query is successful, move to the final directory from the staging directory.
   *
   * @param queryContext The query property
   * @param finalEbId The final execution block id
   * @param plan The query plan
   * @param schema The final output schema
   * @param tableDesc The description of the target table
   * @return Saved path
   * @throws java.io.IOException
   */
  public abstract Path commitTable(OverridableConf queryContext,
                                   ExecutionBlockId finalEbId,
                                   LogicalPlan plan, Schema schema,
                                   TableDesc tableDesc) throws IOException;

  /**
   * It is called when the query failed.
   * Each storage manager should implement to be processed when the query fails in this method.
   *
   * @param node The child node of the root node.
   * @throws java.io.IOException
   * @throws TajoException
   */
  public abstract void rollbackTable(LogicalNode node) throws IOException, TajoException;

  /**
   * Two tablespaces are equal if both their names and their URIs are equal.
   *
   * @param obj The object to compare with
   * @return true if the given object is a Tablespace with the same name and URI
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof Tablespace)) {
      return false;
    }
    Tablespace other = (Tablespace) obj;
    return Objects.equals(name, other.name) && Objects.equals(uri, other.uri);
  }

  /**
   * Computed from the same fields as {@link #equals(Object)} so that equal tablespaces
   * have equal hash codes.
   *
   * @return The hash code of the name and the URI
   */
  @Override
  public int hashCode() {
    return Objects.hash(name, uri);
  }

  /**
   * Returns the staging URI for the given query.
   *
   * @param context The query context
   * @param queryId The query id
   * @param meta The table meta
   * @return The staging URI
   * @throws java.io.IOException
   */
  public abstract URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException;

  /**
   * Prepares the staging space for the given query.
   * The default implementation always throws an {@link IOException}
   * because staging is not supported unless a subclass overrides this method.
   *
   * @param conf The system configuration
   * @param queryId The query id
   * @param context The query context
   * @param meta The table meta
   * @return The URI of the staging space
   * @throws java.io.IOException
   */
  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context,
                                 TableMeta meta) throws IOException {
    throw new IOException("Staging the output result is not supported in this storage");
  }

  /**
   * Returns the metadata provider linked to this tablespace.
   * The default implementation always throws a {@link TajoRuntimeException}
   * wrapping an {@link UnsupportedException}.
   *
   * @return The metadata provider
   */
  public MetadataProvider getMetadataProvider() {
    throw new TajoRuntimeException(new UnsupportedException("Linked Metadata Provider for " + name));
  }

  /**
   * The default implementation always throws a {@link TajoRuntimeException}
   * wrapping an {@link UnsupportedException}.
   * NOTE(review): the meaning of the returned value is not visible here - confirm with the overriding
   * implementations. The misspelled method name is kept because it is a part of the public interface.
   *
   * @param plan The logical plan
   * @return Never returns normally in the default implementation
   */
  @SuppressWarnings("unused")
  public int markAccetablePlanPart(LogicalPlan plan) {
    throw new TajoRuntimeException(new UnsupportedException());
  }
}
