blob: f38952554b131f35e0e68cf1994059a8aac330e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/** Helper class to build StreamConfig for chain of operators. */
public class StreamConfigChainer<OWNER> {
public static final int MAIN_NODE_ID = 0;
private final OWNER owner;
private final StreamConfig headConfig;
private final Map<Integer, StreamConfig> chainedConfigs = new HashMap<>();
private final int numberOfNonChainedOutputs;
private int bufferTimeout;
private StreamConfig tailConfig;
private int chainIndex = MAIN_NODE_ID;
StreamConfigChainer(
OperatorID headOperatorID,
StreamConfig headConfig,
OWNER owner,
int numberOfNonChainedOutputs) {
this.owner = checkNotNull(owner);
this.headConfig = checkNotNull(headConfig);
this.tailConfig = checkNotNull(headConfig);
this.numberOfNonChainedOutputs = numberOfNonChainedOutputs;
head(headOperatorID);
}
private void head(OperatorID headOperatorID) {
headConfig.setOperatorID(headOperatorID);
headConfig.setChainStart();
headConfig.setChainIndex(chainIndex);
}
public <T> StreamConfigChainer<OWNER> chain(
OperatorID operatorID,
OneInputStreamOperator<T, T> operator,
TypeSerializer<T> typeSerializer,
boolean createKeyedStateBackend) {
return chain(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend);
}
public <T> StreamConfigChainer<OWNER> chain(
OneInputStreamOperator<T, T> operator, TypeSerializer<T> typeSerializer) {
return chain(new OperatorID(), operator, typeSerializer);
}
public <T> StreamConfigChainer<OWNER> chain(
OperatorID operatorID,
OneInputStreamOperator<T, T> operator,
TypeSerializer<T> typeSerializer) {
return chain(operatorID, operator, typeSerializer, typeSerializer, false);
}
public <T> StreamConfigChainer<OWNER> chain(
OneInputStreamOperatorFactory<T, T> operatorFactory, TypeSerializer<T> typeSerializer) {
return chain(new OperatorID(), operatorFactory, typeSerializer);
}
public <T> StreamConfigChainer<OWNER> chain(
OperatorID operatorID,
OneInputStreamOperatorFactory<T, T> operatorFactory,
TypeSerializer<T> typeSerializer) {
return chain(operatorID, operatorFactory, typeSerializer, typeSerializer, false);
}
private <IN, OUT> StreamConfigChainer<OWNER> chain(
OperatorID operatorID,
OneInputStreamOperator<IN, OUT> operator,
TypeSerializer<IN> inputSerializer,
TypeSerializer<OUT> outputSerializer,
boolean createKeyedStateBackend) {
return chain(
operatorID,
SimpleOperatorFactory.of(operator),
inputSerializer,
outputSerializer,
createKeyedStateBackend);
}
public <IN, OUT> StreamConfigChainer<OWNER> chain(
OperatorID operatorID,
StreamOperatorFactory<OUT> operatorFactory,
TypeSerializer<IN> inputSerializer,
TypeSerializer<OUT> outputSerializer,
boolean createKeyedStateBackend) {
chainIndex++;
StreamEdge streamEdge =
new StreamEdge(
new StreamNode(
tailConfig.getChainIndex(),
null,
null,
(StreamOperator<?>) null,
null,
null),
new StreamNode(
chainIndex, null, null, (StreamOperator<?>) null, null, null),
0,
null,
null);
streamEdge.setBufferTimeout(bufferTimeout);
tailConfig.setChainedOutputs(Collections.singletonList(streamEdge));
tailConfig = new StreamConfig(new Configuration());
tailConfig.setStreamOperatorFactory(checkNotNull(operatorFactory));
tailConfig.setOperatorID(checkNotNull(operatorID));
tailConfig.setupNetworkInputs(inputSerializer);
tailConfig.setTypeSerializerOut(outputSerializer);
if (createKeyedStateBackend) {
// used to test multiple stateful operators chained in a single task.
tailConfig.setStateKeySerializer(inputSerializer);
tailConfig.setStateBackendUsesManagedMemory(true);
tailConfig.setManagedMemoryFractionOperatorOfUseCase(
ManagedMemoryUseCase.STATE_BACKEND, 1.0);
}
tailConfig.setChainIndex(chainIndex);
chainedConfigs.put(chainIndex, tailConfig);
return this;
}
public OWNER finish() {
checkState(chainIndex > 0, "Use finishForSingletonOperatorChain");
List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
StreamNode sourceVertex =
new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null);
for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
StreamEdge streamEdge =
new StreamEdge(
sourceVertex,
new StreamNode(
chainIndex + i,
null,
null,
(StreamOperator<?>) null,
null,
null),
0,
new BroadcastPartitioner<>(),
null);
streamEdge.setBufferTimeout(1);
outEdgesInOrder.add(streamEdge);
}
tailConfig.setChainEnd();
tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs);
tailConfig.setOutEdgesInOrder(outEdgesInOrder);
tailConfig.setNonChainedOutputs(outEdgesInOrder);
headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
headConfig.setOutEdgesInOrder(outEdgesInOrder);
return owner;
}
public <OUT> OWNER finishForSingletonOperatorChain(TypeSerializer<OUT> outputSerializer) {
checkState(chainIndex == 0, "Use finishForSingletonOperatorChain");
checkState(headConfig == tailConfig);
StreamOperator<OUT> dummyOperator =
new AbstractStreamOperator<OUT>() {
private static final long serialVersionUID = 1L;
};
List<StreamEdge> outEdgesInOrder = new LinkedList<>();
StreamNode sourceVertexDummy =
new StreamNode(
MAIN_NODE_ID,
"group",
null,
dummyOperator,
"source dummy",
SourceStreamTask.class);
for (int i = 0; i < numberOfNonChainedOutputs; ++i) {
StreamNode targetVertexDummy =
new StreamNode(
MAIN_NODE_ID + 1 + i,
"group",
null,
dummyOperator,
"target dummy",
SourceStreamTask.class);
outEdgesInOrder.add(
new StreamEdge(
sourceVertexDummy,
targetVertexDummy,
0,
new BroadcastPartitioner<>(),
null));
}
headConfig.setVertexID(0);
headConfig.setNumberOfOutputs(1);
headConfig.setOutEdgesInOrder(outEdgesInOrder);
headConfig.setNonChainedOutputs(outEdgesInOrder);
headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
headConfig.setOutEdgesInOrder(outEdgesInOrder);
headConfig.setTypeSerializerOut(outputSerializer);
return owner;
}
public StreamConfigChainer<OWNER> name(String name) {
tailConfig.setOperatorName(name);
return this;
}
public void setBufferTimeout(int bufferTimeout) {
this.bufferTimeout = bufferTimeout;
}
}