/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.integration;

import net.jodah.concurrentunit.Waiter;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.zeppelin.interpreter.ExecutionContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
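
/**
 * Integration tests for the spark-submit interpreter: runs SparkPi from the
 * Spark examples jar in local mode and in yarn-cluster mode against a
 * {@link MiniHadoopCluster}, and verifies that a running YARN application
 * can be cancelled from the interpreter.
 */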
public class SparkSubmitIntegrationTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitIntegrationTest.class);

  private static MiniHadoopCluster hadoopCluster;
  private static MiniZeppelin zeppelin;
  private static InterpreterFactory interpreterFactory;
  private static InterpreterSettingManager interpreterSettingManager;
  private static String sparkHome;

  @BeforeAll
  public static void setUp() throws IOException {
    String sparkVersion = "3.4.1";
    String hadoopVersion = "3";
    LOGGER.info("Testing Spark Version: {}", sparkVersion);
    LOGGER.info("Testing Hadoop Version: {}", hadoopVersion);
    sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);

    hadoopCluster = new MiniHadoopCluster();
    hadoopCluster.start();

    zeppelin = new MiniZeppelin();
    zeppelin.start(SparkSubmitIntegrationTest.class);
    interpreterFactory = zeppelin.getInterpreterFactory();
    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
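
    // Point the spark-submit interpreter at the downloaded Spark distribution
    // and at the mini cluster's Hadoop/YARN configuration, so that YARN
    // submissions go to the in-process ResourceManager.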
    InterpreterSetting sparkSubmitInterpreterSetting =
        interpreterSettingManager.getInterpreterSettingByName("spark-submit");
    sparkSubmitInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
    sparkSubmitInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
    sparkSubmitInterpreterSetting.setProperty("YARN_CONF_DIR", hadoopCluster.getConfigPath());
  }

  @AfterAll
  public static void tearDown() throws IOException {
    if (zeppelin != null) {
      zeppelin.stop();
    }
    if (hadoopCluster != null) {
      hadoopCluster.stop();
    }
  }

  @Test
  void testLocalMode() throws InterpreterException, YarnException {
    try {
      // test SparkSubmitInterpreterSetting
      Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit",
          new ExecutionContext("user1", "note1", "test"));
      InterpreterContext context = new InterpreterContext.Builder()
          .setNoteId("note1")
          .setParagraphId("paragraph_1")
          .build();
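
      // Submit SparkPi from the bundled examples jar with --master local; the
      // job runs inside the spark-submit process without launching a YARN app.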
      InterpreterResult interpreterResult =
          sparkSubmitInterpreter.interpret("--master local --class org.apache.spark.examples.SparkPi --deploy-mode client " +
              sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
      assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString());

      // Verify that no YARN application was launched.
      GetApplicationsRequest request =
          GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
      GetApplicationsResponse response =
          hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
      assertEquals(0, response.getApplicationList().size());
    } finally {
      interpreterSettingManager.close();
    }
  }

  @Test
  void testYarnMode() throws InterpreterException, YarnException {
    try {
      // test SparkSubmitInterpreterSetting
      Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit",
          new ExecutionContext("user1", "note1", "test"));
      InterpreterContext context = new InterpreterContext.Builder()
          .setNoteId("note1")
          .setParagraphId("paragraph_1")
          .build();
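
      // Submit SparkPi in yarn-cluster mode; interpret() blocks until the
      // spark-submit process exits, i.e. until the YARN application completes.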
      String yarnAppName = "yarn_example";
      InterpreterResult interpreterResult =
          sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " +
              "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " +
              "--conf spark.executor.memory=512m " +
              sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
      assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString());
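
      // The ResourceManager should now report the application as FINISHED, and
      // exactly one finished app should carry the name set via spark.app.name.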
      GetApplicationsRequest request =
          GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.FINISHED));
      GetApplicationsResponse response =
          hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
      assertTrue(response.getApplicationList().size() >= 1);
      List<ApplicationReport> apps = response.getApplicationList().stream()
          .filter(app -> app.getName().equals(yarnAppName))
          .collect(Collectors.toList());
      assertEquals(1, apps.size());
    } finally {
      interpreterSettingManager.close();
    }
  }

  @Test
  void testCancelSparkYarnApp() throws InterpreterException, YarnException, TimeoutException, InterruptedException {
    try {
      // test SparkSubmitInterpreterSetting
      Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit",
          new ExecutionContext("user1", "note1", "test"));
      InterpreterContext context = new InterpreterContext.Builder()
          .setNoteId("note1")
          .setParagraphId("paragraph_1")
          .build();
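
      // Run the blocking interpret() call in a background thread; the Waiter
      // relays assertion failures from that thread back to the test thread.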
      final Waiter waiter = new Waiter();
      Thread thread = new Thread(() -> {
        try {
          String yarnAppName = "yarn_cancel_example";
          InterpreterResult interpreterResult =
              sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " +
                  "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " +
                  "--conf spark.executor.memory=512m " +
                  sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context);
          assertEquals(InterpreterResult.Code.INCOMPLETE, interpreterResult.code(), interpreterResult.toString());
          assertTrue(interpreterResult.toString().contains("Paragraph received a SIGTERM"), interpreterResult.toString());
        } catch (InterpreterException e) {
          waiter.fail("Should not throw exception\n" + ExceptionUtils.getStackTrace(e));
        }
        waiter.resume();
      });
      thread.start();
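
      // Poll the ResourceManager (for up to two minutes) until the submitted
      // application is RUNNING, so that cancel() targets a live application.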
      long start = System.currentTimeMillis();
      long threshold = 120 * 1000;
      while ((System.currentTimeMillis() - start) < threshold) {
        GetApplicationsRequest request =
            GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
        GetApplicationsResponse response =
            hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
        if (response.getApplicationList().size() >= 1) {
          break;
        }
        Thread.sleep(5 * 1000);
      }
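
      // Cancelling kills the launched spark-submit process; the background
      // thread should then observe an INCOMPLETE result mentioning the SIGTERM.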
      sparkSubmitInterpreter.cancel(context);
      waiter.await(10000);
    } finally {
      interpreterSettingManager.close();
    }
  }
}