/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.integration;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Select;
import org.apache.zeppelin.display.ui.TextBox;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
import org.apache.zeppelin.interpreter.InterpreterProperty;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.AbstractTestRestApi;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.utils.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Tests that run Spark code through Zeppelin against a Spark cluster (local mode by
 * default). Concrete subclasses supply the Spark and Hadoop versions under test via
 * {@link #prepareSpark(String, String)}.
 */
public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinSparkClusterTest.class);
public static final String SPARK_MASTER_PROPERTY_NAME = "spark.master";
  // Run setupSparkInterpreter only once per Spark version; otherwise every test method
  // would run it, which takes a long time and may cause a CI timeout.
//TODO(zjffdu) remove this after we upgrade it to junit 4.13 (ZEPPELIN-3341)
  private static final Set<String> verifiedSparkVersions = new HashSet<>();
private String sparkVersion;
private String sparkHome;
private String hadoopVersion;
private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
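  /**
   * Downloads the requested Spark distribution and, the first time a given Spark
   * version is seen, configures and restarts the spark interpreter setting for it.
   */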
public void prepareSpark(String sparkVersion, String hadoopVersion) throws Exception {
this.sparkVersion = sparkVersion;
LOGGER.info("Testing SparkVersion: " + sparkVersion);
this.sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
this.hadoopVersion = hadoopVersion;
if (!verifiedSparkVersions.contains(sparkVersion)) {
verifiedSparkVersions.add(sparkVersion);
setupSparkInterpreter(sparkHome);
verifySparkVersionNumber();
}
}
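  /**
   * Returns true when the major version of the Hadoop libraries on the test classpath
   * matches the major version of the requested Hadoop version; tests skip themselves
   * when the two do not match.
   */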
private boolean isHadoopVersionMatch() {
String version = org.apache.hadoop.util.VersionInfo.getVersion();
String majorVersion = version.split("\\.")[0];
return majorVersion.equals(hadoopVersion.split("\\.")[0]);
}
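  /**
   * Points the spark interpreter setting at the downloaded SPARK_HOME, sets the master
   * from the SPARK_MASTER environment variable (defaulting to local[2]), applies
   * test-friendly defaults (no Hive context, no IPython, Kryo serializer, plain scala
   * output), then restarts the interpreter setting so the new properties take effect.
   */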
public void setupSparkInterpreter(String sparkHome) throws InterpreterException {
InterpreterSetting sparkIntpSetting = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
.getInterpreterSettingByName("spark");
Map<String, InterpreterProperty> sparkProperties =
(Map<String, InterpreterProperty>) sparkIntpSetting.getProperties();
    LOGGER.info("SPARK_HOME detected: " + sparkHome);
String masterEnv = System.getenv("SPARK_MASTER");
sparkProperties.put(SPARK_MASTER_PROPERTY_NAME,
new InterpreterProperty(SPARK_MASTER_PROPERTY_NAME, masterEnv == null ? "local[2]" : masterEnv));
sparkProperties.put("SPARK_HOME", new InterpreterProperty("SPARK_HOME", sparkHome));
sparkProperties.put("spark.cores.max",
new InterpreterProperty("spark.cores.max", "2"));
sparkProperties.put("zeppelin.spark.useHiveContext",
new InterpreterProperty("zeppelin.spark.useHiveContext", "false"));
sparkProperties.put("zeppelin.pyspark.useIPython",
new InterpreterProperty("zeppelin.pyspark.useIPython", "false"));
sparkProperties.put("zeppelin.spark.useNew",
new InterpreterProperty("zeppelin.spark.useNew", "true"));
sparkProperties.put("spark.serializer",
new InterpreterProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
sparkProperties.put("zeppelin.spark.scala.color",
new InterpreterProperty("zeppelin.spark.scala.color", "false"));
sparkProperties.put("zeppelin.spark.deprecatedMsg.show",
new InterpreterProperty("zeppelin.spark.deprecatedMsg.show", "false"));
TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().restart(sparkIntpSetting.getId());
}
@BeforeAll
public static void setUp() throws Exception {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
"helium");
AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName());
}
@AfterAll
public static void destroy() throws Exception {
AbstractTestRestApi.shutDown();
}
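  /** Polls every 100 ms until the paragraph reaches a terminal status (FINISHED, ERROR, or ABORT). */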
private void waitForFinish(Paragraph p) {
while (p.getStatus() != Status.FINISHED
&& p.getStatus() != Status.ERROR
&& p.getStatus() != Status.ABORT) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
        LOGGER.error("Interrupted while waiting for paragraph to finish", e);
}
}
}
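  /** Polls every 100 ms until the paragraph enters the RUNNING status. */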
private void waitForRunning(Paragraph p) {
while (p.getStatus() != Status.RUNNING) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
        LOGGER.error("Interrupted while waiting for paragraph to start running", e);
}
}
}
@Test
public void scalaOutputTest() throws IOException, InterruptedException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
// create new note
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark import java.util.Date\n" +
"import java.net.URL\n" +
"println(\"hello\")\n"
);
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("hello\n" +
"import java.util.Date\n" +
"import java.net.URL\n",
p.getReturn().message().get(0).getData());
          // check the Spark web UI URL on the zeppelin-server side
InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class);
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getByName("spark");
assertEquals(1, sparkInterpreterSetting.getAllInterpreterGroups().size());
assertNotNull(sparkInterpreterSetting.getAllInterpreterGroups().get(0).getWebUrl());
p.setText("%spark invalid_code");
note.run(p.getId(), true);
assertEquals(Status.ERROR, p.getStatus());
assertTrue(p.getReturn().message().get(0).getData().contains("error: "));
// test local properties
p.setText("%spark(p1=v1,p2=v2) print(z.getInterpreterContext().getLocalProperties().size())");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("2", p.getReturn().message().get(0).getData());
// test code completion
List<InterpreterCompletion> completions = note.completion(p.getId(), "sc.", 2, AuthenticationInfo.ANONYMOUS);
assertTrue(completions.size() > 0);
// test cancel
p.setText("%spark sc.range(1,10).map(e=>{Thread.sleep(1000); e}).collect()");
note.run(p.getId(), false);
waitForRunning(p);
p.abort();
waitForFinish(p);
assertEquals(Status.ABORT, p.getStatus());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void basicRDDTransformationAndActionTest() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("55", p.getReturn().message().get(0).getData());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void sparkReadJSONTest() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
File tmpJsonFile = File.createTempFile("test", ".json");
FileWriter jsonFileWriter = new FileWriter(tmpJsonFile);
IOUtils.copy(new StringReader("{\"metadata\": { \"key\": 84896, \"value\": 54 }}\n"),
jsonFileWriter);
jsonFileWriter.close();
p.setText("%spark spark.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertTrue(p.getReturn().message().get(0).getData().contains(
"org.apache.spark.sql.DataFrame = [metadata: struct<key: bigint, value: bigint>]"));
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void sparkReadCSVTest() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
File tmpCSVFile = File.createTempFile("test", ".csv");
FileWriter csvFileWriter = new FileWriter(tmpCSVFile);
IOUtils.copy(new StringReader("84896,54"), csvFileWriter);
csvFileWriter.close();
p.setText("%spark spark.read.csv(\"file://" + tmpCSVFile.getAbsolutePath() + "\")");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertTrue(p.getReturn().message().get(0).getData().contains(
"org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]\n"));
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void sparkSQLTest() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
// test basic dataframe api
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
".toDF(\"name\", \"age\")\n" +
"df.collect()");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertTrue(p.getReturn().message().get(0).getData().contains(
"Array[org.apache.spark.sql.Row] = Array([hello,20])"));
// test display DataFrame
p = note.addNewParagraph(anonymous);
p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
".toDF(\"name\", \"age\")\n" +
"df.createOrReplaceTempView(\"test_table\")\n" +
"z.show(df)");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
          // run SQL and save the result into the resource pool
p = note.addNewParagraph(anonymous);
p.setText("%spark.sql(saveAs=table_result) select * from test_table");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
// get resource from spark
p = note.addNewParagraph(anonymous);
p.setText("%spark val df=z.getAsDataFrame(\"table_result\")\nz.show(df)");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
// get resource from pyspark
p = note.addNewParagraph(anonymous);
p.setText("%spark.pyspark df=z.getAsDataFrame('table_result')\nz.show(df)");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
// get resource from ipyspark
p = note.addNewParagraph(anonymous);
p.setText("%spark.ipyspark df=z.getAsDataFrame('table_result')\nz.show(df)");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
assertEquals("name\tage\nhello\t20\n", p.getReturn().message().get(0).getData());
// get resource from sparkr
p = note.addNewParagraph(anonymous);
p.setText("%spark.r df=z.getAsDataFrame('table_result')\ndf");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TEXT, p.getReturn().message().get(0).getType());
assertTrue(p.getReturn().message().get(0).getData().contains("name age\n1 hello 20"),
p.getReturn().toString());
// test display DataSet
p = note.addNewParagraph(anonymous);
p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" +
"z.show(ds)");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType());
assertEquals("_1\t_2\nhello\t20\n", p.getReturn().message().get(0).getData());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void sparkRTest() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
"df <- createDataFrame(localDF)\n" +
"count(df)"
);
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[1] 3", p.getReturn().message().get(0).getData().trim());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void pySparkTest() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
// create new note
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
          // run a pyspark paragraph
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark.pyspark sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b)");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("55\n", p.getReturn().message().get(0).getData());
// simple form via local properties
p = note.addNewParagraph(anonymous);
p.setText("%spark.pyspark(form=simple) print('name_' + '${name=abc}')");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("name_abc\n", p.getReturn().message().get(0).getData());
// test code completion
String code = "%spark.pyspark spark.";
List<InterpreterCompletion> completions = note.completion(p.getId(), code, code.length(), AuthenticationInfo.ANONYMOUS);
assertTrue(completions.size() > 0);
// run SparkSession test
p = note.addNewParagraph(anonymous);
p.setText("%pyspark from pyspark.sql import Row\n" +
"df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
"df.collect()");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(id=1, age=20)]\n", p.getReturn().message().get(0).getData());
// test udf
p = note.addNewParagraph(anonymous);
          // register the UDF via SQLContext but invoke it through SparkSession
p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
"spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) ||
"[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData()));
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
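  /**
   * Exercises the ZeppelinContext run APIs: z.run(paragraphIndex),
   * z.runNote(noteId) and z.run(noteId, paragraphId), including running
   * paragraphs of another note from the current one.
   */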
@Test
public void zRunTest() throws IOException, InterruptedException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
String note2Id = null;
try {
// create new note
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
note2Id = TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p0 = note.addNewParagraph(anonymous);
// z.run(paragraphIndex)
p0.setText("%spark z.run(1)");
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("%spark val a=10");
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%spark print(a)");
note.run(p0.getId(), true);
assertEquals(Status.FINISHED, p0.getStatus());
          // z.run is not a blocking call, so p1 may not be finished when p0 is done.
waitForFinish(p1);
assertEquals(Status.FINISHED, p1.getStatus());
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertEquals("10", p2.getReturn().message().get(0).getData());
Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("%spark println(new java.util.Date())");
          // run the current note via z.runNote(noteId)
p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId()));
note.run(p0.getId());
waitForFinish(p0);
waitForFinish(p1);
waitForFinish(p2);
waitForFinish(p3);
assertEquals(Status.FINISHED, p3.getStatus());
String p3result = p3.getReturn().message().get(0).getData();
assertTrue(p3result.length() > 0);
// z.run(noteId, paragraphId)
p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId()));
p3.setText("%spark println(\"END\")");
note.run(p0.getId(), true);
          // sleep 1 second to ensure p3 starts running
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
fail();
}
waitForFinish(p3);
assertEquals(Status.FINISHED, p3.getStatus());
assertEquals("END\n", p3.getReturn().message().get(0).getData());
// run paragraph in note2 via paragraph in note1
String noteId2 = TestUtils.getInstance(Notebook.class).createNote("note2", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId2,
note2 -> {
Paragraph p20 = note2.addNewParagraph(anonymous);
p20.setText("%spark val a = 1");
Paragraph p21 = note2.addNewParagraph(anonymous);
p21.setText("%spark print(a)");
// run p20 of note2 via paragraph in note1
p0.setText(String.format("%%spark.pyspark z.run(\"%s\", \"%s\")", note2.getId(), p20.getId()));
note.run(p0.getId(), true);
waitForFinish(p20);
assertEquals(Status.FINISHED, p20.getStatus());
assertEquals(Status.READY, p21.getStatus());
p0.setText(String.format("%%spark z.runNote(\"%s\")", note2.getId()));
note.run(p0.getId(), true);
waitForFinish(p20);
waitForFinish(p21);
assertEquals(Status.FINISHED, p20.getStatus());
assertEquals(Status.FINISHED, p21.getStatus());
assertEquals("1", p21.getReturn().message().get(0).getData());
return null;
});
return noteId2;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
if (null != note2Id) {
TestUtils.getInstance(Notebook.class).removeNote(note2Id, anonymous);
}
}
}
@Test
public void testZeppelinContextResource() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("%spark z.put(\"var_1\", \"hello world\")");
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%spark println(z.get(\"var_1\"))");
Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("%spark.pyspark print(z.get(\"var_1\"))");
Paragraph p4 = note.addNewParagraph(anonymous);
p4.setText("%spark.r z.get(\"var_1\")");
// resources across interpreter processes (via DistributedResourcePool)
Paragraph p5 = note.addNewParagraph(anonymous);
p5.setText("%python print(z.get('var_1'))");
note.run(p1.getId(), true);
note.run(p2.getId(), true);
note.run(p3.getId(), true);
note.run(p4.getId(), true);
note.run(p5.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
assertEquals(Status.FINISHED, p2.getStatus());
assertEquals("hello world\n", p2.getReturn().message().get(0).getData());
assertEquals(Status.FINISHED, p3.getStatus());
assertEquals("hello world\n", p3.getReturn().message().get(0).getData());
assertEquals(Status.FINISHED, p4.getStatus());
assertTrue(p4.getReturn().message().get(0).getData().contains("hello world"),
p4.getReturn().toString());
assertEquals(Status.FINISHED, p5.getStatus());
assertEquals("hello world\n", p5.getReturn().message().get(0).getData());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
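  /**
   * Verifies pre_exec/post_exec hooks: global hooks apply to every note, while
   * note-scoped hooks registered via z.registerNoteHook apply only to the given note.
   */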
@Test
public void testZeppelinContextHook() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
String note2Id = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
note2Id = TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
// register global hook & note1 hook
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("%python from __future__ import print_function\n" +
"z.registerHook('pre_exec', 'print(1)')\n" +
"z.registerHook('post_exec', 'print(2)')\n" +
"z.registerNoteHook('pre_exec', 'print(3)', '" + note.getId() + "')\n" +
"z.registerNoteHook('post_exec', 'print(4)', '" + note.getId() + "')\n");
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%python print(5)");
note.run(p1.getId(), true);
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
assertEquals(Status.FINISHED, p2.getStatus());
assertEquals("1\n3\n5\n4\n2\n", p2.getReturn().message().get(0).getData());
String note2Tmp = TestUtils.getInstance(Notebook.class).createNote("note2", anonymous);
TestUtils.getInstance(Notebook.class).processNote(note2Tmp,
note2 -> {
Paragraph p3 = note2.addNewParagraph(anonymous);
p3.setText("%python print(6)");
note2.run(p3.getId(), true);
assertEquals("1\n6\n2\n", p3.getReturn().message().get(0).getData());
return null;
});
return note2Tmp;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
if (null != note2Id) {
TestUtils.getInstance(Notebook.class).removeNote(note2Id, anonymous);
}
}
}
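  /** Runs sc.version in both scala and pyspark paragraphs and checks it matches the expected Spark version. */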
private void verifySparkVersionNumber() throws IOException {
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark print(sc.version)");
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(sparkVersion, p.getReturn().message().get(0).getData());
p.setText("%spark.pyspark sc.version");
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertTrue(p.getReturn().message().get(0).getData().contains(sparkVersion),
p.getReturn().toString());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void testSparkZeppelinContextDynamicForms() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
String code = "%spark println(z.textbox(\"my_input\", \"default_name\"))\n" +
"println(z.password(\"my_pwd\"))\n" +
"println(z.select(\"my_select\", \"1\"," +
"Seq((\"1\", \"select_1\"), (\"2\", \"select_2\"))))\n" +
"val items=z.checkbox(\"my_checkbox\", " +
"Seq((\"1\", \"check_1\"), (\"2\", \"check_2\")), Seq(\"2\"))\n" +
"println(items(0))";
p.setText(code);
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
Iterator<String> formIter = p.settings.getForms().keySet().iterator();
assertEquals("my_input", formIter.next());
assertEquals("my_pwd", formIter.next());
assertEquals("my_select", formIter.next());
assertEquals("my_checkbox", formIter.next());
          // check dynamic form values
String[] result = p.getReturn().message().get(0).getData().split("\n");
assertEquals(5, result.length);
assertEquals("default_name", result[0]);
assertEquals("null", result[1]);
assertEquals("1", result[2]);
assertEquals("2", result[3]);
assertEquals("items: Seq[Any] = Buffer(2)", result[4]);
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void testPySparkZeppelinContextDynamicForms() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
String code = "%spark.pyspark print(z.input('my_input', 'default_name'))\n" +
"print(z.password('my_pwd'))\n" +
"print(z.select('my_select', " +
"[('1', 'select_1'), ('2', 'select_2')], defaultValue='1'))\n" +
"items=z.checkbox('my_checkbox', " +
"[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" +
"print(items[0])";
p.setText(code);
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
Iterator<String> formIter = p.settings.getForms().keySet().iterator();
assertEquals("my_input", formIter.next());
assertEquals("my_pwd", formIter.next());
assertEquals("my_select", formIter.next());
assertEquals("my_checkbox", formIter.next());
          // check dynamic form values
String[] result = p.getReturn().message().get(0).getData().split("\n");
assertEquals(4, result.length);
assertEquals("default_name", result[0]);
assertEquals("None", result[1]);
assertEquals("1", result[2]);
assertEquals("2", result[3]);
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
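  /**
   * Verifies that note-scoped angular objects are stored both in the interpreter
   * group's AngularObjectRegistry and in the note itself, while global angular
   * objects live only in the registry.
   */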
@Test
public void testAngularObjects() throws IOException, InterpreterNotFoundException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p1 = note.addNewParagraph(anonymous);
// add local angular object
p1.setText("%spark z.angularBind(\"name\", \"world\")");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
// angular object is saved to InterpreterGroup's AngularObjectRegistry
List<AngularObject> angularObjects;
try {
angularObjects = p1.getBindedInterpreter().getInterpreterGroup()
.getAngularObjectRegistry().getAll(note.getId(), null);
assertEquals(1, angularObjects.size());
assertEquals("name", angularObjects.get(0).getName());
assertEquals("world", angularObjects.get(0).get());
} catch (InterpreterNotFoundException e) {
fail();
}
// angular object is saved to note as well.
try {
angularObjects = note.getAngularObjects(p1.getBindedInterpreter().getInterpreterGroup().getId());
assertEquals(1, angularObjects.size());
assertEquals("name", angularObjects.get(0).getName());
assertEquals("world", angularObjects.get(0).get());
} catch (InterpreterNotFoundException e) {
fail();
}
// remove local angular object
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%spark z.angularUnbind(\"name\")");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
try {
angularObjects = p1.getBindedInterpreter().getInterpreterGroup().getAngularObjectRegistry()
.getAll(note.getId(), null);
assertEquals(0, angularObjects.size());
} catch (InterpreterNotFoundException e) {
fail();
}
try {
angularObjects = note.getAngularObjects(p1.getBindedInterpreter().getInterpreterGroup().getId());
assertEquals(0, angularObjects.size());
} catch (InterpreterNotFoundException e) {
fail();
}
// add global angular object
Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("%spark z.angularBindGlobal(\"name2\", \"world2\")");
note.run(p3.getId(), true);
assertEquals(Status.FINISHED, p3.getStatus());
List<AngularObject> globalAngularObjects;
try {
globalAngularObjects = p3.getBindedInterpreter().getInterpreterGroup()
.getAngularObjectRegistry().getAll(null, null);
assertEquals(1, globalAngularObjects.size());
assertEquals("name2", globalAngularObjects.get(0).getName());
assertEquals("world2", globalAngularObjects.get(0).get());
} catch (InterpreterNotFoundException e) {
fail();
}
// global angular object is not saved to note
try {
angularObjects = note.getAngularObjects(p1.getBindedInterpreter().getInterpreterGroup().getId());
assertEquals(0, angularObjects.size());
} catch (InterpreterNotFoundException e) {
fail();
}
// remove global angular object
Paragraph p4 = note.addNewParagraph(anonymous);
p4.setText("%spark z.angularUnbindGlobal(\"name2\")");
note.run(p4.getId(), true);
assertEquals(Status.FINISHED, p4.getStatus());
try {
globalAngularObjects = p4.getBindedInterpreter().getInterpreterGroup()
.getAngularObjectRegistry().getAll(note.getId(), null);
assertEquals(0, globalAngularObjects.size());
} catch (InterpreterNotFoundException e) {
fail();
}
          // the note still contains no angular objects
try {
angularObjects = note.getAngularObjects(p1.getBindedInterpreter().getInterpreterGroup().getId());
assertEquals(0, angularObjects.size());
} catch (InterpreterNotFoundException e) {
fail();
}
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void testScalaNoteDynamicForms() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p1 = note.addNewParagraph(anonymous);
// create TextBox
p1.setText("%spark z.noteTextbox(\"name\", \"world\")");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
Input input = p1.getNote().getNoteForms().get("name");
assertTrue(input instanceof TextBox);
TextBox inputTextBox = (TextBox) input;
assertEquals("name", inputTextBox.getDisplayName());
assertEquals("world", inputTextBox.getDefaultValue());
assertEquals("world", p1.getNote().getNoteParams().get("name"));
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%md hello $${name}");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("hello world"), p2.getReturn().toString());
// create Select
p1.setText("%spark z.noteSelect(\"language\", Seq((\"java\" -> \"JAVA\"), (\"scala\" -> \"SCALA\")), \"java\")");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
input = p1.getNote().getNoteForms().get("language");
assertTrue(input instanceof Select);
Select select = (Select) input;
assertEquals("language", select.getDisplayName());
assertEquals("java", select.getDefaultValue());
assertEquals("java", p1.getNote().getNoteParams().get("language"));
p2 = note.addNewParagraph(anonymous);
p2.setText("%md hello $${language}");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("hello java"), p2.getReturn().toString());
// create Checkbox
p1.setText("%spark z.noteCheckbox(\"languages\", Seq((\"java\" -> \"JAVA\"), (\"scala\" -> \"SCALA\")), Seq(\"java\", \"scala\"))");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
input = p1.getNote().getNoteForms().get("languages");
assertTrue(input instanceof CheckBox);
CheckBox checkbox = (CheckBox) input;
assertEquals("languages", checkbox.getDisplayName());
assertArrayEquals(new Object[]{"java", "scala"}, checkbox.getDefaultValue());
assertEquals(Arrays.asList("java", "scala"), p1.getNote().getNoteParams().get("languages"));
p2 = note.addNewParagraph(anonymous);
p2.setText("%md hello $${checkbox:languages}");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("hello java,scala"), p2.getReturn().toString());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void testPythonNoteDynamicForms() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p1 = note.addNewParagraph(anonymous);
// create TextBox
p1.setText("%spark.pyspark z.noteTextbox(\"name\", \"world\")");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
Input input = p1.getNote().getNoteForms().get("name");
assertTrue(input instanceof TextBox);
TextBox inputTextBox = (TextBox) input;
assertEquals("name", inputTextBox.getDisplayName());
assertEquals("world", inputTextBox.getDefaultValue());
assertEquals("world", p1.getNote().getNoteParams().get("name"));
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%md hello $${name}");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("hello world"), p2.getReturn().toString());
// create Select
p1.setText("%spark.pyspark z.noteSelect('language', [('java', 'JAVA'), ('scala', 'SCALA')], 'java')");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
input = p1.getNote().getNoteForms().get("language");
assertTrue(input instanceof Select);
Select select = (Select) input;
assertEquals("language", select.getDisplayName());
assertEquals("java", select.getDefaultValue());
assertEquals("java", p1.getNote().getNoteParams().get("language"));
p2 = note.addNewParagraph(anonymous);
p2.setText("%md hello $${language}");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("hello java"), p2.getReturn().toString());
// create Checkbox
p1.setText("%spark.pyspark z.noteCheckbox('languages', [('java', 'JAVA'), ('scala', 'SCALA')], ['java', 'scala'])");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
input = p1.getNote().getNoteForms().get("languages");
assertTrue(input instanceof CheckBox);
CheckBox checkbox = (CheckBox) input;
assertEquals("languages", checkbox.getDisplayName());
assertArrayEquals(new Object[]{"java", "scala"}, checkbox.getDefaultValue());
assertEquals(Arrays.asList("java", "scala"), p1.getNote().getNoteParams().get("languages"));
p2 = note.addNewParagraph(anonymous);
p2.setText("%md hello $${checkbox:languages}");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("hello java,scala"), p2.getReturn().toString());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
@Test
public void testRNoteDynamicForms() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p1 = note.addNewParagraph(anonymous);
// create TextBox
p1.setText("%spark.r z.noteTextbox(\"name\", \"world\")");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
Input input = p1.getNote().getNoteForms().get("name");
assertTrue(input instanceof TextBox);
TextBox inputTextBox = (TextBox) input;
assertEquals("name", inputTextBox.getDisplayName());
assertEquals("world", inputTextBox.getDefaultValue());
assertEquals("world", p1.getNote().getNoteParams().get("name"));
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%md hello $${name}");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("hello world"), p2.getReturn().toString());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
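  /**
   * Verifies that %spark.conf properties (here spark.jars.packages) are applied when
   * the interpreter launches, making the downloaded package visible to both scala
   * and pyspark paragraphs.
   */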
@Test
public void testConfInterpreter() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().close();
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark.conf spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("%spark\nimport com.databricks.spark.csv._");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());
          // test that the downloaded package is on the pyspark import path
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%spark.pyspark\nimport sys\nsys.path");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("databricks_spark"));
Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("%spark.ipyspark\nimport sys\nsys.path");
note.run(p3.getId(), true);
assertEquals(Status.FINISHED, p3.getStatus());
assertTrue(p3.getReturn().toString().contains("databricks_spark"));
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
}
}
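  /**
   * Points SPARK_HOME at an invalid directory via %spark.conf and verifies that the
   * launch failure is reported as a paragraph ERROR, consistently across reruns.
   */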
@Test
public void testFailtoLaunchSpark() throws IOException {
assumeTrue(isHadoopVersionMatch(), "Hadoop version mismatch, skip test");
String noteId = null;
try {
TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().close();
noteId = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
TestUtils.getInstance(Notebook.class).processNote(noteId,
note -> {
Paragraph p = note.addNewParagraph(anonymous);
p.setText("%spark.conf SPARK_HOME invalid_spark_home");
note.run(p.getId(), true);
assertEquals(Status.FINISHED, p.getStatus());
Paragraph p1 = note.addNewParagraph(anonymous);
p1.setText("%spark\nsc.version");
note.run(p1.getId(), true);
assertEquals(Status.ERROR, p1.getStatus());
assertTrue(p1.getReturn().message().get(0).getData().contains("No such file or directory"),
"Actual error message: " + p1.getReturn().message().get(0).getData());
          // run it again and expect the same error
note.run(p1.getId(), true);
assertEquals(Status.ERROR, p1.getStatus());
assertTrue(p1.getReturn().message().get(0).getData().contains("No such file or directory"),
"Actual error message: " + p1.getReturn().message().get(0).getData());
return null;
});
} finally {
if (null != noteId) {
TestUtils.getInstance(Notebook.class).removeNote(noteId, anonymous);
}
      // reset SPARK_HOME, otherwise the following tests would fail
InterpreterSetting sparkIntpSetting = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
.getInterpreterSettingByName("spark");
Map<String, InterpreterProperty> sparkProperties =
(Map<String, InterpreterProperty>) sparkIntpSetting.getProperties();
sparkProperties.put("SPARK_HOME", new InterpreterProperty("SPARK_HOME", sparkHome));
sparkIntpSetting.close();
}
}
}