/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiFunction;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext;
import org.apache.hadoop.hive.ql.exec.spark.SparkMetricUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePartitioner;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc.DPSortState;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde2.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* File Sink operator implementation.
**/
@SuppressWarnings("deprecation")
public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
Serializable, IConfigureJobConf {
public static final Logger LOG = LoggerFactory.getLogger(FileSinkOperator.class);
protected transient HashMap<String, FSPaths> valToPaths;
protected transient int numDynParts;
protected transient List<String> dpColNames;
protected transient DynamicPartitionCtx dpCtx;
protected transient boolean isCompressed;
protected transient boolean isTemporary;
protected transient Path parent;
protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
protected transient Path specPath;
protected transient String unionPath;
protected transient boolean isUnionDp;
protected transient int dpStartCol; // start column # for DP columns
protected transient List<String> dpVals; // array of values corresponding to DP columns
protected transient List<Object> dpWritables;
protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
protected transient int maxPartitions;
protected transient ListBucketingCtx lbCtx;
protected transient boolean isSkewedStoredAsSubDirectories;
protected transient boolean[] statsFromRecordWriter;
protected transient boolean isCollectRWStats;
private transient FSPaths prevFsp;
private transient FSPaths fpaths;
private StructField recIdField; // field to find record identifier in
private StructField bucketField; // field within the record id that holds the bucket
private StructObjectInspector recIdInspector; // OI for inspecting record id
private IntObjectInspector bucketInspector; // OI for inspecting bucket id
protected transient long numRows = 0;
protected transient long cntr = 1;
protected transient long logEveryNRows = 0;
protected transient int rowIndex = 0;
private transient Path destTablePath;
private transient boolean isInsertOverwrite;
private transient String counterGroup;
private transient BiFunction<Object[], ObjectInspector[], Integer> hashFunc;
public static final String TOTAL_TABLE_ROWS_WRITTEN = "TOTAL_TABLE_ROWS_WRITTEN";
private transient Set<String> dynamicPartitionSpecs = new HashSet<>();
/**
* Counters.
*/
public static enum Counter {
RECORDS_OUT
}
/**
* RecordWriter.
*
*/
public static interface RecordWriter {
void write(Writable w) throws IOException;
void close(boolean abort) throws IOException;
}
public class FSPaths implements Cloneable {
private Path tmpPathRoot;
private String subdirBeforeTxn, subdirAfterTxn;
private final String subdirForTxn;
private Path taskOutputTempPathRoot;
Path[] outPaths;
// The bucket files we successfully wrote to in this writer
Path[] outPathsCommitted;
Path[] finalPaths;
RecordWriter[] outWriters;
RecordUpdater[] updaters;
Stat stat;
int acidLastBucket = -1;
int acidFileOffset = -1;
private boolean isMmTable;
private boolean isDirectInsert;
private boolean isInsertOverwrite;
String dpDirForCounters;
public FSPaths(Path specPath, boolean isMmTable, boolean isDirectInsert, boolean isInsertOverwrite) {
this.isMmTable = isMmTable;
this.isDirectInsert = isDirectInsert;
this.isInsertOverwrite = isInsertOverwrite;
if (!isMmTable && !isDirectInsert) {
tmpPathRoot = Utilities.toTempPath(specPath);
taskOutputTempPathRoot = Utilities.toTaskTempPath(specPath);
subdirForTxn = null;
} else {
tmpPathRoot = specPath;
taskOutputTempPathRoot = null; // Should not be used.
if (isMmTable) {
subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(),
conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId());
} else {
/**
* For direct write to final path during ACID insert, we create the delta directories
* later when we create the RecordUpdater using AcidOutputFormat.Options
*/
subdirForTxn = null;
}
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles
+ " files, dynParts = " + bDynParts + " (spec path " + specPath + ")");
}
outPaths = new Path[numFiles];
outPathsCommitted = new Path[numFiles];
finalPaths = new Path[numFiles];
outWriters = new RecordWriter[numFiles];
updaters = new RecordUpdater[numFiles];
if (LOG.isDebugEnabled()) {
LOG.debug("Created slots for " + numFiles);
}
stat = new Stat();
}
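/**
 * Closes all open RecordWriters and RecordUpdaters. For direct-insert writes, the file path
 * reported by each updater is remembered in outPathsCommitted so it can be committed later.
 * All writers are attempted even if one fails; the most recent exception is rethrown once
 * every close attempt has been made.
 */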
public void closeWriters(boolean abort) throws HiveException {
Exception exception = null;
for (int idx = 0; idx < outWriters.length; idx++) {
if (outWriters[idx] != null) {
try {
outWriters[idx].close(abort);
updateProgress();
} catch (IOException e) {
exception = e;
LOG.error("Error closing " + outWriters[idx].toString(), e);
// continue closing others
}
}
}
for (int i = 0; i < updaters.length; i++) {
if (updaters[i] != null) {
SerDeStats stats = updaters[i].getStats();
// Ignore 0 row files except in case of insert overwrite
if (isDirectInsert && (stats.getRowCount() > 0 || isInsertOverwrite)) {
outPathsCommitted[i] = updaters[i].getUpdatedFilePath();
}
try {
updaters[i].close(abort);
} catch (IOException e) {
exception = e;
LOG.error("Error closing " + updaters[i].toString(), e);
// continue closing others
}
}
}
// Made an attempt to close all writers.
if (exception != null) {
throw new HiveException(exception);
}
}
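/**
 * Commits the task outputs of this FSPaths: for MM tables and direct inserts the already-final
 * paths are collected into commitPaths (for the commit manifest), otherwise each temporary
 * outPath is renamed to its corresponding finalPath.
 */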
private void commit(FileSystem fs, List<Path> commitPaths) throws HiveException {
for (int idx = 0; idx < outPaths.length; ++idx) {
try {
if (outPaths[idx] != null) {
commitOneOutPath(idx, fs, commitPaths);
}
} catch (IOException e) {
throw new HiveException("Unable to commit output from: " +
outPaths[idx] + " to: " + finalPaths[idx], e);
}
}
}
private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
throws IOException, HiveException {
if ((bDynParts || isSkewedStoredAsSubDirectories)
&& !fs.exists(finalPaths[idx].getParent())) {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("commit making path for dyn/skew: " + finalPaths[idx].getParent());
}
FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf);
}
if(outPaths[idx] != null && fs.exists(outPaths[idx])) {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (mm table ="
+ isMmTable + ", direct insert = " + isDirectInsert + ")");
}
if (isMmTable) {
assert outPaths[idx].equals(finalPaths[idx]);
commitPaths.add(outPaths[idx]);
} else if (isDirectInsert && (outPathsCommitted[idx] != null)) {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER
.trace("committing " + outPathsCommitted[idx] + " (direct insert = " + isDirectInsert + ")");
}
commitPaths.add(outPathsCommitted[idx]);
} else if (!isDirectInsert && !fs.rename(outPaths[idx], finalPaths[idx])) {
FileStatus fileStatus = FileUtils.getFileStatusOrNull(fs, finalPaths[idx]);
if (fileStatus != null) {
LOG.warn("Target path " + finalPaths[idx] + " with a size " + fileStatus.getLen() + " exists. Trying to delete it.");
if (!fs.delete(finalPaths[idx], true)) {
throw new HiveException("Unable to delete existing target output: " + finalPaths[idx]);
}
}
if (!fs.rename(outPaths[idx], finalPaths[idx])) {
throw new HiveException("Unable to rename output from: "
+ outPaths[idx] + " to: " + finalPaths[idx]);
}
}
}
updateProgress();
}
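/**
 * Used on task failure: closes every writer and updater with the abort flag and, if requested,
 * deletes their temporary output files.
 */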
public void abortWritersAndUpdaters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
for (int idx = 0; idx < outWriters.length; idx++) {
if (outWriters[idx] != null) {
try {
LOG.debug("Aborted: closing: " + outWriters[idx].toString());
outWriters[idx].close(abort);
if (delete) {
fs.delete(outPaths[idx], true);
}
updateProgress();
} catch (IOException e) {
throw new HiveException(e);
}
}
}
for (int idx = 0; idx < updaters.length; idx++) {
if (updaters[idx] != null) {
try {
LOG.debug("Aborted: closing: " + updaters[idx].toString());
updaters[idx].close(abort);
if (delete) {
fs.delete(outPaths[idx], true);
}
updateProgress();
} catch (IOException e) {
throw new HiveException(e);
}
}
}
}
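/**
 * Computes finalPaths[filesIdx] and outPaths[filesIdx] for one output file. For native tables
 * that are neither MM nor direct-insert, data is first written to a task-scoped temp path and
 * renamed on commit; for MM and direct-insert tables the writers write to the final
 * (base/delta) location directly. Non-native tables simply write to specPath.
 */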
public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable,
boolean isSkewedStoredAsSubDirectories) {
if (isNativeTable) {
String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
String taskWithExt = extension == null ? taskId : taskId + extension;
if (!isMmTable && !isDirectInsert) {
if (!bDynParts && !isSkewedStoredAsSubDirectories) {
finalPaths[filesIdx] = new Path(parent, taskWithExt);
} else {
finalPaths[filesIdx] = new Path(buildTmpPath(), taskWithExt);
}
outPaths[filesIdx] = new Path(buildTaskOutputTempPath(), Utilities.toTempPath(taskId));
} else {
String taskIdPath = taskId;
if (conf.isMerge()) {
// Make sure we don't collide with the source files.
// MM tables don't support concat so we don't expect the merge of merged files.
taskIdPath += ".merged";
}
if (extension != null) {
taskIdPath += extension;
}
Path finalPath;
if (isDirectInsert) {
finalPath = buildTmpPath();
} else {
finalPath = new Path(buildTmpPath(), taskIdPath);
}
// In the cases that have multi-stage insert, e.g. a "hive.skewjoin.key"-based skew join,
// it can happen that we want multiple commits into the same directory from different
// tasks (not just task instances). In non-MM case, Utilities.renameOrMoveFiles ensures
// unique names. We could do the same here, but this will still cause the old file to be
// deleted because it has not been committed in /this/ FSOP. We are going to fail, to be
// safe. Potentially, we could implement some partial commit between stages, if this
// affects some less obscure scenario.
try {
FileSystem fpfs = finalPath.getFileSystem(hconf);
if ((!isDirectInsert) && fpfs.exists(finalPath)) {
throw new RuntimeException(finalPath + " already exists");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
finalPaths[filesIdx] = finalPath;
outPaths[filesIdx] = finalPath;
}
if (LOG.isInfoEnabled()) {
LOG.info("Final Path: FS " + finalPaths[filesIdx]);
if (LOG.isInfoEnabled() && (!isMmTable && !isDirectInsert)) {
LOG.info("Writing to temp file: FS " + outPaths[filesIdx]);
}
}
} else {
finalPaths[filesIdx] = outPaths[filesIdx] = specPath;
}
}
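// The temp path is assembled from tmpPathRoot plus up to three optional fragments, in order:
// subdirBeforeTxn (dynamic partition dirs), subdirForTxn (MM base/delta dir) and
// subdirAfterTxn (list bucketing and/or union subdirs), e.g. (illustrative)
// <tmpPathRoot>/<dpDir>/<deltaDir>/<unionSubdir>.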
public Path buildTmpPath() {
String pathStr = tmpPathRoot.toString();
if (subdirBeforeTxn != null) {
pathStr += Path.SEPARATOR + subdirBeforeTxn;
}
if (subdirForTxn != null) {
pathStr += Path.SEPARATOR + subdirForTxn;
}
if (subdirAfterTxn != null) {
pathStr += Path.SEPARATOR + subdirAfterTxn;
}
return new Path(pathStr);
}
public Path buildTaskOutputTempPath() {
if (taskOutputTempPathRoot == null) {
return null;
}
assert subdirForTxn == null;
String pathStr = taskOutputTempPathRoot.toString();
if (subdirBeforeTxn != null) {
pathStr += Path.SEPARATOR + subdirBeforeTxn;
}
if (subdirAfterTxn != null) {
pathStr += Path.SEPARATOR + subdirAfterTxn;
}
return new Path(pathStr);
}
public void addToStat(String statType, long amount) {
stat.addToStat(statType, amount);
}
public Collection<String> getStoredStats() {
return stat.getStoredStats();
}
/**
* This method is intended for use with ACID unbucketed tables, where the DELETE ops behave as
* though they are bucketed, but without an explicit pre-specified bucket count. The bucketNum
* is read out of the middle value of the ROW__ID variable and this is written out from a single
* FileSink, in ways similar to the multi file spray, but without knowing the total number of
* buckets ahead of time.
*
* ROW__ID (1,2[0],3) =&gt; bucket_00002
* ROW__ID (1,3[0],4) =&gt; bucket_00003 etc
*
* A new FSP is created for each partition, so this only requires the bucket numbering and that
* is mapped in directly as an index.
*
* This relies on ReduceSinkOperator to shuffle update/delete rows by
* UDFToInteger(RecordIdentifier), i.e. by writerId in ROW__ID.
* {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#getPartitionColsFromBucketColsForUpdateDelete(Operator, boolean)}
*/
public int createDynamicBucket(int bucketNum) {
// this assumes all paths are bucket names (which means no lookup is needed)
int writerOffset = bucketNum;
if (updaters.length <= writerOffset) {
this.updaters = Arrays.copyOf(updaters, writerOffset + 1);
this.outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
this.finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
}
if (this.finalPaths[writerOffset] == null) {
// uninitialized bucket
String bucketName =
Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
this.finalPaths[writerOffset] = new Path(bDynParts ? buildTmpPath() : parent, bucketName);
this.outPaths[writerOffset] = new Path(buildTaskOutputTempPath(), bucketName);
}
return writerOffset;
}
} // class FSPaths
private static final long serialVersionUID = 1L;
protected transient FileSystem fs;
protected transient Serializer serializer;
protected final transient LongWritable row_count = new LongWritable();
/**
* The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets,
* it is not a good idea to start so many reducers - if the maximum number of reducers is 100,
* each reducer can write 10 files - this way we effectively get 1000 files.
*/
private transient ExprNodeEvaluator[] partitionEval;
protected transient int totalFiles;
private transient int numFiles;
protected transient boolean multiFileSpray;
protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
private transient boolean isBucketed = false;
private transient int bucketId;
private transient ObjectInspector[] partitionObjectInspectors;
protected transient HivePartitioner<HiveKey, Object> prtner;
protected transient final HiveKey key = new HiveKey();
private transient Configuration hconf;
protected transient FSPaths fsp;
protected transient boolean bDynParts;
private transient SubStructObjectInspector subSetOI;
private transient int timeOut; // JT timeout in msec.
private transient long lastProgressReport = System.currentTimeMillis();
protected transient boolean autoDelete = false;
protected transient JobConf jc;
Class<? extends Writable> outputClass;
String taskId, originalTaskId;
protected boolean filesCreated = false;
protected BitSet filesCreatedPerBucket = new BitSet();
private void initializeSpecPath() {
// For a query of the type:
// insert overwrite table T1
// select * from (subq1 union all subq2)u;
// subQ1 and subQ2 write to directories Parent/Child_1 and
// Parent/Child_2 respectively, and union is removed.
// The movetask that follows subQ1 and subQ2 tasks moves the directory
// 'Parent'
// However, if the above query contains dynamic partitions, subQ1 and
// subQ2 have to write to directories: Parent/DynamicPartition/Child_1
// and Parent/DynamicPartition/Child_2 respectively.
// The movetask that follows subQ1 and subQ2 tasks still moves the directory
// 'Parent'
boolean isLinked = conf.isLinkedFileSink();
if (!isLinked) {
// Simple case - no union.
specPath = conf.getDirName();
unionPath = null;
} else {
isUnionDp = (dpCtx != null);
if (conf.isDirectInsert()) {
specPath = conf.getParentDir();
unionPath = null;
} else if (conf.isMmTable() || isUnionDp) {
// MM tables need custom handling for union suffix; DP tables use parent too.
specPath = conf.getParentDir();
unionPath = conf.getDirName().getName();
} else {
// For now, keep the old logic for non-MM non-DP union case. Should probably be unified.
specPath = conf.getDirName();
unionPath = null;
}
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Setting up FSOP " + System.identityHashCode(this) + " ("
+ conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath + " + " + unionPath);
}
}
/** Kryo ctor. */
protected FileSinkOperator() {
super();
}
public FileSinkOperator(CompilationOpContext ctx) {
super(ctx);
}
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
try {
this.hconf = hconf;
filesCreated = false;
isTemporary = conf.isTemporary();
multiFileSpray = conf.isMultiFileSpray();
this.isBucketed = hconf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
totalFiles = conf.getTotalFiles();
numFiles = conf.getNumFiles();
dpCtx = conf.getDynPartCtx();
lbCtx = conf.getLbCtx();
fsp = prevFsp = null;
valToPaths = new HashMap<String, FSPaths>();
taskId = originalTaskId = Utilities.getTaskId(hconf);
initializeSpecPath();
fs = specPath.getFileSystem(hconf);
if (hconf instanceof JobConf) {
jc = (JobConf) hconf;
} else {
// test code path
jc = new JobConf(hconf);
}
try {
createHiveOutputFormat(jc);
} catch (HiveException ex) {
logOutputFormatError(hconf, ex);
throw ex;
}
isCompressed = conf.getCompressed();
if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
parent = Utilities.toTempPath(conf.getFinalDirName());
} else {
parent = Utilities.toTempPath(conf.getDirName());
}
statsFromRecordWriter = new boolean[numFiles];
serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
outputClass = serializer.getSerializedClass();
destTablePath = conf.getDestPath();
isInsertOverwrite = conf.getInsertOverwrite();
counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
if (LOG.isInfoEnabled()) {
LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat +
(isCompressed ? " with compression" : ""));
}
// Timeout is chosen to make sure that even if one iteration takes more than
// half of the script.timeout but less than script.timeout, we will still
// be able to report progress.
timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2;
if (multiFileSpray) {
partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
int i = 0;
for (ExprNodeDesc e : conf.getPartitionCols()) {
partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
}
partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector);
prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
jc.getPartitionerClass(), null);
}
if (dpCtx != null) {
dpSetup();
}
if (lbCtx != null) {
lbSetup();
}
if (!bDynParts) {
fsp = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite());
fsp.subdirAfterTxn = combinePathFragments(generateListBucketingDirName(null), unionPath);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp)
+ " from ctor; childSpec " + unionPath + ": tmpPath " + fsp.buildTmpPath()
+ ", task path " + fsp.buildTaskOutputTempPath());
}
// Create all the files - this is required because empty files need to be created for
// empty buckets
// createBucketFiles(fsp);
if (!this.isSkewedStoredAsSubDirectories) {
valToPaths.put("", fsp); // special entry for non-DP case
}
}
final StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(HiveConf
.getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
if (isTemporary && fsp != null
&& tmpStorage != StoragePolicyValue.DEFAULT) {
assert !conf.isMmTable(); // Not supported for temp tables.
final Path outputPath = fsp.buildTaskOutputTempPath();
StoragePolicyShim shim = ShimLoader.getHadoopShims()
.getStoragePolicyShim(fs);
if (shim != null) {
// directory creation is otherwise within the writers
fs.mkdirs(outputPath);
shim.setStoragePolicy(outputPath, tmpStorage);
}
}
if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
conf.getWriteType() == AcidUtils.Operation.DELETE) {
// ROW__ID is always in the first field
recIdField = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs().get(0);
recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
// bucket is the second field in the record id
bucketField = recIdInspector.getAllStructFieldRefs().get(1);
bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
}
numRows = 0;
cntr = 1;
logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count);
// Setup hashcode
hashFunc = conf.getTableInfo().getBucketingVersion() == 2 ?
ObjectInspectorUtils::getBucketHashCode :
ObjectInspectorUtils::getBucketHashCodeOld;
//Counter for number of rows that are associated with a destination table in FileSinkOperator.
//This count is used to get total number of rows in an insert query.
if (conf.getTableInfo() != null && conf.getTableInfo().getTableName() != null) {
statsMap.put(TOTAL_TABLE_ROWS_WRITTEN, row_count);
}
} catch (HiveException e) {
throw e;
} catch (Exception e) {
throw new HiveException(e);
}
}
public String getCounterName(Counter counter) {
String suffix = Integer.toString(conf.getDestTableId());
String fullName = conf.getTableInfo().getTableName();
if (fullName != null) {
suffix = suffix + "_" + fullName.toLowerCase();
}
return counter + "_" + suffix;
}
private void logOutputFormatError(Configuration hconf, HiveException ex) {
StringBuilder errorWriter = new StringBuilder();
errorWriter.append("Failed to create output format; configuration: ");
// redact sensitive information before logging
HiveConfUtil.dumpConfig(hconf, errorWriter);
Properties tdp = null;
if (this.conf.getTableInfo() != null
&& (tdp = this.conf.getTableInfo().getProperties()) != null) {
errorWriter.append(";\n table properties: { ");
for (Map.Entry<Object, Object> e : tdp.entrySet()) {
errorWriter.append(e.getKey() + ": " + e.getValue() + ", ");
}
errorWriter.append('}');
}
LOG.error(errorWriter.toString(), ex);
}
/**
* Initialize list bucketing information
*/
private void lbSetup() {
this.isSkewedStoredAsSubDirectories = ((lbCtx == null) ? false : lbCtx.isSkewedStoredAsDir());
}
/**
* Set up for dynamic partitioning including a new ObjectInspector for the output row.
*/
private void dpSetup() {
this.bDynParts = false;
this.numDynParts = dpCtx.getNumDPCols();
this.dpColNames = dpCtx.getDPColNames();
this.maxPartitions = dpCtx.getMaxPartitionsPerNode();
assert numDynParts == dpColNames.size()
: "number of dynamic partitions should be the same as the size of DP mapping";
if (dpColNames != null && dpColNames.size() > 0) {
this.bDynParts = true;
assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has "
+ inputObjInspectors.length;
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
this.dpStartCol = Utilities.getDPColOffset(conf);
this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
this.dpVals = new ArrayList<String>(numDynParts);
this.dpWritables = new ArrayList<Object>(numDynParts);
}
}
/**
* Query-based MINOR compaction had an issue (HIVE-23763) where the row distribution across FileSinkOperators
* was not correlated correctly with the bucket numbers, so rows from different buckets could end up in the same
* FileSinkOperator and get written out into one file. That is not correct: a bucket file must only contain rows
* from a single bucket. This method was added so the FileSinkOperator can handle rows from different buckets by
* creating a separate file for each bucket. The logic is similar to the one in the createDynamicBucket method.
* @param fsp
* @throws HiveException
*/
protected void createBucketFilesForCompaction(FSPaths fsp) throws HiveException {
try {
if (fsp.outPaths.length < bucketId + 1) {
fsp.updaters = Arrays.copyOf(fsp.updaters, bucketId + 1);
fsp.outPaths = Arrays.copyOf(fsp.outPaths, bucketId + 1);
fsp.finalPaths = Arrays.copyOf(fsp.finalPaths, bucketId + 1);
fsp.outWriters = Arrays.copyOf(fsp.outWriters, bucketId + 1);
statsFromRecordWriter = Arrays.copyOf(statsFromRecordWriter, bucketId + 1);
}
createBucketForFileIdx(fsp, bucketId);
} catch (Exception e) {
throw new HiveException(e);
}
filesCreatedPerBucket.set(bucketId);
}
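/**
 * Creates the writer for each output file of this FSPaths. With multi-file spray it also
 * populates bucketMap, mapping every bucket number handled by this task to a writer index.
 */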
protected void createBucketFiles(FSPaths fsp) throws HiveException {
try {
int filesIdx = 0;
Set<Integer> seenBuckets = new HashSet<Integer>();
for (int idx = 0; idx < totalFiles; idx++) {
if (this.getExecContext() != null && this.getExecContext().getFileId() != null) {
if (LOG.isInfoEnabled()) {
LOG.info("replace taskId from execContext ");
}
taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId());
if (LOG.isInfoEnabled()) {
LOG.info("new taskId: FS " + taskId);
}
assert !multiFileSpray;
assert totalFiles == 1;
}
int bucketNum = 0;
if (multiFileSpray) {
key.setHashCode(idx);
// Does this hashcode belong to this reducer
int numReducers = totalFiles / numFiles;
if (numReducers > 1) {
int currReducer = Integer.parseInt(Utilities.getTaskIdFromFilename(Utilities
.getTaskId(hconf)));
int reducerIdx = prtner.getPartition(key, null, numReducers);
if (currReducer != reducerIdx) {
continue;
}
}
bucketNum = prtner.getBucket(key, null, totalFiles);
if (seenBuckets.contains(bucketNum)) {
continue;
}
seenBuckets.add(bucketNum);
bucketMap.put(bucketNum, filesIdx);
taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
}
createBucketForFileIdx(fsp, filesIdx);
filesIdx++;
}
assert filesIdx == numFiles;
} catch (Exception e) {
throw new HiveException(e);
}
filesCreated = true;
}
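/**
 * Initializes the paths for the file at filesIdx and opens the matching writer: a plain
 * RecordWriter for non-ACID, MM and compaction writes, or an ACID RecordUpdater for ACID
 * INSERT. For UPDATE/DELETE the updater is created lazily in process() once the bucket is
 * known from ROW__ID.
 */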
protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
throws HiveException {
try {
if (conf.isCompactionTable()) {
fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId),
isNativeTable(), isSkewedStoredAsSubDirectories);
} else {
fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
+ "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
+ fsp.buildTmpPath() + ", task " + taskId + ")");
}
if (LOG.isInfoEnabled()) {
LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
}
if (isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert()) {
// in recent hadoop versions, use deleteOnExit to clean tmp files.
autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
}
updateDPCounters(fsp, filesIdx);
Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
// only create bucket files if there are no dynamic partitions;
// buckets for dynamic partitions will be created for each newly created partition
//todo IOW integration. Full Acid uses the else if block to create Acid's RecordUpdater (HiveFileFormatUtils)
// and that will set writingBase(conf.getInsertOverwrite())
// If MM wants to create a new base for IOW (instead of delta dir), it should specify it here
if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
Path outPath = fsp.outPaths[filesIdx];
if (conf.isMmTable()
&& !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
LOG.warn("Unable to create directory with inheritPerms: " + outPath);
}
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
outputClass, conf, outPath, reporter);
// If the record writer provides stats, get it from there instead of the serde
statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof
StatsProvidingRecordWriter;
// increment the CREATED_FILES counter
} else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
Path outPath = fsp.outPaths[filesIdx];
if (conf.isDirectInsert()
&& !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
LOG.warn("Unable to create directory: " + outPath);
}
// Only set up the updater for insert. For update and delete we don't know until we see
// the row.
ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector;
int acidBucketNum = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
String attemptId = null;
if (conf.isDirectInsert()) {
attemptId = taskId.split("_")[1];
}
fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(),
acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1, attemptId); // outPath.getParent()
}
if (reporter != null) {
reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_FILES, 1);
}
} catch (IOException e) {
throw new HiveException(e);
}
}
private void updateDPCounters(final FSPaths fsp, final int filesIdx) {
// There are 2 cases where we increment CREATED_DYNAMIC_PARTITIONS counters
// 1) Insert overwrite (all partitions are newly created)
// 2) Insert into table which creates new partitions (some new partitions)
if (bDynParts && destTablePath != null && fsp.dpDirForCounters != null) {
Path destPartPath = new Path(destTablePath, fsp.dpDirForCounters);
// For MM tables, directory structure is
// <table-dir>/<partition-dir>/<delta-dir>/
// For Non-MM tables, directory structure is
// <table-dir>/<staging-dir>/<partition-dir>
// for UNION ALL inserts into non-MM tables, each union subquery creates another subdirectory at the end:
// <table-dir>/<staging-dir>/<partition-dir>/<union-dir>
// for non-MM tables, the final destination partition directory is created during move task via rename
// for MM tables and ACID insert, the final destination partition directory is created by the tasks themselves
try {
if (conf.isMmTable() || conf.isDirectInsert()) {
createDpDir(destPartPath);
} else {
// outPath will be
// non-union case: <table-dir>/<staging-dir>/<partition-dir>/<taskid>
// union case: <table-dir>/<staging-dir>/<partition-dir>/<union-dir>/<taskid>
Path dpStagingDir = fsp.outPaths[filesIdx].getParent();
if (isUnionDp) {
dpStagingDir = dpStagingDir.getParent();
}
if (isInsertOverwrite) {
createDpDir(dpStagingDir);
} else {
createDpDirCheckSrc(dpStagingDir, destPartPath);
}
}
} catch (IOException e) {
LOG.warn("Skipping to increment CREATED_DYNAMIC_PARTITIONS counter.Exception: {}", e.getMessage());
}
}
}
private void createDpDirCheckSrc(final Path dpStagingPath, final Path dpFinalPath) throws IOException {
if (!fs.exists(dpStagingPath) && !fs.exists(dpFinalPath)) {
fs.mkdirs(dpStagingPath);
// move task will create dp final path
if (reporter != null) {
reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
}
}
}
private void createDpDir(final Path dpPath) throws IOException {
if (!fs.exists(dpPath)) {
fs.mkdirs(dpPath);
if (reporter != null) {
reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
}
}
}
/**
* Report status to JT so that JT won't kill this task if closing takes too long
* due to too many files to close and the NN is overloaded.
*
* @return true if a new progress update is reported, false otherwise.
*/
protected boolean updateProgress() {
if (reporter != null &&
(System.currentTimeMillis() - lastProgressReport) > timeOut) {
reporter.progress();
lastProgressReport = System.currentTimeMillis();
return true;
} else {
return false;
}
}
protected Writable recordValue;
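/**
 * Writes one row: lazily creates the output files, resolves the target FSPaths from the
 * dynamic partition / list bucketing values (or the compaction bucket id), serializes the
 * row for plain writes, and routes it either to a RecordWriter or, for ACID operations,
 * to the appropriate RecordUpdater.
 */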
@Override
public void process(Object row, int tag) throws HiveException {
runTimeNumRows++;
/* Create list bucketing sub-directory only if stored-as-directories is on. */
String lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
if (!bDynParts && (!filesCreated || conf.isCompactionTable())) {
if (lbDirName != null) {
if (valToPaths.get(lbDirName) == null) {
createNewPaths(null, lbDirName);
}
} else if (conf.isCompactionTable()) {
int bucketProperty = getBucketProperty(row);
bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
if (!filesCreatedPerBucket.get(bucketId)) {
createBucketFilesForCompaction(fsp);
}
} else {
createBucketFiles(fsp);
}
}
try {
updateProgress();
// if DP is enabled, get the final output writers and prepare the real output row
assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT
: "input object inspector is not struct";
if (bDynParts) {
// we need to read bucket number which is the last column in value (after partition columns)
if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
numDynParts += 1;
}
// copy the DP column values from the input row to dpVals
dpVals.clear();
dpWritables.clear();
ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol,numDynParts,
(StructObjectInspector) inputObjInspectors[0],ObjectInspectorCopyOption.WRITABLE);
// get a set of RecordWriter based on the DP column values
// pass the null value along to the escaping process to determine what the dir should be
for (Object o : dpWritables) {
if (o == null || o.toString().length() == 0) {
dpVals.add(dpCtx.getDefaultPartitionName());
} else {
dpVals.add(o.toString());
}
}
String invalidPartitionVal;
if((invalidPartitionVal = HiveStringUtils.getPartitionValWithInvalidCharacter(dpVals, dpCtx.getWhiteListPattern()))!=null) {
throw new HiveFatalException("Partition value '" + invalidPartitionVal +
"' contains a character not matched by whitelist pattern '" +
dpCtx.getWhiteListPattern().toString() + "'. " + "(configure with " +
HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname + ")");
}
fpaths = getDynOutPaths(dpVals, lbDirName);
dynamicPartitionSpecs.add(fpaths.dpDirForCounters);
// use SubStructObjectInspector to serialize the non-partitioning columns in the input row
recordValue = serializer.serialize(row, subSetOI);
} else {
if (lbDirName != null) {
fpaths = valToPaths.get(lbDirName);
if (fpaths == null) {
fpaths = createNewPaths(null, lbDirName);
}
} else {
fpaths = fsp;
}
recordValue = serializer.serialize(row, inputObjInspectors[0]);
// if serializer is ThriftJDBCBinarySerDe, then recordValue is null if the buffer is not full (the size of buffer
// is kept track of in the SerDe)
if (recordValue == null) {
return;
}
}
rowOutWriters = fpaths.outWriters;
// check if all record writers implement statistics. if at least one RW
// doesn't implement the stats interface we will fall back to the conventional way
// of gathering stats
isCollectRWStats = areAllTrue(statsFromRecordWriter);
if (conf.isGatherStats() && !isCollectRWStats) {
SerDeStats stats = serializer.getSerDeStats();
if (stats != null) {
fpaths.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
}
fpaths.addToStat(StatsSetupConst.ROW_COUNT, 1);
}
if ((++numRows == cntr) && LOG.isInfoEnabled()) {
cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
if (cntr < 0 || numRows < 0) {
cntr = 0;
numRows = 1;
}
LOG.info(toString() + ": records written - " + numRows);
}
int writerOffset;
// This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
// for a given operator branch prediction should work quite nicely on it.
// RecordUpdater expects to get the actual row, not a serialized version of it. Thus we
// pass the row rather than recordValue.
if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
writerOffset = bucketId;
if (!conf.isCompactionTable()) {
writerOffset = findWriterOffset(row);
}
rowOutWriters[writerOffset].write(recordValue);
} else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row);
} else {
// TODO I suspect we could skip much of the stuff above this in the function in the case
// of update and delete. But I don't understand all of the side effects of the above
// code and don't want to skip over it yet.
// Find the bucket id, and switch buckets if need to
ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
int bucketProperty =
bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
int bucketNum =
BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
writerOffset = 0;
if (multiFileSpray) {
//bucket_num_reducers_acid.q, TestTxnCommands.testMoreBucketsThanReducers()
if (!bucketMap.containsKey(bucketNum)) {
String extraMsg = " (no path info/)" + recId;
if (fpaths != null && fpaths.finalPaths != null && fpaths.finalPaths.length > 0) {
extraMsg = " (finalPaths[0]=" + fpaths.finalPaths[0] + ")/" + recId;
}
throw new IllegalStateException("Found bucketNum=" + bucketNum +
" from data but no mapping in 'bucketMap'." + extraMsg);
}
writerOffset = bucketMap.get(bucketNum);
} else if (!isBucketed) {
writerOffset = fpaths.createDynamicBucket(bucketNum);
}
if (fpaths.updaters[writerOffset] == null) {
fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
jc, conf.getTableInfo(), bucketNum, conf,
fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
fpaths.outPaths[writerOffset]);
}
}
if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
fpaths.updaters[writerOffset].update(conf.getTableWriteId(), row);
} else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
fpaths.updaters[writerOffset].delete(conf.getTableWriteId(), row);
} else {
throw new HiveException("Unknown write type " + conf.getWriteType().toString());
}
}
} catch (IOException e) {
closeWriters(true);
throw new HiveException(e);
} catch (SerDeException e) {
closeWriters(true);
throw new HiveException(e);
}
}
private void closeWriters(boolean abort) throws HiveException {
fpaths.closeWriters(abort);
closeRecordwriters(abort);
}
private void closeRecordwriters(boolean abort) {
for (RecordWriter writer : rowOutWriters) {
try {
LOG.info("Closing {} on exception", writer);
writer.close(abort);
} catch (IOException e) {
LOG.error("Error closing rowOutWriter" + writer, e);
}
}
}
protected boolean areAllTrue(boolean[] statsFromRW) {
// If we are doing an acid operation they will always all be true as RecordUpdaters always
// collect stats
if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID && !conf.isMmTable() && !conf.isCompactionTable()) {
return true;
}
for(boolean b : statsFromRW) {
if (!b) {
return false;
}
}
return true;
}
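// For multi-file spray, hash the bucketing columns to find the bucket number and map it to
// the writer index created in createBucketFiles(); otherwise all rows go to writer 0.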
private int findWriterOffset(Object row) throws HiveException {
if (!multiFileSpray) {
return 0;
} else {
assert getConf().getWriteType() != AcidUtils.Operation.DELETE &&
getConf().getWriteType() != AcidUtils.Operation.UPDATE :
"Unexpected operation type: " + getConf().getWriteType();
// this is not used for DELETE commands; partitionEval is not set up correctly
// (or needed) for that case
Object[] bucketFieldValues = new Object[partitionEval.length];
for(int i = 0; i < partitionEval.length; i++) {
bucketFieldValues[i] = partitionEval[i].evaluate(row);
}
int keyHashCode = hashFunc.apply(bucketFieldValues, partitionObjectInspectors);
key.setHashCode(keyHashCode);
int bucketNum = prtner.getBucket(key, null, totalFiles);
return bucketMap.get(bucketNum);
}
}
/**
 * Creates a new FSPaths for the given dynamic partition / list bucketing directory pair and,
 * unless partition-bucket-sorted dynamic partitioning is in effect, creates its bucket files
 * and registers it in valToPaths.
 *
 * @param dpDir dynamic partition directory fragment (may be null)
 * @param lbDir list bucketing directory fragment (may be null)
 * @return the newly created FSPaths
 * @throws HiveException
 */
private FSPaths createNewPaths(String dpDir, String lbDir) throws HiveException {
FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert(), conf.getInsertOverwrite());
fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath);
fsp2.subdirBeforeTxn = dpDir;
String pathKey = combinePathFragments(dpDir, lbDir);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("creating new paths {} for {}, childSpec {}: tmpPath {},"
+ " task path {}", System.identityHashCode(fsp2), pathKey, unionPath,
fsp2.buildTmpPath(), fsp2.buildTaskOutputTempPath());
}
if (bDynParts) {
fsp2.dpDirForCounters = pathKey;
}
if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
createBucketFiles(fsp2);
valToPaths.put(pathKey, fsp2);
}
return fsp2;
}
private static String combinePathFragments(String first, String second) {
return first == null ? second : (second == null ? first : first + Path.SEPARATOR + second);
}
/**
* Generate list bucketing directory name from a row.
* @param row row to process.
* @return directory name.
*/
protected String generateListBucketingDirName(Object row) {
if (!this.isSkewedStoredAsSubDirectories) {
return null;
}
String lbDirName = null;
List<String> skewedCols = lbCtx.getSkewedColNames();
List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
if (row != null) {
List<Object> standObjs = new ArrayList<Object>();
List<String> skewedValsCandidate = null;
/* Convert input row to standard objects. */
ObjectInspectorUtils.copyToStandardObject(standObjs, row,
(StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
assert (standObjs.size() >= skewedCols.size()) :
"The row has less number of columns than no. of skewed column.";
skewedValsCandidate = new ArrayList<String>(skewedCols.size());
for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
skewedValsCandidate.add(posPair.getSkewColPosition(),
standObjs.get(posPair.getTblColPosition()).toString());
}
/* The row matches skewed column names. */
if (allSkewedVals.contains(skewedValsCandidate)) {
/* matches skewed values. */
lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
locationMap.put(skewedValsCandidate, lbDirName);
} else {
lbDirName = createDefaultLbDir(skewedCols, locationMap);
}
} else {
lbDirName = createDefaultLbDir(skewedCols, locationMap);
}
return lbDirName;
}
private String createDefaultLbDir(List<String> skewedCols,
Map<List<String>, String> locationMap) {
String lbDirName;
lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
lbCtx.getDefaultDirName());
List<String> defaultKey = Lists.newArrayList(lbCtx.getDefaultKey());
if (!locationMap.containsKey(defaultKey)) {
locationMap.put(defaultKey, lbDirName);
}
return lbDirName;
}
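/**
 * Returns the FSPaths for the dynamic partition (and optional list bucketing) directory of the
 * current row, creating it on first use. Enforces the per-node dynamic partition limit and,
 * when partitions arrive sorted, closes the previous partition's writers (harvesting their
 * stats) before opening the next one.
 */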
protected FSPaths getDynOutPaths(List<String> row, String lbDir) throws HiveException {
FSPaths fp;
// get the path corresponding to the dynamic partition columns,
String dpDir = getDynPartDirectory(row, dpColNames);
String pathKey = null;
if (dpDir != null) {
String dpAndLbDir = combinePathFragments(dpDir, lbDir);
pathKey = dpAndLbDir;
if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
String buckNum = row.get(row.size() - 1);
taskId = Utilities.replaceTaskIdFromFilename(taskId, buckNum);
pathKey = dpAndLbDir + Path.SEPARATOR + taskId;
}
FSPaths fsp2 = valToPaths.get(pathKey);
if (fsp2 == null) {
// check # of dp
// TODO: add an option to skip this if number of partitions checks is done by Triggers via
// CREATED_DYNAMIC_PARTITION counter
if (valToPaths.size() > maxPartitions) {
// we cannot proceed and need to tell the hive client that retries won't succeed either
throw new HiveFatalException(
ErrorMsg.DYNAMIC_PARTITIONS_TOO_MANY_PER_NODE_ERROR.getErrorCodedMsg()
+ "Maximum was set to " + maxPartitions + " partitions per node"
+ ", number of dynamic partitions on this node: " + valToPaths.size());
}
if (!conf.getDpSortState().equals(DPSortState.NONE) && prevFsp != null) {
// close the previous fsp as it is no longer needed
prevFsp.closeWriters(false);
// since we are closing the previous fsp's record writers, we need to see if we can get
// stats from the record writer and store in the previous fsp that is cached
if (conf.isGatherStats() && isCollectRWStats) {
SerDeStats stats = null;
if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
RecordWriter outWriter = prevFsp.outWriters[0];
if (outWriter != null) {
stats = ((StatsProvidingRecordWriter) outWriter).getStats();
}
} else if (prevFsp.updaters[0] != null) {
stats = prevFsp.updaters[0].getStats();
}
if (stats != null && !conf.isFullAcidTable()) {
prevFsp.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
prevFsp.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
}
}
// let writers release the memory for garbage collection
prevFsp.outWriters[0] = null;
prevFsp = null;
}
fsp2 = createNewPaths(dpDir, lbDir);
if (prevFsp == null) {
prevFsp = fsp2;
}
if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
createBucketForFileIdx(fsp2, 0);
valToPaths.put(pathKey, fsp2);
}
}
fp = fsp2;
} else {
fp = fsp;
}
return fp;
}
// given the current input row, the mapping for input col info to dp columns, and # of dp cols,
// return the relative path corresponding to the row.
// e.g., ds=2008-04-08/hr=11
private String getDynPartDirectory(List<String> row, List<String> dpColNames) {
return FileUtils.makePartName(dpColNames, row);
}
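/**
 * Closes all writers and updaters, harvests writer-provided statistics, commits the output
 * files (and writes a commit manifest for MM / direct-insert tables), and publishes stats.
 * On abort, writers are closed and their temporary outputs deleted instead.
 */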
@Override
public void closeOp(boolean abort) throws HiveException {
row_count.set(numRows);
LOG.info(toString() + ": records written - " + numRows);
if ("spark".equalsIgnoreCase(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
SparkMetricUtils.updateSparkRecordsWrittenMetrics(runTimeNumRows);
}
if (!bDynParts && !filesCreated) {
boolean isTez = "tez".equalsIgnoreCase(
HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE));
Class<?> clazz = conf.getTableInfo().getOutputFileFormatClass();
boolean isStreaming = StreamingOutputFormat.class.isAssignableFrom(clazz);
// allow empty file generation for MM/ACID tables as a quick and dirty workaround for HIVE-22941
if (!isTez || isStreaming || (this.isInsertOverwrite && (conf.isMmTable() || conf.isFullAcidTable()))) {
createBucketFiles(fsp);
}
}
lastProgressReport = System.currentTimeMillis();
if (!abort) {
// If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size)
// and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full
// (the size of buffer is kept track of in the ThriftJDBCBinarySerDe).
if (conf.isUsingBatchingSerDe()) {
try {
recordValue = serializer.serialize(null, inputObjInspectors[0]);
if (null != fpaths) {
rowOutWriters = fpaths.outWriters;
rowOutWriters[0].write(recordValue);
} else if(recordValue instanceof ArrowWrapperWritable) {
//Because LLAP arrow output depends on the ThriftJDBCBinarySerDe code path
//this is required for 0 row outputs
//i.e. we need to write a 0 size batch to signal EOS to the consumer
for (FSPaths fsPaths : valToPaths.values()) {
for(RecordWriter writer : fsPaths.outWriters) {
writer.write(recordValue);
}
}
}
} catch (SerDeException | IOException e) {
throw new HiveException(e);
}
}
List<Path> commitPaths = new ArrayList<>();
for (FSPaths fsp : valToPaths.values()) {
fsp.closeWriters(abort);
// before closing the operator, check if statistics gathering is requested
// and is provided by the record writer. This is different from the statistics
// gathering done in processOp(): there, serde statistics are gathered and
// accumulated in a hashmap for each row added, which adds overhead to the
// actual row processing. But if the record writer already gathers the
// statistics, it can simply return the accumulated statistics, which will be
// aggregated in case of spray writers
if (conf.isGatherStats() && isCollectRWStats) {
if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
for (int idx = 0; idx < fsp.outWriters.length; idx++) {
RecordWriter outWriter = fsp.outWriters[idx];
if (outWriter != null) {
SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
if (stats != null) {
fsp.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
fsp.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
}
}
}
} else {
for (int i = 0; i < fsp.updaters.length; i++) {
if (fsp.updaters[i] != null) {
SerDeStats stats = fsp.updaters[i].getStats();
if (stats != null) {
fsp.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
fsp.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
}
}
}
}
}
if (isNativeTable()) {
fsp.commit(fs, commitPaths);
}
if ("spark".equals(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE))) {
SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths);
}
}
if (conf.isMmTable() || conf.isDirectInsert()) {
Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(),
conf.getStatementId(), unionPath, conf.getInsertOverwrite(), bDynParts, dynamicPartitionSpecs,
conf.getStaticSpec());
}
// Only publish stats if this operator's flag was set to gather stats
if (conf.isGatherStats()) {
publishStats();
}
} else {
// Will come here if an Exception was thrown in map() or reduce().
// Hadoop always call close() even if an Exception was thrown in map() or
// reduce().
for (FSPaths fsp : valToPaths.values()) {
fsp.abortWritersAndUpdaters(fs, abort,
!autoDelete && isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert());
}
}
fsp = prevFsp = null;
super.closeOp(abort);
}
/**
* @return the name of the operator
*/
@Override
public String getName() {
return getOperatorName();
}
static public String getOperatorName() {
return "FS";
}
@Override
public void jobCloseOp(Configuration hconf, boolean success)
throws HiveException {
try {
if ((conf != null) && isNativeTable()) {
Path specPath = conf.getDirName();
String unionSuffix = null;
DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
ListBucketingCtx lbCtx = conf.getLbCtx();
if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable())) {
specPath = conf.getParentDir();
unionSuffix = conf.getDirName().getName();
}
if (conf.isLinkedFileSink() && conf.isDirectInsert()) {
specPath = conf.getParentDir();
unionSuffix = null;
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("jobCloseOp using specPath " + specPath);
}
if (!conf.isMmTable() && !conf.isDirectInsert()) {
Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter);
} else {
int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
lbLevels = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
// TODO: why is it stored in both table and dpCtx?
int numBuckets = (conf.getTable() != null) ? conf.getTable().getNumBuckets()
: (dpCtx != null ? dpCtx.getNumBuckets() : 0);
MissingBucketsContext mbc = new MissingBucketsContext(
conf.getTableInfo(), numBuckets, conf.getCompressed());
Utilities.handleDirectInsertTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc,
conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf
.getInsertOverwrite(), conf.isDirectInsert(), conf.getStaticSpec());
}
}
} catch (IOException e) {
throw new HiveException(e);
}
super.jobCloseOp(hconf, success);
}
@Override
public OperatorType getType() {
return OperatorType.FILESINK;
}
@Override
public void augmentPlan() {
PlanUtils.configureOutputJobPropertiesForStorageHandler(
getConf().getTableInfo());
}
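// Validates the output specification. The check is only delegated to the HiveOutputFormat for
// non-native (storage handler) tables; native output formats do not set the job's output
// properties correctly, so the check is skipped for them.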
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
if (hiveOutputFormat == null) {
try {
createHiveOutputFormat(job);
} catch (HiveException ex) {
logOutputFormatError(job, ex);
throw new IOException(ex);
}
}
if (conf.getTableInfo().isNonNative()) {
// check the output specs only if it is a storage handler (native tables' output formats do
// not set the job's output properties correctly)
try {
hiveOutputFormat.checkOutputSpecs(ignored, job);
} catch (NoSuchMethodError e) {
//For BC, ignore this for now, but leave a log message
LOG.warn("HiveOutputFormat should implement checkOutputSpecs() method`");
}
}
}
private void createHiveOutputFormat(JobConf job) throws HiveException {
if (hiveOutputFormat == null) {
Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), job);
}
try {
hiveOutputFormat = HiveFileFormatUtils.getHiveOutputFormat(job, getConf().getTableInfo());
} catch (Throwable t) {
throw (t instanceof HiveException) ? (HiveException)t : new HiveException(t);
}
}
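// Publishes the per-partition statistics accumulated in each FSPaths through the configured
// StatsPublisher, keyed by the lowercase table name plus the static/dynamic partition spec.
// Failures only fail the query when conf.isStatsReliable() is set.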
private void publishStats() throws HiveException {
boolean isStatsReliable = conf.isStatsReliable();
// Initializing a stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
if (statsPublisher == null) {
// just return, stats gathering should not block the main query
LOG.error("StatsPublishing error: StatsPublisher is not initialized.");
if (isStatsReliable) {
throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
}
return;
}
StatsCollectionContext sContext = new StatsCollectionContext(hconf);
sContext.setStatsTmpDir(conf.getTmpStatsDir());
if (!statsPublisher.connect(sContext)) {
// just return, stats gathering should not block the main query
LOG.error("StatsPublishing error: cannot connect to database");
if (isStatsReliable) {
throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
}
return;
}
String spSpec = conf.getStaticSpec();
for (Map.Entry<String, FSPaths> entry : valToPaths.entrySet()) {
String fspKey = entry.getKey(); // DP/LB
FSPaths fspValue = entry.getValue();
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Observing entry for stats " + fspKey
+ " => FSP with tmpPath " + fspValue.buildTmpPath());
}
// for bucketed tables, hive.optimize.sort.dynamic.partition optimization
// adds the taskId to the fspKey.
if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
String taskID = Utilities.getTaskIdFromFilename(fspKey);
// if the length of (prefix/ds=__HIVE_DEFAULT_PARTITION__/000000_0) is greater than the max key prefix
// and (prefix/ds=10/000000_0) is less than the max key prefix, then the former will get hashed
// to a smaller prefix (MD5hash/000000_0) while the latter will be stored as-is in the staging stats table.
// When stats get aggregated in StatsTask, only the keys that start with "prefix" will be fetched.
// Now that (prefix/ds=__HIVE_DEFAULT_PARTITION__) is hashed to a smaller prefix it will
// not be retrieved from staging table and hence not aggregated. To avoid this issue
// we will remove the taskId from the key which is redundant anyway.
fspKey = fspKey.split(taskID)[0];
}
// split[0] = DP, split[1] = LB
String[] split = splitKey(fspKey);
String dpSpec = split[0];
// key = "database.table/SP/DP/"LB/
// Hive stores lowercase table names in the metastore, and Counters are case sensitive, so we
// use the lowercase table name as the prefix here, as StatsTask gets the table name from the metastore to fetch the counter.
String prefix = conf.getTableInfo().getTableName().toLowerCase();
prefix = Utilities.join(prefix, spSpec, dpSpec);
prefix = prefix.endsWith(Path.SEPARATOR) ? prefix : prefix + Path.SEPARATOR;
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace(
"Prefix for stats " + prefix + " (from " + spSpec + ", " + dpSpec + ")");
}
Map<String, String> statsToPublish = new HashMap<String, String>();
for (String statType : fspValue.getStoredStats()) {
statsToPublish.put(statType, Long.toString(fspValue.stat.getStat(statType)));
}
if (!statsPublisher.publishStat(prefix, statsToPublish)) {
LOG.error("Failed to publish stats");
// The original exception is lost.
// Not changing the interface to maintain backward compatibility
if (isStatsReliable) {
throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg());
}
}
}
sContext.setContextSuffix(getOperatorId());
if (!statsPublisher.closeConnection(sContext)) {
LOG.error("Failed to close stats");
// The original exception is lost.
// Not changing the interface to maintain backward compatibility
if (isStatsReliable) {
throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
}
}
}
/**
* This is server side code to create key in order to save statistics to stats database.
* Client side will read it via StatsTask.java aggregateStats().
* Client side reads it via db query prefix which is based on partition spec.
* Since store-as-subdir information is not part of partition spec, we have to
* remove store-as-subdir information from variable "keyPrefix" calculation.
* But we have to keep store-as-subdir information in variable "key" calculation
* since each skewed value has a row in stats db and "key" is db key,
* otherwise later value overwrites previous value.
* Performance impact due to string handling is minimal since this method is
* only called once in FileSinkOperator closeOp().
* For example,
* create table test skewed by (key, value) on (('484','val_484')) stored as DIRECTORIES;
* skewedValueDirList contains 2 elements:
* 1. key=484/value=val_484
* 2. HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
* Case #1: Static partition with store-as-sub-dir
* spSpec has SP path
* fspKey has either
* key=484/value=val_484 or
* HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
* After filter, fspKey is empty, storedAsDirPostFix has either
* key=484/value=val_484 or
* HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
* so, at the end, "keyPrefix" doesn't have subdir information but "key" has
* Case #2: Dynamic partition with store-as-sub-dir. Assume dp part is hr
* spSpec has SP path
* fspKey has either
* hr=11/key=484/value=val_484 or
* hr=11/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
* After filter, fspKey is hr=11, storedAsDirPostFix has either
* key=484/value=val_484 or
* HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
* so, at the end, "keyPrefix" doesn't have subdir information from skewed but "key" has
*
* In short, fspKey consists of DP (dynamic partition spec) + LB (list bucketing spec).
* In stats publishing, full partition spec consists of prefix part of stat key
* but list bucketing spec is regarded as a postfix of stat key. So we split it here.
*/
private String[] splitKey(String fspKey) {
if (!fspKey.isEmpty() && isSkewedStoredAsSubDirectories) {
for (String dir : lbCtx.getSkewedValuesDirNames()) {
int index = fspKey.indexOf(dir);
if (index >= 0) {
return new String[] {fspKey.substring(0, index), fspKey.substring(index + 1)};
}
}
}
return new String[] {fspKey, null};
}
/**
* Check if nested column paths are set for 'conf'.
* If set, create a copy of 'conf' with this property unset.
*/
private Configuration unsetNestedColumnPaths(Configuration conf) {
if (conf.get(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR) != null) {
Configuration confCopy = new Configuration(conf);
confCopy.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
return confCopy;
}
return conf;
}
private boolean isNativeTable() {
return !conf.getTableInfo().isNonNative();
}
@Override
public void configureJobConf(JobConf job) {
if (conf.getInsertOverwrite()) {
job.setBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, true);
}
}
/**
* Get the bucket property as an int from the row. This is necessary because
* VectorFileSinkOperator wraps row values in Writable objects.
* @param row as Object
* @return bucket property as int
*/
private int getBucketProperty(Object row) {
Object bucketProperty = ((Object[]) row)[2];
if (bucketProperty instanceof Writable) {
return ((IntWritable) bucketProperty).get();
} else {
return (int) bucketProperty;
}
}
}