| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.analysis; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.List; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.adl.AdlFileSystem; |
| import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; |
| import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.s3a.S3AFileSystem; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.impala.authorization.Privilege; |
| import org.apache.impala.catalog.FeFsTable; |
| import org.apache.impala.catalog.FeTable; |
| import org.apache.impala.common.AnalysisException; |
| import org.apache.impala.common.FileSystemUtil; |
| import org.apache.impala.thrift.TLoadDataReq; |
| import org.apache.impala.thrift.TTableName; |
| import org.apache.impala.util.FsPermissionChecker; |
| |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Represents a LOAD DATA statement for moving data into an existing table: |
| * LOAD DATA INPATH 'filepath' [OVERWRITE] INTO TABLE <table name> |
| * [PARTITION (partcol1=val1, partcol2=val2 ...)] |
| * |
| * The LOAD DATA operation supports loading (moving) a single file or all files in a |
| * given source directory to a table or partition location. If OVERWRITE is true, all |
| * existing files in the destination will be removed before moving the new data in. |
| * If OVERWRITE is false, existing files will be preserved. If there are any file name |
| * conflicts, the new files will be uniquified by inserting a UUID into the file name |
| * (preserving the extension). |
| * Loading hidden files is not supported; any hidden files in the source or |
| * destination are preserved, even if OVERWRITE is true. |
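| * |
| * For example (the path, table, and partition names below are illustrative): |
| * LOAD DATA INPATH '/staging/sales_2024' OVERWRITE INTO TABLE mydb.sales |
| * PARTITION (year=2024, month=1) |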
| */ |
| public class LoadDataStmt extends StatementBase { |
| private final TableName tableName_; |
| private final HdfsUri sourceDataPath_; |
| private final PartitionSpec partitionSpec_; |
| private final boolean overwrite_; |
| |
| // Set during analysis |
| private String dbName_; |
| |
| public LoadDataStmt(TableName tableName, HdfsUri sourceDataPath, boolean overwrite, |
| PartitionSpec partitionSpec) { |
| Preconditions.checkNotNull(tableName); |
| Preconditions.checkNotNull(sourceDataPath); |
| this.tableName_ = tableName; |
| this.sourceDataPath_ = sourceDataPath; |
| this.overwrite_ = overwrite; |
| this.partitionSpec_ = partitionSpec; |
| } |
| |
| public String getTbl() { |
| return tableName_.getTbl(); |
| } |
| |
| public String getDb() { |
| Preconditions.checkNotNull(dbName_); |
| return dbName_; |
| } |
| |
| /** |
| * Prints the SQL syntax corresponding to this node. |
| * @see org.apache.impala.analysis.ParseNode#toSql() |
| */ |
| @Override |
| public String toSql(ToSqlOptions options) { |
| StringBuilder sb = new StringBuilder("LOAD DATA INPATH '"); |
| sb.append(sourceDataPath_).append("' "); |
| if (overwrite_) sb.append("OVERWRITE "); |
| sb.append("INTO TABLE ").append(tableName_); |
| if (partitionSpec_ != null) sb.append(' ').append(partitionSpec_.toSql(options)); |
| return sb.toString(); |
| } |
| |
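| /** |
| * Registers the target table as a table reference so it is loaded and resolvable |
| * before analysis. |
| */ |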
| @Override |
| public void collectTableRefs(List<TableRef> tblRefs) { |
| tblRefs.add(new TableRef(tableName_.toPath(), null)); |
| } |
| |
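| /** |
| * Resolves the target database and table, verifies that the table is a writable, |
| * non-transactional filesystem table, analyzes the partition spec (a spec is |
| * required when the table is partitioned), and validates the source path. |
| */ |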
| @Override |
| public void analyze(Analyzer analyzer) throws AnalysisException { |
| dbName_ = analyzer.getTargetDbName(tableName_); |
| FeTable table = analyzer.getTable(tableName_, Privilege.INSERT); |
| if (!(table instanceof FeFsTable)) { |
| throw new AnalysisException("LOAD DATA only supported for HDFS tables: " + |
| dbName_ + "." + getTbl()); |
| } |
| analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE); |
| analyzer.ensureTableNotTransactional(table, "LOAD DATA"); |
| |
| // Analyze the partition spec, if one was specified. |
| if (partitionSpec_ != null) { |
| partitionSpec_.setTableName(tableName_); |
| partitionSpec_.setPartitionShouldExist(); |
| partitionSpec_.setPrivilegeRequirement(Privilege.INSERT); |
| partitionSpec_.analyze(analyzer); |
| } else { |
| if (table.getMetaStoreTable().getPartitionKeysSize() > 0) { |
| throw new AnalysisException("Table is partitioned but no partition spec was " + |
| "specified: " + dbName_ + "." + getTbl()); |
| } |
| } |
| analyzePaths(analyzer, (FeFsTable) table); |
| } |
| |
| /** |
| * Checks that Impala has the necessary permissions to access the source and |
| * destination paths of this LOAD statement (which maps onto a sequence of file move |
| * operations, each with its own permission requirements), and that all files to be |
| * moved are in a format that Impala understands. Errors are raised as |
| * AnalysisExceptions. |
| * |
| * We don't check permissions for the S3AFileSystem and the AdlFileSystem due to |
| * limitations with their getAclStatus() API (see HADOOP-13892 and HADOOP-14437). |
| */ |
| private void analyzePaths(Analyzer analyzer, FeFsTable table) |
| throws AnalysisException { |
| // The user must have permission to access the source location. Since the files will |
| // be moved from this location, the user requires the ALL privilege on it. |
| sourceDataPath_.analyze(analyzer, Privilege.ALL); |
| |
| // Catch all exceptions thrown by accessing files, and rethrow as AnalysisExceptions. |
| try { |
| Path source = sourceDataPath_.getPath(); |
| FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration()); |
| if (!(fs instanceof DistributedFileSystem) && !(fs instanceof S3AFileSystem) && |
| !(fs instanceof AzureBlobFileSystem) && |
| !(fs instanceof SecureAzureBlobFileSystem) && |
| !(fs instanceof AdlFileSystem)) { |
| throw new AnalysisException(String.format("INPATH location '%s' " + |
| "must point to an HDFS, S3A, ADL or ABFS filesystem.", sourceDataPath_)); |
| } |
| if (!fs.exists(source)) { |
| throw new AnalysisException(String.format( |
| "INPATH location '%s' does not exist.", sourceDataPath_)); |
| } |
| |
| // If the source path is a directory, we must be able to read from and write to |
| // it. If the source path is a file, we must be able to read from it and write to |
| // its parent directory (in order to delete the file as part of the move operation). |
| FsPermissionChecker checker = FsPermissionChecker.getInstance(); |
| // TODO: Disable permission checking for S3A as well (HADOOP-13892) |
| boolean shouldCheckPerms = !(fs instanceof AdlFileSystem || |
| fs instanceof AzureBlobFileSystem || fs instanceof SecureAzureBlobFileSystem); |
| |
| if (fs.isDirectory(source)) { |
| if (FileSystemUtil.getTotalNumVisibleFiles(source) == 0) { |
| throw new AnalysisException(String.format( |
| "INPATH location '%s' contains no visible files.", sourceDataPath_)); |
| } |
| if (FileSystemUtil.containsVisibleSubdirectory(source)) { |
| throw new AnalysisException(String.format( |
| "INPATH location '%s' cannot contain non-hidden subdirectories.", |
| sourceDataPath_)); |
| } |
| if (shouldCheckPerms && !checker.getPermissions(fs, source).checkPermissions( |
| FsAction.READ_WRITE)) { |
| throw new AnalysisException(String.format("Unable to LOAD DATA from %s " + |
| "because Impala does not have READ and WRITE permissions on this directory", |
| source)); |
| } |
| } else { |
| // INPATH names a file. |
| if (FileSystemUtil.isHiddenFile(source.getName())) { |
| throw new AnalysisException(String.format( |
| "INPATH location '%s' points to a hidden file.", source)); |
| } |
| |
| if (shouldCheckPerms && !checker.getPermissions(fs, source.getParent()) |
| .checkPermissions(FsAction.WRITE)) { |
| throw new AnalysisException(String.format("Unable to LOAD DATA from %s " + |
| "because Impala does not have WRITE permissions on its parent " + |
| "directory %s", source, source.getParent())); |
| } |
| |
| if (shouldCheckPerms && !checker.getPermissions(fs, source).checkPermissions( |
| FsAction.READ)) { |
| throw new AnalysisException(String.format("Unable to LOAD DATA from %s " + |
| "because Impala does not have READ permissions on this file", source)); |
| } |
| } |
| |
| FeFsTable.Utils.checkWriteAccess(table, |
| partitionSpec_ != null ? partitionSpec_.getPartitionSpecKeyValues() : null, |
| "LOAD DATA"); |
| } catch (FileNotFoundException e) { |
| throw new AnalysisException("File not found: " + e.getMessage(), e); |
| } catch (IOException e) { |
| throw new AnalysisException("Error accessing filesystem: " + e.getMessage(), e); |
| } |
| } |
| |
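| /** |
| * Packages the analyzed statement into a Thrift TLoadDataReq. Must be called after |
| * analyze() so that the database name has been resolved. |
| */ |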
| public TLoadDataReq toThrift() { |
| TLoadDataReq loadDataReq = new TLoadDataReq(); |
| loadDataReq.setTable_name(new TTableName(getDb(), getTbl())); |
| loadDataReq.setSource_path(sourceDataPath_.toString()); |
| loadDataReq.setOverwrite(overwrite_); |
| if (partitionSpec_ != null) { |
| loadDataReq.setPartition_spec(partitionSpec_.toThrift()); |
| } |
| return loadDataReq; |
| } |
| } |