| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.storage; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import net.minidev.json.JSONObject; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.*; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.tajo.*; |
| import org.apache.tajo.catalog.*; |
| import org.apache.tajo.catalog.statistics.TableStats; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.exception.TajoInternalError; |
| import org.apache.tajo.exception.UnsupportedException; |
| import org.apache.tajo.plan.LogicalPlan; |
| import org.apache.tajo.plan.expr.EvalNode; |
| import org.apache.tajo.plan.logical.LogicalNode; |
| import org.apache.tajo.plan.logical.NodeType; |
| import org.apache.tajo.storage.fragment.FileFragment; |
| import org.apache.tajo.storage.fragment.Fragment; |
| import org.apache.tajo.util.Bytes; |
| import org.apache.tajo.util.TUtil; |
| |
| import javax.annotation.Nullable; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.text.NumberFormat; |
| import java.util.*; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT; |
| |
| public class FileTablespace extends Tablespace { |
| |
| public static final PathFilter hiddenFileFilter = new PathFilter() { |
| public boolean accept(Path p) { |
| String name = p.getName(); |
| return !name.startsWith("_") && !name.startsWith("."); |
| } |
| }; |
| private final Log LOG = LogFactory.getLog(FileTablespace.class); |
| |
  static final String OUTPUT_FILE_PREFIX = "part-";
| static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE = |
| new ThreadLocal<NumberFormat>() { |
| @Override |
| public NumberFormat initialValue() { |
| NumberFormat fmt = NumberFormat.getInstance(); |
| fmt.setGroupingUsed(false); |
| fmt.setMinimumIntegerDigits(2); |
| return fmt; |
| } |
| }; |
| static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = |
| new ThreadLocal<NumberFormat>() { |
| @Override |
| public NumberFormat initialValue() { |
| NumberFormat fmt = NumberFormat.getInstance(); |
| fmt.setGroupingUsed(false); |
| fmt.setMinimumIntegerDigits(6); |
| return fmt; |
| } |
| }; |
| |
| static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = |
| new ThreadLocal<NumberFormat>() { |
| @Override |
| public NumberFormat initialValue() { |
| NumberFormat fmt = NumberFormat.getInstance(); |
| fmt.setGroupingUsed(false); |
| fmt.setMinimumIntegerDigits(3); |
| return fmt; |
| } |
| }; |
| |
| private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true, false); |
| private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true); |
| |
| protected FileSystem fs; |
| protected Path spacePath; |
| protected Path stagingRootPath; |
| protected boolean blocksMetadataEnabled; |
| private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); |
| |
| public FileTablespace(String spaceName, URI uri, JSONObject config) { |
| super(spaceName, uri, config); |
| } |
| |
| @Override |
| protected void storageInit() throws IOException { |
| this.spacePath = new Path(uri); |
| this.fs = spacePath.getFileSystem(conf); |
| this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR))); |
| this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString()); |
| |
| this.blocksMetadataEnabled = |
| conf.getBoolean(DFS_HDFS_BLOCKS_METADATA_ENABLED, DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); |
| |
| if (!this.blocksMetadataEnabled) { |
| LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); |
| } |
| } |
| |
| @Override |
| public long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException { |
| Path path = new Path(table.getUri()); |
| ContentSummary summary; |
| try { |
| summary = fs.getContentSummary(path); |
| } catch (IOException e) { |
| throw new TajoInternalError(e); |
| } |
| return summary.getLength(); |
| } |
| |
| @Override |
| public URI getRootUri() { |
| return fs.getUri(); |
| } |
| |
| public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) |
| throws IOException { |
| FileStatus status = fs.getFileStatus(path); |
| return getFileScanner(meta, schema, path, status); |
| } |
| |
| public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) |
| throws IOException { |
| Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); |
| return getScanner(meta, schema, fragment, null); |
| } |
| |
| public FileSystem getFileSystem() { |
| return this.fs; |
| } |
| |
| public void delete(Path tablePath) throws IOException { |
| FileSystem fs = tablePath.getFileSystem(conf); |
| fs.delete(tablePath, true); |
| } |
| |
| public boolean exists(Path path) throws IOException { |
| FileSystem fileSystem = path.getFileSystem(conf); |
| return fileSystem.exists(path); |
| } |
| |
| @Override |
| public URI getTableUri(String databaseName, String tableName) { |
| return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); |
| } |
| |
| @VisibleForTesting |
| public Appender getAppender(TableMeta meta, Schema schema, Path filePath) |
| throws IOException { |
| return getAppender(null, null, meta, schema, filePath); |
| } |
| |
| public FileFragment[] split(String tableName) throws IOException { |
| Path tablePath = new Path(spacePath, tableName); |
| return split(tableName, tablePath, fs.getDefaultBlockSize()); |
| } |
| |
| public FileFragment[] split(String tableName, long fragmentSize) throws IOException { |
| Path tablePath = new Path(spacePath, tableName); |
| return split(tableName, tablePath, fragmentSize); |
| } |
| |
| public FileFragment[] split(Path tablePath) throws IOException { |
| FileSystem fs = tablePath.getFileSystem(conf); |
| return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); |
| } |
| |
| public FileFragment[] split(String tableName, Path tablePath) throws IOException { |
| return split(tableName, tablePath, fs.getDefaultBlockSize()); |
| } |
| |
| private FileFragment[] split(String tableName, Path tablePath, long size) |
| throws IOException { |
| FileSystem fs = tablePath.getFileSystem(conf); |
| |
| long defaultBlockSize = size; |
| List<FileFragment> listTablets = new ArrayList<FileFragment>(); |
| FileFragment tablet; |
| |
| FileStatus[] fileLists = fs.listStatus(tablePath); |
| for (FileStatus file : fileLists) { |
| long remainFileSize = file.getLen(); |
| long start = 0; |
| if (remainFileSize > defaultBlockSize) { |
| while (remainFileSize > defaultBlockSize) { |
| tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); |
| listTablets.add(tablet); |
| start += defaultBlockSize; |
| remainFileSize -= defaultBlockSize; |
| } |
| listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); |
| } else { |
| listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); |
| } |
| } |
| |
| FileFragment[] tablets = new FileFragment[listTablets.size()]; |
| listTablets.toArray(tablets); |
| |
| return tablets; |
| } |
| |
| public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, |
| Path tablePath, long size) |
| throws IOException { |
| FileSystem fs = tablePath.getFileSystem(conf); |
| |
| long defaultBlockSize = size; |
| List<FileFragment> listTablets = new ArrayList<FileFragment>(); |
| FileFragment tablet; |
| |
| FileStatus[] fileLists = fs.listStatus(tablePath); |
| for (FileStatus file : fileLists) { |
| long remainFileSize = file.getLen(); |
| long start = 0; |
| if (remainFileSize > defaultBlockSize) { |
| while (remainFileSize > defaultBlockSize) { |
| tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); |
| listTablets.add(tablet); |
| start += defaultBlockSize; |
| remainFileSize -= defaultBlockSize; |
| } |
| listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); |
| } else { |
| listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); |
| } |
| } |
| |
| FileFragment[] tablets = new FileFragment[listTablets.size()]; |
| listTablets.toArray(tablets); |
| |
| return tablets; |
| } |
| |
| public long calculateSize(Path tablePath) throws IOException { |
| FileSystem fs = tablePath.getFileSystem(conf); |
| long totalSize = 0; |
| |
| if (fs.exists(tablePath)) { |
| totalSize = fs.getContentSummary(tablePath).getLength(); |
| } |
| |
| return totalSize; |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| // FileInputFormat Area |
| ///////////////////////////////////////////////////////////////////////////// |
| public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) { |
| if (taskAttemptId == null) { |
| // For testcase |
| return workDir; |
| } |
    // The final result of a task will be written to a file named part-ss-nnnnnn-sss,
    // where ss is the stage id, nnnnnn is the task id, and sss is the file sequence number.
| Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, |
| OUTPUT_FILE_PREFIX + |
| OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" + |
| OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" + |
| OUTPUT_FILE_FORMAT_SEQ.get().format(0)); |
| LOG.info("Output File Path: " + outFilePath); |
| |
| return outFilePath; |
| } |
| |
| /** |
| * Proxy PathFilter that accepts a path only if all filters given in the |
| * constructor do. Used by the listPaths() to apply the built-in |
| * hiddenFileFilter together with a user provided one (if any). |
| */ |
| private static class MultiPathFilter implements PathFilter { |
| private List<PathFilter> filters; |
| |
| public MultiPathFilter(List<PathFilter> filters) { |
| this.filters = filters; |
| } |
| |
| public boolean accept(Path path) { |
| for (PathFilter filter : filters) { |
| if (!filter.accept(path)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| } |
| |
| /** |
| * List input directories. |
| * Subclasses may override to, e.g., select only files matching a regular |
| * expression. |
| * |
| * @return array of FileStatus objects |
| * @throws IOException if zero items. |
| */ |
| protected List<FileStatus> listStatus(Path... dirs) throws IOException { |
| List<FileStatus> result = new ArrayList<FileStatus>(); |
| if (dirs.length == 0) { |
| throw new IOException("No input paths specified in job"); |
| } |
| |
| List<IOException> errors = new ArrayList<IOException>(); |
| |
| // creates a MultiPathFilter with the hiddenFileFilter and the |
| // user provided one (if any). |
| List<PathFilter> filters = new ArrayList<PathFilter>(); |
| filters.add(hiddenFileFilter); |
| |
| PathFilter inputFilter = new MultiPathFilter(filters); |
| |
| for (int i = 0; i < dirs.length; ++i) { |
| Path p = dirs[i]; |
| |
| FileStatus[] matches = fs.globStatus(p, inputFilter); |
| if (matches == null) { |
| LOG.warn("Input path does not exist: " + p); |
| } else if (matches.length == 0) { |
| LOG.warn("Input Pattern " + p + " matches 0 files"); |
| } else { |
| for (FileStatus globStat : matches) { |
| if (globStat.isDirectory()) { |
| for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) { |
| result.add(stat); |
| } |
| } else { |
| result.add(globStat); |
| } |
| } |
| } |
| } |
| |
| if (!errors.isEmpty()) { |
| throw new InvalidInputException(errors); |
| } |
| LOG.info("Total input paths to process : " + result.size()); |
| return result; |
| } |
| |
| /** |
| * Is the given filename splitable? Usually, true, but if the file is |
| * stream compressed, it will not be. |
| * <p/> |
| * <code>FileInputFormat</code> implementations can override this and return |
| * <code>false</code> to ensure that individual input files are never split-up |
| * so that Mappers process entire files. |
| * |
| * |
| * @param path the file name to check |
| * @param status get the file length |
| * @return is this file isSplittable? |
| */ |
| protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { |
| Scanner scanner = getFileScanner(meta, schema, path, status); |
| boolean split = scanner.isSplittable(); |
| scanner.close(); |
| return split; |
| } |
| |
| private static final double SPLIT_SLOP = 1.1; // 10% slop |
| |
| protected int getBlockIndex(BlockLocation[] blkLocations, |
| long offset) { |
| for (int i = 0; i < blkLocations.length; i++) { |
| // is the offset inside this block? |
| if ((blkLocations[i].getOffset() <= offset) && |
| (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { |
| return i; |
| } |
| } |
| BlockLocation last = blkLocations[blkLocations.length - 1]; |
| long fileLength = last.getOffset() + last.getLength() - 1; |
| throw new IllegalArgumentException("Offset " + offset + |
| " is outside of file (0.." + |
| fileLength + ")"); |
| } |
| |
| /** |
| * A factory that makes the split for this class. It can be overridden |
| * by sub-classes to make sub-types |
| */ |
| protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { |
| return new FileFragment(fragmentId, file, start, length); |
| } |
| |
| protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, |
| String[] hosts) { |
| return new FileFragment(fragmentId, file, start, length, hosts); |
| } |
| |
| protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) |
| throws IOException { |
| return new FileFragment(fragmentId, file, blockLocation); |
| } |
| |
  // For non-splittable files, e.g., a gzip-compressed text file
| protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, |
| BlockLocation[] blkLocations) throws IOException { |
| |
| Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>(); |
| for (BlockLocation blockLocation : blkLocations) { |
| for (String host : blockLocation.getHosts()) { |
| if (hostsBlockMap.containsKey(host)) { |
| hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); |
| } else { |
| hostsBlockMap.put(host, 1); |
| } |
| } |
| } |
| |
| List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet()); |
| Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { |
| |
| @Override |
| public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { |
| return v1.getValue().compareTo(v2.getValue()); |
| } |
| }); |
| |
| String[] hosts = new String[blkLocations[0].getHosts().length]; |
| |
| for (int i = 0; i < hosts.length; i++) { |
| Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); |
| hosts[i] = entry.getKey(); |
| } |
| return new FileFragment(fragmentId, file, start, length, hosts); |
| } |
| |
| /** |
| * Get the minimum split size |
| * |
| * @return the minimum number of bytes that can be in a split |
| */ |
| public long getMinSplitSize() { |
| return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); |
| } |
| |
| /** |
   * Get disk ids from the volume ids of block replicas.
| */ |
| private int[] getDiskIds(VolumeId[] volumeIds) { |
| int[] diskIds = new int[volumeIds.length]; |
| for (int i = 0; i < volumeIds.length; i++) { |
| int diskId = -1; |
| if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { |
| diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); |
| } |
| diskIds[i] = diskId; |
| } |
| return diskIds; |
| } |
| |
| /** |
   * Generate the list of files and make them into fragments.
| * |
| * @throws IOException |
| */ |
| public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs) |
| throws IOException { |
    // generate splits
| |
| List<Fragment> splits = Lists.newArrayList(); |
| List<Fragment> volumeSplits = Lists.newArrayList(); |
| List<BlockLocation> blockLocations = Lists.newArrayList(); |
| |
| for (Path p : inputs) { |
| ArrayList<FileStatus> files = Lists.newArrayList(); |
| if (fs.isFile(p)) { |
| files.addAll(Lists.newArrayList(fs.getFileStatus(p))); |
| } else { |
| files.addAll(listStatus(p)); |
| } |
| |
| int previousSplitSize = splits.size(); |
| for (FileStatus file : files) { |
| Path path = file.getPath(); |
| long length = file.getLen(); |
| if (length > 0) { |
| // Get locations of blocks of file |
| BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); |
| boolean splittable = isSplittable(meta, schema, path, file); |
| if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { |
| |
| if (splittable) { |
| for (BlockLocation blockLocation : blkLocations) { |
| volumeSplits.add(makeSplit(tableName, path, blockLocation)); |
| } |
| blockLocations.addAll(Arrays.asList(blkLocations)); |
| |
| } else { // Non splittable |
| long blockSize = blkLocations[0].getLength(); |
| if (blockSize >= length) { |
| blockLocations.addAll(Arrays.asList(blkLocations)); |
| for (BlockLocation blockLocation : blkLocations) { |
| volumeSplits.add(makeSplit(tableName, path, blockLocation)); |
| } |
| } else { |
| splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); |
| } |
| } |
| |
| } else { |
| if (splittable) { |
| |
| long minSize = Math.max(getMinSplitSize(), 1); |
| |
            long blockSize = file.getBlockSize(); // the S3N REST API reports a block size, but returns only a single block location
| long splitSize = Math.max(minSize, blockSize); |
| long bytesRemaining = length; |
| |
| // for s3 |
| while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { |
| int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); |
| splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, |
| blkLocations[blkIndex].getHosts())); |
| bytesRemaining -= splitSize; |
| } |
| if (bytesRemaining > 0) { |
| int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); |
| splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, |
| blkLocations[blkIndex].getHosts())); |
| } |
| } else { // Non splittable |
| splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); |
| } |
| } |
| } |
| } |
      if (LOG.isDebugEnabled()) {
| LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); |
| } |
| } |
| |
| // Combine original fileFragments with new VolumeId information |
| setVolumeMeta(volumeSplits, blockLocations); |
| splits.addAll(volumeSplits); |
| LOG.info("Total # of splits: " + splits.size()); |
| return splits; |
| } |
| |
| private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations) |
| throws IOException { |
| |
| int locationSize = blockLocations.size(); |
| int splitSize = splits.size(); |
| if (locationSize == 0 || splitSize == 0) return; |
| |
| if (locationSize != splitSize) { |
| // splits and locations don't match up |
| LOG.warn("Number of block locations not equal to number of splits: " |
| + "#locations=" + locationSize |
| + " #splits=" + splitSize); |
| return; |
| } |
| |
| DistributedFileSystem fs = (DistributedFileSystem) this.fs; |
| int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); |
| int blockLocationIdx = 0; |
| |
| Iterator<Fragment> iter = splits.iterator(); |
| while (locationSize > blockLocationIdx) { |
| |
| int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); |
| List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); |
      // BlockStorageLocation contains additional volume location information for each replica of each block.
| BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); |
| |
| for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { |
| ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); |
| blockLocationIdx++; |
| } |
| } |
| LOG.info("# of splits with volumeId " + splitSize); |
| } |
| |
| private static class InvalidInputException extends IOException { |
| List<IOException> errors; |
| public InvalidInputException(List<IOException> errors) { |
| this.errors = errors; |
| } |
| |
| @Override |
| public String getMessage(){ |
      StringBuilder sb = new StringBuilder();
      int messageLimit = Math.min(errors.size(), 10);
      for (int i = 0; i < messageLimit; i++) {
        sb.append(errors.get(i).getMessage()).append("\n");
      }

      if (messageLimit < errors.size()) {
        sb.append("skipped .....").append("\n");
      }
| |
| return sb.toString(); |
| } |
| } |
| |
| @Override |
| public List<Fragment> getSplits(String inputSourceId, |
| TableDesc table, |
| @Nullable EvalNode filterCondition) throws IOException { |
| return getSplits(inputSourceId, table.getMeta(), table.getSchema(), new Path(table.getUri())); |
| } |
| |
| @Override |
| public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { |
| if (!tableDesc.isExternal()) { |
| String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); |
| String databaseName = splitted[0]; |
| String simpleTableName = splitted[1]; |
| |
| // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) |
| Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName); |
| tableDesc.setUri(tablePath.toUri()); |
| } else { |
| Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given."); |
| } |
| |
| Path path = new Path(tableDesc.getUri()); |
| |
| FileSystem fs = path.getFileSystem(conf); |
| TableStats stats = new TableStats(); |
| if (tableDesc.isExternal()) { |
| if (!fs.exists(path)) { |
| LOG.error(path.toUri() + " does not exist"); |
| throw new IOException("ERROR: " + path.toUri() + " does not exist"); |
| } |
| } else { |
| fs.mkdirs(path); |
| } |
| |
| long totalSize = 0; |
| |
| try { |
| totalSize = calculateSize(path); |
| } catch (IOException e) { |
| LOG.warn("Cannot calculate the size of the relation", e); |
| } |
| |
| stats.setNumBytes(totalSize); |
| |
| if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing. |
| stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); |
| } |
| |
| tableDesc.setStats(stats); |
| } |
| |
| @Override |
| public void purgeTable(TableDesc tableDesc) throws IOException { |
| try { |
| Path path = new Path(tableDesc.getUri()); |
| FileSystem fs = path.getFileSystem(conf); |
| LOG.info("Delete table data dir: " + path); |
| fs.delete(path, true); |
| } catch (IOException e) { |
      throw new TajoInternalError(e);
| } |
| } |
| |
| @Override |
| public StorageProperty getProperty() { |
| return FileStorageProperties; |
| } |
| |
| @Override |
| public FormatProperty getFormatProperty(TableMeta meta) { |
| return GeneralFileProperties; |
| } |
| |
| @Override |
| public void close() { |
| } |
| |
| @Override |
| public void prepareTable(LogicalNode node) throws IOException { |
| } |
| |
| @Override |
| public void rollbackTable(LogicalNode node) throws IOException { |
| } |
| |
| @Override |
| public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException { |
| String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, ""); |
| |
| Path stagingDir; |
    // No output path means that this query is neither CTAS nor INSERT (OVERWRITE) INTO.
    // So, the query results won't be materialized as a part of a table.
    // The result will be temporarily written in the staging directory.
