ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter

### What is this PR for?
Allow user to specify the pool when running spark sql in concurrent approach.
e.g.
```
%spark.sql(pool=pool_1)

sql statement
```

### What type of PR is it?
[Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3563

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjffdu@apache.org>

Closes #3044 from zjffdu/ZEPPELIN-3563 and squashes the following commits:

255f5baf9 [Jeff Zhang] ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 6775fbf..34f5bb6 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -121,6 +121,11 @@
     <td>Execute multiple SQL concurrently if set true.</td>
   </tr>
   <tr>
+    <td>zeppelin.spark.concurrentSQL.max</td>
+    <td>10</td>
+    <td>Max number of SQL concurrently executed</td>
+  </tr>
+  <tr>
     <td>zeppelin.spark.maxResult</td>
     <td>1000</td>
     <td>Max number of Spark SQL result to display.</td>
@@ -332,6 +337,21 @@
 
 <img class="img-responsive" src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/matplotlibAngularExample.gif" />
 
+## Running spark sql concurrently
+By default, each sql statement would run sequentially in `%spark.sql`. But you can run them concurrently by following setup.
+
+1. set `zeppelin.spark.concurrentSQL` to true to enable the sql concurrent feature, underneath zeppelin will change to use fairscheduler for spark. And also set `zeppelin.spark.concurrentSQL.max` to control the max number of sql statements running concurrently.
+2. configure pools by creating `fairscheduler.xml` under your `SPARK_CONF_DIR`, check the offical spark doc [Configuring Pool Properties](http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties)
+3. set pool property via setting paragraph property. e.g.
+
+```
+%spark(pool=pool1)
+
+sql statement
+```
+
+This feature is available for both all versions of scala spark, pyspark. For sparkr, it is only available starting from 2.3.0.
+ 
 ## Interpreter setting option
 
 You can choose one of `shared`, `scoped` and `isolated` options wheh you configure Spark interpreter.
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index b6eb014..1cc88b8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -118,12 +118,21 @@
   public InterpreterResult interpret(String st, InterpreterContext context) {
     InterpreterContext.set(context);
     String jobGroupId = Utils.buildJobGroupId(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+    String jobDesc = Utils.buildJobDesc(context);
     String setJobGroupStmt = "sc.setJobGroup('" +  jobGroupId + "', '" + jobDesc + "')";
     InterpreterResult result = super.interpret(setJobGroupStmt, context);
     if (result.code().equals(InterpreterResult.Code.ERROR)) {
       return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
     }
+    String pool = "None";
+    if (context.getLocalProperties().containsKey("pool")) {
+      pool = "'" + context.getLocalProperties().get("pool") + "'";
+    }
+    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+    result = super.interpret(setPoolStmt, context);
+    if (result.code().equals(InterpreterResult.Code.ERROR)) {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+    }
     return super.interpret(st, context);
   }
 
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
index 1bde23f..17a257c 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
@@ -86,9 +86,15 @@
         if (!StringUtils.isBlank(entry.getValue().toString())) {
           conf.set(entry.getKey().toString(), entry.getValue().toString());
         }
+        // zeppelin.spark.useHiveContext & zeppelin.spark.concurrentSQL are legacy zeppelin
+        // properties, convert them to spark properties here.
         if (entry.getKey().toString().equals("zeppelin.spark.useHiveContext")) {
           conf.set("spark.useHiveContext", entry.getValue().toString());
         }
+        if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL")
+            && entry.getValue().toString().equals("true")) {
+          conf.set("spark.scheduler.mode", "FAIR");
+        }
       }
       // use local mode for embedded spark mode when spark.master is not found
       conf.setIfMissing("spark.master", "local");
