| import static org.apache.beam.gradle.BeamModulePlugin.getSupportedJavaVersion |
| |
| import org.apache.tools.ant.taskdefs.condition.Os |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
def latestFlinkVersion = project.ext.latestFlinkVersion
def currentJavaVersion = project.ext.currentJavaVersion
// Root of the Python SDK sources; the shell commands below `cd` into it.
def pythonRootDir = "${rootDir}/sdks/python"
// Python version with the dots removed; used to build task names and project paths.
def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '')

// Path of the docker task of the Python SDK container project for this Python version.
ext.pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker"
| |
/**
 * Creates the tox task flinkCompatibilityMatrix<workerType>, which runs the
 * 'flink-runner-test' tox environment against the Flink job server jar.
 *
 * @param workerType value passed as --environment_type ('DOCKER', 'PROCESS' or 'LOOPBACK')
 * @return whatever toxTask returns for the new task
 */
def createFlinkRunnerTestTask(String workerType) {
  def flinkVersion = latestFlinkVersion
  // The job-server shadowJar's archivePath is not resolvable until runtime, so the
  // jar location is spelled out by hand.
  def jarPath = "${rootDir}/runners/flink/${flinkVersion}/job-server/build/libs/beam-runners-flink-${flinkVersion}-job-server-${version}.jar"
  def optionParts = ["--flink_job_server_jar=${jarPath}", "--environment_type=${workerType}"]
  if (workerType == 'PROCESS') {
    // PROCESS workers are started through the script written by createProcessWorker.
    optionParts << "--environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
  }
  def matrixTask = toxTask("flinkCompatibilityMatrix${workerType}", 'flink-runner-test', optionParts.join(' '))
  matrixTask.configure {
    // Through the Flink job server, the expansion services needed in tests are added transitively.
    dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar"
    // The Java SDK worker is required to execute external transforms.
    dependsOn ":sdks:java:container:${getSupportedJavaVersion()}:docker"
    switch (workerType) {
      case 'DOCKER':
        dependsOn pythonContainerTask
        break
      case 'PROCESS':
        dependsOn createProcessWorker
        break
    }
  }
  return matrixTask
}

