[ZEPPELIN-4543]. Support Shiny in Spark Interpreter

### What is this PR for?
We added shiny support to the R interpreter in ZEPPELIN-4525; this ticket extends it to SparkInterpreter, where R is also supported. Writing a shiny app in SparkInterpreter is almost the same as doing it in the R interpreter, but with additional support for Spark. The main change is in `SparkShinyInterpreter`, which extends `ShinyInterpreter`.

### What type of PR is it?
[ Feature ]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4543

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #3593 from zjffdu/ZEPPELIN-4543 and squashes the following commits:

4b1d045c7 [Jeff Zhang] [ZEPPELIN-4543]. Support Shiny in Spark Interpreter
diff --git a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
index b2dc5f3..3d0f24b 100644
--- a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
+++ b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
@@ -65,6 +65,7 @@
     this.z = new RZeppelinContext(getInterpreterGroup().getInterpreterHookRegistry(), 1000);
   }
 
+
   @Override
   public void close() throws InterpreterException {
     for (Map.Entry<String,IRInterpreter> entry : shinyIRInterpreters.entrySet()) {
@@ -133,7 +134,7 @@
     synchronized (shinyIRInterpreters) {
       irInterpreter = shinyIRInterpreters.get(shinyApp);
       if (irInterpreter == null) {
-        irInterpreter = new IRInterpreter(properties);
+        irInterpreter = createIRInterpreter();
         irInterpreter.setInterpreterGroup(getInterpreterGroup());
         irInterpreter.open();
         shinyIRInterpreters.put(shinyApp, irInterpreter);
@@ -142,4 +143,12 @@
     return irInterpreter;
   }
 
+  /**
+   * Subclasses can override this, e.g. SparkShinyInterpreter.
+   * @return the IRInterpreter used to run the shiny app
+   */
+  protected IRInterpreter createIRInterpreter() {
+    return new IRInterpreter(properties);
+  }
+
 }
diff --git a/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java
index def6436..2819ca5 100644
--- a/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java
+++ b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java
@@ -47,7 +47,7 @@
 
 public class ShinyInterpreterTest {
 
-  private ShinyInterpreter interpreter;
+  protected ShinyInterpreter interpreter;
 
   @Before
   public void setUp() throws InterpreterException {
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 86a5f75..dfebec8 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -329,6 +329,14 @@
       <version>0.4.4</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.mashape.unirest</groupId>
+      <artifactId>unirest-java</artifactId>
+      <version>1.4.9</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java
index 004ce98..ee16c72 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java
@@ -58,8 +58,20 @@
     return this.sparkVersion.isSecretSocketSupported();
   }
 
+  /**
+   * Inject a SparkInterpreter for the case where SparkIRInterpreter is used by
+   * SparkShinyInterpreter, in which case it is not in the same InterpreterGroup
+   * as the SparkInterpreter.
+   * @param sparkInterpreter the SparkInterpreter instance to use
+   */
+  public void setSparkInterpreter(SparkInterpreter sparkInterpreter) {
+    this.sparkInterpreter = sparkInterpreter;
+  }
+
   public void open() throws InterpreterException {
-    this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
+    if (sparkInterpreter == null) {
+      this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
+    }
     this.sc = sparkInterpreter.getSparkContext();
     this.jsc = sparkInterpreter.getJavaSparkContext();
     this.sparkVersion = new SparkVersion(sc.version());
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkShinyInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkShinyInterpreter.java
new file mode 100644
index 0000000..c5dc142
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkShinyInterpreter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.r.IRInterpreter;
+import org.apache.zeppelin.r.ShinyInterpreter;
+
+import java.util.Properties;
+
+/**
+ * Same functionality as ShinyInterpreter, but with Spark support as well.
+ */
+public class SparkShinyInterpreter extends ShinyInterpreter {
+  public SparkShinyInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  protected IRInterpreter createIRInterpreter() {
+    SparkIRInterpreter interpreter = new SparkIRInterpreter(properties);
+    try {
+      interpreter.setSparkInterpreter(getInterpreterInTheSameSessionByClassName(SparkInterpreter.class));
+      return interpreter;
+    } catch (InterpreterException e) {
+      throw new RuntimeException("Fail to set spark interpreter for SparkIRInterpreter", e);
+    }
+  }
+}
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 5fbccaf..100c97c 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -238,7 +238,8 @@
     "editor": {
       "language": "python",
       "editOnDblClick": false,
-      "completionSupport": true
+      "completionSupport": true,
+      "completionKey": "TAB"
     }
   },
   {
@@ -278,7 +279,8 @@
     "editor": {
       "language": "r",
       "editOnDblClick": false,
-      "completionSupport": false
+      "completionSupport": false,
+      "completionKey": "TAB"
     }
   },
   {
@@ -290,7 +292,21 @@
     "editor": {
       "language": "r",
       "editOnDblClick": false,
-      "completionSupport": true
+      "completionSupport": true,
+      "completionKey": "TAB"
+    }
+  },
+  {
+    "group": "spark",
+    "name": "shiny",
+    "className": "org.apache.zeppelin.spark.SparkShinyInterpreter",
+    "properties": {
+    },
+    "editor": {
+      "language": "r",
+      "editOnDblClick": false,
+      "completionSupport": true,
+      "completionKey": "TAB"
     }
   },
   {
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
new file mode 100644
index 0000000..eb0e56c
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.r.ShinyInterpreterTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SparkShinyInterpreterTest extends ShinyInterpreterTest {
+
+  private SparkInterpreter sparkInterpreter;
+
+  @Before
+  public void setUp() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("master", "local[*]");
+    properties.setProperty("spark.app.name", "test");
+
+    InterpreterContext context = getInterpreterContext();
+    InterpreterContext.set(context);
+    interpreter = new SparkShinyInterpreter(properties);
+
+    InterpreterGroup interpreterGroup = new InterpreterGroup();
+    interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), "session_1");
+    interpreter.setInterpreterGroup(interpreterGroup);
+
+    sparkInterpreter = new SparkInterpreter(properties);
+    interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkInterpreter), "session_1");
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+
+    interpreter.open();
+  }
+
+  @After
+  public void tearDown() throws InterpreterException {
+    if (interpreter != null) {
+      interpreter.close();
+    }
+  }
+  
+  @Test
+  public void testSparkShinyApp() throws IOException, InterpreterException, InterruptedException, UnirestException {
+    /****************** Launch Shiny app with default app name *****************************/
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "ui");
+    InterpreterResult result =
+            interpreter.interpret(IOUtils.toString(getClass().getResource("/spark_ui.R")), context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = getInterpreterContext();
+    context.getLocalProperties().put("type", "server");
+    result = interpreter.interpret(IOUtils.toString(getClass().getResource("/spark_server.R")), context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    final InterpreterContext context2 = getInterpreterContext();
+    context2.getLocalProperties().put("type", "run");
+    Thread thread = new Thread(() -> {
+      try {
+        interpreter.interpret("", context2);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    });
+    thread.start();
+    // wait for the shiny app start
+    Thread.sleep(5 * 1000);
+    // extract shiny url
+    List<InterpreterResultMessage> resultMessages = context2.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType());
+    String resultMessageData = resultMessages.get(0).getData();
+    assertTrue(resultMessageData, resultMessageData.contains("<iframe"));
+    Pattern urlPattern = Pattern.compile(".*src=\"(http\\S*)\".*", Pattern.DOTALL);
+    Matcher matcher = urlPattern.matcher(resultMessageData);
+    if (!matcher.matches()) {
+      fail("Unable to extract url: " + resultMessageData);
+    }
+    String shinyURL = matcher.group(1);
+
+    // verify shiny app via calling its rest api
+    HttpResponse<String> response = Unirest.get(shinyURL).asString();
+    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+      assertEquals(200, response.getStatus());
+      assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
+    } else {
+      // spark 1.x will fail due to sparkR.version is not available for spark 1.x
+      assertEquals(500, response.getStatus());
+      assertTrue(response.getBody(),
+              response.getBody().contains("could not find function \"sparkR.version\""));
+    }
+  }
+}
diff --git a/spark/interpreter/src/test/resources/spark_server.R b/spark/interpreter/src/test/resources/spark_server.R
new file mode 100644
index 0000000..071631d
--- /dev/null
+++ b/spark/interpreter/src/test/resources/spark_server.R
@@ -0,0 +1,23 @@
+# Define server logic to summarize and view selected dataset ----
+server <- function(input, output) {
+
+    # Return the requested dataset ----
+    datasetInput <- reactive({
+        switch(input$dataset,
+        "rock" = as.DataFrame(rock),
+        "pressure" = as.DataFrame(pressure),
+        "cars" = as.DataFrame(cars))
+    })
+
+    # Generate a summary of the dataset ----
+    output$summary <- renderPrint({
+        dataset <- datasetInput()
+        showDF(summary(dataset))
+    })
+
+    # Show the first "n" observations ----
+    output$view <- renderTable({
+        head(datasetInput(), n = input$obs)
+    })
+
+}
\ No newline at end of file
diff --git a/spark/interpreter/src/test/resources/spark_ui.R b/spark/interpreter/src/test/resources/spark_ui.R
new file mode 100644
index 0000000..a81ad0c
--- /dev/null
+++ b/spark/interpreter/src/test/resources/spark_ui.R
@@ -0,0 +1,35 @@
+# Define UI for dataset viewer app ----
+ui <- fluidPage(
+
+# App title ----
+titlePanel(paste("Spark Version", sparkR.version(), sep=":")),
+
+# Sidebar layout with a input and output definitions ----
+sidebarLayout(
+
+# Sidebar panel for inputs ----
+sidebarPanel(
+
+# Input: Selector for choosing dataset ----
+selectInput(inputId = "dataset",
+label = "Choose a dataset:",
+choices = c("rock", "pressure", "cars")),
+
+# Input: Numeric entry for number of obs to view ----
+numericInput(inputId = "obs",
+label = "Number of observations to view:",
+value = 10)
+),
+
+# Main panel for displaying outputs ----
+mainPanel(
+
+# Output: Verbatim text for data summary ----
+verbatimTextOutput("summary"),
+
+# Output: HTML table with requested number of observations ----
+tableOutput("view")
+
+)
+)
+)
\ No newline at end of file
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
index deb4afa..1f4e200 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
@@ -104,8 +104,10 @@
    * @throws IOException
    */
   private boolean checkForShinyApp(String response) throws IOException {
-    if (context.getInterpreterClassName() != null &&
-            context.getInterpreterClassName().equals("org.apache.zeppelin.r.ShinyInterpreter")) {
+    String intpClassName = context.getInterpreterClassName();
+    if (intpClassName != null &&
+            (intpClassName.equals("org.apache.zeppelin.r.ShinyInterpreter") ||
+                    intpClassName.equals("org.apache.zeppelin.spark.SparkShinyInterpreter"))) {
       Matcher matcher = ShinyListeningPattern.matcher(response);
       if (matcher.matches()) {
         String url = matcher.group(1);