/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.antlr.runtime.TokenRewriteStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
 * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
 * update, delete and merge statements. It works by rewriting the updates and deletes into insert
 * statements (since they are actually inserts) and then patching them up so they work as updates
 * and deletes instead.
 */
public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
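// Illustrative sketch only (the exact rewritten text is produced by the subclasses via the helper
// methods below, and varies with table type, partitioning and statement kind): an UPDATE such as
//   UPDATE t SET a = 5 WHERE b > 10
// is reparsed as, roughly,
//   FROM t
//   INSERT INTO t SELECT ROW__ID, ..., 5, ... WHERE b > 10 SORT BY ROW__ID
// and the resulting tree is then analyzed again via analyzeRewrittenTree().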
protected static final Logger LOG = LoggerFactory.getLogger(RewriteSemanticAnalyzer.class);
protected boolean useSuper = false;
protected static final String INDENT = " ";
private IdentifierQuoter quotedIdentifierHelper;
private Table targetTable;
private String targetTableFullName;
RewriteSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
@Override
public void analyzeInternal(ASTNode tree) throws SemanticException {
if (useSuper) {
super.analyzeInternal(tree);
} else {
quotedIdentifierHelper = new IdentifierQuoter(ctx.getTokenRewriteStream());
analyze(tree);
cleanUpMetaColumnAccessControl();
}
}
protected abstract ASTNode getTargetTableNode(ASTNode tree);
private void analyze(ASTNode tree) throws SemanticException {
ASTNode tableName = getTargetTableNode(tree);
targetTableFullName = getFullTableNameForSQL(tableName);
targetTable = getTable(tableName, db, true);
validateTxnManager(targetTable);
validateTargetTable(targetTable);
analyze(tree, targetTable, tableName);
}
protected abstract void analyze(ASTNode tree, Table table, ASTNode tableName) throws SemanticException;
public void analyzeRewrittenTree(ASTNode rewrittenTree, Context rewrittenCtx) throws SemanticException {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Rewritten AST {}", rewrittenTree.dump());
}
useSuper = true;
super.analyze(rewrittenTree, rewrittenCtx);
} finally {
useSuper = false;
}
}
/**
 * Append the list of partition columns to the INSERT statement, i.e. the second occurrence of
 * partCol1, partCol2 in:
 * INSERT INTO T PARTITION(partCol1, partCol2...) SELECT col1, ... partCol1, partCol2...
 */
protected void addColsToSelect(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr)
throws SemanticException {
// If the table is partitioned, we need to select the partition columns as well.
if (partCols != null) {
for (FieldSchema fschema : partCols) {
rewrittenQueryStr.append(", ");
rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
}
}
}
/**
* Append list of columns to rewritten statement.
*/
protected void addColsToSelect(List<FieldSchema> cols, StringBuilder rewrittenQueryStr,
ASTNode target) throws SemanticException {
addColsToSelect(cols, rewrittenQueryStr, getSimpleTableName(target));
}
/**
* Append list of columns to rewritten statement.
* Column names are qualified with the specified alias and quoted.
*/
protected void addColsToSelect(List<FieldSchema> cols, StringBuilder rewrittenQueryStr, String alias) {
// If the table is partitioned, we need to select the partition columns as well.
if (cols != null) {
for (FieldSchema fschema : cols) {
rewrittenQueryStr.append(", ");
rewrittenQueryStr.append(alias).append('.');
rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
}
}
}
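// Illustrative example: for alias "t" and columns [p1, p2] the method above appends roughly
//   ", t.`p1`, t.`p2`"
// (identifiers are re-quoted by HiveUtils.unparseIdentifier; the alias is appended as given).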
protected void addPartitionColsAsValues(List<FieldSchema> partCols, String alias, List<String> values) {
if (partCols == null) {
return;
}
partCols.forEach(
fieldSchema -> values.add(alias + "." + HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf)));
}
/**
* Assert that we are not asked to update a bucketing column or partition column.
* @param colName the A in "SET A = B"
*/
protected void checkValidSetClauseTarget(ASTNode colName, Table targetTable) throws SemanticException {
String columnName = normalizeColName(colName.getText());
// Make sure this isn't one of the partitioning columns, that's not supported.
for (FieldSchema fschema : targetTable.getPartCols()) {
if (fschema.getName().equalsIgnoreCase(columnName)) {
throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
}
}
//updating bucket column should move row from one file to another - not supported
if (targetTable.getBucketCols() != null && targetTable.getBucketCols().contains(columnName)) {
throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE, columnName);
}
boolean foundColumnInTargetTable = false;
for (FieldSchema col : targetTable.getCols()) {
if (columnName.equalsIgnoreCase(col.getName())) {
foundColumnInTargetTable = true;
break;
}
}
if (!foundColumnInTargetTable) {
throw new SemanticException(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE, colName.getText(),
targetTable.getFullyQualifiedName());
}
}
protected ASTNode findLHSofAssignment(ASTNode assignment) {
assert assignment.getToken().getType() == HiveParser.EQUAL :
"Expected set assignments to use equals operator but found " + assignment.getName();
ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
"Expected left side of assignment to be table or column";
ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
assert colName.getToken().getType() == HiveParser.Identifier :
"Expected column name";
return colName;
}
protected Map<String, ASTNode> collectSetColumnsAndExpressions(ASTNode setClause,
Set<String> setRCols, Table targetTable) throws SemanticException {
// An update needs to select all of the columns, as we rewrite the entire row. Also,
// we need to figure out which columns we are going to replace.
assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
"Expected second child of update token to be set token";
// Get the children of the set clause, each of which should be a column assignment
List<? extends Node> assignments = setClause.getChildren();
// Must be deterministic order map for consistent q-test output across Java versions
Map<String, ASTNode> setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
for (Node a : assignments) {
ASTNode assignment = (ASTNode)a;
ASTNode colName = findLHSofAssignment(assignment);
if (setRCols != null) {
addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
}
checkValidSetClauseTarget(colName, targetTable);
String columnName = normalizeColName(colName.getText());
// This means that in UPDATE T SET x = _something_
// _something_ can be whatever is supported in SELECT _something_
setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
}
return setCols;
}
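// Illustrative example: for "UPDATE t SET a = 5, b = c + 1" the map returned above is
//   {"a" -> AST of 5, "b" -> AST of c + 1}
// and, when a non-null setRCols set is passed in, it ends up containing {"c"}.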
/**
 * @param throwException if true, throw an exception when the table does not exist; if false, return null instead
 */
protected static Table getTable(ASTNode tabRef, Hive db, boolean throwException) throws SemanticException {
TableName tableName;
switch (tabRef.getType()) {
case HiveParser.TOK_TABREF:
tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0));
break;
case HiveParser.TOK_TABNAME:
tableName = getQualifiedTableName(tabRef);
break;
default:
throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
}
Table mTable;
try {
mTable = db.getTable(tableName.getDb(), tableName.getTable(), tableName.getTableMetaRef(), throwException);
} catch (InvalidTableException e) {
LOG.error("Failed to find table " + tableName.getNotEmptyDbTable() + " got exception " + e.getMessage());
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName.getNotEmptyDbTable()), e);
} catch (HiveException e) {
LOG.error("Failed to find table " + tableName.getNotEmptyDbTable() + " got exception " + e.getMessage());
throw new SemanticException(e.getMessage(), e);
}
return mTable;
}
/**
* Walk through all our inputs and set them to note that this read is part of an update or a delete.
*/
protected void markReadEntityForUpdate() {
for (ReadEntity input : inputs) {
if (isWritten(input)) {
//TODO: this is actually not adding anything, since LockComponent uses a Trie to "promote" a lock,
//except by accident: when the target table is partitioned we have both a ReadEntity and a WriteEntity
//for the table, so we mark the ReadEntity and then delete the WriteEntity (replacing it with Partition
//entries), which makes DbTxnManager skip the Read lock on the ReadEntity...
input.setUpdateOrDelete(true); //input.noLockNeeded()?
}
}
}
/**
* For updates, we need to set the column access info so that it contains information on
* the columns we are updating.
* (But not all columns of the target table, even though the rewritten query writes all of them;
* that is an implementation detail.)
*/
protected void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
ColumnAccessInfo cai = new ColumnAccessInfo();
for (String colName : setCols.keySet()) {
cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
}
setUpdateColumnAccessInfo(cai);
}
/**
* We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
* require the user to have authorization on that column.
*/
private void cleanUpMetaColumnAccessControl() {
//we do this for Update/Delete (incl Merge) because we introduce this column into the query
//as part of rewrite
if (columnAccessInfo != null) {
columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
}
}
/**
* Parse the newly generated SQL statement to get a new AST.
*/
protected ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery)
throws SemanticException {
// Set dynamic partitioning to nonstrict so that queries do not need any partition
// references.
// TODO: this may be a perf issue as it prevents the optimizer.. or not
HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
// Disable LLAP IO wrapper; doesn't propagate extra ACID columns correctly.
HiveConf.setBoolVar(conf, ConfVars.LLAP_IO_ROW_WRAPPER_ENABLED, false);
// Parse the rewritten query string
Context rewrittenCtx = new Context(conf);
rewrittenCtx.setHDFSCleanup(true);
// We keep track of all the contexts that are created by this query
// so we can clear them when we finish execution
ctx.addSubContext(rewrittenCtx);
rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
rewrittenCtx.setExplainPlan(ctx.isExplainPlan());
rewrittenCtx.setStatsSource(ctx.getStatsSource());
rewrittenCtx.setPlanMapper(ctx.getPlanMapper());
rewrittenCtx.setIsUpdateDeleteMerge(true);
rewrittenCtx.setCmd(rewrittenQueryStr.toString());
ASTNode rewrittenTree;
try {
LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">");
rewrittenTree = ParseUtils.parse(rewrittenQueryStr.toString(), rewrittenCtx);
} catch (ParseException e) {
throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
}
return new ReparseResult(rewrittenTree, rewrittenCtx);
}
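// A minimal usage sketch, assuming a subclass that has already collected the values to project
// ("values" and "originalQueryText" below are placeholders, not members of this class):
//   StringBuilder sb = createRewrittenQueryStrBuilder();
//   appendTarget(sb, targetTableNode, targetName);
//   appendInsertBranch(sb, null, values);
//   ReparseResult rr = parseRewrittenQuery(sb, originalQueryText);
//   analyzeRewrittenTree(rr.rewrittenTree, rr.rewrittenCtx);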
private void validateTxnManager(Table mTable) throws SemanticException {
if (!AcidUtils.acidTableWithoutTransactions(mTable) && !getTxnMgr().supportsAcid()) {
throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
}
}
/**
 * Assert that the target table can be written to, i.e. that it is not a view or materialized view.
 */
protected void validateTargetTable(Table mTable) throws SemanticException {
if (mTable.getTableType() == TableType.VIRTUAL_VIEW || mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
LOG.error("Table " + mTable.getFullyQualifiedName() + " is a view or materialized view");
throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
}
}
/**
 * Check whether {@code readEntity} is also being written to.
 */
private boolean isWritten(Entity readEntity) {
for (Entity writeEntity : outputs) {
//make sure to compare them as Entity, i.e. that it's the same table or partition, etc
if (writeEntity.toString().equalsIgnoreCase(readEntity.toString())) {
return true;
}
}
return false;
}
// This method finds any columns on the right side of a set statement (thus rcols) and puts them
// in a set so we can add them to the list of input cols to check.
private void addSetRCols(ASTNode node, Set<String> setRCols) {
// See if this node is a TOK_TABLE_OR_COL. If so, find the value and put it in the list. If
// not, recurse on any children
if (node.getToken().getType() == HiveParser.TOK_TABLE_OR_COL) {
ASTNode colName = (ASTNode)node.getChildren().get(0);
if (colName.getToken().getType() == HiveParser.TOK_DEFAULT_VALUE) {
return;
}
assert colName.getToken().getType() == HiveParser.Identifier :
"Expected column name";
setRCols.add(normalizeColName(colName.getText()));
} else if (node.getChildren() != null) {
for (Node n : node.getChildren()) {
addSetRCols((ASTNode)n, setRCols);
}
}
}
/**
* Column names are stored in metastore in lower case, regardless of the CREATE TABLE statement.
* Unfortunately there is no single place that normalizes the input query.
* @param colName not null
*/
private static String normalizeColName(String colName) {
return colName.toLowerCase();
}
/**
* SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check
* if the read and write are of the same table in "insert ... select ....". Since DbTxnManager
* uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we
* change the table-level WriteEntity to a set of partition WriteEntity objects based on the
* ReadEntity objects computed for this table.
*/
protected void updateOutputs(Table targetTable) {
markReadEntityForUpdate();
if (targetTable.isPartitioned()) {
List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
if (!partitionsRead.isEmpty()) {
// if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
// WriteEntity for each partition
List<WriteEntity> toRemove = new ArrayList<>();
for (WriteEntity we : outputs) {
WriteEntity.WriteType wt = we.getWriteType();
if (isTargetTable(we, targetTable) &&
(wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
// The assumption here is that SemanticAnalyzer will generate a ReadEntity for each
// partition that exists and is matched by the WHERE clause (which may be all of them).
// Since we don't allow updating the value of a partition column, we know that we always
// write the same (or fewer) partitions than we read. Still, the write is a Dynamic
// Partition write - see HIVE-15032.
toRemove.add(we);
}
}
outputs.removeAll(toRemove);
// TODO: why is this like that?
for (ReadEntity re : partitionsRead) {
for (WriteEntity original : toRemove) {
//since we may have both Update and Delete branches, Auth needs to know
WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType());
we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
outputs.add(we);
}
}
}
}
}
/**
 * If the optimizer has determined that it only has to read some of the partitions of the
 * target table to satisfy the query, then we know that the write side of update/delete
 * (and the update/delete parts of merge) can only write (at most) that set of partitions,
 * since we currently don't allow updating partition (or bucket) columns. So we want to replace
 * the table-level WriteEntity in the outputs with a WriteEntity for each of these partitions.
 * TODO: see if this should be moved to SemanticAnalyzer itself since it applies to any
 * insert which does a select against the same table. Then SemanticAnalyzer would also
 * be able to not use dynamic partitioning (DP) for the insert...
 *
 * Note that the insert branch of a merge may be creating new partitions and writing to partitions
 * which were not read (WHEN NOT MATCHED...). The WriteEntity for that should be created
 * in MoveTask (or some other task after the query is complete).
 */
private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
List<ReadEntity> partitionsRead = new ArrayList<>();
for (ReadEntity re : inputs) {
if (re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) {
partitionsRead.add(re);
}
}
return partitionsRead;
}
/**
 * Does this Entity belong to the target table (or one of its partitions)?
 */
private boolean isTargetTable(Entity entity, Table targetTable) {
//todo: https://issues.apache.org/jira/browse/HIVE-15048
// Since any DDL now advances the write id, we should ignore the write id
// when comparing two tables.
return targetTable.equalsWithIgnoreWriteId(entity.getTable());
}
/**
* Returns the table name to use in the generated query preserving original quotes/escapes if any.
* @see #getFullTableNameForSQL(ASTNode)
*/
protected String getSimpleTableName(ASTNode n) throws SemanticException {
return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
}
protected static final class ReparseResult {
final ASTNode rewrittenTree;
final Context rewrittenCtx;
ReparseResult(ASTNode n, Context c) {
rewrittenTree = n;
rewrittenCtx = c;
}
}
// Patch up the projection list for updates, putting back the original SET expressions.
// Walk through the projection list and replace the column names with the
// expressions from the original update. Under the TOK_SELECT the structure
// looks like:
// TOK_SELECT -> TOK_SELEXPR -> expr
// \-> TOK_SELEXPR -> expr ...
protected void patchProjectionForUpdate(ASTNode insertBranch, Map<Integer, ASTNode> setColExprs) {
ASTNode rewrittenSelect = (ASTNode) insertBranch.getChildren().get(1);
assert rewrittenSelect.getToken().getType() == HiveParser.TOK_SELECT :
"Expected TOK_SELECT as second child of TOK_INSERT but found " + rewrittenSelect.getName();
for (Map.Entry<Integer, ASTNode> entry : setColExprs.entrySet()) {
ASTNode selExpr = (ASTNode) rewrittenSelect.getChildren().get(entry.getKey());
assert selExpr.getToken().getType() == HiveParser.TOK_SELEXPR :
"Expected child of TOK_SELECT to be TOK_SELEXPR but was " + selExpr.getName();
// Now, change its child
selExpr.setChild(0, entry.getValue());
}
}
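// Illustrative example: for "UPDATE t SET a = 5" the rewritten SELECT initially projects the
// original column a; setColExprs maps a's position in that projection to the AST of the literal 5,
// and the loop above swaps that expression in, in place.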
protected StringBuilder createRewrittenQueryStrBuilder() {
return new StringBuilder("FROM\n");
}
protected void appendTarget(StringBuilder rewrittenQueryStr, ASTNode target, String targetName) {
rewrittenQueryStr.append(INDENT).append(targetTableFullName);
if (isAliased(target)) {
rewrittenQueryStr.append(" ").append(targetName);
}
rewrittenQueryStr.append('\n');
}
protected boolean isAliased(ASTNode n) {
switch (n.getType()) {
case HiveParser.TOK_TABREF:
return findTabRefIdxs(n)[0] != 0;
case HiveParser.TOK_TABNAME:
return false;
case HiveParser.TOK_SUBQUERY:
assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
return true;
default:
throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
}
}
protected void appendInsertBranch(StringBuilder rewrittenQueryStr, String hintStr, List<String> values) {
rewrittenQueryStr.append("INSERT INTO ").append(targetTableFullName);
addPartitionColsToInsert(targetTable.getPartCols(), rewrittenQueryStr);
rewrittenQueryStr.append("\n");
rewrittenQueryStr.append(INDENT);
rewrittenQueryStr.append("SELECT ");
if (isNotBlank(hintStr)) {
rewrittenQueryStr.append(hintStr);
}
rewrittenQueryStr.append(StringUtils.join(values, ","));
rewrittenQueryStr.append("\n");
}
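// Illustrative example: for a target table db.t partitioned by p and values ["s.ROW__ID", "s.`p`"],
// the branch appended above looks roughly like
//   INSERT INTO db.t PARTITION (`p`)
//     SELECT s.ROW__ID,s.`p`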
protected void appendSortBy(StringBuilder rewrittenQueryStr, List<String> keys) {
if (keys.isEmpty()) {
return;
}
rewrittenQueryStr.append(INDENT).append("SORT BY ");
rewrittenQueryStr.append(StringUtils.join(keys, ","));
rewrittenQueryStr.append("\n");
}
/**
 * This allows us to take an arbitrary ASTNode and turn it back into the SQL that produced it.
 * Since HiveLexer.g is written such that it strips away any ` (back ticks) around
 * quoted identifiers, we need to add those back to the generated SQL.
 * Additionally, the parser only produces tokens of type Identifier and never
 * QuotedIdentifier (HIVE-6013), so here we just quote all identifiers.
 * Single quotes (') around String literals are retained without issues.
 */
private static class IdentifierQuoter {
private final TokenRewriteStream trs;
private final IdentityHashMap<ASTNode, ASTNode> visitedNodes = new IdentityHashMap<>();
IdentifierQuoter(TokenRewriteStream trs) {
if (trs == null) {
throw new IllegalArgumentException("Must have a TokenRewriteStream");
}
this.trs = trs;
}
private void visit(ASTNode n) {
if (n.getType() == HiveParser.Identifier) {
if (visitedNodes.containsKey(n)) {
/*
 * Since we are modifying the stream, this is not idempotent. Ideally, the caller would take
 * care to only quote Identifiers in each subtree once, but this makes it safe either way.
 */
return;
}
visitedNodes.put(n, n);
trs.insertBefore(n.getToken(), "`");
trs.insertAfter(n.getToken(), "`");
}
if (n.getChildCount() <= 0) {
return;
}
for (Node c : n.getChildren()) {
visit((ASTNode)c);
}
}
}
/**
 * This allows us to take an arbitrary ASTNode and turn it back into the SQL that produced it,
 * without needing to understand what it is (except for quoted identifiers).
 */
protected String getMatchedText(ASTNode n) {
quotedIdentifierHelper.visit(n);
return ctx.getTokenRewriteStream().toString(n.getTokenStartIndex(),
n.getTokenStopIndex() + 1).trim();
}
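// Illustrative example: for a subtree that was parsed from "a = b + 1", getMatchedText returns
// "`a` = `b` + 1", i.e. the original text with back ticks re-added around each identifier.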
public static final String DELETE_PREFIX = "__d__";
public static final String SUB_QUERY_ALIAS = "s";
protected ColumnAppender getColumnAppender(String subQueryAlias, String deletePrefix) {
boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(targetTable, true);
return nonNativeAcid ? new NonNativeAcidColumnAppender(targetTable, conf, subQueryAlias, deletePrefix) :
new NativeAcidColumnAppender(targetTable, conf, subQueryAlias);
}
protected abstract static class ColumnAppender {
protected final Table table;
protected final HiveConf conf;
protected final String subQueryAlias;
protected ColumnAppender(Table table, HiveConf conf, String subQueryAlias) {
this.table = table;
this.conf = conf;
this.subQueryAlias = subQueryAlias;
}
public abstract void appendAcidSelectColumns(StringBuilder stringBuilder, Context.Operation operation);
public abstract List<String> getDeleteValues(Context.Operation operation);
public abstract List<String> getSortKeys();
protected String qualify(String columnName) {
if (isBlank(subQueryAlias)) {
return columnName;
}
return String.format("%s.%s", subQueryAlias, columnName);
}
}
protected static class NativeAcidColumnAppender extends ColumnAppender {
public NativeAcidColumnAppender(Table table, HiveConf conf, String subQueryAlias) {
super(table, conf, subQueryAlias);
}
@Override
public void appendAcidSelectColumns(StringBuilder stringBuilder, Context.Operation operation) {
stringBuilder.append("ROW__ID,");
for (FieldSchema fieldSchema : table.getPartCols()) {
String identifier = HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf);
stringBuilder.append(identifier);
stringBuilder.append(",");
}
}
@Override
public List<String> getDeleteValues(Context.Operation operation) {
List<String> deleteValues = new ArrayList<>(1 + table.getPartCols().size());
deleteValues.add(qualify("ROW__ID"));
for (FieldSchema fieldSchema : table.getPartCols()) {
deleteValues.add(qualify(HiveUtils.unparseIdentifier(fieldSchema.getName(), conf)));
}
return deleteValues;
}
@Override
public List<String> getSortKeys() {
return singletonList(qualify("ROW__ID"));
}
}
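// Illustrative example: for a native ACID table partitioned by p, with subQueryAlias "s",
// appendAcidSelectColumns emits "ROW__ID,`p`," and getDeleteValues returns ["s.ROW__ID", "s.`p`"].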
protected static class NonNativeAcidColumnAppender extends ColumnAppender {
private final String deletePrefix;
public NonNativeAcidColumnAppender(Table table, HiveConf conf, String subQueryAlias, String deletePrefix) {
super(table, conf, subQueryAlias);
this.deletePrefix = deletePrefix;
}
@Override
public void appendAcidSelectColumns(StringBuilder stringBuilder, Context.Operation operation) {
List<FieldSchema> acidSelectColumns = table.getStorageHandler().acidSelectColumns(table, operation);
for (FieldSchema fieldSchema : acidSelectColumns) {
String identifier = HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf);
stringBuilder.append(identifier);
if (StringUtils.isNotEmpty(deletePrefix)) {
stringBuilder.append(" AS ");
String prefixedIdentifier = HiveUtils.unparseIdentifier(deletePrefix + fieldSchema.getName(), this.conf);
stringBuilder.append(prefixedIdentifier);
}
stringBuilder.append(",");
}
}
@Override
public List<String> getDeleteValues(Context.Operation operation) {
List<FieldSchema> acidSelectColumns = table.getStorageHandler().acidSelectColumns(table, operation);
List<String> deleteValues = new ArrayList<>(acidSelectColumns.size());
for (FieldSchema fieldSchema : acidSelectColumns) {
String prefixedIdentifier = HiveUtils.unparseIdentifier(deletePrefix + fieldSchema.getName(), this.conf);
deleteValues.add(qualify(prefixedIdentifier));
}
return deleteValues;
}
@Override
public List<String> getSortKeys() {
return table.getStorageHandler().acidSortColumns(table, Context.Operation.DELETE).stream()
.map(fieldSchema -> qualify(
HiveUtils.unparseIdentifier(deletePrefix + fieldSchema.getName(), this.conf)))
.collect(Collectors.toList());
}
}
}