blob: 6d78e70cffbbc059f5e955b3cf89e755623eb3fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hive.streaming;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.HeapMemoryMonitor;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Base class for Hive streaming record writers. Subclasses supply the SerDe (createSerde) and the
// conversion of a raw record into a row object (encode). This class routes each encoded row to a
// RecordUpdater selected by partition key and bucket id. It also tracks the buffered ingest size so
// that checkAutoFlush() can decide when to flush.
public abstract class AbstractRecordWriter implements RecordWriter {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
// Regex matching line-break characters. NOTE(review): presumably the constructor's fallback when no
// delimiter is supplied; that branch is not visible in this view -- confirm.
private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
// Statement id supplied to init(); the 3-argument init() passes -1.
private Integer statementId;
protected HiveConf conf;
protected StreamingConnection conn;
protected Table table;
// Names and type strings of the table's storage-descriptor columns, captured in init().
protected List<String> inputColumns;
protected List<String> inputTypes;
protected String fullyQualifiedTableName;
// RecordUpdaters keyed by partition key. The key is the fully qualified table name when the table is
// unpartitioned, otherwise partitionValues.toString(). Each list has one slot per bucket; a slot stays
// null until that bucket is first written.
protected Map<String, List<RecordUpdater>> updaters = new HashMap<>();
// Destination directory per partition key, cached so each key is resolved only once per writer.
protected Map<String, Path> partitionPaths = new HashMap<>();
// Exposed through getPartitions().
protected Set<String> updatedPartitions = new HashSet<>();
// input OI includes table columns + partition columns
protected StructObjectInspector inputRowObjectInspector;
// output OI strips off the partition columns and retains other columns
protected ObjectInspector outputRowObjectInspector;
// Populated in init() only for dynamically partitioned tables.
protected List<String> partitionColumns = new ArrayList<>();
// Per-partition-column inspectors and fields, plus a reusable buffer filled by getPartitionFields().
protected ObjectInspector[] partitionObjInspectors = null;
protected StructField[] partitionStructFields = null;
protected Object[] partitionFieldData;
// Per-bucket-column inspectors and fields, plus a reusable buffer filled by getBucketFields().
protected ObjectInspector[] bucketObjInspectors = null;
protected StructField[] bucketStructFields = null;
protected Object[] bucketFieldData;
// Positions of the bucket columns within the input row.
protected List<Integer> bucketIds = new ArrayList<>();
// Number of buckets; 1 for unbucketed tables (see prepareBucketingFields()).
protected int totalBuckets;
protected String defaultPartitionName;
protected boolean isBucketed;
protected AcidOutputFormat<?, ?> acidOutputFormat;
// Write-id range of the current batch, set by init().
protected Long curBatchMinWriteId;
protected Long curBatchMaxWriteId;
protected final String lineDelimiter;
protected HeapMemoryMonitor heapMemoryMonitor;
// if low memory canary is set and if records after set canary exceeds threshold, trigger a flush.
// This is to avoid getting notified of low memory too often and flushing too often.
protected AtomicBoolean lowMemoryCanary;
// Bytes passed to write(long, byte[]) since the last flush(); flush() resets it to 0.
protected long ingestSizeBytes = 0;
// Auto-flush settings read from HiveConf in setupMemoryMonitoring().
protected boolean autoFlush;
protected float memoryUsageThreshold;
protected long ingestSizeThreshold;
// FileSystem instance created for this writer in init().
protected FileSystem fs;
// Stores the delimiter that write(long, InputStream) uses to split the stream into records.
// NOTE(review): the rest of this conditional expression and the constructor's closing brace are not
// visible in this view. Presumably a null or empty argument falls back to
// DEFAULT_LINE_DELIMITER_PATTERN -- confirm.
public AbstractRecordWriter(final String lineDelimiter) {
this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
// HeapMemoryMonitor listener registered by setupMemoryMonitoring(). It holds the same lowMemoryCanary
// flag that checkAutoFlush() reads.
protected static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName());
private final AtomicBoolean lowMemoryCanary;
OrcMemoryPressureMonitor(final AtomicBoolean lowMemoryCanary) {
this.lowMemoryCanary = lowMemoryCanary;
// Called when memory usage crosses the monitor's threshold; logs used/max sizes at debug level.
// NOTE(review): the statement that sets lowMemoryCanary is not visible in this view. Confirm it sets
// the flag to true, since checkAutoFlush() depends on lowMemoryCanary.get().
public void memoryUsageAboveThreshold(final long usedMemory, final long maxMemory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Orc memory pressure notified! usedMemory: {} maxMemory: {}.",
LlapUtil.humanReadableByteCount(usedMemory), LlapUtil.humanReadableByteCount(maxMemory));
// Initializes the writer for the given connection and write-id range, using -1 as the statement id.
public void init(StreamingConnection conn, long minWriteId, long maxWriteId)
throws StreamingException {
init(conn, minWriteId, maxWriteId, -1);
// Binds this writer to a streaming connection and a write-id batch. It:
// - records the connection, write-id range, statement id, conf, default partition name and table;
// - creates a FileSystem instance for the table location;
// - captures the input column names and types (and the partition column names for dynamic
//   partitioning);
// - instantiates the table's output format as an AcidOutputFormat, retrying with the configured
//   shade prefix if the first attempt fails;
// - builds the input and output ObjectInspectors from createSerde().
// Throws StreamingException if conn is null, the location is not a valid URI, the FileSystem cannot
// be obtained, the output format class cannot be loaded, or the SerDe raises SerDeException.
public void init(StreamingConnection conn, long minWriteId, long maxWriteId,
int statementId) throws StreamingException {
if (conn == null) {
throw new StreamingException("Streaming connection cannot be null during record writer initialization");
this.conn = conn;
this.curBatchMinWriteId = minWriteId;
this.curBatchMaxWriteId = maxWriteId;
this.statementId = statementId;
this.conf = conn.getHiveConf();
this.defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
this.table = conn.getTable();
String location = table.getSd().getLocation();
try {
URI uri = new URI(location);
// A new FileSystem instance is created for this writer (see the debug message below).
this.fs = FileSystem.newInstance(uri, conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Created new filesystem instance: {}", System.identityHashCode(this.fs));
} catch (URISyntaxException e) {
throw new StreamingException("Unable to create URI from location: " + location, e);
} catch (IOException e) {
throw new StreamingException("Unable to get filesystem for location: " + location, e);
this.inputColumns = table.getSd().getCols().stream().map(FieldSchema::getName).collect(Collectors.toList());
this.inputTypes = table.getSd().getCols().stream().map(FieldSchema::getType).collect(Collectors.toList());
if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
// NOTE(review): the end of this stream pipeline is not visible in this view.
this.partitionColumns = table.getPartitionKeys().stream().map(FieldSchema::getName)
this.fullyQualifiedTableName = Warehouse.getQualifiedName(table.getDbName(), table.getTableName());
String outFormatName = this.table.getSd().getOutputFormat();
try {
this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils
.newInstance(JavaUtils.loadClass(outFormatName), conf);
} catch (Exception e) {
// Fallback: retry loading the output format through HIVE_CLASSLOADER_SHADE_PREFIX. This is
// presumably for deployments where classes are relocated under a shade prefix. When no prefix is
// configured, the original failure is rethrown as a StreamingException.
String shadePrefix = conf.getVar(HiveConf.ConfVars.HIVE_CLASSLOADER_SHADE_PREFIX);
if (shadePrefix != null && !shadePrefix.trim().isEmpty()) {
// NOTE(review): the logging call that uses the message on the next line appears truncated in this view.
try {"Shade prefix: {} specified. Using as fallback to load {}..", shadePrefix, outFormatName);
this.acidOutputFormat = (AcidOutputFormat<?, ?>) ReflectionUtils
.newInstance(JavaUtils.loadClass(shadePrefix, outFormatName), conf);
} catch (ClassNotFoundException e1) {
// Report the original loading failure (e), not the fallback's (e1).
throw new StreamingException(e.getMessage(), e);
} else {
throw new StreamingException(e.getMessage(), e);
try {
final AbstractSerDe serDe = createSerde();
this.inputRowObjectInspector = (StructObjectInspector) serDe.getObjectInspector();
if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
// Partition columns are the trailing fields of the input row. The output inspector exposes only
// the leading, non-partition fields.
int dpStartCol = inputRowObjectInspector.getAllStructFieldRefs().size() - table.getPartitionKeys().size();
this.outputRowObjectInspector = new SubStructObjectInspector(inputRowObjectInspector, 0, dpStartCol);
} else {
this.outputRowObjectInspector = inputRowObjectInspector;
} catch (SerDeException e) {
throw new StreamingException("Unable to create SerDe", e);
// Reads the auto-flush settings from HiveConf and creates a HeapMemoryMonitor with the configured
// usage threshold. When tenured-generation usage is available, it registers an
// OrcMemoryPressureMonitor that shares a fresh lowMemoryCanary flag with checkAutoFlush(). It warns if
// usage is already above the threshold at setup time, because starting low on memory leads to
// frequent auto flushes.
protected void setupMemoryMonitoring() {
this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
// NOTE(review): the logger call for the settings message below appears truncated in this view.
this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);"Memory monitoring settings - autoFlush: {} memoryUsageThreshold: {} ingestSizeThreshold: {}",
// Pass the configured threshold for the "ingestSizeThreshold" placeholder, not the buffered-byte
// counter ingestSizeBytes.
autoFlush, memoryUsageThreshold, ingestSizeThreshold);
this.heapMemoryMonitor = new HeapMemoryMonitor(memoryUsageThreshold);
MemoryUsage tenuredMemUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
if (tenuredMemUsage != null) {
lowMemoryCanary = new AtomicBoolean(false);
heapMemoryMonitor.registerListener(new OrcMemoryPressureMonitor(lowMemoryCanary));
// alert if we are already running low on memory (starting with low memory will lead to frequent auto flush)
float currentUsage = (float) tenuredMemUsage.getUsed() / (float) tenuredMemUsage.getMax();
if (currentUsage > memoryUsageThreshold) {
// NOTE(review): the remaining argument(s) of this warn call are not visible in this view.
LOG.warn("LOW MEMORY ALERT! Tenured gen memory is already low. Increase memory to improve performance." +
" Used: {} Max: {}", LlapUtil.humanReadableByteCount(tenuredMemUsage.getUsed()),
// Computes bucketing metadata from the table's storage descriptor: whether the table is bucketed and
// the bucket count (1 when unbucketed). For bucketed tables it also computes the bucket-column
// positions in the input row, their ObjectInspectors, their StructFields, and a reusable value buffer.
// Reads inputRowObjectInspector, which init() sets.
protected void prepareBucketingFields() {
this.isBucketed = table.getSd().getNumBuckets() > 0;
// For unbucketed tables we have exactly 1 RecordUpdater (until HIVE-19208) for each AbstractRecordWriter which
// ends up writing to a file bucket_000000.
// See also {@link #getBucket(Object)}
this.totalBuckets = isBucketed ? table.getSd().getNumBuckets() : 1;
if (isBucketed) {
this.bucketIds = getBucketColIDs(table.getSd().getBucketCols(), table.getSd().getCols());
this.bucketFieldData = new Object[bucketIds.size()];
this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, inputRowObjectInspector);
this.bucketStructFields = new StructField[bucketIds.size()];
List<? extends StructField> allFields = inputRowObjectInspector.getAllStructFieldRefs();
for (int i = 0; i < bucketIds.size(); i++) {
bucketStructFields[i] = allFields.get(bucketIds.get(i));
// Prepares per-row partition extraction. It builds ObjectInspectors for the last numPartitions fields
// of the input row, and StructFields looked up by name from partitionColumns.
// NOTE(review): partitionFieldData/partitionObjInspectors are sized by the table's partition keys,
// while partitionStructFields is sized by partitionColumns. getPartitionFields() indexes both with the
// same i, so the two sizes must agree. init() only fills partitionColumns for dynamic partitioning.
protected void preparePartitioningFields() {
final int numPartitions = table.getPartitionKeys().size();
this.partitionFieldData = new Object[numPartitions];
this.partitionObjInspectors = new ObjectInspector[numPartitions];
// Partition columns occupy the trailing [startIdx, endIdx) range of the input row's fields.
int startIdx = inputRowObjectInspector.getAllStructFieldRefs().size() - numPartitions;
int endIdx = inputRowObjectInspector.getAllStructFieldRefs().size();
int j = 0;
for (int i = startIdx; i < endIdx; i++) {
StructField structField = inputRowObjectInspector.getAllStructFieldRefs().get(i);
partitionObjInspectors[j++] = structField.getFieldObjectInspector();
this.partitionStructFields = new StructField[partitionColumns.size()];
for (int i = 0; i < partitionColumns.size(); i++) {
String partCol = partitionColumns.get(i);
partitionStructFields[i] = inputRowObjectInspector.getStructFieldRef(partCol);
/**
* Used to tag error messages with some breadcrumbs: the partition and the current write-id range.
*/
protected String getWatermark(String partition) {
// Produces "<partition> writeIds[<min>,<max>]" from the current batch's write-id range.
return partition + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
// return the column numbers of the bucketed columns
// Positions are indexes into cols, visited in cols order. bucketCols is copied into a HashSet for the
// membership check.
// NOTE(review): the statement that appends the matching index to result is not visible in this view;
// as shown, result would be returned empty.
protected List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
ArrayList<Integer> result = new ArrayList<>(bucketCols.size());
HashSet<String> bucketSet = new HashSet<>(bucketCols);
for (int i = 0; i < cols.size(); i++) {
if (bucketSet.contains(cols.get(i).getName())) {
return result;
/**
* Create the SerDe for the record writer.
* @return the SerDe
* @throws SerializationError if the SerDe cannot be created.
*/
public abstract AbstractSerDe createSerde() throws SerializationError; // init() uses this SerDe's ObjectInspector as inputRowObjectInspector
/**
* Encode a record as an Object that Hive can read with the ObjectInspector associated with the
* serde returned by {@link #createSerde}. This is public so that test frameworks can use it.
* @param record record to be deserialized
* @return deserialized record as an Object
* @throws SerializationError - any error during serialization or deserialization of the record
*/
public abstract Object encode(byte[] record) throws SerializationError; // called by write(long, byte[]) for every record
// returns the bucket number to which the record belongs to
// Unbucketed tables always map to bucket 0. Otherwise the bucket is computed from the bucket-column
// values. Bucketing version 2 uses getBucketNumber; any other version uses getBucketNumberOld.
// NOTE(review): the arguments of Utilities.getBucketingVersion(...) are not visible in this view.
protected int getBucket(Object row) {
if (!isBucketed) {
return 0;
Object[] bucketFields = getBucketFields(row);
int bucketingVersion = Utilities.getBucketingVersion(
return bucketingVersion == 2 ?
ObjectInspectorUtils.getBucketNumber(bucketFields, bucketObjInspectors, totalBuckets) :
ObjectInspectorUtils.getBucketNumberOld(bucketFields, bucketObjInspectors, totalBuckets);
// Returns the partition values for a row:
// - null for an unpartitioned table;
// - values read from the row's partition fields under dynamic partitioning;
// - the connection's static partition values otherwise.
protected List<String> getPartitionValues(final Object row) {
if (!conn.isPartitionedTable()) {
return null;
List<String> partitionValues = new ArrayList<>();
if (conn.isPartitionedTable() && conn.isDynamicPartitioning()) {
Object[] partitionFields = getPartitionFields(row);
for (int i = 0; i < partitionObjInspectors.length; i++) {
ObjectInspector oi = partitionObjInspectors[i];
Object field = partitionFields[i];
// NOTE(review): the remaining arguments of copyToStandardObject and the bodies of the branches
// below are not visible in this view. Presumably a null or empty value maps to
// defaultPartitionName and anything else to partitionValue.toString() -- confirm.
Object partitionValue = ObjectInspectorUtils.copyToStandardObject(field, oi, ObjectInspectorUtils
if (partitionValue == null || partitionValue.toString().length() == 0) {
} else {
} else {
partitionValues = conn.getStaticPartitionValues();
return partitionValues;
// Visits every non-null RecordUpdater, then resets ingestSizeBytes to 0. Stats are logged before and
// after when debug logging is enabled. An IOException is wrapped in StreamingIOFailure.
// NOTE(review): the per-updater call inside the null check is not visible in this view.
public void flush() throws StreamingIOFailure {
try {
if (LOG.isDebugEnabled()) {
logStats("Stats before flush:");
for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Flushing record updater for partitions: {}", entry.getKey());
for (RecordUpdater updater : entry.getValue()) {
if (updater != null) {
ingestSizeBytes = 0;
if (LOG.isDebugEnabled()) {
logStats("Stats after flush:");
} catch (IOException e) {
throw new StreamingIOFailure("Unable to flush recordUpdater", e);
// Shuts the writer down.
// 1. Tries to close each non-null RecordUpdater. A failure is logged, remembered in haveError, and
//    the loop continues so no updater is skipped.
// 2. Closes the writer's FileSystem. An IOException here is thrown immediately as StreamingIOFailure.
// 3. If any updater failed, throws StreamingIOFailure tagged via getWatermark().
// NOTE(review): several statement bodies (heap-monitor handling, the updater close call, the fs close
// call) are not visible in this view.
public void close() throws StreamingIOFailure {
if(heapMemoryMonitor != null) {
boolean haveError = false;
String partition = null;
if (LOG.isDebugEnabled()) {
logStats("Stats before close:");
for (Map.Entry<String, List<RecordUpdater>> entry : updaters.entrySet()) {
partition = entry.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Closing updater for partitions: {}", partition);
for (RecordUpdater updater : entry.getValue()) {
if (updater != null) {
try {
//try not to leave any files open
} catch (Exception ex) {
haveError = true;
LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
if (LOG.isDebugEnabled()) {
logStats("Stats after close:");
try {
if(this.fs != null) {
} catch (IOException e) {
throw new StreamingIOFailure("Error while closing FileSystem", e);
// NOTE(review): partition holds the last key iterated, not necessarily the one whose updater failed.
if (haveError) {
throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition));
// Builds one ObjectInspector per entry of bucketIds, presumably the inspector of the field at that
// position in recordObjInspector.
// NOTE(review): the right-hand side of the assignment below is not visible in this view.
protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
, StructObjectInspector recordObjInspector) {
ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
for (int i = 0; i < bucketIds.size(); i++) {
int bucketId = bucketIds.get(i);
result[i] =
return result;
// Copies the row's bucket-column values into the shared bucketFieldData buffer and returns that
// buffer. The returned array is overwritten by the next call.
protected Object[] getBucketFields(Object row) {
for (int i = 0; i < bucketIds.size(); i++) {
bucketFieldData[i] = inputRowObjectInspector.getStructFieldData(row, bucketStructFields[i]);
return bucketFieldData;
// Copies the row's partition-column values into the shared partitionFieldData buffer and returns
// that buffer. The returned array is overwritten by the next call.
protected Object[] getPartitionFields(Object row) {
for (int i = 0; i < partitionFieldData.length; i++) {
partitionFieldData[i] = inputRowObjectInspector.getStructFieldData(row, partitionStructFields[i]);
return partitionFieldData;
// Splits inputStream into records using lineDelimiter. The Scanner is closed by try-with-resources
// when iteration ends.
// NOTE(review): the loop body is not visible in this view; presumably each token is written through
// write(writeId, byte[]).
public void write(final long writeId, final InputStream inputStream) throws StreamingException {
try (Scanner scanner = new Scanner(inputStream).useDelimiter(lineDelimiter)) {
while (scanner.hasNext()) {
// Writes one record:
// 1. adds record.length to ingestSizeBytes (before encoding);
// 2. encodes the record and computes its bucket and partition values;
// 3. inserts it under writeId into the matching RecordUpdater.
// An IOException in this sequence is wrapped in StreamingIOFailure.
// NOTE(review): statements after the insert are not visible in this view.
public void write(final long writeId, final byte[] record) throws StreamingException {
ingestSizeBytes += record.length;
try {
Object encodedRow = encode(record);
int bucket = getBucket(encodedRow);
List<String> partitionValues = getPartitionValues(encodedRow);
getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow);
// ingest size bytes gets resetted on flush() whereas connection stats is not
} catch (IOException e) {
throw new StreamingIOFailure("Error writing record in transaction write id ("
+ writeId + ")", e);
// When auto-flush is enabled, decides whether buffered data should be flushed:
// - if a low-memory canary exists: when the canary is set and ingestSizeBytes exceeds
//   ingestSizeThreshold;
// - otherwise: when ingestSizeBytes exceeds ingestSizeThreshold and current heap usage (used/max from
//   the MemoryMXBean) exceeds memoryUsageThreshold.
// NOTE(review): the body of the !autoFlush check, the flush calls, and the debug logger call in the
// second branch are not visible in this view.
protected void checkAutoFlush() throws StreamingIOFailure {
if (!autoFlush) {
if (lowMemoryCanary != null) {
if (lowMemoryCanary.get() && ingestSizeBytes > ingestSizeThreshold) {
if (LOG.isDebugEnabled()) {
LOG.debug("Low memory canary is set and ingestion size (buffered) threshold '{}' exceeded. " +
"Flushing all record updaters..", LlapUtil.humanReadableByteCount(ingestSizeThreshold));
} else {
if (ingestSizeBytes > ingestSizeThreshold) {
MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = mxBean.getHeapMemoryUsage();
float memUsedFraction = ((float) heapUsage.getUsed() / (float) heapUsage.getMax());
if (memUsedFraction > memoryUsageThreshold) {
if (LOG.isDebugEnabled()) {"Memory usage threshold '{}' and ingestion size (buffered) threshold '{}' exceeded. " +
"Flushing all record updaters..", memUsedFraction, LlapUtil.humanReadableByteCount(ingestSizeThreshold));
/**
* @return the set of newly added or updated partitions.
*/
public Set<String> getPartitions() {
// NOTE(review): returns the internal mutable set itself, not a copy. Callers see later additions
// and could modify it.
return updatedPartitions;
// Creates a RecordUpdater under partitionPath from the table's AcidOutputFormat. It also registers the
// base/delta subdirectory derived from the options with the connection via addWriteDirectoryInfo.
// NOTE(review): the statements that populate tblProperties and configure options (presumably with
// bucketId and the write-id range) are not visible in this view.
protected RecordUpdater createRecordUpdater(List<String> partitionValues, final Path partitionPath,
int bucketId, Long minWriteId, Long maxWriteID)
throws IOException {
// Initialize table properties from the table parameters. This is required because the table
// may define certain table parameters that may be required while writing. The table parameter
// 'transactional_properties' is one such example.
Properties tblProperties = new Properties();
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
// Add write directory information in the connection object.
conn.addWriteDirectoryInfo(partitionValues, AcidUtils.baseOrDeltaSubdirPath(partitionPath, options));
return acidOutputFormat.getRecordUpdater(partitionPath, options);
/**
* Returns the file that would be used to store rows under these
* parameters.
* @param partitionValues partition values; null for an unpartitioned table
* @param bucketId bucket id
* @param minWriteId min write Id
* @param maxWriteId max write Id
* @param statementId statement Id
* @param table table
* @return the location of the file.
* @throws StreamingException if the partition path cannot be constructed
*/
public Path getDeltaFileLocation(List<String> partitionValues,
Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
Table table) throws StreamingException {
// Base directory: the table location when partitionValues is null. Otherwise it is the partition
// directory built from the table's partition keys and partitionValues under table.getDataLocation().
Path destLocation;
if (partitionValues == null) {
destLocation = new Path(table.getSd().getLocation());
} else {
Map<String, String> partSpec = Warehouse.makeSpecFromValues(
table.getPartitionKeys(), partitionValues);
try {
destLocation = new Path(table.getDataLocation(), Warehouse.makePartPath(partSpec));
} catch (MetaException e) {
throw new StreamingException("Unable to retrieve the delta file location"
+ " for values: " + partitionValues
+ ", minWriteId: " + minWriteId
+ ", maxWriteId: " + maxWriteId
+ ", statementId: " + statementId, e);
// NOTE(review): the options builder chain on the next line is not visible in this view.
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
return AcidUtils.createFilename(destLocation, options);
protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
RecordUpdater recordUpdater;
String key;
Path destLocation;
try {
key = partitionValues == null ? fullyQualifiedTableName : partitionValues.toString();
// add partition in metastore for dynamic partition. We make a metastore call for every new partition value that
// we encounter even if partition already exists (exists check require a metastore call anyways).
if (partitionPaths.containsKey(key)) {
destLocation = partitionPaths.get(key);
} else {
// un-partitioned table
if (partitionValues == null) {
destLocation = new Path(table.getSd().getLocation());
} else {
PartitionInfo partitionInfo = conn.createPartitionIfNotExists(partitionValues);
// collect the newly added/updated partitions. connection.commitTransaction() will report the dynamically
// added partitions to TxnHandler
destLocation = new Path(partitionInfo.getPartitionLocation());
partitionPaths.put(key, destLocation);
updaters.computeIfAbsent(key, k -> initializeBuckets());
recordUpdater = updaters.get(key).get(bucketId);
} catch (StreamingException e) {
throw new StreamingIOFailure("Unable to create partition: " + partitionValues + "for " + conn, e);
if (recordUpdater == null) {
try {
recordUpdater = createRecordUpdater(partitionValues, destLocation,
bucketId, curBatchMinWriteId, curBatchMaxWriteId);
} catch (IOException e) {
String errMsg = "Failed creating RecordUpdater for " + getWatermark(destLocation.toString());
LOG.error(errMsg, e);
throw new StreamingIOFailure(errMsg, e);
List<RecordUpdater> partitionUpdaters = updaters.get(key);
partitionUpdaters.set(bucketId, recordUpdater);
return recordUpdater;
// Returns a new list with totalBuckets null slots, one per bucket. getRecordUpdater() can then get()
// any bucket id and later set() its updater.
protected List<RecordUpdater> initializeBuckets() {
List<RecordUpdater> result = new ArrayList<>(totalBuckets);
for (int bucket = 0; bucket < totalBuckets; bucket++) {
result.add(bucket, null); //so that get(i) returns null rather than ArrayOutOfBounds
return result;
// Logs a one-line debug snapshot, prefixed by prefix, containing:
// - open record updaters and cached partitions;
// - buffered and total records;
// - buffered and total ingest size;
// - tenured-gen usage ("NA" when there is no heap monitor or no tenured usage).
// The visible callers (flush, close) only call this when debug logging is enabled.
// NOTE(review): the stream pipelines computing openRecordUpdaters/bufferedRecords and the tail of the
// oldGenUsage expression are not visible in this view.
protected void logStats(final String prefix) {
int openRecordUpdaters = updaters.values()
long bufferedRecords = updaters.values()
MemoryUsage memoryUsage = heapMemoryMonitor == null ? null : heapMemoryMonitor.getTenuredGenMemoryUsage();
String oldGenUsage = "NA";
if (memoryUsage != null) {
oldGenUsage = "used/max => " + LlapUtil.humanReadableByteCount(memoryUsage.getUsed()) + "/" +
LOG.debug("{} [record-updaters: {}, partitions: {}, buffered-records: {} total-records: {} " +
"buffered-ingest-size: {}, total-ingest-size: {} tenured-memory-usage: {}]", prefix, openRecordUpdaters,
partitionPaths.size(), bufferedRecords, conn == null ? 0 : conn.getConnectionStats().getRecordsWritten(),
// One argument per placeholder. The buffered-ingest-size value (bytes since the last flush) was
// missing, which pushed the later values under the wrong labels.
LlapUtil.humanReadableByteCount(ingestSizeBytes),
LlapUtil.humanReadableByteCount(conn == null ? 0 : conn.getConnectionStats().getRecordsSize()), oldGenUsage);