ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter
### What is this PR for?
Allow users to specify the fair scheduler pool when running Spark SQL concurrently.
e.g.
```
%spark.sql(pool=pool_1)
sql statement
```
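Under the hood, the paragraph's `pool` property is forwarded to Spark's fair-scheduler local property (`spark.scheduler.pool`). A minimal standalone sketch of the underlying Spark API this maps to (the class and job below are illustrative, not code from this patch):
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SchedulerPoolExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("scheduler-pool-example")
        // This is what setting zeppelin.spark.concurrentSQL=true now turns on internally.
        .set("spark.scheduler.mode", "FAIR");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Jobs submitted from this thread are scheduled in pool_1. If pool_1 is not defined
    // in fairscheduler.xml, Spark creates it with default settings and logs a warning.
    sc.setLocalProperty("spark.scheduler.pool", "pool_1");
    long n = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
    System.out.println(n);

    // Passing null clears the pool for subsequent jobs submitted from this thread.
    sc.setLocalProperty("spark.scheduler.pool", null);
    sc.stop();
  }
}
```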
### What type of PR is it?
[Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3563
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No (the Spark interpreter docs are already updated as part of this PR)
Author: Jeff Zhang <zjffdu@apache.org>
Closes #3044 from zjffdu/ZEPPELIN-3563 and squashes the following commits:
255f5baf9 [Jeff Zhang] ZEPPELIN-3563. Add pool to paragraph property that use spark interpreter
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 6775fbf..34f5bb6 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -121,6 +121,11 @@
<td>Execute multiple SQL concurrently if set true.</td>
</tr>
<tr>
+ <td>zeppelin.spark.concurrentSQL.max</td>
+ <td>10</td>
+ <td>Max number of SQL statements executed concurrently.</td>
+ </tr>
+ <tr>
<td>zeppelin.spark.maxResult</td>
<td>1000</td>
<td>Max number of Spark SQL result to display.</td>
@@ -332,6 +337,21 @@
<img class="img-responsive" src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/matplotlibAngularExample.gif" />
+## Running Spark SQL concurrently
+By default, each SQL statement in `%spark.sql` runs sequentially, but you can run them concurrently with the following setup.
+
+1. Set `zeppelin.spark.concurrentSQL` to `true` to enable concurrent SQL execution; under the hood, Zeppelin switches Spark to the fair scheduler. Also set `zeppelin.spark.concurrentSQL.max` to control the maximum number of SQL statements running concurrently.
+2. Configure pools by creating `fairscheduler.xml` under your `SPARK_CONF_DIR`; see the official Spark doc [Configuring Pool Properties](http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties). A sample is shown after this list.
+3. Set the pool via a paragraph property, e.g.
+
+```
+%spark.sql(pool=pool1)
+
+sql statement
+```
+
+This feature is available for all versions of Scala Spark and PySpark. For SparkR, it is only available starting from Spark 2.3.0.
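+
+For reference, the `fairscheduler.xml` added to this PR's test resources defines two pools and can serve as a template:
+
+```xml
+<?xml version="1.0"?>
+<allocations>
+  <pool name="pool1">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>1</weight>
+    <minShare>2</minShare>
+  </pool>
+  <pool name="pool2">
+    <schedulingMode>FIFO</schedulingMode>
+    <weight>2</weight>
+    <minShare>3</minShare>
+  </pool>
+</allocations>
+```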
+
## Interpreter setting option
You can choose one of `shared`, `scoped` and `isolated` options when you configure the Spark interpreter.
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index b6eb014..1cc88b8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -118,12 +118,21 @@
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterContext.set(context);
String jobGroupId = Utils.buildJobGroupId(context);
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+ String jobDesc = Utils.buildJobDesc(context);
String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
InterpreterResult result = super.interpret(setJobGroupStmt, context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
}
+ String pool = "None";
+ if (context.getLocalProperties().containsKey("pool")) {
+ pool = "'" + context.getLocalProperties().get("pool") + "'";
+ }
+ String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+ result = super.interpret(setPoolStmt, context);
+ if (result.code().equals(InterpreterResult.Code.ERROR)) {
+ return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+ }
return super.interpret(st, context);
}
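For a paragraph like `%spark.ipyspark(pool=pool1)`, the statement generated above is `sc.setLocalProperty('spark.scheduler.pool', 'pool1')`; when no `pool` property is set, it falls back to `sc.setLocalProperty('spark.scheduler.pool', None)`, which clears any previously set pool.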
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
index 1bde23f..17a257c 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
@@ -86,9 +86,15 @@
if (!StringUtils.isBlank(entry.getValue().toString())) {
conf.set(entry.getKey().toString(), entry.getValue().toString());
}
+ // zeppelin.spark.useHiveContext & zeppelin.spark.concurrentSQL are legacy zeppelin
+ // properties, convert them to spark properties here.
if (entry.getKey().toString().equals("zeppelin.spark.useHiveContext")) {
conf.set("spark.useHiveContext", entry.getValue().toString());
}
+ if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL")
+ && entry.getValue().toString().equals("true")) {
+ conf.set("spark.scheduler.mode", "FAIR");
+ }
}
// use local mode for embedded spark mode when spark.master is not found
conf.setIfMissing("spark.master", "local");
@@ -141,8 +147,11 @@
z.setGui(context.getGui());
z.setNoteGui(context.getNoteGui());
z.setInterpreterContext(context);
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
- sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+ sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
+ // Set spark.scheduler.pool from the paragraph's "pool" property. When the property is absent,
+ // get("pool") returns null, and sc.setLocalProperty("spark.scheduler.pool", null) clears the pool.
+ sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+
return innerInterpreter.interpret(st, context);
}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
index 4c2ec7c..b9a7868 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -1041,8 +1041,7 @@
synchronized (this) {
z.setGui(context.getGui());
z.setNoteGui(context.getNoteGui());
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
- sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+ sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
InterpreterResult r = interpretInput(lines, context);
sc.clearJobGroup();
return r;
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 8d5ce70..2093dde 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -150,10 +150,17 @@
@Override
protected void preCallPython(InterpreterContext context) {
String jobGroup = Utils.buildJobGroupId(context);
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+ String jobDesc = Utils.buildJobDesc(context);
callPython(new PythonInterpretRequest(
String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc),
false, false));
+
+ String pool = "None";
+ if (context.getLocalProperties().containsKey("pool")) {
+ pool = "'" + context.getLocalProperties().get("pool") + "'";
+ }
+ String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+ callPython(new PythonInterpretRequest(setPoolStmt, false, false));
}
// Run python shell
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index f1b1253..9bd5445 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -123,8 +123,7 @@
throws InterpreterException {
String jobGroup = Utils.buildJobGroupId(interpreterContext);
- String jobDesc = "Started by: " +
- Utils.getUserName(interpreterContext.getAuthenticationInfo());
+ String jobDesc = Utils.buildJobDesc(interpreterContext);
sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
String imageWidth = getProperty("zeppelin.R.image.width", "100%");
@@ -156,7 +155,15 @@
"\", \"" + jobDesc + "\", TRUE)";
}
lines = setJobGroup + "\n" + lines;
-
+ if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) {
+ // setLocalProperty is only available from spark 2.3.0
+ String setPoolStmt = "setLocalProperty('spark.scheduler.pool', NULL)";
+ if (interpreterContext.getLocalProperties().containsKey("pool")) {
+ setPoolStmt = "setLocalProperty('spark.scheduler.pool', '" +
+ interpreterContext.getLocalProperties().get("pool") + "')";
+ }
+ lines = setPoolStmt + "\n" + lines;
+ }
try {
// render output with knitr
if (rbackendDead.get()) {
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 4d2bed1..31e883a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -44,19 +44,13 @@
public class SparkSqlInterpreter extends Interpreter {
private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
- public static final String MAX_RESULTS = "zeppelin.spark.maxResult";
-
- AtomicInteger num = new AtomicInteger(0);
-
- private int maxResult;
-
public SparkSqlInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
- this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
+
}
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
@@ -88,25 +82,17 @@
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
- SQLContext sqlc = null;
SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
if (sparkInterpreter.isUnsupportedSparkVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}
sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
- sqlc = sparkInterpreter.getSQLContext();
+ SQLContext sqlc = sparkInterpreter.getSQLContext();
SparkContext sc = sqlc.sparkContext();
- if (concurrentSQL()) {
- sc.setLocalProperty("spark.scheduler.pool", "fair");
- } else {
- sc.setLocalProperty("spark.scheduler.pool", null);
- }
-
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
- sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+ sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
+ sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
Object rdd = null;
try {
// method signature of sqlc.sql() is changed
@@ -138,9 +124,7 @@
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
- SQLContext sqlc = sparkInterpreter.getSQLContext();
- SparkContext sc = sqlc.sparkContext();
-
+ SparkContext sc = sparkInterpreter.getSparkContext();
sc.cancelJobGroup(Utils.buildJobGroupId(context));
}
@@ -159,7 +143,7 @@
@Override
public Scheduler getScheduler() {
if (concurrentSQL()) {
- int maxConcurrency = 10;
+ int maxConcurrency = Integer.parseInt(getProperty("zeppelin.spark.concurrentSQL.max", "10"));
return SchedulerFactory.singleton().createOrGetParallelScheduler(
SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
} else {
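The parallel scheduler above is what lets up to `zeppelin.spark.concurrentSQL.max` `%spark.sql` paragraphs be submitted at the same time; the fair-scheduler pools then control how Spark divides resources among the resulting concurrent jobs.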
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 5e412eb..3284986 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -34,6 +34,7 @@
public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
+ public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
@@ -109,6 +110,10 @@
return this.olderThan(SPARK_1_3_0);
}
+ public boolean isSpark2() {
+ return this.newerThanEquals(SPARK_2_0_0);
+ }
+
public boolean isSecretSocketSupported() {
return this.newerThanEquals(SPARK_2_3_1);
}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
index 492a997..e89607f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
@@ -167,8 +167,7 @@
if (rows.length > maxResult) {
msg.append("\n");
- msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
- SparkSqlInterpreter.MAX_RESULTS));
+ msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
}
sc.clearJobGroup();
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 82bf210..cd6c607 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -152,6 +152,10 @@
return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId();
}
+ public static String buildJobDesc(InterpreterContext context) {
+ return "Started by: " + getUserName(context.getAuthenticationInfo());
+ }
+
public static String getNoteId(String jobgroupId) {
int indexOf = jobgroupId.indexOf("-");
int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 8791ece..5746754 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -102,6 +102,13 @@
"description": "Execute multiple SQL concurrently if set true.",
"type": "checkbox"
},
+ "zeppelin.spark.concurrentSQL.max": {
+ "envName": "ZEPPELIN_SPARK_CONCURRENTSQL_MAX",
+ "propertyName": "zeppelin.spark.concurrentSQL.max",
+ "defaultValue": 10,
+ "description": "Max number of SQL concurrently executed",
+ "type": "number"
+ },
"zeppelin.spark.sql.stacktrace": {
"envName": "ZEPPELIN_SPARK_SQL_STACKTRACE",
"propertyName": "zeppelin.spark.sql.stacktrace",
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 73bd52c..e9f85fe 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -423,6 +423,33 @@
assertEquals("hello world", output);
}
+ @Test
+ public void testSchedulePool() throws InterpreterException {
+ Properties properties = new Properties();
+ properties.setProperty("spark.master", "local");
+ properties.setProperty("spark.app.name", "test");
+ properties.setProperty("zeppelin.spark.maxResult", "100");
+ properties.setProperty("zeppelin.spark.test", "true");
+ properties.setProperty("zeppelin.spark.useNew", "true");
+ properties.setProperty("spark.scheduler.mode", "FAIR");
+
+ interpreter = new SparkInterpreter(properties);
+ assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
+ interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
+ interpreter.open();
+
+ InterpreterContext context = getInterpreterContext();
+ context.getLocalProperties().put("pool", "pool1");
+ InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("pool1", interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
+
+ // pool is reset to null if the user doesn't specify it via paragraph properties
+ result = interpreter.interpret("sc.range(1, 10).sum", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(null, interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
+ }
+
@After
public void tearDown() throws InterpreterException {
if (this.interpreter != null) {
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index 300388d..525a9a8 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -48,10 +48,10 @@
@BeforeClass
public static void setUp() throws Exception {
Properties p = new Properties();
- p.setProperty("spark.master", "local");
+ p.setProperty("spark.master", "local[4]");
p.setProperty("spark.app.name", "test");
p.setProperty("zeppelin.spark.maxResult", "10");
- p.setProperty("zeppelin.spark.concurrentSQL", "false");
+ p.setProperty("zeppelin.spark.concurrentSQL", "true");
p.setProperty("zeppelin.spark.sqlInterpreter.stacktrace", "false");
p.setProperty("zeppelin.spark.useNew", "true");
intpGroup = new InterpreterGroup();
@@ -179,4 +179,49 @@
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertTrue(ret.message().get(1).getData().contains("alert-warning"));
}
+
+ @Test
+ public void testConcurrentSQL() throws InterpreterException, InterruptedException {
+ if (sparkInterpreter.getSparkVersion().isSpark2()) {
+ sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+ } else {
+ sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+ }
+
+ Thread thread1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Thread thread2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ // Start 2 Spark SQL statements; each sleeps 10 seconds, so the total running time should
+ // be less than 20 seconds if they actually run concurrently.
+ long start = System.currentTimeMillis();
+ thread1.start();
+ thread2.start();
+ thread1.join();
+ thread2.join();
+ long end = System.currentTimeMillis();
+ assertTrue("running time must be less than 20 seconds", ((end - start)/1000) < 20);
+
+ }
+
}
diff --git a/spark/interpreter/src/test/resources/fairscheduler.xml b/spark/interpreter/src/test/resources/fairscheduler.xml
new file mode 100644
index 0000000..d163c08
--- /dev/null
+++ b/spark/interpreter/src/test/resources/fairscheduler.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0"?>
+<allocations>
+ <pool name="pool1">
+ <schedulingMode>FAIR</schedulingMode>
+ <weight>1</weight>
+ <minShare>2</minShare>
+ </pool>
+ <pool name="pool2">
+ <schedulingMode>FIFO</schedulingMode>
+ <weight>2</weight>
+ <minShare>3</minShare>
+ </pool>
+</allocations>
\ No newline at end of file
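A note on the pool attributes, following Spark's job-scheduling documentation: `schedulingMode` controls how jobs *within* a pool are ordered (FAIR or FIFO), `weight` sets the pool's share of resources relative to other pools, and `minShare` is the minimum number of CPU cores the pool tries to reserve.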