blob: 375a6d21f109413f89ae7a948d11aed9479e215a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.SnapshotContext;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.ddl.table.create.like.CreateTableLikeDesc;
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.PartitionTransform;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.StorageFormat;
import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.stats.Partish;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinCompressionCodec;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.base.Throwables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandler.class);
// Scheme prefix of the authorization URI assembled in getURIForAuth.
private static final String ICEBERG_URI_PREFIX = "iceberg://";
// Splits on the same ".." token as TABLE_NAME_SEPARATOR.
// NOTE(review): not referenced in the visible part of this file.
private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
// Token used by configureJobConf to join output table names; table names containing it are rejected there.
private static final String TABLE_NAME_SEPARATOR = "..";
// Value the stats-source setting is compared against (see getStatsSource and its callers).
private static final String ICEBERG = "iceberg";
// NOTE(review): not referenced in the visible part of this file.
private static final String PUFFIN = "puffin";
// NOTE(review): presumably row-level operation mode names; not referenced in the visible part of this file.
public static final String COPY_ON_WRITE = "copy-on-write";
public static final String MERGE_ON_READ = "merge-on-read";
// Path segment placed between the table location and the stats file name in getStatsPath.
public static final String STATS = "/stats/";
/**
* Function template for producing a custom sort expression function:
* Takes the source column index and the bucket count to create a function where the Iceberg bucket UDF is used to
* build the sort expression, e.g. iceberg_bucket(_col2, 5)
*/
private static final transient BiFunction<Integer, Integer, Function<List<ExprNodeDesc>, ExprNodeDesc>>
BUCKET_SORT_EXPR =
(idx, bucket) -> cols -> {
try {
// The column to bucket is picked by position from the list handed to the returned function.
ExprNodeDesc icebergBucketSourceCol = cols.get(idx);
return ExprNodeGenericFuncDesc.newInstance(new GenericUDFIcebergBucket(), "iceberg_bucket",
Lists.newArrayList(
icebergBucketSourceCol,
new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, bucket)
));
} catch (UDFArgumentException e) {
// The checked exception cannot escape a java.util.function.Function, so it is rethrown unchecked.
throw new RuntimeException(e);
}
};
// Virtual columns listed in a fixed order: partition spec id, partition hash, file path, row position.
private static final List<VirtualColumn> ACID_VIRTUAL_COLS = ImmutableList.of(VirtualColumn.PARTITION_SPEC_ID,
VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION);
// The same columns as FieldSchema objects (name, type name, empty comment), in the same order.
private static final List<FieldSchema> ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA = ACID_VIRTUAL_COLS.stream()
.map(v -> new FieldSchema(v.getName(), v.getTypeInfo().getTypeName(), ""))
.collect(Collectors.toList());
// Configuration supplied through setConf(Configuration); null until that method has been called.
private Configuration conf;
/**
* @return the input format class used for tables of this storage handler, {@link HiveIcebergInputFormat}
*/
@Override
public Class<? extends InputFormat> getInputFormatClass() {
return HiveIcebergInputFormat.class;
}
/**
* @return the output format class used for tables of this storage handler, {@link HiveIcebergOutputFormat}
*/
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
return HiveIcebergOutputFormat.class;
}
/**
* @return the SerDe class used for tables of this storage handler, {@link HiveIcebergSerDe}
*/
@Override
public Class<? extends AbstractSerDe> getSerDeClass() {
return HiveIcebergSerDe.class;
}
/**
* Creates the metastore hook for this handler.
*
* @return a fresh {@link HiveIcebergMetaHook} built from the current configuration
*/
@Override
public HiveMetaHook getMetaHook() {
// Make sure to always return a new instance here, as HiveIcebergMetaHook might hold state relevant for the
// operation.
return new HiveIcebergMetaHook(conf);
}
/**
* @return always {@code null}; this handler does not supply an authorization provider
*/
@Override
public HiveAuthorizationProvider getAuthorizationProvider() {
return null;
}
/**
* Prepares the job properties for reading the table.
* Delegates to overlayTableProperties and then to fallbackToNonVectorizedModeBasedOnProperties, both of which are
* defined outside the visible part of this file.
*
* @param tableDesc descriptor of the table being read
* @param map job property map handed to overlayTableProperties
*/
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
overlayTableProperties(conf, tableDesc, map);
// Until the vectorized reader can handle delete files, let's fall back to non-vector mode for V2 tables
fallbackToNonVectorizedModeBasedOnProperties(tableDesc.getProperties());
}
/**
 * Prepares the job properties for writing the table: overlays the table properties, registers the no-op job
 * committer and records the operation type under a per-table key in both the job property map and the table
 * properties.
 *
 * @param tableDesc descriptor of the table being written
 * @param map job property map to populate
 */
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
  overlayTableProperties(conf, tableDesc, map);
  // Until the vectorized reader can handle delete files, let's fall back to non-vector mode for V2 tables
  fallbackToNonVectorizedModeBasedOnProperties(tableDesc.getProperties());

  // For Tez, setting the committer here is enough to make sure it'll be part of the jobConf
  map.put("mapred.output.committer.class", HiveIcebergNoJobCommitter.class.getName());

  // For MR, the jobConf is set only in configureJobConf, so we're setting the write key here to detect it over there
  String operationType = getOperationType();
  String operationTypeKey = InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName();
  map.put(operationTypeKey, operationType);

  // The key goes into the table props too, so that projection pushdown can be decided per table and skipped only
  // for output tables in HiveIcebergSerde. Entries of the map show up in the serde config of every table in the
  // query, not just the output tables, so the serde cannot rely on the map alone.
  tableDesc.getProperties().put(operationTypeKey, operationType);
}
/**
* Committer with no-op job commit. We can pass this into the Tez AM to take care of task commits/aborts, as well
* as aborting jobs reliably if an execution error occurred. However, we want to execute job commits on the
* HS2-side during the MoveTask, so we will use the full-featured HiveIcebergOutputCommitter there.
*/
static class HiveIcebergNoJobCommitter extends HiveIcebergOutputCommitter {
/**
* Intentionally empty: the job-level commit is skipped by this committer.
*
* @param originalContext ignored
*/
@Override
public void commitJob(JobContext originalContext) throws IOException {
// do nothing
}
}
/**
* Intentionally empty: this method adds nothing to the supplied map.
*
* @param tableDesc unused
* @param map unused
*/
@Override
public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {
}
/**
* Intentionally empty: this method adds nothing to the supplied secrets map.
*
* @param tableDesc unused
* @param secrets unused
*/
// Override annotation commented out, since this interface method has been introduced only in Hive 3
// @Override
public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> secrets) {
}
/**
 * Applies the common job settings and, when the table properties carry an operation type entry for this table,
 * copies that entry into the job conf, appends the table to the list of output tables and records its catalog
 * name. Unless the job runs in the test IDE mode, the dependency jars of this class are added to the job as well.
 *
 * @param tableDesc descriptor of the table, may be {@code null}
 * @param jobConf job configuration to update
 * @throws IllegalArgumentException if the table name contains the table name separator
 */
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
  setCommonJobConf(jobConf);

  Properties tableProps = tableDesc == null ? null : tableDesc.getProperties();
  if (tableProps != null) {
    String tableName = tableDesc.getTableName();
    String opKey = InputFormatConfig.OPERATION_TYPE_PREFIX + tableName;
    if (tableProps.get(opKey) != null) {
      // set operation type into job conf too
      jobConf.set(opKey, tableProps.getProperty(opKey));

      Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR),
          "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");

      // Append this table to whatever output tables have been registered so far.
      String registeredTables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
      String outputTables;
      if (registeredTables == null) {
        outputTables = tableName;
      } else {
        outputTables = registeredTables + TABLE_NAME_SEPARATOR + tableName;
      }
      jobConf.set(InputFormatConfig.OUTPUT_TABLES, outputTables);

      String catalogName = tableProps.getProperty(InputFormatConfig.CATALOG_NAME);
      if (catalogName != null) {
        jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);
      }
    }
  }

  boolean inTestIde = jobConf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST_IDE.varname, false);
  if (!inTestIde) {
    // For running unit test this won't work as maven surefire CP is different than what we have on a cluster:
    // it places the current projects' classes and test-classes to top instead of jars made from these...
    try {
      Utilities.addDependencyJars(jobConf, HiveIcebergStorageHandler.class);
    } catch (IOException e) {
      Throwables.propagate(e);
    }
  }
}
/**
* @return always {@code true}
*/
@Override
public boolean directInsert() {
return true;
}
/**
* @return always {@code true}
*/
@Override
public boolean alwaysUnpartitioned() {
return true;
}
/**
* @return the configuration last passed to {@link #setConf(Configuration)}, or {@code null} if none was set
*/
@Override
public Configuration getConf() {
return conf;
}
/**
* Stores the configuration used by the other methods of this handler.
*
* @param conf configuration to keep; the reference is stored as is, without copying
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
/**
* @return the fully qualified name of the runtime class of this handler
*/
@Override
public String toString() {
return this.getClass().getName();
}
/**
 * Hands the whole filter to both sides: it is reported as pushed down and also kept as the residual predicate.
 *
 * @param jobConf Job configuration for InputFormat to access
 * @param deserializer Deserializer
 * @param exprNodeDesc Filter expression extracted by Hive
 * @return Entire filter to take advantage of Hive's pruning as well as Iceberg's pruning.
 */
