blob: f35604bab44b4b9d24185f1778d808956a052e83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.iteration.operator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.iteration.IterationID;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.config.IterationOptions;
import org.apache.flink.iteration.operator.feedback.SpillableFeedbackChannel;
import org.apache.flink.iteration.proxy.ProxyKeySelector;
import org.apache.flink.iteration.typeinfo.IterationRecordSerializer;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.NetworkInputConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkState;
/** Utility class for operators. */
public class OperatorUtils {
/**
 * Returns the unique id for the given subtask of the specified operator.
 *
 * <p>The id is the hex representation of {@code operatorId}, followed by a dash and the
 * {@code subtaskIndex}.
 *
 * @param operatorId the id of the operator.
 * @param subtaskIndex the index of the subtask.
 * @return the id in the form {@code <operator id in hex>-<subtask index>}.
 */
public static String getUniqueSenderId(OperatorID operatorId, int subtaskIndex) {
    return String.join("-", operatorId.toHexString(), String.valueOf(subtaskIndex));
}
/**
 * Creates a {@link FeedbackKey} from the {@code iterationId} and {@code feedbackIndex}.
 *
 * @param iterationId the id of the iteration; its hex representation is the first component of
 *     the key.
 * @param feedbackIndex the second component of the key.
 * @param <V> the type parameter of the created key.
 * @return the newly created key.
 */
public static <V> FeedbackKey<V> createFeedbackKey(IterationID iterationId, int feedbackIndex) {
    final String iterationHexId = iterationId.toHexString();
    return new FeedbackKey<>(iterationHexId, feedbackIndex);
}
/**
 * Registers the specified {@code feedbackConsumer} to the {@code feedbackChannel}.
 *
 * <p>This is a plain delegation to {@code feedbackChannel.registerConsumer(feedbackConsumer,
 * executor)}.
 *
 * @param feedbackChannel the channel the consumer is registered with.
 * @param feedbackConsumer the consumer to register.
 * @param executor handed to the channel unchanged; presumably the executor the consumer is
 *     invoked on (confirm against {@link SpillableFeedbackChannel}).
 * @param <V> the element type shared by the channel and the consumer.
 */
public static <V> void registerFeedbackConsumer(
SpillableFeedbackChannel<V> feedbackChannel,
FeedbackConsumer<V> feedbackConsumer,
Executor executor) {
feedbackChannel.registerConsumer(feedbackConsumer, executor);
}
/**
 * Initializes the given {@code feedbackChannel} with resources of the task that contains
 * {@code operator}.
 *
 * <p>The channel receives the task's I/O manager and memory manager, a
 * {@link StreamElementSerializer} built around the operator's serializer for input 0, and a
 * buffer size in bytes. The buffer size is the memory manager's memory size multiplied by the
 * managed memory fraction configured for the {@link ManagedMemoryUseCase#OPERATOR} use case.
 *
 * @param feedbackChannel the channel to initialize.
 * @param operator the operator whose containing task, operator config and runtime context are
 *     consulted.
 * @throws MemoryAllocationException declared because the channel initialization may throw it.
 */
public static void initializeFeedbackChannel(
        SpillableFeedbackChannel feedbackChannel, AbstractStreamOperator<?> operator)
        throws MemoryAllocationException {
    final StreamTask<?, ?> task = operator.getContainingTask();

    // Wraps the serializer of the operator's first input so that stream elements can be
    // written to and read from the channel.
    final TypeSerializer<StreamRecord<IterationRecord<?>>> elementSerializer =
            new StreamElementSerializer(
                    operator.getOperatorConfig()
                            .getTypeSerializerIn(0, operator.getUserCodeClassloader()));

    final MemorySize managedMemory =
            new MemorySize(task.getEnvironment().getMemoryManager().getMemorySize());

    final double operatorFraction =
            task.getConfiguration()
                    .getManagedMemoryFractionOperatorUseCaseOfSlot(
                            ManagedMemoryUseCase.OPERATOR,
                            operator.getRuntimeContext()
                                    .getTaskManagerRuntimeInfo()
                                    .getConfiguration(),
                            operator.getRuntimeContext().getUserCodeClassLoader());

    // Only the operator's share of the managed memory is handed to the channel.
    final long bufferSizeInBytes = managedMemory.multiply(operatorFraction).getBytes();

    feedbackChannel.initialize(
            task.getEnvironment().getIOManager(),
            task.getEnvironment().getMemoryManager(),
            elementSerializer,
            bufferSizeInBytes);
}
/**
 * Applies {@code action} to the operator, or to its user function, if one of them is an
 * instance of {@code targetInterface}.
 *
 * <p>The operator itself is checked first. Only if it does not match and it is an
 * {@link AbstractUdfStreamOperator} is its user function checked. If neither matches, nothing
 * happens.
 *
 * @param operator the operator to inspect.
 * @param targetInterface the type the operator or its user function has to be assignable to.
 * @param action the action to run on the matching object.
 * @param <T> the type of the object accepted by {@code action}.
 */
public static <T> void processOperatorOrUdfIfSatisfy(
        StreamOperator<?> operator,
        Class<T> targetInterface,
        ThrowingConsumer<T, Exception> action) {
    try {
        if (targetInterface.isAssignableFrom(operator.getClass())) {
            // Class#cast keeps the conversion checked instead of an unchecked (T) cast.
            action.accept(targetInterface.cast(operator));
        } else if (operator instanceof AbstractUdfStreamOperator<?, ?>) {
            Object udf = ((AbstractUdfStreamOperator<?, ?>) operator).getUserFunction();
            if (targetInterface.isAssignableFrom(udf.getClass())) {
                action.accept(targetInterface.cast(udf));
            }
        }
    } catch (Exception e) {
        // Every exception, including those raised by the action, is handed to
        // ExceptionUtils.rethrow so that this method needs no throws clause.
        ExceptionUtils.rethrow(e);
    }
}
/**
 * Creates the {@link StreamConfig} for a wrapped operator from the config of its wrapper.
 *
 * <p>The result starts as a clone of the wrapper's configuration and then has the
 * iteration-specific wrappers removed:
 *
 * <ul>
 *   <li>every non-null state partitioner, which must be a {@link ProxyKeySelector}, is replaced
 *       by the key selector it wraps;
 *   <li>the serializer of every {@link NetworkInputConfig}, of the main output and of every
 *       tagged chained or non-chained output, each of which must be an
 *       {@link IterationRecordSerializer}, is replaced by its inner serializer.
 * </ul>
 *
 * <p>NOTE(review): each rebuilt {@link NetworkInputConfig} is created with the two-argument
 * constructor and the position in the inputs array as its second argument. Confirm that this
 * matches what the original input config carried.
 *
 * @param config the config of the wrapper operator; it is only read.
 * @param cl the class loader used to read the objects stored in {@code config}.
 * @return the config for the wrapped operator, after {@code serializeAllConfigs()} has been
 *     called on it.
 * @throws IllegalStateException if a state partitioner or serializer is not of the expected
 *     wrapper type.
 */
public static StreamConfig createWrappedOperatorConfig(StreamConfig config, ClassLoader cl) {
    final StreamConfig unwrapped = new StreamConfig(config.getConfiguration().clone());

    // State partitioners: swap each proxy for the key selector it wraps.
    for (int input = 0; input < unwrapped.getNumberOfNetworkInputs(); input++) {
        final KeySelector partitioner = config.getStatePartitioner(input, cl);
        if (partitioner == null) {
            continue;
        }
        checkState(
                partitioner instanceof ProxyKeySelector,
                "The state partitioner for the wrapper operator should always be ProxyKeySelector, but it is "
                        + partitioner);
        unwrapped.setStatePartitioner(
                input, ((ProxyKeySelector) partitioner).getWrappedKeySelector());
    }

    // Network inputs: swap each iteration record serializer for its inner serializer.
    // Inputs of any other kind are passed on untouched.
    final StreamConfig.InputConfig[] inputConfigs = config.getInputs(cl);
    for (int index = 0; index < inputConfigs.length; index++) {
        if (!(inputConfigs[index] instanceof NetworkInputConfig)) {
            continue;
        }
        final TypeSerializer<?> wrapperSerializer =
                ((NetworkInputConfig) inputConfigs[index]).getTypeSerializer();
        checkState(
                wrapperSerializer instanceof IterationRecordSerializer,
                "The serializer of input[%s] should be IterationRecordSerializer but it is %s.",
                index,
                wrapperSerializer);
        final TypeSerializer<?> innerSerializer =
                ((IterationRecordSerializer<?>) wrapperSerializer).getInnerSerializer();
        inputConfigs[index] = new NetworkInputConfig(innerSerializer, index);
    }
    unwrapped.setInputs(inputConfigs);

    // Main output.
    final TypeSerializer<?> wrapperOutSerializer = config.getTypeSerializerOut(cl);
    checkState(
            wrapperOutSerializer instanceof IterationRecordSerializer,
            "The serializer of output should be IterationRecordSerializer but it is %s.",
            wrapperOutSerializer);
    unwrapped.setTypeSerializerOut(
            ((IterationRecordSerializer<?>) wrapperOutSerializer).getInnerSerializer());

    // Side outputs, chained first and non-chained second. Outputs without a tag are skipped
    // by the helper.
    config.getChainedOutputs(cl)
            .forEach(
                    chained ->
                            setTypeSerializerSideOut(
                                    chained.getOutputTag(), config, unwrapped, cl));
    config.getOperatorNonChainedOutputs(cl)
            .forEach(
                    nonChained ->
                            setTypeSerializerSideOut(
                                    nonChained.getOutputTag(), config, unwrapped, cl));

    unwrapped.serializeAllConfigs();
    return unwrapped;
}
/**
 * Returns the base path under which the data cache should be stored.
 *
 * <p>If {@link IterationOptions#DATA_CACHE_PATH} is set in {@code configuration}, that value is
 * used. Otherwise one entry of {@code localSpillPaths} is picked at random and converted to a
 * URI string.
 *
 * @param configuration the configuration to read {@link IterationOptions#DATA_CACHE_PATH} from.
 * @param localSpillPaths the local directories to fall back to; only consulted when the option
 *     is not set.
 * @return the path of the data cache.
 * @throws IllegalArgumentException if the option is not set and {@code localSpillPaths} is
 *     empty.
 */
public static Path getDataCachePath(Configuration configuration, String[] localSpillPaths) {
    final String configuredPath = configuration.get(IterationOptions.DATA_CACHE_PATH);
    if (configuredPath != null) {
        return new Path(configuredPath);
    }

    // Random#nextInt(0) would otherwise fail with an unhelpful "bound must be positive".
    if (localSpillPaths.length == 0) {
        throw new IllegalArgumentException(
                "The data cache path is not configured and there is no local spill path to fall back to.");
    }

    final Random random = new Random();
    final String localSpillPath = localSpillPaths[random.nextInt(localSpillPaths.length)];
    return new Path(Paths.get(localSpillPath).toUri().toString());
}
/**
 * Creates a supplier of file paths below {@code basePath}.
 *
 * <p>Each call of the supplier returns a path of the form
 * {@code <basePath>/<fileTypeName>-<operatorId>-<random UUID>}, with a new random UUID
 * generated on every call.
 *
 * @param basePath the directory part of the generated paths.
 * @param fileTypeName the prefix of the generated file names.
 * @param operatorId the operator id embedded in the generated file names.
 * @return the supplier that generates the paths.
 */
public static SupplierWithException<Path, IOException> createDataCacheFileGenerator(
        Path basePath, String fileTypeName, OperatorID operatorId) {
    return () -> {
        final String baseDirectory = basePath.toString();
        final String randomSuffix = UUID.randomUUID().toString();
        final String filePath =
                String.format(
                        "%s/%s-%s-%s", baseDirectory, fileTypeName, operatorId, randomSuffix);
        return new Path(filePath);
    };
}
/**
 * Copies the side output serializer registered for {@code outputTag} from {@code config} to
 * {@code wrappedConfig}, with the iteration record wrappers removed.
 *
 * <p>The serializer is registered in {@code wrappedConfig} under a new {@link OutputTag} that
 * has the same id and the inner type info of the original tag. The value is the inner
 * serializer of the original {@link IterationRecordSerializer}. Nothing happens if
 * {@code outputTag} is {@code null}.
 *
 * @param outputTag the tag of the side output, or {@code null} if the output has no tag.
 * @param config the config of the wrapper operator the serializer is read from.
 * @param wrappedConfig the config of the wrapped operator the serializer is written to.
 * @param cl the class loader used to read the serializer from {@code config}.
 * @throws IllegalStateException if the serializer is not an {@link IterationRecordSerializer}.
 */
private static void setTypeSerializerSideOut(
        OutputTag<?> outputTag,
        StreamConfig config,
        StreamConfig wrappedConfig,
        ClassLoader cl) {
    if (outputTag == null) {
        return;
    }

    TypeSerializer<?> typeSerializerSideOut = config.getTypeSerializerSideOut(outputTag, cl);
    checkState(
            typeSerializerSideOut instanceof IterationRecordSerializer,
            "The serializer of side output with tag[%s] should be IterationRecordSerializer but it is %s.",
            outputTag,
            typeSerializerSideOut);

    // The type info of the tag is cast without a preceding check, so it is expected to be an
    // IterationRecordTypeInfo whenever the serializer is an IterationRecordSerializer.
    wrappedConfig.setTypeSerializerSideOut(
            new OutputTag<>(
                    outputTag.getId(),
                    ((IterationRecordTypeInfo<?>) outputTag.getTypeInfo()).getInnerTypeInfo()),
            ((IterationRecordSerializer<?>) typeSerializerSideOut).getInnerSerializer());
}
}