| if (outputPath.isEmpty()) { |
      // the result is temporarily written under the staging root directory
| stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); |
| } else { |
| Tablespace space = TablespaceManager.get(outputPath); |
| if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation |
| // If this space allows move operation, the staging directory will be underneath the final output table uri. |
| stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId)); |
| } else { |
| stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); |
| } |
| } |
| |
| return stagingDir.toUri(); |
| } |
| |
| // query submission directory is private! |
  final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx------
| public static final String TMP_STAGING_DIR_PREFIX = ".staging"; |
| |
| public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta) |
| throws IOException { |
| |
| String realUser; |
| String currentUser; |
| UserGroupInformation ugi; |
| ugi = UserGroupInformation.getLoginUser(); |
| realUser = ugi.getShortUserName(); |
| currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); |
| |
| |
| Path stagingDir = new Path(getStagingUri(context, queryId, meta)); |
| |
| //////////////////////////////////////////// |
| // Create Output Directory |
| //////////////////////////////////////////// |
| |
| if (fs.exists(stagingDir)) { |
| throw new IOException("The staging directory '" + stagingDir + "' already exists"); |
| } |
| fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); |
| FileStatus fsStatus = fs.getFileStatus(stagingDir); |
| String owner = fsStatus.getOwner(); |
| |
| if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { |
| throw new IOException("The ownership on the user's query " + |
| "directory " + stagingDir + " is not as expected. " + |
| "It is owned by " + owner + ". The directory must " + |
| "be owned by the submitter " + currentUser + " or " + |
| "by " + realUser); |
| } |
| |
| if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { |
| LOG.info("Permissions on staging directory " + stagingDir + " are " + |
| "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + |
| "to correct value " + STAGING_DIR_PERMISSION); |
| fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); |
| } |
| |
| Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); |
| fs.mkdirs(stagingResultDir); |
| |
| return stagingDir.toUri(); |
| } |
| |
| @Override |
| public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) { |
| } |
| |
| @Override |
| public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, |
| Schema schema, TableDesc tableDesc) throws IOException { |
| return commitOutputData(queryContext, true); |
| } |
| |
| @Override |
| public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, |
| Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) |
| throws IOException { |
| return null; |
| } |
| |
| /** |
| * Finalizes result data. Tajo stores result data in the staging directory. |
   * If the query fails, the staging directory is cleaned up.
   * If the query succeeds, the result data is moved from the staging directory to the final directory.
   *
   * @param queryContext The query property
   * @param changeFileSeq If true, rename each result file with the next available sequence number.
| * @return Saved path |
| * @throws java.io.IOException |
| */ |
| protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { |
| Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); |
| Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); |
| Path finalOutputDir; |
| if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) { |
| finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI)); |
| try { |
| FileSystem fs = stagingResultDir.getFileSystem(conf); |
| |
| if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO |
| |
          // It moves the original table into a temporary location,
          // then moves the new result table into the original table location.
          // Upon failure, it recovers the original table if possible.
| boolean movedToOldTable = false; |
| boolean committed = false; |
| Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); |
| ContentSummary summary = fs.getContentSummary(stagingResultDir); |
| |
          // When inserting empty data into a partitioned table, check whether the existing data needs to be removed or kept.
| boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); |
| |
          // If the existing data does not need to be kept, check whether any result files exist.
          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()
              && (!overwriteEnabled || summary.getFileCount() > 0L)) {
            // Maps each existing non-leaf directory to its rename target: the key is the
            // current directory and the value is the directory it will be renamed to.
| Map<Path, Path> renameDirs = TUtil.newHashMap(); |
            // Maps each existing partition directory to a temporary backup directory used
            // for recovery: the key is the current directory and the value is its backup.
| Map<Path, Path> recoveryDirs = TUtil.newHashMap(); |
| |
| try { |
| if (!fs.exists(finalOutputDir)) { |
| fs.mkdirs(finalOutputDir); |
| } |
| |
| visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), |
| renameDirs, oldTableDir); |
| |
| // Rename target partition directories |
| for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { |
| // Backup existing data files for recovering |
| if (fs.exists(entry.getValue())) { |
| String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), |
| oldTableDir.toString()); |
| Path recoveryPath = new Path(recoveryPathString); |
| fs.rename(entry.getValue(), recoveryPath); |
| recoveryDirs.put(entry.getValue(), recoveryPath); |
| } |
| // Delete existing directory |
| fs.delete(entry.getValue(), true); |
| // Rename staging directory to final output directory |
| fs.rename(entry.getKey(), entry.getValue()); |
| } |
| |
| } catch (IOException ioe) { |
| // Remove created dirs |
| for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { |
| fs.delete(entry.getValue(), true); |
| } |
| |
              // Recover the renamed dirs
| for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { |
| fs.delete(entry.getValue(), true); |
| fs.rename(entry.getValue(), entry.getKey()); |
| } |
| |
| throw new IOException(ioe.getMessage()); |
| } |
| } else { // no partition |
| try { |
| |
| // if the final output dir exists, move all contents to the temporary table dir. |
| // Otherwise, just make the final output dir. As a result, the final output dir will be empty. |
| if (fs.exists(finalOutputDir)) { |
| fs.mkdirs(oldTableDir); |
| |
| for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { |
| fs.rename(status.getPath(), oldTableDir); |
| } |
| |
| movedToOldTable = fs.exists(oldTableDir); |
              } else { // if the final output dir does not exist, create it
| fs.mkdirs(finalOutputDir); |
| } |
| |
| // Move the results to the final output dir. |
| for (FileStatus status : fs.listStatus(stagingResultDir)) { |
| fs.rename(status.getPath(), finalOutputDir); |
| } |
| |
| // Check the final output dir |
| committed = fs.exists(finalOutputDir); |
| |
| } catch (IOException ioe) { |
| // recover the old table |
| if (movedToOldTable && !committed) { |
| |
| // if commit is failed, recover the old data |
| for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { |
| fs.delete(status.getPath(), true); |
| } |
| |
| for (FileStatus status : fs.listStatus(oldTableDir)) { |
| fs.rename(status.getPath(), finalOutputDir); |
| } |
| } |
| |
| throw new IOException(ioe.getMessage()); |
| } |
| } |
| } else { |
| String queryType = queryContext.get(QueryVars.COMMAND_TYPE); |
| |
| if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table |
| |
| NumberFormat fmt = NumberFormat.getInstance(); |
| fmt.setGroupingUsed(false); |
| fmt.setMinimumIntegerDigits(3); |
| |
| if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { |
| for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { |
| if (eachFile.isFile()) { |
| LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); |
| continue; |
| } |
| moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); |
| } |
| } else { |
| int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; |
| for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { |
| if (eachFile.getPath().getName().startsWith("_")) { |
| continue; |
| } |
| moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); |
| } |
| } |
          // check that all files were moved, and remove empty directories