@Override
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
  DecomposedPredicate decomposed = new DecomposedPredicate();
  ExprNodeGenericFuncDesc entireFilter = (ExprNodeGenericFuncDesc) exprNodeDesc;
  decomposed.residualPredicate = entireFilter;
  decomposed.pushedPredicate = entireFilter;
  return decomposed;
}
/**
* @return always {@code true}; the statistics themselves come from {@link #getBasicStatistics(Partish)}
*/
@Override
public boolean canProvideBasicStatistics() {
return true;
}
/**
 * Looks up the storage format descriptor matching the table's default file format property; Parquet is assumed
 * when the property is absent.
 *
 * @param table metastore table whose parameters are inspected
 * @return the descriptor for the resolved format, or {@code null} when the table has no parameters
 * @throws SemanticException propagated from the descriptor lookup
 */
@Override
public StorageFormatDescriptor getStorageFormatDescriptor(org.apache.hadoop.hive.metastore.api.Table table)
    throws SemanticException {
  Map<String, String> parameters = table.getParameters();
  if (parameters == null) {
    return null;
  }
  String fileFormat = parameters.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, IOConstants.PARQUET);
  return StorageFormat.getDescriptor(fileFormat, TableProperties.DEFAULT_FILE_FORMAT);
}
/**
 * Derives the basic table statistics (file count, row count, total size) from the summary of the current
 * snapshot.
 * <p>
 * Nothing is reported unless the stats source is Iceberg. A table without a snapshot reports zeros. The row count
 * is reduced by the positional delete count only when the summary reports no equality deletes, and only when the
 * result stays positive.
 *
 * @param partish wrapper of the table the statistics are requested for
 * @return a mutable map with the statistics that could be determined, possibly empty
 */
@Override
public Map<String, String> getBasicStatistics(Partish partish) {
  org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
  // For write queries where rows got modified, don't fetch from cache as values could have changed.
  Table table = getTable(hmsTable);
  Map<String, String> stats = Maps.newHashMap();

  if (!getStatsSource().equals(ICEBERG)) {
    return stats;
  }

  if (table.currentSnapshot() == null) {
    stats.put(StatsSetupConst.NUM_FILES, "0");
    stats.put(StatsSetupConst.ROW_COUNT, "0");
    stats.put(StatsSetupConst.TOTAL_SIZE, "0");
    return stats;
  }

  Map<String, String> summary = table.currentSnapshot().summary();
  if (summary == null) {
    return stats;
  }

  if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
    stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
  }

  if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
    long rowCount = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
    boolean hasDeleteCounts = summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
        summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP);
    if (hasDeleteCounts) {
      long eqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
      long posDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
      long deletesToSubtract = eqDeletes > 0 ? 0 : posDeletes;
      long adjustedRowCount = rowCount - deletesToSubtract;
      // The adjusted count may be negative in edge cases; keep the raw record count then.
      if (adjustedRowCount > 0) {
        rowCount = adjustedRowCount;
      }
    }
    stats.put(StatsSetupConst.ROW_COUNT, Long.toString(rowCount));
  }

  if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
    stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
  }
  return stats;
}
/**
 * Loads the Iceberg table behind the given Hive table.
 * The three-argument loader (with {@code true}) is used when there is no query state or the query has modified
 * rows; otherwise the two-argument loader is used.
 * NOTE(review): the flag presumably bypasses a table cache - confirm against IcebergTableUtil.
 *
 * @param hmsTable Hive table object
 * @return the Iceberg table
 */
private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
  final Optional<QueryState> queryState = SessionStateUtil.getQueryState(conf);
  boolean useFlaggedLoad = !queryState.isPresent() || queryState.get().getNumModifiedRows() > 0;
  return useFlaggedLoad ?
      IcebergTableUtil.getTable(conf, hmsTable.getTTable(), true) :
      IcebergTableUtil.getTable(conf, hmsTable.getTTable());
}
/**
 * Column statistics can be stored only for a table that has a current snapshot, and only while the stats source
 * is Iceberg.
 *
 * @param hmsTable Hive table object
 * @return {@code true} if both conditions hold
 */
@Override
public boolean canSetColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
  Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
  if (table.currentSnapshot() == null) {
    return false;
  }
  return getStatsSource().equals(ICEBERG);
}
/**
 * Serializes the given column statistics into a single-blob Puffin file at the stats path of the table's current
 * snapshot. Any file already present at that path is removed first.
 * <p>
 * The table must have a current snapshot, see {@link #canSetColStatistics}.
 *
 * @param hmsTable Hive table object
 * @param colStats column statistics to persist; the list must be {@link Serializable}
 * @return {@code true} if the stats file was written and closed successfully, {@code false} if an
 *     {@link IOException} occurred while writing
 */
@Override
public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
    List<ColumnStatistics> colStats) {
  Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
  String snapshotId = String.format("%s-STATS-%d", tbl.name(), tbl.currentSnapshot().snapshotId());
  // Resolve the path once so that the invalidated file and the written file are guaranteed to be the same.
  Path statsPath = getStatsPath(tbl);
  invalidateStats(statsPath);
  byte[] serializeColStats = SerializationUtils.serialize((Serializable) colStats);
  try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(statsPath.toString()))
      .createdBy(Constants.HIVE_ENGINE).build()) {
    writer.add(
        new Blob(
            tbl.name() + "-" + snapshotId,
            ImmutableList.of(1),
            tbl.currentSnapshot().snapshotId(),
            tbl.currentSnapshot().sequenceNumber(),
            ByteBuffer.wrap(serializeColStats),
            PuffinCompressionCodec.NONE,
            ImmutableMap.of()));
    writer.finish();
    // Report success; a failure while closing the writer is still caught below and reported as false.
    return true;
  } catch (IOException e) {
    LOG.error("Failed to write Iceberg column stats for table {} to {}", tbl.name(), statsPath, e);
  }
  return false;
}
/**
 * Checks whether a column stats file exists for the current snapshot of the table.
 *
 * @param hmsTable Hive table object
 * @return {@code true} if column stats may be stored for the table and the stats file exists; {@code false}
 *     otherwise, including when the existence check fails with an {@link IOException}
 */
@Override
public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
  Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
  if (canSetColStatistics(hmsTable)) {
    Path statsPath = getStatsPath(table);
    try {
      // Do not close this FileSystem: Path.getFileSystem normally returns a cached instance shared with other
      // users of the same configuration, and closing it would break them.
      FileSystem fs = statsPath.getFileSystem(conf);
      return fs.exists(statsPath);
    } catch (IOException e) {
      LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
          "statsPath: {}", table.name(), table.currentSnapshot(), statsPath, e);
    }
  }
  return false;
}
/**
* Reads the column statistics stored in the Puffin file at the stats path of the table's current snapshot.
* Every blob payload is deserialized into a list of {@link ColumnStatistics}; only the first element of the list
* belonging to the first blob listed in the file metadata is used.
*
* @param hmsTable Hive table object
* @return the statistics objects of that first element, or {@code null} if reading the file fails with an
*     {@link IOException}
*/
@Override
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
String statsPath = getStatsPath(table).toString();
LOG.info("Using stats from puffin file at: {}", statsPath);
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
// Map each blob's metadata to its deserialized payload.
Map<BlobMetadata, List<ColumnStatistics>> collect =
Streams.stream(reader.readAll(blobMetadata)).collect(Collectors.toMap(Pair::first,
blobMetadataByteBufferPair -> SerializationUtils.deserialize(
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
return collect.get(blobMetadata.get(0)).get(0).getStatsObj();
} catch (IOException e) {
LOG.error("Error when trying to read iceberg col stats from puffin files: ", e);
}
// Reached only after an IOException.
return null;
}
/**
 * Tells whether the snapshot summary can be used to compute a query result.
 * This requires the stats source to be Iceberg, a current snapshot, and a summary that reports both delete
 * counters with a combined value of zero.
 *
 * @param hmsTable Hive table object
 * @return {@code true} only if all of the conditions above hold
 */
@Override
public boolean canComputeQueryUsingStats(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
  if (!getStatsSource().equals(ICEBERG)) {
    return false;
  }
  Table table = getTable(hmsTable);
  if (table.currentSnapshot() == null) {
    return false;
  }
  Map<String, String> summary = table.currentSnapshot().summary();
  boolean hasDeleteCounts = summary != null &&
      summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
      summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP);
  if (!hasDeleteCounts) {
    return false;
  }
  long eqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
  long posDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
  return eqDeletes + posDeletes == 0;
}
/**
 * Reads the configured stats source, defaulting to {@code "iceberg"}.
 *
 * @return the stats source in lower case
 */
