// blob: 074d0abed350d0e1b7c0b3696955dfe182240c2b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PhysicalPlanningException;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.PersistentStoreNode;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
public class PhysicalPlanUtil {
/**
 * Looks through a physical execution plan for an executor of the requested class.
 *
 * <p>The search is delegated to {@link FindVisitor}, which matches any executor whose class is
 * {@code clazz} or a subtype of it. The visitor's result is cast to the caller-inferred type
 * {@code T}; that cast is unchecked because {@code T} is not tied to {@code clazz} in the
 * signature, so callers should request a {@code T} compatible with {@code clazz}.
 *
 * @param plan  root of the physical plan to search
 * @param clazz class (or super type) of the executor to look for
 * @param <T>   executor type expected by the caller
 * @return the executor produced by the visitor
 *         (NOTE(review): the result for "no match" is decided by the base visitor - confirm)
 * @throws PhysicalPlanningException propagated from the visitor
 */
public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
    throws PhysicalPlanningException {
  FindVisitor finder = new FindVisitor();
  Stack<PhysicalExec> visitStack = new Stack<>();
  PhysicalExec found = finder.visit(plan, visitStack, clazz);
  return (T) found;
}
/**
 * Builds one tuple comparator per join side from the sort keys derived from a join qualifier.
 *
 * @param joinQual    join condition handed to {@link PlannerUtil#getSortKeysFromJoinQual}
 * @param leftSchema  schema of the left input
 * @param rightSchema schema of the right input
 * @return a two-element array: index 0 is the comparator for the left schema, index 1 the
 *         comparator for the right schema
 */
public static TupleComparator [] getComparatorsFromJoinQual(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
  SortSpec[][] keysPerSide = PlannerUtil.getSortKeysFromJoinQual(joinQual, leftSchema, rightSchema);
  BaseTupleComparator leftComparator = new BaseTupleComparator(leftSchema, keysPerSide[0]);
  BaseTupleComparator rightComparator = new BaseTupleComparator(rightSchema, keysPerSide[1]);
  return new BaseTupleComparator[] {leftComparator, rightComparator};
}
/**
 * Lists non-empty data files of a table and returns them as fragment protos.
 *
 * <p>Each selected file becomes one {@link FileFragment} spanning the whole file (offset 0,
 * length = file length). For a partitioned table only files that share the partition directory
 * names of the first selected file are returned; the scan of the candidate list stops at the
 * first file that belongs to a different partition.
 *
 * @param tajoConf       configuration used to resolve the file system and to convert fragments
 * @param tableDesc      descriptor of the table whose URI is scanned
 * @param fileIndex      index (among non-empty data files, in listing order) of the first file
 *                       to include
 * @param numResultFiles upper bound on the number of files collected from directory listings
 * @return fragment protos for the selected files; empty if the table path does not exist
 * @throws java.io.IOException if a file system operation fails
 */
public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc,
                                                                      int fileIndex, int numResultFiles) throws IOException {
  Path tablePath = new Path(tableDesc.getUri());
  FileSystem fs = tablePath.getFileSystem(tajoConf);

  // Number of partition directory levels below the table path; stays 0 for a plain table.
  int partitionDepth = 0;
  if (tableDesc.hasPartition()) {
    partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns().size();
  }

  List<FileStatus> candidates = new ArrayList<>();
  if (fs.exists(tablePath)) {
    getNonZeroLengthDataFiles(fs, tablePath, candidates, fileIndex, numResultFiles,
        new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth);
  }

  List<FileFragment> selected = new ArrayList<>();
  String[] lastPartitionKey = null;
  for (FileStatus candidate : candidates) {
    FileFragment fragment =
        new FileFragment(tableDesc.getName(), candidate.getPath(), 0, candidate.getLen(), null);

    if (partitionDepth > 0) {
      // Walk up from the file to collect the names of its partition directories,
      // outermost directory first.
      String[] partitionKey = new String[partitionDepth];
      Path ancestor = fragment.getPath();
      for (int level = partitionDepth - 1; level >= 0; level--) {
        ancestor = ancestor.getParent();
        partitionKey[level] = ancestor.getName();
      }

      // The first file fixes the partition; stop as soon as a file from another one shows up.
      if (lastPartitionKey != null && !Arrays.equals(lastPartitionKey, partitionKey)) {
        break;
      }
      lastPartitionKey = partitionKey;
    }

    selected.add(fragment);
  }

  Fragment[] asArray = selected.toArray(new Fragment[selected.size()]);
  return FragmentConvertor.toFragmentProtoArray(tajoConf, asArray);
}
/**
 * Recursively collects non-empty files under {@code path} into {@code result}.
 *
 * <p>Every qualifying file advances {@code currentFileIndex}; a file is added to
 * {@code result} only once that counter has reached {@code startFileIndex}. While listing a
 * directory, traversal stops as soon as {@code result} holds {@code numResultFiles} entries.
 * For a partitioned table, files are accepted only in directories whose depth equals
 * {@code maxDepth}; files found in shallower or deeper directories are skipped.
 *
 * @param fs               file system to query
 * @param path             path to visit; the table path on the initial call
 * @param result           output list receiving the selected files
 * @param startFileIndex   number of qualifying files to skip before collecting
 * @param numResultFiles   size of {@code result} at which directory traversal stops
 * @param currentFileIndex running count of qualifying files seen so far, shared across the
 *                         recursion
 * @param partitioned      whether the table is partitioned
 * @param currentDepth     depth of {@code path} relative to the initial call (0 at the start)
 * @param maxDepth         partition depth of the table
 * @throws IOException if a file system operation fails
 */