@@ -141,8 +147,11 @@
     z.setGui(context.getGui());
     z.setNoteGui(context.getNoteGui());
     z.setInterpreterContext(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+    sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
+    // set spark.scheduler.pool to null to clear the pool assosiated with this paragraph
+    // sc.setLocalProperty("spark.scheduler.pool", null) will clean the pool
+    sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+
     return innerInterpreter.interpret(st, context);
   }
 
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
index 4c2ec7c..b9a7868 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -1041,8 +1041,7 @@
     synchronized (this) {
       z.setGui(context.getGui());
       z.setNoteGui(context.getNoteGui());
-      String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-      sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+      sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
       InterpreterResult r = interpretInput(lines, context);
       sc.clearJobGroup();
       return r;
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 8d5ce70..2093dde 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -150,10 +150,17 @@
   @Override
   protected void preCallPython(InterpreterContext context) {
     String jobGroup = Utils.buildJobGroupId(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+    String jobDesc = Utils.buildJobDesc(context);
     callPython(new PythonInterpretRequest(
         String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc),
         false, false));
+
+    String pool = "None";
+    if (context.getLocalProperties().containsKey("pool")) {
+      pool = "'" + context.getLocalProperties().get("pool") + "'";
+    }
+    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+    callPython(new PythonInterpretRequest(setPoolStmt, false, false));
   }
 
   // Run python shell
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index f1b1253..9bd5445 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -123,8 +123,7 @@
       throws InterpreterException {
 
     String jobGroup = Utils.buildJobGroupId(interpreterContext);
-    String jobDesc = "Started by: " +
-       Utils.getUserName(interpreterContext.getAuthenticationInfo());
+    String jobDesc = Utils.buildJobDesc(interpreterContext);
     sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
 
     String imageWidth = getProperty("zeppelin.R.image.width", "100%");
@@ -156,7 +155,15 @@
           "\", \"" + jobDesc + "\", TRUE)";
     }
     lines = setJobGroup + "\n" + lines;
-
+    if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) {
+      // setLocalProperty is only available from spark 2.3.0
+      String setPoolStmt = "setLocalProperty('spark.scheduler.pool', NULL)";
+      if (interpreterContext.getLocalProperties().containsKey("pool")) {
+        setPoolStmt = "setLocalProperty('spark.scheduler.pool', '" +
+            interpreterContext.getLocalProperties().get("pool") + "')";
+      }
+      lines = setPoolStmt + "\n" + lines;
+    }
     try {
       // render output with knitr
       if (rbackendDead.get()) {
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 4d2bed1..31e883a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -44,19 +44,13 @@
 public class SparkSqlInterpreter extends Interpreter {
   private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
 
-  public static final String MAX_RESULTS = "zeppelin.spark.maxResult";
-
-  AtomicInteger num = new AtomicInteger(0);
-
-  private int maxResult;
-
   public SparkSqlInterpreter(Properties property) {
     super(property);
   }
 
   @Override
   public void open() {
-    this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
+
   }
 
   private SparkInterpreter getSparkInterpreter() throws InterpreterException {
@@ -88,25 +82,17 @@
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    SQLContext sqlc = null;
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
     if (sparkInterpreter.isUnsupportedSparkVersion()) {
       return new InterpreterResult(Code.ERROR, "Spark "
           + sparkInterpreter.getSparkVersion().toString() + " is not supported");
     }
 
     sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
-    sqlc = sparkInterpreter.getSQLContext();
+    SQLContext sqlc = sparkInterpreter.getSQLContext();
     SparkContext sc = sqlc.sparkContext();
-    if (concurrentSQL()) {
-      sc.setLocalProperty("spark.scheduler.pool", "fair");
-    } else {
-      sc.setLocalProperty("spark.scheduler.pool", null);
-    }
-
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+    sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+    sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
     Object rdd = null;
     try {
       // method signature of sqlc.sql() is changed
@@ -138,9 +124,7 @@
   @Override
   public void cancel(InterpreterContext context) throws InterpreterException {
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    SQLContext sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-
+    SparkContext sc = sparkInterpreter.getSparkContext();
     sc.cancelJobGroup(Utils.buildJobGroupId(context));
   }
 
@@ -159,7 +143,7 @@
   @Override
   public Scheduler getScheduler() {
     if (concurrentSQL()) {
-      int maxConcurrency = 10;
+      int maxConcurrency = Integer.parseInt(getProperty("zeppelin.spark.concurrentSQL", "10"));
       return SchedulerFactory.singleton().createOrGetParallelScheduler(
           SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
     } else {
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 5e412eb..3284986 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -34,6 +34,7 @@
   public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
 
   public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
+  public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
   public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
 
@@ -109,6 +110,10 @@
     return this.olderThan(SPARK_1_3_0);
   }
 
+  public boolean isSpark2() {
+    return this.newerThanEquals(SPARK_2_0_0);
+  }
+
   public boolean isSecretSocketSupported() {
     return this.newerThanEquals(SPARK_2_3_1);
   }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
index 492a997..e89607f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
@@ -167,8 +167,7 @@
 
     if (rows.length > maxResult) {
       msg.append("\n");
-      msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
-          SparkSqlInterpreter.MAX_RESULTS));
+      msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
     }
 
     sc.clearJobGroup();
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 82bf210..cd6c607 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -152,6 +152,10 @@
     return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId();
   }
 
+  public static String buildJobDesc(InterpreterContext context) {
+    return "Started by: " + getUserName(context.getAuthenticationInfo());
+  }
+
   public static String getNoteId(String jobgroupId) {
     int indexOf = jobgroupId.indexOf("-");
     int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 8791ece..5746754 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -102,6 +102,13 @@
         "description": "Execute multiple SQL concurrently if set true.",
         "type": "checkbox"
       },
+      "zeppelin.spark.concurrentSQL.max": {
+        "envName": "ZEPPELIN_SPARK_CONCURRENTSQL_MAX",
+        "propertyName": "zeppelin.spark.concurrentSQL.max",
+        "defaultValue": 10,
+        "description": "Max number of SQL concurrently executed",
+        "type": "number"
+      },
       "zeppelin.spark.sql.stacktrace": {
         "envName": "ZEPPELIN_SPARK_SQL_STACKTRACE",
         "propertyName": "zeppelin.spark.sql.stacktrace",
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 73bd52c..e9f85fe 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -423,6 +423,33 @@
     assertEquals("hello world", output);
   }
 
+  @Test
+  public void testSchedulePool() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.useNew", "true");
+    properties.setProperty("spark.scheduler.mode", "FAIR");
+
+    interpreter = new SparkInterpreter(properties);
+    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
+    interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
+    interpreter.open();
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("pool", "pool1");
+    InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("pool1", interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
+
+    // pool is reset to null if user don't specify it via paragraph properties
+    result = interpreter.interpret("sc.range(1, 10).sum", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(null, interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
+  }
+
   @After
   public void tearDown() throws InterpreterException {
     if (this.interpreter != null) {
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index 300388d..525a9a8 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -48,10 +48,10 @@
   @BeforeClass
   public static void setUp() throws Exception {
     Properties p = new Properties();
-    p.setProperty("spark.master", "local");
+    p.setProperty("spark.master", "local[4]");
     p.setProperty("spark.app.name", "test");
     p.setProperty("zeppelin.spark.maxResult", "10");
-    p.setProperty("zeppelin.spark.concurrentSQL", "false");
+    p.setProperty("zeppelin.spark.concurrentSQL", "true");
     p.setProperty("zeppelin.spark.sqlInterpreter.stacktrace", "false");
     p.setProperty("zeppelin.spark.useNew", "true");
     intpGroup = new InterpreterGroup();
@@ -179,4 +179,49 @@
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
     assertTrue(ret.message().get(1).getData().contains("alert-warning"));
   }
+
+  @Test
+  public void testConcurrentSQL() throws InterpreterException, InterruptedException {
+    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+      sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+    } else {
+      sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+    }
+
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+          assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        } catch (InterpreterException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+          assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        } catch (InterpreterException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    // start running 2 spark sql, each would sleep 10 seconds, the totally running time should
+    // be less than 20 seconds, which means they run concurrently.
+    long start = System.currentTimeMillis();
+    thread1.start();
+    thread2.start();
+    thread1.join();
+    thread2.join();
+    long end = System.currentTimeMillis();
+    assertTrue("running time must be less than 20 seconds", ((end - start)/1000) < 20);
+
+  }
+
 }
diff --git a/spark/interpreter/src/test/resources/fairscheduler.xml b/spark/interpreter/src/test/resources/fairscheduler.xml
new file mode 100644
index 0000000..d163c08
--- /dev/null
+++ b/spark/interpreter/src/test/resources/fairscheduler.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0"?>
+<allocations>
+    <pool name="pool1">
+        <schedulingMode>FAIR</schedulingMode>
+        <weight>1</weight>
+        <minShare>2</minShare>
+    </pool>
+    <pool name="pool2">
+        <schedulingMode>FIFO</schedulingMode>
+        <weight>2</weight>
+        <minShare>3</minShare>
+    </pool>
+</allocations>
\ No newline at end of file