['DOCKER', 'PROCESS', 'LOOPBACK'].each { createFlinkRunnerTestTask(it) }
| |
// Aggregate entry point for the Flink validates-runner suite; it only depends on the
// LOOPBACK compatibility matrix. Registered lazily, matching sparkValidatesRunner and
// prismValidatesRunner in this file, so the task is only configured when it is needed.
tasks.register("flinkValidatesRunner") {
  dependsOn 'flinkCompatibilityMatrixLOOPBACK'
}
| |
// TODO(https://github.com/apache/beam/issues/19962): Enable on pre-commit.
// Runs WeakTestStreamTranscriptTest from trigger_test.py on the FlinkRunner with a
// LOOPBACK environment.
tasks.register("flinkTriggerTranscript") {
  dependsOn 'setupVirtualenv', ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
  doLast {
    // Looked up inside doLast, i.e. only when the task actually runs.
    def flinkJobServerJar = project(":runners:flink:${latestFlinkVersion}:job-server:").shadowJar.archivePath
    def shellCommand = """
        . ${envdir}/bin/activate \\
        && cd ${pythonRootDir} \\
        && pip install -e .[test] \\
        && pytest \\
            apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
            --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${flinkJobServerJar}'
        """
    exec {
      executable 'sh'
      args '-c', shellCommand
    }
  }
}
| |
// Verifies BEAM-10702.
// Runs the juliaset example on the PortableRunner (embedded job endpoint) and stages the
// example's own package through --setup_file, which is the scenario this task is named after.
tasks.register("portableLocalRunnerJuliaSetWithSetupPy") {
  dependsOn 'setupVirtualenv'
  dependsOn ":sdks:python:container:py${pythonVersionSuffix}:docker"

  doLast {
    exec {
      executable 'sh'
      // The command is run from the juliaset example directory so that ./setup.py resolves
      // to the example's setup script.
      args '-c', """
          . ${envdir}/bin/activate \\
          && cd ${pythonRootDir} \\
          && pip install -e . \\
          && cd apache_beam/examples/complete/juliaset \\
          && python juliaset_main.py \\
              --runner=PortableRunner \\
              --job_endpoint=embed \\
              --setup_file=./setup.py \\
              --coordinate_output=/tmp/juliaset \\
              --grid_size=1
          """
    }
  }
}
| |
// Generates ${buildDir}/sdk_worker.sh, the script the PROCESS-environment test matrices pass
// as process_command. The script activates the virtualenv and then exec's the boot launcher
// found under the Python container project's build directory.
def createProcessWorker = tasks.register("createProcessWorker") {
  dependsOn ':sdks:python:container:build'
  dependsOn 'setupVirtualenv'
  def sdkWorkerFile = file("${buildDir}/sdk_worker.sh")
  // The launcher path contains the host OS name.
  // NOTE(review): the architecture is fixed to amd64 - confirm this is intended on arm64 hosts.
  def osType = Os.isFamily(Os.FAMILY_MAC) ? 'darwin' : 'linux'
  def workerScript = "${project(":sdks:python:container:").buildDir.absolutePath}/target/launcher/${osType}_amd64/boot"
  // The generated script must contain a properly quoted "$@" so that every argument is
  // forwarded unchanged. Rendering it as ""$@"" would leave $@ unquoted and therefore
  // subject to word splitting.
  def sdkWorkerFileCode = """#!/bin/sh
. ${envdir}/bin/activate
exec ${workerScript} "\$@"
"""
  outputs.file sdkWorkerFile
  doLast {
    // File.write does not create missing parent directories.
    sdkWorkerFile.parentFile.mkdirs()
    sdkWorkerFile.write sdkWorkerFileCode
    exec {
      commandLine('sh', '-c', ". ${envdir}/bin/activate && cd ${pythonRootDir} && pip install -e .[test]")
    }
    exec {
      commandLine('chmod', '+x', sdkWorkerFile)
    }
  }
}
| |
// Requirements file is created during the runtime.
// Runs requirements_cache_it_test.py on the PortableRunner (embedded job endpoint) with a
// DOCKER environment.
tasks.register("portableLocalRunnerTestWithRequirementsFile") {
  dependsOn 'setupVirtualenv', ":sdks:python:container:py${pythonVersionSuffix}:docker"

  doLast {
    def shellCommand = """
        . ${envdir}/bin/activate \\
        && cd ${pythonRootDir} \\
        && pip install -e . \\
        && cd apache_beam/runners/portability \\
        && python requirements_cache_it_test.py \\
            --runner=PortableRunner \\
            --job_endpoint=embed \\
            --environment_type="DOCKER"
        """
    exec {
      executable 'sh'
      args '-c', shellCommand
    }
  }
}
| |
/**
 * Creates the tox task samzaCompatibilityMatrix<workerType>, which runs the
 * 'samza-runner-test' tox environment against the Samza job server jar.
 *
 * @param workerType value passed as --environment_type ('DOCKER', 'PROCESS' or 'LOOPBACK')
 * @return whatever toxTask returns for the new task
 */
def createSamzaRunnerTestTask(String workerType) {
  def samzaJar = "${rootDir}/runners/samza/job-server/build/libs/beam-runners-samza-job-server-${version}.jar"
  // Only PROCESS workers need a process_command; it points at the generated sdk_worker.sh.
  def processCommandOption = workerType == 'PROCESS' ?
      " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh" : ''
  def toxArgs = "--samza_job_server_jar=${samzaJar} --environment_type=${workerType}" + processCommandOption
  def matrixTask = toxTask("samzaCompatibilityMatrix${workerType}", 'samza-runner-test', toxArgs)
  matrixTask.configure {
    dependsOn ":runners:samza:job-server:shadowJar"
    switch (workerType) {
      case 'DOCKER':
        dependsOn pythonContainerTask
        break
      case 'PROCESS':
        dependsOn createProcessWorker
        break
    }
  }
  return matrixTask
}

['DOCKER', 'PROCESS', 'LOOPBACK'].each { createSamzaRunnerTestTask(it) }
| |
// Aggregate entry point for the Samza validates-runner suite; it only depends on the
// LOOPBACK compatibility matrix. Registered lazily, matching sparkValidatesRunner and
// prismValidatesRunner in this file, so the task is only configured when it is needed.
tasks.register("samzaValidatesRunner") {
  dependsOn 'samzaCompatibilityMatrixLOOPBACK'
}
| |
/**
 * Creates the tox task sparkCompatibilityMatrix<workerType>, which runs the
 * 'spark-runner-test' tox environment against the Spark 3 job server jar.
 *
 * @param workerType value passed as --environment_type ('DOCKER', 'PROCESS' or 'LOOPBACK')
 * @return whatever toxTask returns for the new task
 */
