| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.analysis; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.avro.Schema; |
| import org.apache.avro.SchemaParseException; |
| import org.apache.hadoop.hive.metastore.api.SQLForeignKey; |
| import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; |
| import org.apache.iceberg.TableProperties; |
| import org.apache.iceberg.mr.Catalogs; |
| import org.apache.impala.authorization.AuthorizationConfig; |
| import org.apache.impala.catalog.DataSourceTable; |
| import org.apache.impala.catalog.IcebergTable; |
| import org.apache.impala.catalog.KuduTable; |
| import org.apache.impala.catalog.RowFormat; |
| import org.apache.impala.catalog.Table; |
| import org.apache.impala.common.AnalysisException; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.common.RuntimeEnv; |
| import org.apache.impala.service.BackendConfig; |
| import org.apache.impala.thrift.TBucketInfo; |
| import org.apache.impala.thrift.TCreateTableParams; |
| import org.apache.impala.thrift.THdfsFileFormat; |
| import org.apache.impala.thrift.TIcebergCatalog; |
| import org.apache.impala.thrift.TIcebergFileFormat; |
| import org.apache.impala.thrift.TIcebergPartitionTransformType; |
| import org.apache.impala.thrift.TSortingOrder; |
| import org.apache.impala.thrift.TTableName; |
| import org.apache.impala.util.AvroSchemaConverter; |
| import org.apache.impala.util.AvroSchemaParser; |
| import org.apache.impala.util.AvroSchemaUtils; |
| import org.apache.impala.util.IcebergUtil; |
| import org.apache.impala.util.KuduUtil; |
| import org.apache.impala.util.MetaStoreUtil; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.primitives.Ints; |
| |
| /** |
| * Represents a CREATE TABLE statement. |
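|  * |
|  * For example (illustrative statements only, not an exhaustive list): |
|  * |
|  *   CREATE TABLE t (i INT, s STRING) PARTITIONED BY (p INT) STORED AS PARQUET; |
|  *   CREATE TABLE t (i INT PRIMARY KEY, s STRING) |
|  *     PARTITION BY HASH (i) PARTITIONS 3 STORED AS KUDU; |
|  *   CREATE TABLE t (i INT, s STRING) |
|  *     PARTITIONED BY SPEC (bucket(16, i)) STORED AS ICEBERG; |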
| */ |
| public class CreateTableStmt extends StatementBase { |
| |
| @VisibleForTesting |
| final static String KUDU_STORAGE_HANDLER_ERROR_MESSAGE = "Kudu tables must be" |
| + " specified using 'STORED AS KUDU'."; |
| |
| ///////////////////////////////////////// |
| // BEGIN: Members that need to be reset() |
| |
| // Table parameters specified in a CREATE TABLE statement |
| private final TableDef tableDef_; |
| |
| // END: Members that need to be reset() |
| ///////////////////////////////////////// |
| |
|   // Table owner. Set during analysis. |
| private String owner_; |
| |
| // Server name needed for privileges. Set during analysis. |
| private String serverName_; |
| |
| public CreateTableStmt(TableDef tableDef) { |
| Preconditions.checkNotNull(tableDef); |
| tableDef_ = tableDef; |
| } |
| |
| /** |
| * Copy c'tor. |
| */ |
| public CreateTableStmt(CreateTableStmt other) { |
| this(other.tableDef_); |
| owner_ = other.owner_; |
| } |
| |
| @Override |
| public void reset() { |
| super.reset(); |
| tableDef_.reset(); |
| } |
| |
| @Override |
| public CreateTableStmt clone() { return new CreateTableStmt(this); } |
| |
| public String getTbl() { return getTblName().getTbl(); } |
| public TableName getTblName() { return tableDef_.getTblName(); } |
| public boolean getIfNotExists() { return tableDef_.getIfNotExists(); } |
| public List<ColumnDef> getColumnDefs() { return tableDef_.getColumnDefs(); } |
| private void setColumnDefs(List<ColumnDef> colDefs) { |
| getColumnDefs().clear(); |
| getColumnDefs().addAll(colDefs); |
| } |
| public List<ColumnDef> getPrimaryKeyColumnDefs() { |
| return tableDef_.getPrimaryKeyColumnDefs(); |
| } |
| public boolean isPrimaryKeyUnique() { return tableDef_.isPrimaryKeyUnique(); } |
| public List<SQLPrimaryKey> getPrimaryKeys() { return tableDef_.getSqlPrimaryKeys(); } |
| public List<SQLForeignKey> getForeignKeys() { return tableDef_.getSqlForeignKeys(); } |
| public boolean isExternal() { return tableDef_.isExternal(); } |
| public List<ColumnDef> getPartitionColumnDefs() { |
| return tableDef_.getPartitionColumnDefs(); |
| } |
| public List<KuduPartitionParam> getKuduPartitionParams() { |
| return tableDef_.getKuduPartitionParams(); |
| } |
| public List<String> getSortColumns() { return tableDef_.getSortColumns(); } |
| public TSortingOrder getSortingOrder() { return tableDef_.getSortingOrder(); } |
| public String getComment() { return tableDef_.getComment(); } |
| public Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); } |
| private HdfsCachingOp getCachingOp() { return tableDef_.getCachingOp(); } |
| public HdfsUri getLocation() { return tableDef_.getLocation(); } |
| Map<String, String> getSerdeProperties() { return tableDef_.getSerdeProperties(); } |
| public THdfsFileFormat getFileFormat() { return tableDef_.getFileFormat(); } |
| RowFormat getRowFormat() { return tableDef_.getRowFormat(); } |
| private void putGeneratedProperty(String key, String value) { |
| tableDef_.putGeneratedProperty(key, value); |
| } |
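|   // Returns all generated table properties. Despite the name, these are not limited to |
|   // Kudu properties (e.g. generated Iceberg properties are stored here as well). |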
| public Map<String, String> getGeneratedKuduProperties() { |
| return tableDef_.getGeneratedProperties(); |
| } |
| public TBucketInfo geTBucketInfo() { return tableDef_.geTBucketInfo(); } |
| |
| // Only exposed for ToSqlUtils. Returns the list of primary keys declared by the user |
| // at the table level. Note that primary keys may also be declared in column |
|   // definitions; those are not included here (they are stored in the ColumnDefs). |
| List<String> getTblPrimaryKeyColumnNames() { |
| return tableDef_.getPrimaryKeyColumnNames(); |
| } |
| |
| public List<IcebergPartitionSpec> getIcebergPartitionSpecs() { |
| return tableDef_.getIcebergPartitionSpecs(); |
| } |
| |
| /** |
|    * Returns the foreign key information as strings. Useful for ToSqlUtils. |
| * @return List of strings of the form "(col1, col2,..) REFERENCES [pk_db].pk_table |
| * (colA, colB,..)". |
| */ |
| List<String> getForeignKeysSql() { |
| List<TableDef.ForeignKey> fkList = tableDef_.getForeignKeysList(); |
| List<String> foreignKeysSql = new ArrayList<>(); |
| if (fkList != null && !fkList.isEmpty()) { |
| for (TableDef.ForeignKey fk : fkList) { |
| StringBuilder sb = new StringBuilder("("); |
| Joiner.on(", ").appendTo(sb, fk.getForeignKeyColNames()).append(")"); |
| sb.append(" REFERENCES "); |
| sb.append(fk.getFullyQualifiedPkTableName() + "("); |
| Joiner.on(", ").appendTo(sb, fk.getPrimaryKeyColNames()).append(")"); |
| foreignKeysSql.add(sb.toString()); |
| } |
| } |
| return foreignKeysSql; |
| } |
| |
| /** |
|    * Can only be called after analysis. Returns the owner of this table (the user from |
|    * the current session). |
| */ |
| public String getOwner() { |
| Preconditions.checkNotNull(owner_); |
| return owner_; |
| } |
| |
| /** |
|    * Can only be called after analysis. Returns the name of the database the table |
|    * will be created in. |
| */ |
| public String getDb() { |
| Preconditions.checkState(isAnalyzed()); |
| return getTblName().getDb(); |
| } |
| |
| @Override |
| public String toSql(ToSqlOptions options) { |
| return ToSqlUtils.getCreateTableSql(this); |
| } |
| |
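|   /** |
|    * Builds the thrift representation (TCreateTableParams) of this CREATE TABLE |
|    * statement. Can only be called after analysis, since it relies on fields that are |
|    * set during analysis (e.g. the owner and the server name). |
|    */ |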
| public TCreateTableParams toThrift() { |
| TCreateTableParams params = new TCreateTableParams(); |
| params.setTable_name(new TTableName(getDb(), getTbl())); |
| List<org.apache.impala.thrift.TColumn> tColumns = new ArrayList<>(); |
| for (ColumnDef col: getColumnDefs()) tColumns.add(col.toThrift()); |
| params.setColumns(tColumns); |
| for (ColumnDef col: getPartitionColumnDefs()) { |
| params.addToPartition_columns(col.toThrift()); |
| } |
| params.setOwner(getOwner()); |
| params.setIs_external(isExternal()); |
| params.setComment(getComment()); |
| params.setLocation(getLocation() == null ? null : getLocation().toString()); |
| if (getCachingOp() != null) params.setCache_op(getCachingOp().toThrift()); |
| if (getRowFormat() != null) params.setRow_format(getRowFormat().toThrift()); |
| params.setFile_format(getFileFormat()); |
| params.setIf_not_exists(getIfNotExists()); |
| if (geTBucketInfo() != null) params.setBucket_info(geTBucketInfo()); |
| params.setSort_columns(getSortColumns()); |
| params.setSorting_order(getSortingOrder()); |
| params.setTable_properties(Maps.newHashMap(getTblProperties())); |
| params.getTable_properties().putAll(Maps.newHashMap(getGeneratedKuduProperties())); |
| params.setSerde_properties(getSerdeProperties()); |
| params.setIs_primary_key_unique(isPrimaryKeyUnique()); |
| for (KuduPartitionParam d: getKuduPartitionParams()) { |
| params.addToPartition_by(d.toThrift()); |
| } |
| for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) { |
| params.addToPrimary_key_column_names(pkColDef.getColName()); |
| } |
|     for (SQLPrimaryKey pk: getPrimaryKeys()) { |
|       params.addToPrimary_keys(pk); |
|     } |
|     for (SQLForeignKey fk: getForeignKeys()) { |
|       params.addToForeign_keys(fk); |
|     } |
| params.setServer_name(serverName_); |
| |
|     // A CREATE TABLE statement can have only one Iceberg partition spec. |
| if (!getIcebergPartitionSpecs().isEmpty()) { |
| Preconditions.checkState(getIcebergPartitionSpecs().size() == 1); |
| params.setPartition_spec(getIcebergPartitionSpecs().get(0).toThrift()); |
| } |
| |
| return params; |
| } |
| |
| @Override |
| public void collectTableRefs(List<TableRef> tblRefs) { |
| tblRefs.add(new TableRef(tableDef_.getTblName().toPath(), null)); |
|     // When foreign keys are specified, we need to add all the tables that the foreign |
|     // keys refer to. |
|     for (TableDef.ForeignKey fk: tableDef_.getForeignKeysList()) { |
| tblRefs.add(new TableRef(fk.getPkTableName().toPath(), null)); |
| } |
| } |
| |
| @Override |
| public void analyze(Analyzer analyzer) throws AnalysisException { |
| super.analyze(analyzer); |
| owner_ = analyzer.getUserShortName(); |
|     // Set the server name here if authorization is enabled because analyzer_ is not |
| // available in the toThrift() method. |
| serverName_ = analyzer.getServerName(); |
| tableDef_.analyze(analyzer); |
| analyzeKuduFormat(analyzer); |
| // Avro tables can have empty column defs because they can infer them from the Avro |
| // schema. Likewise for external Kudu tables, the schema can be read from Kudu. |
| if (getColumnDefs().isEmpty() && getFileFormat() != THdfsFileFormat.AVRO |
| && getFileFormat() != THdfsFileFormat.KUDU && getFileFormat() != |
| THdfsFileFormat.ICEBERG && getFileFormat() != THdfsFileFormat.JDBC) { |
| throw new AnalysisException("Table requires at least 1 column"); |
| } |
| if (getRowFormat() != null) { |
| String fieldDelimiter = getRowFormat().getFieldDelimiter(); |
| String lineDelimiter = getRowFormat().getLineDelimiter(); |
| String escapeChar = getRowFormat().getEscapeChar(); |
| if (getFileFormat() != THdfsFileFormat.TEXT |
| && getFileFormat() != THdfsFileFormat.SEQUENCE_FILE) { |
| if (fieldDelimiter != null) { |
| analyzer.addWarning("'ROW FORMAT DELIMITED FIELDS TERMINATED BY '" |
| + fieldDelimiter + "'' is ignored."); |
| } |
| if (lineDelimiter != null) { |
| analyzer.addWarning("'ROW FORMAT DELIMITED LINES TERMINATED BY '" |
| + lineDelimiter + "'' is ignored."); |
| } |
| if (escapeChar != null) { |
| analyzer.addWarning( |
| "'ROW FORMAT DELIMITED ESCAPED BY '" + escapeChar + "'' is ignored."); |
| } |
| } |
| } |
| if (getFileFormat() == THdfsFileFormat.AVRO) { |
| setColumnDefs(analyzeAvroSchema(analyzer)); |
| if (getColumnDefs().isEmpty()) { |
| throw new AnalysisException( |
| "An Avro table requires column definitions or an Avro schema."); |
| } |
| AvroSchemaUtils.setFromSerdeComment(getColumnDefs()); |
| } |
| |
| if (getFileFormat() == THdfsFileFormat.ICEBERG) { |
| analyzeIcebergColumns(); |
| analyzeIcebergFormat(analyzer); |
| } else { |
| List<IcebergPartitionSpec> iceSpec = tableDef_.getIcebergPartitionSpecs(); |
| if (iceSpec != null && !iceSpec.isEmpty()) { |
| throw new AnalysisException( |
| "PARTITIONED BY SPEC is only valid for Iceberg tables."); |
| } |
| } |
| |
| if (getFileFormat() == THdfsFileFormat.JDBC) { |
| analyzeJdbcSchema(analyzer); |
| } |
| |
| // If lineage logging is enabled, compute minimal lineage graph. |
| if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) { |
| computeLineageGraph(analyzer); |
| } |
| } |
| |
| /** |
|    * Computes a minimal column lineage graph for a CREATE TABLE statement. This will |
| * populate a few fields of the graph including query text. If this is a CTAS, |
| * the graph is enhanced during the "insert" phase of CTAS. |
| */ |
| protected void computeLineageGraph(Analyzer analyzer) { |
| ColumnLineageGraph graph = analyzer.getColumnLineageGraph(); |
|     graph.computeLineageGraph(new ArrayList<>(), analyzer); |
| } |
| |
| /** |
| * Analyzes the parameters of a CREATE TABLE ... STORED AS KUDU statement. Also checks |
| * if Kudu specific properties and parameters are specified for non-Kudu tables. |
| */ |
| private void analyzeKuduFormat(Analyzer analyzer) throws AnalysisException { |
| if (getFileFormat() != THdfsFileFormat.KUDU) { |
| String handler = getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER); |
| if (KuduTable.isKuduStorageHandler(handler)) { |
| throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE); |
| } |
| AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(), |
| "Only Kudu tables can use the PARTITION BY clause."); |
| return; |
| } |
| |
| analyzeKuduTableProperties(analyzer); |
| if (isExternalWithNoPurge()) { |
| // this is an external table |
| analyzeExternalKuduTableParams(analyzer); |
| } else { |
| // this is either a managed table or an external table with external.table.purge |
| // property set to true |
| analyzeSynchronizedKuduTableParams(analyzer); |
| } |
| } |
| |
| /** |
| * Analyzes and checks table properties which are common to both managed and external |
| * Kudu tables. |
| */ |
| private void analyzeKuduTableProperties(Analyzer analyzer) throws AnalysisException { |
| String kuduMasters = getKuduMasters(analyzer); |
| if (kuduMasters.isEmpty()) { |
| throw new AnalysisException(String.format( |
| "Table property '%s' is required when the impalad startup flag " + |
| "-kudu_master_hosts is not used.", KuduTable.KEY_MASTER_HOSTS)); |
| } |
| putGeneratedProperty(KuduTable.KEY_MASTER_HOSTS, kuduMasters); |
| |
| AuthorizationConfig authzConfig = analyzer.getAuthzConfig(); |
| if (authzConfig.isEnabled()) { |
| boolean isExternal = tableDef_.isExternal() || |
| MetaStoreUtil.findTblPropKeyCaseInsensitive( |
| getTblProperties(), "EXTERNAL") != null; |
| if (isExternal) { |
| String externalTableName = getTblProperties().get(KuduTable.KEY_TABLE_NAME); |
| AnalysisUtils.throwIfNull(externalTableName, |
| String.format("Table property %s must be specified when creating " + |
| "an external Kudu table.", KuduTable.KEY_TABLE_NAME)); |
| List<String> storageUris = getUrisForAuthz(kuduMasters, externalTableName); |
| for (String storageUri : storageUris) { |
| analyzer.registerPrivReq(builder -> builder |
| .onStorageHandlerUri("kudu", storageUri) |
| .rwstorage().build()); |
| } |
| } |
| |
| if (getTblProperties().containsKey(KuduTable.KEY_MASTER_HOSTS)) { |
| String authzServer = authzConfig.getServerName(); |
| Preconditions.checkNotNull(authzServer); |
| analyzer.registerPrivReq(builder -> builder.onServer(authzServer).all().build()); |
| } |
| } |
| |
| // Only the Kudu storage handler may be specified for Kudu tables. |
| String handler = getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER); |
| if (handler != null && !KuduTable.isKuduStorageHandler(handler)) { |
| throw new AnalysisException("Invalid storage handler specified for Kudu table: " + |
| handler); |
| } |
| putGeneratedProperty(KuduTable.KEY_STORAGE_HANDLER, |
| KuduTable.KUDU_STORAGE_HANDLER); |
| |
| // TODO: Find out what is creating a directory in HDFS and stop doing that. Kudu |
| // tables shouldn't have HDFS dirs: IMPALA-3570 |
| AnalysisUtils.throwIfNotNull(getCachingOp(), |
| "A Kudu table cannot be cached in HDFS."); |
| AnalysisUtils.throwIfNotNull(getLocation(), "LOCATION cannot be specified for a " + |
| "Kudu table."); |
| AnalysisUtils.throwIfNotEmpty(tableDef_.getPartitionColumnDefs(), |
| "PARTITIONED BY cannot be used in Kudu tables."); |
| AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLE_ID), |
| String.format("Table property %s should not be specified when creating a " + |
| "Kudu table.", KuduTable.KEY_TABLE_ID)); |
| } |
| |
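|   /** |
|    * Builds one "masterAddress/kuduTableName" URI per Kudu master. The URIs are used to |
|    * register storage handler privilege requests for external Kudu tables. |
|    */ |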
| private List<String> getUrisForAuthz(String kuduMasterAddresses, String kuduTableName) { |
| List<String> masterAddresses = Lists.newArrayList(kuduMasterAddresses.split(",")); |
| List<String> uris = new ArrayList<>(); |
| for (String masterAddress : masterAddresses) { |
| uris.add(masterAddress + "/" + kuduTableName); |
| } |
| return uris; |
| } |
| |
| /** |
|    * Returns the Kudu master addresses, taken either from the table property or from |
|    * the -kudu_master_hosts startup flag. |
| */ |
| private String getKuduMasters(Analyzer analyzer) { |
| String kuduMasters = getTblProperties().get(KuduTable.KEY_MASTER_HOSTS); |
| if (Strings.isNullOrEmpty(kuduMasters)) { |
| kuduMasters = analyzer.getCatalog().getDefaultKuduMasterHosts(); |
| } |
| return kuduMasters; |
| } |
| |
| /** |
| * Analyzes and checks parameters specified for external Kudu tables. |
| */ |
| private void analyzeExternalKuduTableParams(Analyzer analyzer) |
| throws AnalysisException { |
| Preconditions.checkState(!Boolean |
| .parseBoolean(getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE))); |
|     // This is a regular external table; the Kudu table name must be specified. |
| AnalysisUtils.throwIfNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME), |
| String.format("Table property %s must be specified when creating " + |
| "an external Kudu table.", KuduTable.KEY_TABLE_NAME)); |
| if (hasPrimaryKey() |
| || getTblProperties().containsKey(KuduTable.KEY_KEY_COLUMNS)) { |
| throw new AnalysisException("Primary keys cannot be specified for an external " + |
| "Kudu table"); |
| } |
| AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS), |
| String.format("Table property '%s' cannot be used with an external Kudu table.", |
| KuduTable.KEY_TABLET_REPLICAS)); |
| AnalysisUtils.throwIfNotEmpty(getColumnDefs(), |
| "Columns cannot be specified with an external Kudu table."); |
| AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(), |
| "PARTITION BY cannot be used with an external Kudu table."); |
| } |
| |
| /** |
| * Analyzes and checks parameters specified for synchronized Kudu tables. |
| */ |
| private void analyzeSynchronizedKuduTableParams(Analyzer analyzer) |
| throws AnalysisException { |
| // A managed table cannot have 'external.table.purge' property set |
| if (!isExternal() && Boolean.parseBoolean( |
| getTblProperties().get(KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) { |
|       throw new AnalysisException(String.format("Table property '%s' cannot be set to " + |
|           "true with a managed Kudu table.", KuduTable.TBL_PROP_EXTERNAL_TABLE_PURGE)); |
| } |
| analyzeSynchronizedKuduTableName(analyzer); |
| |
| // Check column types are valid Kudu types |
| for (ColumnDef col: getColumnDefs()) { |
| try { |
| KuduUtil.fromImpalaType(col.getType()); |
| } catch (ImpalaRuntimeException e) { |
| throw new AnalysisException(String.format( |
| "Cannot create table '%s': %s", getTbl(), e.getMessage())); |
| } |
| } |
| AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS), |
| String.format("PRIMARY KEY must be used instead of the table property '%s'.", |
| KuduTable.KEY_KEY_COLUMNS)); |
| if (!hasPrimaryKey()) { |
| throw new AnalysisException("A primary key is required for a Kudu table."); |
| } |
| String tabletReplicas = getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS); |
| if (tabletReplicas != null) { |
| Integer r = Ints.tryParse(tabletReplicas); |
| if (r == null) { |
| throw new AnalysisException(String.format( |
| "Table property '%s' must be an integer.", KuduTable.KEY_TABLET_REPLICAS)); |
| } |
| if (r <= 0) { |
| throw new AnalysisException("Number of tablet replicas must be greater than " + |
| "zero. Given number of replicas is: " + r.toString()); |
| } |
| } |
| analyzeKuduPartitionParams(analyzer); |
| } |
| |
| /** |
|    * Generates a Kudu table name based on the target database and table and stores it |
|    * as a generated table property (KuduTable.KEY_TABLE_NAME). Throws if the Kudu table |
|    * name was given manually via TBLPROPERTIES. |
| */ |
| private void analyzeSynchronizedKuduTableName(Analyzer analyzer) |
| throws AnalysisException { |
| AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME), |
| String.format("Not allowed to set '%s' manually for synchronized Kudu tables.", |
| KuduTable.KEY_TABLE_NAME)); |
| String kuduMasters = getKuduMasters(analyzer); |
| boolean isHMSIntegrationEnabled; |
| try { |
|       // Check if Kudu's integration with the Hive Metastore is enabled. Validation of |
|       // whether Kudu is configured to use the same Hive Metastore as Impala is skipped |
|       // and is not necessary for syntax parsing. |
| isHMSIntegrationEnabled = KuduTable.isHMSIntegrationEnabled(kuduMasters); |
| } catch (ImpalaRuntimeException e) { |
| throw new AnalysisException(String.format("Cannot analyze Kudu table '%s': %s", |
| getTbl(), e.getMessage())); |
| } |
| putGeneratedProperty(KuduTable.KEY_TABLE_NAME, |
| KuduUtil.getDefaultKuduTableName(getDb(), getTbl(), isHMSIntegrationEnabled)); |
| } |
| |
| /** |
|    * Analyzes the partitioning schemes specified in the CREATE TABLE statement. Also |
|    * adds the primary keys to the partitioning scheme if no partitioning keys are |
|    * provided. |
| */ |
| private void analyzeKuduPartitionParams(Analyzer analyzer) throws AnalysisException { |
| Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU); |
| if (getKuduPartitionParams().isEmpty()) { |
| analyzer.addWarning( |
| "Unpartitioned Kudu tables are inefficient for large data sizes."); |
| return; |
| } |
| Map<String, ColumnDef> pkColDefsByName = |
| ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs()); |
| for (KuduPartitionParam partitionParam: getKuduPartitionParams()) { |
| partitionParam.setPkColumnDefMap(pkColDefsByName); |
| partitionParam.analyze(analyzer); |
| } |
| } |
| |
| /** |
| * Checks if a primary key is specified in a CREATE TABLE stmt. Should only be called |
| * after tableDef_ has been analyzed. |
| */ |
| private boolean hasPrimaryKey() { |
| Preconditions.checkState(tableDef_.isAnalyzed()); |
| return !tableDef_.getPrimaryKeyColumnDefs().isEmpty(); |
| } |
| |
| /** |
|    * Analyzes the Avro schema and compares it with getColumnDefs() to detect |
| * inconsistencies. Returns a list of column descriptors that should be |
| * used for creating the table (possibly identical to getColumnDefs()). |
| */ |
| private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer) throws AnalysisException { |
| Preconditions.checkState(getFileFormat() == THdfsFileFormat.AVRO); |
|     // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter |
|     // taking precedence. |
| List<Map<String, String>> schemaSearchLocations = new ArrayList<>(); |
| schemaSearchLocations.add(getSerdeProperties()); |
| schemaSearchLocations.add(getTblProperties()); |
| String avroSchema; |
| List<ColumnDef> avroCols; // parsed from avroSchema |
| try { |
| avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations); |
| if (avroSchema == null) { |
| // No Avro schema was explicitly set in the serde or table properties, so infer |
| // the Avro schema from the column definitions. |
| Schema inferredSchema = AvroSchemaConverter.convertColumnDefs( |
| getColumnDefs(), getTblName().toString()); |
| avroSchema = inferredSchema.toString(); |
| } |
| if (Strings.isNullOrEmpty(avroSchema)) { |
| throw new AnalysisException("Avro schema is null or empty: " + |
| getTblName().toString()); |
| } |
| avroCols = AvroSchemaParser.parse(avroSchema); |
| } catch (SchemaParseException e) { |
| throw new AnalysisException(String.format( |
| "Error parsing Avro schema for table '%s': %s", getTblName().toString(), |
| e.getMessage())); |
| } |
| Preconditions.checkNotNull(avroCols); |
| |
|     // Analyze the Avro schema to detect inconsistencies with getColumnDefs(). |
| // In case of inconsistencies, the column defs are ignored in favor of the Avro |
| // schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104). |
| StringBuilder warning = new StringBuilder(); |
| List<ColumnDef> reconciledColDefs = |
| AvroSchemaUtils.reconcileSchemas(getColumnDefs(), avroCols, warning); |
| if (warning.length() > 0) analyzer.addWarning(warning.toString()); |
| return reconciledColDefs; |
| } |
| |
| /** |
| * Unescapes all values in the property map. |
| */ |
| static void unescapeProperties(Map<String, String> propertyMap) { |
| if (propertyMap == null) return; |
| for (Map.Entry<String, String> kv : propertyMap.entrySet()) { |
| propertyMap.put(kv.getKey(), |
| new StringLiteral(kv.getValue()).getUnescapedValue()); |
| } |
| } |
| |
| /** |
|    * For the Iceberg file format, adds the Iceberg storage handler and validates |
|    * Iceberg-specific table properties. |
| */ |
| private void analyzeIcebergFormat(Analyzer analyzer) throws AnalysisException { |
| // A managed table cannot have 'external.table.purge' property set |
| if (!isExternal() && Boolean.parseBoolean( |
| getTblProperties().get(IcebergTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) { |
| throw new AnalysisException(String.format("Table property '%s' cannot be set to " + |
| "true with a managed Iceberg table.", |
| IcebergTable.TBL_PROP_EXTERNAL_TABLE_PURGE)); |
| } |
| |
| // Check for managed table |
| if (!isExternal() || Boolean.parseBoolean(getTblProperties().get( |
| Table.TBL_PROP_EXTERNAL_TABLE_PURGE))) { |
| if (getColumnDefs().isEmpty()) { |
|         // An external Iceberg table can have no columns, but a managed Iceberg table |
|         // requires at least one column. |
| throw new AnalysisException("Table requires at least 1 column for " + |
| "managed iceberg table."); |
| } |
| |
|       // Check partition columns for a managed Iceberg table. |
| checkPartitionColumns(analyzer); |
| } |
| |
| String handler = getTblProperties().get(IcebergTable.KEY_STORAGE_HANDLER); |
| if (handler != null && !IcebergTable.isIcebergStorageHandler(handler)) { |
| throw new AnalysisException("Invalid storage handler " + |
| "specified for Iceberg format: " + handler); |
| } |
| putGeneratedProperty(IcebergTable.KEY_STORAGE_HANDLER, |
| IcebergTable.ICEBERG_STORAGE_HANDLER); |
| putGeneratedProperty(TableProperties.ENGINE_HIVE_ENABLED, "true"); |
| addMergeOnReadPropertiesIfNeeded(); |
| |
| String fileformat = getTblProperties().get(IcebergTable.ICEBERG_FILE_FORMAT); |
| TIcebergFileFormat icebergFileFormat = IcebergUtil.getIcebergFileFormat(fileformat); |
| if (fileformat != null && icebergFileFormat == null) { |
| throw new AnalysisException("Invalid fileformat for Iceberg table: " + fileformat); |
| } |
| if (fileformat == null || fileformat.isEmpty()) { |
| putGeneratedProperty(IcebergTable.ICEBERG_FILE_FORMAT, "parquet"); |
| } |
| |
| validateIcebergParquetCompressionCodec(icebergFileFormat); |
| validateIcebergParquetRowGroupSize(icebergFileFormat); |
| validateIcebergParquetPageSize(icebergFileFormat, |
| IcebergTable.PARQUET_PLAIN_PAGE_SIZE, "page size"); |
| validateIcebergParquetPageSize(icebergFileFormat, |
| IcebergTable.PARQUET_DICT_PAGE_SIZE, "dictionary page size"); |
| |
| // Determine the Iceberg catalog being used. The default catalog is HiveCatalog. |
| String catalogStr = getTblProperties().get(IcebergTable.ICEBERG_CATALOG); |
| TIcebergCatalog catalog; |
| if (catalogStr == null || catalogStr.isEmpty()) { |
| catalog = TIcebergCatalog.HIVE_CATALOG; |
| } else { |
| catalog = IcebergUtil.getTIcebergCatalog(catalogStr); |
| } |
| validateIcebergTableProperties(catalog); |
| } |
| |
| /** |
| * When creating an Iceberg table that supports row-level modifications |
|    * (format-version >= 2), we set the write modes to "merge-on-read", which is the |
|    * write mode Impala will eventually support (IMPALA-11664). |
| */ |
| private void addMergeOnReadPropertiesIfNeeded() { |
| Map<String, String> tblProps = getTblProperties(); |
| String formatVersion = tblProps.get(TableProperties.FORMAT_VERSION); |
| if (formatVersion == null || |
| Integer.valueOf(formatVersion) < IcebergTable.ICEBERG_FORMAT_V2) { |
| return; |
| } |
| |
| // Only add "merge-on-read" if none of the write modes are specified. |
| final String MERGE_ON_READ = IcebergTable.MERGE_ON_READ; |
| if (!IcebergUtil.isAnyWriteModeSet(tblProps)) { |
| putGeneratedProperty(TableProperties.DELETE_MODE, MERGE_ON_READ); |
| putGeneratedProperty(TableProperties.UPDATE_MODE, MERGE_ON_READ); |
| putGeneratedProperty(TableProperties.MERGE_MODE, MERGE_ON_READ); |
| } |
| } |
| |
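|   /** |
|    * Validates the Parquet compression codec and compression level table properties. |
|    * They may only be set for Parquet tables, and their values must parse correctly. |
|    */ |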
| private void validateIcebergParquetCompressionCodec( |
| TIcebergFileFormat icebergFileFormat) throws AnalysisException { |
| if (icebergFileFormat != TIcebergFileFormat.PARQUET) { |
| if (getTblProperties().containsKey(IcebergTable.PARQUET_COMPRESSION_CODEC)) { |
| throw new AnalysisException(IcebergTable.PARQUET_COMPRESSION_CODEC + |
| " should be set only for parquet file format"); |
| } |
| if (getTblProperties().containsKey(IcebergTable.PARQUET_COMPRESSION_LEVEL)) { |
| throw new AnalysisException(IcebergTable.PARQUET_COMPRESSION_LEVEL + |
| " should be set only for parquet file format"); |
| } |
| } else { |
| StringBuilder errMsg = new StringBuilder(); |
| if (IcebergUtil.parseParquetCompressionCodec(true, getTblProperties(), errMsg) |
| == null) { |
| throw new AnalysisException(errMsg.toString()); |
| } |
| } |
| } |
| |
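|   /** |
|    * Validates the Parquet row group size table property. It may only be set for |
|    * Parquet tables and its value must parse correctly. |
|    */ |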
| private void validateIcebergParquetRowGroupSize(TIcebergFileFormat icebergFileFormat) |
| throws AnalysisException { |
| if (getTblProperties().containsKey(IcebergTable.PARQUET_ROW_GROUP_SIZE)) { |
| if (icebergFileFormat != TIcebergFileFormat.PARQUET) { |
| throw new AnalysisException(IcebergTable.PARQUET_ROW_GROUP_SIZE + |
| " should be set only for parquet file format"); |
| } |
| } |
| |
| StringBuilder errMsg = new StringBuilder(); |
| if (IcebergUtil.parseParquetRowGroupSize(getTblProperties(), errMsg) == null) { |
| throw new AnalysisException(errMsg.toString()); |
| } |
| } |
| |
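|   /** |
|    * Validates a Parquet page size table property ('pageSizeTblProp'). It may only be |
|    * set for Parquet tables and its value must parse correctly. |
|    */ |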
| private void validateIcebergParquetPageSize(TIcebergFileFormat icebergFileFormat, |
| String pageSizeTblProp, String descr) throws AnalysisException { |
| if (getTblProperties().containsKey(pageSizeTblProp)) { |
| if (icebergFileFormat != TIcebergFileFormat.PARQUET) { |
| throw new AnalysisException(pageSizeTblProp + |
| " should be set only for parquet file format"); |
| } |
| } |
| |
| StringBuilder errMsg = new StringBuilder(); |
| if (IcebergUtil.parseParquetPageSize(getTblProperties(), pageSizeTblProp, descr, |
| errMsg) == null) { |
| throw new AnalysisException(errMsg.toString()); |
| } |
| } |
| |
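|   /** |
|    * Validates Iceberg table properties that depend on the catalog in use, e.g. the |
|    * metadata location, the catalog location and the EXTERNAL/purge combination. |
|    */ |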
| private void validateIcebergTableProperties(TIcebergCatalog catalog) |
| throws AnalysisException { |
|     // The metadata location is only used by HiveCatalog, but we shouldn't allow |
|     // setting it for any catalog, to avoid confusion. |
| if (getTblProperties().get(IcebergTable.METADATA_LOCATION) != null) { |
| throw new AnalysisException(String.format("%s cannot be set for Iceberg tables", |
| IcebergTable.METADATA_LOCATION)); |
| } |
| switch(catalog) { |
| case HIVE_CATALOG: validateTableInHiveCatalog(); |
| break; |
| case HADOOP_CATALOG: validateTableInHadoopCatalog(); |
| break; |
| case HADOOP_TABLES: validateTableInHadoopTables(); |
| break; |
| case CATALOGS: validateTableInCatalogs(); |
| break; |
| default: throw new AnalysisException(String.format( |
| "Unknown Iceberg catalog type: %s", catalog)); |
| } |
|     // For managed tables, HMS will override 'external.table.purge' to 'TRUE' when |
|     // 'iceberg.catalog' is not the Hive catalog. |
| if (!isExternal() && !IcebergUtil.isHiveCatalog(getTblProperties()) |
| && "false".equalsIgnoreCase(getTblProperties().get( |
| Table.TBL_PROP_EXTERNAL_TABLE_PURGE))) { |
| analyzer_.addWarning("The table property 'external.table.purge' will be set " |
| + "to 'TRUE' on newly created managed Iceberg tables."); |
| } |
| if (isExternalWithNoPurge() && IcebergUtil.isHiveCatalog(getTblProperties())) { |
| throw new AnalysisException("Cannot create EXTERNAL Iceberg table in the " + |
| "Hive Catalog."); |
| } |
| } |
| |
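|   // Iceberg tables stored in HiveCatalog must not set ICEBERG_CATALOG_LOCATION. |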
| private void validateTableInHiveCatalog() throws AnalysisException { |
| if (getTblProperties().get(IcebergTable.ICEBERG_CATALOG_LOCATION) != null) { |
| throw new AnalysisException(String.format("%s cannot be set for Iceberg table " + |
| "stored in hive.catalog", IcebergTable.ICEBERG_CATALOG_LOCATION)); |
| } |
| } |
| |
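|   // Iceberg tables stored in HadoopCatalog must set ICEBERG_CATALOG_LOCATION and must |
|   // not set a LOCATION clause. |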
| private void validateTableInHadoopCatalog() throws AnalysisException { |
| // Table location cannot be set in SQL when using 'hadoop.catalog' |
| if (getLocation() != null) { |
| throw new AnalysisException(String.format("Location cannot be set for Iceberg " + |
| "table with 'hadoop.catalog'.")); |
| } |
| String catalogLoc = getTblProperties().get(IcebergTable.ICEBERG_CATALOG_LOCATION); |
| if (catalogLoc == null || catalogLoc.isEmpty()) { |
| throw new AnalysisException(String.format("Table property '%s' is necessary " + |
| "for Iceberg table with 'hadoop.catalog'.", |
| IcebergTable.ICEBERG_CATALOG_LOCATION)); |
| } |
| } |
| |
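|   // Iceberg tables stored in HadoopTables must not set ICEBERG_CATALOG_LOCATION, and |
|   // external (non-purge) tables must specify a LOCATION. |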
| private void validateTableInHadoopTables() throws AnalysisException { |
| if (getTblProperties().get(IcebergTable.ICEBERG_CATALOG_LOCATION) != null) { |
| throw new AnalysisException(String.format("%s cannot be set for Iceberg table " + |
| "stored in hadoop.tables", IcebergTable.ICEBERG_CATALOG_LOCATION)); |
| } |
| if (isExternalWithNoPurge() && getLocation() == null) { |
| throw new AnalysisException("Set LOCATION for external Iceberg tables " + |
| "stored in hadoop.tables. For creating a completely new Iceberg table, use " + |
| "'CREATE TABLE' (no EXTERNAL keyword)."); |
| } |
| } |
| |
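|   // For tables resolved via Iceberg Catalogs, propagates the optional table identifier |
|   // property to the generated properties under Catalogs.NAME. |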
| private void validateTableInCatalogs() { |
| String tableId = getTblProperties().get(IcebergTable.ICEBERG_TABLE_IDENTIFIER); |
| if (tableId != null && !tableId.isEmpty()) { |
| putGeneratedProperty(Catalogs.NAME, tableId); |
| } |
| } |
| |
| /** |
|    * For Iceberg tables, each partition column must refer to an existing source column. |
| */ |
| private void checkPartitionColumns(Analyzer analyzer) throws AnalysisException { |
|     // This check is unnecessary for Iceberg tables without a partition spec. |
| List<IcebergPartitionSpec> specs = tableDef_.getIcebergPartitionSpecs(); |
| if (specs == null || specs.isEmpty()) return; |
| |
|     // An Iceberg table currently has only one partition spec. |
| IcebergPartitionSpec spec = specs.get(0); |
| // Analyzes the partition spec and the underlying partition fields. |
| spec.analyze(analyzer); |
| |
| List<IcebergPartitionField> fields = spec.getIcebergPartitionFields(); |
| Preconditions.checkState(fields != null && !fields.isEmpty()); |
| for (IcebergPartitionField field : fields) { |
| String fieldName = field.getFieldName(); |
| boolean containFlag = false; |
| for (ColumnDef columnDef : tableDef_.getColumnDefs()) { |
| if (columnDef.getColName().equalsIgnoreCase(fieldName)) { |
| containFlag = true; |
| break; |
| } |
| } |
| if (!containFlag) { |
| throw new AnalysisException("Cannot find source column: " + fieldName); |
| } |
| } |
| } |
| |
| /** |
|    * Marks columns as nullable by default so that they are created as optional Iceberg |
|    * fields. Also converts old-style PARTITIONED BY columns into an identity Iceberg |
|    * partition spec. |
| */ |
| private void analyzeIcebergColumns() { |
| if (!getPartitionColumnDefs().isEmpty()) { |
| createIcebergPartitionSpecFromPartitionColumns(); |
| } |
| for (ColumnDef def : getColumnDefs()) { |
| if (!def.isNullabilitySet()) { |
| def.setNullable(true); |
| } |
| } |
| } |
| |
| /** |
| * Creates Iceberg partition spec from partition columns. Needed to support old-style |
| * CREATE TABLE .. PARTITIONED BY (<cols>) syntax. In this case the column list in |
| * 'cols' is appended to the table-level columns, but also Iceberg-level IDENTITY |
| * partitions are created from this list. |
| */ |
| private void createIcebergPartitionSpecFromPartitionColumns() { |
| Preconditions.checkState(!getPartitionColumnDefs().isEmpty()); |
| Preconditions.checkState(getIcebergPartitionSpecs().isEmpty()); |
| List<IcebergPartitionField> partFields = new ArrayList<>(); |
| for (ColumnDef colDef : getPartitionColumnDefs()) { |
| partFields.add(new IcebergPartitionField(colDef.getColName(), |
| new IcebergPartitionTransform(TIcebergPartitionTransformType.IDENTITY))); |
| } |
| getIcebergPartitionSpecs().add(new IcebergPartitionSpec(partFields)); |
| getColumnDefs().addAll(getPartitionColumnDefs()); |
| getPartitionColumnDefs().clear(); |
| } |
| |
| /** |
| * Analyzes the parameters of a CREATE TABLE ... STORED BY JDBC statement. Adds the |
|    * DataSource table properties so that the JDBC table is stored as a DataSourceTable |
|    * in HMS. |
| */ |
| private void analyzeJdbcSchema(Analyzer analyzer) throws AnalysisException { |
| for (ColumnDef col: getColumnDefs()) { |
| if (!DataSourceTable.isSupportedColumnType(col.getType())) { |
| throw new AnalysisException("Tables stored by JDBC do not support the column " + |
| "type: " + col.getType()); |
| } |
| } |
| |
| AnalysisUtils.throwIfNotNull(getCachingOp(), |
| "A JDBC table cannot be cached in HDFS."); |
| AnalysisUtils.throwIfNotNull(getLocation(), "LOCATION cannot be specified for a " + |
| "JDBC table."); |
| AnalysisUtils.throwIfNotEmpty(tableDef_.getPartitionColumnDefs(), |
| "PARTITIONED BY cannot be used in a JDBC table."); |
| |
|     // Set the DataSource table properties so that the table is saved as a |
|     // DataSourceTable in HMS. |
| try { |
| DataSourceTable.setJdbcDataSourceProperties(getTblProperties()); |
| } catch (ImpalaRuntimeException e) { |
| throw new AnalysisException(String.format( |
| "Cannot create table '%s': %s", getTbl(), e.getMessage())); |
| } |
| } |
| |
| /** |
| * @return true for external tables that don't have "external.table.purge" set to true. |
| */ |
| private boolean isExternalWithNoPurge() { |
| return isExternal() && !Boolean.parseBoolean(getTblProperties().get( |
| Table.TBL_PROP_EXTERNAL_TABLE_PURGE)); |
| } |
| } |