private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
                                              int startFileIndex, int numResultFiles,
                                              AtomicInteger currentFileIndex, boolean partitioned,
                                              int currentDepth, int maxDepth) throws IOException {
  // The path itself is not a directory. The recursion below only descends into directories,
  // so this branch is taken only when the initially supplied path is not a directory.
  // Note that numResultFiles is not consulted here.
  if (!fs.isDirectory(path)) {
    FileStatus self = fs.getFileStatus(path);
    if (self != null && self.getLen() > 0) {
      if (currentFileIndex.getAndIncrement() >= startFileIndex) {
        result.add(self);
      }
    }
    return;
  }

  // Directory: list its entries through FileTablespace.hiddenFileFilter.
  FileStatus[] children = fs.listStatus(path, FileTablespace.hiddenFileFilter);
  if (children == null || children.length == 0) {
    return;
  }

  for (FileStatus child : children) {
    // Enough files have been gathered already.
    if (result.size() >= numResultFiles) {
      return;
    }

    if (child.isDirectory()) {
      // Descend one level deeper.
      getNonZeroLengthDataFiles(fs, child.getPath(), result, startFileIndex, numResultFiles,
          currentFileIndex, partitioned, currentDepth + 1, maxDepth);
      continue;
    }

    // For a partitioned table, files in intermediate directories are ignored;
    // currentDepth == maxDepth identifies the directories whose files count.
    boolean countsAtThisDepth = !partitioned || currentDepth == maxDepth;
    if (child.isFile() && child.getLen() > 0 && countsAtThisDepth) {
      if (currentFileIndex.getAndIncrement() >= startFileIndex) {
        result.add(child);
      }
    }
  }
}
private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> {
public PhysicalExec visit(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
throws PhysicalPlanningException {
if (target.isAssignableFrom(exec.getClass())) {
return exec;
} else {
return super.visit(exec, stack, target);
}
}
}
/**
 * Stores the NULL representation in the table property that belongs to the table's data
 * format. Only the TEXT, RCFILE and SEQUENCE_FILE formats (compared case-insensitively) are
 * handled; for any other format the meta is left untouched.
 *
 * @param meta     TableMeta to update
 * @param nullChar string used to represent NULL
 */
private static void setNullCharForTextSerializer(TableMeta meta, String nullChar) {
  final String format = meta.getDataFormat();

  if (format.equalsIgnoreCase(BuiltinStorages.TEXT)) {
    meta.putProperty(StorageConstants.TEXT_NULL, nullChar);
    return;
  }
  if (format.equalsIgnoreCase(BuiltinStorages.RCFILE)) {
    meta.putProperty(StorageConstants.RCFILE_NULL, nullChar);
    return;
  }
  if (format.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
    meta.putProperty(StorageConstants.SEQUENCEFILE_NULL, nullChar);
  }
}
/**
 * Tells whether the table meta already carries the NULL representation property that
 * corresponds to its data format (TEXT, RCFILE or SEQUENCE_FILE, compared case-insensitively).
 *
 * @param meta TableMeta to inspect
 * @return {@code true} if the format-specific NULL property is present; {@code false} if it is
 *         absent or the data format is none of the three handled ones
 */
public static boolean containsNullChar(TableMeta meta) {
  final String format = meta.getDataFormat();

  if (format.equalsIgnoreCase(BuiltinStorages.TEXT)) {
    return meta.containsProperty(StorageConstants.TEXT_NULL);
  }
  if (format.equalsIgnoreCase(BuiltinStorages.RCFILE)) {
    return meta.containsProperty(StorageConstants.RCFILE_NULL);
  }
  if (format.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
    return meta.containsProperty(StorageConstants.SEQUENCEFILE_NULL);
  }
  return false;
}
/**
 * Copies the session's NULL_CHAR variable into the table meta when appropriate.
 *
 * <p>Nothing happens for INSERT plans. Otherwise a NULL property already present in the table
 * meta takes priority; the session value is applied only when the meta has none and the
 * session defines {@link SessionVars#NULL_CHAR}.
 *
 * @param context QueryContext holding the session variables
 * @param plan    PersistentStoreNode whose node type is checked
 * @param meta    TableMeta to update
 */
public static void setNullCharIfNecessary(QueryContext context, PersistentStoreNode plan, TableMeta meta) {
  if (plan.getType() == NodeType.INSERT) {
    return;
  }

  // Table property wins over the session variable; without either there is nothing to set.
  if (containsNullChar(meta) || !context.containsKey(SessionVars.NULL_CHAR)) {
    return;
  }

  setNullCharForTextSerializer(meta, context.get(SessionVars.NULL_CHAR));
}
}