blob: 49c97beb63794d4a59db896e330381ddd6c8b76f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.flink;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.flink.util.ConfigKeyName;
import org.apache.seatunnel.flink.util.EnvironmentUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.TernaryBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * Flink implementation of {@link RuntimeEnv}.
 *
 * <p>Builds and holds the Flink execution environments (stream, stream-table, and — for
 * non-streaming jobs — batch and batch-table) from a typesafe {@link Config}, and applies
 * job-level settings such as parallelism, checkpointing, state backend, and restart strategy.
 *
 * <p>Call order expected by this class: {@link #setConfig(Config)} and
 * {@link #setJobMode(JobMode)} before {@link #prepare()}.
 */
public class FlinkEnvironment implements RuntimeEnv {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkEnvironment.class);

    private Config config;
    private StreamExecutionEnvironment environment;
    private StreamTableEnvironment tableEnvironment;
    private ExecutionEnvironment batchEnvironment;
    private BatchTableEnvironment batchTableEnvironment;
    private JobMode jobMode;
    // Default job name, overridden by the "job.name" config key in prepare().
    private String jobName = "seatunnel";

    @Override
    public FlinkEnvironment setConfig(Config config) {
        this.config = config;
        return this;
    }

    @Override
    public Config getConfig() {
        return config;
    }

    @Override
    public CheckResult checkConfig() {
        // Only the restart-strategy keys are validated here; other keys are
        // read leniently (hasPath guards) during prepare().
        return EnvironmentUtil.checkRestartStrategy(config);
    }

    /**
     * Creates the underlying Flink environments and reads the job name.
     *
     * <p>The stream environments are always created; batch environments are created
     * in addition only for non-streaming jobs.
     *
     * @return this environment, for chaining
     */
    @Override
    public FlinkEnvironment prepare() {
        // Batch/Streaming both use data stream api in SeaTunnel New API
        createStreamEnvironment();
        createStreamTableEnvironment();
        if (!isStreaming()) {
            createExecutionEnvironment();
            createBatchTableEnvironment();
        }
        if (config.hasPath("job.name")) {
            jobName = config.getString("job.name");
        }
        return this;
    }

    public String getJobName() {
        return jobName;
    }

    /** @return true when the job mode has been set to {@link JobMode#STREAMING} */
    public boolean isStreaming() {
        return JobMode.STREAMING.equals(jobMode);
    }

    @Override
    public FlinkEnvironment setJobMode(JobMode jobMode) {
        this.jobMode = jobMode;
        return this;
    }

    @Override
    public JobMode getJobMode() {
        return jobMode;
    }

    /**
     * Registers plugin jar URLs on the pipeline configuration of every active
     * environment so Flink ships them with the job.
     *
     * <p>Must be called after {@link #prepare()} — it reflects into the already-created
     * {@link StreamExecutionEnvironment} (whose {@code getConfiguration} is not public
     * in this Flink version, hence the reflection).
     *
     * @param pluginPaths jar URLs to add to {@code pipeline.jars} and {@code pipeline.classpaths}
     * @throws RuntimeException if the configuration cannot be obtained reflectively
     */
    @Override
    public void registerPlugin(List<URL> pluginPaths) {
        pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
        List<Configuration> configurations = new ArrayList<>();
        try {
            configurations.add((Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
                    "getConfiguration")).orElseThrow(() -> new RuntimeException("can't find " +
                    "method: getConfiguration")).invoke(this.environment));
            if (!isStreaming()) {
                configurations.add(batchEnvironment.getConfiguration());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        configurations.forEach(configuration -> {
            // Append to any jars/classpaths already configured, de-duplicating.
            List<String> jars = configuration.get(PipelineOptions.JARS);
            if (jars == null) {
                jars = new ArrayList<>();
            }
            jars.addAll(pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
            configuration.set(PipelineOptions.JARS, jars.stream().distinct().collect(Collectors.toList()));
            List<String> classpath = configuration.get(PipelineOptions.CLASSPATHS);
            if (classpath == null) {
                classpath = new ArrayList<>();
            }
            classpath.addAll(pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
            configuration.set(PipelineOptions.CLASSPATHS, classpath.stream().distinct().collect(Collectors.toList()));
        });
    }

    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return environment;
    }

    public StreamTableEnvironment getStreamTableEnvironment() {
        return tableEnvironment;
    }

    /**
     * Creates the streaming table environment, selecting the blink planner when
     * configured (old planner otherwise) and applying idle-state retention when
     * both min and max retention times are present.
     */
    private void createStreamTableEnvironment() {
        // use blink and streammode
        EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance()
                .inStreamingMode();
        if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink"
                .equals(this.config.getString(ConfigKeyName.PLANNER))) {
            envBuilder.useBlinkPlanner();
        } else {
            envBuilder.useOldPlanner();
        }
        EnvironmentSettings environmentSettings = envBuilder.build();
        tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
        TableConfig config = tableEnvironment.getConfig();
        // Retention is only applied when BOTH bounds are configured; values are in seconds.
        if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config
                .hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
            long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
            long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
            config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
        }
    }

    /**
     * Creates the stream execution environment and applies time characteristic,
     * checkpointing, restart strategy, buffer timeout, and (max) parallelism
     * from the config. Switches the runtime mode to BATCH for batch jobs.
     */
    private void createStreamEnvironment() {
        environment = StreamExecutionEnvironment.getExecutionEnvironment();
        setTimeCharacteristic();
        setCheckpoint();
        EnvironmentUtil.setRestartStrategy(config, environment.getConfig());
        if (config.hasPath(ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
            long timeout = config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS);
            environment.setBufferTimeout(timeout);
        }
        if (config.hasPath(ConfigKeyName.PARALLELISM)) {
            int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
            environment.setParallelism(parallelism);
        }
        if (config.hasPath(ConfigKeyName.MAX_PARALLELISM)) {
            int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
            environment.setMaxParallelism(max);
        }
        // Constant-first comparison is null-safe if setJobMode() was never called,
        // matching the style of isStreaming().
        if (JobMode.BATCH.equals(this.jobMode)) {
            environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }
    }

    public ExecutionEnvironment getBatchEnvironment() {
        return batchEnvironment;
    }

    public BatchTableEnvironment getBatchTableEnvironment() {
        return batchTableEnvironment;
    }

    /** Creates the (legacy DataSet) batch environment with parallelism and restart strategy. */
    private void createExecutionEnvironment() {
        batchEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        if (config.hasPath(ConfigKeyName.PARALLELISM)) {
            int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
            batchEnvironment.setParallelism(parallelism);
        }
        EnvironmentUtil.setRestartStrategy(config, batchEnvironment.getConfig());
    }

    private void createBatchTableEnvironment() {
        batchTableEnvironment = BatchTableEnvironment.create(batchEnvironment);
    }

    /**
     * Applies the configured stream time characteristic (event-time,
     * ingestion-time, or processing-time). Unknown values are logged and ignored,
     * leaving the Flink default in place.
     */
    private void setTimeCharacteristic() {
        if (config.hasPath(ConfigKeyName.TIME_CHARACTERISTIC)) {
            String timeType = config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
            switch (timeType.toLowerCase()) {
                case "event-time":
                    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                    break;
                case "ingestion-time":
                    environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
                    break;
                case "processing-time":
                    environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                    break;
                default:
                    LOGGER.warn(
                            "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
                            timeType);
                    break;
            }
        }
    }

    /**
     * Enables and configures checkpointing when a checkpoint interval is present:
     * mode, timeout, state backend (filesystem or RocksDB over filesystem),
     * concurrency, externalized-checkpoint cleanup, min pause, and tolerable
     * failure count. All sub-settings are optional.
     */
    private void setCheckpoint() {
        if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
            CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
            long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
            environment.enableCheckpointing(interval);

            if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) {
                String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
                switch (mode.toLowerCase()) {
                    case "exactly-once":
                        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                        break;
                    case "at-least-once":
                        checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
                        break;
                    default:
                        LOGGER.warn(
                                "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
                                mode);
                        break;
                }
            }

            if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
                long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
                checkpointConfig.setCheckpointTimeout(timeout);
            }

            if (config.hasPath(ConfigKeyName.CHECKPOINT_DATA_URI)) {
                String uri = config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
                StateBackend fsStateBackend = new FsStateBackend(uri);
                if (config.hasPath(ConfigKeyName.STATE_BACKEND)) {
                    String stateBackend = config.getString(ConfigKeyName.STATE_BACKEND);
                    if ("rocksdb".equalsIgnoreCase(stateBackend)) {
                        // RocksDB keeps state off-heap and checkpoints to the filesystem URI.
                        StateBackend rocksDBStateBackend = new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE);
                        environment.setStateBackend(rocksDBStateBackend);
                    } else {
                        // Fix: previously an unrecognized state.backend value left NO backend
                        // configured, silently discarding checkpoint.data-uri. Warn and fall
                        // back to the filesystem backend instead.
                        LOGGER.warn(
                                "set state.backend failed, unknown state.backend [{}], fall back to FsStateBackend, only support rocksdb",
                                stateBackend);
                        environment.setStateBackend(fsStateBackend);
                    }
                } else {
                    environment.setStateBackend(fsStateBackend);
                }
            }

            if (config.hasPath(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
                int max = config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
                checkpointConfig.setMaxConcurrentCheckpoints(max);
            }

            if (config.hasPath(ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
                boolean cleanup = config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
                // true => delete externalized checkpoints on cancel; false => retain them.
                if (cleanup) {
                    checkpointConfig.enableExternalizedCheckpoints(
                            CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
                } else {
                    checkpointConfig.enableExternalizedCheckpoints(
                            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                }
            }

            if (config.hasPath(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
                long minPause = config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
                checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
            }

            if (config.hasPath(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
                int failNum = config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
                checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
            }
        }
    }
}