| verifyAllFileMoved(fs, stagingResultDir); |
| FileStatus[] files = fs.listStatus(stagingResultDir); |
| if (files != null && files.length != 0) { |
| for (FileStatus eachFile: files) { |
| LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); |
| } |
| } |
| } else { // CREATE TABLE AS SELECT (CTAS) |
| if (fs.exists(finalOutputDir)) { |
| for (FileStatus status : fs.listStatus(stagingResultDir)) { |
| fs.rename(status.getPath(), finalOutputDir); |
| } |
| } else { |
| fs.rename(stagingResultDir, finalOutputDir); |
| } |
| LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); |
| } |
| } |
| |
| // remove the staging directory if the final output dir is given. |
| Path stagingDirRoot = stagingDir.getParent(); |
| fs.delete(stagingDirRoot, true); |
| } catch (Throwable t) { |
| LOG.error(t); |
| throw new IOException(t); |
| } |
| } else { |
| finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); |
| } |
| |
| return finalOutputDir; |
| } |
| |
| /** |
   * Attach the sequence number to the output file name and then move the file into the final result path.
| * |
| * @param fs FileSystem |
| * @param stagingResultDir The staging result dir |
| * @param fileStatus The file status |
| * @param finalOutputPath Final output path |
| * @param nf Number format |
   * @param fileSeq The sequence number
   * @param changeFileSeq If true, replace the sequence part of the file name with fileSeq
| * @throws java.io.IOException |
| */ |
| private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, |
| FileStatus fileStatus, Path finalOutputPath, |
| NumberFormat nf, |
| int fileSeq, boolean changeFileSeq) throws IOException { |
| if (fileStatus.isDirectory()) { |
| String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); |
| if (subPath != null) { |
| Path finalSubPath = new Path(finalOutputPath, subPath); |
| if (!fs.exists(finalSubPath)) { |
| fs.mkdirs(finalSubPath); |
| } |
| int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); |
| for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { |
| if (eachFile.getPath().getName().startsWith("_")) { |
| continue; |
| } |
| moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); |
| } |
| } else { |
| throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); |
| } |
| } else { |
| String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); |
| if (subPath != null) { |
| Path finalSubPath = new Path(finalOutputPath, subPath); |
| if (changeFileSeq) { |
| finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); |
| } |
| if (!fs.exists(finalSubPath.getParent())) { |
| fs.mkdirs(finalSubPath.getParent()); |
| } |
| if (fs.exists(finalSubPath)) { |
| throw new IOException("Already exists data file:" + finalSubPath); |
| } |
| boolean success = fs.rename(fileStatus.getPath(), finalSubPath); |
| if (success) { |
| LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + |
| "to final output[" + finalSubPath + "]"); |
| } else { |
| LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + |
| "to final output[" + finalSubPath + "]"); |
| } |
| } |
| } |
| } |
| |
| /** |
   * Removes the parent path prefix from a child path.
   * @param parentPath the parent path
   * @param childPath the child path, expected to be under parentPath
   * @return the sub-path of childPath relative to parentPath, or null if childPath is not under parentPath
| */ |
| private String extractSubPath(Path parentPath, Path childPath) { |
| String parentPathStr = parentPath.toUri().getPath(); |
| String childPathStr = childPath.toUri().getPath(); |
| |
| if (parentPathStr.length() > childPathStr.length()) { |
| return null; |
| } |
| |
| int index = childPathStr.indexOf(parentPathStr); |
| if (index != 0) { |
| return null; |
| } |
| |
| return childPathStr.substring(parentPathStr.length() + 1); |
| } |
| |
| /** |
| * Attach the sequence number to a path. |
| * |
| * @param path Path |
| * @param seq sequence number |
| * @param nf Number format |
| * @return New path attached with sequence number |
| * @throws java.io.IOException |
| */ |
| private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { |
| String[] tokens = path.getName().split("-"); |
| if (tokens.length != 4) { |
| throw new IOException("Wrong result file name:" + path); |
| } |
| return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); |
| } |
| |
| /** |
| * Make sure all files are moved. |
| * @param fs FileSystem |
   * @param stagingPath The staging directory
   * @return true if no unmoved files remain under stagingPath
| * @throws java.io.IOException |
| */ |
| private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { |
| FileStatus[] files = fs.listStatus(stagingPath); |
| if (files != null && files.length != 0) { |
| for (FileStatus eachFile: files) { |
| if (eachFile.isFile()) { |
| LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); |
| return false; |
| } else { |
| if (verifyAllFileMoved(fs, eachFile.getPath())) { |
| fs.delete(eachFile.getPath(), false); |
| } else { |
| return false; |
| } |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| /** |
   * Recursively builds a rename map from staging partition directories to their final
   * output directories. For each directory to be renamed, a recovery (backup) directory
   * is also created so that existing data can be restored if the commit fails.
   *
   * @param fs FileSystem
   * @param stagingPath the staging directory to visit
   * @param outputPath the final output directory
   * @param stagingParentPathString the staging parent path string used for prefix replacement
   * @param renameDirs the map to populate with staging-to-final rename targets
   * @param oldTableDir the directory under which existing data is backed up for recovery
   * @throws java.io.IOException
| */ |
| private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, |
| String stagingParentPathString, |
| Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { |
| FileStatus[] files = fs.listStatus(stagingPath); |
| |
| for(FileStatus eachFile : files) { |
| if (eachFile.isDirectory()) { |
| Path oldPath = eachFile.getPath(); |
| |
| // Make recover directory. |
| String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, |
| oldTableDir.toString()); |
| Path recoveryPath = new Path(recoverPathString); |
| if (!fs.exists(recoveryPath)) { |
| fs.mkdirs(recoveryPath); |
| } |
| |
| visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, |
| renameDirs, oldTableDir); |
        // Find the last-order (leaf) partition directories for renaming
| String newPathString = oldPath.toString().replaceAll(stagingParentPathString, |
| outputPath.toString()); |
| Path newPath = new Path(newPathString); |
        if (!containsSubDirectory(fs, eachFile.getPath())) {
| renameDirs.put(eachFile.getPath(), newPath); |
| } else { |
| if (!fs.exists(newPath)) { |
| fs.mkdirs(newPath); |
| } |
| } |
| } |
| } |
| } |
| |
  /**
   * Returns true if the given path contains at least one subdirectory. A partition
   * directory with no subdirectories is a leaf, i.e., a last-order partition.
   */
  private boolean containsSubDirectory(FileSystem fs, Path path) throws IOException {
    FileStatus[] files = fs.listStatus(path);
    for (FileStatus file : files) {
      if (fs.isDirectory(file.getPath())) {
        return true;
      }
    }

    return false;
  }
| |
| |
| } |