// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.GenericPool;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.CommandResult;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.spark.launcher.SparkLauncher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;
import java.util.Map;

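/**
 * Unit tests for {@link SparkEtlJobHandler}: ETL job submission, status polling, job killing,
 * ETL output file listing, and output path cleanup. External dependencies (the SparkLauncher,
 * the yarn client command, and the broker service) are mocked with JMockit.
 */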
public class SparkEtlJobHandlerTest {
    private long loadJobId;
    private String label;
    private String resourceName;
    private String broker;
    private long pendingTaskId;
    private String appId;
    private String etlOutputPath;
    private String trackingUrl;
    private String dppVersion;
    private String remoteArchivePath;
    private SparkRepository.SparkArchive archive;

    private final String runningReport = "Application Report :\n" +
            "Application-Id : application_15888888888_0088\n" +
            "Application-Name : label0\n" +
            "Application-Type : SPARK-2.4.1\n" +
            "User : test\n" +
            "Queue : test-queue\n" +
            "Start-Time : 1597654469958\n" +
            "Finish-Time : 0\n" +
            "Progress : 50%\n" +
            "State : RUNNING\n" +
            "Final-State : UNDEFINED\n" +
            "Tracking-URL : http://127.0.0.1:8080/proxy/application_1586619723848_0088/\n" +
            "RPC Port : 40236\n" +
            "AM Host : host-name";

    private final String failedReport = "Application Report :\n" +
            "Application-Id : application_15888888888_0088\n" +
            "Application-Name : label0\n" +
            "Application-Type : SPARK-2.4.1\n" +
            "User : test\n" +
            "Queue : test-queue\n" +
            "Start-Time : 1597654469958\n" +
            "Finish-Time : 1597654801939\n" +
            "Progress : 100%\n" +
            "State : FINISHED\n" +
            "Final-State : FAILED\n" +
            "Tracking-URL : http://127.0.0.1:8080/proxy/application_1586619723848_0088/\n" +
            "RPC Port : 40236\n" +
            "AM Host : host-name";

    private final String finishReport = "Application Report :\n" +
            "Application-Id : application_15888888888_0088\n" +
            "Application-Name : label0\n" +
            "Application-Type : SPARK-2.4.1\n" +
            "User : test\n" +
            "Queue : test-queue\n" +
            "Start-Time : 1597654469958\n" +
            "Finish-Time : 1597654801939\n" +
            "Progress : 100%\n" +
            "State : FINISHED\n" +
            "Final-State : SUCCEEDED\n" +
            "Tracking-URL : http://127.0.0.1:8080/proxy/application_1586619723848_0088/\n" +
            "RPC Port : 40236\n" +
            "AM Host : host-name";

    @Before
    public void setUp() {
        FeConstants.runningUnitTest = true;
        loadJobId = 0L;
        label = "label0";
        resourceName = "spark0";
        broker = "broker0";
        pendingTaskId = 3L;
        appId = "application_15888888888_0088";
        etlOutputPath = "hdfs://127.0.0.1:10000/tmp/doris/100/label/101";
        trackingUrl = "http://127.0.0.1:8080/proxy/application_1586619723848_0088/";
        dppVersion = Config.spark_dpp_version;
        remoteArchivePath = etlOutputPath + "/__repository__/__archive_" + dppVersion;
        archive = new SparkRepository.SparkArchive(remoteArchivePath, dppVersion);
        archive.libraries.add(
                new SparkRepository.SparkLibrary("", "", SparkRepository.SparkLibrary.LibType.DPP, 0L));
        archive.libraries.add(
                new SparkRepository.SparkLibrary("", "", SparkRepository.SparkLibrary.LibType.SPARK2X, 0L));
    }

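    // Successful submission: the mocked SparkLauncher returns a process whose handle reaches
    // the RUNNING state, so the app id should be recorded in the pending task attachment.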
    @Test
    public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLauncher launcher,
                                 @Injectable Process process, @Mocked SparkLoadAppHandle handle)
            throws IOException, LoadException {
        new Expectations() {
            {
                launcher.launch();
                result = process;
                handle.getAppId();
                result = appId;
                handle.getState();
                result = SparkLoadAppHandle.State.RUNNING;
            }
        };

        EtlJobConfig etlJobConfig = new EtlJobConfig(Maps.newHashMap(), etlOutputPath, label, null);
        SparkResource resource = new SparkResource(resourceName);
        new Expectations(resource) {
            {
                resource.prepareArchive();
                result = archive;
            }
        };

        Map<String, String> sparkConfigs = resource.getSparkConfigs();
        sparkConfigs.put("spark.master", "yarn");
        sparkConfigs.put("spark.submit.deployMode", "cluster");
        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
        SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
        SparkEtlJobHandler handler = new SparkEtlJobHandler();
        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment);

        // check submit etl job success
        Assert.assertEquals(appId, attachment.getAppId());
    }

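    // Failed submission: the handle ends up in the FAILED state, so submitEtlJob is expected
    // to throw a LoadException.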
    @Test(expected = LoadException.class)
    public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkLauncher launcher,
                                       @Injectable Process process, @Mocked SparkLoadAppHandle handle)
            throws IOException, LoadException {
        new Expectations() {
            {
                launcher.launch();
                result = process;
                handle.getAppId();
                result = appId;
                handle.getState();
                result = SparkLoadAppHandle.State.FAILED;
            }
        };

        EtlJobConfig etlJobConfig = new EtlJobConfig(Maps.newHashMap(), etlOutputPath, label, null);
        SparkResource resource = new SparkResource(resourceName);
        new Expectations(resource) {
            {
                resource.prepareArchive();
                result = archive;
            }
        };

        Map<String, String> sparkConfigs = resource.getSparkConfigs();
        sparkConfigs.put("spark.master", "yarn");
        sparkConfigs.put("spark.submit.deployMode", "cluster");
        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
        SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
        SparkEtlJobHandler handler = new SparkEtlJobHandler();
        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment);
    }

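    // Status polling: consecutive yarn application reports drive the job through
    // RUNNING -> CANCELLED (yarn finished but spark failed) -> FINISHED, and the dpp
    // result is read back through the broker.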
    @Test
    public void testGetEtlJobStatus(@Mocked BrokerUtil brokerUtil, @Mocked Util util,
                                    @Mocked CommandResult commandResult,
                                    @Mocked SparkYarnConfigFiles sparkYarnConfigFiles,
                                    @Mocked SparkLoadAppHandle handle)
            throws IOException, UserException {
        new Expectations() {
            {
                sparkYarnConfigFiles.prepare();
                sparkYarnConfigFiles.getConfigDir();
                result = "./yarn_config";
                commandResult.getReturnCode();
                result = 0;
                commandResult.getStdout();
                // returns(...) supplies these values consecutively for successive getStdout() calls
                returns(runningReport, runningReport, failedReport, failedReport, finishReport, finishReport);
                handle.getUrl();
                result = trackingUrl;
            }
        };
        new Expectations() {
            {
                Util.executeCommand(anyString, (String[]) any);
                minTimes = 0;
                result = commandResult;
                BrokerUtil.readFile(anyString, (BrokerDesc) any);
                result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}";
            }
        };

        SparkResource resource = new SparkResource(resourceName);
        Map<String, String> sparkConfigs = resource.getSparkConfigs();
        sparkConfigs.put("spark.master", "yarn");
        sparkConfigs.put("spark.submit.deployMode", "cluster");
        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        new Expectations(resource) {
            {
                resource.getYarnClientPath();
                result = Config.yarn_client_path;
            }
        };

        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
        SparkEtlJobHandler handler = new SparkEtlJobHandler();

        // running
        EtlStatus status = handler.getEtlJobStatus(handle, appId, loadJobId, etlOutputPath, resource, brokerDesc);
        Assert.assertEquals(TEtlState.RUNNING, status.getState());
        Assert.assertEquals(50, status.getProgress());
        Assert.assertEquals(trackingUrl, status.getTrackingUrl());

        // yarn finished and spark failed
        status = handler.getEtlJobStatus(handle, appId, loadJobId, etlOutputPath, resource, brokerDesc);
        Assert.assertEquals(TEtlState.CANCELLED, status.getState());
        Assert.assertEquals(100, status.getProgress());
        Assert.assertEquals("etl job failed", status.getDppResult().failedReason);

        // finished
        status = handler.getEtlJobStatus(handle, appId, loadJobId, etlOutputPath, resource, brokerDesc);
        Assert.assertEquals(TEtlState.FINISHED, status.getState());
        Assert.assertEquals(100, status.getProgress());
        Assert.assertEquals(trackingUrl, status.getTrackingUrl());
        Assert.assertEquals(10, status.getDppResult().normalRows);
        Assert.assertEquals(0, status.getDppResult().abnormalRows);
    }

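    // Status polling when the yarn command itself fails (non-zero return code): the job
    // state should be reported as CANCELLED.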
    @Test
    public void testGetEtlJobStatusFailed(@Mocked Util util, @Mocked CommandResult commandResult,
                                          @Mocked SparkYarnConfigFiles sparkYarnConfigFiles,
                                          @Mocked SparkLoadAppHandle handle)
            throws IOException, UserException {
        new Expectations() {
            {
                sparkYarnConfigFiles.prepare();
                sparkYarnConfigFiles.getConfigDir();
                result = "./yarn_config";
                commandResult.getReturnCode();
                result = -1;
            }
        };
        new Expectations() {
            {
                Util.executeCommand(anyString, (String[]) any);
                minTimes = 0;
                result = commandResult;
            }
        };

        SparkResource resource = new SparkResource(resourceName);
        Map<String, String> sparkConfigs = resource.getSparkConfigs();
        sparkConfigs.put("spark.master", "yarn");
        sparkConfigs.put("spark.submit.deployMode", "cluster");
        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        new Expectations(resource) {
            {
                resource.getYarnClientPath();
                result = Config.yarn_client_path;
            }
        };

        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
        SparkEtlJobHandler handler = new SparkEtlJobHandler();

        // the yarn status command fails, so the job is treated as cancelled
        EtlStatus status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc);
        Assert.assertEquals(TEtlState.CANCELLED, status.getState());
    }

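    // Killing the job: with the mocked yarn command returning success, killEtlJob should
    // complete without throwing.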
    @Test
    public void testKillEtlJob(@Mocked Util util, @Mocked CommandResult commandResult,
                               @Mocked SparkYarnConfigFiles sparkYarnConfigFiles)
            throws IOException, UserException {
        new Expectations() {
            {
                sparkYarnConfigFiles.prepare();
                sparkYarnConfigFiles.getConfigDir();
                result = "./yarn_config";
            }
        };
        new Expectations() {
            {
                commandResult.getReturnCode();
                result = 0;
            }
        };
        new Expectations() {
            {
                Util.executeCommand(anyString, (String[]) any);
                minTimes = 0;
                result = commandResult;
            }
        };

        SparkResource resource = new SparkResource(resourceName);
        Map<String, String> sparkConfigs = resource.getSparkConfigs();
        sparkConfigs.put("spark.master", "yarn");
        sparkConfigs.put("spark.submit.deployMode", "cluster");
        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        new Expectations(resource) {
            {
                resource.getYarnClientPath();
                result = Config.yarn_client_path;
            }
        };

        SparkEtlJobHandler handler = new SparkEtlJobHandler();
        try {
            handler.killEtlJob(null, appId, loadJobId, resource);
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
    }

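    // Listing ETL output files: the broker client is mocked to return a single parquet file,
    // which should appear in the returned path-to-size map.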
    @Test
    public void testGetEtlFilePaths(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog,
                                    @Injectable BrokerMgr brokerMgr) throws Exception {
        // list response
        TBrokerListResponse response = new TBrokerListResponse();
        TBrokerOperationStatus status = new TBrokerOperationStatus();
        status.statusCode = TBrokerOperationStatusCode.OK;
        response.opStatus = status;
        List<TBrokerFileStatus> files = Lists.newArrayList();
        String filePath = "hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/label6.10.11.12.0.666666.parquet";
        files.add(new TBrokerFileStatus(filePath, false, 10, false));
        response.files = files;

        FsBroker fsBroker = new FsBroker("127.0.0.1", 99999);

        new MockUp<GenericPool<TPaloBrokerService.Client>>() {
            @Mock
            public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception {
                return client;
            }

            @Mock
            public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) {
            }

            @Mock
            public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) {
            }
        };

        new Expectations() {
            {
                client.listPath((TBrokerListPathRequest) any);
                result = response;
                catalog.getBrokerMgr();
                result = brokerMgr;
                brokerMgr.getBroker(anyString, anyString);
                result = fsBroker;
            }
        };

        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
        SparkEtlJobHandler handler = new SparkEtlJobHandler();
        Map<String, Long> filePathToSize = handler.getEtlFilePaths(etlOutputPath, brokerDesc);
        Assert.assertTrue(filePathToSize.containsKey(filePath));
        Assert.assertEquals(10, (long) filePathToSize.get(filePath));
    }

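    // Deleting the ETL output path: BrokerUtil.deletePath must be invoked exactly once,
    // and any exception fails the test.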
    @Test
    public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws UserException {
        new Expectations() {
            {
                BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any);
                times = 1;
            }
        };

        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
        SparkEtlJobHandler handler = new SparkEtlJobHandler();
        try {
            handler.deleteEtlOutputPath(etlOutputPath, brokerDesc);
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
    }
}