| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.statefun.flink.datastream; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import javax.annotation.Nullable; |
| import org.apache.flink.shaded.guava18.com.google.common.base.Optional; |
| import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; |
| import org.apache.flink.statefun.flink.core.feedback.FeedbackKey; |
| import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; |
| import org.apache.flink.statefun.flink.core.message.Message; |
| import org.apache.flink.statefun.flink.core.message.RoutableMessage; |
| import org.apache.flink.statefun.flink.core.translation.EmbeddedTranslator; |
| import org.apache.flink.statefun.sdk.FunctionType; |
| import org.apache.flink.statefun.sdk.io.EgressIdentifier; |
| import org.apache.flink.streaming.api.datastream.DataStream; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| |
| /** |
| * Builder for a Stateful Function Application. |
| * |
| * <p>This builder allows defining all the aspects of a stateful function application. define input |
| * streams as ingresses, define function providers and egress ids. |
| */ |
| public final class StatefulFunctionDataStreamBuilder { |
| |
| /** Creates a {@code StatefulFunctionDataStreamBuilder}. */ |
| public static StatefulFunctionDataStreamBuilder builder(String pipelineName) { |
| FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1); |
| return new StatefulFunctionDataStreamBuilder(key); |
| } |
| |
| private StatefulFunctionDataStreamBuilder(FeedbackKey<Message> feedbackKey) { |
| this.feedbackKey = Objects.requireNonNull(feedbackKey); |
| } |
| |
| private final FeedbackKey<Message> feedbackKey; |
| private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>(); |
| private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders = |
| new HashMap<>(); |
| private final Map<FunctionType, HttpFunctionSpec> requestReplyFunctions = new HashMap<>(); |
| private final Set<EgressIdentifier<?>> egressesIds = new LinkedHashSet<>(); |
| |
| @Nullable private StatefulFunctionsConfig config; |
| |
| /** |
| * Adds an ingress of incoming messages. |
| * |
| * @param ingress an incoming stream of messages. |
| * @return this builder. |
| */ |
| public StatefulFunctionDataStreamBuilder withDataStreamAsIngress( |
| DataStream<RoutableMessage> ingress) { |
| Objects.requireNonNull(ingress); |
| definedIngresses.add(ingress); |
| return this; |
| } |
| |
| /** |
| * Adds a function provider to this builder |
| * |
| * @param functionType the type of the function that this provider providers. |
| * @param provider the stateful function provider. |
| * @return this builder. |
| */ |
| public StatefulFunctionDataStreamBuilder withFunctionProvider( |
| FunctionType functionType, SerializableStatefulFunctionProvider provider) { |
| Objects.requireNonNull(functionType); |
| Objects.requireNonNull(provider); |
| putAndThrowIfPresent(functionProviders, functionType, provider); |
| return this; |
| } |
| |
| /** |
| * Adds a remote RequestReply type of function provider to this builder. |
| * |
| * @param builder an already configured {@code RequestReplyFunctionBuilder}. |
| * @return this builder. |
| */ |
| public StatefulFunctionDataStreamBuilder withRequestReplyRemoteFunction( |
| RequestReplyFunctionBuilder builder) { |
| Objects.requireNonNull(builder); |
| HttpFunctionSpec spec = builder.spec(); |
| putAndThrowIfPresent(requestReplyFunctions, spec.functionType(), spec); |
| return this; |
| } |
| |
| /** |
| * Registers an {@link EgressIdentifier}. |
| * |
| * <p>See {@link StatefulFunctionEgressStreams#getDataStreamForEgressId(EgressIdentifier)}. |
| * |
| * @param egressId an ingress id |
| * @return this builder. |
| */ |
| public StatefulFunctionDataStreamBuilder withEgressId(EgressIdentifier<?> egressId) { |
| Objects.requireNonNull(egressId); |
| putAndThrowIfPresent(egressesIds, egressId); |
| return this; |
| } |
| |
| /** |
| * Set a stateful function configuration. |
| * |
| * @param configuration the stateful function configuration to set. |
| * @return this builder. |
| */ |
| public StatefulFunctionDataStreamBuilder withConfiguration( |
| StatefulFunctionsConfig configuration) { |
| Objects.requireNonNull(configuration); |
| this.config = configuration; |
| return this; |
| } |
| |
| /** |
| * Adds Stateful Functions operators into the topology. |
| * |
| * @param env the stream execution environment. |
| */ |
| public StatefulFunctionEgressStreams build(StreamExecutionEnvironment env) { |
| final StatefulFunctionsConfig config = |
| Optional.fromNullable(this.config).or(() -> StatefulFunctionsConfig.fromEnvironment(env)); |
| |
| SerializableHttpFunctionProvider httpFunctionProvider = |
| new SerializableHttpFunctionProvider(requestReplyFunctions); |
| requestReplyFunctions.forEach( |
| (type, unused) -> functionProviders.put(type, httpFunctionProvider)); |
| |
| EmbeddedTranslator embeddedTranslator = new EmbeddedTranslator(config, feedbackKey); |
| Map<EgressIdentifier<?>, DataStream<?>> sideOutputs = |
| embeddedTranslator.translate(definedIngresses, egressesIds, functionProviders); |
| return new StatefulFunctionEgressStreams(sideOutputs); |
| } |
| |
| private static <K, V> void putAndThrowIfPresent(Map<K, V> map, K key, V value) { |
| @Nullable V previous = map.put(key, value); |
| if (previous == null) { |
| return; |
| } |
| throw new IllegalStateException( |
| String.format("A binding for the key %s was previously defined.", key)); |
| } |
| |
| private static <K> void putAndThrowIfPresent(Set<K> set, K key) { |
| if (set.add(key)) { |
| return; |
| } |
| throw new IllegalStateException( |
| String.format("A binding for the key %s was previously defined.", key)); |
| } |
| } |