[ZEPPELIN-4543]. Support Shiny in Spark Interpreter
### What is this PR for?
We support shiny in R interpreter in ZEPPELIN-4525, this ticket is to extend it in SparkInterpreter where R is also supported. Writing shiny app in SparkInterpreter is almost the same as do it in R interpreter, but also with additional support of Spark. Main thing is in `SparkShinyInterpreter` which extends `ShinyInterpreter`
### What type of PR is it?
[ Feature ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4543
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zjffdu@apache.org>
Closes #3593 from zjffdu/ZEPPELIN-4543 and squashes the following commits:
4b1d045c7 [Jeff Zhang] [ZEPPELIN-4543]. Support Shiny in Spark Interpreter
diff --git a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
index b2dc5f3..3d0f24b 100644
--- a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
+++ b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
@@ -65,6 +65,7 @@
this.z = new RZeppelinContext(getInterpreterGroup().getInterpreterHookRegistry(), 1000);
}
+
@Override
public void close() throws InterpreterException {
for (Map.Entry<String,IRInterpreter> entry : shinyIRInterpreters.entrySet()) {
@@ -133,7 +134,7 @@
synchronized (shinyIRInterpreters) {
irInterpreter = shinyIRInterpreters.get(shinyApp);
if (irInterpreter == null) {
- irInterpreter = new IRInterpreter(properties);
+ irInterpreter = createIRInterpreter();
irInterpreter.setInterpreterGroup(getInterpreterGroup());
irInterpreter.open();
shinyIRInterpreters.put(shinyApp, irInterpreter);
@@ -142,4 +143,12 @@
return irInterpreter;
}
+ /**
+ * Subclass can overwrite this. e.g. SparkShinyInterpreter.
+ * @return
+ */
+ protected IRInterpreter createIRInterpreter() {
+ return new IRInterpreter(properties);
+ }
+
}
diff --git a/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java
index def6436..2819ca5 100644
--- a/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java
+++ b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java
@@ -47,7 +47,7 @@
public class ShinyInterpreterTest {
- private ShinyInterpreter interpreter;
+ protected ShinyInterpreter interpreter;
@Before
public void setUp() throws InterpreterException {
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 86a5f75..dfebec8 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -329,6 +329,14 @@
<version>0.4.4</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.mashape.unirest</groupId>
+ <artifactId>unirest-java</artifactId>
+ <version>1.4.9</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java
index 004ce98..ee16c72 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java
@@ -58,8 +58,20 @@
return this.sparkVersion.isSecretSocketSupported();
}
+ /**
+ * We can inject SparkInterpreter in the case that SparkIRInterpreter is used by
+ * SparkShinyInterpreter in which case it is not in the same InterpreterGroup of
+ * SparkInterpreter.
+ * @param sparkInterpreter
+ */
+ public void setSparkInterpreter(SparkInterpreter sparkInterpreter) {
+ this.sparkInterpreter = sparkInterpreter;
+ }
+
public void open() throws InterpreterException {
- this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
+ if (sparkInterpreter == null) {
+ this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
+ }
this.sc = sparkInterpreter.getSparkContext();
this.jsc = sparkInterpreter.getJavaSparkContext();
this.sparkVersion = new SparkVersion(sc.version());
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkShinyInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkShinyInterpreter.java
new file mode 100644
index 0000000..c5dc142
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkShinyInterpreter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.r.IRInterpreter;
+import org.apache.zeppelin.r.ShinyInterpreter;
+
+import java.util.Properties;
+
+/**
+ * The same function as ShinyInterpreter, but support Spark as well.
+ */
+public class SparkShinyInterpreter extends ShinyInterpreter {
+ public SparkShinyInterpreter(Properties properties) {
+ super(properties);
+ }
+
+ protected IRInterpreter createIRInterpreter() {
+ SparkIRInterpreter interpreter = new SparkIRInterpreter(properties);
+ try {
+ interpreter.setSparkInterpreter(getInterpreterInTheSameSessionByClassName(SparkInterpreter.class));
+ return interpreter;
+ } catch (InterpreterException e) {
+ throw new RuntimeException("Fail to set spark interpreter for SparkIRInterpreter", e);
+ }
+ }
+}
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 5fbccaf..100c97c 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -238,7 +238,8 @@
"editor": {
"language": "python",
"editOnDblClick": false,
- "completionSupport": true
+ "completionSupport": true,
+ "completionKey": "TAB"
}
},
{
@@ -278,7 +279,8 @@
"editor": {
"language": "r",
"editOnDblClick": false,
- "completionSupport": false
+ "completionSupport": false,
+ "completionKey": "TAB"
}
},
{
@@ -290,7 +292,21 @@
"editor": {
"language": "r",
"editOnDblClick": false,
- "completionSupport": true
+ "completionSupport": true,
+ "completionKey": "TAB"
+ }
+ },
+ {
+ "group": "spark",
+ "name": "shiny",
+ "className": "org.apache.zeppelin.spark.SparkShinyInterpreter",
+ "properties": {
+ },
+ "editor": {
+ "language": "r",
+ "editOnDblClick": false,
+ "completionSupport": true,
+ "completionKey": "TAB"
}
},
{
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
new file mode 100644
index 0000000..eb0e56c
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.r.ShinyInterpreterTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SparkShinyInterpreterTest extends ShinyInterpreterTest {
+
+ private SparkInterpreter sparkInterpreter;
+
+ @Before
+ public void setUp() throws InterpreterException {
+ Properties properties = new Properties();
+ properties.setProperty("master", "local[*]");
+ properties.setProperty("spark.app.name", "test");
+
+ InterpreterContext context = getInterpreterContext();
+ InterpreterContext.set(context);
+ interpreter = new SparkShinyInterpreter(properties);
+
+ InterpreterGroup interpreterGroup = new InterpreterGroup();
+ interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), "session_1");
+ interpreter.setInterpreterGroup(interpreterGroup);
+
+ sparkInterpreter = new SparkInterpreter(properties);
+ interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkInterpreter), "session_1");
+ sparkInterpreter.setInterpreterGroup(interpreterGroup);
+
+ interpreter.open();
+ }
+
+ @After
+ public void tearDown() throws InterpreterException {
+ if (interpreter != null) {
+ interpreter.close();
+ }
+ }
+
+ @Test
+ public void testSparkShinyApp() throws IOException, InterpreterException, InterruptedException, UnirestException {
+ /****************** Launch Shiny app with default app name *****************************/
+ InterpreterContext context = getInterpreterContext();
+ context.getLocalProperties().put("type", "ui");
+ InterpreterResult result =
+ interpreter.interpret(IOUtils.toString(getClass().getResource("/spark_ui.R")), context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ context = getInterpreterContext();
+ context.getLocalProperties().put("type", "server");
+ result = interpreter.interpret(IOUtils.toString(getClass().getResource("/spark_server.R")), context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ final InterpreterContext context2 = getInterpreterContext();
+ context2.getLocalProperties().put("type", "run");
+ Thread thread = new Thread(() -> {
+ try {
+ interpreter.interpret("", context2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ // wait for the shiny app start
+ Thread.sleep(5 * 1000);
+ // extract shiny url
+ List<InterpreterResultMessage> resultMessages = context2.out.toInterpreterResultMessage();
+ assertEquals(1, resultMessages.size());
+ assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType());
+ String resultMessageData = resultMessages.get(0).getData();
+ assertTrue(resultMessageData, resultMessageData.contains("<iframe"));
+ Pattern urlPattern = Pattern.compile(".*src=\"(http\\S*)\".*", Pattern.DOTALL);
+ Matcher matcher = urlPattern.matcher(resultMessageData);
+ if (!matcher.matches()) {
+ fail("Unable to extract url: " + resultMessageData);
+ }
+ String shinyURL = matcher.group(1);
+
+ // verify shiny app via calling its rest api
+ HttpResponse<String> response = Unirest.get(shinyURL).asString();
+ if (sparkInterpreter.getSparkVersion().isSpark2()) {
+ assertEquals(200, response.getStatus());
+ assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
+ } else {
+ // spark 1.x will fail due to sparkR.version is not available for spark 1.x
+ assertEquals(500, response.getStatus());
+ assertTrue(response.getBody(),
+ response.getBody().contains("could not find function \"sparkR.version\""));
+ }
+ }
+}
diff --git a/spark/interpreter/src/test/resources/spark_server.R b/spark/interpreter/src/test/resources/spark_server.R
new file mode 100644
index 0000000..071631d
--- /dev/null
+++ b/spark/interpreter/src/test/resources/spark_server.R
@@ -0,0 +1,23 @@
+# Define server logic to summarize and view selected dataset ----
+server <- function(input, output) {
+
+ # Return the requested dataset ----
+ datasetInput <- reactive({
+ switch(input$dataset,
+ "rock" = as.DataFrame(rock),
+ "pressure" = as.DataFrame(pressure),
+ "cars" = as.DataFrame(cars))
+ })
+
+ # Generate a summary of the dataset ----
+ output$summary <- renderPrint({
+ dataset <- datasetInput()
+ showDF(summary(dataset))
+ })
+
+ # Show the first "n" observations ----
+ output$view <- renderTable({
+ head(datasetInput(), n = input$obs)
+ })
+
+}
\ No newline at end of file
diff --git a/spark/interpreter/src/test/resources/spark_ui.R b/spark/interpreter/src/test/resources/spark_ui.R
new file mode 100644
index 0000000..a81ad0c
--- /dev/null
+++ b/spark/interpreter/src/test/resources/spark_ui.R
@@ -0,0 +1,35 @@
+# Define UI for dataset viewer app ----
+ui <- fluidPage(
+
+# App title ----
+titlePanel(paste("Spark Version", sparkR.version(), sep=":")),
+
+# Sidebar layout with a input and output definitions ----
+sidebarLayout(
+
+# Sidebar panel for inputs ----
+sidebarPanel(
+
+# Input: Selector for choosing dataset ----
+selectInput(inputId = "dataset",
+label = "Choose a dataset:",
+choices = c("rock", "pressure", "cars")),
+
+# Input: Numeric entry for number of obs to view ----
+numericInput(inputId = "obs",
+label = "Number of observations to view:",
+value = 10)
+),
+
+# Main panel for displaying outputs ----
+mainPanel(
+
+# Output: Verbatim text for data summary ----
+verbatimTextOutput("summary"),
+
+# Output: HTML table with requested number of observations ----
+tableOutput("view")
+
+)
+)
+)
\ No newline at end of file
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
index deb4afa..1f4e200 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
@@ -104,8 +104,10 @@
* @throws IOException
*/
private boolean checkForShinyApp(String response) throws IOException {
- if (context.getInterpreterClassName() != null &&
- context.getInterpreterClassName().equals("org.apache.zeppelin.r.ShinyInterpreter")) {
+ String intpClassName = context.getInterpreterClassName();
+ if (intpClassName != null &&
+ (intpClassName.equals("org.apache.zeppelin.r.ShinyInterpreter") ||
+ intpClassName.equals("org.apache.zeppelin.spark.SparkShinyInterpreter"))) {
Matcher matcher = ShinyListeningPattern.matcher(response);
if (matcher.matches()) {
String url = matcher.group(1);