private String getStatsSource() {
  // Locale.ROOT keeps the result independent of the JVM default locale; with a Turkish default locale
  // "ICEBERG".toLowerCase() would not equal "iceberg".
  return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE, ICEBERG)
      .toLowerCase(java.util.Locale.ROOT);
}
/**
 * Builds the location of the column stats file for the table's current snapshot:
 * table location + {@code /stats/} + table name + snapshot id.
 * The table must have a current snapshot.
 *
 * @param table Iceberg table
 * @return path of the stats file
 */
private Path getStatsPath(Table table) {
  String tableLocation = table.location();
  String tableName = table.name();
  long snapshotId = table.currentSnapshot().snapshotId();
  return new Path(tableLocation + STATS + tableName + snapshotId);
}
/**
 * Deletes the stats file at the given path if it exists. Failures are logged and not propagated.
 *
 * @param statsPath path of the stats file to remove
 */
private void invalidateStats(Path statsPath) {
  try {
    // Do not close this FileSystem: Path.getFileSystem normally returns a cached instance shared with other
    // users of the same configuration, and closing it would break them.
    FileSystem fs = statsPath.getFileSystem(conf);
    if (fs.exists(statsPath)) {
      fs.delete(statsPath, true);
    }
  } catch (IOException e) {
    LOG.error("Failed to invalidate stale column stats: {}", statsPath, e);
  }
}
/**
* No need for exclusive locks when writing, since Iceberg tables use optimistic concurrency when writing
* and only lock the table during the commit operation.
*
* @param writeEntity ignored
* @return always {@link LockType#SHARED_READ}
*/
@Override
public LockType getLockType(WriteEntity writeEntity) {
return LockType.SHARED_READ;
}
/**
* @return always {@code true}
*/
@Override
public boolean supportsPartitionTransform() {
return true;
}
/**
 * Describes the fields of the table's partition spec as Hive transform specs, in spec order.
 *
 * @param hmsTable Hive table object
 * @return one transform spec per partition field
 */
@Override
public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
  TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
  Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
  // The upper-cased transform name is matched against enum constant names, so the conversion must not depend on
  // the JVM default locale (e.g. "identity" upper-cases to a dotted capital I under a Turkish locale).
  return table.spec().fields().stream().map(f ->
      getTransformSpec(table, f.transform().toString().toUpperCase(java.util.Locale.ROOT), f.sourceId())
  ).collect(Collectors.toList());
}
/**
 * Describes the fields of the table's sort order as Hive transform specs, in sort order.
 *
 * @param table Iceberg table
 * @return one transform spec per sort field
 */
private List<TransformSpec> getSortTransformSpec(Table table) {
  // The upper-cased transform name is matched against enum constant names, so the conversion must not depend on
  // the JVM default locale (e.g. "identity" upper-cases to a dotted capital I under a Turkish locale).
  return table.sortOrder().fields().stream().map(s ->
      getTransformSpec(table, s.transform().toString().toUpperCase(java.util.Locale.ROOT), s.sourceId())
  ).collect(Collectors.toList());
}
/**
 * Converts an upper-cased transform name such as {@code IDENTITY} or {@code BUCKET[16]} into a transform spec for
 * the column with the given field id.
 *
 * @param table Iceberg table whose schema resolves the column name
 * @param transformName upper-cased transform name, optionally followed by a bracketed integer parameter
 * @param sourceId field id of the source column
 * @return the populated transform spec
 */
private TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
  TransformSpec result = new TransformSpec();
  result.setColumnName(table.schema().findColumnName(sourceId));

  int openBracket = transformName.indexOf("[");
  if (openBracket < 0) {
    // No bracket: a plain transform without a parameter.
    result.setTransformType(TransformSpec.TransformType.valueOf(transformName));
    result.setTransformParam(Optional.empty());
    return result;
  }

  // A '[' means the transform carries a config parameter between the brackets.
  String typeName = transformName.substring(0, openBracket);
  result.setTransformType(TransformSpec.TransformType.valueOf(typeName));
  int closeBracket = transformName.indexOf("]");
  String param = transformName.substring(openBracket + 1, closeBracket);
  result.setTransformParam(Optional.of(Integer.valueOf(param)));
  return result;
}
/**
* Builds the dynamic partition context for a write into the table.
* The context receives custom sort expressions for the partition fields (when the table is partitioned) followed
* by those for the sort order fields (when the table is sorted), plus the sort direction and null ordering of
* each sort field.
*
* @param hiveConf Hive configuration supplying the default partition name and the max partitions per node
* @param hmsTable Hive table object
* @param writeOperation the write operation being planned
* @return the context, or {@code null} for {@code DELETE} operations
* @throws SemanticException declared by the interface
*/
@Override
public DynamicPartitionCtx createDPContext(
HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation)
throws SemanticException {
// delete records are already clustered by partition spec id and the hash of the partition struct
// there is no need to do any additional sorting based on partition columns
if (writeOperation == Operation.DELETE) {
return null;
}
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
// The list is registered on the context first and filled in afterwards through the same reference.
List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
dpCtx.setCustomSortExpressions(customSortExprs);
if (table.spec().isPartitioned()) {
addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getPartitionTransformSpec(hmsTable));
}
SortOrder sortOrder = table.sortOrder();
if (sortOrder.isSorted()) {
// NOTE(review): customSortPositions is filled below but never handed to dpCtx in this method.
List<Integer> customSortPositions = Lists.newLinkedList();
List<Integer> customSortOrder = Lists.newLinkedList();
dpCtx.setCustomSortOrder(customSortOrder);
List<Integer> customSortNullOrder = Lists.newLinkedList();
dpCtx.setCustomSortNullOrder(customSortNullOrder);
for (SortField sortField : sortOrder.fields()) {
// Find the position of the sort field's source column among the top-level schema columns.
int pos = 0;
for (Types.NestedField field : table.schema().columns()) {
if (sortField.sourceId() == field.fieldId()) {
customSortPositions.add(pos);
// Encoding: 1 for ascending, 0 otherwise; 0 for nulls first, 1 otherwise.
customSortOrder.add(sortField.direction() == SortDirection.ASC ? 1 : 0);
customSortNullOrder.add(sortField.nullOrder() == NullOrder.NULLS_FIRST ? 0 : 1);
break;
}
pos++;
}
}
addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
}
return dpCtx;
}
/**
* Appends one custom sort expression per transform spec to the given list.
* A bucket transform is expressed through {@code BUCKET_SORT_EXPR}; every other transform uses a clone of the
* source column expression.
*
* @param table Iceberg table whose top-level schema columns define the column positions
* @param hmsTable Hive table object, used to determine the number of leading ACID select columns
* @param writeOperation the write operation being planned
* @param customSortExprs list the expressions are appended to
* @param transformSpecs transforms to create expressions for
*/
private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Operation writeOperation, List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
List<TransformSpec> transformSpecs) {
// Column name -> position among the top-level schema columns.
Map<String, Integer> fieldOrderMap = Maps.newHashMap();
List<Types.NestedField> fields = table.schema().columns();
for (int i = 0; i < fields.size(); ++i) {
fieldOrderMap.put(fields.get(i).name(), i);
}
// Column positions are shifted by the number of ACID select columns reported for this operation.
int offset = acidSelectColumns(hmsTable, writeOperation).size();
for (TransformSpec spec : transformSpecs) {
// Unboxing: a column name missing from the map results in a NullPointerException here.
int order = fieldOrderMap.get(spec.getColumnName());
if (TransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
customSortExprs.add(BUCKET_SORT_EXPR.apply(order + offset, spec.getTransformParam().get()));
} else {
customSortExprs.add(cols -> cols.get(order + offset).clone());
}
}
}
/**
* @return the table property key holding the default file format, {@code TableProperties.DEFAULT_FILE_FORMAT}
*/
@Override
public String getFileFormatPropertyKey() {
return TableProperties.DEFAULT_FILE_FORMAT;
}
/**
* @return always {@code true}
*/
@Override
public boolean commitInMoveTask() {
return true;
}
/**
* Commits the jobs that wrote to the table named in the commit properties.
* If the commit fails, the jobs are aborted and the original failure is rethrown wrapped in a
* {@link HiveException}.
*
* @param commitProperties properties carrying the table name, the optional location and the branch name
* @param overwrite passed through to generateJobContext
* @throws HiveException if committing the jobs fails
*/
@Override
public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
String tableName = commitProperties.getProperty(Catalogs.NAME);
String location = commitProperties.getProperty(Catalogs.LOCATION);
String branchName = commitProperties.getProperty(Catalogs.BRANCH_NAME);
// The session configuration is used here rather than the conf field of this handler.
Configuration configuration = SessionState.getSessionConf();
if (location != null) {
HiveTableUtil.cleanupTableObjectFile(location, configuration);
}
List<JobContext> jobContextList = generateJobContext(configuration, tableName, branchName, overwrite);
// Nothing to commit.
if (jobContextList.isEmpty()) {
return;
}
HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
try {
committer.commitJobs(jobContextList);
} catch (Throwable e) {
String ids = jobContextList
.stream().map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(", "));
// Aborting the job if the commit has failed
LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
ids, tableName, e);
try {
committer.abortJobs(jobContextList);
} catch (IOException ioe) {
LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
// no throwing here because the original exception should be propagated
}
throw new HiveException(
"Error committing job: " + ids + " for table: " + tableName, e);
}
}
/**
* @param opType alter table operation type
* @return {@code true} if {@code HiveIcebergMetaHook.SUPPORTED_ALTER_OPS} contains the operation type
*/
@Override
public boolean isAllowedAlterOperation(AlterTableType opType) {
return HiveIcebergMetaHook.SUPPORTED_ALTER_OPS.contains(opType);
}
/**
* @return always {@code true}
*/
@Override
public boolean supportsTruncateOnNonNativeTables() {
return true;
}
/**
* @return always {@code true}
*/
@Override
public boolean isTimeTravelAllowed() {
return true;
}
/**
* @return always {@code true}
*/
@Override
public boolean isTableMetaRefSupported() {
return true;
}
/**
* Runs an ALTER TABLE EXECUTE operation against the Iceberg table behind the given Hive table.
* Supported operation types are {@code ROLLBACK}, {@code EXPIRE_SNAPSHOT} and {@code SET_CURRENT_SNAPSHOT}.
*
* @param hmsTable Hive table object
* @param executeSpec operation type together with its parameters
* @throws UnsupportedOperationException for any other operation type
*/
@Override
public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable, AlterTableExecuteSpec executeSpec) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
switch (executeSpec.getOperationType()) {
case ROLLBACK:
// The current metadata file location is logged first so that the rollback can be reverted manually.
LOG.info("Executing rollback operation on iceberg table. If you would like to revert rollback you could " +
"try altering the metadata location to the current metadata location by executing the following query:" +
"ALTER TABLE {}.{} SET TBLPROPERTIES('metadata_location'='{}'). This operation is supported for Hive " +
"Catalog tables.", hmsTable.getDbName(), hmsTable.getTableName(),
((BaseTable) icebergTable).operations().current().metadataFileLocation());
AlterTableExecuteSpec.RollbackSpec rollbackSpec =
(AlterTableExecuteSpec.RollbackSpec) executeSpec.getOperationParams();
IcebergTableUtil.rollback(icebergTable, rollbackSpec.getRollbackType(), rollbackSpec.getParam());
break;
case EXPIRE_SNAPSHOT:
LOG.info("Executing expire snapshots operation on iceberg table {}.{}", hmsTable.getDbName(),
hmsTable.getTableName());
AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec =
(AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
// A positive thread count runs the deletes on a dedicated pool; otherwise no executor is supplied.
if (numThreads > 0) {
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", hmsTable.getCompleteName(),
numThreads);
final ExecutorService deleteExecutorService =
getDeleteExecutorService(hmsTable.getCompleteName(), numThreads);
try {
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis())
.executeDeleteWith(deleteExecutorService).commit();
} finally {
// Shut the pool down whether or not the commit succeeded.
deleteExecutorService.shutdown();
}
} else {
icebergTable.expireSnapshots().expireOlderThan(expireSnapshotsSpec.getTimestampMillis()).commit();
}
break;
case SET_CURRENT_SNAPSHOT:
AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
(AlterTableExecuteSpec.SetCurrentSnapshotSpec) executeSpec.getOperationParams();
LOG.debug("Executing set current snapshot operation on iceberg table {}.{} to version {}", hmsTable.getDbName(),
hmsTable.getTableName(), setSnapshotVersionSpec.getSnapshotId());
IcebergTableUtil.setCurrentSnapshot(icebergTable, setSnapshotVersionSpec.getSnapshotId());
break;
default:
throw new UnsupportedOperationException(
String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
}
}
/**
 * Creates a fixed-size thread pool whose threads are named
 * {@code remove-snapshot-<completeName>-<index>}, with the index counting up from zero.
 *
 * @param completeName table name embedded into the thread names
 * @param numThreads size of the pool
 * @return the executor service; the caller is responsible for shutting it down
 */
private static ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
  final AtomicInteger nextThreadIndex = new AtomicInteger(0);
  return Executors.newFixedThreadPool(numThreads, task -> {
    Thread worker = new Thread(task);
    String workerName = "remove-snapshot-" + completeName + "-" + nextThreadIndex.getAndIncrement();
    worker.setName(workerName);
    return worker;
  });
}
/**
 * Runs an ALTER TABLE branch operation against the Iceberg table behind the given Hive table.
 * Only {@code CREATE_BRANCH} is supported, and the table must already have a snapshot.
 *
 * @param hmsTable Hive table object
 * @param alterBranchSpec operation type together with its parameters
 * @throws UnsupportedOperationException if the table has no snapshot or the operation type is not supported
 */
@Override
public void alterTableBranchOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
    AlterTableBranchSpec alterBranchSpec) {
  TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
  Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());

  if (icebergTable.currentSnapshot() == null) {
    throw new UnsupportedOperationException(String.format("Cannot alter branch on iceberg table" +
        " %s.%s which has no snapshot", hmsTable.getDbName(), hmsTable.getTableName()));
  }

  switch (alterBranchSpec.getOperationType()) {
    case CREATE_BRANCH:
      AlterTableBranchSpec.CreateBranchSpec branchToCreate =
          (AlterTableBranchSpec.CreateBranchSpec) alterBranchSpec.getOperationParams();
      IcebergBranchExec.createBranch(icebergTable, branchToCreate);
      break;
    default:
      throw new UnsupportedOperationException(
          String.format("Operation type %s is not supported", alterBranchSpec.getOperationType().name()));
  }
}
/**
* @param metaTableName candidate metadata table name
* @return the result of {@code IcebergMetadataTables.isValidMetaTable} for the given name
*/
@Override
public boolean isValidMetadataTable(String metaTableName) {
return IcebergMetadataTables.isValidMetaTable(metaTableName);
}
/**
 * Validates a table meta reference and stores it on the Hive table object.
 * A reference that resolves to a branch name must point to an existing branch; any other reference must be a
 * valid metadata table name.
 *
 * @param hmsTable Hive table object to update
 * @param tableMetaRef branch reference or metadata table name
 * @return the same Hive table object with the branch name or metadata table set
 * @throws SemanticException if the branch does not exist or the metadata table name is invalid
 */
