clean python state manager (#2327)
* clean python state manager
* update tracker
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 78a0c0a..c01e48b 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -900,8 +900,11 @@
statemgr_config.set_state_locations(configloader.load_state_manager_locations(self.cluster))
try:
self.state_managers = statemanagerfactory.get_all_state_managers(statemgr_config)
+ for state_manager in self.state_managers:
+ state_manager.start()
except Exception as ex:
Log.error("Found exception while initializing state managers: %s. Bailing out..." % ex)
+ traceback.print_exc()
sys.exit(1)
# pylint: disable=unused-argument
diff --git a/heron/statemgrs/src/python/statemanagerfactory.py b/heron/statemgrs/src/python/statemanagerfactory.py
index c29b84f..cfdb761 100644
--- a/heron/statemgrs/src/python/statemanagerfactory.py
+++ b/heron/statemgrs/src/python/statemanagerfactory.py
@@ -19,7 +19,6 @@
'''
import os
-import traceback
from heron.statemgrs.src.python.filestatemanager import FileStateManager
@@ -43,8 +42,8 @@
def get_all_zk_state_managers(conf):
"""
- Connects to all the zookeeper state_managers and returns
- the connected state_managers instances.
+ Creates all the zookeeper state_managers and returns
+ them in a list
"""
state_managers = []
state_locations = conf.get_state_locations_of_type("zookeeper")
@@ -67,12 +66,6 @@
rootpath = location['rootpath']
LOG.info("Connecting to zk hostports: " + str(hostportlist) + " rootpath: " + rootpath)
state_manager = ZkStateManager(name, hostportlist, rootpath, tunnelhost)
- try:
- state_manager.start()
- except Exception as ex:
- LOG.error("Exception while connecting to state_manager.")
- LOG.debug(traceback.format_exc())
- raise ex
state_managers.append(state_manager)
return state_managers
@@ -88,12 +81,6 @@
rootpath = os.path.expanduser(location['rootpath'])
LOG.info("Connecting to file state with rootpath: " + rootpath)
state_manager = FileStateManager(name, rootpath)
- try:
- state_manager.start()
- except Exception as ex:
- LOG.error("Exception while connecting to state_manager.")
- traceback.print_exc()
- raise ex
state_managers.append(state_manager)
return state_managers
diff --git a/heron/tools/tracker/src/python/tracker.py b/heron/tools/tracker/src/python/tracker.py
index 429e634..88ea502 100644
--- a/heron/tools/tracker/src/python/tracker.py
+++ b/heron/tools/tracker/src/python/tracker.py
@@ -13,6 +13,8 @@
# limitations under the License.
''' tracker.py '''
import json
+import sys
+import traceback
from functools import partial
@@ -54,6 +56,13 @@
Sync the topologies with the statemgrs.
"""
self.state_managers = statemanagerfactory.get_all_state_managers(self.config.statemgr_config)
+ try:
+ for state_manager in self.state_managers:
+ state_manager.start()
+ except Exception as ex:
+ Log.error("Found exception while initializing state managers: %s. Bailing out..." % ex)
+ traceback.print_exc()
+ sys.exit(1)
# pylint: disable=deprecated-lambda
def on_topologies_watch(state_manager, topologies):