| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.seatunnel.e2e.common.container.flink; |
| |
| import org.apache.seatunnel.e2e.common.container.AbstractTestContainer; |
| import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; |
| import org.apache.seatunnel.e2e.common.container.TestContainer; |
| |
| import org.testcontainers.containers.Container; |
| import org.testcontainers.containers.GenericContainer; |
| import org.testcontainers.containers.output.Slf4jLogConsumer; |
| import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; |
| import org.testcontainers.lifecycle.Startables; |
| import org.testcontainers.utility.DockerLoggerFactory; |
| |
| import com.google.common.collect.Lists; |
| import lombok.NoArgsConstructor; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.stream.Stream; |
| |
| /** |
| * This class is the base class of FlinkEnvironment test. The before method will create a Flink |
| * cluster, and after method will close the Flink cluster. You can use {@link |
| * TestContainer#executeJob} to submit a seatunnel config and run a seatunnel job. |
| */ |
| @NoArgsConstructor |
| @Slf4j |
| public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { |
| |
| protected static final List<String> DEFAULT_FLINK_PROPERTIES = |
| Arrays.asList( |
| "jobmanager.rpc.address: jobmanager", |
| "taskmanager.numberOfTaskSlots: 10", |
| "parallelism.default: 4", |
| "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"); |
| |
| protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11"; |
| |
| protected GenericContainer<?> jobManager; |
| protected GenericContainer<?> taskManager; |
| |
| @Override |
| protected String getDockerImage() { |
| return DEFAULT_DOCKER_IMAGE; |
| } |
| |
| @Override |
| public void startUp() throws Exception { |
| final String dockerImage = getDockerImage(); |
| final String properties = String.join("\n", getFlinkProperties()); |
| jobManager = |
| new GenericContainer<>(dockerImage) |
| .withCommand("jobmanager") |
| .withNetwork(NETWORK) |
| .withNetworkAliases("jobmanager") |
| .withExposedPorts() |
| .withEnv("FLINK_PROPERTIES", properties) |
| .withLogConsumer( |
| new Slf4jLogConsumer( |
| DockerLoggerFactory.getLogger(dockerImage + ":jobmanager"))) |
| .waitingFor( |
| new LogMessageWaitStrategy() |
| .withRegEx(".*Starting the resource manager.*") |
| .withStartupTimeout(Duration.ofMinutes(2))); |
| copySeaTunnelStarterToContainer(jobManager); |
| copySeaTunnelStarterLoggingToContainer(jobManager); |
| |
| jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8081))); |
| |
| taskManager = |
| new GenericContainer<>(dockerImage) |
| .withCommand("taskmanager") |
| .withNetwork(NETWORK) |
| .withNetworkAliases("taskmanager") |
| .withEnv("FLINK_PROPERTIES", properties) |
| .dependsOn(jobManager) |
| .withLogConsumer( |
| new Slf4jLogConsumer( |
| DockerLoggerFactory.getLogger( |
| dockerImage + ":taskmanager"))) |
| .waitingFor( |
| new LogMessageWaitStrategy() |
| .withRegEx( |
| ".*Successful registration at resource manager.*") |
| .withStartupTimeout(Duration.ofMinutes(2))); |
| |
| Startables.deepStart(Stream.of(jobManager)).join(); |
| Startables.deepStart(Stream.of(taskManager)).join(); |
| // execute extra commands |
| executeExtraCommands(jobManager); |
| } |
| |
| protected List<String> getFlinkProperties() { |
| return DEFAULT_FLINK_PROPERTIES; |
| } |
| |
| @Override |
| public void tearDown() throws Exception { |
| if (taskManager != null) { |
| taskManager.stop(); |
| } |
| if (jobManager != null) { |
| jobManager.stop(); |
| } |
| } |
| |
| @Override |
| protected String getSavePointCommand() { |
| throw new UnsupportedOperationException("Not implemented"); |
| } |
| |
| @Override |
| protected String getRestoreCommand() { |
| throw new UnsupportedOperationException("Not implemented"); |
| } |
| |
| @Override |
| protected List<String> getExtraStartShellCommands() { |
| return Collections.emptyList(); |
| } |
| |
| public void executeExtraCommands(ContainerExtendedFactory extendedFactory) |
| throws IOException, InterruptedException { |
| extendedFactory.extend(jobManager); |
| extendedFactory.extend(taskManager); |
| } |
| |
| @Override |
| public Container.ExecResult executeJob(String confFile) |
| throws IOException, InterruptedException { |
| return executeJob(confFile, null); |
| } |
| |
| @Override |
| public Container.ExecResult executeJob(String confFile, List<String> variables) |
| throws IOException, InterruptedException { |
| log.info("test in container: {}", identifier()); |
| return executeJob(jobManager, confFile, variables); |
| } |
| |
| @Override |
| public String getServerLogs() { |
| return jobManager.getLogs() + "\n" + taskManager.getLogs(); |
| } |
| |
| public String executeJobManagerInnerCommand(String command) |
| throws IOException, InterruptedException { |
| return jobManager.execInContainer("bash", "-c", command).getStdout(); |
| } |
| } |