blob: 4ff86e4de152ccf3a6d3ee930fd51e9815464a2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.bundle;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.bundle.BundleTrigger;
import org.apache.flink.streaming.api.bundle.BundleTriggerCallback;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.functions.BundleFunction;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The {@link BundleOperator} allows to process incoming stream records
* as a bundle on <strong>non-keyed-stream</strong>.
*
* <p>In case of chaining of this operator, it has to be made sure that the operators in the chain are
* opened tail to head. The reason for this is that an opened
* {@link org.apache.flink.table.runtime.bundle.BundleOperator} starts
* already emitting recovered {@link StreamElement} to downstream operators.
*
* @param <K> The type of the key in the bundle buffer
* @param <V> The type of the value in the bundle buffer
* @param <IN> Input type for the operator.
* @param <OUT> Output type for the operator.
*/
/**
 * The {@link BundleOperator} buffers incoming stream records in a heap map and processes
 * them as a bundle on a <strong>non-keyed stream</strong>.
 *
 * <p>Each input record is folded into the buffered value of its key via
 * {@link BundleFunction#addInput}. When the {@link BundleTrigger} fires (or on a watermark,
 * on close, and optionally before a snapshot), the whole buffer is handed to
 * {@link BundleFunction#finishBundle} and then cleared.
 *
 * <p>If this operator is chained, the operators in the chain must be opened tail to head,
 * because an opened {@link org.apache.flink.table.runtime.bundle.BundleOperator} may already
 * start emitting recovered {@link StreamElement}s to downstream operators.
 *
 * @param <K> The type of the key in the bundle buffer.
 * @param <V> The type of the value in the bundle buffer.
 * @param <IN> Input type for the operator.
 * @param <OUT> Output type for the operator.
 */