def createSparkRunnerTestTask(String workerType) {
  // `project(':runners:spark:3:job-server').shadowJar.archivePath` is not resolvable until
  // runtime, so the jar location is spelled out by hand.
  def sparkJar = "${rootDir}/runners/spark/3/job-server/build/libs/beam-runners-spark-3-job-server-${version}.jar"
  def optionParts = ["--spark_job_server_jar=${sparkJar}", "--environment_type=${workerType}"]
  if (workerType == 'PROCESS') {
    optionParts << "--environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
  }
  def matrixTask = toxTask("sparkCompatibilityMatrix${workerType}", 'spark-runner-test', optionParts.join(' '))
  matrixTask.configure {
    dependsOn ':runners:spark:3:job-server:shadowJar'
    if (workerType == 'PROCESS') {
      dependsOn createProcessWorker
    } else if (workerType == 'DOCKER') {
      dependsOn pythonContainerTask
    }
  }
  return matrixTask
}

['DOCKER', 'PROCESS', 'LOOPBACK'].each { createSparkRunnerTestTask(it) }
| |
// Aggregate entry point for the Spark validates-runner suite: runs the LOOPBACK matrix only.
tasks.register('sparkValidatesRunner') { validatesRunner ->
  validatesRunner.dependsOn('sparkCompatibilityMatrixLOOPBACK')
}
| |
/**
 * Creates the tox task prismCompatibilityMatrix<workerType>, which runs the
 * 'prism-runner-test' tox environment against the locally built prism binary.
 *
 * @param workerType value passed as --environment_type ('DOCKER', 'PROCESS' or 'LOOPBACK')
 * @return whatever toxTask returns for the new task
 */
def createPrismRunnerTestTask(String workerType) {
  def prismBinary = "${rootDir}/runners/prism/build/tmp/prism"
  def optionParts = ["--prism_bin=${prismBinary}", "--environment_type=${workerType}"]
  if (workerType == 'PROCESS') {
    optionParts << "--environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
  }
  def matrixTask = toxTask("prismCompatibilityMatrix${workerType}", 'prism-runner-test', optionParts.join(' '))
  matrixTask.configure {
    dependsOn ":runners:prism:build"
    // The Java SDK worker is required to execute external transforms.
    dependsOn ":sdks:java:container:${getSupportedJavaVersion()}:docker"
    switch (workerType) {
      case 'DOCKER':
        dependsOn pythonContainerTask
        break
      case 'PROCESS':
        dependsOn createProcessWorker
        break
    }
  }
  return matrixTask
}