@Override
public org.apache.hadoop.hive.ql.metadata.Table checkAndSetTableMetaRef(
    org.apache.hadoop.hive.ql.metadata.Table hmsTable, String tableMetaRef) throws SemanticException {
  String branch = HiveUtils.getTableBranch(tableMetaRef);

  if (branch == null) {
    // Not a branch reference: it has to name a metadata table.
    if (!IcebergMetadataTables.isValidMetaTable(tableMetaRef)) {
      throw new SemanticException(ErrorMsg.INVALID_METADATA_TABLE_NAME, tableMetaRef);
    }
    hmsTable.setMetaTable(tableMetaRef);
    return hmsTable;
  }

  Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
  if (icebergTable.snapshot(branch) == null) {
    throw new SemanticException(String.format("Cannot use branch (does not exist): %s", branch));
  }
  hmsTable.setBranchName(tableMetaRef);
  return hmsTable;
}
/**
* Builds the authorization URI of the table in the form
* {@code iceberg://<db>/<table>?snapshot=<metadata file path>}, with each component URL-encoded.
* When the session state carries a table location property, a dummy metadata file path under that location is
* used; otherwise the path of the table's current metadata file is used.
*
* @param hmsTable metastore table object
* @return the authorization URI
* @throws URISyntaxException if the assembled string is not a valid URI
*/
@Override
public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws URISyntaxException {
String dbName = hmsTable.getDbName();
String tableName = hmsTable.getTableName();
StringBuilder authURI =
new StringBuilder(ICEBERG_URI_PREFIX).append(encodeString(dbName)).append("/").append(encodeString(tableName))
.append("?snapshot=");
Optional<String> locationProperty = SessionStateUtil.getProperty(conf, hive_metastoreConstants.META_TABLE_LOCATION);
if (locationProperty.isPresent()) {
// NOTE(review): a present Optional never holds null, so this check cannot fail as written.
Preconditions.checkArgument(locationProperty.get() != null,
"Table location is not set in SessionState. Authorization URI cannot be supplied.");
// this property is set during the create operation before the hive table was created
// we are returning a dummy iceberg metadata file
authURI.append(encodeString(URI.create(locationProperty.get()).getPath()))
.append(encodeString("/metadata/dummy.metadata.json"));
} else {
Table table = IcebergTableUtil.getTable(conf, hmsTable);
authURI.append(
encodeString(URI.create(((BaseTable) table).operations().current().metadataFileLocation()).getPath()));
}
LOG.debug("Iceberg storage handler authorization URI {}", authURI);
return new URI(authURI.toString());
}
/**
 * Encodes the input with {@code HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER}.
 *
 * @param rawString the string to encode, may be null
 * @return the encoded string, or null if the input was null
 */
@VisibleForTesting
static String encodeString(String rawString) {
  return rawString == null ? null : HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.encode(rawString);
}
/**
 * Validates the sink descriptor; on top of the default validation it rejects INSERT OVERWRITE statements
 * that can not be executed safely on the target Iceberg table.
 * <p>
 * An INSERT OVERWRITE is accepted without further checks if the current snapshot reports zero total records.
 * Otherwise it is rejected for bucket partitioned tables, and for tables where any manifest of the current
 * snapshot was written with a partition spec id lower than the id of the current spec.
 *
 * @param sinkDesc the sink descriptor to validate
 * @throws SemanticException if the insert overwrite can not be performed on the table
 */
@Override
public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
  HiveStorageHandler.super.validateSinkDesc(sinkDesc);
  if (!sinkDesc.getInsertOverwrite()) {
    return;
  }

  Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
  Snapshot currentSnapshot = table.currentSnapshot();

  if (currentSnapshot != null) {
    // The record count is not guaranteed to be present in the snapshot summary. When it is missing we can not
    // tell whether the table is empty, so we continue with the rest of the checks instead of failing on parsing.
    Map<String, String> summary = currentSnapshot.summary();
    String totalRecords = summary == null ? null : summary.get(SnapshotSummary.TOTAL_RECORDS_PROP);
    if (totalRecords != null && Long.parseLong(totalRecords) == 0) {
      // If the table is empty we don't have any danger that some data can get lost.
      return;
    }
  }

  if (IcebergTableUtil.isBucketed(table)) {
    throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
  }

  if (currentSnapshot != null) {
    int currentSpecId = table.spec().specId();
    boolean hasOlderSpec = currentSnapshot.allManifests(table.io()).parallelStream()
        .map(ManifestFile::partitionSpecId)
        .anyMatch(id -> id < currentSpecId);
    if (hasOlderSpec) {
      throw new SemanticException(
          "Cannot perform insert overwrite query on Iceberg table where partition evolution happened. In order " +
          "to successfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
          "conforming to the latest spec. ");
    }
  }
}
/**
 * Reports the level of ACID support for the table. Tables whose format-version parameter is "2" are reported
 * as {@code WITHOUT_TRANSACTIONS}, every other table as {@code NONE}.
 *
 * @param table the Hive table
 * @param isWriteOperation if true, the configured row level operation modes are validated as well
 * @return the ACID support type of the table
 */
@Override
public AcidSupportType supportsAcidOperations(org.apache.hadoop.hive.ql.metadata.Table table,
    boolean isWriteOperation) {
  Map<String, String> params = table.getParameters();
  boolean isV2Table = params != null && "2".equals(params.get(TableProperties.FORMAT_VERSION));
  if (!isV2Table) {
    return AcidSupportType.NONE;
  }

  if (isWriteOperation) {
    checkDMLOperationMode(table);
  }
  return AcidSupportType.WITHOUT_TRANSACTIONS;
}
// TODO: remove the checks as copy-on-write mode implementation for these DML ops get added
/**
 * Verifies that neither the merge nor the update mode of the table resolves to copy-on-write. When a mode is
 * not set in the table parameters, the corresponding default from {@code TableProperties} is used.
 *
 * @param table the Hive table whose parameters are checked
 * @throws UnsupportedOperationException if any of the checked modes is copy-on-write
 */
private static void checkDMLOperationMode(org.apache.hadoop.hive.ql.metadata.Table table) {
  Map<String, String> defaultModes = ImmutableMap.of(
      TableProperties.MERGE_MODE, TableProperties.MERGE_MODE_DEFAULT,
      TableProperties.UPDATE_MODE, TableProperties.UPDATE_MODE_DEFAULT);

  for (Map.Entry<String, String> defaultMode : defaultModes.entrySet()) {
    String property = defaultMode.getKey();
    String modeName = table.getParameters().get(property);
    if (modeName == null) {
      modeName = defaultMode.getValue();
    }

    if (RowLevelOperationMode.fromName(modeName) == RowLevelOperationMode.COPY_ON_WRITE) {
      throw new UnsupportedOperationException(
          String.format("Hive doesn't support copy-on-write mode as %s. Please set '%s'='merge-on-read' on %s " +
              "before running ACID operations on it.", property, property, table.getTableName()));
    }
  }
}
/**
 * @return the virtual columns held by {@code ACID_VIRTUAL_COLS}
 */
@Override
public List<VirtualColumn> acidVirtualColumns() {
  List<VirtualColumn> virtualColumns = ACID_VIRTUAL_COLS;
  return virtualColumns;
}
/**
 * Returns the columns that have to be selected for the given ACID operation.
 *
 * @param table the Hive table
 * @param operation the ACID operation
 * @return the ACID virtual columns followed by the table columns for DELETE and UPDATE, only the ACID virtual
 *     columns for MERGE, an empty list for every other operation
 */
@Override
public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
  List<FieldSchema> selectColumns;
  switch (operation) {
    case DELETE:
    case UPDATE:
      // TODO: make it configurable whether we want to include the table columns in the select query.
      // It might make delete writes faster if we don't have to write out the row object
      selectColumns = ListUtils.union(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols());
      break;
    case MERGE:
      selectColumns = ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
      break;
    default:
      selectColumns = ImmutableList.of();
      break;
  }
  return selectColumns;
}
/**
 * Returns the columns used for sorting the rows of the given ACID operation.
 *
 * @param table the Hive table
 * @param operation the ACID operation
 * @return the ACID virtual columns for DELETE, an empty list otherwise
 */
@Override
public List<FieldSchema> acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
  List<FieldSchema> sortColumns;
  switch (operation) {
    case DELETE:
      sortColumns = ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
      break;
    default:
      // For update operations we use the same sort order defined by
      // {@link #createDPContext(HiveConf, org.apache.hadoop.hive.ql.metadata.Table)}
      sortColumns = ImmutableList.of();
      break;
  }
  return sortColumns;
}
/**
 * @return always true
 */
@Override
public boolean supportsSortColumns() {
  final boolean supported = true;
  return supported;
}
/**
 * Lists the sort order fields of the Iceberg table as Hive field schemas. The name and the type of each entry
 * come from the source column of the sort field, while the comment describes the transform, the sort direction
 * and the null ordering.
 *
 * @param hmsTable the Hive table
 * @return one field schema per sort field, or an empty list if the table is unsorted
 */
