trying to notify the master that the task finished successfully
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
index eaf8630..c49cebf 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
@@ -30,7 +30,7 @@
private val requirementsPath: String = "dist/$requirementsFileName"
override val runnerResources: Array<String>
- get() = arrayOf("amaterasu-sdk-${conf!!.version()}.zip")
+ get() = arrayOf("amaterasu-sdk-${conf!!.version()}.zip", "requests")
override fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String {
var cmd = "python3 -m pip install --upgrade --force-reinstall -r $requirementsFileName"
diff --git a/sdk_python/amaterasu/_utils.py b/sdk_python/amaterasu/_utils.py
new file mode 100644
index 0000000..be16a47
--- /dev/null
+++ b/sdk_python/amaterasu/_utils.py
@@ -0,0 +1,25 @@
+import sys
+
+
+class _ExitHooks(object):
+ def __init__(self):
+ self.exit_code = None
+ self.exception = None
+
+ def hook(self):
+ self._orig_exit = sys.exit
+ sys.exit = self.exit
+ self._orig_excepthook = sys.excepthook
+ sys.excepthook = self.exc_handler
+
+ def exit(self, code=0):
+ self.exit_code = code
+ self._orig_exit(code)
+
+ def exc_handler(self, exc_type, exc, *args):
+ self.exception = exc
+ sys.excepthook(exc_type, exc, *args)
+
+
+hooks = _ExitHooks()
+hooks.hook()
\ No newline at end of file
diff --git a/sdk_python/amaterasu/runtime.py b/sdk_python/amaterasu/runtime.py
index a04c195..adcbae6 100644
--- a/sdk_python/amaterasu/runtime.py
+++ b/sdk_python/amaterasu/runtime.py
@@ -23,7 +23,7 @@
import abc
from munch import Munch, munchify
from typing import Any
-
+from ._utils import hooks as _hooks
from amaterasu.datasets import BaseDatasetManager
@@ -31,6 +31,7 @@
cwd = os.getcwd()
return os.path.join(cwd, file_name)
+
class ImproperlyConfiguredError(Exception):
pass
@@ -169,6 +170,7 @@
return munchify(_dict, factory=Environment)
+
def _send_mesos_task_finished_event():
mesos_agent_ep = os.getenv('MESOS_AGENT_ENDPOINT')
executor_dir = os.getenv('MESOS_DIRECTORY')
@@ -195,12 +197,13 @@
}
})
-def send_completion_event():
- if os.getenv('MESOS_EXECUTOR_ID'):
- _send_mesos_task_finished_event()
-# conf = _create_configuration()
-# ama_context = BaseAmaContext()
+def send_completion_event():
+ if (_hooks.exit_code is None or _hooks.exit_code == 0) and _hooks.exception is None:
+ if os.getenv('MESOS_EXECUTOR_ID'):
+ _send_mesos_task_finished_event()
+
+
logging.setLoggerClass(Notifier)
# notifier = logging.getLogger(__name__)
atexit.register(send_completion_event)