/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.bundle;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.bundle.BundleTrigger;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.functions.BundleFunction;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import java.util.HashMap;
import java.util.Map;
/**
* The {@link KeyedBundleOperator} allows processing of incoming stream records as bundles on a
* {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
* To do so, the operator buffers elements into a bundle that is passed to a {@link BundleFunction}.
* <strong>Within a bundle, elements with the same key are in order, while elements with different
* keys may be out of order.</strong>
* When this operator is chained, it has to be made sure that the operators in the chain are
* opened tail to head, because an opened {@link KeyedBundleOperator} already starts emitting
* recovered {@link StreamElement}s to downstream operators.
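*
* <p>A minimal usage sketch (illustrative only: the counting {@link BundleFunction}, the
* {@code bundleTrigger} variable and the chosen type arguments are assumptions for the example,
* not part of this class):
*
* <pre>{@code
* BundleFunction<String, Long, String, Tuple2<String, Long>> countFunction =
*         new BundleFunction<String, Long, String, Tuple2<String, Long>>() {
*
*             private static final long serialVersionUID = 1L;
*
*             @Override
*             public Long addInput(Long value, String input) {
*                 // "value" is the buffered value for the current key, or null for its first element
*                 return value == null ? 1L : value + 1;
*             }
*
*             @Override
*             public void finishBundle(Map<String, Long> buffer, Collector<Tuple2<String, Long>> out) {
*                 for (Map.Entry<String, Long> entry : buffer.entrySet()) {
*                     out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
*                 }
*             }
*         };
*
* KeyedBundleOperator<String, Long, String, Tuple2<String, Long>> operator =
*         new KeyedBundleOperator<>(
*                 countFunction,
*                 bundleTrigger,   // e.g. a count-based trigger, assumed to be defined elsewhere
*                 Types.LONG,      // TypeInformation of the buffered value
*                 false);          // keep the unfinished bundle in keyed state across checkpoints
* }</pre>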
*
* @param <K> The type of the key in the Keyed Stream.
* @param <V> The type of the value in the bundle buffer.
* @param <IN> The type of the input elements.
* @param <OUT> The type of the returned elements.
*/
public class KeyedBundleOperator<K, V, IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
private static final long serialVersionUID = 5081841938324118594L;
private final boolean finishBundleBeforeSnapshot;
/** The trigger that determines how many elements should be put into a bundle. */
private final BundleTrigger<IN> bundleTrigger;
private final BundleFunction<K, V, IN, OUT> function;
private final TypeInformation<V> valueType;
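/** The containing task's checkpoint lock, used to coordinate bundle finishing with checkpointing. */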
private transient Object checkpointingLock;
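/** The in-memory bundle buffer, holding one accumulated value per key. */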
private transient Map<K, V> buffer;
/** Output for stream records. */
private transient Collector<OUT> collector;
/** The state used to store the bundle buffer, so that buffered elements are not lost on failure (exactly-once). */
private transient KeyedValueState<K, V> bufferState;
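/** The number of elements added to the current bundle. */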
private transient int numOfElements = 0;
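/** True while a bundle is being finished; other operations wait on the checkpoint lock until it is cleared. */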
private transient volatile boolean isInFinishingBundle = false;
public KeyedBundleOperator(
BundleFunction<K, V, IN, OUT> function,
BundleTrigger<IN> bundleTrigger,
TypeInformation<V> valueType,
boolean finishBundleBeforeSnapshot) {
this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
chainingStrategy = ChainingStrategy.ALWAYS;
this.function = function;
this.bundleTrigger = Preconditions.checkNotNull(bundleTrigger, "bundleTrigger is null");
this.valueType = Preconditions.checkNotNull(valueType, "valueType is null");
}
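/**
* Sets up the operator and caches the containing task's checkpoint lock, which coordinates
* bundle finishing with checkpointing.
*/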
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
this.checkpointingLock = getContainingTask().getCheckpointLock();
}
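/**
* Opens the {@link BundleFunction}, initializes the bundle buffer and, if the bundle is not
* finished before snapshots, restores the buffer from keyed state. Also registers the bundle
* trigger callback and the bundle size/ratio gauges.
*/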
@Override
public void open() throws Exception {
super.open();
function.open(new ExecutionContextImpl(this, getRuntimeContext()));
this.numOfElements = 0;
this.collector = new StreamRecordCollector<>(output);
this.buffer = new HashMap<>();
// create & restore state
if (!finishBundleBeforeSnapshot) {
TypeSerializer<V> valueSer = valueType.createSerializer(getExecutionConfig());
//noinspection unchecked
KeyedValueStateDescriptor<K, V> bufferStateDesc = new KeyedValueStateDescriptor<>(
"globalBufferState",
(TypeSerializer<K>) getKeySerializer(),
valueSer);
this.bufferState = getKeyedState(bufferStateDesc);
// restore the buffer from state, then clear the state
buffer.putAll(bufferState.getAll());
bufferState.removeAll();
}
// recover the element count (one merged value per key after restore)
numOfElements = buffer.size();
bundleTrigger.registerBundleTriggerCallback(this);
// reset the trigger
bundleTrigger.reset();
LOG.info("KeyedBundleOperator's trigger info: {}", bundleTrigger.explain());
// gauge metrics: the current bundle size and the average number of elements per key
getRuntimeContext().getMetricGroup().gauge("bundleSize", (Gauge<Integer>) () -> numOfElements);
getRuntimeContext().getMetricGroup().gauge("bundleRatio", (Gauge<Double>) () -> {
int numOfKeys = buffer.size();
if (numOfKeys == 0) {
return 0.0;
} else {
return 1.0 * numOfElements / numOfKeys;
}
});
}
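/**
* Adds the incoming element to the bundle buffer of the current key and notifies the bundle
* trigger, which may in turn call {@link #finishBundle()}.
*/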
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
while (isInFinishingBundle) {
checkpointingLock.wait();
}
K key = (K) getCurrentKey();
V value = buffer.get(key); // maybe null
V newValue = function.addInput(value, element.getValue());
buffer.put(key, newValue);
numOfElements++;
bundleTrigger.onElement(element.getValue());
}
/** Finishes the current bundle and invokes the {@link BundleFunction}. */
@Override
public void finishBundle() throws Exception {
assert(Thread.holdsLock(checkpointingLock));
while (isInFinishingBundle) {
checkpointingLock.wait();
}
isInFinishingBundle = true;
if (!buffer.isEmpty()) {
numOfElements = 0;
function.finishBundle(buffer, collector);
buffer.clear();
}
// reset trigger
bundleTrigger.reset();
checkpointingLock.notifyAll();
isInFinishingBundle = false;
}
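/** Finishes the current bundle before forwarding the watermark downstream. */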
@Override
public void processWatermark(Watermark mark) throws Exception {
while (isInFinishingBundle) {
checkpointingLock.wait();
}
// the bundle operator is only used in unbounded group-by, which does not need to handle the
// watermark itself; just flush the bundle and forward the watermark downstream
finishBundle();
super.processWatermark(mark);
}
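/** No-op: the remaining bundle is flushed and end of input is forwarded to the function in {@link #close()}. */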
@Override
public void endInput() throws Exception {
}
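/**
* Finishes the current bundle before the checkpoint barrier if {@code finishBundleBeforeSnapshot}
* is enabled, so that no buffer has to be written to state.
*/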
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
while (isInFinishingBundle) {
checkpointingLock.wait();
}
if (finishBundleBeforeSnapshot) {
finishBundle();
}
}
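/** Writes the bundle buffer to keyed state if the bundle is not finished before snapshots. */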
@SuppressWarnings("unchecked")
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
while (isInFinishingBundle) {
checkpointingLock.wait();
}
super.snapshotState(context);
if (!finishBundleBeforeSnapshot) {
// clear state first
bufferState.removeAll();
// update state
bufferState.putAll(buffer);
}
}
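/**
* Flushes the remaining bundle, signals end of input to the {@link BundleFunction} and then
* closes the function and the operator.
*/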
@Override
public void close() throws Exception {
assert(Thread.holdsLock(checkpointingLock));
while (isInFinishingBundle) {
checkpointingLock.wait();
}
try {
finishBundle();
function.endInput(collector);
} finally {
Exception exception = null;
try {
super.close();
if (function != null) {
function.close();
}
} catch (InterruptedException interrupted) {
exception = interrupted;
Thread.currentThread().interrupt();
} catch (Exception e) {
exception = e;
}
if (exception != null) {
LOG.warn("Errors occurred while closing the BundleOperator.", exception);
}
}
}
@Override
public boolean requireState() {
// this operator always requires keyed state
return true;
}
}