blob: 3569fb4b11fdceed1dd36e469b8511f7acaa0a49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.engine.e2e;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
@Slf4j
public class RestApiIT {

    private static final String HOST = "http://localhost:";

    // Job handles created in the per-test setup: one long-running streaming job
    // (expected RUNNING) and one batch job that is awaited to completion (FINISHED).
    private static ClientJobProxy clientJobProxy;
    private static ClientJobProxy batchJobProxy;

    // Two cluster members so every REST assertion can be checked against both nodes.
    private static HazelcastInstanceImpl node1;
    private static HazelcastInstanceImpl node2;

    private static SeaTunnelClient engineClient;

    /**
     * Builds the REST base URL ("http://localhost:&lt;port&gt;") for the given cluster member.
     * Centralizes the port lookup that every test needs.
     */
    private static String baseUrl(HazelcastInstanceImpl instance) {
        return HOST + instance.getCluster().getLocalMember().getAddress().getPort();
    }

    /**
     * Starts a fresh two-node cluster before EVERY test (note: {@code @BeforeEach}, despite the
     * "beforeClass" name), submits a streaming job and waits until it is RUNNING, then submits a
     * batch job and waits until it is FINISHED. Tests can therefore rely on exactly one RUNNING
     * and one FINISHED job being present.
     */
    @BeforeEach
    void beforeClass() throws Exception {
        String testClusterName = TestUtils.getClusterName("RestApiIT");
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
        node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

        node2 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

        // Streaming job: never finishes on its own, so it stays RUNNING for the tests.
        String filePath = TestUtils.getResource("stream_fakesource_to_file.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName("fake_to_file");

        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
        clientConfig.setClusterName(testClusterName);
        engineClient = new SeaTunnelClient(clientConfig);
        ClientJobExecutionEnvironment jobExecutionEnv =
                engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);

        clientJobProxy = jobExecutionEnv.execute();

        Awaitility.await()
                .atMost(2, TimeUnit.MINUTES)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        JobStatus.RUNNING, clientJobProxy.getJobStatus()));

        // Batch job: bounded source, so we can wait for FINISHED.
        String batchFilePath = TestUtils.getResource("fakesource_to_console.conf");
        JobConfig batchConf = new JobConfig();
        batchConf.setName("fake_to_console");
        ClientJobExecutionEnvironment batchJobExecutionEnv =
                engineClient.createExecutionContext(batchFilePath, batchConf, seaTunnelConfig);
        batchJobProxy = batchJobExecutionEnv.execute();

        Awaitility.await()
                .atMost(5, TimeUnit.MINUTES)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        JobStatus.FINISHED, batchJobProxy.getJobStatus()));
    }

    /** The running-job endpoint returns the streaming job with status RUNNING on both nodes. */
    @Test
    public void testGetRunningJobById() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(
                                                baseUrl(instance)
                                                        + RestConstant.RUNNING_JOB_URL
                                                        + "/"
                                                        + clientJobProxy.getJobId())
                                        .then()
                                        .statusCode(200)
                                        .body("jobName", equalTo("fake_to_file"))
                                        .body("jobStatus", equalTo("RUNNING")));
    }

    /** A finished job is still resolvable by id through the running-job endpoint. */
    @Test
    public void testGetJobById() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(
                                                baseUrl(instance)
                                                        + RestConstant.RUNNING_JOB_URL
                                                        + "/"
                                                        + batchJobProxy.getJobId())
                                        .then()
                                        .statusCode(200)
                                        .body("jobName", equalTo("fake_to_console"))
                                        .body("jobStatus", equalTo("FINISHED")));
    }

    /**
     * An unknown job id still yields HTTP 200 with the id echoed back, while a request with no id
     * at all yields HTTP 500.
     */
    @Test
    public void testGetAnNotExistJobById() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(
                                                baseUrl(instance)
                                                        + RestConstant.RUNNING_JOB_URL
                                                        + "/"
                                                        + 123)
                                        .then()
                                        .statusCode(200)
                                        .body("jobId", equalTo("123")));

        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(
                                                baseUrl(instance)
                                                        + RestConstant.RUNNING_JOB_URL
                                                        + "/")
                                        .then()
                                        .statusCode(500));
    }

    /** The running-jobs list contains the streaming job (the only RUNNING job) first. */
    @Test
    public void testGetRunningJobs() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(baseUrl(instance) + RestConstant.RUNNING_JOBS_URL)
                                        .then()
                                        .statusCode(200)
                                        .body("[0].jobName", equalTo("fake_to_file"))
                                        .body("[0].jobStatus", equalTo("RUNNING")));
    }

    /** The job-info endpoint reports the finished batch job by id. */
    @Test
    public void testGetJobInfoByJobId() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(
                                                baseUrl(instance)
                                                        + RestConstant.JOB_INFO_URL
                                                        + "/"
                                                        + batchJobProxy.getJobId())
                                        .then()
                                        .statusCode(200)
                                        .body("jobName", equalTo("fake_to_console"))
                                        .body("jobStatus", equalTo("FINISHED")));
    }

    /** The thread-dump endpoint returns at least one entry with a name and classloader. */
    @Test
    public void testGetRunningThreads() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(baseUrl(instance) + RestConstant.RUNNING_THREADS)
                                        .then()
                                        .statusCode(200)
                                        .body("[0].threadName", notNullValue())
                                        .body("[0].classLoader", notNullValue()));
    }

    /** The monitoring endpoint answers successfully within five seconds on both nodes. */
    @Test
    public void testSystemMonitoringInformation() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance ->
                                given().get(
                                                baseUrl(instance)
                                                        + RestConstant.SYSTEM_MONITORING_INFORMATION)
                                        .then()
                                        .assertThat()
                                        .time(lessThan(5000L))
                                        .statusCode(200));
    }

    /**
     * Posting a job config to the encrypt endpoint base64-encodes the credential fields
     * (shade.identifier = "base64") while leaving other fields untouched.
     */
    @Test
    public void testEncryptConfig() {
        Arrays.asList(node2, node1)
                .forEach(
                        instance -> {
                            String config =
                                    "{\n"
                                            + "    \"env\": {\n"
                                            + "        \"parallelism\": 1,\n"
                                            + "        \"shade.identifier\":\"base64\"\n"
                                            + "    },\n"
                                            + "    \"source\": [\n"
                                            + "        {\n"
                                            + "            \"plugin_name\": \"MySQL-CDC\",\n"
                                            + "            \"schema\" : {\n"
                                            + "                \"fields\": {\n"
                                            + "                    \"name\": \"string\",\n"
                                            + "                    \"age\": \"int\"\n"
                                            + "                }\n"
                                            + "            },\n"
                                            + "            \"result_table_name\": \"fake\",\n"
                                            + "            \"parallelism\": 1,\n"
                                            + "            \"hostname\": \"127.0.0.1\",\n"
                                            + "            \"username\": \"seatunnel\",\n"
                                            + "            \"password\": \"seatunnel_password\",\n"
                                            + "            \"table-name\": \"inventory_vwyw0n\"\n"
                                            + "        }\n"
                                            + "    ],\n"
                                            + "    \"transform\": [\n"
                                            + "    ],\n"
                                            + "    \"sink\": [\n"
                                            + "        {\n"
                                            + "            \"plugin_name\": \"Clickhouse\",\n"
                                            + "            \"host\": \"localhost:8123\",\n"
                                            + "            \"database\": \"default\",\n"
                                            + "            \"table\": \"fake_all\",\n"
                                            + "            \"username\": \"seatunnel\",\n"
                                            + "            \"password\": \"seatunnel_password\"\n"
                                            + "        }\n"
                                            + "    ]\n"
                                            + "}";
                            given().body(config)
                                    .post(baseUrl(instance) + RestConstant.ENCRYPT_CONFIG)
                                    .then()
                                    .statusCode(200)
                                    .body("source[0].result_table_name", equalTo("fake"))
                                    // "seatunnel" / "seatunnel_password" base64-encoded:
                                    .body("source[0].username", equalTo("c2VhdHVubmVs"))
                                    .body(
                                            "source[0].password",
                                            equalTo("c2VhdHVubmVsX3Bhc3N3b3Jk"));
                        });
    }

    /** Tears the cluster down after EVERY test; guards against partially-failed setup. */
    @AfterEach
    void afterClass() {
        if (engineClient != null) {
            engineClient.close();
        }

        if (node1 != null) {
            node1.shutdown();
        }

        if (node2 != null) {
            node2.shutdown();
        }
    }
}