/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
/**
* Sink function to write the data to the underlying filesystem.
*
* <p><h2>Work Flow</h2>
*
* <p>The function first buffers the data as a batch of {@link HoodieRecord}s.
* It flushes (writes) a records batch when the batch size exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE},
* the total buffer size exceeds the configured size {@link FlinkOptions#WRITE_TASK_MAX_SIZE},
* or a Flink checkpoint starts. After a batch has been written successfully,
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark the write as successful.
*
* <p><h2>The Semantics</h2>
*
* <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
* starts a new instant on the timeline when a checkpoint triggers; the coordinator checkpoint always
* starts before its operators, so when this function starts a checkpoint, a REQUESTED instant already exists.
*
* <p>The function's processing thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer, until
* the current checkpoint succeeds and the coordinator starts a new instant. Any error during metadata committing triggers a job failure;
* when the job recovers from that failure, the write function re-sends the write metadata to the coordinator to check whether it
* can be re-committed. Thus, if an unexpected error happens during instant committing, the coordinator retries the commit when the job
* recovers.
*
* <p><h2>Fault Tolerance</h2>
*
* <p>The operator coordinator checks and commits the last instant then starts a new one after a checkpoint finishes successfully.
* It rolls back any inflight instant before it starts a new instant, which means one hoodie instant only spans one checkpoint.
* The write function blocks data buffer flushing for the configured checkpoint timeout
* before it throws an exception; any checkpoint failure eventually triggers a job failure.
*
* <p>Note: The function task requires the input stream be shuffled by the file IDs.
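*
* <p><h2>Example</h2>
*
* <p>A minimal wiring sketch, for illustration only: it assumes the {@code StreamWriteOperator} factory from
* this package, and the actual job graph is assembled by this module's pipeline utilities and may differ.
*
* <pre>{@code
* DataStream<HoodieRecord> records = ...; // records with bucket (fileId) locations already assigned
* records
*     // shuffle by the file IDs, as this function requires
*     .keyBy(record -> record.getCurrentLocation().getFileId())
*     .transform("stream_write", TypeInformation.of(Object.class), StreamWriteOperator.getFactory(conf));
* }</pre>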
*
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
/**
* Write buffer as buckets for a checkpoint. The key is the bucket ID.
*/
private transient Map<String, DataBucket> buckets;
protected transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
private transient HoodieRecordMerger recordMerger;
/**
* Total size tracer.
*/
private transient TotalSizeTracer tracer;
/**
* Metrics for flink stream write.
*/
protected transient FlinkStreamWriteMetrics writeMetrics;
/**
* Constructs a StreamWriteFunction.
*
* @param config The config options
*/
public StreamWriteFunction(Configuration config) {
super(config);
}
@Override
public void open(Configuration parameters) throws IOException {
this.tracer = new TotalSizeTracer(this.config);
initBuffer();
initWriteFunction();
initMergeClass();
registerMetrics();
}
@Override
public void snapshotState() {
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
// Wait for the buffered data to flush out and request a new instant.
flushRemaining(false);
}
@Override
public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
bufferRecord((HoodieRecord<?>) value);
}
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.close();
}
}
/**
* End input action for batch source.
*/
public void endInput() {
super.endInput();
flushRemaining(true);
this.writeClient.cleanHandles();
this.writeStatuses.clear();
}
// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
@VisibleForTesting
@SuppressWarnings("rawtypes")
public Map<String, List<HoodieRecord>> getDataBuffer() {
Map<String, List<HoodieRecord>> ret = new HashMap<>();
for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
ret.put(entry.getKey(), entry.getValue().writeBuffer());
}
return ret;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void initBuffer() {
this.buckets = new LinkedHashMap<>();
}
private void initWriteFunction() {
final String writeOperation = this.config.get(FlinkOptions.OPERATION);
switch (WriteOperationType.fromValue(writeOperation)) {
case INSERT:
this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
break;
case UPSERT:
case DELETE: // shares the code path with UPSERT
case DELETE_PREPPED:
this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
break;
case INSERT_OVERWRITE:
this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
break;
case INSERT_OVERWRITE_TABLE:
this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
break;
default:
throw new RuntimeException("Unsupported write operation : " + writeOperation);
}
}
private void initMergeClass() {
recordMerger = HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
LOG.info("init hoodie merge with class [{}]", recordMerger.getClass().getName());
}
/**
* Represents a data item in the buffer; this is needed to reduce the
* memory footprint.
*
* <p>A {@link HoodieRecord} is first transformed into a {@link DataItem}
* for buffering, then transformed back into a {@link HoodieRecord} before flushing.
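*
* <p>Round-trip sketch: {@code DataItem.fromHoodieRecord(record).toHoodieRecord(partitionPath)} yields a record
* with the same key, payload, operation and instant time, with the partition path re-attached from the owning
* {@link DataBucket}; the fileID of the location is dropped and patched back in {@link DataBucket#preWrite}.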
*/
private static class DataItem {
private final String key; // record key
private final String instant; // instant time marker: 'U' (update bucket) or 'I' (insert bucket)
private final HoodieRecordPayload<?> data; // record payload
private final HoodieOperation operation; // operation
private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
this.key = key;
this.instant = instant;
this.data = data;
this.operation = operation;
}
public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
return new DataItem(
record.getRecordKey(),
record.getCurrentLocation().getInstantTime(),
((HoodieAvroRecord) record).getData(),
record.getOperation());
}
public HoodieRecord<?> toHoodieRecord(String partitionPath) {
HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
HoodieRecord<?> record = new HoodieAvroRecord<>(hoodieKey, data, operation);
HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
record.setCurrentLocation(loc);
return record;
}
}
/**
* Data bucket.
*/
protected static class DataBucket {
private final List<DataItem> records;
private final BufferSizeDetector detector;
private final String partitionPath;
private final String fileID;
private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
this.records = new ArrayList<>();
this.detector = new BufferSizeDetector(batchSize);
this.partitionPath = hoodieRecord.getPartitionPath();
this.fileID = hoodieRecord.getCurrentLocation().getFileId();
}
/**
* Prepare the write data buffer: patch up all the records with the correct partition path.
*/
public List<HoodieRecord> writeBuffer() {
// rewrite all the records with the bucket's partition path
return records.stream()
.map(record -> record.toHoodieRecord(partitionPath))
.collect(Collectors.toList());
}
/**
* Sets up before flush: patch up the first record with the expected fileID.
*
* <p>Note: the method may modify the given {@code records} list.
*/
public void preWrite(List<HoodieRecord> records) {
// rewrite the first record with expected fileID
HoodieRecord<?> first = records.get(0);
HoodieRecord<?> record = new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(), first.getOperation());
HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
record.setCurrentLocation(newLoc);
records.set(0, record);
}
public void reset() {
this.records.clear();
this.detector.reset();
}
}
/**
* Tool to detect whether to flush out the existing buffer.
* Samples the records at a 1% (0.01) rate to estimate the record size.
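*
* <p>A sizing sketch with hypothetical numbers (for illustration only, not option defaults):
*
* <pre>{@code
* double batchSizeMb = 64.0D;        // batchSizeBytes = 64 * 1024 * 1024 = 67,108,864
* long lastSampledSize = 2 * 1024;   // ~2 KB charged to every record between samples
* // detect() starts returning true after roughly 67,108,864 / 2,048 = 32,768 buffered records
* }</pre>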
*/
private static class BufferSizeDetector {
private final Random random = new Random(47);
private static final int DENOMINATOR = 100;
private final double batchSizeBytes;
private long lastRecordSize = -1L;
private long totalSize = 0L;
BufferSizeDetector(double batchSizeMb) {
this.batchSizeBytes = batchSizeMb * 1024 * 1024;
}
boolean detect(Object record) {
if (lastRecordSize == -1 || sampling()) {
lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
}
totalSize += lastRecordSize;
return totalSize > this.batchSizeBytes;
}
boolean sampling() {
// samples at a 1% (0.01) rate
return random.nextInt(DENOMINATOR) == 1;
}
void reset() {
this.lastRecordSize = -1L;
this.totalSize = 0L;
}
}
/**
* Tool to trace the total buffer size. It computes the maximum buffer size;
* if the current buffer size is greater than the maximum buffer size, a data bucket
* flush is triggered.
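*
* <p>A worked example with hypothetical values (not necessarily the option defaults): with
* {@link FlinkOptions#WRITE_TASK_MAX_SIZE} = 1024 (MB) and {@link FlinkOptions#WRITE_MERGE_MAX_MEMORY} = 100 (MB),
* the maximum buffer size is (1024 - 100 - 100) * 1024 * 1024 bytes, that is, 824 MB left for buffering records.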
*/
private static class TotalSizeTracer {
private long bufferSize = 0L;
private final double maxBufferSize;
TotalSizeTracer(Configuration conf) {
long mergeReaderMem = 100; // constant 100MB
long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
this.maxBufferSize = (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024;
final String errMsg = String.format("'%s' should be greater than '%s' plus the merge reader memory (constant 100MB now)",
FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
ValidationUtils.checkState(this.maxBufferSize > 0, errMsg);
}
/**
* Trace the given record size {@code recordSize}.
*
* @param recordSize The record size
* @return true if the buffer size exceeds the maximum buffer size
*/
boolean trace(long recordSize) {
this.bufferSize += recordSize;
return this.bufferSize > this.maxBufferSize;
}
void countDown(long size) {
this.bufferSize -= size;
}
public void reset() {
this.bufferSize = 0;
}
}
/**
* Returns the bucket ID for the given record {@code record}.
*/
private String getBucketID(HoodieRecord<?> record) {
final String fileId = record.getCurrentLocation().getFileId();
return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
}
/**
* Buffers the given record.
*
* <p>Flush the data bucket first if the bucket's buffered size is greater than
* the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
*
* <p>Flush the max size data bucket if the total buffer size exceeds the configured
* threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}.
*
* @param value The HoodieRecord to buffer
*/
protected void bufferRecord(HoodieRecord<?> value) {
writeMetrics.markRecordIn();
final String bucketID = getBucketID(value);
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
final DataItem item = DataItem.fromHoodieRecord(value);
bucket.records.add(item);
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
// update buffer metrics after tracing buffer size
writeMetrics.setWriteBufferedSize(this.tracer.bufferSize);
if (flushBucket) {
if (flushBucket(bucket)) {
this.tracer.countDown(bucket.detector.totalSize);
bucket.reset();
}
} else if (flushBuffer) {
// find the max size bucket and flush it out
DataBucket bucketToFlush = this.buckets.values().stream()
.max(Comparator.comparingLong(b -> b.detector.totalSize))
.orElseThrow(NoSuchElementException::new);
if (flushBucket(bucketToFlush)) {
this.tracer.countDown(bucketToFlush.detector.totalSize);
bucketToFlush.reset();
} else {
LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
}
}
}
private boolean hasData() {
return this.buckets.size() > 0
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
if (instant == null) {
// in case there are empty checkpoints that have no input data
LOG.info("No inflight instant when flushing data, skip.");
return false;
}
List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
records = deduplicateRecordsIfNeeded(records);
final List<WriteStatus> writeStatus = writeBucket(instant, bucket, records);
records.clear();
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still uses the current instant.
.writeStatus(writeStatus)
.lastBatch(false)
.endInput(false)
.build();
this.eventGateway.sendEventToCoordinator(event);
writeStatuses.addAll(writeStatus);
return true;
}
@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean endInput) {
writeMetrics.startDataFlush();
this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that have no input data
throw new HoodieException("No inflight instant when flushing data!");
}
final List<WriteStatus> writeStatus;
if (buckets.size() > 0) {
writeStatus = new ArrayList<>();
this.buckets.values()
// The records are partitioned by the bucket ID and each batch sent to
// the writer belongs to one bucket.
.forEach(bucket -> {
List<HoodieRecord> records = bucket.writeBuffer();
if (records.size() > 0) {
records = deduplicateRecordsIfNeeded(records);
writeStatus.addAll(writeBucket(currentInstant, bucket, records));
records.clear();
bucket.reset();
}
});
} else {
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList();
}
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
.build();
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
this.writeClient.cleanHandles();
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
writeMetrics.endDataFlush();
writeMetrics.resetAfterCommit();
}
private void registerMetrics() {
MetricGroup metrics = getRuntimeContext().getMetricGroup();
writeMetrics = new FlinkStreamWriteMetrics(metrics);
writeMetrics.registerMetrics();
}
protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
bucket.preWrite(records);
writeMetrics.startFileFlush();
List<WriteStatus> statuses = writeFunction.apply(records, instant);
writeMetrics.endFileFlush();
writeMetrics.increaseNumOfFilesWritten();
return statuses;
}
private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
return FlinkWriteHelper.newInstance()
.deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
} else {
return records;
}
}
}