['DOCKER', 'PROCESS', 'LOOPBACK'].each { createPrismRunnerTestTask(it) }
| |
// Aggregate entry point for the Prism validates-runner suite: runs the LOOPBACK matrix only.
tasks.register('prismValidatesRunner') { validatesRunner ->
  validatesRunner.dependsOn('prismCompatibilityMatrixLOOPBACK')
}
| |
// Runs WeakTestStreamTranscriptTest from trigger_test.py on the PrismRunner with a
// LOOPBACK environment, using the locally built prism binary.
tasks.register("prismTriggerTranscript") {
  dependsOn 'setupVirtualenv', ':runners:prism:build'
  doLast {
    def prismBinary = "${rootDir}/runners/prism/build/tmp/prism"
    def shellCommand = """
        . ${envdir}/bin/activate \\
        && cd ${pythonRootDir} \\
        && pip install -e .[test] \\
        && pytest \\
            apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
            --test-pipeline-options='--runner=PrismRunner --environment_type=LOOPBACK --prism_location=${prismBinary}'
        """
    exec {
      executable 'sh'
      args '-c', shellCommand
    }
  }
}
| |
// Pre-commit aggregate for this Python version: the Python container image, the Flink job
// server jar and the portable word-count tasks (batch and streaming) on the FlinkRunner.
project.tasks.register("preCommitPy${pythonVersionSuffix}") { preCommit ->
  preCommit.setDependsOn([
      ":sdks:python:container:py${pythonVersionSuffix}:docker",
      ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
      'portableWordCountFlinkRunnerBatch',
      'portableWordCountFlinkRunnerStreaming',
  ])
}
| |
// Post-commit aggregate for this Python version; it has no action of its own and only
// pulls in the tasks listed below.
project.tasks.register("postCommitPy${pythonVersionSuffix}") { postCommit ->
  postCommit.setDependsOn([
      'setupVirtualenv',
      "postCommitPy${pythonVersionSuffix}IT",
      ':runners:spark:3:job-server:shadowJar',
      'portableLocalRunnerJuliaSetWithSetupPy',
      'portableWordCountSparkRunnerBatch',
      'portableLocalRunnerTestWithRequirementsFile',
  ])
}
| |
// Runs the tests collected by "examples_postcommit and not sickbay_flink" on the FlinkRunner
// through run_integration_test.sh.
project.tasks.register("flinkExamples") {
  setDependsOn([
      'setupVirtualenv',
      'installGcpTest',
      ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
  ])
  doLast {
    def flinkConfDir = "${rootDir}/runners/flink/src/test/resources/"
    // Looked up inside doLast, i.e. only when the task actually runs.
    def flinkJobServerJar = project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath
    // Logger-name -> level pairs; the quotes are backslash-escaped in the resulting string.
    def harnessLogOverrides = [
        // suppress info level flink.runtime log flood
        '\\"org.apache.flink.runtime\\":\\"WARN\\"',
        // suppress full __metricscontainers log printed in FlinkPipelineRunner.createPortablePipelineResult
        '\\"org.apache.beam.runners.flink.FlinkPipelineRunner\\":\\"WARN\\"',
    ].join(',')
    def pipelineOpts = [
        "--runner=FlinkRunner",
        "--project=apache-beam-testing",
        "--environment_type=LOOPBACK",
        "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
        "--flink_job_server_jar=${flinkJobServerJar}",
        "--flink_conf_dir=${flinkConfDir}",
        "--sdk_harness_log_level_overrides={${harnessLogOverrides}}",
    ]
    def cmdArgs = mapToArgString([
        test_opts    : ["--log-cli-level=INFO"],
        suite        : "postCommitExamples-flink-py${pythonVersionSuffix}",
        pipeline_opts: pipelineOpts.join(" "),
        collect      : "examples_postcommit and not sickbay_flink",
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh ${cmdArgs}"
    }
  }
}
| |
// Runs the tests collected by "examples_postcommit and not sickbay_spark" on the SparkRunner
// through run_integration_test.sh.
project.tasks.register("sparkExamples") {
  setDependsOn([
      'setupVirtualenv',
      'installGcpTest',
      ':runners:spark:3:job-server:shadowJar',
  ])
  doLast {
    def sparkJar = "${rootDir}/runners/spark/3/job-server/build/libs/beam-runners-spark-3-job-server-${version}.jar"
    def pipelineOpts = [
        "--runner=SparkRunner",
        "--project=apache-beam-testing",
        "--environment_type=LOOPBACK",
        "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
        "--spark_job_server_jar=${sparkJar}",
    ]
    def cmdArgs = mapToArgString([
        test_opts    : ["--log-cli-level=INFO"],
        suite        : "postCommitExamples-spark-py${pythonVersionSuffix}",
        pipeline_opts: pipelineOpts.join(" "),
        collect      : "examples_postcommit and not sickbay_spark",
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh ${cmdArgs}"
    }
  }
}
| |
// Runs the tests collected by "examples_postcommit and not sickbay_prism" on the PrismRunner
// through run_integration_test.sh, using the locally built prism binary.
project.tasks.register("prismExamples") {
  setDependsOn([
      'setupVirtualenv',
      'installGcpTest',
      ':runners:prism:build',
  ])
  def prismBinary = "${rootDir}/runners/prism/build/tmp/prism"
  doLast {
    def pipelineOpts = [
        "--runner=PrismRunner",
        "--project=apache-beam-testing",
        "--environment_type=LOOPBACK",
        "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
        "--prism_location=${prismBinary}",
    ]
    def cmdArgs = mapToArgString([
        test_opts    : ["--log-cli-level=INFO"],
        suite        : "postCommitExamples-prism-py${pythonVersionSuffix}",
        pipeline_opts: pipelineOpts.join(" "),
        collect      : "examples_postcommit and not sickbay_prism",
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh ${cmdArgs}"
    }
  }
}
| |
// Runs the listed BigQuery / cross-language IO integration tests on the FlinkRunner through
// run_integration_test.sh.
project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
  String forkJavaHome = null
  String forkJavaVersion = currentJavaVersion
  // DebeziumIO needs Java17+. When the build JVM is older, use the first of java17Home /
  // java21Home that is set as a project property; if neither is set, nothing is overridden.
  if (JavaVersion.current() < JavaVersion.VERSION_17) {
    def newerJava = ['java17', 'java21'].find { candidate -> project.hasProperty(candidate + 'Home') }
    if (newerJava != null) {
      forkJavaVersion = newerJava
      forkJavaHome = project.getProperty(newerJava + 'Home')
    }
  }

  setDependsOn([
      'setupVirtualenv',
      'installGcpTest',
      ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
      ":sdks:java:container:${forkJavaVersion}:docker",
      ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
      ':sdks:java:io:expansion-service:shadowJar',
      ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
      ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar',
      ':sdks:java:extensions:schemaio-expansion-service:shadowJar',
      ':sdks:java:io:debezium:expansion-service:shadowJar',
  ])

  doLast {
    def testFiles = [
        "apache_beam/io/gcp/bigquery_read_it_test.py",
        "apache_beam/io/external/xlang_kafkaio_it_test.py",
        "apache_beam/io/external/xlang_kinesisio_it_test.py",
        "apache_beam/io/external/xlang_debeziumio_it_test.py",
    ]
    // All test files travel as one space-separated entry, followed by the log-level flag.
    def testOpts = [testFiles.join(' '), "--log-cli-level=INFO"]
    def flinkConfDir = "${rootDir}/runners/flink/src/test/resources/"
    // Looked up inside doLast, i.e. only when the task actually runs.
    def flinkJobServerJar = project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath
    // Logger-name -> level pairs; the quotes are backslash-escaped in the resulting string.
    def harnessLogOverrides = [
        // suppress info level flink.runtime log flood
        '\\"org.apache.flink.runtime\\":\\"WARN\\"',
        // suppress full __metricscontainers log printed in FlinkPipelineRunner.createPortablePipelineResult
        '\\"org.apache.beam.runners.flink.FlinkPipelineRunner\\":\\"WARN\\"',
        // suppress metric name collision warning logs
        '\\"org.apache.flink.runtime.metrics.groups\\":\\"ERROR\\"',
    ].join(',')
    def pipelineOpts = [
        "--runner=FlinkRunner",
        "--project=apache-beam-testing",
        "--environment_type=LOOPBACK",
        "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
        "--flink_job_server_jar=${flinkJobServerJar}",
        "--flink_conf_dir=${flinkConfDir}",
        "--sdk_harness_log_level_overrides={${harnessLogOverrides}}",
    ]
    def cmdArgs = mapToArgString([
        test_opts    : testOpts,
        suite        : "postCommitIT-flink-py${pythonVersionSuffix}",
        pipeline_opts: pipelineOpts.join(" "),
    ])
    def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath

    exec {
      // Only override JAVA_HOME when a newer JDK was selected above.
      if (forkJavaHome != null) {
        environment "JAVA_HOME", forkJavaHome
      }
      environment "LOCAL_KAFKA_JAR", kafkaJar
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh ${cmdArgs}"
    }
  }
}
| |
// Runs the cross-language SpannerIO integration test on the FlinkRunner through
// run_integration_test.sh.
project.tasks.register("xlangSpannerIOIT") {
  setDependsOn([
      'setupVirtualenv',
      'installGcpTest',
      ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
      ":sdks:java:container:${currentJavaVersion}:docker",
      ':sdks:java:io:expansion-service:shadowJar',
      ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
      ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar',
      ':sdks:java:extensions:schemaio-expansion-service:shadowJar',
      ':sdks:java:io:debezium:expansion-service:shadowJar',
  ])

  doLast {
    def testFiles = [
        "apache_beam/io/gcp/tests/xlang_spannerio_it_test.py",
    ]
    // All test files travel as one space-separated entry, followed by the log-level flag.
    def testOpts = [testFiles.join(' '), "--log-cli-level=INFO"]
    // Looked up inside doLast, i.e. only when the task actually runs.
    def flinkJobServerJar = project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath
    // Logger-name -> level pairs; the quotes are backslash-escaped in the resulting string.
    def harnessLogOverrides = [
        // suppress info level flink.runtime log flood
        '\\"org.apache.flink.runtime\\":\\"WARN\\"',
        // suppress full __metricscontainers log printed in FlinkPipelineRunner.createPortablePipelineResult
        '\\"org.apache.beam.runners.flink.FlinkPipelineRunner\\":\\"WARN\\"',
        // suppress metric name collision warning logs
        '\\"org.apache.flink.runtime.metrics.groups\\":\\"ERROR\\"',
    ].join(',')
    def pipelineOpts = [
        "--runner=FlinkRunner",
        "--project=apache-beam-testing",
        "--environment_type=LOOPBACK",
        "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
        "--flink_job_server_jar=${flinkJobServerJar}",
        "--sdk_harness_log_level_overrides={${harnessLogOverrides}}",
    ]
    def cmdArgs = mapToArgString([
        test_opts    : testOpts,
        suite        : "postCommitIT-flink-py${pythonVersionSuffix}",
        pipeline_opts: pipelineOpts.join(" "),
    ])
    exec {
      executable 'sh'
      args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh ${cmdArgs}"
    }
  }
}
| |
/**
 * Registers testJavaJarCreator<runner>, which invokes
 * runners/portability/test_pipeline_jar.sh with the given job server jar.
 *
 * @param runner runner name; passed as --runner and appended to the task name
 * @param jobServerJarTask task whose archivePath is passed as --job_server_jar
 */
def addTestJavaJarCreator(String runner, Task jobServerJarTask) {
  project.tasks.register("testJavaJarCreator${runner}") {
    dependsOn jobServerJarTask, pythonContainerTask
    doLast {
      def containerImage = "${project.docker_image_default_repo_root}/${project.docker_image_default_repo_prefix}python${project.ext.pythonVersion}_sdk:${project.sdk_version}"
      def scriptOptions = [
          "--runner ${runner}",
          "--job_server_jar ${jobServerJarTask.archivePath}",
          "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
          "--python_root_dir ${project.rootDir}/sdks/python",
          "--python_version ${project.ext.pythonVersion}",
          "--python_container_image ${containerImage}",
      ]
      exec {
        executable "sh"
        args "-c", "${project.rootDir}/runners/portability/test_pipeline_jar.sh ${scriptOptions.join(' ')}"
      }
    }
  }
}

// TODO(BEAM-11333) Update and test multiple Flink versions.
[
    FlinkRunner: ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
    SparkRunner: ":runners:spark:3:job-server:shadowJar",
].each { runnerName, shadowJarPath ->
  addTestJavaJarCreator(runnerName, tasks.getByPath(shadowJarPath))
}
| |
/**
 * Registers testUberJarFlinkRunner (or testUberJarFlinkRunnerSaveMainSession), which invokes
 * runners/portability/test_flink_uber_jar.sh with the Flink job server and mini cluster jars.
 *
 * @param saveMainSession when true, '--save_main_session' is appended to the script options
 *                        and 'SaveMainSession' to the task name
 */
def addTestFlinkUberJar(boolean saveMainSession) {
  def nameSuffix = saveMainSession ? 'SaveMainSession' : ''
  project.tasks.register("testUberJarFlinkRunner${nameSuffix}") {
    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:miniCluster"
    dependsOn pythonContainerTask
    doLast {
      // Both archive paths are looked up inside doLast, i.e. only when the task actually runs.
      def jobServerJar = tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar").archivePath
      def miniClusterJar = tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:miniCluster").archivePath
      def containerImage = "${project.docker_image_default_repo_root}/${project.docker_image_default_repo_prefix}python${project.ext.pythonVersion}_sdk:${project.sdk_version}"
      def scriptOptions = [
          "--flink_job_server_jar ${jobServerJar}",
          "--flink_mini_cluster_jar ${miniClusterJar}",
          "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
          "--python_root_dir ${project.rootDir}/sdks/python",
          "--python_version ${project.ext.pythonVersion}",
          "--python_container_image ${containerImage}",
      ]
      if (saveMainSession) {
        scriptOptions << '--save_main_session'
      }
      exec {
        executable "sh"
        args "-c", "${project.rootDir}/runners/portability/test_flink_uber_jar.sh ${scriptOptions.join(' ')}"
      }
    }
  }
}

[false, true].each { addTestFlinkUberJar(it) }
| |
// Aggregate task for the Spark pipeline-jar test; it only depends on the jar-creator task.
tasks.register('testPipelineJarSparkRunner') {
  dependsOn(testJavaJarCreatorSparkRunner)
}
| |
// Aggregate task for the Flink pipeline-jar tests: the jar-creator task plus both uber-jar
// variants (with and without save_main_session).
tasks.register('testPipelineJarFlinkRunner') {
  dependsOn(
      testJavaJarCreatorFlinkRunner,
      testUberJarFlinkRunner,
      testUberJarFlinkRunnerSaveMainSession)
}