@Override
public List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
  Properties tableProps = Utilities.getTableDesc(hmsTable).getProperties();
  Table table = IcebergTableUtil.getTable(conf, tableProps);
  if (table.sortOrder().isUnsorted()) {
    return Collections.emptyList();
  }

  Schema schema = table.schema();
  return table.sortOrder().fields().stream()
      .map(sortField -> {
        String columnName = schema.findColumnName(sortField.sourceId());
        String columnType = schema.findType(sortField.sourceId()).toString();
        String comment = String.format("Transform: %s, Sort direction: %s, Null sort order: %s",
            sortField.transform().toString(), sortField.direction().name(), sortField.nullOrder().name());
        return new FieldSchema(columnName, columnType, comment);
      })
      .collect(Collectors.toList());
}
/**
 * Sets the job level configuration shared by the callers of this method: the value of
 * {@code tez.mrreader.config.update.properties} is set to the read column names and ids properties.
 *
 * @param jobConf the job configuration to modify
 */
private void setCommonJobConf(JobConf jobConf) {
  String updateProperties = "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids";
  jobConf.set("tez.mrreader.config.update.properties", updateProperties);
}
/**
 * Decides whether dynamic split pruning is possible for the table with the given synthetic predicate.
 * Pruning is reported as possible when the table has partition columns, the predicate filters on one of them,
 * and the predicate (with the dynamic values replaced by dummies) can be converted to an Iceberg filter.
 *
 * @param table the Hive table
 * @param syntheticFilterPredicate the predicate which will receive the dynamic values during execution
 * @return true if the predicate can be used for pruning, false otherwise
 */
public boolean addDynamicSplitPruningEdge(org.apache.hadoop.hive.ql.metadata.Table table,
    ExprNodeDesc syntheticFilterPredicate) {
  try {
    HiveIcebergSerDe serDe = (HiveIcebergSerDe) table.getDeserializer();
    Collection<String> partitionColumns = serDe.partitionColumns();
    if (!partitionColumns.isEmpty()) {
      // The filter predicate contains ExprNodeDynamicListDesc object(s) for places where we will substitute
      // dynamic values later during execution. For Example:
      // GenericUDFIn(Column[ss_sold_date_sk], RS[5] <-- This is an ExprNodeDynamicListDesc)
      //
      // We would like to check if we will be able to convert these expressions to Iceberg filters when the
      // actual values will be available, so in this check we replace the ExprNodeDynamicListDesc with dummy
      // values and check whether the conversion will be possible or not.
      ExprNodeDesc predicateCopy = syntheticFilterPredicate.clone();
      String filterColumn = collectColumnAndReplaceDummyValues(predicateCopy, null);

      // If the filter is for a partition column then it could be worthwhile to try dynamic partition pruning
      if (partitionColumns.contains(filterColumn)) {
        // Check if we can convert the expression to a valid Iceberg filter
        SearchArgument sarg = ConvertAstToSearchArg.create(conf, (ExprNodeGenericFuncDesc) predicateCopy);
        HiveIcebergFilterFactory.generateFilterExpression(sarg);
        LOG.debug("Found Iceberg partition column to prune with predicate {}", syntheticFilterPredicate);
        return true;
      }
    }
  } catch (UnsupportedOperationException uoe) {
    // If we can not convert the filter, we do not prune
    LOG.debug("Unsupported predicate {}", syntheticFilterPredicate, uoe);
  }

  // There is nothing to prune, or we could not use the filter
  LOG.debug("Not found Iceberg partition columns to prune with predicate {}", syntheticFilterPredicate);
  return false;
}
/**
 * Returns the Table serialized to the configuration based on the table name.
 * If nothing could be deserialized, the CTAS flag is set and a table location is present in the configuration,
 * the table object is read through {@code HiveTableUtil#readTableObjectFromFile} instead.
 * If configuration is missing from the FileIO of the table, it will be populated with the input config.
 *
 * @param config The configuration used to get the data from
 * @param name The name of the table we need as returned by TableDesc.getTableName()
 * @return The Table, may be null if it could not be obtained from either source
 */
public static Table table(Configuration config, String name) {
  String serializedTable = config.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + name);
  Table table = SerializationUtil.deserializeFromBase64(serializedTable);

  if (table == null) {
    if (config.getBoolean(hive_metastoreConstants.TABLE_IS_CTAS, false)) {
      if (StringUtils.isNotBlank(config.get(InputFormatConfig.TABLE_LOCATION))) {
        table = HiveTableUtil.readTableObjectFromFile(config);
      }
    }
  }

  checkAndSetIoConfig(config, table);
  return table;
}
/**
 * If enabled, it populates the FileIO's hadoop configuration with the input config object.
 * This might be necessary when the table object was serialized without the FileIO config.
 * Nothing happens for a null table, or when the FileIO of the table is not {@code HadoopConfigurable}.
 *
 * @param config Configuration to set for FileIO, if enabled
 * @param table The Iceberg table object
 */
public static void checkAndSetIoConfig(Configuration config, Table table) {
  if (table == null) {
    return;
  }

  boolean configSerializationDisabled = config.getBoolean(InputFormatConfig.CONFIG_SERIALIZATION_DISABLED,
      InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT);
  if (configSerializationDisabled && table.io() instanceof HadoopConfigurable) {
    ((HadoopConfigurable) table.io()).setConf(config);
  }
}
/**
 * If enabled, it ensures that the FileIO's hadoop configuration will not be serialized.
 * This might be desirable for decreasing the overall size of serialized table objects.
 * Nothing happens for a null table, or when the FileIO of the table is not {@code HadoopConfigurable}.
 *
 * Note: Skipping FileIO config serialization in this fashion might in turn necessitate calling
 * {@link #checkAndSetIoConfig(Configuration, Table)} on the deserializer-side to enable subsequent use of the FileIO.
 *
 * @param config Configuration to set for FileIO in a transient manner, if enabled
 * @param table The Iceberg table object
 */
public static void checkAndSkipIoConfigSerialization(Configuration config, Table table) {
  if (table == null) {
    return;
  }

  boolean configSerializationDisabled = config.getBoolean(InputFormatConfig.CONFIG_SERIALIZATION_DISABLED,
      InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT);
  if (configSerializationDisabled && table.io() instanceof HadoopConfigurable) {
    // The configuration handed over by the FileIO is ignored on purpose, the input config is wrapped instead
    ((HadoopConfigurable) table.io()).serializeConfWith(ignored -> {
      NonSerializingConfig holder = new NonSerializingConfig(config);
      return holder::get;
    });
  }
}
/**
 * Returns the names of the output tables stored in the configuration.
 * The stored value is split with {@code TABLE_NAME_SPLITTER}.
 *
 * @param config The configuration used to get the data from
 * @return The collection of the table names as returned by TableDesc.getTableName()
 */
public static Set<String> outputTables(Configuration config) {
  String joinedTableNames = config.get(InputFormatConfig.OUTPUT_TABLES);
  return Sets.newHashSet(TABLE_NAME_SPLITTER.split(joinedTableNames));
}
/**
 * Returns the catalog name serialized to the configuration.
 *
 * @param config The configuration used to get the data from
 * @param name The name of the table we need as returned by TableDesc.getTableName()
 * @return the catalog name stored for the table
 */
public static String catalogName(Configuration config, String name) {
  String catalogKey = InputFormatConfig.TABLE_CATALOG_PREFIX + name;
  return config.get(catalogKey);
}
/**
 * Returns the Table Schema serialized to the configuration.
 * The schema is parsed from the JSON stored under {@code InputFormatConfig.TABLE_SCHEMA}.
 *
 * @param config The configuration used to get the data from
 * @return The Table Schema object
 */
public static Schema schema(Configuration config) {
  String schemaJson = config.get(InputFormatConfig.TABLE_SCHEMA);
  return SchemaParser.fromJson(schemaJson);
}
/**
 * Stores the serializable table data in the configuration.
 * Currently the following is handled:
 * <ul>
 * <li>- Table - in case the table is serializable</li>
 * <li>- Location</li>
 * <li>- Schema</li>
 * <li>- Partition specification</li>
 * <li>- FileIO for handling table files</li>
 * <li>- Location provider used for file generation</li>
 * <li>- Encryption manager for encryption handling</li>
 * </ul>
 * When the table does not exist yet and the statement is a CTAS on a Hive catalog, the schema and the partition
 * spec are derived from the SerDe of the table descriptor and a serialized null is stored in place of the table.
 * <p>
 * Besides the map, the properties of the table descriptor are modified too: the schema and the partition spec
 * are added to them.
 *
 * @param configuration The configuration storing the catalog information
 * @param tableDesc The table which we want to store to the configuration
 * @param map The map of the configuration properties which we append with the serialized data
 */
