[hotfix] Correct improper uses of Pemja (#275)

diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 5b04322..014d450 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -40,16 +40,12 @@
     // =========== RUNNER CONTEXT ===========
     private static final String CREATE_FLINK_RUNNER_CONTEXT =
             "flink_runner_context.create_flink_runner_context";
-    private static final String FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX = "flink_runner_context_";
-    private static final AtomicLong FLINK_RUNNER_CONTEXT_REF_ID = new AtomicLong(0);
 
     // ========== ASYNC THREAD POOL ===========
     private static final String CREATE_ASYNC_THREAD_POOL =
             "flink_runner_context.create_async_thread_pool";
     private static final String CLOSE_ASYNC_THREAD_POOL =
             "flink_runner_context.close_async_thread_pool";
-    private static final String PYTHON_ASYNC_THREAD_POOL_REF_NAME = "python_async_thread_pool";
-    private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_REF_ID = new AtomicLong(0);
 
     // =========== PYTHON GENERATOR ===========
     private static final String CALL_PYTHON_GENERATOR = "function.call_python_generator";
@@ -66,7 +62,7 @@
     private final PythonEnvironmentManager environmentManager;
     private final String agentPlanJson;
     private PythonInterpreter interpreter;
-    private String pythonAsyncThreadPoolObjectName;
+    private Object pythonAsyncThreadPool;
 
     public PythonActionExecutor(PythonEnvironmentManager environmentManager, String agentPlanJson) {
         this.environmentManager = environmentManager;
@@ -80,14 +76,7 @@
         interpreter = env.getInterpreter();
         interpreter.exec(PYTHON_IMPORTS);
 
-        // TODO: remove the set and get thread pool after updating pemja to version 0.5.3. For more
-        // details, please refer to
-        //    https://github.com/apache/flink-agents/issues/83.
-        Object pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
-        this.pythonAsyncThreadPoolObjectName =
-                PYTHON_ASYNC_THREAD_POOL_REF_NAME
-                        + PYTHON_ASYNC_THREAD_POOL_REF_ID.incrementAndGet();
-        interpreter.set(pythonAsyncThreadPoolObjectName, pythonAsyncThreadPool);
+        pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
     }
 
     /**
@@ -106,18 +95,12 @@
         runnerContext.checkNoPendingEvents();
         function.setInterpreter(interpreter);
 
-        // TODO: remove the set and get runner context after updating pemja to version 0.5.3. For
-        // more details, please refer to https://github.com/apache/flink-agents/issues/83.
         Object pythonRunnerContextObject =
                 interpreter.invoke(
                         CREATE_FLINK_RUNNER_CONTEXT,
                         runnerContext,
                         agentPlanJson,
-                        interpreter.get(pythonAsyncThreadPoolObjectName));
-        String pythonRunnerContextObjectName =
-                FLINK_RUNNER_CONTEXT_REF_NAME_PREFIX
-                        + FLINK_RUNNER_CONTEXT_REF_ID.incrementAndGet();
-        interpreter.set(pythonRunnerContextObjectName, pythonRunnerContextObject);
+                        pythonAsyncThreadPool);
 
         Object pythonEventObject = interpreter.invoke(CONVERT_TO_PYTHON_OBJECT, event.getEvent());
 
@@ -169,9 +152,30 @@
     }
 
+    /**
+     * Releases the Python-side resources held by this executor.
+     *
+     * <p>The async thread pool is closed through the interpreter first, then the interpreter
+     * itself, then the environment manager. Each step runs in a {@code finally} block so that a
+     * failure in an earlier step does not leak the resources released by the later ones.
+     *
+     * @throws Exception if any of the close steps fails
+     */
     public void close() throws Exception {
-        if (pythonAsyncThreadPoolObjectName != null) {
-            interpreter.invoke(
-                    CLOSE_ASYNC_THREAD_POOL, interpreter.get(pythonAsyncThreadPoolObjectName));
+        try {
+            if (interpreter != null) {
+                try {
+                    // The pool is closed via the interpreter, so do it before closing the latter.
+                    if (pythonAsyncThreadPool != null) {
+                        interpreter.invoke(CLOSE_ASYNC_THREAD_POOL, pythonAsyncThreadPool);
+                    }
+                } finally {
+                    interpreter.close();
+                }
+            }
+        } finally {
+            if (environmentManager != null) {
+                environmentManager.close();
+            }
         }
     }