/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
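/**
 * Utility methods for Hive to load, inspect and manipulate Iceberg tables: table loading and caching, partition
 * specs, snapshots and branches, and metadata-only deletes.
 */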
public class IcebergTableUtil {
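// Positions of the partition struct and the spec id columns in the rows of the PARTITIONS metadata table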
public static final int SPEC_IDX = 1;
public static final int PART_IDX = 0;
private static final Logger LOG = LoggerFactory.getLogger(IcebergTableUtil.class);
private IcebergTableUtil() {
}
/**
 * Constructs the table properties needed to load the Iceberg table from the information stored in the HMS table,
 * then calls {@link IcebergTableUtil#getTable(Configuration, Properties, boolean)} with these properties.
 * @param configuration a Hadoop configuration
 * @param hmsTable the HMS table
 * @param skipCache if true, the table is not looked up in the SessionState cache but loaded from the catalog
 * @return the Iceberg table
 */
static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable,
boolean skipCache) {
Properties properties = new Properties();
properties.setProperty(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString());
properties.setProperty(Catalogs.LOCATION, hmsTable.getSd().getLocation());
hmsTable.getParameters().computeIfPresent(InputFormatConfig.CATALOG_NAME,
(k, v) -> {
properties.setProperty(k, v);
return v;
});
return getTable(configuration, properties, skipCache);
}
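/**
 * Loads the Iceberg table corresponding to the given HMS table, using the SessionState cache when available.
 * @param configuration a Hadoop configuration
 * @param hmsTable the HMS table
 * @return the Iceberg table
 */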
public static Table getTable(Configuration configuration, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
return getTable(configuration, hmsTable, false);
}
/**
 * Loads the Iceberg table either from the {@link QueryState} or through the configured catalog. First it looks for
 * the table object stored in the query state; if it is not there, the table has not been loaded yet within this
 * query, so it is loaded through the Catalogs API and then stored in the query state.
 * @param configuration a Hadoop configuration
 * @param properties controlling properties
 * @param skipCache if true, the table is not looked up in the SessionState cache but loaded from the catalog
 * @return an Iceberg table
 */
static Table getTable(Configuration configuration, Properties properties, boolean skipCache) {
String metaTable = properties.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY);
String tableName = properties.getProperty(Catalogs.NAME);
String location = properties.getProperty(Catalogs.LOCATION);
if (metaTable != null) {
// HiveCatalog and HadoopCatalog use NAME to identify the metadata table
properties.setProperty(Catalogs.NAME, tableName + "." + metaTable);
// HadoopTable uses LOCATION to identify the metadata table
properties.setProperty(Catalogs.LOCATION, location + "#" + metaTable);
}
String tableIdentifier = properties.getProperty(Catalogs.NAME);
Function<Void, Table> tableLoadFunc =
unused -> {
Table tab = Catalogs.loadTable(configuration, properties);
SessionStateUtil.addResource(configuration, tableIdentifier, tab);
return tab;
};
if (skipCache) {
return tableLoadFunc.apply(null);
} else {
return SessionStateUtil.getResource(configuration, tableIdentifier).filter(o -> o instanceof Table)
.map(o -> (Table) o).orElseGet(() -> {
LOG.debug("Iceberg table {} is not found in QueryState. Loading table from configured catalog",
tableIdentifier);
return tableLoadFunc.apply(null);
});
}
}
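/**
 * Loads the Iceberg table identified by the given properties, using the SessionState cache when available.
 * @param configuration a Hadoop configuration
 * @param properties controlling properties
 * @return an Iceberg table
 */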
static Table getTable(Configuration configuration, Properties properties) {
return getTable(configuration, properties, false);
}
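/**
 * Returns the path of the column statistics file attached to the table's current snapshot, if any.
 * @param table the Iceberg table
 * @return the path of the statistics file containing {@link ColumnStatisticsObj} blobs, if present
 */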
static Optional<Path> getColStatsPath(Table table) {
return getColStatsPath(table, table.currentSnapshot().snapshotId());
}
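/**
 * Returns the path of the column statistics file attached to the given snapshot, if any.
 * @param table the Iceberg table
 * @param snapshotId the snapshot id the statistics file belongs to
 * @return the path of the statistics file containing {@link ColumnStatisticsObj} blobs, if present
 */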
static Optional<Path> getColStatsPath(Table table, long snapshotId) {
return table.statisticsFiles().stream()
.filter(stats -> stats.snapshotId() == snapshotId)
.filter(stats -> stats.blobMetadata().stream()
.anyMatch(metadata -> ColumnStatisticsObj.class.getSimpleName().equals(metadata.type()))
)
.map(stats -> new Path(stats.path()))
.findAny();
}
/**
 * Creates a {@link PartitionSpec} based on the partition information stored in
 * {@link TransformSpec}.
 * @param configuration a Hadoop configuration
 * @param schema the Iceberg table schema
 * @return the Iceberg partition spec, or null if no partition transform spec is found in the QueryState
 */
public static PartitionSpec spec(Configuration configuration, Schema schema) {
List<TransformSpec> partitionTransformSpecList = SessionStateUtil
.getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
.map(o -> (List<TransformSpec>) o).orElseGet(() -> null);
if (partitionTransformSpecList == null) {
LOG.debug("Iceberg partition transform spec is not found in QueryState.");
return null;
}
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
partitionTransformSpecList.forEach(spec -> {
switch (spec.getTransformType()) {
case IDENTITY:
builder.identity(spec.getColumnName().toLowerCase());
break;
case YEAR:
builder.year(spec.getColumnName());
break;
case MONTH:
builder.month(spec.getColumnName());
break;
case DAY:
builder.day(spec.getColumnName());
break;
case HOUR:
builder.hour(spec.getColumnName());
break;
case TRUNCATE:
builder.truncate(spec.getColumnName(), spec.getTransformParam().get());
break;
case BUCKET:
builder.bucket(spec.getColumnName(), spec.getTransformParam().get());
break;
}
});
return builder.build();
}
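/**
 * Updates the partition spec of the table: removes every field of the current spec, then adds the fields defined
 * by the partition transform spec stored in the QueryState.
 * @param configuration a Hadoop configuration
 * @param table the Iceberg table to update
 */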
public static void updateSpec(Configuration configuration, Table table) {
// get the new partition transform spec
PartitionSpec newPartitionSpec = spec(configuration, table.schema());
if (newPartitionSpec == null) {
LOG.debug("Iceberg Partition spec is not updated due to empty partition spec definition.");
return;
}
// delete every field from the old partition spec
UpdatePartitionSpec updatePartitionSpec = table.updateSpec().caseSensitive(false);
table.spec().fields().forEach(field -> updatePartitionSpec.removeField(field.name()));
List<TransformSpec> partitionTransformSpecList = SessionStateUtil
.getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
.map(o -> (List<TransformSpec>) o).orElseGet(() -> null);
partitionTransformSpecList.forEach(spec -> {
switch (spec.getTransformType()) {
case IDENTITY:
updatePartitionSpec.addField(spec.getColumnName());
break;
case YEAR:
updatePartitionSpec.addField(Expressions.year(spec.getColumnName()));
break;
case MONTH:
updatePartitionSpec.addField(Expressions.month(spec.getColumnName()));
break;
case DAY:
updatePartitionSpec.addField(Expressions.day(spec.getColumnName()));
break;
case HOUR:
updatePartitionSpec.addField(Expressions.hour(spec.getColumnName()));
break;
case TRUNCATE:
updatePartitionSpec.addField(Expressions.truncate(spec.getColumnName(), spec.getTransformParam().get()));
break;
case BUCKET:
updatePartitionSpec.addField(Expressions.bucket(spec.getColumnName(), spec.getTransformParam().get()));
break;
}
});
updatePartitionSpec.commit();
}
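/**
 * Checks whether the table's current partition spec contains a bucket transform.
 * @param table the Iceberg table
 * @return true if any partition field uses a bucket transform
 */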
public static boolean isBucketed(Table table) {
return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
}
/**
 * Rolls an Iceberg table back to a specific snapshot, identified either by snapshot id or as the last snapshot
 * before a given timestamp.
 * @param table the Iceberg table
 * @param type the type of the rollback: time based or version based
 * @param value the rollback parameter: a timestamp in milliseconds or a snapshot id
 */
public static void rollback(Table table, AlterTableExecuteSpec.RollbackSpec.RollbackType type, Long value) {
ManageSnapshots manageSnapshots = table.manageSnapshots();
if (type == AlterTableExecuteSpec.RollbackSpec.RollbackType.TIME) {
LOG.debug("Trying to rollback iceberg table to snapshot before timestamp {}", value);
manageSnapshots.rollbackToTime(value);
} else {
LOG.debug("Trying to rollback iceberg table to snapshot ID {}", value);
manageSnapshots.rollbackTo(value);
}
manageSnapshots.commit();
}
/**
 * Sets the current snapshot of the Iceberg table.
 * @param table the Iceberg table
 * @param value the target snapshot: either a snapshot id or a SnapshotRef name
 */
public static void setCurrentSnapshot(Table table, String value) {
ManageSnapshots manageSnapshots = table.manageSnapshots();
long snapshotId;
try {
snapshotId = Long.parseLong(value);
LOG.debug("Rolling the iceberg table {} from snapshot id {} to snapshot ID {}", table.name(),
table.currentSnapshot().snapshotId(), snapshotId);
} catch (NumberFormatException e) {
String refName = PlanUtils.stripQuotes(value);
snapshotId = Optional.ofNullable(table.refs().get(refName)).map(SnapshotRef::snapshotId).orElseThrow(() ->
new IllegalArgumentException(String.format("SnapshotRef %s does not exist", refName)));
LOG.debug("Rolling the iceberg table {} from snapshot id {} to the snapshot ID {} of SnapshotRef {}",
table.name(), table.currentSnapshot().snapshotId(), snapshotId, refName);
}
manageSnapshots.setCurrentSnapshot(snapshotId);
manageSnapshots.commit();
}
/**
 * Fast-forwards one branch of the table to another.
 * @param table the Iceberg table
 * @param sourceBranch the branch to fast-forward
 * @param targetBranch the branch to fast-forward to
 */
public static void fastForwardBranch(Table table, String sourceBranch, String targetBranch) {
LOG.debug("Fast Forwarding the iceberg table {} branch {} to {}", table.name(), sourceBranch, targetBranch);
table.manageSnapshots().fastForwardBranch(sourceBranch, targetBranch).commit();
}
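/**
 * Cherry-picks the changes of the given snapshot on top of the current table state.
 * @param table the Iceberg table
 * @param snapshotId the id of the snapshot to cherry-pick
 */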
public static void cherryPick(Table table, long snapshotId) {
LOG.debug("Cherry-Picking {} to {}", snapshotId, table.name());
table.manageSnapshots().cherrypick(snapshotId).commit();
}
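/**
 * Checks whether the given table properties describe a format v2 Iceberg table.
 * @param props the table properties, may be null
 * @return true if the format version property is "2"
 */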
public static boolean isV2Table(Map<String, String> props) {
return props != null &&
"2".equals(props.get(TableProperties.FORMAT_VERSION));
}
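/**
 * Decides whether the given write operation runs in copy-on-write mode based on the table's write mode properties.
 * @param operation the write operation (DELETE, UPDATE or MERGE)
 * @param props function returning the value of a table property, or the supplied default if the property is unset
 * @return true if the resolved write mode is copy-on-write
 */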
public static boolean isCopyOnWriteMode(Context.Operation operation, BinaryOperator<String> props) {
String mode = null;
switch (operation) {
case DELETE:
mode = props.apply(TableProperties.DELETE_MODE,
TableProperties.DELETE_MODE_DEFAULT);
break;
case UPDATE:
mode = props.apply(TableProperties.UPDATE_MODE,
TableProperties.UPDATE_MODE_DEFAULT);
break;
case MERGE:
mode = props.apply(TableProperties.MERGE_MODE,
TableProperties.MERGE_MODE_DEFAULT);
break;
}
return RowLevelOperationMode.COPY_ON_WRITE.modeName().equalsIgnoreCase(mode);
}
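/**
 * Performs a metadata-only delete: converts the search argument to an Iceberg filter expression and deletes the
 * files matching the row filter, optionally on the given branch.
 * @param icebergTable the Iceberg table
 * @param branchName the branch to delete from, or null/empty for the main branch
 * @param sarg the Hive search argument describing the rows to delete
 */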
public static void performMetadataDelete(Table icebergTable, String branchName, SearchArgument sarg) {
Expression exp = HiveIcebergFilterFactory.generateFilterExpression(sarg);
DeleteFiles deleteFiles = icebergTable.newDelete();
if (StringUtils.isNotEmpty(branchName)) {
deleteFiles = deleteFiles.toBranch(HiveUtils.getTableSnapshotRef(branchName));
}
deleteFiles.deleteFromRowFilter(exp).commit();
}
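/**
 * Copies the values of the given partition key into a {@link PartitionData} instance of the same struct type.
 * @param key the partition key values
 * @param keyType the struct type of the partition key
 * @return the populated PartitionData
 */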
public static PartitionData toPartitionData(StructLike key, Types.StructType keyType) {
PartitionData data = new PartitionData(keyType);
for (int i = 0; i < keyType.fields().size(); i++) {
Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass());
if (val != null) {
data.set(i, val);
}
}
return data;
}
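/**
 * Copies the values of the source partition key into a {@link PartitionData} instance of the target struct type,
 * matching fields by name. Throws if a target field is missing from the source or its value is null.
 * @param sourceKey the source partition key values
 * @param sourceKeyType the struct type of the source key
 * @param targetKeyType the struct type of the target key
 * @return the populated PartitionData
 */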
public static PartitionData toPartitionData(StructLike sourceKey, Types.StructType sourceKeyType,
Types.StructType targetKeyType) {
PartitionData data = new PartitionData(targetKeyType);
for (int i = 0; i < targetKeyType.fields().size(); i++) {
int fi = i;
String fieldName = targetKeyType.fields().get(fi).name();
Object val = sourceKeyType.fields().stream()
.filter(f -> f.name().equals(fieldName)).findFirst()
.map(sourceKeyElem -> sourceKey.get(sourceKeyType.fields().indexOf(sourceKeyElem),
targetKeyType.fields().get(fi).type().typeId().javaClass()))
.orElseThrow(() -> new RuntimeException(
String.format("Error retrieving value of partition field %s", fieldName)));
if (val != null) {
data.set(fi, val);
} else {
throw new RuntimeException(String.format("Partition field's %s value is null", fieldName));
}
}
return data;
}
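/**
 * Collects the data files of the current snapshot that belong to the given partition spec and partition path.
 * @param table the Iceberg table
 * @param specId the id of the partition spec the files were written with
 * @param partitionPath the partition path to match
 * @return the list of matching data files
 */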
public static List<DataFile> getDataFiles(Table table, int specId,
String partitionPath) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return file.specId() == specId && table.specs()
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
}
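/**
 * Collects the position delete files that belong to the given partition spec and partition path.
 * @param table the Iceberg table
 * @param specId the id of the partition spec the files were written with
 * @param partitionPath the partition path to match
 * @return the list of matching position delete files
 */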
public static List<DeleteFile> getDeleteFiles(Table table, int specId, String partitionPath) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
return file.specId() == specId && table.specs()
.get(specId).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file()));
}
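/**
 * Builds an Iceberg filter expression from a Hive partition specification. Only identity-partitioned columns are
 * supported; any other transform or an unknown column results in a {@link SemanticException}.
 * @param table the Iceberg table
 * @param partitionSpec map of partition column name to partition value
 * @return the filter expression matching the given partition values
 * @throws SemanticException if a column is not a partition column or uses a non-identity transform
 */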
public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec)
throws SemanticException {
Map<String, PartitionField> partitionFieldMap = getPartitionFields(table).stream()
.collect(Collectors.toMap(PartitionField::name, Function.identity()));
Expression finalExp = Expressions.alwaysTrue();
for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
String partColName = entry.getKey();
if (partitionFieldMap.containsKey(partColName)) {
PartitionField partitionField = partitionFieldMap.get(partColName);
Type resultType = partitionField.transform().getResultType(table.schema()
.findField(partitionField.sourceId()).type());
Object value = Conversions.fromPartitionString(resultType, entry.getValue());
TransformSpec.TransformType transformType = TransformSpec.fromString(partitionField.transform().toString());
Iterable<?> iterable = () -> Collections.singletonList(value).iterator();
if (TransformSpec.TransformType.IDENTITY == transformType) {
Expression boundPredicate = Expressions.in(partitionField.name(), iterable);
finalExp = Expressions.and(finalExp, boundPredicate);
} else {
throw new SemanticException(
String.format("Partition transforms are not supported via truncate operation: %s", partColName));
}
} else {
throw new SemanticException(String.format("No partition column/transform by the name: %s", partColName));
}
}
return finalExp;
}
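/**
 * Returns the partition keys of the given partition spec as Hive {@link FieldSchema} objects, with the transform
 * recorded in the field comment.
 * @param table the Iceberg table
 * @param specId the id of the partition spec
 * @return the list of partition columns as FieldSchema objects
 */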
public static List<FieldSchema> getPartitionKeys(Table table, int specId) {
Schema schema = table.specs().get(specId).schema();
List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
Map<String, String> colNameToColType = hiveSchema.stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
return table.specs().get(specId).fields().stream().map(partField ->
new FieldSchema(schema.findColumnName(partField.sourceId()),
colNameToColType.get(schema.findColumnName(partField.sourceId())),
String.format("Transform: %s", partField.transform().toString()))).collect(Collectors.toList());
}
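/**
 * Returns the distinct partition fields across all partition specs of the table.
 * @param table the Iceberg table
 * @return the list of distinct partition fields
 */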
public static List<PartitionField> getPartitionFields(Table table) {
return table.specs().values().stream().flatMap(spec -> spec.fields()
.stream()).distinct().collect(Collectors.toList());
}
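/**
 * Collects the partitions of the table that match the given partition specification by scanning the PARTITIONS
 * metadata table and filtering its rows with the expression derived from the specification.
 * @param icebergTable the Iceberg table
 * @param partSpecMap map of partition column name to partition value
 * @param allowPartialSpec whether partitions with more partition fields than the given specification should also
 *                         be returned
 * @return map of partition data to the id of the spec the partition was written with
 */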
public static Map<PartitionData, Integer> getPartitionInfo(Table icebergTable, Map<String, String> partSpecMap,
boolean allowPartialSpec) throws SemanticException, IOException {
Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(icebergTable, partSpecMap);
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
.createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
Map<PartitionData, Integer> result = Maps.newLinkedHashMap();
try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
fileScanTasks.forEach(task ->
CloseableIterable.filter(
CloseableIterable.transform(task.asDataTask().rows(), row -> {
StructProjection data = row.get(IcebergTableUtil.PART_IDX, StructProjection.class);
Integer specId = row.get(IcebergTableUtil.SPEC_IDX, Integer.class);
return Maps.immutableEntry(IcebergTableUtil.toPartitionData(data,
Partitioning.partitionType(icebergTable), icebergTable.specs().get(specId).partitionType()),
specId);
}), entry -> {
ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.specs().get(entry.getValue()),
expression, false);
return resEval.residualFor(entry.getKey()).isEquivalentTo(Expressions.alwaysTrue()) &&
(entry.getKey().size() == partSpecMap.size() || allowPartialSpec);
}).forEach(entry -> result.put(entry.getKey(), entry.getValue())));
}
return result;
}
}