[ZEPPELIN-4600] Socket connection between zeppelin server and interpreter is not closed properly for branch 0.8
### What is this PR for?
Fix a stability problem in zeppelin-zengine when a note is run in a loop for a long time (7 days):
release the thread pools and socket connections when the interpreter is closed.
The same bug was found in versions 0.8.1 and 0.8.2.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-4600
### How should this be tested?
* Ran a single task in a loop for 30 hours with a 7-second delay; the test passed.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: tylerba-f <tylerf@126.com>
Closes #3650 from tylerba-f/branch-0.8 and squashes the following commits:
467f3a377 [tylerba-f] [ZEPPELIN-4600] code style adjust
34add36bf [tylerba-f] [ZEPPELIN-4600]
306272a9a [tylerba-f] [ZEPPELIN-4600] nullpoint error in the long time process use api to run a note
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
index b2cb78f..9e5582b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
@@ -44,6 +44,21 @@
this.port = port;
}
+ /**
+ * Closes every socket currently tracked in clientSocketMap.
+ */
+ public void close() {
+ // Hold the map's monitor while iterating so a concurrent pool callback
+ // cannot change it mid-loop; with a fail-fast map (e.g. HashMap) that
+ // would throw and leave the remaining sockets open. NOTE(review): only
+ // effective if other accesses to clientSocketMap also lock it - confirm.
+ synchronized (clientSocketMap) {
+ for (TSocket eachTransfer : clientSocketMap.values()) {
+ eachTransfer.close();
+ }
+ }
+ }
+
@Override
public Client create() throws Exception {
TSocket transport = new TSocket(host, port);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index abda81e..d437079 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -261,6 +261,9 @@
}
if (appendFuture != null) {
appendFuture.cancel(true);
}
+ // Shut the append executor down unconditionally so its worker threads
+ // are released even when appendFuture is null.
+ appendService.shutdown();
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index b186e48..95abd94 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -229,6 +229,10 @@
} catch (Exception e) {
logger.warn("ignore the exception when shutting down");
}
+
+ // Release this process's client connections before destroying the process
+ shutdown();
+
watchdog.destroyProcess();
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 08653ae..e282cee 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -19,13 +19,12 @@
import com.google.gson.Gson;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Abstract class for interpreter process
*/
@@ -36,6 +35,8 @@
private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;
+ // Factory behind the current clientPool; kept so shutdown() can close its sockets.
+ private ClientFactory clientFactory = null;
public RemoteInterpreterProcess(
int connectTimeout) {
@@ -51,13 +52,32 @@
this.remoteInterpreterEventPoller = eventPoller;
}
+ /**
+ * Releases the client side of this process: closes the client pool and
+ * the sockets opened by its factory. Because a closed pool reports
+ * isClosed(), a later getClient() call builds a fresh pool and factory.
+ */
+ public void shutdown() {
+ // Close the pool first so it destroys its idle clients and stops handing
+ // out clients whose sockets are about to be closed.
+ if (clientPool != null) {
+ clientPool.close();
+ }
+
+ // Then close whatever sockets the factory still tracks.
+ if (clientFactory != null) {
+ clientFactory.close();
+ }
+ }
+
public int getConnectTimeout() {
return connectTimeout;
}
public synchronized Client getClient() throws Exception {
if (clientPool == null || clientPool.isClosed()) {
- clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
+ clientFactory = new ClientFactory(getHost(), getPort());
+ clientPool = new GenericObjectPool<>(clientFactory);
}
return clientPool.borrowObject();
}