/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import net.minidev.json.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.TUtil;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.*;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;

public class FileTablespace extends Tablespace {

  public static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };
  private final Log LOG = LogFactory.getLog(FileTablespace.class);

  static final String OUTPUT_FILE_PREFIX="part-";
  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE =
      new ThreadLocal<NumberFormat>() {
        @Override
        public NumberFormat initialValue() {
          NumberFormat fmt = NumberFormat.getInstance();
          fmt.setGroupingUsed(false);
          fmt.setMinimumIntegerDigits(2);
          return fmt;
        }
      };
  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
      new ThreadLocal<NumberFormat>() {
        @Override
        public NumberFormat initialValue() {
          NumberFormat fmt = NumberFormat.getInstance();
          fmt.setGroupingUsed(false);
          fmt.setMinimumIntegerDigits(6);
          return fmt;
        }
      };

  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
      new ThreadLocal<NumberFormat>() {
        @Override
        public NumberFormat initialValue() {
          NumberFormat fmt = NumberFormat.getInstance();
          fmt.setGroupingUsed(false);
          fmt.setMinimumIntegerDigits(3);
          return fmt;
        }
      };

  private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true, false);
  private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true);

  protected FileSystem fs;
  protected Path spacePath;
  protected Path stagingRootPath;
  protected boolean blocksMetadataEnabled;
  private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));

  public FileTablespace(String spaceName, URI uri, JSONObject config) {
    super(spaceName, uri, config);
  }

  @Override
  protected void storageInit() throws IOException {
    this.spacePath = new Path(uri);
    this.fs = spacePath.getFileSystem(conf);
    this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR)));
    this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());

    this.blocksMetadataEnabled =
        conf.getBoolean(DFS_HDFS_BLOCKS_METADATA_ENABLED, DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);

    if (!this.blocksMetadataEnabled) {
      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
    }
  }

  @Override
  public long getTableVolume(URI uri, Optional<EvalNode> filter) throws UnsupportedException {
    Path path = new Path(uri);
    ContentSummary summary;
    try {
      summary = fs.getContentSummary(path);
    } catch (IOException e) {
      throw new TajoInternalError(e);
    }
    return summary.getLength();
  }

  @Override
  public URI getRootUri() {
    return fs.getUri();
  }

  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
      throws IOException {
    FileStatus status = fs.getFileStatus(path);
    return getFileScanner(meta, schema, path, status);
  }

  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
      throws IOException {
    Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
    return getScanner(meta, schema, fragment, null);
  }

  public FileSystem getFileSystem() {
    return this.fs;
  }

  public void delete(Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);
    fs.delete(tablePath, true);
  }

  public boolean exists(Path path) throws IOException {
    FileSystem fileSystem = path.getFileSystem(conf);
    return fileSystem.exists(path);
  }

  @Override
  public URI getTableUri(String databaseName, String tableName) {
    return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri();
  }

  @VisibleForTesting
  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
      throws IOException {
    return getAppender(null, null, meta, schema, filePath);
  }

  public FileFragment[] split(String tableName) throws IOException {
    Path tablePath = new Path(spacePath, tableName);
    return split(tableName, tablePath, fs.getDefaultBlockSize());
  }

  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
    Path tablePath = new Path(spacePath, tableName);
    return split(tableName, tablePath, fragmentSize);
  }

  public FileFragment[] split(Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);
    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
  }

  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
    return split(tableName, tablePath, fs.getDefaultBlockSize());
  }

  private FileFragment[] split(String tableName, Path tablePath, long size)
      throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);

    long defaultBlockSize = size;
    List<FileFragment> listTablets = new ArrayList<FileFragment>();
    FileFragment tablet;

    FileStatus[] fileLists = fs.listStatus(tablePath);
    for (FileStatus file : fileLists) {
      long remainFileSize = file.getLen();
      long start = 0;
      if (remainFileSize > defaultBlockSize) {
        while (remainFileSize > defaultBlockSize) {
          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
          listTablets.add(tablet);
          start += defaultBlockSize;
          remainFileSize -= defaultBlockSize;
        }
        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
      } else {
        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
      }
    }

    FileFragment[] tablets = new FileFragment[listTablets.size()];
    listTablets.toArray(tablets);

    return tablets;
  }

  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
                                       Path tablePath, long size)
      throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);

    long defaultBlockSize = size;
    List<FileFragment> listTablets = new ArrayList<FileFragment>();
    FileFragment tablet;

    FileStatus[] fileLists = fs.listStatus(tablePath);
    for (FileStatus file : fileLists) {
      long remainFileSize = file.getLen();
      long start = 0;
      if (remainFileSize > defaultBlockSize) {
        while (remainFileSize > defaultBlockSize) {
          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
          listTablets.add(tablet);
          start += defaultBlockSize;
          remainFileSize -= defaultBlockSize;
        }
        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
      } else {
        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
      }
    }

    FileFragment[] tablets = new FileFragment[listTablets.size()];
    listTablets.toArray(tablets);

    return tablets;
  }

  public long calculateSize(Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(conf);
    long totalSize = 0;

    if (fs.exists(tablePath)) {
      totalSize = fs.getContentSummary(tablePath).getLength();
    }

    return totalSize;
  }

  /////////////////////////////////////////////////////////////////////////////
  // FileInputFormat Area
  /////////////////////////////////////////////////////////////////////////////
  public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) {
    if (taskAttemptId == null) {
      // For testcase
      return workDir;
    }
    // The final result of a task will be written in a file named part-ss-nnnnnnn,
    // where ss is the stage id associated with this task, and nnnnnn is the task id.
    Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
        OUTPUT_FILE_PREFIX +
            OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" +
            OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" +
            OUTPUT_FILE_FORMAT_SEQ.get().format(0));
    LOG.info("Output File Path: " + outFilePath);

    return outFilePath;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by the listPaths() to apply the built-in
   * hiddenFileFilter together with a user provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @return array of FileStatus objects
   * @throws IOException if zero items.
   */
  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);

    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];

      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        LOG.warn("Input path does not exist: " + p);
      } else if (matches.length == 0) {
        LOG.warn("Input Pattern " + p + " matches 0 files");
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDirectory()) {
            for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
              result.add(stat);
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  /**
   * Is the given filename splitable? Usually, true, but if the file is
   * stream compressed, it will not be.
   * <p/>
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that Mappers process entire files.
   *
   *
   * @param path the file name to check
   * @param status get the file length
   * @return is this file isSplittable?
   */
  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
    Scanner scanner = getFileScanner(meta, schema, path, status);
    boolean split = scanner.isSplittable();
    scanner.close();
    return split;
  }

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
        " is outside of file (0.." +
        fileLength + ")");
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types
   */
  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
    return new FileFragment(fragmentId, file, start, length);
  }

  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
                                   String[] hosts) {
    return new FileFragment(fragmentId, file, start, length, hosts);
  }

  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
      throws IOException {
    return new FileFragment(fragmentId, file, blockLocation);
  }

  // for Non Splittable. eg, compressed gzip TextFile
  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
                                      BlockLocation[] blkLocations) throws IOException {

    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
    for (BlockLocation blockLocation : blkLocations) {
      for (String host : blockLocation.getHosts()) {
        if (hostsBlockMap.containsKey(host)) {
          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
        } else {
          hostsBlockMap.put(host, 1);
        }
      }
    }

    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {

      @Override
      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
        return v1.getValue().compareTo(v2.getValue());
      }
    });

    String[] hosts = new String[blkLocations[0].getHosts().length];

    for (int i = 0; i < hosts.length; i++) {
      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
      hosts[i] = entry.getKey();
    }
    return new FileFragment(fragmentId, file, start, length, hosts);
  }

  /**
   * Get the minimum split size
   *
   * @return the minimum number of bytes that can be in a split
   */
  public long getMinSplitSize() {
    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
  }

  /**
   * Get Disk Ids by Volume Bytes
   */
  private int[] getDiskIds(VolumeId[] volumeIds) {
    int[] diskIds = new int[volumeIds.length];
    for (int i = 0; i < volumeIds.length; i++) {
      int diskId = -1;
      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
      }
      diskIds[i] = diskId;
    }
    return diskIds;
  }

  /**
   * Generate the list of files and make them into FileSplits.
   *
   * @throws IOException
   */
  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
      throws IOException {
    // generate splits'

    List<Fragment> splits = Lists.newArrayList();
    List<Fragment> volumeSplits = Lists.newArrayList();
    List<BlockLocation> blockLocations = Lists.newArrayList();

    for (Path p : inputs) {
      ArrayList<FileStatus> files = Lists.newArrayList();
      if (fs.isFile(p)) {
        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
      } else {
        files.addAll(listStatus(p));
      }

      int previousSplitSize = splits.size();
      for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length > 0) {
          // Get locations of blocks of file
          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
          boolean splittable = isSplittable(meta, schema, path, file);
          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {

            if (splittable) {
              for (BlockLocation blockLocation : blkLocations) {
                volumeSplits.add(makeSplit(tableName, path, blockLocation));
              }
              blockLocations.addAll(Arrays.asList(blkLocations));

            } else { // Non splittable
              long blockSize = blkLocations[0].getLength();
              if (blockSize >= length) {
                blockLocations.addAll(Arrays.asList(blkLocations));
                for (BlockLocation blockLocation : blkLocations) {
                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
                }
              } else {
                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
              }
            }

          } else {
            if (splittable) {

              long minSize = Math.max(getMinSplitSize(), 1);

              long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
              long splitSize = Math.max(minSize, blockSize);
              long bytesRemaining = length;

              // for s3
              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
                    blkLocations[blkIndex].getHosts()));
                bytesRemaining -= splitSize;
              }
              if (bytesRemaining > 0) {
                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
                    blkLocations[blkIndex].getHosts()));
              }
            } else { // Non splittable
              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
            }
          }
        }
      }
      if(LOG.isDebugEnabled()){
        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
      }
    }

    // Combine original fileFragments with new VolumeId information
    setVolumeMeta(volumeSplits, blockLocations);
    splits.addAll(volumeSplits);
    LOG.info("Total # of splits: " + splits.size());
    return splits;
  }

  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
      throws IOException {

    int locationSize = blockLocations.size();
    int splitSize = splits.size();
    if (locationSize == 0 || splitSize == 0) return;

    if (locationSize != splitSize) {
      // splits and locations don't match up
      LOG.warn("Number of block locations not equal to number of splits: "
          + "#locations=" + locationSize
          + " #splits=" + splitSize);
      return;
    }

    DistributedFileSystem fs = (DistributedFileSystem) this.fs;
    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
    int blockLocationIdx = 0;

    Iterator<Fragment> iter = splits.iterator();
    while (locationSize > blockLocationIdx) {

      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
      //BlockStorageLocation containing additional volume location information for each replica of each block.
      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);

      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
        ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
        blockLocationIdx++;
      }
    }
    LOG.info("# of splits with volumeId " + splitSize);
  }

  private static class InvalidInputException extends IOException {
    List<IOException> errors;
    public InvalidInputException(List<IOException> errors) {
      this.errors = errors;
    }

    @Override
    public String getMessage(){
      StringBuffer sb = new StringBuffer();
      int messageLimit = Math.min(errors.size(), 10);
      for (int i = 0; i < messageLimit ; i ++) {
        sb.append(errors.get(i).getMessage()).append("\n");
      }

      if(messageLimit < errors.size())
        sb.append("skipped .....").append("\n");

      return sb.toString();
    }
  }

  @Override
  public List<Fragment> getSplits(String inputSourceId,
                                  TableDesc table,
                                  @Nullable EvalNode filterCondition) throws IOException {
    return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri()));
  }

  @Override
  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
    if (!tableDesc.isExternal()) {
      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
      String databaseName = splitted[0];
      String simpleTableName = splitted[1];

      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
      Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName);
      tableDesc.setUri(tablePath.toUri());
    } else {
      Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
    }

    Path path = new Path(tableDesc.getUri());

    FileSystem fs = path.getFileSystem(conf);
    TableStats stats = new TableStats();
    if (tableDesc.isExternal()) {
      if (!fs.exists(path)) {
        LOG.error(path.toUri() + " does not exist");
        throw new IOException("ERROR: " + path.toUri() + " does not exist");
      }
    } else {
      fs.mkdirs(path);
    }

    long totalSize = 0;

    try {
      totalSize = calculateSize(path);
    } catch (IOException e) {
      LOG.warn("Cannot calculate the size of the relation", e);
    }

    stats.setNumBytes(totalSize);

    if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
    }

    tableDesc.setStats(stats);
  }

  @Override
  public void purgeTable(TableDesc tableDesc) throws IOException {
    try {
      Path path = new Path(tableDesc.getUri());
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Delete table data dir: " + path);
      fs.delete(path, true);
    } catch (IOException e) {
      throw new InternalError(e.getMessage());
    }
  }

  @Override
  public StorageProperty getProperty() {
    return FileStorageProperties;
  }

  @Override
  public FormatProperty getFormatProperty(TableMeta meta) {
    return GeneralFileProperties;
  }

  @Override
  public void close() {
  }

  @Override
  public void prepareTable(LogicalNode node) throws IOException {
  }

  @Override
  public void rollbackTable(LogicalNode node) throws IOException {
  }

  @Override
  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
    String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");

    Path stagingDir;
    // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
    // So, this query results won't be materialized as a part of a table.
    // The result will be temporarily written in the staging directory.
    if (outputPath.isEmpty()) {
      // for temporarily written in the storage directory
      stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
    } else {
      Tablespace space = TablespaceManager.get(outputPath);
      if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
        // If this space allows move operation, the staging directory will be underneath the final output table uri.
        stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId));
      } else {
        stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
      }
    }

    return stagingDir.toUri();
  }

  // query submission directory is private!
  final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx--------
  public static final String TMP_STAGING_DIR_PREFIX = ".staging";

  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta)
      throws IOException {

    String realUser;
    String currentUser;
    UserGroupInformation ugi;
    ugi = UserGroupInformation.getLoginUser();
    realUser = ugi.getShortUserName();
    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();


    Path stagingDir = new Path(getStagingUri(context, queryId, meta));

    ////////////////////////////////////////////
    // Create Output Directory
    ////////////////////////////////////////////

    if (fs.exists(stagingDir)) {
      throw new IOException("The staging directory '" + stagingDir + "' already exists");
    }
    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
    FileStatus fsStatus = fs.getFileStatus(stagingDir);
    String owner = fsStatus.getOwner();

    if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
      throw new IOException("The ownership on the user's query " +
          "directory " + stagingDir + " is not as expected. " +
          "It is owned by " + owner + ". The directory must " +
          "be owned by the submitter " + currentUser + " or " +
          "by " + realUser);
    }

    if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
      LOG.info("Permissions on staging directory " + stagingDir + " are " +
          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
          "to correct value " + STAGING_DIR_PERMISSION);
      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
    }

    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
    fs.mkdirs(stagingResultDir);

    return stagingDir.toUri();
  }

  @Override
  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) {
  }

  @Override
  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
                          Schema schema, TableDesc tableDesc) throws IOException {
    return commitOutputData(queryContext, true);
  }

  @Override
  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
      throws IOException {
    return null;
  }

  /**
   * Finalizes result data. Tajo stores result data in the staging directory.
   * If the query fails, clean up the staging directory.
   * Otherwise the query is successful, move to the final directory from the staging directory.
   *
   * @param queryContext The query property
   * @param changeFileSeq If true change result file name with max sequence.
   * @return Saved path
   * @throws java.io.IOException
   */
  protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException {
    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
    Path finalOutputDir;
    if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) {
      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI));
      try {
        FileSystem fs = stagingResultDir.getFileSystem(conf);

        if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO

          // It moves the original table into the temporary location.
          // Then it moves the new result table into the original table location.
          // Upon failed, it recovers the original table if possible.
          boolean movedToOldTable = false;
          boolean committed = false;
          Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
          ContentSummary summary = fs.getContentSummary(stagingResultDir);

          // When inserting empty data into a partitioned table, check if keep existing data need to be remove or not.
          boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED);

          // If existing data doesn't need to keep, check if there are some files.
          if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty())
            && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount() > 0L))) {
            // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
            // renaming directory.
            Map<Path, Path> renameDirs = TUtil.newHashMap();
            // This is a map for recovering existing partition directory. A key is current directory and a value is
            // temporary directory to back up.
            Map<Path, Path> recoveryDirs = TUtil.newHashMap();

            try {
              if (!fs.exists(finalOutputDir)) {
                fs.mkdirs(finalOutputDir);
              }

              visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
                  renameDirs, oldTableDir);

              // Rename target partition directories
              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
                // Backup existing data files for recovering
                if (fs.exists(entry.getValue())) {
                  String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
                      oldTableDir.toString());
                  Path recoveryPath = new Path(recoveryPathString);
                  fs.rename(entry.getValue(), recoveryPath);
                  fs.exists(recoveryPath);
                  recoveryDirs.put(entry.getValue(), recoveryPath);
                }
                // Delete existing directory
                fs.delete(entry.getValue(), true);
                // Rename staging directory to final output directory
                fs.rename(entry.getKey(), entry.getValue());
              }

            } catch (IOException ioe) {
              // Remove created dirs
              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
                fs.delete(entry.getValue(), true);
              }

              // Recovery renamed dirs
              for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
                fs.delete(entry.getValue(), true);
                fs.rename(entry.getValue(), entry.getKey());
              }

              throw new IOException(ioe.getMessage());
            }
          } else { // no partition
            try {

              // if the final output dir exists, move all contents to the temporary table dir.
              // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
              if (fs.exists(finalOutputDir)) {
                fs.mkdirs(oldTableDir);

                for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
                  fs.rename(status.getPath(), oldTableDir);
                }

                movedToOldTable = fs.exists(oldTableDir);
              } else { // if the parent does not exist, make its parent directory.
                fs.mkdirs(finalOutputDir);
              }

              // Move the results to the final output dir.
              for (FileStatus status : fs.listStatus(stagingResultDir)) {
                fs.rename(status.getPath(), finalOutputDir);
              }

              // Check the final output dir
              committed = fs.exists(finalOutputDir);

            } catch (IOException ioe) {
              // recover the old table
              if (movedToOldTable && !committed) {

                // if commit is failed, recover the old data
                for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
                  fs.delete(status.getPath(), true);
                }

                for (FileStatus status : fs.listStatus(oldTableDir)) {
                  fs.rename(status.getPath(), finalOutputDir);
                }
              }

              throw new IOException(ioe.getMessage());
            }
          }
        } else {
          String queryType = queryContext.get(QueryVars.COMMAND_TYPE);

          if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table

            NumberFormat fmt = NumberFormat.getInstance();
            fmt.setGroupingUsed(false);
            fmt.setMinimumIntegerDigits(3);

            if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
                if (eachFile.isFile()) {
                  LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
                  continue;
                }
                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
              }
            } else {
              int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
                if (eachFile.getPath().getName().startsWith("_")) {
                  continue;
                }
                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
              }
            }
            // checking all file moved and remove empty dir
            verifyAllFileMoved(fs, stagingResultDir);
            FileStatus[] files = fs.listStatus(stagingResultDir);
            if (files != null && files.length != 0) {
              for (FileStatus eachFile: files) {
                LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
              }
            }
          } else { // CREATE TABLE AS SELECT (CTAS)
            if (fs.exists(finalOutputDir)) {
              for (FileStatus status : fs.listStatus(stagingResultDir)) {
                fs.rename(status.getPath(), finalOutputDir);
              }
            } else {
              fs.rename(stagingResultDir, finalOutputDir);
            }
            LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
          }
        }

        // remove the staging directory if the final output dir is given.
        Path stagingDirRoot = stagingDir.getParent();
        fs.delete(stagingDirRoot, true);
      } catch (Throwable t) {
        LOG.error(t);
        throw new IOException(t);
      }
    } else {
      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
    }

    return finalOutputDir;
  }

  /**
   * Attach the sequence number to the output file name and than move the file into the final result path.
   *
   * @param fs FileSystem
   * @param stagingResultDir The staging result dir
   * @param fileStatus The file status
   * @param finalOutputPath Final output path
   * @param nf Number format
   * @param fileSeq The sequence number
   * @throws java.io.IOException
   */
  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
                                          FileStatus fileStatus, Path finalOutputPath,
                                          NumberFormat nf,
                                          int fileSeq, boolean changeFileSeq) throws IOException {
    if (fileStatus.isDirectory()) {
      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
      if (subPath != null) {
        Path finalSubPath = new Path(finalOutputPath, subPath);
        if (!fs.exists(finalSubPath)) {
          fs.mkdirs(finalSubPath);
        }
        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
          if (eachFile.getPath().getName().startsWith("_")) {
            continue;
          }
          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
        }
      } else {
        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
      }
    } else {
      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
      if (subPath != null) {
        Path finalSubPath = new Path(finalOutputPath, subPath);
        if (changeFileSeq) {
          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
        }
        if (!fs.exists(finalSubPath.getParent())) {
          fs.mkdirs(finalSubPath.getParent());
        }
        if (fs.exists(finalSubPath)) {
          throw new IOException("Already exists data file:" + finalSubPath);
        }
        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
        if (success) {
          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
              "to final output[" + finalSubPath + "]");
        } else {
          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
              "to final output[" + finalSubPath + "]");
        }
      }
    }
  }

  /**
   * Removes the path of the parent.
   * @param parentPath
   * @param childPath
   * @return
   */
  private String extractSubPath(Path parentPath, Path childPath) {
    String parentPathStr = parentPath.toUri().getPath();
    String childPathStr = childPath.toUri().getPath();

    if (parentPathStr.length() > childPathStr.length()) {
      return null;
    }

    int index = childPathStr.indexOf(parentPathStr);
    if (index != 0) {
      return null;
    }

    return childPathStr.substring(parentPathStr.length() + 1);
  }

  /**
   * Attach the sequence number to a path.
   *
   * @param path Path
   * @param seq sequence number
   * @param nf Number format
   * @return New path attached with sequence number
   * @throws java.io.IOException
   */
  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
    String[] tokens = path.getName().split("-");
    if (tokens.length != 4) {
      throw new IOException("Wrong result file name:" + path);
    }
    return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
  }

  /**
   * Make sure all files are moved.
   * @param fs FileSystem
   * @param stagingPath The stagind directory
   * @return
   * @throws java.io.IOException
   */
  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
    FileStatus[] files = fs.listStatus(stagingPath);
    if (files != null && files.length != 0) {
      for (FileStatus eachFile: files) {
        if (eachFile.isFile()) {
          LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
          return false;
        } else {
          if (verifyAllFileMoved(fs, eachFile.getPath())) {
            fs.delete(eachFile.getPath(), false);
          } else {
            return false;
          }
        }
      }
    }

    return true;
  }

  /**
   * This method sets a rename map which includes renamed staging directory to final output directory recursively.
   * If there exists some data files, this delete it for duplicate data.
   *
   *
   * @param fs
   * @param stagingPath
   * @param outputPath
   * @param stagingParentPathString
   * @throws java.io.IOException
   */
  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
                                         String stagingParentPathString,
                                         Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
    FileStatus[] files = fs.listStatus(stagingPath);

    for(FileStatus eachFile : files) {
      if (eachFile.isDirectory()) {
        Path oldPath = eachFile.getPath();

        // Make recover directory.
        String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
            oldTableDir.toString());
        Path recoveryPath = new Path(recoverPathString);
        if (!fs.exists(recoveryPath)) {
          fs.mkdirs(recoveryPath);
        }

        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
            renameDirs, oldTableDir);
        // Find last order partition for renaming
        String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
            outputPath.toString());
        Path newPath = new Path(newPathString);
        if (!isLeafDirectory(fs, eachFile.getPath())) {
          renameDirs.put(eachFile.getPath(), newPath);
        } else {
          if (!fs.exists(newPath)) {
            fs.mkdirs(newPath);
          }
        }
      }
    }
  }

  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
    boolean retValue = false;

    FileStatus[] files = fs.listStatus(path);
    for (FileStatus file : files) {
      if (fs.isDirectory(file.getPath())) {
        retValue = true;
        break;
      }
    }

    return retValue;
  }


}
