/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.engine.e2e;

import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.apache.commons.collections4.CollectionUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.awaitility.Awaitility.await;
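
/**
 * Verifies that SeaTunnel env parameters are correctly translated into the Flink runtime
 * configuration. Each test submits a job to the Flink container and then queries the JobManager
 * REST API (job plan, execution config, and checkpoint config) to confirm that the values from
 * the .conf resources under /unify-env-param-test-resource actually took effect.
 *
 * <p>A minimal sketch of what such an env block could look like (illustrative only; it assumes
 * the unified form is 'flink.' plus the native Flink option name, and the exact keys are defined
 * in the .conf test resources):
 *
 * <pre>{@code
 * env {
 *   parallelism = 1
 *   # assumed unified form: 'flink.' prefix + native Flink configuration key
 *   flink.pipeline.max-parallelism = 5
 *   flink.execution.checkpointing.interval = 10000
 * }
 * }</pre>
 */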
@Slf4j
@DisabledOnContainer(
        value = {},
        type = {EngineType.SEATUNNEL, EngineType.SPARK},
        disabledReason = "Only the Flink engine adjusts the env parameter configuration rules")
public class UnifyEnvParameterIT extends TestSuiteBase {
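
    /**
     * Creates the checkpoint storage directory inside the Flink container and makes it writable
     * by the flink user, so that checkpoints can be written to
     * 'file:/tmp/seatunnel/flink/checkpoints'.
     */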
@TestContainerExtension
protected final ContainerExtendedFactory extendedFactory =
container -> {
Container.ExecResult extraCommands =
container.execInContainer(
"bash",
"-c",
"mkdir -p /tmp/seatunnel && chown -R flink /tmp/seatunnel");
Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
};
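
    /**
     * Submits a config that uses the unified env parameter names and runs the generic
     * assertions.
     */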
@TestTemplate
public void testUnifiedParam(AbstractTestFlinkContainer container)
throws IOException, InterruptedException {
genericTest(
"/unify-env-param-test-resource/unify_env_param_fakesource_to_localfile.conf",
container);
}
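
    /**
     * Submits a config that still uses the outdated (legacy) env parameter names; the same Flink
     * settings must take effect as with the unified names.
     */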
@TestTemplate
public void testOutdatedParam(AbstractTestFlinkContainer container)
throws IOException, InterruptedException {
genericTest(
"/unify-env-param-test-resource/outdated_env_param_fakesource_to_localfile.conf",
container);
}
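
    /**
     * Verifies that 'table.exec.resource.default-parallelism' set in the env block takes
     * priority over 'parallelism': at least one node in the Flink job plan must run with
     * parallelism 2.
     */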
@TestTemplate
public void testUnifiedFlinkTableEnvParam(AbstractTestFlinkContainer container) {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/unify-env-param-test-resource/unify_flink_table_env_param_fakesource_to_console.conf");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
        // wait until the job id can be obtained from the JobManager REST API
AtomicReference<String> jobId = new AtomicReference<>();
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> jobInfo =
JsonUtils.toMap(
container.executeJobManagerInnerCommand(
"curl http://localhost:8081/jobs/overview"),
String.class,
Object.class);
List<Map<String, Object>> jobs =
(List<Map<String, Object>>) jobInfo.get("jobs");
if (!CollectionUtils.isEmpty(jobs)) {
jobId.set(jobs.get(0).get("jid").toString());
}
Assertions.assertNotNull(jobId.get());
});
// obtain job info
AtomicReference<Map<String, Object>> jobInfoReference = new AtomicReference<>();
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> jobInfo =
JsonUtils.toMap(
container.executeJobManagerInnerCommand(
String.format(
"curl http://localhost:8081/jobs/%s",
jobId.get())),
String.class,
Object.class);
                            // wait until initialization is complete and the job enters the RUNNING state
if (null != jobInfo && "RUNNING".equals(jobInfo.get("state"))) {
jobInfoReference.set(jobInfo);
}
Assertions.assertNotNull(jobInfoReference.get());
});
Map<String, Object> jobInfo = jobInfoReference.get();
        // 'table.exec.resource.default-parallelism' has a higher priority than 'parallelism',
        // so at least one node in the job plan must have a parallelism of 2.
Map<String, Object> plan = (Map<String, Object>) jobInfo.get("plan");
List<Map<String, Object>> nodes = (List<Map<String, Object>>) plan.get("nodes");
boolean tableExecParallelism = false;
for (Map<String, Object> node : nodes) {
int parallelism = (int) node.get("parallelism");
if (!tableExecParallelism && parallelism == 2) {
tableExecParallelism = true;
}
}
Assertions.assertTrue(tableExecParallelism);
}
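
    /**
     * Submits the given config asynchronously, waits via the JobManager REST API for the job to
     * reach the RUNNING state, and then asserts that the parallelism, checkpoint,
     * restart-strategy and state-backend settings from the config file are visible in the Flink
     * job.
     */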
public void genericTest(String configPath, AbstractTestFlinkContainer container)
throws IOException, InterruptedException {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(configPath);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
        // wait until the job id can be obtained from the JobManager REST API
AtomicReference<String> jobId = new AtomicReference<>();
await().atMost(300000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> jobInfo =
JsonUtils.toMap(
container.executeJobManagerInnerCommand(
"curl http://localhost:8081/jobs/overview"),
String.class,
Object.class);
List<Map<String, Object>> jobs =
(List<Map<String, Object>>) jobInfo.get("jobs");
if (!CollectionUtils.isEmpty(jobs)) {
jobId.set(jobs.get(0).get("jid").toString());
}
Assertions.assertNotNull(jobId.get());
});
// obtain job info
AtomicReference<Map<String, Object>> jobInfoReference = new AtomicReference<>();
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> jobInfo =
JsonUtils.toMap(
container.executeJobManagerInnerCommand(
String.format(
"curl http://localhost:8081/jobs/%s",
jobId.get())),
String.class,
Object.class);
                            // wait until initialization is complete and the job enters the RUNNING state
if (null != jobInfo && "RUNNING".equals(jobInfo.get("state"))) {
jobInfoReference.set(jobInfo);
}
Assertions.assertNotNull(jobInfoReference.get());
});
Map<String, Object> jobInfo = jobInfoReference.get();
// obtain execution configuration
Map<String, Object> jobConfig =
JsonUtils.toMap(
container.executeJobManagerInnerCommand(
String.format(
"curl http://localhost:8081/jobs/%s/config", jobId.get())),
String.class,
Object.class);
Map<String, Object> executionConfig =
(Map<String, Object>) jobConfig.get("execution-config");
// obtain checkpoint configuration
Map<String, Object> checkpointConfig =
JsonUtils.toMap(
container.executeJobManagerInnerCommand(
String.format(
"curl http://localhost:8081/jobs/%s/checkpoints/config",
jobId.get())),
String.class,
Object.class);
        // obtain the latest completed checkpoint to verify the checkpoint storage path
AtomicReference<Map<String, Object>> completedCheckpointReference = new AtomicReference<>();
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Map<String, Object> checkpointsInfo =
JsonUtils.toMap(
container.executeJobManagerInnerCommand(
String.format(
"curl http://localhost:8081/jobs/%s/checkpoints",
jobId.get())),
String.class,
Object.class);
Map<String, Object> latestCheckpoint =
(Map<String, Object>) checkpointsInfo.get("latest");
                            // wait until at least one checkpoint has been triggered and completed
if (null != latestCheckpoint) {
completedCheckpointReference.set(
(Map<String, Object>) latestCheckpoint.get("completed"));
Assertions.assertNotNull(completedCheckpointReference.get());
}
});
        // The unified configuration replaces the legacy keys defined in
        // org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName with the form 'flink.'
        // plus the native Flink parameter name; the assertions below check that each setting
        // takes effect.
// PARALLELISM
int parallelism = (int) executionConfig.get("job-parallelism");
Assertions.assertEquals(1, parallelism);
// MAX_PARALLELISM
int maxParallelism = (int) jobInfo.get("maxParallelism");
Assertions.assertEquals(5, maxParallelism);
// CHECKPOINT_INTERVAL
int interval = (int) checkpointConfig.get("interval");
Assertions.assertEquals(10000, interval);
// CHECKPOINT_MODE
String mode = checkpointConfig.get("mode").toString();
Assertions.assertEquals("exactly_once", mode);
// CHECKPOINT_TIMEOUT
int checkpointTimeout = (int) checkpointConfig.get("timeout");
Assertions.assertEquals(600000, checkpointTimeout);
// CHECKPOINT_DATA_URI
String externalPath = completedCheckpointReference.get().get("external_path").toString();
Assertions.assertTrue(externalPath.startsWith("file:/tmp/seatunnel/flink/checkpoints"));
// MAX_CONCURRENT_CHECKPOINTS
int maxConcurrent = (int) checkpointConfig.get("max_concurrent");
Assertions.assertEquals(2, maxConcurrent);
// CHECKPOINT_CLEANUP_MODE
Map<String, Object> externalizationMap =
(Map<String, Object>) checkpointConfig.get("externalization");
boolean externalization = (boolean) externalizationMap.get("delete_on_cancellation");
Assertions.assertTrue(externalization);
// MIN_PAUSE_BETWEEN_CHECKPOINTS
int minPause = (int) checkpointConfig.get("min_pause");
Assertions.assertEquals(100, minPause);
// FAIL_ON_CHECKPOINTING_ERRORS
int tolerableFailedCheckpoints = (int) checkpointConfig.get("tolerable_failed_checkpoints");
Assertions.assertEquals(5, tolerableFailedCheckpoints);
        // RESTART_STRATEGY: the restart strategy in the config file is fixed-delay, so the
        // failure-rate settings are not checked
String restartStrategy = executionConfig.get("restart-strategy").toString();
Assertions.assertTrue(restartStrategy.contains("fixed delay"));
// RESTART_ATTEMPTS
Assertions.assertTrue(restartStrategy.contains("2 restart attempts"));
// RESTART_DELAY_BETWEEN_ATTEMPTS
Assertions.assertTrue(restartStrategy.contains("fixed delay (1000 ms)"));
// STATE_BACKEND
String stateBackend = checkpointConfig.get("state_backend").toString();
Assertions.assertTrue(stateBackend.contains("RocksDBStateBackend"));
}
}