public class BundleOperator<K, V, IN, OUT>
        extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {

    private static final long serialVersionUID = 5081841938324118594L;

    /** Name of the operator list state that holds the buffered bundle. */
    private static final String STATE_NAME = "_bundle_operator_state_";

    /**
     * If {@code true}, the bundle is flushed before every snapshot and no buffer state is
     * written; if {@code false}, the buffer is written to operator state on snapshot.
     */
    private final boolean finishBundleBeforeSnapshot;

    /** The trigger that determines how many elements should be put into a bundle. */
    private final BundleTrigger<IN> bundleTrigger;

    /** The user function that accumulates inputs and processes finished bundles. */
    private final BundleFunction<K, V, IN, OUT> function;

    private final TypeInformation<V> valueType;

    private final TypeInformation<K> keyType;

    /** Extracts the buffer key from an input record. */
    private final KeySelector<IN, K> keySelector;

    /** The buffer in heap to store elements. */
    private transient Map<K, V> buffer;

    /** The state that stores the buffer so that buffered elements survive a restore. */
    private transient ListState<Tuple2<K, V>> bufferState;

    /** The containing task's checkpoint lock; used as the monitor for wait/notify. */
    private transient Object checkpointingLock;

    /** Output for stream records. */
    private transient Collector<OUT> collector;

    /** Number of elements accumulated into the current bundle (exposed as a gauge). */
    private transient int numOfElements = 0;

    /** Set while {@link #finishBundle()} is running; other entry points wait until it clears. */
    private transient volatile boolean isInFinishingBundle = false;

    /**
     * Creates a bundle operator.
     *
     * @param function the bundle function; must not be null
     * @param bundleTrigger the trigger deciding when a bundle is finished; must not be null
     * @param keyType type information of the buffer key; must not be null
     * @param valueType type information of the buffer value; must not be null
     * @param keySelector extracts the buffer key from an input record; must not be null
     * @param finishBundleBeforeSnapshot whether to flush the bundle before each snapshot
     *     instead of storing the buffer in state
     */
    public BundleOperator(
            BundleFunction<K, V, IN, OUT> function,
            BundleTrigger<IN> bundleTrigger,
            TypeInformation<K> keyType,
            TypeInformation<V> valueType,
            KeySelector<IN, K> keySelector,
            boolean finishBundleBeforeSnapshot) {
        this.finishBundleBeforeSnapshot = finishBundleBeforeSnapshot;
        chainingStrategy = ChainingStrategy.ALWAYS;
        this.function = checkNotNull(function, "function is null");
        this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger is null");
        this.keyType = checkNotNull(keyType, "key type is null");
        this.valueType = checkNotNull(valueType, "value type is null");
        this.keySelector = checkNotNull(keySelector, "key selector is null");
    }

    /** Sets up the operator and remembers the containing task's checkpoint lock. */
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        this.checkpointingLock = getContainingTask().getCheckpointLock();
    }

    /**
     * Opens the user function, creates the heap buffer, restores buffered entries from state
     * (only when bundles are not flushed before snapshots), registers this operator as the
     * trigger callback, and registers the bundle metrics.
     */
    @Override
    public void open() throws Exception {
        super.open();
        function.open(new ExecutionContextImpl(this, getRuntimeContext()));

        this.collector = new StreamRecordCollector<>(output);
        this.buffer = new HashMap<>();

        // The buffer is only kept in state when bundles are not flushed before each snapshot.
        if (!finishBundleBeforeSnapshot && bufferState != null) {
            for (Tuple2<K, V> tuple : bufferState.get()) {
                K key = tuple.f0;
                V value = tuple.f1;
                // The same key may appear in several restored entries; merge them.
                V prevValue = buffer.get(key);
                V newValue = function.mergeValue(prevValue, value);
                buffer.put(key, newValue);
                // Every restored entry counts as one buffered element.
                numOfElements++;
            }
            // The state handle is looked up again in snapshotState().
            bufferState = null;
        }

        bundleTrigger.registerBundleTriggerCallback(this);
        // reset trigger
        bundleTrigger.reset();

        // gauge reporting the number of elements in the current bundle
        getRuntimeContext().getMetricGroup().gauge("bundleSize", (Gauge<Integer>) () -> numOfElements);
        // gauge reporting the average number of elements per buffered key
        getRuntimeContext().getMetricGroup().gauge("bundleRatio", (Gauge<Double>) () -> {
            int numOfKeys = buffer.size();
            if (numOfKeys == 0) {
                return 0.0;
            } else {
                return 1.0 * numOfElements / numOfKeys;
            }
        });
    }

    /**
     * Accumulates the element into the buffered value of its key and notifies the trigger.
     *
     * <p>NOTE(review): waiting on the checkpoint lock requires the caller to own its monitor;
     * this is assumed to be guaranteed by the containing task.
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        awaitBundleCompletion();

        K key = keySelector.getKey(element.getValue());
        V value = buffer.get(key); // maybe null
        // accumulate to value
        V newValue = function.addInput(value, element.getValue());
        // update to buffer
        buffer.put(key, newValue);
        numOfElements++;
        bundleTrigger.onElement(element.getValue());
    }

    /**
     * Hands the buffered bundle to the {@link BundleFunction}, clears the buffer and resets
     * the trigger. Must be called while holding the checkpoint lock.
     *
     * <p>The in-progress flag is always cleared and waiters are always notified, even if the
     * function or the trigger throws, so that other entry points cannot block forever on a
     * stale flag.
     */
    @Override
    public void finishBundle() throws Exception {
        assert(Thread.holdsLock(checkpointingLock));
        awaitBundleCompletion();

        isInFinishingBundle = true;
        try {
            if (!buffer.isEmpty()) {
                numOfElements = 0;
                function.finishBundle(buffer, collector);
                buffer.clear();
            }
            // reset trigger
            bundleTrigger.reset();
        } finally {
            isInFinishingBundle = false;
            checkpointingLock.notifyAll();
        }
    }

    /** Flushes the current bundle and then forwards the watermark. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        awaitBundleCompletion();
        // The bundle operator is only used for unbounded group-by, which does not need to
        // handle watermarks itself; flush the buffered bundle and forward the watermark.
        finishBundle();
        super.processWatermark(mark);
    }

    /** Nothing to do here; {@link #close()} flushes the last bundle and ends the function input. */
    @Override
    public void endInput() throws Exception {
    }

    /** Looks up the operator list state from which {@link #open()} restores the buffer. */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        TypeInformation<Tuple2<K, V>> tupleType = new TupleTypeInfo<>(keyType, valueType);
        this.bufferState = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, tupleType));
    }

    /** Flushes the current bundle before the barrier if so configured. */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        awaitBundleCompletion();
        if (finishBundleBeforeSnapshot) {
            finishBundle();
        }
    }

    /**
     * Writes the current buffer content to operator list state, replacing whatever was
     * stored before. Does nothing beyond the superclass snapshot when bundles are flushed
     * before snapshots.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        awaitBundleCompletion();
        super.snapshotState(context);

        if (!finishBundleBeforeSnapshot) {
            TypeInformation<Tuple2<K, V>> tupleType = new TupleTypeInfo<>(keyType, valueType);
            ListState<Tuple2<K, V>> snapshotBuffer = getOperatorStateBackend()
                .getListState(new ListStateDescriptor<>(STATE_NAME, tupleType));
            // clear state first
            snapshotBuffer.clear();

            List<Tuple2<K, V>> stateToPut = new ArrayList<>(buffer.size());
            Iterator<Map.Entry<K, V>> iter = buffer.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<K, V> entry = iter.next();
                stateToPut.add(Tuple2.of(entry.getKey(), entry.getValue()));
            }
            // batch put
            snapshotBuffer.addAll(stateToPut);
        }
    }

    /**
     * Flushes the last bundle, signals end of input to the function, and then closes the
     * operator and the function. Must be called while holding the checkpoint lock.
     *
     * <p>Failures while closing the operator or the function are logged, not rethrown. The
     * function is closed even if closing the operator fails.
     */
    @Override
    public void close() throws Exception {
        assert(Thread.holdsLock(checkpointingLock));
        awaitBundleCompletion();

        try {
            finishBundle();
            function.endInput(collector);
        } finally {
            Exception failure = null;
            try {
                super.close();
            } catch (Exception e) {
                failure = recordCloseFailure(failure, e);
            }
            // Closed in its own try block so a failing super.close() cannot skip it.
            try {
                FunctionUtils.closeFunction(function);
            } catch (Exception e) {
                failure = recordCloseFailure(failure, e);
            }
            if (failure != null) {
                LOG.warn("Errors occurred while closing the BundleOperator.", failure);
            }
        }
    }

    /** Returns whether this operator keeps its buffer in state (false if bundles are flushed before snapshots). */
    @Override
    public boolean requireState() {
        // if finishBundleBeforeSnapshot, then no state requirement
        return !finishBundleBeforeSnapshot;
    }

    /** Blocks on the checkpoint lock until no bundle is being finished. */
    private void awaitBundleCompletion() throws InterruptedException {
        while (isInFinishingBundle) {
            checkpointingLock.wait();
        }
    }

    /**
     * Combines close failures into one exception and restores the interrupt flag if the
     * current failure is an {@link InterruptedException}.
     */
    private static Exception recordCloseFailure(Exception previous, Exception current) {
        if (current instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        if (previous == null) {
            return current;
        }
        previous.addSuppressed(current);
        return previous;
    }
}