blob: 6fe38567e4cbf5f5467f768d9b978d6d7bfe1c21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.harness;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator;
import org.apache.flink.statefun.flink.core.StatefulFunctionsJob;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.spi.Modules;
import org.apache.flink.statefun.flink.harness.io.ConsumingEgressSpec;
import org.apache.flink.statefun.flink.harness.io.SerializableConsumer;
import org.apache.flink.statefun.flink.harness.io.SerializableSupplier;
import org.apache.flink.statefun.flink.harness.io.SupplyingIngressSpec;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * A fluent builder that runs a Stateful Functions application inside the current JVM, optionally
 * replacing ingresses and egresses bound by the application's modules with in-process
 * substitutes.
 *
 * <p>Typical use is to chain the {@code with...} methods and finish with {@link #start()}. Every
 * {@code with...} method returns {@code this}.
 *
 * <p>This class holds plain {@link HashMap}s and performs no synchronization; configure it from a
 * single thread.
 */
public class Harness {
  /** Flink configuration that is applied to the execution environment in {@link #start()}. */
  private final Configuration flinkConfig;

  /** Key/value pairs handed to the Stateful Functions configuration as global configurations. */
  private final Map<String, String> globalConfigurations = new HashMap<>();

  /** Ingress specs that are put into the universe under their identifier after module loading. */
  private final Map<IngressIdentifier<?>, IngressSpec<?>> overrideIngress = new HashMap<>();

  /** Egress specs that are put into the universe under their identifier after module loading. */
  private final Map<EgressIdentifier<?>, EgressSpec<?>> overrideEgress = new HashMap<>();

  /** Creates a harness backed by an empty Flink {@link Configuration}. */
  public Harness() {
    flinkConfig = new Configuration();
  }

  /**
   * Registers an ingress, under the given identifier, that is fed by {@code supplier}.
   *
   * @param identifier the ingress identifier to register the substitute under
   * @param supplier the source of ingress elements
   * @return this harness
   * @throws NullPointerException if any argument is {@code null}
   */
  public <T> Harness withSupplyingIngress(
      IngressIdentifier<T> identifier, SerializableSupplier<T> supplier) {
    Objects.requireNonNull(identifier, "identifier");
    Objects.requireNonNull(supplier, "supplier");
    // TODO: consider closure cleaner
    // NOTE(review): the trailing 0 is presumably the delay between produced elements; confirm
    // against SupplyingIngressSpec.
    overrideIngress.put(identifier, new SupplyingIngressSpec<>(identifier, supplier, 0));
    return this;
  }

  /**
   * Registers an ingress, under the given identifier, that is backed by a Flink {@link
   * SourceFunction}.
   *
   * @param identifier the ingress identifier to register the substitute under
   * @param supplier the Flink source function producing ingress elements
   * @return this harness
   * @throws NullPointerException if any argument is {@code null}
   */
  public <T> Harness withFlinkSourceFunction(
      IngressIdentifier<T> identifier, SourceFunction<T> supplier) {
    Objects.requireNonNull(identifier, "identifier");
    Objects.requireNonNull(supplier, "supplier");
    overrideIngress.put(identifier, new SourceFunctionSpec<>(identifier, supplier));
    return this;
  }

  /**
   * Registers an egress, under the given identifier, that hands elements to {@code consumer}.
   *
   * @param identifier the egress identifier to register the substitute under
   * @param consumer the callback receiving egress elements
   * @return this harness
   * @throws NullPointerException if any argument is {@code null}
   */
  public <T> Harness withConsumingEgress(
      EgressIdentifier<T> identifier, SerializableConsumer<T> consumer) {
    Objects.requireNonNull(identifier, "identifier");
    Objects.requireNonNull(consumer, "consumer");
    // TODO: consider closure cleaner
    overrideEgress.put(identifier, new ConsumingEgressSpec<>(identifier, consumer));
    return this;
  }

  /**
   * Registers an egress, under the given identifier, that prints every element to {@code
   * System.out}.
   *
   * @param identifier the egress identifier to register the substitute under
   * @return this harness
   * @throws NullPointerException if {@code identifier} is {@code null}
   */
  public <T> Harness withPrintingEgress(EgressIdentifier<T> identifier) {
    return withConsumingEgress(identifier, new PrintingConsumer<>());
  }

  /**
   * Sets the user message serializer option to {@link MessageFactoryType#WITH_KRYO_PAYLOADS}.
   *
   * @return this harness
   */
  public Harness withKryoMessageSerializer() {
    flinkConfig.set(
        StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
    return this;
  }

  /**
   * Set the name used in the Flink UI.
   *
   * @param flinkJobName the job name
   * @return this harness
   * @throws NullPointerException if {@code flinkJobName} is {@code null}
   */
  public Harness withFlinkJobName(String flinkJobName) {
    Objects.requireNonNull(flinkJobName, "flinkJobName");
    flinkConfig.set(StatefulFunctionsConfig.FLINK_JOB_NAME, flinkJobName);
    return this;
  }

  /**
   * Set a flink-conf configuration.
   *
   * @param key the configuration key
   * @param value the configuration value
   * @return this harness
   * @throws NullPointerException if any argument is {@code null}
   */
  public Harness withConfiguration(String key, String value) {
    Objects.requireNonNull(key, "key");
    Objects.requireNonNull(value, "value");
    flinkConfig.setString(key, value);
    return this;
  }

  /**
   * Sets a global configuration available in the {@link
   * org.apache.flink.statefun.sdk.spi.StatefulFunctionModule} on configure.
   *
   * @param key the global configuration key
   * @param value the global configuration value
   * @return this harness
   * @throws NullPointerException if any argument is {@code null}
   */
  public Harness withGlobalConfiguration(String key, String value) {
    // Fail at the call site rather than letting a null entry travel into module configuration.
    Objects.requireNonNull(key, "key");
    Objects.requireNonNull(value, "value");
    globalConfigurations.put(key, value);
    return this;
  }

  /**
   * Applies the collected configuration to a {@link StreamExecutionEnvironment} and hands control
   * to {@link StatefulFunctionsJob#main}.
   *
   * @throws Exception whatever {@code StatefulFunctionsJob.main} throws
   */
  public void start() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Must happen before env.configure(...) so the required options are part of what is applied.
    configureStrictlyRequiredFlinkConfigs(flinkConfig);

    // Configure will change the value of a setting only if a corresponding option was set in the
    // underlying configuration. If a key is not present, the current value of a field will remain
    // untouched.
    env.configure(flinkConfig, Thread.currentThread().getContextClassLoader());

    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConfig);
    stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
    stateFunConfig.setProvider(new HarnessProvider(overrideIngress, overrideEgress));

    StatefulFunctionsJob.main(env, stateFunConfig);
  }

  /**
   * Universe provider that loads the modules from the class path and then puts the harness's
   * substitute ingress/egress specs into the resulting universe.
   */
  private static final class HarnessProvider implements StatefulFunctionsUniverseProvider {
    private static final long serialVersionUID = 1;

    private final Map<IngressIdentifier<?>, IngressSpec<?>> ingressToReplace;
    private final Map<EgressIdentifier<?>, EgressSpec<?>> egressToReplace;

    /**
     * @param dummyIngress substitute ingress specs, keyed by identifier
     * @param dummyEgress substitute egress specs, keyed by identifier
     */
    HarnessProvider(
        Map<IngressIdentifier<?>, IngressSpec<?>> dummyIngress,
        Map<EgressIdentifier<?>, EgressSpec<?>> dummyEgress) {
      // Snapshot the maps so later mutation of the owning Harness cannot leak into a provider
      // that was already handed to the job.
      this.ingressToReplace = new HashMap<>(dummyIngress);
      this.egressToReplace = new HashMap<>(dummyEgress);
    }

    /**
     * Builds the universe from the modules found on the class path, then registers the substitute
     * specs under their identifiers.
     *
     * <p>Note that {@code classLoader} is not consulted here; modules are obtained through {@link
     * Modules#loadFromClassPath()}.
     */
    @Override
    public StatefulFunctionsUniverse get(
        ClassLoader classLoader, StatefulFunctionsConfig configuration) {
      Modules modules = Modules.loadFromClassPath();
      StatefulFunctionsUniverse universe = modules.createStatefulFunctionsUniverse(configuration);

      // Substitutes go in after the modules have been applied, under the same identifiers.
      ingressToReplace.forEach((id, spec) -> universe.ingress().put(id, spec));
      egressToReplace.forEach((id, spec) -> universe.egress().put(id, spec));
      return universe;
    }
  }

  /** Egress consumer that writes each element to {@code System.out} via {@code println}. */
  private static final class PrintingConsumer<T> implements SerializableConsumer<T> {
    private static final long serialVersionUID = 1;

    @Override
    public void accept(T t) {
      System.out.println(t);
    }
  }

  /**
   * Writes the options this harness always sets into {@code flinkConfig}, overwriting any value
   * previously stored under the same options: the additional parent-first class loader patterns
   * (joined with {@code ';'}) and the maximum number of concurrent checkpoints, both taken from
   * {@link StatefulFunctionsConfigValidator}.
   */
  private static void configureStrictlyRequiredFlinkConfigs(Configuration flinkConfig) {
    flinkConfig.set(
        CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
        String.join(";", StatefulFunctionsConfigValidator.PARENT_FIRST_CLASSLOADER_PATTERNS));
    flinkConfig.set(
        ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
        StatefulFunctionsConfigValidator.MAX_CONCURRENT_CHECKPOINTS);
  }
}