@VisibleForTesting
static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
  Properties props = tableDesc.getProperties();

  // map overrides tableDesc properties, so only the keys missing from the map are copied over
  for (Map.Entry<String, String> tableProp : Maps.fromProperties(props).entrySet()) {
    if (!map.containsKey(tableProp.getKey())) {
      map.put(tableProp.getKey(), tableProp.getValue());
    }
  }

  String location;
  Schema schema;
  PartitionSpec spec;
  try {
    Table table = IcebergTableUtil.getTable(configuration, props);
    location = table.location();
    schema = table.schema();
    spec = table.spec();

    // serialize table object into config
    Table serializableTable = SerializableTable.copyOf(table);
    checkAndSkipIoConfigSerialization(configuration, serializableTable);
    String serializedTable = SerializationUtil.serializeToBase64(serializableTable);
    map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(), serializedTable);
  } catch (NoSuchTableException ex) {
    // A missing table is only acceptable for CTAS, and only when a Hive catalog is used
    if (!HiveTableUtil.isCtas(props)) {
      throw ex;
    }
    if (!Catalogs.hiveCatalog(configuration, props)) {
      throw new UnsupportedOperationException(HiveIcebergSerDe.CTAS_EXCEPTION_MSG);
    }

    location = map.get(hive_metastoreConstants.META_TABLE_LOCATION);
    map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(),
        SerializationUtil.serializeToBase64(null));

    try {
      HiveIcebergSerDe icebergSerDe = (HiveIcebergSerDe) tableDesc.getDeserializer(configuration);
      schema = icebergSerDe.getTableSchema();
      spec = IcebergTableUtil.spec(configuration, icebergSerDe.getTableSchema());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(Catalogs.NAME));
  if (StringUtils.isNotBlank(location)) {
    map.put(InputFormatConfig.TABLE_LOCATION, location);
  }

  String schemaJson = SchemaParser.toJson(schema);
  map.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
  // save schema into table props as well to avoid repeatedly hitting the HMS during serde initializations
  // this is an exception to the interface documentation, but it's a safe operation to add this property
  props.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);

  PartitionSpec effectiveSpec = spec != null ? spec : PartitionSpec.unpartitioned();
  props.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(effectiveSpec));

  // We need to remove this otherwise the job.xml will be invalid as column comments are separated with '\0' and
  // the serialization utils fail to serialize this character
  map.remove("columns.comments");
}
/**
 * Recursively replaces the ExprNodeDynamicListDesc nodes by a dummy ExprNodeConstantDesc so we can test if we can
 * convert the predicate to an Iceberg predicate when pruning the partitions later. Also collects the column name
 * used in the filter.
 * <p>
 * The children of the input node are modified in place, so please make sure that it is ok to change the input
 * node (clone if needed).
 *
 * @param node The node we are traversing
 * @param foundColumn The column we already found, or null if none was found yet
 * @return The column referenced by the filter, or null if no column was found
 * @throws UnsupportedOperationException if a dynamic value has an unsupported primitive type, or the filter
 *     references more than one column
 */
private String collectColumnAndReplaceDummyValues(ExprNodeDesc node, String foundColumn) {
  List<ExprNodeDesc> childNodes = node.getChildren();
  if (childNodes == null || childNodes.isEmpty()) {
    return foundColumn;
  }

  String knownColumn = foundColumn;
  for (ListIterator<ExprNodeDesc> it = childNodes.listIterator(); it.hasNext();) {
    ExprNodeDesc current = it.next();

    if (current instanceof ExprNodeDynamicListDesc) {
      // Pick a constant matching the type of the dynamic value, then swap it in for the dynamic list
      Object placeholder;
      switch (((PrimitiveTypeInfo) current.getTypeInfo()).getPrimitiveCategory()) {
        case INT:
        case SHORT:
          placeholder = 1;
          break;
        case LONG:
          placeholder = 1L;
          break;
        case TIMESTAMP:
        case TIMESTAMPLOCALTZ:
          placeholder = new Timestamp();
          break;
        case CHAR:
        case VARCHAR:
        case STRING:
          placeholder = "1";
          break;
        case DOUBLE:
        case FLOAT:
        case DECIMAL:
          placeholder = 1.1;
          break;
        case DATE:
          placeholder = new Date();
          break;
        case BOOLEAN:
          placeholder = true;
          break;
        default:
          throw new UnsupportedOperationException("Not supported primitive type in partition pruning: " +
              current.getTypeInfo());
      }
      it.set(new ExprNodeConstantDesc(current.getTypeInfo(), placeholder));
      continue;
    }

    // A column node yields its own name, anything else is searched recursively
    String candidate = current instanceof ExprNodeColumnDesc ?
        ((ExprNodeColumnDesc) current).getColumn() :
        collectColumnAndReplaceDummyValues(current, knownColumn);

    if (knownColumn == null) {
      knownColumn = candidate;
    } else if (candidate != null && !candidate.equals(knownColumn)) {
      throw new UnsupportedOperationException("Partition pruning does not support filtering for more columns");
    }
  }
  return knownColumn;
}
/**
 * If any of the following checks is true we fall back to non vectorized mode by disabling vectorization in the
 * configuration of this handler:
 * <ul>
 * <li>fileformat is set to avro</li>
 * <li>querying metadata tables</li>
 * <li>fileformat is set to ORC, and table schema has time type column</li>
 * <li>fileformat is set to PARQUET, and table schema has a list or map type column with a non primitive
 * nested type</li>
 * </ul>
 * @param tableProps table properties, must be not null
 */
private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) {
  Schema tableSchema = SchemaParser.fromJson(tableProps.getProperty(InputFormatConfig.TABLE_SCHEMA));

  boolean isAvro =
      FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
  // hasParquetNestedTypeWithinListOrMap returns false when an unsupported nested type is present
  boolean fallback = isAvro ||
      (tableProps.containsKey("metaTable") && isValidMetadataTable(tableProps.getProperty("metaTable"))) ||
      hasOrcTimeInSchema(tableProps, tableSchema) ||
      !hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema);

  if (fallback) {
    conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, false);
  }
}
/**
 * Iceberg Time type columns are written as longs into ORC files. There is no Time type in Hive, so it is represented
 * as String instead. For ORC there's no automatic conversion from long to string during vectorized reading such as
 * for example in Parquet (in Parquet files Time type is an int64 with 'time' logical annotation).
 * Only the top level columns of the schema are inspected.
 *
 * @param tableProps iceberg table properties
 * @param tableSchema iceberg table schema
 * @return true if the default file format is ORC and at least one top level column has Time type
 */
private static boolean hasOrcTimeInSchema(Properties tableProps, Schema tableSchema) {
  String fileFormat = tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT);
  if (FileFormat.ORC.name().equalsIgnoreCase(fileFormat)) {
    for (Types.NestedField column : tableSchema.columns()) {
      if (column.type().typeId() == Types.TimeType.get().typeId()) {
        return true;
      }
    }
  }
  return false;
}
/**
 * Vectorized reads of parquet files from columns with list or map type is only supported if the nested types are of
 * primitive type category
 * check {@link VectorizedParquetRecordReader#checkListColumnSupport} for details on nested types under lists.
 * <p>
 * Note that despite its name the method returns true when there is nothing preventing the vectorized read.
 *
 * @param tableProps iceberg table properties
 * @param tableSchema iceberg table schema
 * @return true if the default file format is not PARQUET, or every nested field of the top level list and map
 *     columns is primitive; false if a non primitive nested field is found
 */
private static boolean hasParquetNestedTypeWithinListOrMap(Properties tableProps, Schema tableSchema) {
  String fileFormat = tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT);
  if (!FileFormat.PARQUET.name().equalsIgnoreCase(fileFormat)) {
    return true;
  }

  return tableSchema.columns().stream()
      .map(Types.NestedField::type)
      .filter(type -> type.isListType() || type.isMapType())
      .flatMap(type -> type.asNestedType().fields().stream())
      .allMatch(nestedField -> nestedField.type().isPrimitiveType());
}
/**
 * Generates {@link JobContext}s for the OutputCommitter for the specific table, one for every commit info
 * found for the table. All of the generated contexts are backed by the same JobConf instance.
 *
 * @param configuration The configuration used for as a base of the JobConf
 * @param tableName The name of the table we are planning to commit
 * @param branchName The name of the output branch, if not null it is stored in the JobConf
 * @param overwrite If we have to overwrite the existing table or just add the new data
 * @return The list of the generated JobContexts, or an empty list if no commit info is present
 */
