ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service
### What is this PR for?
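This PR adds a new configuration property, `zeppelin.interpreter.portRange`, which controls the port range used for the interpreter process's thrift service. The value has the form `start:end`; the default `:` places no constraint on the port. Some users require this for security reasons.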
### What type of PR is it?
[Improvement | Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3040
### How should this be tested?
Test manually: set `zeppelin.interpreter.portRange`, launch the python interpreter, and verify that the interpreter process's port is within the configured range.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zjffdu@apache.org>
Closes #2666 from zjffdu/ZEPPELIN-3040-0.7 and squashes the following commits:
cd1ca62 [Jeff Zhang] ZEPPELIN-3040. Allow to specify portRange for interpreter process thrift service
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index a9c86c4..e8b2ebd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -61,6 +61,7 @@
private int maxPoolSize;
private String host;
private int port;
+ private String portRange;
private String userName;
private Boolean isUserImpersonate;
private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
@@ -72,7 +73,7 @@
String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
- int outputLimit) {
+ int outputLimit, String portRange) {
super(property);
this.sessionKey = sessionKey;
this.className = className;
@@ -88,6 +89,7 @@
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
this.outputLimit = outputLimit;
+ this.portRange = portRange;
}
@@ -184,7 +186,7 @@
} else {
// create new remote process
remoteProcess = new RemoteInterpreterManagedProcess(
- interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
+ interpreterRunner, interpreterPath, localRepoPath, portRange, env, connectTimeout,
remoteInterpreterProcessListener, applicationEventListener);
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index f5d73ed..d638f37 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -42,6 +42,7 @@
private ExecuteWatchdog watchdog;
boolean running = false;
private int port = -1;
+ private String portRange;
private final String interpreterDir;
private final String localRepoDir;
@@ -51,6 +52,7 @@
String intpRunner,
String intpDir,
String localRepoDir,
+ String portRange,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener,
@@ -61,12 +63,14 @@
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
+ this.portRange = portRange;
}
RemoteInterpreterManagedProcess(String intpRunner,
String intpDir,
String localRepoDir,
+ String portRange,
Map<String, String> env,
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout) {
@@ -76,6 +80,7 @@
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
+ this.portRange = portRange;
}
@Override
@@ -92,7 +97,7 @@
public void start(String userName, Boolean isUserImpersonate) {
// start server process
try {
- port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
} catch (IOException e1) {
throw new InterpreterException(e1);
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 2937e2d..bdf98ff 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.remote;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,47 @@
return port;
}
+ /**
+ * Finds a free local port, optionally restricted to a "start:end" range (both ends inclusive).
+ * NOTE(review): a value without ':' or with non-numeric bounds is not validated here.
+ * @param portRange blank or ":" means any free port; an empty start/end defaults to 1024/65535
+ * @return the system-chosen port when unconstrained, otherwise the first port in range that binds
+ * @throws IOException if the unconstrained probe fails or no port in the range can be bound
+ */
+ public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
+ throws IOException {
+
+ // blank or ':' (the default value) means no constraint: bind port 0 so a free port is allocated
+ if (StringUtils.isBlank(portRange) || portRange.equals(":")) {
+ int port;
+ try (ServerSocket socket = new ServerSocket(0);) {
+ port = socket.getLocalPort();
+ socket.close();
+ }
+ return port;
+ }
+ // default bounds used when one side of the range is omitted; https://en.wikipedia.org/wiki/Registered_port
+ int start = 1024;
+ int end = 65535;
+ String[] ports = portRange.split(":", -1);
+ if (!ports[0].isEmpty()) {
+ start = Integer.parseInt(ports[0]);
+ }
+ if (!ports[1].isEmpty()) {
+ end = Integer.parseInt(ports[1]);
+ }
+ for (int i = start; i <= end; ++i) {
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ socket.close();
+ return socket.getLocalPort();
+ } catch (Exception e) {
+ // port i could not be bound (e.g. already in use); fall through and try the next port
+ }
+ }
+ throw new IOException("No available port in the portRange: " + portRange);
+ }
+
public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
try {
Socket discover = new Socket();
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
index 39a17ae..150f6a9 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -43,7 +43,7 @@
public void testStartStop() {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
- INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+ INTERPRETER_SCRIPT, "nonexists", "fakeRepo", ":", new HashMap<String, String>(),
10 * 1000, null, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
@@ -60,7 +60,7 @@
public void testClientFactory() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
- INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+ INTERPRETER_SCRIPT, "nonexists", "fakeRepo", ":", new HashMap<String, String>(),
mock(RemoteInterpreterEventPoller.class), 10 * 1000);
rip.reference(intpGroup, "anonymous", false);
assertEquals(0, rip.getNumActiveClient());
@@ -102,6 +102,7 @@
INTERPRETER_SCRIPT,
"nonexists",
"fakeRepo",
+ ":",
new HashMap<String, String>(),
mock(RemoteInterpreterEventPoller.class)
, 10 * 1000);
@@ -116,7 +117,7 @@
public void testPropagateError() throws TException, InterruptedException {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
- "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
+ "echo hello_world", "nonexists", "fakeRepo", ":", new HashMap<String, String>(),
10 * 1000, null, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 242a890..d2df6d8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -492,6 +492,9 @@
return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT);
}
+ public String getInterpreterPortRange() {
+ return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE);
+ }
public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf,
ConfigurationKeyPredicate predicate) {
@@ -639,7 +642,9 @@
ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"),
ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null),
ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"),
- ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1");
+ ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
+
+ ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":");
private String varName;
@SuppressWarnings("rawtypes")
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 2091dfd..accbfcf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -316,7 +316,8 @@
new RemoteInterpreter(property, interpreterSessionKey, className,
interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
- conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT));
+ conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT),
+ conf.getInterpreterPortRange());
remoteInterpreter.addEnv(env);
return new LazyOpenInterpreter(remoteInterpreter);