[hotfix] Correct improper uses of Pemja (#275)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 5b04322..014d450 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -40,16 +40,12 @@
// =========== RUNNER CONTEXT ===========
private static final String CREATE_FLINK_RUNNER_CONTEXT =
"flink_runner_context.create_flink_runner_context";
- private static final String FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX = "flink_runner_context_";
- private static final AtomicLong FLINK_RUNNER_CONTEXT_REF_ID = new AtomicLong(0);
// ========== ASYNC THREAD POOL ===========
private static final String CREATE_ASYNC_THREAD_POOL =
"flink_runner_context.create_async_thread_pool";
private static final String CLOSE_ASYNC_THREAD_POOL =
"flink_runner_context.close_async_thread_pool";
- private static final String PYTHON_ASYNC_THREAD_POOL_REF_NAME = "python_async_thread_pool";
- private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_REF_ID = new AtomicLong(0);
// =========== PYTHON GENERATOR ===========
private static final String CALL_PYTHON_GENERATOR = "function.call_python_generator";
@@ -66,7 +62,7 @@
private final PythonEnvironmentManager environmentManager;
private final String agentPlanJson;
private PythonInterpreter interpreter;
- private String pythonAsyncThreadPoolObjectName;
+ private Object pythonAsyncThreadPool;
public PythonActionExecutor(PythonEnvironmentManager environmentManager, String agentPlanJson) {
this.environmentManager = environmentManager;
@@ -80,14 +76,7 @@
interpreter = env.getInterpreter();
interpreter.exec(PYTHON_IMPORTS);
- // TODO: remove the set and get thread pool after updating pemja to version 0.5.3. For more
- // details, please refer to
- // https://github.com/apache/flink-agents/issues/83.
- Object pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
- this.pythonAsyncThreadPoolObjectName =
- PYTHON_ASYNC_THREAD_POOL_REF_NAME
- + PYTHON_ASYNC_THREAD_POOL_REF_ID.incrementAndGet();
- interpreter.set(pythonAsyncThreadPoolObjectName, pythonAsyncThreadPool);
+ pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
}
/**
@@ -106,18 +95,12 @@
runnerContext.checkNoPendingEvents();
function.setInterpreter(interpreter);
- // TODO: remove the set and get runner context after updating pemja to version 0.5.3. For
- // more details, please refer to https://github.com/apache/flink-agents/issues/83.
Object pythonRunnerContextObject =
interpreter.invoke(
CREATE_FLINK_RUNNER_CONTEXT,
runnerContext,
agentPlanJson,
- interpreter.get(pythonAsyncThreadPoolObjectName));
- String pythonRunnerContextObjectName =
- FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX
- + FLINK_RUNNER_CONTEXT_REF_ID.incrementAndGet();
- interpreter.set(pythonRunnerContextObjectName, pythonRunnerContextObject);
+ pythonAsyncThreadPool);
Object pythonEventObject = interpreter.invoke(CONVERT_TO_PYTHON_OBJECT, event.getEvent());
@@ -169,9 +152,15 @@
}
public void close() throws Exception {
- if (pythonAsyncThreadPoolObjectName != null) {
- interpreter.invoke(
- CLOSE_ASYNC_THREAD_POOL, interpreter.get(pythonAsyncThreadPoolObjectName));
+ if (interpreter != null) {
+ if (pythonAsyncThreadPool != null) {
+ interpreter.invoke(CLOSE_ASYNC_THREAD_POOL, pythonAsyncThreadPool);
+ }
+ interpreter.close();
+ }
+
+ if (environmentManager != null) {
+ environmentManager.close();
}
}