private List<JobContext> generateJobContext(Configuration configuration, String tableName,
    String branchName, boolean overwrite) {
  JobConf jobConf = new JobConf(configuration);
  Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
      SessionStateUtil.getCommitInfo(jobConf, tableName);

  if (!commitInfoMap.isPresent()) {
    // most likely empty write scenario
    LOG.debug("Unable to find commit information in query state for table: {}", tableName);
    return Collections.emptyList();
  }

  List<JobContext> contexts = Lists.newLinkedList();
  for (SessionStateUtil.CommitInfo info : commitInfoMap.get().values()) {
    JobID jobID = JobID.forName(info.getJobIdStr());
    info.getProps().forEach(jobConf::set);
    jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
    // we should only commit this current table because
    // for multi-table inserts, this hook method will be called sequentially for each target table
    jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
    if (branchName != null) {
      jobConf.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, branchName);
    }
    contexts.add(new JobContextImpl(jobConf, jobID, null));
  }
  return contexts;
}
/**
 * Reads the operation type from the session state, where it is stored under the simple name of the
 * {@code Operation} class.
 *
 * @return the stored operation type, or the name of {@code Operation.OTHER} if nothing is stored
 */
private String getOperationType() {
  Optional<String> operationType = SessionStateUtil.getProperty(conf, Operation.class.getSimpleName());
  return operationType.orElse(Operation.OTHER.name());
}
/**
 * Serializable holder of a Configuration which leaves the configuration itself out of the serialized form,
 * as the wrapped field is transient.
 */
private static class NonSerializingConfig implements Serializable {
  private final transient Configuration conf;

  NonSerializingConfig(Configuration conf) {
    this.conf = conf;
  }

  /**
   * @return the wrapped configuration
   * @throws IllegalStateException if there is no configuration held by this object
   */
  public Configuration get() {
    if (conf != null) {
      return conf;
    }
    throw new IllegalStateException("Configuration was not serialized on purpose but was not set manually either");
  }
}
/**
 * @return always true
 */
@Override
public boolean areSnapshotsSupported() {
  final boolean supported = true;
  return supported;
}
/**
 * Creates a snapshot context from the id of the current snapshot of the Iceberg table.
 *
 * @param hmsTable the Hive table
 * @return the context of the current snapshot, or null if the table has no current snapshot
 */
@Override
public SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
  Properties tableProps = Utilities.getTableDesc(hmsTable).getProperties();
  Snapshot current = IcebergTableUtil.getTable(conf, tableProps).currentSnapshot();
  return current == null ? null : new SnapshotContext(current.snapshotId());
}
/**
 * Flags the environment context when a set properties alter table statement contains the Iceberg metadata
 * location property.
 *
 * @param alterTableDesc the descriptor of the alter table statement
 * @param environmentContext the environment context receiving the flag
 */
@Override
public void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTableDesc,
    EnvironmentContext environmentContext) {
  if (!(alterTableDesc instanceof AlterTableSetPropertiesDesc)) {
    return;
  }

  if (alterTableDesc.getProps().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) {
    // signal manual iceberg metadata location updated by user
    environmentContext.putToProperties(HiveIcebergMetaHook.MANUAL_ICEBERG_METADATA_LOCATION_CHANGE, "true");
  }
}
/**
 * Sets up the parameters of a table created by a CREATE TABLE LIKE statement.
 *
 * @param tbl the table being created
 * @param desc the descriptor of the CREATE TABLE LIKE statement, it is switched to external if it was not
 * @param origParams the parameters of the source table
 */
@Override
public void setTableParametersForCTLT(org.apache.hadoop.hive.ql.metadata.Table tbl, CreateTableLikeDesc desc,
    Map<String, String> origParams) {
  // Preserve the format-version of the iceberg table and filter out rest.
  String formatVersion = origParams.get(TableProperties.FORMAT_VERSION);
  Map<String, String> targetParams = tbl.getParameters();
  if ("2".equals(formatVersion)) {
    targetParams.put(TableProperties.FORMAT_VERSION, formatVersion);
    targetParams.put(TableProperties.DELETE_MODE, MERGE_ON_READ);
    targetParams.put(TableProperties.UPDATE_MODE, MERGE_ON_READ);
    targetParams.put(TableProperties.MERGE_MODE, MERGE_ON_READ);
  }

  // check if the table is being created as managed table, in that case we translate it to external
  if (!desc.isExternal()) {
    targetParams.put("TRANSLATED_TO_EXTERNAL", "TRUE");
    desc.setIsExternal(true);
  }

  boolean sourceIsIceberg = "ICEBERG".equalsIgnoreCase(origParams.get("table_type"));
  if (!sourceIsIceberg) {
    // if the source is partitioned non-iceberg table set the partition transform spec and set the table as
    // unpartitioned
    List<TransformSpec> spec = PartitionTransform.getPartitionTransformSpec(tbl.getPartitionKeys());
    SessionStateUtil.addResourceOrThrow(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, spec);
    tbl.getSd().getCols().addAll(tbl.getPartitionKeys());
    tbl.getTTable().setPartitionKeysIsSet(false);
    return;
  }

  // If source is Iceberg table set the schema and the partition spec
  targetParams.put(InputFormatConfig.TABLE_SCHEMA, origParams.get(InputFormatConfig.TABLE_SCHEMA));
  targetParams.put(InputFormatConfig.PARTITION_SPEC, origParams.get(InputFormatConfig.PARTITION_SPEC));
}
/**
 * Collects the schema and the partition spec of the Iceberg table in JSON format.
 *
 * @param table the Hive table
 * @return a new map with the schema and the partition spec entries
 */
@Override
public Map<String, String> getNativeProperties(org.apache.hadoop.hive.ql.metadata.Table table) {
  Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
  String schemaJson = SchemaParser.toJson(icebergTable.schema());
  String specJson = PartitionSpecParser.toJson(icebergTable.spec());

  Map<String, String> nativeProps = Maps.newHashMap();
  nativeProps.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
  nativeProps.put(InputFormatConfig.PARTITION_SPEC, specJson);
  return nativeProps;
}
/**
 * Tells whether the operation has to be executed as an overwrite. This is the case only for DELETE operations
 * on format-version "2" tables whose delete mode (or its default, if not set) matches {@code COPY_ON_WRITE}.
 *
 * @param mTable the Hive table
 * @param operationName the name of the operation
 * @return true if the operation should overwrite the table data
 */
@Override
public boolean shouldOverwrite(org.apache.hadoop.hive.ql.metadata.Table mTable, String operationName) {
  Map<String, String> params = mTable.getTTable().getParameters();
  boolean isV2Table = "2".equals(params.get(TableProperties.FORMAT_VERSION));

  // As of now only delete mode is supported, for all others return false
  if (!isV2Table || !operationName.equalsIgnoreCase(Context.Operation.DELETE.toString())) {
    return false;
  }

  String deleteMode = params.getOrDefault(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT);
  return COPY_ON_WRITE.equalsIgnoreCase(deleteMode);
}
/**
 * Checks the snapshots of the Iceberg table following the given snapshot context, see
 * {@link #hasAppendsOnly(Iterable, SnapshotContext)} for the evaluation.
 *
 * @param hmsTable the Hive table
 * @param since the snapshot context after which the snapshots are examined
 * @return the result of the evaluation on the snapshots of the table
 */
@Override
public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
  Properties tableProps = Utilities.getTableDesc(hmsTable).getProperties();
  Table icebergTable = IcebergTableUtil.getTable(conf, tableProps);
  return hasAppendsOnly(icebergTable.snapshots(), since);
}
/**
 * Walks the snapshots in iteration order and checks the operation of every snapshot coming after the one
 * identified by {@code since}. When {@code since} is null, every snapshot is checked.
 *
 * @param snapshots the snapshots to examine
 * @param since the context of the snapshot after which the check starts, may be null
 * @return true if all of the checked snapshots are appends, false if any of them is not, and null if the
 *     snapshot identified by {@code since} is not among the snapshots
 */
@VisibleForTesting
Boolean hasAppendsOnly(Iterable<Snapshot> snapshots, SnapshotContext since) {
  boolean sinceReached = since == null;
  for (Snapshot snapshot : snapshots) {
    if (sinceReached) {
      if (!"append".equals(snapshot.operation())) {
        return false;
      }
    } else if (snapshot.snapshotId() == since.getSnapshotId()) {
      sinceReached = true;
    }
  }
  return sinceReached ? Boolean.TRUE : null;
}
}