Allow interpolate more spout config parameters in extra links (#3310)
* Allow interpolate more spout config parameters in extra links
* new interpolate design
diff --git a/heron/tools/config/src/yaml/tracker/heron_tracker.yaml b/heron/tools/config/src/yaml/tracker/heron_tracker.yaml
index 15f87c8..1d8cf5d 100644
--- a/heron/tools/config/src/yaml/tracker/heron_tracker.yaml
+++ b/heron/tools/config/src/yaml/tracker/heron_tracker.yaml
@@ -69,17 +69,3 @@
# formatter: "http://127.0.0.1/viz/${ENVIRON}-${CLUSTER}-${TOPOLOGY}"
# - name: "Alerts"
# formatter: "http://127.0.0.1/alerts/${ENVIRON}-${CLUSTER}-${TOPOLOGY}"
-#
-# spout.extra.links:
-# - spout.type: "kafka"
-# extra.links:
-# - name: "Viz"
-# formatter: "http://127.0.0.1/kafka/viz/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
-# - name: "Alerts"
-# formatter: "http://127.0.0.1/kafka/alerts/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
-# - spout.type: "default"
-# extra.links:
-# - name: "Viz"
-# formatter: "http://127.0.0.1/default/viz/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
-# - name: "Alerts"
-# formatter: "http://127.0.0.1/default/alerts/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
diff --git a/heron/tools/tracker/src/python/config.py b/heron/tools/tracker/src/python/config.py
index 6dbfc3d..08887b8 100644
--- a/heron/tools/tracker/src/python/config.py
+++ b/heron/tools/tracker/src/python/config.py
@@ -27,8 +27,6 @@
EXTRA_LINK_NAME_KEY = "name"
EXTRA_LINK_FORMATTER_KEY = "formatter"
EXTRA_LINK_URL_KEY = "url"
-SPOUT_EXTRA_LINKS_KEY = "spout.extra.links"
-SPOUT_TYPE_KEY = "spout.type"
class Config(object):
"""
@@ -40,11 +38,9 @@
self.configs = configs
self.statemgr_config = StateMgrConfig()
self.extra_links = []
- self.spout_extra_links = {}
self.load_configs()
- # pylint: disable=line-too-long
def load_configs(self):
"""load config files"""
self.statemgr_config.set_state_locations(self.configs[STATEMGRS_KEY])
@@ -52,10 +48,6 @@
for extra_link in self.configs[EXTRA_LINKS_KEY]:
self.extra_links.append(self.validate_extra_link(extra_link))
- if SPOUT_EXTRA_LINKS_KEY in self.configs:
- for extra_link in self.configs[SPOUT_EXTRA_LINKS_KEY]:
- self.spout_extra_links[extra_link[SPOUT_TYPE_KEY]] = [self.validate_extra_link(link) for link in extra_link[EXTRA_LINKS_KEY]]
-
def validate_extra_link(self, extra_link):
"""validate extra link"""
if EXTRA_LINK_NAME_KEY not in extra_link or EXTRA_LINK_FORMATTER_KEY not in extra_link:
@@ -77,8 +69,6 @@
"${TOPOLOGY}": "topology",
"${ROLE}": "role",
"${USER}": "user",
- "${SPOUT_NAME}": "spout_name",
- "${SPOUT_SOURCE}": "spout_source",
}
dummy_formatted_url = url_format
for key, value in valid_parameters.items():
@@ -91,34 +81,25 @@
# No error is thrown, so the format is valid.
return url_format
- def get_formatted_url(self, formatter, execution_state, **additional):
+ def get_formatted_url(self, formatter, execution_state):
"""
+ @param formatter: The template string to interpolate
@param execution_state: The python dict representing JSON execution_state
- @param additional: additional kwargs to interpolate
@return Formatted viz url
"""
# Create the parameters based on execution state
- valid_parameters = {
- "${CLUSTER}": execution_state.get("cluster",
- additional.get("cluster", "${CLUSTER}")),
- "${ENVIRON}": execution_state.get("environ",
- additional.get("environ", "${ENVIRON}")),
- "${TOPOLOGY}": execution_state.get("jobname",
- additional.get("jobname", "${TOPOLOGY}")),
- "${ROLE}": execution_state.get("role",
- additional.get("role", "${ROLE}")),
- "${USER}": execution_state.get("submission_user",
- additional.get("submission_user", "${USER}")),
- "${SPOUT_NAME}": execution_state.get("spout.name",
- additional.get("spout.name", "${SPOUT_NAME}")),
- "${SPOUT_SOURCE}": execution_state.get("spout.source",
- additional.get("spout.source", "${SPOUT_SOURCE}")),
+ common_parameters = {
+ "${CLUSTER}": execution_state.get("cluster", "${CLUSTER}"),
+ "${ENVIRON}": execution_state.get("environ", "${ENVIRON}"),
+ "${TOPOLOGY}": execution_state.get("jobname", "${TOPOLOGY}"),
+ "${ROLE}": execution_state.get("role", "${ROLE}"),
+ "${USER}": execution_state.get("submission_user", "${USER}"),
}
formatted_url = formatter
- for key, value in valid_parameters.items():
+ for key, value in common_parameters.items():
formatted_url = formatted_url.replace(key, value)
return formatted_url
diff --git a/heron/tools/tracker/src/python/tracker.py b/heron/tools/tracker/src/python/tracker.py
index 77be6a2..3ce814d 100644
--- a/heron/tools/tracker/src/python/tracker.py
+++ b/heron/tools/tracker/src/python/tracker.py
@@ -386,25 +386,6 @@
"bolts": {},
}
- # Pre-render component extra links with general params
- execution_state = topology.execution_state
- executionState = {
- "cluster": execution_state.cluster,
- "environ": execution_state.environ,
- "role": execution_state.role,
- "jobname": topology.name,
- "submission_user": execution_state.submission_user,
- }
-
- spout_extra_links = {}
- for spout_type, extra_links in self.config.spout_extra_links.items():
- spout_extra_links[spout_type] = []
- for extra_link in extra_links:
- link = extra_link.copy()
- link[EXTRA_LINK_URL_KEY] = self.config.get_formatted_url(link[EXTRA_LINK_FORMATTER_KEY],
- executionState)
- spout_extra_links[spout_type].append(link)
-
# Add spouts.
for spout in topology.spouts():
spoutName = spout.comp.name
@@ -412,6 +393,7 @@
spoutSource = "NA"
spoutVersion = "NA"
spoutConfigs = spout.comp.config.kvs
+ spoutExtraLinks = []
for kvs in spoutConfigs:
if kvs.key == "spout.type":
spoutType = javaobj.loads(kvs.serialized_value)
@@ -419,20 +401,31 @@
spoutSource = javaobj.loads(kvs.serialized_value)
elif kvs.key == "spout.version":
spoutVersion = javaobj.loads(kvs.serialized_value)
+ elif kvs.key == "extra.links":
+ spoutExtraLinks = json.loads(javaobj.loads(kvs.serialized_value))
+
spoutPlan = {
"config": convert_pb_kvs(spoutConfigs, include_non_primitives=False),
"type": spoutType,
"source": spoutSource,
"version": spoutVersion,
"outputs": [],
- "extra_links": [],
+ "extra_links": spoutExtraLinks,
}
- for extra_link in spout_extra_links.get(spoutType, []):
- extra_link[EXTRA_LINK_URL_KEY] = self.config.get_formatted_url(
- extra_link[EXTRA_LINK_URL_KEY],
- spoutPlan["config"], **{"spout.name": spoutName})
- spoutPlan["extra_links"].append(extra_link)
+ # render component extra links with general params
+ execution_state = topology.execution_state
+ executionState = {
+ "cluster": execution_state.cluster,
+ "environ": execution_state.environ,
+ "role": execution_state.role,
+ "jobname": topology.name,
+ "submission_user": execution_state.submission_user,
+ }
+
+ for link in spoutPlan["extra_links"]:
+ link[EXTRA_LINK_URL_KEY] = self.config.get_formatted_url(link[EXTRA_LINK_FORMATTER_KEY],
+ executionState)
for outputStream in list(spout.outputs):
spoutPlan["outputs"].append({