/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.SnapshotContext;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
import org.apache.hadoop.hive.ql.ddl.table.create.like.CreateTableLikeDesc;
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec;
import org.apache.hadoop.hive.ql.parse.PartitionTransform;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.StorageFormat;
import org.apache.hadoop.hive.ql.parse.StorageFormat.StorageHandlerTypes;
import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.stats.Partish;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.DefaultFetchFormatter;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.FetchFormatter;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinCompressionCodec;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.base.Throwables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SerializationUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandler.class);
private static final String ICEBERG_URI_PREFIX = "iceberg://";
private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
private static final String TABLE_NAME_SEPARATOR = "..";
// Column indexes for the partitions metadata table
private static final int SPEC_IDX = 1;
private static final int PART_IDX = 0;
public static final String COPY_ON_WRITE = "copy-on-write";
public static final String MERGE_ON_READ = "merge-on-read";
public static final String STATS = "/stats/snap-";
public static final String TABLE_DEFAULT_LOCATION = "TABLE_DEFAULT_LOCATION";
private static final List<VirtualColumn> ACID_VIRTUAL_COLS = ImmutableList.of(VirtualColumn.PARTITION_SPEC_ID,
VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION);
private static final List<FieldSchema> ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA = ACID_VIRTUAL_COLS.stream()
.map(v -> new FieldSchema(v.getName(), v.getTypeInfo().getTypeName(), ""))
.collect(Collectors.toList());
private Configuration conf;
@Override
public Class<? extends InputFormat> getInputFormatClass() {
return HiveIcebergInputFormat.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
return HiveIcebergOutputFormat.class;
}
@Override
public Class<? extends AbstractSerDe> getSerDeClass() {
return HiveIcebergSerDe.class;
}
@Override
public HiveMetaHook getMetaHook() {
// Make sure to always return a new instance here, as HiveIcebergMetaHook might hold state relevant for the
// operation.
return new HiveIcebergMetaHook(conf);
}
@Override
public HiveAuthorizationProvider getAuthorizationProvider() {
return null;
}
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
overlayTableProperties(conf, tableDesc, map);
// Until the vectorized reader can handle delete files, let's fall back to non-vector mode for V2 tables
fallbackToNonVectorizedModeBasedOnProperties(tableDesc.getProperties());
boolean allowDataFilesWithinTableLocationOnly =
conf.getBoolean(HiveConf.ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.varname,
HiveConf.ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.defaultBoolVal);
map.put(HiveConf.ConfVars.HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY.varname,
String.valueOf(allowDataFilesWithinTableLocationOnly));
}
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
overlayTableProperties(conf, tableDesc, map);
// Until the vectorized reader can handle delete files, let's fall back to non-vector mode for V2 tables
fallbackToNonVectorizedModeBasedOnProperties(tableDesc.getProperties());
// For Tez, setting the committer here is enough to make sure it'll be part of the jobConf
map.put("mapred.output.committer.class", HiveIcebergNoJobCommitter.class.getName());
// For MR, the jobConf is set only in configureJobConf, so we set the operation type key here and detect it there
String opType = getOperationType();
map.put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
// Putting the key into the table props as well, so that projection pushdown can be determined on a
// table-level and skipped only for output tables in HiveIcebergSerde. Properties from the map will be present in
// the serde config for all tables in the query, not just the output tables, so we can't rely on that in the serde.
tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
}
/**
* Committer with no-op job commit. We can pass this into the Tez AM to take care of task commits/aborts, as well
* as aborting jobs reliably if an execution error occurred. However, we want to execute job commits on the
* HS2-side during the MoveTask, so we will use the full-featured HiveIcebergOutputCommitter there.
*/
static class HiveIcebergNoJobCommitter extends HiveIcebergOutputCommitter {
@Override
public void commitJob(JobContext originalContext) {
// do nothing
}
}
@Override
public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {
}
// Override annotation commented out, since this interface method has been introduced only in Hive 3
// @Override
public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> secrets) {
}
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
setCommonJobConf(jobConf);
if (tableDesc != null && tableDesc.getProperties() != null &&
tableDesc.getProperties().get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName()) != null) {
String tableName = tableDesc.getTableName();
String opKey = InputFormatConfig.OPERATION_TYPE_PREFIX + tableName;
// set operation type into job conf too
jobConf.set(opKey, tableDesc.getProperties().getProperty(opKey));
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR),
"Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName;
jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME);
if (catalogName != null) {
jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);
}
}
try {
if (!jobConf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST_IDE.varname, false)) {
// For unit tests this won't work, as the Maven Surefire classpath differs from what we have on a cluster:
// it places the current project's classes and test-classes on top instead of the jars built from them
Utilities.addDependencyJars(jobConf, HiveIcebergStorageHandler.class);
}
} catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
public boolean directInsert() {
return true;
}
@Override
public boolean alwaysUnpartitioned() {
return true;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public String toString() {
return this.getClass().getName();
}
/**
* Decomposes the filter expression: the entire expression is kept as the residual predicate for Hive, while a copy
* with FILE_PATH virtual column predicates removed is pushed down so that Iceberg can prune as well.
* @param jobConf Job configuration for InputFormat to access
* @param deserializer Deserializer
* @param exprNodeDesc Filter expression extracted by Hive
* @return The decomposed predicate, keeping the entire filter to take advantage of Hive's pruning as well as
* Iceberg's pruning.
*/
@Override
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
DecomposedPredicate predicate = new DecomposedPredicate();
predicate.residualPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
ExprNodeDesc pushedPredicate = exprNodeDesc.clone();
List<ExprNodeDesc> subExprNodes = pushedPredicate.getChildren();
if (subExprNodes.removeIf(nodeDesc -> nodeDesc.getCols() != null &&
nodeDesc.getCols().contains(VirtualColumn.FILE_PATH.getName()))) {
if (subExprNodes.size() == 1) {
pushedPredicate = subExprNodes.get(0);
} else if (subExprNodes.isEmpty()) {
pushedPredicate = null;
}
}
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
Expression filterExpr = HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);
if (filterExpr != null) {
SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, filterExpr);
}
return predicate;
}
@Override
public boolean canProvideBasicStatistics() {
return true;
}
@Override
public StorageFormatDescriptor getStorageFormatDescriptor(org.apache.hadoop.hive.metastore.api.Table table)
throws SemanticException {
if (table.getParameters() != null) {
String format = table.getParameters().getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, IOConstants.PARQUET);
return StorageFormat.getDescriptor(format, TableProperties.DEFAULT_FILE_FORMAT);
}
return null;
}
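/**
* Decides whether LOAD DATA can append the input files directly to the Iceberg table.
* Unpartitioned tables always support it; tables that have undergone partition evolution do not (and a PARTITION
* clause is rejected outright); for other partitioned tables the presence of the PARTITION clause decides.
* @param table The HMS table representation
* @param withPartClause Whether the LOAD statement contained a PARTITION clause
* @return true if the files can be appended directly
* @throws SemanticException if the table has undergone partition evolution and a PARTITION clause was given
*/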
public boolean supportsAppendData(org.apache.hadoop.hive.metastore.api.Table table, boolean withPartClause)
throws SemanticException {
Table icebergTbl = IcebergTableUtil.getTable(conf, table);
if (icebergTbl.spec().isUnpartitioned()) {
return true;
}
// If the table has undergone partition evolution, return false (a PARTITION clause is rejected outright)
if (hasUndergonePartitionEvolution(icebergTbl)) {
if (withPartClause) {
throw new SemanticException("Can not Load into an iceberg table, which has undergone partition evolution " +
"using the PARTITION clause");
}
return false;
}
return withPartClause;
}
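/**
* Appends the data files found under the given URI to the Iceberg table (used by LOAD DATA).
* @param table The HMS table representation
* @param fromURI The location of the files to append
* @param isOverwrite Whether the operation overwrites the existing data
* @param partitionSpec The static partition specification from the PARTITION clause, if any
* @throws SemanticException if appending the files fails
*/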
public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fromURI, boolean isOverwrite,
Map<String, String> partitionSpec)
throws SemanticException {
Table icebergTbl = IcebergTableUtil.getTable(conf, table);
String format = table.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT);
HiveTableUtil.appendFiles(fromURI, format, icebergTbl, isOverwrite, partitionSpec, conf);
}
@Override
public Map<String, String> getBasicStatistics(Partish partish) {
org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
// For write queries where rows got modified, don't fetch from cache as values could have changed.
Table table = getTable(hmsTable);
Map<String, String> stats = Maps.newHashMap();
if (getStatsSource().equals(HiveMetaHook.ICEBERG)) {
if (table.currentSnapshot() != null) {
Map<String, String> summary = table.currentSnapshot().summary();
if (summary != null) {
if (summary.containsKey(SnapshotSummary.TOTAL_DATA_FILES_PROP)) {
stats.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
}
if (summary.containsKey(SnapshotSummary.TOTAL_RECORDS_PROP)) {
long totalRecords = Long.parseLong(summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
if (summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
// Position deletes each remove exactly one row, so they can be subtracted; equality deletes may match any
// number of rows, so in their presence we keep the raw total. actualRecords may be negative in edge cases.
long actualRecords = totalRecords - (totalEqDeletes > 0 ? 0 : totalPosDeletes);
totalRecords = actualRecords > 0 ? actualRecords : totalRecords;
}
stats.put(StatsSetupConst.ROW_COUNT, String.valueOf(totalRecords));
}
if (summary.containsKey(SnapshotSummary.TOTAL_FILE_SIZE_PROP)) {
stats.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}
}
} else {
stats.put(StatsSetupConst.NUM_FILES, "0");
stats.put(StatsSetupConst.ROW_COUNT, "0");
stats.put(StatsSetupConst.TOTAL_SIZE, "0");
}
}
return stats;
}
private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table;
final Optional<QueryState> queryState = SessionStateUtil.getQueryState(conf);
if (!queryState.isPresent() || queryState.get().getNumModifiedRows() > 0) {
table = IcebergTableUtil.getTable(conf, hmsTable.getTTable(), true);
} else {
table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
}
return table;
}
@Override
public boolean canSetColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return table.currentSnapshot() != null && getStatsSource().equals(HiveMetaHook.ICEBERG);
}
@Override
public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable, List<ColumnStatistics> colStats) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return writeColStats(colStats.get(0), tbl);
}
private boolean writeColStats(ColumnStatistics tableColStats, Table tbl) {
try {
if (!shouldRewriteColStats(tbl)) {
checkAndMergeColStats(tableColStats, tbl);
}
// Currently, we are only serializing table level stats.
byte[] serializeColStats = SerializationUtils.serialize(tableColStats);
StatisticsFile statisticsFile;
String statsPath = tbl.location() + STATS + UUID.randomUUID();
try (PuffinWriter puffinWriter = Puffin.write(tbl.io().newOutputFile(statsPath))
.createdBy(Constants.HIVE_ENGINE).build()) {
long snapshotId = tbl.currentSnapshot().snapshotId();
long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();
puffinWriter.add(
new Blob(
ColumnStatisticsObj.class.getSimpleName(),
ImmutableList.of(1),
snapshotId,
snapshotSequenceNumber,
ByteBuffer.wrap(serializeColStats),
PuffinCompressionCodec.NONE,
ImmutableMap.of()
));
puffinWriter.finish();
statisticsFile =
new GenericStatisticsFile(
snapshotId,
statsPath,
puffinWriter.fileSize(),
puffinWriter.footerSize(),
puffinWriter.writtenBlobsMetadata().stream()
.map(GenericBlobMetadata::from)
.collect(ImmutableList.toImmutableList())
);
} catch (IOException e) {
LOG.warn("Unable to write stats to puffin file {}", e.getMessage());
return false;
}
tbl.updateStatistics()
.setStatistics(statisticsFile.snapshotId(), statisticsFile)
.commit();
return true;
} catch (Exception e) {
LOG.warn("Unable to invalidate or merge stats: {}", e.getMessage());
}
return false;
}
@Override
public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return canSetColStatistics(hmsTable) && canProvideColStats(table, table.currentSnapshot().snapshotId());
}
private boolean canProvideColStats(Table table, long snapshotId) {
return IcebergTableUtil.getColStatsPath(table, snapshotId).isPresent();
}
@Override
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return IcebergTableUtil.getColStatsPath(table).map(statsPath -> readColStats(table, statsPath))
.orElse(new ColumnStatistics()).getStatsObj();
}
private ColumnStatistics readColStats(Table table, Path statsPath) {
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
Iterator<ByteBuffer> it = Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
if (it.hasNext()) {
byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
LOG.info("Using col stats from : {}", statsPath);
return SerializationUtils.deserialize(byteBuffer);
}
} catch (Exception e) {
LOG.warn(" Unable to read col stats: ", e);
}
return new ColumnStatistics();
}
@Override
public boolean canComputeQueryUsingStats(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
if (getStatsSource().equals(HiveMetaHook.ICEBERG) && hmsTable.getMetaTable() == null) {
Table table = getTable(hmsTable);
if (table.currentSnapshot() != null) {
Map<String, String> summary = table.currentSnapshot().summary();
if (summary != null && summary.containsKey(SnapshotSummary.TOTAL_EQ_DELETES_PROP) &&
summary.containsKey(SnapshotSummary.TOTAL_POS_DELETES_PROP)) {
long totalEqDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_EQ_DELETES_PROP));
long totalPosDeletes = Long.parseLong(summary.get(SnapshotSummary.TOTAL_POS_DELETES_PROP));
return totalEqDeletes + totalPosDeletes == 0;
}
}
}
return false;
}
private String getStatsSource() {
return HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_STATS_SOURCE, HiveMetaHook.ICEBERG)
.toUpperCase();
}
private boolean shouldRewriteColStats(Table tbl) {
return SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation)
.filter(opType -> HiveOperation.ANALYZE_TABLE == opType).isPresent() ||
IcebergTableUtil.getColStatsPath(tbl).isPresent();
}
private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) throws InvalidObjectException {
Long previousSnapshotId = tbl.currentSnapshot().parentId();
if (previousSnapshotId != null && canProvideColStats(tbl, previousSnapshotId)) {
ColumnStatistics statsObjOld = IcebergTableUtil.getColStatsPath(tbl, previousSnapshotId)
.map(statsPath -> readColStats(tbl, statsPath))
.orElse(null);
if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && !statsObjNew.getStatsObj().isEmpty()) {
MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
}
}
}
/**
* No need for exclusive locks when writing, since Iceberg tables use optimistic concurrency when writing
* and only lock the table during the commit operation.
*/
@Override
public LockType getLockType(WriteEntity writeEntity) {
return LockType.SHARED_READ;
}
@Override
public boolean supportsPartitionTransform() {
return true;
}
@Override
public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
return table.spec().fields().stream().map(f ->
getTransformSpec(table, f.transform().toString().toUpperCase(), f.sourceId())
).collect(Collectors.toList());
}
private List<TransformSpec> getSortTransformSpec(Table table) {
return table.sortOrder().fields().stream().map(s ->
getTransformSpec(table, s.transform().toString().toUpperCase(), s.sourceId())
).collect(Collectors.toList());
}
private TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
TransformSpec spec = new TransformSpec();
spec.setColumnName(table.schema().findColumnName(sourceId));
// if the transform name contains '[' it means it has some config params
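// e.g. the uppercased transform string "BUCKET[16]" yields transform type BUCKET with parameter 16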
if (transformName.contains("[")) {
spec.setTransformType(TransformSpec.TransformType
.valueOf(transformName.substring(0, transformName.indexOf("["))));
spec.setTransformParam(Optional.of(Integer
.valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
} else {
spec.setTransformType(TransformSpec.TransformType.valueOf(transformName));
spec.setTransformParam(Optional.empty());
}
return spec;
}
@Override
public DynamicPartitionCtx createDPContext(
HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation)
throws SemanticException {
// delete records are already clustered by partition spec id and the hash of the partition struct
// there is no need to do any additional sorting based on partition columns
if (writeOperation == Operation.DELETE && !shouldOverwrite(hmsTable, writeOperation)) {
return null;
}
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
hiveConf.getVar(HiveConf.ConfVars.DEFAULT_PARTITION_NAME),
hiveConf.getIntVar(HiveConf.ConfVars.DYNAMIC_PARTITION_MAX_PARTS_PER_NODE));
List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
dpCtx.setCustomSortExpressions(customSortExprs);
if (table.spec().isPartitioned()) {
addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getPartitionTransformSpec(hmsTable));
}
SortOrder sortOrder = table.sortOrder();
if (sortOrder.isSorted()) {
List<Integer> customSortPositions = Lists.newLinkedList();
List<Integer> customSortOrder = Lists.newLinkedList();
dpCtx.setCustomSortOrder(customSortOrder);
List<Integer> customSortNullOrder = Lists.newLinkedList();
dpCtx.setCustomSortNullOrder(customSortNullOrder);
for (SortField sortField : sortOrder.fields()) {
int pos = 0;
for (Types.NestedField field : table.schema().columns()) {
if (sortField.sourceId() == field.fieldId()) {
customSortPositions.add(pos);
customSortOrder.add(sortField.direction() == SortDirection.ASC ? 1 : 0);
customSortNullOrder.add(sortField.nullOrder() == NullOrder.NULLS_FIRST ? 0 : 1);
break;
}
pos++;
}
}
addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
}
dpCtx.setHasCustomSortExprs(!customSortExprs.isEmpty());
return dpCtx;
}
private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Operation writeOperation, List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
List<TransformSpec> transformSpecs) {
Map<String, Integer> fieldOrderMap = Maps.newHashMap();
List<Types.NestedField> fields = table.schema().columns();
for (int i = 0; i < fields.size(); ++i) {
fieldOrderMap.put(fields.get(i).name(), i);
}
int offset = (shouldOverwrite(hmsTable, writeOperation) ?
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, writeOperation)).size();
customSortExprs.addAll(transformSpecs.stream().map(spec ->
IcebergTransformSortFunctionUtil.getCustomSortExprs(spec, fieldOrderMap.get(spec.getColumnName()) + offset)
).collect(Collectors.toList()));
}
@Override
public String getFileFormatPropertyKey() {
return TableProperties.DEFAULT_FILE_FORMAT;
}
@Override
public boolean commitInMoveTask() {
return true;
}
@Override
public void storageHandlerCommit(Properties commitProperties, Operation operation)
throws HiveException {
String tableName = commitProperties.getProperty(Catalogs.NAME);
String location = commitProperties.getProperty(Catalogs.LOCATION);
String snapshotRef = commitProperties.getProperty(Catalogs.SNAPSHOT_REF);
Configuration configuration = SessionState.getSessionConf();
if (location != null) {
HiveTableUtil.cleanupTableObjectFile(location, configuration);
}
List<JobContext> jobContextList = generateJobContext(configuration, tableName, snapshotRef);
if (jobContextList.isEmpty()) {
return;
}
HiveIcebergOutputCommitter committer = getOutputCommitter();
try {
committer.commitJobs(jobContextList, operation);
} catch (Throwable e) {
String ids = jobContextList
.stream().map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(", "));
// Abort the jobs since the commit failed
LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
ids, tableName, e);
try {
committer.abortJobs(jobContextList);
} catch (IOException ioe) {
LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
// no throwing here because the original exception should be propagated
}
throw new HiveException(
"Error committing job: " + ids + " for table: " + tableName, e);
}
}
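/**
* Creates the full-featured output committer that performs the actual Iceberg commit on the HS2 side during the
* MoveTask (see {@link HiveIcebergNoJobCommitter} for the no-op variant handed to the execution engine).
*/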
public HiveIcebergOutputCommitter getOutputCommitter() {
return new HiveIcebergOutputCommitter();
}
@Override
public boolean isAllowedAlterOperation(AlterTableType opType) {
return HiveIcebergMetaHook.SUPPORTED_ALTER_OPS.contains(opType);
}
@Override
public boolean supportsTruncateOnNonNativeTables() {
return true;
}
@Override
public boolean isTimeTravelAllowed() {
return true;
}
@Override
public boolean isTableMetaRefSupported() {
return true;
}
@Override
public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable, AlterTableExecuteSpec executeSpec) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
switch (executeSpec.getOperationType()) {
case ROLLBACK:
LOG.info("Executing rollback operation on iceberg table. If you would like to revert rollback you could " +
"try altering the metadata location to the current metadata location by executing the following query:" +
"ALTER TABLE {}.{} SET TBLPROPERTIES('metadata_location'='{}'). This operation is supported for Hive " +
"Catalog tables.", hmsTable.getDbName(), hmsTable.getTableName(),
((BaseTable) icebergTable).operations().current().metadataFileLocation());
AlterTableExecuteSpec.RollbackSpec rollbackSpec =
(AlterTableExecuteSpec.RollbackSpec) executeSpec.getOperationParams();
IcebergTableUtil.rollback(icebergTable, rollbackSpec.getRollbackType(), rollbackSpec.getParam());
break;
case EXPIRE_SNAPSHOT:
LOG.info("Executing expire snapshots operation on iceberg table {}.{}", hmsTable.getDbName(),
hmsTable.getTableName());
AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec =
(AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
int numThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
expireSnapshot(icebergTable, expireSnapshotsSpec, numThreads);
break;
case SET_CURRENT_SNAPSHOT:
AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
(AlterTableExecuteSpec.SetCurrentSnapshotSpec) executeSpec.getOperationParams();
LOG.debug("Executing set current snapshot operation on iceberg table {}.{} to version {}", hmsTable.getDbName(),
hmsTable.getTableName(), setSnapshotVersionSpec.getSnapshotIdOrRefName());
IcebergTableUtil.setCurrentSnapshot(icebergTable, setSnapshotVersionSpec.getSnapshotIdOrRefName());
break;
case FAST_FORWARD:
AlterTableExecuteSpec.FastForwardSpec fastForwardSpec =
(AlterTableExecuteSpec.FastForwardSpec) executeSpec.getOperationParams();
IcebergTableUtil.fastForwardBranch(icebergTable, fastForwardSpec.getSourceBranch(),
fastForwardSpec.getTargetBranch());
break;
case CHERRY_PICK:
AlterTableExecuteSpec.CherryPickSpec cherryPickSpec =
(AlterTableExecuteSpec.CherryPickSpec) executeSpec.getOperationParams();
IcebergTableUtil.cherryPick(icebergTable, cherryPickSpec.getSnapshotId());
break;
case DELETE_METADATA:
AlterTableExecuteSpec.DeleteMetadataSpec deleteMetadataSpec =
(AlterTableExecuteSpec.DeleteMetadataSpec) executeSpec.getOperationParams();
IcebergTableUtil.performMetadataDelete(icebergTable, deleteMetadataSpec.getBranchName(),
deleteMetadataSpec.getSarg());
break;
case DELETE_ORPHAN_FILES:
int numDeleteThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
AlterTableExecuteSpec.DeleteOrphanFilesDesc deleteOrphanFilesSpec =
(AlterTableExecuteSpec.DeleteOrphanFilesDesc) executeSpec.getOperationParams();
deleteOrphanFiles(icebergTable, deleteOrphanFilesSpec.getTimestampMillis(), numDeleteThreads);
break;
default:
throw new UnsupportedOperationException(
String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
}
}
private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int numThreads) {
ExecutorService deleteExecutorService = null;
try {
if (numThreads > 0) {
LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
}
HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
deleteOrphanFiles.olderThan(timestampMillis);
if (deleteExecutorService != null) {
deleteOrphanFiles.executeDeleteWith(deleteExecutorService);
}
DeleteOrphanFiles.Result result = deleteOrphanFiles.execute();
LOG.debug("Cleaned files {} for {}", result.orphanFileLocations(), icebergTable);
} finally {
if (deleteExecutorService != null) {
deleteExecutorService.shutdown();
}
}
}
private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
int numThreads) {
ExecutorService deleteExecutorService = null;
try {
if (numThreads > 0) {
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
}
if (expireSnapshotsSpec == null) {
expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService);
} else if (expireSnapshotsSpec.isExpireByTimestampRange()) {
expireSnapshotByTimestampRange(icebergTable, expireSnapshotsSpec.getFromTimestampMillis(),
expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
} else if (expireSnapshotsSpec.isExpireByIds()) {
expireSnapshotByIds(icebergTable, expireSnapshotsSpec.getIdsToExpire(), deleteExecutorService);
} else if (expireSnapshotsSpec.isExpireByRetainLast()) {
expireSnapshotRetainLast(icebergTable, expireSnapshotsSpec.getNumRetainLast(), deleteExecutorService);
} else {
expireSnapshotOlderThanTimestamp(icebergTable, expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
}
} finally {
if (deleteExecutorService != null) {
deleteExecutorService.shutdown();
}
}
}
private void expireSnapshotWithDefaultParams(Table icebergTable, ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
if (deleteExecutorService != null) {
expireSnapshots.executeDeleteWith(deleteExecutorService);
}
expireSnapshots.commit();
}
private void expireSnapshotRetainLast(Table icebergTable, int numRetainLast, ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
expireSnapshots.retainLast(numRetainLast);
if (deleteExecutorService != null) {
expireSnapshots.executeDeleteWith(deleteExecutorService);
}
expireSnapshots.commit();
}
private void expireSnapshotByTimestampRange(Table icebergTable, Long fromTimestamp, Long toTimestamp,
ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
for (Snapshot snapshot : icebergTable.snapshots()) {
if (snapshot.timestampMillis() >= fromTimestamp && snapshot.timestampMillis() <= toTimestamp) {
expireSnapshots.expireSnapshotId(snapshot.snapshotId());
LOG.debug("Expiring snapshot on {} with id: {} and timestamp: {}", icebergTable.name(), snapshot.snapshotId(),
snapshot.timestampMillis());
}
}
LOG.info("Expiring snapshot on {} within time range {} -> {}", icebergTable.name(), fromTimestamp, toTimestamp);
if (deleteExecutorService != null) {
expireSnapshots.executeDeleteWith(deleteExecutorService);
}
expireSnapshots.commit();
}
private void expireSnapshotOlderThanTimestamp(Table icebergTable, Long timestamp,
ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots().expireOlderThan(timestamp);
if (deleteExecutorService != null) {
expireSnapshots.executeDeleteWith(deleteExecutorService);
}
expireSnapshots.commit();
}
private void expireSnapshotByIds(Table icebergTable, String[] idsToExpire,
ExecutorService deleteExecutorService) {
if (idsToExpire.length != 0) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
for (String id : idsToExpire) {
expireSnapshots.expireSnapshotId(Long.parseLong(id));
}
LOG.info("Expiring snapshot on {} for snapshot Ids: {}", icebergTable.name(), Arrays.toString(idsToExpire));
if (deleteExecutorService != null) {
expireSnapshots = expireSnapshots.executeDeleteWith(deleteExecutorService);
}
expireSnapshots.commit();
}
}
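/**
* Creates a fixed-size thread pool for parallel file deletion; its threads are named
* {@code remove-snapshot-<completeName>-<index>}.
*/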
private ExecutorService getDeleteExecutorService(String completeName, int numThreads) {
AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
return Executors.newFixedThreadPool(numThreads, runnable -> {
Thread thread = new Thread(runnable);
thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement());
return thread;
});
}
@Override
public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
AlterTableSnapshotRefSpec alterTableSnapshotRefSpec) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
switch (alterTableSnapshotRefSpec.getOperationType()) {
case CREATE_BRANCH:
AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createBranchSpec =
(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergSnapshotRefExec.createBranch(icebergTable, createBranchSpec);
break;
case CREATE_TAG:
Optional.ofNullable(icebergTable.currentSnapshot()).orElseThrow(() -> new UnsupportedOperationException(
String.format("Cannot alter %s on iceberg table %s.%s which has no snapshot",
alterTableSnapshotRefSpec.getOperationType().getName(), hmsTable.getDbName(),
hmsTable.getTableName())));
AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createTagSpec =
(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergSnapshotRefExec.createTag(icebergTable, createTagSpec);
break;
case DROP_BRANCH:
AlterTableSnapshotRefSpec.DropSnapshotRefSpec dropBranchSpec =
(AlterTableSnapshotRefSpec.DropSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergSnapshotRefExec.dropBranch(icebergTable, dropBranchSpec);
break;
case RENAME_BRANCH:
AlterTableSnapshotRefSpec.RenameSnapshotrefSpec renameSnapshotrefSpec =
(AlterTableSnapshotRefSpec.RenameSnapshotrefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergSnapshotRefExec.renameBranch(icebergTable, renameSnapshotrefSpec);
break;
case REPLACE_SNAPSHOTREF:
AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec replaceSnapshotrefSpec =
(AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec) alterTableSnapshotRefSpec.getOperationParams();
if (replaceSnapshotrefSpec.isReplaceBranch()) {
IcebergSnapshotRefExec.replaceBranch(icebergTable, replaceSnapshotrefSpec);
} else {
IcebergSnapshotRefExec.replaceTag(icebergTable, replaceSnapshotrefSpec);
}
break;
case DROP_TAG:
AlterTableSnapshotRefSpec.DropSnapshotRefSpec dropTagSpec =
(AlterTableSnapshotRefSpec.DropSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergSnapshotRefExec.dropTag(icebergTable, dropTagSpec);
break;
default:
throw new UnsupportedOperationException(String.format(
"Operation type %s is not supported", alterTableSnapshotRefSpec.getOperationType().getName()));
}
}
@Override
public boolean isValidMetadataTable(String metaTableName) {
return metaTableName != null && IcebergMetadataTables.isValidMetaTable(metaTableName);
}
@Override
public org.apache.hadoop.hive.ql.metadata.Table checkAndSetTableMetaRef(
org.apache.hadoop.hive.ql.metadata.Table hmsTable, String tableMetaRef) throws SemanticException {
String refName = HiveUtils.getTableSnapshotRef(tableMetaRef);
if (refName != null) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
if (tbl.snapshot(refName) != null) {
hmsTable.setSnapshotRef(tableMetaRef);
return hmsTable;
}
throw new SemanticException(String.format("Cannot use snapshotRef (does not exist): %s", refName));
}
if (IcebergMetadataTables.isValidMetaTable(tableMetaRef)) {
hmsTable.setMetaTable(tableMetaRef);
return hmsTable;
}
throw new SemanticException(ErrorMsg.INVALID_METADATA_TABLE_NAME, tableMetaRef);
}
@Override
public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws URISyntaxException {
String dbName = hmsTable.getDbName();
String tableName = hmsTable.getTableName();
StringBuilder authURI =
new StringBuilder(ICEBERG_URI_PREFIX).append(encodeString(dbName)).append("/").append(encodeString(tableName))
.append("?snapshot=");
// If a metadata location is provided, we should use that location for auth, since during CREATE, if the
// metadata_location is explicitly provided, we register the table using that path.
Optional<String> metadataLocation =
SessionStateUtil.getProperty(conf, BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
if (metadataLocation.isPresent()) {
authURI.append(getPathForAuth(metadataLocation.get()));
} else {
Optional<String> locationProperty =
SessionStateUtil.getProperty(conf, hive_metastoreConstants.META_TABLE_LOCATION);
if (locationProperty.isPresent()) {
// this property is set during the create operation before the hive table was created
// we are returning a dummy iceberg metadata file
authURI.append(getPathForAuth(locationProperty.get()))
.append(encodeString("/metadata/dummy.metadata.json"));
} else {
Table table = IcebergTableUtil.getTable(conf, hmsTable);
authURI.append(getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(),
hmsTable.getSd().getLocation()));
}
}
LOG.debug("Iceberg storage handler authorization URI {}", authURI);
return new URI(authURI.toString());
}
@VisibleForTesting
static String encodeString(String rawString) {
if (rawString == null) {
return null;
}
return HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.encode(rawString);
}
String getPathForAuth(String locationProperty) {
return getPathForAuth(locationProperty,
SessionStateUtil.getProperty(conf, SessionStateUtil.DEFAULT_TABLE_LOCATION).orElse(null));
}
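/**
* Returns the URL-encoded path used in the authorization URI. When location masking is enabled
* (HIVE_ICEBERG_MASK_DEFAULT_LOCATION) and the location resides under the default table location on the same
* file system, the default-location prefix is replaced with the {@link #TABLE_DEFAULT_LOCATION} placeholder.
* For example (with illustrative paths), "/warehouse/db.db/tbl/metadata/v2.metadata.json" with default location
* "/warehouse/db.db/tbl" becomes "TABLE_DEFAULT_LOCATION/metadata/v2.metadata.json".
*/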
String getPathForAuth(String locationProperty, String defaultTableLocation) {
boolean maskDefaultLocation = conf.getBoolean(HiveConf.ConfVars.HIVE_ICEBERG_MASK_DEFAULT_LOCATION.varname,
HiveConf.ConfVars.HIVE_ICEBERG_MASK_DEFAULT_LOCATION.defaultBoolVal);
String location = URI.create(locationProperty).getPath();
if (!maskDefaultLocation || defaultTableLocation == null ||
!arePathsInSameFs(locationProperty, defaultTableLocation)) {
return encodeString(location);
}
try {
Path locationPath = new Path(location);
Path defaultLocationPath = locationPath.toUri().getScheme() != null ?
FileUtils.makeQualified(new Path(defaultTableLocation), conf) :
Path.getPathWithoutSchemeAndAuthority(new Path(defaultTableLocation));
return encodeString(location.replaceFirst(defaultLocationPath.toString(), TABLE_DEFAULT_LOCATION));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private boolean arePathsInSameFs(String locationProperty, String defaultTableLocation) {
try {
return FileUtils.equalsFileSystem(new Path(locationProperty).getFileSystem(conf),
new Path(defaultTableLocation).getFileSystem(conf));
} catch (IOException e) {
LOG.debug("Unable to get FileSystem for path {} and {}", locationProperty, defaultTableLocation);
return false;
}
}
@Override
public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
HiveStorageHandler.super.validateSinkDesc(sinkDesc);
if (sinkDesc.getInsertOverwrite()) {
Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
if (table.currentSnapshot() != null &&
Long.parseLong(table.currentSnapshot().summary().get(SnapshotSummary.TOTAL_RECORDS_PROP)) == 0) {
// If the table is empty we don't have any danger that some data can get lost.
return;
}
if (RewritePolicy.fromString(conf.get(ConfVars.REWRITE_POLICY.varname, RewritePolicy.DEFAULT.name())) ==
RewritePolicy.ALL_PARTITIONS) {
// Table rewriting has special logic as part of IOW that handles the case when the table has undergone partition evolution
return;
}
if (IcebergTableUtil.isBucketed(table)) {
throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
}
if (hasUndergonePartitionEvolution(table)) {
throw new SemanticException(
"Cannot perform insert overwrite query on Iceberg table where partition evolution happened. In order " +
"to successfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
"conforming to the latest spec. ");
}
}
}
@Override
public AcidSupportType supportsAcidOperations() {
return AcidSupportType.WITHOUT_TRANSACTIONS;
}
@Override
public List<VirtualColumn> acidVirtualColumns() {
return ACID_VIRTUAL_COLS;
}
@Override
public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
switch (operation) {
case DELETE:
// TODO: make it configurable whether we want to include the table columns in the select query.
// It might make delete writes faster if we don't have to write out the row object
return ListUtils.union(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols());
case UPDATE:
return shouldOverwrite(table, operation) ?
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA :
ListUtils.union(ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA, table.getCols());
case MERGE:
return ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
default:
return ImmutableList.of();
}
}
@Override
public FieldSchema getRowId() {
VirtualColumn rowId = VirtualColumn.ROW_POSITION;
return new FieldSchema(rowId.getName(), rowId.getTypeInfo().getTypeName(), "");
}
@Override
public List<FieldSchema> acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
switch (operation) {
case DELETE:
return ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
default:
// For update operations we use the same sort order defined by
// {@link #createDPContext(HiveConf, org.apache.hadoop.hive.ql.metadata.Table, Operation)}
return ImmutableList.of();
}
}
@Override
public boolean supportsSortColumns() {
return true;
}
@Override
public List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
if (table.sortOrder().isUnsorted()) {
return Collections.emptyList();
}
Schema schema = table.schema();
List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
Map<String, String> colNameToColType = hiveSchema.stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
return table.sortOrder().fields().stream().map(s -> new FieldSchema(schema.findColumnName(s.sourceId()),
colNameToColType.get(schema.findColumnName(s.sourceId())),
String.format("Transform: %s, Sort direction: %s, Null sort order: %s",
s.transform().toString(), s.direction().name(), s.nullOrder().name()))).collect(Collectors.toList());
}
private void setCommonJobConf(JobConf jobConf) {
jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
}
public StorageHandlerTypes getType() {
return StorageHandlerTypes.ICEBERG;
}
public boolean addDynamicSplitPruningEdge(org.apache.hadoop.hive.ql.metadata.Table table,
ExprNodeDesc syntheticFilterPredicate) {
try {
Collection<String> partitionColumns = ((HiveIcebergSerDe) table.getDeserializer()).partitionColumns();
if (partitionColumns.size() > 0) {
// The filter predicate contains ExprNodeDynamicListDesc object(s) for places where we will substitute
// dynamic values later during execution. For Example:
// GenericUDFIn(Column[ss_sold_date_sk], RS[5] <-- This is an ExprNodeDynamicListDesc)
//
// We would like to check if we will be able to convert these expressions to Iceberg filters when the
// actual values will be available, so in this check we replace the ExprNodeDynamicListDesc with dummy
// values and check whether the conversion will be possible or not.
ExprNodeDesc clone = syntheticFilterPredicate.clone();
String filterColumn = collectColumnAndReplaceDummyValues(clone, null);
// If the filter is for a partition column then it could be worthwhile to try dynamic partition pruning
if (partitionColumns.contains(filterColumn)) {
// Check if we can convert the expression to a valid Iceberg filter
SearchArgument sarg = ConvertAstToSearchArg.create(conf, (ExprNodeGenericFuncDesc) clone);
HiveIcebergFilterFactory.generateFilterExpression(sarg);
LOG.debug("Found Iceberg partition column to prune with predicate {}", syntheticFilterPredicate);
return true;
}
}
} catch (UnsupportedOperationException uoe) {
// If we can not convert the filter, we do not prune
LOG.debug("Unsupported predicate {}", syntheticFilterPredicate, uoe);
}
// There is nothing to prune, or we could not use the filter
LOG.debug("Not found Iceberg partition columns to prune with predicate {}", syntheticFilterPredicate);
return false;
}
/**
* Returns the Table serialized to the configuration based on the table name.
* If configuration is missing from the FileIO of the table, it will be populated with the input config.
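* <p>A minimal usage sketch (the {@code jobConf} and {@code tableDesc} names are illustrative):
* <pre>{@code
*   Table table = HiveIcebergStorageHandler.table(jobConf, tableDesc.getTableName());
* }</pre>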
*
* @param config The configuration used to get the data from
* @param name The name of the table we need as returned by TableDesc.getTableName()
* @return The Table
*/
public static Table table(Configuration config, String name) {
Table table = SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + name));
if (table == null &&
config.getBoolean(hive_metastoreConstants.TABLE_IS_CTAS, false) &&
StringUtils.isNotBlank(config.get(InputFormatConfig.TABLE_LOCATION))) {
table = HiveTableUtil.readTableObjectFromFile(config);
}
checkAndSetIoConfig(config, table);
return table;
}
/**
* If enabled, it populates the FileIO's hadoop configuration with the input config object.
* This might be necessary when the table object was serialized without the FileIO config.
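* <p>A sketch of the read-path usage, mirroring {@link #table(Configuration, String)}:
* <pre>{@code
*   Table table = SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + name));
*   HiveIcebergStorageHandler.checkAndSetIoConfig(config, table);
* }</pre>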
*
* @param config Configuration to set for FileIO, if enabled
* @param table The Iceberg table object
*/
public static void checkAndSetIoConfig(Configuration config, Table table) {
if (table != null && config.getBoolean(InputFormatConfig.CONFIG_SERIALIZATION_DISABLED,
InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT) && table.io() instanceof HadoopConfigurable) {
((HadoopConfigurable) table.io()).setConf(config);
}
}
/**
* If enabled, it ensures that the FileIO's hadoop configuration will not be serialized.
* This might be desirable for decreasing the overall size of serialized table objects.
*
* Note: Skipping FileIO config serialization in this fashion might in turn necessitate calling
* {@link #checkAndSetIoConfig(Configuration, Table)} on the deserializer-side to enable subsequent use of the FileIO.
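* <p>A sketch of the write-path usage, as done in {@link #overlayTableProperties(Configuration, TableDesc, Map)}:
* <pre>{@code
*   Table serializableTable = SerializableTable.copyOf(table);
*   checkAndSkipIoConfigSerialization(configuration, serializableTable);
*   map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(),
*       SerializationUtil.serializeToBase64(serializableTable));
* }</pre>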
*
* @param config Configuration to set for FileIO in a transient manner, if enabled
* @param table The Iceberg table object
*/
public static void checkAndSkipIoConfigSerialization(Configuration config, Table table) {
if (table != null && config.getBoolean(InputFormatConfig.CONFIG_SERIALIZATION_DISABLED,
InputFormatConfig.CONFIG_SERIALIZATION_DISABLED_DEFAULT) && table.io() instanceof HadoopConfigurable) {
((HadoopConfigurable) table.io()).serializeConfWith(conf -> new NonSerializingConfig(config)::get);
}
}
/**
* Returns the names of the output tables stored in the configuration.
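* <p>For example, if {@link #configureJobConf(TableDesc, JobConf)} registered two output tables, the configuration
* holds {@code "db1.tbl1..db2.tbl2"} under {@code InputFormatConfig.OUTPUT_TABLES} and this method returns both
* names (the concrete table names here are illustrative).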
* @param config The configuration used to get the data from
* @return The collection of the table names as returned by TableDesc.getTableName()
*/
public static Set<String> outputTables(Configuration config) {
return Sets.newHashSet(TABLE_NAME_SPLITTER.split(config.get(InputFormatConfig.OUTPUT_TABLES)));
}
/**
* Returns the catalog name serialized to the configuration.
* @param config The configuration used to get the data from
* @param name The name of the table we need as returned by TableDesc.getTableName()
* @return catalog name
*/
public static String catalogName(Configuration config, String name) {
return config.get(InputFormatConfig.TABLE_CATALOG_PREFIX + name);
}
/**
* Returns the Table Schema serialized to the configuration.
* @param config The configuration used to get the data from
* @return The Table Schema object
*/
public static Schema schema(Configuration config) {
return SchemaParser.fromJson(config.get(InputFormatConfig.TABLE_SCHEMA));
}
/**
* Stores the serializable table data in the configuration.
* Currently the following is handled:
* <ul>
* <li>- Table - in case the table is serializable</li>
* <li>- Location</li>
* <li>- Schema</li>
* <li>- Partition specification</li>
* <li>- FileIO for handling table files</li>
* <li>- Location provider used for file generation</li>
* <li>- Encryption manager for encryption handling</li>
* </ul>
* @param configuration The configuration storing the catalog information
* @param tableDesc The table which we want to store to the configuration
* @param map The map of the configuration properties which we append with the serialized data
*/
@VisibleForTesting
static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
Properties props = tableDesc.getProperties();
Maps.fromProperties(props).entrySet().stream()
.filter(entry -> !map.containsKey(entry.getKey())) // map overrides tableDesc properties
.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
String location;
Schema schema;
PartitionSpec spec;
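// Load the live table to extract the location, schema and partition spec; for CTAS queries the table may not exist
// yet, which is handled in the catch block below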
try {
Table table = IcebergTableUtil.getTable(configuration, props);
location = table.location();
schema = table.schema();
spec = table.spec();
// serialize table object into config
Table serializableTable = SerializableTable.copyOf(table);
checkAndSkipIoConfigSerialization(configuration, serializableTable);
map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(),
SerializationUtil.serializeToBase64(serializableTable));
} catch (NoSuchTableException ex) {
if (!HiveTableUtil.isCtas(props)) {
throw ex;
}
if (!Catalogs.hiveCatalog(configuration, props)) {
throw new UnsupportedOperationException(HiveIcebergSerDe.CTAS_EXCEPTION_MSG);
}
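// CTAS into a Hive catalog: the table does not exist yet, so take the location from the HMS table property and
// derive the schema and partition spec from the SerDe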
location = map.get(hive_metastoreConstants.META_TABLE_LOCATION);
map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(),
SerializationUtil.serializeToBase64(null));
try {
AbstractSerDe serDe = tableDesc.getDeserializer(configuration);
HiveIcebergSerDe icebergSerDe = (HiveIcebergSerDe) serDe;
schema = icebergSerDe.getTableSchema();
spec = IcebergTableUtil.spec(configuration, icebergSerDe.getTableSchema());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(Catalogs.NAME));
if (StringUtils.isNotBlank(location)) {
map.put(InputFormatConfig.TABLE_LOCATION, location);
}
String schemaJson = SchemaParser.toJson(schema);
map.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
// save schema into table props as well to avoid repeatedly hitting the HMS during serde initializations
// this is an exception to the interface documentation, but it's a safe operation to add this property
props.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
if (spec == null) {
spec = PartitionSpec.unpartitioned();
}
props.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
// We need to remove this otherwise the job.xml will be invalid as column comments are separated with '\0' and
// the serialization utils fail to serialize this character
map.remove("columns.comments");
}
/**
* Recursively replaces ExprNodeDynamicListDesc nodes with a dummy ExprNodeConstantDesc so we can test whether the
* predicate can be converted to an Iceberg predicate when pruning the partitions later. Also collects the column name
* used in the filter.
* <p>
* Note that the input node is modified in place; clone it beforehand if that is not acceptable.
* @param node The node we are traversing
* @param foundColumn The column we have already found, if any
* @return The single column name referenced by the filter, or null if no column was found
*/
private String collectColumnAndReplaceDummyValues(ExprNodeDesc node, String foundColumn) {
String column = foundColumn;
List<ExprNodeDesc> children = node.getChildren();
if (children != null && !children.isEmpty()) {
ListIterator<ExprNodeDesc> iterator = children.listIterator();
while (iterator.hasNext()) {
ExprNodeDesc child = iterator.next();
if (child instanceof ExprNodeDynamicListDesc) {
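// Replace the dynamic list placeholder with a type-appropriate dummy constant so the predicate can later be
// converted to an Iceberg predicate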
Object dummy;
switch (((PrimitiveTypeInfo) child.getTypeInfo()).getPrimitiveCategory()) {
case INT:
case SHORT:
dummy = 1;
break;
case LONG:
dummy = 1L;
break;
case TIMESTAMP:
case TIMESTAMPLOCALTZ:
dummy = new Timestamp();
break;
case CHAR:
case VARCHAR:
case STRING:
dummy = "1";
break;
case DOUBLE:
case FLOAT:
case DECIMAL:
dummy = 1.1;
break;
case DATE:
dummy = new Date();
break;
case BOOLEAN:
dummy = true;
break;
default:
throw new UnsupportedOperationException("Not supported primitive type in partition pruning: " +
child.getTypeInfo());
}
iterator.set(new ExprNodeConstantDesc(child.getTypeInfo(), dummy));
} else {
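// Collect the filter column name; filtering on more than one column is not supported here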
String newColumn;
if (child instanceof ExprNodeColumnDesc) {
newColumn = ((ExprNodeColumnDesc) child).getColumn();
} else {
newColumn = collectColumnAndReplaceDummyValues(child, column);
}
if (column != null && newColumn != null && !newColumn.equals(column)) {
throw new UnsupportedOperationException("Partition pruning does not support filtering for more columns");
}
if (column == null) {
column = newColumn;
}
}
}
}
return column;
}
/**
* If any of the following checks is true we fall back to non-vectorized mode:
* <ul>
* <li>file format is set to AVRO</li>
* <li>querying metadata tables</li>
* <li>file format is set to ORC, and the table schema has a Time type column</li>
* <li>file format is set to PARQUET, and the table schema has a list or map type column with a complex type element</li>
* </ul>
* @param tableProps table properties, must not be null
*/
private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) {
Schema tableSchema = SchemaParser.fromJson(tableProps.getProperty(InputFormatConfig.TABLE_SCHEMA));
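// Note: hasParquetNestedTypeWithinListOrMap returns true when vectorized reading is supported, hence the negation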
if (FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT)) ||
isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) ||
hasOrcTimeInSchema(tableProps, tableSchema) ||
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, false);
}
}
/**
* Iceberg Time type columns are written as longs into ORC files. There is no Time type in Hive, so it is represented
* as String instead. Unlike Parquet (where the Time type is an int64 with the 'time' logical annotation and is
* converted automatically), ORC has no automatic conversion from long to string during vectorized reading.
* @param tableProps iceberg table properties
* @param tableSchema iceberg table schema
* @return true if the file format is ORC and the table schema contains a Time type column, false otherwise
*/
private static boolean hasOrcTimeInSchema(Properties tableProps, Schema tableSchema) {
if (!FileFormat.ORC.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))) {
return false;
}
return tableSchema.columns().stream().anyMatch(f -> Types.TimeType.get().typeId() == f.type().typeId());
}
/**
* Vectorized reads of Parquet files from columns with list or map type are only supported if the nested types are of
* primitive type category.
* See {@link VectorizedParquetRecordReader#checkListColumnSupport} for details on nested types under lists.
* @param tableProps iceberg table properties
* @param tableSchema iceberg table schema
* @return true if vectorized reading is supported, i.e. the file format is not Parquet or every nested field under a
* list or map column is of primitive type; false otherwise
*/
private static boolean hasParquetNestedTypeWithinListOrMap(Properties tableProps, Schema tableSchema) {
if (!FileFormat.PARQUET.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))) {
return true;
}
for (Types.NestedField field : tableSchema.columns()) {
if (field.type().isListType() || field.type().isMapType()) {
for (Types.NestedField nestedField : field.type().asNestedType().fields()) {
if (!nestedField.type().isPrimitiveType()) {
return false;
}
}
}
}
return true;
}
/**
* Generates {@link JobContext}s for the OutputCommitter for the specific table.
* @param configuration The configuration used as a base for the JobConf
* @param tableName The name of the table we are planning to commit
* @param branchName the name of the branch
* @return The generated JobContext list, or an empty list if no commit information is present
*/
private List<JobContext> generateJobContext(Configuration configuration, String tableName,
String branchName) {
JobConf jobConf = new JobConf(configuration);
Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
SessionStateUtil.getCommitInfo(jobConf, tableName);
if (commitInfoMap.isPresent()) {
List<JobContext> jobContextList = Lists.newLinkedList();
for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
JobID jobID = JobID.forName(commitInfo.getJobIdStr());
commitInfo.getProps().forEach(jobConf::set);
// we should only commit this current table because
// for multi-table inserts, this hook method will be called sequentially for each target table
jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
if (branchName != null) {
jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName);
}
jobContextList.add(new JobContextImpl(jobConf, jobID, null));
}
return jobContextList;
} else {
// most likely empty write scenario
LOG.debug("Unable to find commit information in query state for table: {}", tableName);
return Collections.emptyList();
}
}
private String getOperationType() {
return SessionStateUtil.getProperty(conf, Operation.class.getSimpleName())
.orElse(Operation.OTHER.name());
}
private static class NonSerializingConfig implements Serializable {
private final transient Configuration conf;
NonSerializingConfig(Configuration conf) {
this.conf = conf;
}
public Configuration get() {
if (conf == null) {
throw new IllegalStateException("Configuration was not serialized on purpose but was not set manually either");
}
return conf;
}
}
@Override
public boolean areSnapshotsSupported() {
return true;
}
@Override
public SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
Snapshot current = table.currentSnapshot();
if (current == null) {
return null;
}
return toSnapshotContext(current);
}
private SnapshotContext toSnapshotContext(Snapshot snapshot) {
Map<String, String> summaryMap = snapshot.summary();
long addedRecords = getLongSummary(summaryMap, SnapshotSummary.ADDED_RECORDS_PROP);
long deletedRecords = getLongSummary(summaryMap, SnapshotSummary.DELETED_RECORDS_PROP);
return new SnapshotContext(
snapshot.snapshotId(), toWriteOperationType(snapshot.operation()), addedRecords, deletedRecords);
}
private SnapshotContext.WriteOperationType toWriteOperationType(String operation) {
try {
return SnapshotContext.WriteOperationType.valueOf(operation.toUpperCase());
} catch (NullPointerException | IllegalArgumentException ex) {
return SnapshotContext.WriteOperationType.UNKNOWN;
}
}
private long getLongSummary(Map<String, String> summaryMap, String key) {
String textValue = summaryMap.get(key);
if (StringUtils.isBlank(textValue)) {
return 0;
}
return Long.parseLong(textValue);
}
@Override
public Iterable<SnapshotContext> getSnapshotContexts(
org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
return getSnapshots(table.snapshots(), since);
}
@VisibleForTesting
Iterable<SnapshotContext> getSnapshots(Iterable<Snapshot> snapshots, SnapshotContext since) {
List<SnapshotContext> result = Lists.newArrayList();
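// when 'since' is null, every snapshot is considered newer and will be returned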
boolean foundSince = Objects.isNull(since);
for (Snapshot snapshot : snapshots) {
if (!foundSince) {
if (snapshot.snapshotId() == since.getSnapshotId()) {
foundSince = true;
}
} else {
result.add(toSnapshotContext(snapshot));
}
}
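// if 'since' was not found among the snapshots, return an empty result instead of a partial one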
return foundSince ? result : Collections.emptyList();
}
@Override
public void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTableDesc,
EnvironmentContext environmentContext) {
if (alterTableDesc instanceof AlterTableSetPropertiesDesc &&
alterTableDesc.getProps().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)) {
// signal that the Iceberg metadata location was manually updated by the user
environmentContext.putToProperties(HiveIcebergMetaHook.MANUAL_ICEBERG_METADATA_LOCATION_CHANGE, "true");
}
}
@Override
public void setTableParametersForCTLT(org.apache.hadoop.hive.ql.metadata.Table tbl, CreateTableLikeDesc desc,
Map<String, String> origParams) {
// Preserve the format-version of the Iceberg table and filter out the rest.
if (IcebergTableUtil.isV2Table(origParams)) {
tbl.getParameters().put(TableProperties.FORMAT_VERSION, "2");
tbl.getParameters().put(TableProperties.DELETE_MODE, MERGE_ON_READ);
tbl.getParameters().put(TableProperties.UPDATE_MODE, MERGE_ON_READ);
tbl.getParameters().put(TableProperties.MERGE_MODE, MERGE_ON_READ);
}
// If the table is being created as a managed table, translate it to external.
if (!desc.isExternal()) {
tbl.getParameters().put(HiveMetaHook.TRANSLATED_TO_EXTERNAL, "TRUE");
desc.setIsExternal(true);
}
// If the source is an Iceberg table, set the schema and the partition spec
if (MetaStoreUtils.isIcebergTable(origParams)) {
tbl.getParameters()
.put(InputFormatConfig.TABLE_SCHEMA, origParams.get(InputFormatConfig.TABLE_SCHEMA));
tbl.getParameters()
.put(InputFormatConfig.PARTITION_SPEC, origParams.get(InputFormatConfig.PARTITION_SPEC));
} else {
// if the source is a partitioned non-Iceberg table, set the partition transform spec and mark the table as
// unpartitioned
List<TransformSpec> spec = PartitionTransform.getPartitionTransformSpec(tbl.getPartitionKeys());
SessionStateUtil.addResourceOrThrow(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, spec);
tbl.getSd().getCols().addAll(tbl.getPartitionKeys());
tbl.getTTable().setPartitionKeysIsSet(false);
}
}
@Override
public void setTableLocationForCTAS(CreateTableDesc desc, String location) {
desc.setLocation(location);
}
@Override
public Map<String, String> getNativeProperties(org.apache.hadoop.hive.ql.metadata.Table table) {
Table origTable = IcebergTableUtil.getTable(conf, table.getTTable());
Map<String, String> props = Maps.newHashMap();
props.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(origTable.schema()));
props.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(origTable.spec()));
return props;
}
@Override
public boolean shouldOverwrite(org.apache.hadoop.hive.ql.metadata.Table mTable, Context.Operation operation) {
return IcebergTableUtil.isCopyOnWriteMode(operation, mTable.getParameters()::getOrDefault);
}
@Override
public void addResourcesForCreateTable(Map<String, String> tblProps, HiveConf hiveConf) {
String metadataLocation = tblProps.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
if (StringUtils.isNotEmpty(metadataLocation)) {
SessionStateUtil.addResourceOrThrow(hiveConf, BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
metadataLocation);
}
}
/**
* Check the operation type of all snapshots which are newer than the specified one. The specified snapshot is excluded.
* @deprecated
* <br>Use {@link HiveStorageHandler#getSnapshotContexts(
* org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since)}
* and check {@link SnapshotContext.WriteOperationType#APPEND}.equals({@link SnapshotContext#getOperation()}).
*
* @param hmsTable table metadata stored in Hive Metastore
* @param since the snapshot preceding the oldest snapshot which should be checked.
* The value null means all should be checked.
* @return null if table is empty, true if all snapshots are {@link SnapshotContext.WriteOperationType#APPEND}s,
* false otherwise.
*/
@Deprecated
@Override
public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
return hasAppendsOnly(table.snapshots(), since);
}
@VisibleForTesting
Boolean hasAppendsOnly(Iterable<Snapshot> snapshots, SnapshotContext since) {
boolean foundSince = since == null;
for (Snapshot snapshot : snapshots) {
if (!foundSince) {
if (snapshot.snapshotId() == since.getSnapshotId()) {
foundSince = true;
}
} else {
if (!DataOperations.APPEND.equals(snapshot.operation())) {
return false;
}
}
}
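// if 'since' was not found among the snapshots, the result is undecidable, which is signalled by returning null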
if (foundSince) {
return true;
}
return null;
}
@Override
public List<String> showPartitions(DDLOperationContext context, org.apache.hadoop.hive.ql.metadata.Table hmstbl)
throws HiveException {
Configuration confs = context.getConf();
JobConf job = HiveTableUtil.getPartJobConf(confs, hmstbl);
Class<? extends InputFormat> formatter = hmstbl.getInputFormatClass();
try {
InputFormat inputFormat = FetchOperator.getInputFormatFromCache(formatter, job);
InputSplit[] splits = inputFormat.getSplits(job, 1);
try (RecordReader<WritableComparable, Writable> reader = inputFormat.getRecordReader(splits[0], job,
Reporter.NULL)) {
return getPartitions(context, job, reader, hmstbl);
}
} catch (Exception e) {
throw new HiveException(e, ErrorMsg.GENERIC_ERROR,
"show partitions for table " + hmstbl.getTableName() + ". " + ErrorMsg.TABLE_NOT_PARTITIONED +
" or the table is empty ");
}
}
private List<String> getPartitions(DDLOperationContext context, Configuration job,
RecordReader<WritableComparable, Writable> reader, org.apache.hadoop.hive.ql.metadata.Table hmstbl)
throws Exception {
List<String> parts = Lists.newArrayList();
Writable value = reader.createValue();
WritableComparable key = reader.createKey();
try (FetchFormatter fetcher = new DefaultFetchFormatter()) {
fetcher.initialize(job, HiveTableUtil.getSerializationProps());
org.apache.hadoop.hive.ql.metadata.Table metaDataPartTable =
context.getDb().getTable(hmstbl.getDbName(), hmstbl.getTableName(), "partitions", true);
Deserializer currSerDe = metaDataPartTable.getDeserializer();
ObjectMapper mapper = new ObjectMapper();
Table tbl = getTable(hmstbl);
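// format each metadata row with the SerDe, then split it to extract the partition data and spec id columns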
while (reader.next(key, value)) {
String[] row =
fetcher.convert(currSerDe.deserialize(value), currSerDe.getObjectInspector())
.toString().split("\t");
parts.add(HiveTableUtil.getParseData(row[PART_IDX], row[SPEC_IDX], mapper, tbl.spec().specId()));
}
}
Collections.sort(parts);
return parts;
}
@Override
public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
throws SemanticException {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
if (hmsTable.getSnapshotRef() != null && hasUndergonePartitionEvolution(table)) {
// in this case the query is rewritten as a delete query, so validation is done as part of the delete
return;
}
if (table.spec().isUnpartitioned() && MapUtils.isNotEmpty(partitionSpec)) {
throw new SemanticException("Writing data into a partition fails when the Iceberg table is unpartitioned.");
}
Map<String, Types.NestedField> mapOfPartColNamesWithTypes = Maps.newHashMap();
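// map each partition source column name to its schema field so the given partition spec can be validated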
for (PartitionField partField : table.spec().fields()) {
Types.NestedField field = table.schema().findField(partField.sourceId());
mapOfPartColNamesWithTypes.put(field.name(), field);
}
for (Map.Entry<String, String> spec : partitionSpec.entrySet()) {
Types.NestedField field = mapOfPartColNamesWithTypes.get(spec.getKey());
Objects.requireNonNull(field, String.format("%s is not a partition column", spec.getKey()));
// If the partition spec value is null, it's a dynamic partition column.
if (spec.getValue() != null) {
Object partKeyVal = Conversions.fromPartitionString(field.type(), spec.getValue());
Objects.requireNonNull(partKeyVal,
String.format("Partition spec value for column : %s is invalid", field.name()));
}
}
}
/**
* A function to decide whether a given truncate query can be performed as a metadata delete.
* If a metadata delete is not possible, the truncate is converted to a delete which is executed based on the mode.
* The steps to decide whether a metadata delete is possible are as follows: <br>
* a. Create an expression based on the partition spec columns and partition spec values. <br>
* b. Find the files which match the expression using Apache Iceberg's FindFiles API. <br>
* c. Evaluate whether the expression fully covers the partition values of each matching file. <br>
* If the evaluation returns true for all files, we can safely delete them by performing a metadata delete operation.
* If not, the truncate must be converted to a delete query which eventually performs a delete based on the mode.
* @param hmsTable A Hive table instance.
* @param partitionSpec Map containing partition specification given by user.
* @return true if we can perform metadata delete, otherwise false.
* @throws SemanticException Exception raised when a partition transform is being used
* or when partition column is not present in the table.
*/
@Override
public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
throws SemanticException {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
if (MapUtils.isEmpty(partitionSpec) || !hasUndergonePartitionEvolution(table)) {
return true;
} else if (hmsTable.getSnapshotRef() != null) {
return false;
}
Expression finalExp = generateExpressionFromPartitionSpec(table, partitionSpec);
FindFiles.Builder builder = new FindFiles.Builder(table).withRecordsMatching(finalExp);
Set<DataFile> dataFiles = Sets.newHashSet(builder.collect());
boolean result = true;
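// a metadata delete is only safe if the expression fully covers the partition data of every matching file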
for (DataFile dataFile : dataFiles) {
PartitionData partitionData = (PartitionData) dataFile.partition();
Expression residual = ResidualEvaluator.of(table.spec(), finalExp, false)
.residualFor(partitionData);
if (!residual.isEquivalentTo(Expressions.alwaysTrue())) {
result = false;
break;
}
}
return result;
}
private boolean hasUndergonePartitionEvolution(Table table) {
// If it is a table which has undergone partition evolution, return true.
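// Any manifest written with an older spec id than the current spec indicates that the partition spec has changed.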
return table.currentSnapshot() != null &&
table.currentSnapshot().allManifests(table.io()).parallelStream()
.map(ManifestFile::partitionSpecId)
.anyMatch(id -> id < table.spec().specId());
}
private boolean isIdentityPartitionTable(org.apache.hadoop.hive.ql.metadata.Table table) {
return getPartitionTransformSpec(table).stream().map(TransformSpec::getTransformType)
.allMatch(type -> type == TransformSpec.TransformType.IDENTITY);
}
@Override
public Optional<ErrorMsg> isEligibleForCompaction(
org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partitionSpec) {
if (partitionSpec != null) {
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
if (hasUndergonePartitionEvolution(icebergTable)) {
return Optional.of(ErrorMsg.COMPACTION_PARTITION_EVOLUTION);
}
if (!isIdentityPartitionTable(table)) {
return Optional.of(ErrorMsg.COMPACTION_NON_IDENTITY_PARTITION_SPEC);
}
}
return Optional.empty();
}
@Override
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec) throws SemanticException {
return getPartitionNames(table, partitionSpec).stream()
.map(partName -> new DummyPartition(table, partName, partitionSpec)).collect(Collectors.toList());
}
@Override
public Partition getPartition(org.apache.hadoop.hive.ql.metadata.Table table,
Map<String, String> partitionSpec) throws SemanticException {
validatePartSpec(table, partitionSpec);
try {
String partName = Warehouse.makePartName(partitionSpec, false);
return new DummyPartition(table, partName, partitionSpec);
} catch (MetaException e) {
throw new SemanticException("Unable to construct name for dummy partition due to: ", e);
}
}
/**
* Returns a list of partition names of the table which correspond to the provided partition spec.
* @param hmsTable A Hive table instance.
* @param partitionSpec Map containing partition specification.
* @return A list of partition paths which satisfy the provided partition spec.
* @throws SemanticException Exception raised when there is an issue performing a scan on the partitions table.
*/
@Override
public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Map<String, String> partitionSpec) throws SemanticException {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
.createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
Expression expression = generateExpressionFromPartitionSpec(icebergTable, partitionSpec);
Set<PartitionData> partitionList = Sets.newHashSet();
try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
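// collect the distinct partition tuples from the PARTITIONS metadata table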
fileScanTasks.forEach(task -> {
partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(), row -> {
StructProjection data = row.get(PART_IDX, StructProjection.class);
PartitionSpec pSpec = icebergTable.spec();
PartitionData partitionData = new PartitionData(pSpec.partitionType());
for (int index = 0; index < pSpec.fields().size(); index++) {
partitionData.set(index, data.get(index, Object.class));
}
return partitionData;
})));
});
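// keep only the partitions that fully satisfy the given partition spec (the residual evaluates to alwaysTrue)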
List<String> partPathList = partitionList.stream().filter(partitionData -> {
ResidualEvaluator resEval = ResidualEvaluator.of(icebergTable.spec(), expression, false);
return resEval.residualFor(partitionData).isEquivalentTo(Expressions.alwaysTrue());
}).map(partitionData -> icebergTable.spec().partitionToPath(partitionData)).collect(Collectors.toList());
return partPathList;
} catch (IOException e) {
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
}
}
private Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec)
throws SemanticException {
Map<String, PartitionField> partitionFieldMap = table.spec().fields().stream()
.collect(Collectors.toMap(PartitionField::name, Function.identity()));
Expression finalExp = Expressions.alwaysTrue();
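// AND together an IN predicate for each identity-transformed partition column; any other transform is rejected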
for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
String partColName = entry.getKey();
if (partitionFieldMap.containsKey(partColName)) {
PartitionField partitionField = partitionFieldMap.get(partColName);
Type resultType = partitionField.transform().getResultType(table.schema()
.findField(partitionField.sourceId()).type());
Object value = Conversions.fromPartitionString(resultType, entry.getValue());
TransformSpec.TransformType transformType = TransformSpec.fromString(partitionField.transform().toString());
Iterable<?> iterable = () -> Collections.singletonList(value).iterator();
if (TransformSpec.TransformType.IDENTITY == transformType) {
Expression boundPredicate = Expressions.in(partitionField.name(), iterable);
finalExp = Expressions.and(finalExp, boundPredicate);
} else {
throw new SemanticException(
String.format("Partition transforms are not supported via truncate operation: %s", partColName));
}
} else {
throw new SemanticException(String.format("No partition column/transform by the name: %s", partColName));
}
}
return finalExp;
}
/**
* A function to fetch the column information of the underlying column defined by the table format.
* @param hmsTable A Hive table instance
* @param colName Column name
* @return An instance of ColumnInfo.
* @throws SemanticException An exception is thrown when the column is not present, if we are unable to fetch the
* column type due to a SerDeException, or if the associated field object inspector is not present.
*/
@Override
public ColumnInfo getColumnInfo(org.apache.hadoop.hive.ql.metadata.Table hmsTable, String colName)
throws SemanticException {
Table icebergTbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
Deserializer deserializer = hmsTable.getDeserializer();
Types.NestedField field = icebergTbl.schema().findField(colName);
if (field != null) {
try {
ObjectInspector fieldObjInspector = null;
StructObjectInspector structObjectInspector = (StructObjectInspector) deserializer.getObjectInspector();
for (StructField structField : structObjectInspector.getAllStructFieldRefs()) {
if (field.name().equalsIgnoreCase(structField.getFieldName())) {
fieldObjInspector = structField.getFieldObjectInspector();
break;
}
}
if (fieldObjInspector != null) {
return new ColumnInfo(field.name(), fieldObjInspector, hmsTable.getTableName(), false);
} else {
throw new SemanticException(String.format("Unable to fetch column type of column %s " +
"since we are not able to infer its object inspector.", colName));
}
} catch (SerDeException e) {
throw new SemanticException(String.format("Unable to fetch column type of column %s due to: %s", colName, e));
}
} else {
throw new SemanticException(String.format("Unable to find a column with the name: %s", colName));
}
}
@Override
public boolean canPerformMetadataDelete(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
String branchName, SearchArgument sarg) {
Expression exp;
try {
exp = HiveIcebergFilterFactory.generateFilterExpression(sarg);
} catch (UnsupportedOperationException e) {
LOG.warn("Unable to create Iceberg filter," +
" continuing without metadata delete: ", e);
return false;
}
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
// The following code is inspired & copied from Iceberg's SparkTable.java#canDeleteUsingMetadata
if (ExpressionUtil.selectsPartitions(exp, table, false)) {
return true;
}
TableScan scan = table.newScan().filter(exp).caseSensitive(false).includeColumnStats().ignoreResiduals();
if (branchName != null) {
scan = scan.useRef(HiveUtils.getTableSnapshotRef(branchName));
}
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
Map<Integer, Evaluator> evaluators = Maps.newHashMap();
StrictMetricsEvaluator metricsEvaluator =
new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table, branchName), exp);
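// a metadata delete is possible only if, for every file, either the strict partition projection or the strict
// metrics evaluation proves that all of its rows match the filter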
return Iterables.all(tasks, task -> {
DataFile file = task.file();
PartitionSpec spec = task.spec();
Evaluator evaluator = evaluators.computeIfAbsent(spec.specId(), specId ->
new Evaluator(spec.partitionType(), Projections.strict(spec).project(exp)));
return evaluator.eval(file.partition()) || metricsEvaluator.eval(file);
});
} catch (IOException ioe) {
LOG.warn("Failed to close task iterable", ioe);
return false;
}
}
@Override
public List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
Schema schema = icebergTable.schema();
List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
Map<String, String> colNameToColType = hiveSchema.stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
return icebergTable.spec().fields().stream().map(partField ->
new FieldSchema(schema.findColumnName(partField.sourceId()),
colNameToColType.get(schema.findColumnName(partField.sourceId())),
String.format("Transform: %s", partField.transform().toString()))).collect(Collectors.toList());
}
@Override
public List<Partition> getPartitionsByExpr(org.apache.hadoop.hive.ql.metadata.Table hmsTable, ExprNodeDesc desc)
throws SemanticException {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
PartitionSpec pSpec = icebergTable.spec();
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
.createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
SearchArgument sarg = ConvertAstToSearchArg.create(conf, (ExprNodeGenericFuncDesc) desc);
Expression expression = HiveIcebergFilterFactory.generateFilterExpression(sarg);
Set<PartitionData> partitionList = Sets.newHashSet();
ResidualEvaluator resEval = ResidualEvaluator.of(pSpec, expression, false);
try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
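// scan the PARTITIONS metadata table and keep only the partitions fully matched by the pushed-down filter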
fileScanTasks.forEach(task ->
partitionList.addAll(Sets.newHashSet(CloseableIterable.transform(task.asDataTask().rows(), row -> {
StructProjection data = row.get(PART_IDX, StructProjection.class);
return IcebergTableUtil.toPartitionData(data, pSpec.partitionType());
})).stream()
.filter(partitionData -> resEval.residualFor(partitionData).isEquivalentTo(Expressions.alwaysTrue()))
.collect(Collectors.toSet())));
return partitionList.stream()
.map(partitionData -> new DummyPartition(hmsTable, pSpec.partitionToPath(partitionData)))
.collect(Collectors.toList());
} catch (IOException e) {
throw new SemanticException(String.format("Error while fetching the partitions due to: %s", e));
}
}
@Override
public boolean supportsMergeFiles() {
return true;
}
@Override
public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
String tableName = properties.getProperty(Catalogs.NAME);
String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
Configuration configuration = SessionState.getSessionConf();
List<JobContext> originalContextList = generateJobContext(configuration, tableName, snapshotRef);
List<JobContext> jobContextList = originalContextList.stream()
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
if (jobContextList.isEmpty()) {
return Collections.emptyList();
}
return new HiveIcebergOutputCommitter().getOutputFiles(jobContextList);
}
@Override
public MergeTaskProperties getMergeTaskProperties(Properties properties) {
return new IcebergMergeTaskProperties(properties);
}
}