blob: 3b48b3e636db51228bfb1a93527193ca3a34ac16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.types.StaticallyRegisteredTypes;
import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
import org.apache.flink.statefun.flink.io.spi.SourceProvider;
import org.apache.flink.statefun.sdk.EgressType;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.IngressType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
public final class StatefulFunctionsUniverse
implements StatefulFunctionModule.Binder, FlinkIoModule.Binder {
private final Map<IngressIdentifier<?>, IngressSpec<?>> ingress = new HashMap<>();
private final Map<EgressIdentifier<?>, EgressSpec<?>> egress = new HashMap<>();
private final Map<IngressIdentifier<?>, List<Router<?>>> routers = new HashMap<>();
private final Map<FunctionType, StatefulFunctionProvider> functions = new HashMap<>();
private final Map<IngressType, SourceProvider> sources = new HashMap<>();
private final Map<EgressType, SinkProvider> sinks = new HashMap<>();
private final StaticallyRegisteredTypes types;
private final MessageFactoryType messageFactoryType;
public StatefulFunctionsUniverse(MessageFactoryType messageFactoryType) {
this.messageFactoryType = messageFactoryType;
this.types = new StaticallyRegisteredTypes(messageFactoryType);
}
@Override
public <T> void bindIngress(IngressSpec<T> spec) {
Objects.requireNonNull(spec);
IngressIdentifier<T> id = spec.id();
putAndThrowIfPresent(ingress, id, spec);
}
@Override
public <T> void bindIngressRouter(IngressIdentifier<T> ingressIdentifier, Router<T> router) {
Objects.requireNonNull(ingressIdentifier);
Objects.requireNonNull(router);
List<Router<?>> ingressRouters =
routers.computeIfAbsent(ingressIdentifier, unused -> new ArrayList<>());
ingressRouters.add(router);
}
@Override
public <T> void bindEgress(EgressSpec<T> spec) {
Objects.requireNonNull(spec);
EgressIdentifier<T> id = spec.id();
putAndThrowIfPresent(egress, id, spec);
}
@Override
public void bindFunctionProvider(FunctionType functionType, StatefulFunctionProvider provider) {
Objects.requireNonNull(functionType);
Objects.requireNonNull(provider);
putAndThrowIfPresent(functions, functionType, provider);
}
@Override
public void bindSourceProvider(IngressType type, SourceProvider provider) {
Objects.requireNonNull(type);
Objects.requireNonNull(provider);
putAndThrowIfPresent(sources, type, provider);
}
@Override
public void bindSinkProvider(EgressType type, SinkProvider provider) {
putAndThrowIfPresent(sinks, type, provider);
}
public Map<IngressIdentifier<?>, IngressSpec<?>> ingress() {
return ingress;
}
public Map<EgressIdentifier<?>, EgressSpec<?>> egress() {
return egress;
}
public Map<IngressIdentifier<?>, List<Router<?>>> routers() {
return routers;
}
public Map<FunctionType, StatefulFunctionProvider> functions() {
return functions;
}
public Map<IngressType, SourceProvider> sources() {
return sources;
}
public Map<EgressType, SinkProvider> sinks() {
return sinks;
}
public StaticallyRegisteredTypes types() {
return types;
}
private static <K, V> void putAndThrowIfPresent(Map<K, V> map, K key, V value) {
@Nullable V previous = map.put(key, value);
if (previous == null) {
return;
}
throw new IllegalStateException(
String.format("A binding for the key %s was previously defined.", key));
}
public MessageFactoryType messageFactoryType() {
return messageFactoryType;
}
}