/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| package org.apache.iceberg.mr; |
| |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.Set; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.iceberg.CatalogProperties; |
| import org.apache.iceberg.CatalogUtil; |
| import org.apache.iceberg.PartitionSpec; |
| import org.apache.iceberg.PartitionSpecParser; |
| import org.apache.iceberg.Schema; |
| import org.apache.iceberg.SchemaParser; |
| import org.apache.iceberg.Table; |
| import org.apache.iceberg.catalog.Catalog; |
| import org.apache.iceberg.catalog.TableIdentifier; |
| import org.apache.iceberg.hadoop.HadoopTables; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| |
| /** |
| * Class for catalog resolution and accessing the common functions for {@link Catalog} API. |
| * <p> |
| * If the catalog name is provided, get the catalog type from iceberg.catalog.<code>catalogName</code>.type config. |
| * <p> |
| * In case the catalog name is {@link #ICEBERG_HADOOP_TABLE_NAME location_based_table}, |
| * type is ignored and tables will be loaded using {@link HadoopTables}. |
| * <p> |
| * In case the value of catalog type is null, iceberg.catalog.<code>catalogName</code>.catalog-impl config |
| * is used to determine the catalog implementation class. |
| * <p> |
| * If catalog name is null, get the catalog type from {@link InputFormatConfig#CATALOG iceberg.mr.catalog} config: |
| * <ul> |
| * <li>hive: HiveCatalog</li> |
| * <li>location: HadoopTables</li> |
| * <li>hadoop: HadoopCatalog</li> |
| * </ul> |
| * <p> |
| * In case the value of catalog type is null, |
| * {@link InputFormatConfig#CATALOG_LOADER_CLASS iceberg.mr.catalog.loader.class} is used to determine |
| * the catalog implementation class. |
| * <p> |
| * Note: null catalog name mode is only supported for backwards compatibility. Using this mode is NOT RECOMMENDED. |
| */ |
| public final class Catalogs { |
| |
| public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg"; |
| public static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table"; |
| |
| public static final String NAME = "name"; |
| public static final String LOCATION = "location"; |
| public static final String BRANCH_NAME = "branch_name"; |
| |
| private static final String NO_CATALOG_TYPE = "no catalog"; |
| private static final Set<String> PROPERTIES_TO_REMOVE = |
| ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME, |
| InputFormatConfig.CATALOG_NAME); |
| |
| private Catalogs() { |
| } |
| |
| /** |
| * Load an Iceberg table using the catalog and table identifier (or table path) specified by the configuration. |
| * @param conf a Hadoop conf |
| * @return an Iceberg table |
| */ |
| public static Table loadTable(Configuration conf) { |
| return loadTable(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER), conf.get(InputFormatConfig.TABLE_LOCATION), |
| conf.get(InputFormatConfig.CATALOG_NAME)); |
| } |
| |
| /** |
| * Load an Iceberg table using the catalog specified by the configuration. |
| * <p> |
| * The table identifier ({@link Catalogs#NAME}) and the catalog name ({@link InputFormatConfig#CATALOG_NAME}), |
| * or table path ({@link Catalogs#LOCATION}) should be specified by the controlling properties. |
| * <p> |
| * Used by HiveIcebergSerDe and HiveIcebergStorageHandler. |
| * @param conf a Hadoop configuration |
| * @param props the controlling properties |
| * @return an Iceberg table |
| */ |
| public static Table loadTable(Configuration conf, Properties props) { |
| return loadTable(conf, props.getProperty(NAME), props.getProperty(LOCATION), |
| props.getProperty(InputFormatConfig.CATALOG_NAME)); |
| } |
| |
| private static Table loadTable(Configuration conf, String tableIdentifier, String tableLocation, |
| String catalogName) { |
| Optional<Catalog> catalog = loadCatalog(conf, catalogName); |
| |
| if (catalog.isPresent()) { |
| Preconditions.checkArgument(tableIdentifier != null, "Table identifier not set"); |
| return catalog.get().loadTable(TableIdentifier.parse(tableIdentifier)); |
| } |
| |
| Preconditions.checkArgument(tableLocation != null, "Table location not set"); |
| return new HadoopTables(conf).load(tableLocation); |
| } |
| |
| /** |
| * Creates an Iceberg table using the catalog specified by the configuration. |
| * <p> |
| * The properties should contain the following values: |
| * <ul> |
| * <li>Table identifier ({@link Catalogs#NAME}) or table path ({@link Catalogs#LOCATION}) is required |
| * <li>Table schema ({@link InputFormatConfig#TABLE_SCHEMA}) is required |
| * <li>Partition specification ({@link InputFormatConfig#PARTITION_SPEC}) is optional. Table will be unpartitioned if |
| * not provided |
| * </ul><p> |
| * Other properties will be handled over to the Table creation. The controlling properties above will not be |
| * propagated. |
| * @param conf a Hadoop conf |
| * @param props the controlling properties |
| * @return the created Iceberg table |
| */ |
| public static Table createTable(Configuration conf, Properties props) { |
| Schema schema = schema(props); |
| PartitionSpec spec = spec(props, schema); |
| String location = props.getProperty(LOCATION); |
| String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); |
| |
| Map<String, String> map = filterIcebergTableProperties(props); |
| |
| Optional<Catalog> catalog = loadCatalog(conf, catalogName); |
| |
| if (catalog.isPresent()) { |
| String name = props.getProperty(NAME); |
| Preconditions.checkNotNull(name, "Table identifier not set"); |
| return catalog.get().createTable(TableIdentifier.parse(name), schema, spec, location, map); |
| } |
| |
| Preconditions.checkNotNull(location, "Table location not set"); |
| return new HadoopTables(conf).create(schema, spec, map, location); |
| } |
| |
| /** |
| * Drops an Iceberg table using the catalog specified by the configuration. |
| * <p> |
| * The table identifier ({@link Catalogs#NAME}) or table path ({@link Catalogs#LOCATION}) should be specified by |
| * the controlling properties. |
| * @param conf a Hadoop conf |
| * @param props the controlling properties |
| * @return the created Iceberg table |
| */ |
| public static boolean dropTable(Configuration conf, Properties props) { |
| String location = props.getProperty(LOCATION); |
| String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); |
| |
| Optional<Catalog> catalog = loadCatalog(conf, catalogName); |
| |
| if (catalog.isPresent()) { |
| String name = props.getProperty(NAME); |
| Preconditions.checkNotNull(name, "Table identifier not set"); |
| return catalog.get().dropTable(TableIdentifier.parse(name)); |
| } |
| |
| Preconditions.checkNotNull(location, "Table location not set"); |
| return new HadoopTables(conf).dropTable(location); |
| } |
| |
| /** |
| * Returns true if HiveCatalog is used |
| * @param conf a Hadoop conf |
| * @param props the controlling properties |
| * @return true if the Catalog is HiveCatalog |
| */ |
| public static boolean hiveCatalog(Configuration conf, Properties props) { |
| String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); |
| String catalogType = getCatalogType(conf, catalogName); |
| if (catalogType != null) { |
| return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType); |
| } |
| catalogType = getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME); |
| if (catalogType != null) { |
| return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType); |
| } |
| return getCatalogProperties(conf, catalogName, catalogType).get(CatalogProperties.CATALOG_IMPL) == null; |
| } |
| |
| /** |
| * Register a table with the configured catalog if it does not exist. |
| * @param conf a Hadoop conf |
| * @param props the controlling properties |
| * @param metadataLocation the location of a metadata file |
| * @return the created Iceberg table |
| */ |
| public static Table registerTable(Configuration conf, Properties props, String metadataLocation) { |
| Schema schema = schema(props); |
| PartitionSpec spec = spec(props, schema); |
| Map<String, String> map = filterIcebergTableProperties(props); |
| String location = props.getProperty(LOCATION); |
| String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); |
| |
| Optional<Catalog> catalog = loadCatalog(conf, catalogName); |
| if (catalog.isPresent()) { |
| String name = props.getProperty(NAME); |
| Preconditions.checkNotNull(name, "Table identifier not set"); |
| return catalog.get().registerTable(TableIdentifier.parse(name), metadataLocation); |
| } |
| |
| Preconditions.checkNotNull(location, "Table location not set"); |
| return new HadoopTables(conf).create(schema, spec, map, location); |
| } |
| |
| public static void renameTable(Configuration conf, Properties props, TableIdentifier to) { |
| String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); |
| |
| Optional<Catalog> catalog = loadCatalog(conf, catalogName); |
| if (catalog.isPresent()) { |
| String name = props.getProperty(NAME); |
| Preconditions.checkNotNull(name, "Table identifier not set"); |
| catalog.get().renameTable(TableIdentifier.parse(name), to); |
| } else { |
| throw new RuntimeException("Rename from " + props.getProperty(NAME) + " to " + to + " failed"); |
| } |
| } |
| |
| static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) { |
| String catalogType = getCatalogType(conf, catalogName); |
| if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) { |
| return Optional.empty(); |
| } else { |
| String name = catalogName == null ? ICEBERG_DEFAULT_CATALOG_NAME : catalogName; |
| return Optional.of(CatalogUtil.buildIcebergCatalog(name, |
| getCatalogProperties(conf, name, catalogType), conf)); |
| } |
| } |
| |
| /** |
| * Collect all the catalog specific configuration from the global hive configuration. |
| * @param conf a Hadoop configuration |
| * @param catalogName name of the catalog |
| * @param catalogType type of the catalog |
| * @return complete map of catalog properties |
| */ |
| private static Map<String, String> getCatalogProperties(Configuration conf, String catalogName, String catalogType) { |
| Map<String, String> catalogProperties = Maps.newHashMap(); |
| conf.forEach(config -> { |
| if (config.getKey().startsWith(InputFormatConfig.CATALOG_DEFAULT_CONFIG_PREFIX)) { |
| catalogProperties.putIfAbsent( |
| config.getKey().substring(InputFormatConfig.CATALOG_DEFAULT_CONFIG_PREFIX.length()), |
| config.getValue()); |
| } else if (config.getKey().startsWith(InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName)) { |
| catalogProperties.put( |
| config.getKey().substring((InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName).length() + 1), |
| config.getValue()); |
| } |
| }); |
| |
| return addCatalogPropertiesIfMissing(conf, catalogType, catalogProperties); |
| } |
| |
| /** |
| * This method is used for backward-compatible catalog configuration. |
| * Collect all the catalog specific configuration from the global hive configuration. |
| * Note: this should be removed when the old catalog configuration is depracated. |
| * @param conf global hive configuration |
| * @param catalogType type of the catalog |
| * @param catalogProperties pre-populated catalog properties |
| * @return complete map of catalog properties |
| */ |
| private static Map<String, String> addCatalogPropertiesIfMissing(Configuration conf, String catalogType, |
| Map<String, String> catalogProperties) { |
| if (catalogType != null) { |
| catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType); |
| } |
| |
| String legacyCatalogImpl = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS); |
| if (legacyCatalogImpl != null) { |
| catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, legacyCatalogImpl); |
| } |
| |
| String legacyWarehouseLocation = conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION); |
| if (legacyWarehouseLocation != null) { |
| catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, legacyWarehouseLocation); |
| } |
| return catalogProperties; |
| } |
| |
| /** |
| * Return the catalog type based on the catalog name. |
| * <p> |
| * See {@link Catalogs} documentation for catalog type resolution strategy. |
| * |
| * @param conf global hive configuration |
| * @param catalogName name of the catalog |
| * @return type of the catalog, can be null |
| */ |
| private static String getCatalogType(Configuration conf, String catalogName) { |
| if (catalogName != null) { |
| String catalogType = conf.get(InputFormatConfig.catalogPropertyConfigKey( |
| catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE)); |
| if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) { |
| return NO_CATALOG_TYPE; |
| } else { |
| return catalogType; |
| } |
| } else { |
| String catalogType = conf.get(InputFormatConfig.CATALOG); |
| if (catalogType != null && catalogType.equals(LOCATION)) { |
| return NO_CATALOG_TYPE; |
| } else { |
| return catalogType; |
| } |
| } |
| } |
| |
| /** |
| * Parse the table schema from the properties |
| * @param props the controlling properties |
| * @return schema instance |
| */ |
| private static Schema schema(Properties props) { |
| String schemaString = props.getProperty(InputFormatConfig.TABLE_SCHEMA); |
| Preconditions.checkNotNull(schemaString, "Table schema not set"); |
| return SchemaParser.fromJson(props.getProperty(InputFormatConfig.TABLE_SCHEMA)); |
| } |
| |
| /** |
| * Get the partition spec from the properties |
| * @param props the controlling properties |
| * @param schema instance of the iceberg schema |
| * @return instance of the partition spec |
| */ |
| private static PartitionSpec spec(Properties props, Schema schema) { |
| String specString = props.getProperty(InputFormatConfig.PARTITION_SPEC); |
| PartitionSpec spec = PartitionSpec.unpartitioned(); |
| if (specString != null) { |
| spec = PartitionSpecParser.fromJson(schema, specString); |
| } |
| return spec; |
| } |
| |
| /** |
| * Create the iceberg table properties without the {@link Catalogs#PROPERTIES_TO_REMOVE} |
| * @param props the controlling properties |
| * @return map of iceberg table properties |
| */ |
| private static Map<String, String> filterIcebergTableProperties(Properties props) { |
| Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size()); |
| for (Object key : props.keySet()) { |
| if (!PROPERTIES_TO_REMOVE.contains(key)) { |
| map.put(key.toString(), props.get(key).toString()); |
| } |
| } |
| return map; |
| } |
| } |