Merge branch 'master' into kanishk/ui-bp-metric
diff --git a/bazel_configure.py b/bazel_configure.py
index 9693fe3..15cdf01 100755
--- a/bazel_configure.py
+++ b/bazel_configure.py
@@ -394,10 +394,11 @@
env_map['PYTHON2'] = discover_tool('python2.7', 'Python2', 'PYTHON2', '2.7')
if platform == 'Darwin':
- env_map['AR'] = discover_tool('libtool', 'archiver', 'AR')
+ env_map['LIBTOOL'] = discover_tool('glibtool', 'Libtool', 'LIBTOOL', '2.4.2')
else:
- env_map['AR'] = discover_tool('ar', 'archiver', 'AR')
+ env_map['LIBTOOL'] = discover_tool('libtool', 'Libtool', 'LIBTOOL', '2.4.2')
+ env_map['AR'] = discover_tool('ar', 'archiver', 'AR')
env_map['GCOV']= discover_tool('gcov','coverage tool', 'GCOV')
env_map['DWP'] = discover_tool_default('dwp', 'dwp', 'DWP', '/usr/bin/dwp')
env_map['NM'] = discover_tool_default('nm', 'nm', 'NM', '/usr/bin/nm')
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java
index 4511b1d..48b8f75 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaSpout.java
@@ -65,7 +65,7 @@
@Override
@SuppressWarnings("unchecked")
- public void open(Map<String, Object> conf,
+ public void open(Map conf,
final TopologyContext context,
final SpoutOutputCollector aCollector) {
collector = aCollector;
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java
index bd00cd0..1f162a7 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/KafkaUtils.java
@@ -286,7 +286,7 @@
tups = ((StringMultiSchemeWithTopic) kafkaConfig.scheme)
.deserializeWithTopic(topic, payloadBytes);
} else {
- tups = kafkaConfig.scheme.deserialize(payloadBytes);
+ tups = kafkaConfig.scheme.deserialize(payload);
}
}
return tups;
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java
index 6540e39..f82a628 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringKeyValueScheme.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.kafka;
+import java.nio.ByteBuffer;
import java.util.List;
import com.google.common.collect.ImmutableMap;
@@ -30,7 +31,7 @@
@Override
public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
if (key == null) {
- return deserialize(value);
+ return deserialize(ByteBuffer.wrap(value));
}
String keyString = StringScheme.deserializeString(key);
String valueString = StringScheme.deserializeString(value);
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
index 46daf85..685e3c9 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.kafka;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -32,7 +33,7 @@
private static final long serialVersionUID = 1985667152241541458L;
@Override
- public Iterable<List<Object>> deserialize(byte[] bytes) {
+ public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
throw new UnsupportedOperationException();
}
diff --git a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java
index 55a1f80..219d353 100644
--- a/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java
+++ b/contrib/kafka-spout/src/java/org/apache/storm/kafka/StringScheme.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.kafka;
+import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -30,8 +31,11 @@
public static final String STRING_SCHEME_KEY = "str";
private static final long serialVersionUID = -1806461122261145598L;
- public List<Object> deserialize(byte[] bytes) {
- return new Values(deserializeString(bytes));
+ public List<Object> deserialize(ByteBuffer bytes) {
+ // Read through a duplicate so the caller's buffer position is left untouched.
+ byte[] bytearray = new byte[bytes.remaining()];
+ bytes.duplicate().get(bytearray);
+ return new Values(deserializeString(bytearray));
}
public static String deserializeString(byte[] bytes) {
diff --git a/heron/api/src/java/com/twitter/heron/api/serializer/KryoSerializer.java b/contrib/kryo-serializer/src/java/com/twitter/heron/serializer/kryo/KryoSerializer.java
similarity index 96%
rename from heron/api/src/java/com/twitter/heron/api/serializer/KryoSerializer.java
rename to contrib/kryo-serializer/src/java/com/twitter/heron/serializer/kryo/KryoSerializer.java
index 0930862..fae5d8f 100644
--- a/heron/api/src/java/com/twitter/heron/api/serializer/KryoSerializer.java
+++ b/contrib/kryo-serializer/src/java/com/twitter/heron/serializer/kryo/KryoSerializer.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.twitter.heron.api.serializer;
+package com.twitter.heron.serializer.kryo;
import java.util.Map;
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index 9745fa8..a9e1933 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -3,12 +3,12 @@
package(default_visibility = ["//visibility:public"])
load("/tools/rules/heron_deps", "heron_java_proto_files")
+load("/tools/rules/jarjar_rules", "jarjar_binary")
api_deps_files = \
heron_java_proto_files() + [
":classification",
"//heron/common/src/java:basics-java",
- "//third_party/java:kryo",
]
java_library(
@@ -18,16 +18,16 @@
)
java_binary(
- name = "api-unshaded",
+ name = "api",
srcs = glob(["com/twitter/heron/api/**/*.java"]),
deps = api_deps_files,
)
-genrule(
+jarjar_binary(
name = "heron-api",
- srcs = [":api-unshaded_deploy.jar"],
- outs = ["heron-api.jar"],
- cmd = "cp $< $@",
+ src = ":api_deploy.jar",
+ shade = "shade.conf",
+ deps = ["@org_sonatype_plugins_jarjar_maven_plugin//jar"]
)
java_library(
diff --git a/heron/api/src/java/com/twitter/heron/api/HeronTopology.java b/heron/api/src/java/com/twitter/heron/api/HeronTopology.java
index 0699d4f..904c1e3 100644
--- a/heron/api/src/java/com/twitter/heron/api/HeronTopology.java
+++ b/heron/api/src/java/com/twitter/heron/api/HeronTopology.java
@@ -15,11 +15,8 @@
package com.twitter.heron.api;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
-import com.google.protobuf.ByteString;
-
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.utils.Utils;
@@ -37,26 +34,6 @@
this.topologyBuilder = topologyBuilder;
}
- private static TopologyAPI.Config.Builder getConfigBuilder(Config config) {
- TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder();
- Set<String> apiVars = config.getApiVars();
- for (String key : config.keySet()) {
- Object value = config.get(key);
- TopologyAPI.Config.KeyValue.Builder b = TopologyAPI.Config.KeyValue.newBuilder();
- b.setKey(key);
- if (apiVars.contains(key)) {
- b.setType(TopologyAPI.ConfigValueType.STRING_VALUE);
- b.setValue(value.toString());
- } else {
- b.setType(TopologyAPI.ConfigValueType.JAVA_SERIALIZED_VALUE);
- b.setSerializedValue(ByteString.copyFrom(Utils.serialize(value)));
- }
- cBldr.addKvs(b);
- }
-
- return cBldr;
- }
-
private static void addDefaultTopologyConfig(Map<String, Object> userConfig) {
if (!userConfig.containsKey(Config.TOPOLOGY_DEBUG)) {
userConfig.put(Config.TOPOLOGY_DEBUG, "false");
@@ -96,7 +73,7 @@
addDefaultTopologyConfig(heronConfig);
heronConfig.put(Config.TOPOLOGY_NAME, name);
- topologyBuilder.setTopologyConfig(getConfigBuilder(heronConfig));
+ topologyBuilder.setTopologyConfig(Utils.getConfigBuilder(heronConfig));
return topologyBuilder.build();
}
diff --git a/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java b/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java
index aa1059a..6cf70e3 100644
--- a/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java
+++ b/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java
@@ -14,9 +14,7 @@
package com.twitter.heron.api.topology;
-import java.util.HashMap;
import java.util.Map;
-import java.util.logging.Logger;
import com.google.protobuf.ByteString;
@@ -26,24 +24,22 @@
public abstract class BaseComponentDeclarer<T extends ComponentConfigurationDeclarer<?>>
extends BaseConfigurationDeclarer<T> {
- private static final Logger LOG = Logger.getLogger(BaseComponentDeclarer.class.getName());
private String name;
private IComponent component;
- private Map<String, Object> componentConfiguration;
+ private Config componentConfiguration;
public BaseComponentDeclarer(String name, IComponent comp, Number taskParallelism) {
this.name = name;
this.component = comp;
- this.componentConfiguration = comp.getComponentConfiguration();
- if (this.componentConfiguration == null) {
- this.componentConfiguration = new HashMap<>();
+ if (comp.getComponentConfiguration() != null) {
+ this.componentConfiguration = new Config(comp.getComponentConfiguration());
+ } else {
+ this.componentConfiguration = new Config();
}
if (taskParallelism != null) {
- this.componentConfiguration.put(Config.TOPOLOGY_COMPONENT_PARALLELISM,
- taskParallelism.toString());
+ Config.setComponentParallelism(this.componentConfiguration, taskParallelism.intValue());
} else {
- this.componentConfiguration.put(Config.TOPOLOGY_COMPONENT_PARALLELISM,
- "1");
+ Config.setComponentParallelism(this.componentConfiguration, 1);
}
}
@@ -63,23 +59,6 @@
bldr.setName(name);
bldr.setSpec(TopologyAPI.ComponentObjectSpec.JAVA_SERIALIZED_OBJECT);
bldr.setSerializedObject(ByteString.copyFrom(Utils.serialize(component)));
-
- TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder();
- for (Map.Entry<String, Object> entry : componentConfiguration.entrySet()) {
- if (entry.getKey() == null) {
- LOG.warning("ignore: config key is null");
- continue;
- }
- if (entry.getValue() == null) {
- LOG.warning("ignore: config key " + entry.getKey() + " has null value");
- continue;
- }
- TopologyAPI.Config.KeyValue.Builder kvBldr = TopologyAPI.Config.KeyValue.newBuilder();
- kvBldr.setKey(entry.getKey());
- kvBldr.setValue(entry.getValue().toString());
- kvBldr.setType(TopologyAPI.ConfigValueType.STRING_VALUE);
- cBldr.addKvs(kvBldr);
- }
- bldr.setConfig(cBldr);
+ bldr.setConfig(Utils.getConfigBuilder(componentConfiguration));
}
}
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java b/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java
index a66bab2..6cda2c8 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/DefaultMaxSpoutPendingTuner.java
@@ -14,10 +14,6 @@
package com.twitter.heron.api.utils;
-// TODO:- Uncomment this
-// import org.slf4j.Logger;
-// import org.slf4j.LoggerFactory;
-
/**
* This is a class that helps to auto tune the max spout pending value
*/
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/Utils.java b/heron/api/src/java/com/twitter/heron/api/utils/Utils.java
index e07d3a9..97579d2 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/Utils.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/Utils.java
@@ -25,8 +25,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import com.google.protobuf.ByteString;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.generated.TopologyAPI;
public final class Utils {
+ private static final Logger LOG = Logger.getLogger(Utils.class.getName());
+
public static final String DEFAULT_STREAM_ID = "default";
private Utils() {
@@ -102,4 +111,39 @@
return ret;
}
+
+ /**
+ * Converts a Heron Config object into a TopologyAPI.Config.Builder. Config entries with null
+ * keys or values are ignored.
+ *
+ * @param config heron Config object
+ * @return TopologyAPI.Config.Builder with values loaded from config
+ */
+ public static TopologyAPI.Config.Builder getConfigBuilder(Config config) {
+ TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder();
+ Set<String> apiVars = config.getApiVars();
+ for (String key : config.keySet()) {
+ if (key == null) {
+ LOG.warning("ignore: null config key found");
+ continue;
+ }
+ Object value = config.get(key);
+ if (value == null) {
+ LOG.warning("ignore: config key " + key + " has null value");
+ continue;
+ }
+ TopologyAPI.Config.KeyValue.Builder b = TopologyAPI.Config.KeyValue.newBuilder();
+ b.setKey(key);
+ if (apiVars.contains(key)) {
+ b.setType(TopologyAPI.ConfigValueType.STRING_VALUE);
+ b.setValue(value.toString());
+ } else {
+ b.setType(TopologyAPI.ConfigValueType.JAVA_SERIALIZED_VALUE);
+ b.setSerializedValue(ByteString.copyFrom(serialize(value)));
+ }
+ cBldr.addKvs(b);
+ }
+
+ return cBldr;
+ }
}
diff --git a/heron/api/src/java/shade.conf b/heron/api/src/java/shade.conf
new file mode 100644
index 0000000..7ee86d0
--- /dev/null
+++ b/heron/api/src/java/shade.conf
@@ -0,0 +1 @@
+rule com.google** com.twitter.heron.__shaded__.@0
diff --git a/heron/common/src/java/BUILD b/heron/common/src/java/BUILD
index 678421e..03754ec 100644
--- a/heron/common/src/java/BUILD
+++ b/heron/common/src/java/BUILD
@@ -7,6 +7,7 @@
common_deps_files = \
heron_java_proto_files() + [
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:classification",
"@org_yaml_snakeyaml//jar",
"@com_google_guava_guava//jar",
]
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java
index 246efbf..4ebb6a4 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java
@@ -32,7 +32,7 @@
* 4. Expose methods which could be called externally to change the value of metrics
*/
-public class BoltMetrics {
+public class BoltMetrics implements ComponentMetrics {
private final CountMetric ackCount;
private final ReducedMetric<MeanReducerState, Number, Double> processLatency;
private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java
new file mode 100644
index 0000000..5503d14
--- /dev/null
+++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java
@@ -0,0 +1,28 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.common.utils.metrics;
+
+import com.twitter.heron.classification.InterfaceAudience;
+import com.twitter.heron.classification.InterfaceStability;
+
+/**
+ * Interface for common metric actions that both spouts and bolts support
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ComponentMetrics {
+
+ void serializeDataTuple(String streamId, long latency);
+ void emittedTuple(String streamId);
+}
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java
index 998ea75..8ad183f 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java
@@ -33,7 +33,7 @@
* 4. Expose methods which could be called externally to change the value of metrics
*/
-public class SpoutMetrics {
+public class SpoutMetrics implements ComponentMetrics {
private final CountMetric ackCount;
private final ReducedMetric<MeanReducerState, Number, Double> completeLatency;
private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java b/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java
index 9427d34..1a72dbf 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/misc/PhysicalPlanHelper.java
@@ -193,29 +193,26 @@
public void setTopologyContext(MetricsCollector metricsCollector) {
topologyContext =
- new TopologyContextImpl(constructConfig(pplan.getTopology().getTopologyConfig(), component),
+ new TopologyContextImpl(mergeConfigs(pplan.getTopology().getTopologyConfig(), component),
pplan.getTopology(), makeTaskToComponentMap(), myTaskId, metricsCollector);
}
- private Map<String, Object> constructConfig(TopologyAPI.Config config,
- TopologyAPI.Component acomponent) {
- Map<String, Object> retval = new HashMap<String, Object>();
+ private Map<String, Object> mergeConfigs(TopologyAPI.Config config,
+ TopologyAPI.Component acomponent) {
+ Map<String, Object> map = new HashMap<>();
+ addConfigsToMap(config, map);
+ addConfigsToMap(acomponent.getConfig(), map); // Override any component specific configs
+ return map;
+ }
+
+ private void addConfigsToMap(TopologyAPI.Config config, Map<String, Object> map) {
for (TopologyAPI.Config.KeyValue kv : config.getKvsList()) {
if (kv.hasValue()) {
- retval.put(kv.getKey(), kv.getValue());
+ map.put(kv.getKey(), kv.getValue());
} else {
- retval.put(kv.getKey(), Utils.deserialize(kv.getSerializedValue().toByteArray()));
+ map.put(kv.getKey(), Utils.deserialize(kv.getSerializedValue().toByteArray()));
}
}
- // Override any component specific configs
- for (TopologyAPI.Config.KeyValue kv : acomponent.getConfig().getKvsList()) {
- if (kv.hasValue()) {
- retval.put(kv.getKey(), kv.getValue());
- } else {
- retval.put(kv.getKey(), Utils.deserialize(kv.getSerializedValue().toByteArray()));
- }
- }
- return retval;
}
private Map<Integer, String> makeTaskToComponentMap() {
diff --git a/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java b/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java
index 40179de..5b356c4 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/misc/SerializeDeSerializeHelper.java
@@ -15,15 +15,18 @@
package com.twitter.heron.common.utils.misc;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import com.twitter.heron.api.Config;
import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.serializer.KryoSerializer;
+import com.twitter.heron.api.serializer.JavaSerializer;
/**
* Get the serializer according to the serializerClassName
*/
public final class SerializeDeSerializeHelper {
+ private static final Logger LOG = Logger.getLogger(SerializeDeSerializeHelper.class.getName());
private SerializeDeSerializeHelper() {
}
@@ -33,7 +36,12 @@
try {
String serializerClassName = (String) config.get(Config.TOPOLOGY_SERIALIZER_CLASSNAME);
if (serializerClassName == null) {
- serializer = new KryoSerializer();
+ LOG.log(Level.WARNING, "Serializer class name not provided. "
+ + "Fall back to Java serializer. "
+ + "This could cause serious performance degradation. "
+ + "You can specify to use Kryo as serializer. "
+ + "See https://twitter.github.io/heron/docs/developers/serialization/ for details");
+ serializer = new JavaSerializer();
} else {
serializer = (IPluggableSerializer) Class.forName(serializerClassName).newInstance();
}
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 3415b9f..b8e4ce3 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -209,6 +209,7 @@
self.processes_to_monitor = {}
self.state_managers = []
+ self.jvm_version = None
@staticmethod
def parse_args(args):
@@ -412,21 +413,29 @@
retval = {}
# TO DO (Karthik) to be moved into keys and defaults files
code_cache_size_mb = 64
- perm_gen_size_mb = 128
+ java_metasize_mb = 128
+
+ java_version = self._get_jvm_version()
+ java_metasize_param = 'MetaspaceSize'
+ if java_version.startswith("1.7") or \
+ java_version.startswith("1.6") or \
+ java_version.startswith("1.5"):
+ java_metasize_param = 'PermSize'
+
for (instance_id, component_name, global_task_id, component_index) in instance_info:
total_jvm_size = int(self.component_rammap[component_name] / (1024 * 1024))
- heap_size_mb = total_jvm_size - code_cache_size_mb - perm_gen_size_mb
+ heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
Log.info("component name: %s, ram request: %d, total jvm size: %dM, "
- "cache size: %dM, perm size: %dM"
+ "cache size: %dM, metaspace size: %dM"
% (component_name, self.component_rammap[component_name],
- total_jvm_size, code_cache_size_mb, perm_gen_size_mb))
+ total_jvm_size, code_cache_size_mb, java_metasize_mb))
xmn_size = int(heap_size_mb / 2)
instance_cmd = [os.path.join(self.heron_java_home, 'bin/java'),
'-Xmx%dM' % heap_size_mb,
'-Xms%dM' % heap_size_mb,
'-Xmn%dM' % xmn_size,
- '-XX:MaxPermSize=%dM' % perm_gen_size_mb,
- '-XX:PermSize=%dM' % perm_gen_size_mb,
+ '-XX:Max%s=%dM' % (java_metasize_param, java_metasize_mb),
+ '-XX:%s=%dM' % (java_metasize_param, java_metasize_mb),
'-XX:ReservedCodeCacheSize=%dM' % code_cache_size_mb,
'-XX:+CMSScavengeBeforeRemark',
'-XX:TargetSurvivorRatio=90',
@@ -466,6 +475,28 @@
retval[instance_id] = instance_cmd
return retval
+ def _get_jvm_version(self):
+ """Return the JVM version string, caching it after the first lookup.
+
+ Runs com.twitter.heron.instance.util.JvmVersion on the instance classpath using the
+ java binary under heron_java_home; exits the executor with status 1 if that command fails.
+ """
+ if not self.jvm_version:
+ cmd = [os.path.join(self.heron_java_home, 'bin/java'),
+ '-cp', self.instance_classpath, 'com.twitter.heron.instance.util.JvmVersion']
+ process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ (process_stdout, process_stderr) = process.communicate()
+ if process.returncode != 0:
+ Log.error("Failed to determine JVM version. Exiting. Output of %s: %s",
+ ' '.join(cmd), process_stderr)
+ sys.exit(1)
+
+ # communicate() returns bytes on Python 3 and the output may end with a newline;
+ # normalize to stripped text before caching so the startswith() checks and log are clean.
+ self.jvm_version = process_stdout.decode('utf-8').strip()
+ Log.info("Detected JVM version %s", self.jvm_version)
+ return self.jvm_version
+
# Returns the processes for each Python Heron Instance
def _get_python_instance_cmd(self, instance_info):
# pylint: disable=fixme
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index 64334d8..f76fe1f 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -62,6 +62,9 @@
self.processes.append(ProcessInfo(popen, name, cmd))
return popen
+ def _get_jvm_version(self):
+ return "1.8.y.x"
+
class HeronExecutorTest(unittest.TestCase):
"""Unittest for Heron Executor"""
@@ -107,8 +110,8 @@
def get_expected_instance_command(component_name, instance_id, container_id):
instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
- return "heron_java_home/bin/java -Xmx320M -Xms320M -Xmn160M -XX:MaxPermSize=128M " \
- "-XX:PermSize=128M -XX:ReservedCodeCacheSize=64M -XX:+CMSScavengeBeforeRemark " \
+ return "heron_java_home/bin/java -Xmx320M -Xms320M -Xmn160M -XX:MaxMetaspaceSize=128M " \
+ "-XX:MetaspaceSize=128M -XX:ReservedCodeCacheSize=64M -XX:+CMSScavengeBeforeRemark " \
"-XX:TargetSurvivorRatio=90 -XX:+PrintCommandLineFlags -verbosegc -XX:+PrintGCDetails " \
"-XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCCause " \
"-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
diff --git a/heron/instance/src/java/BUILD b/heron/instance/src/java/BUILD
index 06f6681..f635b13 100644
--- a/heron/instance/src/java/BUILD
+++ b/heron/instance/src/java/BUILD
@@ -10,7 +10,6 @@
"//heron/api/src/java:api-java",
"//heron/api/src/java:classification",
"//heron/common/src/java:common-java",
- "//third_party/java:logging",
]
java_library(
@@ -29,10 +28,6 @@
deps = [
":instance-java",
"//heron/common/src/java:common-java",
- # Heron does not need it, we supply this,
- # since we exclude them explicitly in target "heron_binary".
- # Later we might remove it.
- "//third_party/java:logging",
] + heron_java_proto_files(),
)
diff --git a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
new file mode 100644
index 0000000..890ee9f
--- /dev/null
+++ b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
@@ -0,0 +1,143 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.instance;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.serializer.IPluggableSerializer;
+import com.twitter.heron.common.basics.Communicator;
+import com.twitter.heron.common.utils.metrics.ComponentMetrics;
+import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
+import com.twitter.heron.proto.system.HeronTuples;
+
+/**
+ * Common functionality used by both bolt and spout output collectors
+ */
+public class AbstractOutputCollector {
+ protected final IPluggableSerializer serializer;
+ protected final OutgoingTupleCollection outputter;
+ protected final ComponentMetrics metrics;
+ protected final boolean ackEnabled;
+ private long totalTuplesEmitted;
+ private PhysicalPlanHelper helper;
+
+ public AbstractOutputCollector(IPluggableSerializer serializer,
+ PhysicalPlanHelper helper,
+ Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
+ ComponentMetrics metrics) {
+ this.serializer = serializer;
+ this.metrics = metrics;
+ this.totalTuplesEmitted = 0;
+ updatePhysicalPlanHelper(helper);
+
+ Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
+ if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
+ && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
+ this.ackEnabled = Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
+ } else {
+ this.ackEnabled = false;
+ }
+
+ this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
+ }
+
+ public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
+ this.helper = physicalPlanHelper;
+ }
+
+ public PhysicalPlanHelper getPhysicalPlanHelper() {
+ return helper;
+ }
+ /////////////////////////////////////////////////////////
+ // Following public methods are used for querying or
+ // interacting internal state of the output collectors
+ /////////////////////////////////////////////////////////
+
+ // Return true we could offer item to outQueue
+ public boolean isOutQueuesAvailable() {
+ return outputter.isOutQueuesAvailable();
+ }
+
+ // Return the total data emitted in bytes
+ public long getTotalDataEmittedInBytes() {
+ return outputter.getTotalDataEmittedInBytes();
+ }
+
+ // Flush the tuples to next stage
+ public void sendOutTuples() {
+ outputter.sendOutTuples();
+ }
+
+ // Clear the internal state of this output collector (delegates to the outputter)
+ public void clear() {
+ outputter.clear();
+ }
+
+ public long getTotalTuplesEmitted() {
+ return totalTuplesEmitted;
+ }
+
+ protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String streamId,
+ List<Object> tuple) {
+ // Start construct the data tuple
+ HeronTuples.HeronDataTuple.Builder builder = HeronTuples.HeronDataTuple.newBuilder();
+
+ // set the key. This is mostly ignored
+ builder.setKey(0);
+
+ List<Integer> customGroupingTargetTaskIds = null;
+ if (!helper.isCustomGroupingEmpty()) {
+ // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
+ customGroupingTargetTaskIds =
+ helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
+
+ if (customGroupingTargetTaskIds != null) {
+ // It is a CustomStreamGrouping
+ builder.addAllDestTaskIds(customGroupingTargetTaskIds);
+ }
+ }
+
+ // Invoke user-defined emit task hook
+ helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
+
+ return builder;
+ }
+
+ protected void sendTuple(HeronTuples.HeronDataTuple.Builder bldr,
+ String streamId, List<Object> tuple) {
+ long tupleSizeInBytes = 0;
+ long startTime = System.nanoTime();
+
+ // Serialize it
+ for (Object obj : tuple) {
+ byte[] b = serializer.serialize(obj);
+ ByteString bstr = ByteString.copyFrom(b);
+ bldr.addValues(bstr);
+ tupleSizeInBytes += b.length;
+ }
+
+ long latency = System.nanoTime() - startTime;
+ metrics.serializeDataTuple(streamId, latency);
+ // submit to outputter
+ outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
+ totalTuplesEmitted++;
+
+ // Update metrics
+ metrics.emittedTuple(streamId);
+ }
+}
diff --git a/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java
index bfc896c..e91f427 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/OutgoingTupleCollection.java
@@ -32,7 +32,6 @@
protected final String componentName;
// We have just one outQueue responsible for both control tuples and data tuples
private final Communicator<HeronTuples.HeronTupleSet> outQueue;
- private final SystemConfig systemConfig;
private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;
private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
@@ -53,7 +52,7 @@
Communicator<HeronTuples.HeronTupleSet> outQueue) {
this.outQueue = outQueue;
this.componentName = componentName;
- this.systemConfig =
+ SystemConfig systemConfig =
(SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
// Initialize the values in constructor
diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
index 229b59c..1840c03 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
@@ -42,7 +42,7 @@
public class BoltInstance implements IInstance {
- protected final PhysicalPlanHelper helper;
+ protected PhysicalPlanHelper helper;
protected final IBolt bolt;
protected final BoltOutputCollectorImpl collector;
protected final IPluggableSerializer serializer;
@@ -99,6 +99,11 @@
((IUpdatable) bolt).update(physicalPlanHelper.getTopologyContext());
}
collector.updatePhysicalPlanHelper(physicalPlanHelper);
+
+ // Re-prepare the CustomStreamGrouping since the downstream tasks can change
+ physicalPlanHelper.prepareForCustomStreamGrouping();
+ // Reset the helper
+ helper = physicalPlanHelper;
}
@Override
diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java
index 10a61e9..ffa58fb 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java
@@ -17,14 +17,10 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
import com.twitter.heron.api.bolt.IOutputCollector;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.tuple.Tuple;
@@ -32,7 +28,7 @@
import com.twitter.heron.common.utils.metrics.BoltMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.tuple.TupleImpl;
-import com.twitter.heron.instance.OutgoingTupleCollection;
+import com.twitter.heron.instance.AbstractOutputCollector;
import com.twitter.heron.proto.system.HeronTuples;
/**
@@ -53,61 +49,32 @@
* 2. Pack the tuple and submit the OutgoingTupleCollection's addDataTuple
* 3. Update the metrics
*/
-public class BoltOutputCollectorImpl implements IOutputCollector {
+public class BoltOutputCollectorImpl extends AbstractOutputCollector implements IOutputCollector {
private static final Logger LOG = Logger.getLogger(BoltOutputCollectorImpl.class.getName());
- private final IPluggableSerializer serializer;
- private final OutgoingTupleCollection outputter;
-
// Reference to update the bolt metrics
private final BoltMetrics boltMetrics;
- private PhysicalPlanHelper helper;
- private final boolean ackEnabled;
-
- public BoltOutputCollectorImpl(IPluggableSerializer serializer,
- PhysicalPlanHelper helper,
- Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
- BoltMetrics boltMetrics) {
+ protected BoltOutputCollectorImpl(IPluggableSerializer serializer,
+ PhysicalPlanHelper helper,
+ Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
+ BoltMetrics boltMetrics) {
+ super(serializer, helper, streamOutQueue, boltMetrics);
+ this.boltMetrics = boltMetrics;
if (helper.getMyBolt() == null) {
throw new RuntimeException(helper.getMyTaskId() + " is not a bolt ");
}
-
- this.serializer = serializer;
- this.boltMetrics = boltMetrics;
- updatePhysicalPlanHelper(helper);
-
- Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
- if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
- && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
- this.ackEnabled = Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
- } else {
- this.ackEnabled = false;
- }
-
- this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
}
- void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
- this.helper = physicalPlanHelper;
- }
-
- /////////////////////////////////////////////////////////
- // Following public methods are overrides OutputCollector
- /////////////////////////////////////////////////////////
-
@Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
return admitBoltTuple(streamId, anchors, tuple);
}
@Override
- public void emitDirect(
- int taskId,
- String streamId,
- Collection<Tuple> anchors,
- List<Object> tuple) {
+ public void emitDirect(int taskId, String streamId,
+ Collection<Tuple> anchors, List<Object> tuple) {
throw new RuntimeException("emitDirect not supported");
}
@@ -127,67 +94,23 @@
}
/////////////////////////////////////////////////////////
- // Following public methods are used for querying or
- // interacting internal state of the BoltOutputCollectorImpl
- /////////////////////////////////////////////////////////
-
- // Return true we could offer item to outQueue
- public boolean isOutQueuesAvailable() {
- return outputter.isOutQueuesAvailable();
- }
-
- // Return the total data emitted in bytes
- public long getTotalDataEmittedInBytes() {
- return outputter.getTotalDataEmittedInBytes();
- }
-
- // Flush the tuples to next stage
- public void sendOutTuples() {
- outputter.sendOutTuples();
- }
-
- // Clean the internal state of BoltOutputCollectorImpl
- public void clear() {
- outputter.clear();
- }
-
- /////////////////////////////////////////////////////////
// Following private methods are internal implementations
/////////////////////////////////////////////////////////
- private List<Integer> admitBoltTuple(
- String streamId,
- Collection<Tuple> anchors,
- List<Object> tuple) {
- if (helper.isTerminatedComponent()) {
+ private List<Integer> admitBoltTuple(String streamId,
+ Collection<Tuple> anchors,
+ List<Object> tuple) {
+ if (getPhysicalPlanHelper().isTerminatedComponent()) {
// No need to handle this tuples
return null;
}
// Start construct the data tuple
- HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
- // set the key. This is mostly ignored
- bldr.setKey(0);
-
- List<Integer> customGroupingTargetTaskIds = null;
- if (!helper.isCustomGroupingEmpty()) {
- // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
- customGroupingTargetTaskIds =
- helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
- if (customGroupingTargetTaskIds != null) {
- // It is a CustomStreamGrouping
- bldr.addAllDestTaskIds(customGroupingTargetTaskIds);
- }
- }
-
- // Invoke user-defined emit task hook
- helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
+ HeronTuples.HeronDataTuple.Builder bldr = initTupleBuilder(streamId, tuple);
// Set the anchors for a tuple
if (anchors != null) {
// This message is rooted
- Set<HeronTuples.RootId> mergedRoots = new HashSet<HeronTuples.RootId>();
+ Set<HeronTuples.RootId> mergedRoots = new HashSet<>();
for (Tuple tpl : anchors) {
if (tpl instanceof TupleImpl) {
TupleImpl t = (TupleImpl) tpl;
@@ -199,24 +122,7 @@
}
}
- long tupleSizeInBytes = 0;
- long startTime = System.nanoTime();
-
- // Serialize it
- for (Object obj : tuple) {
- byte[] b = serializer.serialize(obj);
- ByteString bstr = ByteString.copyFrom(b);
- bldr.addValues(bstr);
- tupleSizeInBytes += b.length;
- }
-
- long latency = System.nanoTime() - startTime;
- boltMetrics.serializeDataTuple(streamId, latency);
- // submit to outputter
- outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
-
- // Update metrics
- boltMetrics.emittedTuple(streamId);
+ sendTuple(bldr, streamId, tuple);
// TODO:- remove this after changing the api
return null;
@@ -244,7 +150,7 @@
}
// Invoke user-defined boltAck task hook
- helper.getTopologyContext().invokeHookBoltAck(tuple, latency);
+ getPhysicalPlanHelper().getTopologyContext().invokeHookBoltAck(tuple, latency);
boltMetrics.ackedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
}
@@ -271,7 +177,7 @@
}
// Invoke user-defined boltFail task hook
- helper.getTopologyContext().invokeHookBoltFail(tuple, latency);
+ getPhysicalPlanHelper().getTopologyContext().invokeHookBoltFail(tuple, latency);
boltMetrics.failedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
}
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 50c1166..b3e1660 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -117,6 +117,11 @@
((IUpdatable) spout).update(physicalPlanHelper.getTopologyContext());
}
collector.updatePhysicalPlanHelper(physicalPlanHelper);
+
+ // Re-prepare the CustomStreamGrouping since the downstream tasks can change
+ physicalPlanHelper.prepareForCustomStreamGrouping();
+ // Reset the helper
+ helper = physicalPlanHelper;
}
@Override
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java
index fe04498..39a9754 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java
@@ -19,21 +19,17 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.spout.ISpoutOutputCollector;
import com.twitter.heron.common.basics.Communicator;
-import com.twitter.heron.common.utils.metrics.SpoutMetrics;
+import com.twitter.heron.common.utils.metrics.ComponentMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
import com.twitter.heron.common.utils.misc.TupleKeyGenerator;
-import com.twitter.heron.instance.OutgoingTupleCollection;
+import com.twitter.heron.instance.AbstractOutputCollector;
import com.twitter.heron.proto.system.HeronTuples;
/**
@@ -49,7 +45,8 @@
* 3. Maintain some statistics, for instance, total tuples emitted.
* <p>
*/
-public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+public class SpoutOutputCollectorImpl
+ extends AbstractOutputCollector implements ISpoutOutputCollector {
private static final Logger LOG = Logger.getLogger(SpoutOutputCollectorImpl.class.getName());
// Map from tuple key to composite object with insertion-order, i.e. ordered by time
@@ -57,61 +54,31 @@
private final TupleKeyGenerator keyGenerator;
- private final SpoutMetrics spoutMetrics;
- private PhysicalPlanHelper helper;
-
- private final boolean ackingEnabled;
// When acking is not enabled, if the spout does an emit with a anchor
// we need to ack it immediately. This keeps the list of those
private final Queue<RootTupleInfo> immediateAcks;
- private final IPluggableSerializer serializer;
- private final OutgoingTupleCollection outputter;
-
- private long totalTuplesEmitted;
-
- public SpoutOutputCollectorImpl(IPluggableSerializer serializer,
- PhysicalPlanHelper helper,
- Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
- SpoutMetrics spoutMetrics) {
+ protected SpoutOutputCollectorImpl(IPluggableSerializer serializer,
+ PhysicalPlanHelper helper,
+ Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
+ ComponentMetrics spoutMetrics) {
+ super(serializer, helper, streamOutQueue, spoutMetrics);
if (helper.getMySpout() == null) {
throw new RuntimeException(helper.getMyTaskId() + " is not a spout ");
}
- this.serializer = serializer;
- this.spoutMetrics = spoutMetrics;
this.keyGenerator = new TupleKeyGenerator();
- updatePhysicalPlanHelper(helper);
// with default capacity, load factor and insertion order
- inFlightTuples = new LinkedHashMap<Long, RootTupleInfo>();
+ inFlightTuples = new LinkedHashMap<>();
- Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
- if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
- && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
- this.ackingEnabled =
- Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
- } else {
- this.ackingEnabled = false;
- }
-
- if (!ackingEnabled) {
- immediateAcks = new ArrayDeque<RootTupleInfo>();
+ if (!ackEnabled) {
+ immediateAcks = new ArrayDeque<>();
} else {
immediateAcks = null;
}
-
- this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
}
- void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
- this.helper = physicalPlanHelper;
- }
-
- /////////////////////////////////////////////////////////
- // Following public methods are overrides OutputCollector
- /////////////////////////////////////////////////////////
-
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
return admitSpoutTuple(streamId, tuple, messageId);
@@ -133,40 +100,20 @@
// Following public methods are used for querying or
// interacting internal state of the BoltOutputCollectorImpl
/////////////////////////////////////////////////////////
-
- // Return true we could offer item to outQueue
- public boolean isOutQueuesAvailable() {
- return outputter.isOutQueuesAvailable();
- }
-
- // Return the total data emitted in bytes
- public long getTotalDataEmittedInBytes() {
- return outputter.getTotalDataEmittedInBytes();
- }
-
- // Flush the tuples to next stage
- public void sendOutTuples() {
- outputter.sendOutTuples();
- }
-
- public long getTotalTuplesEmitted() {
- return totalTuplesEmitted;
- }
-
public int numInFlight() {
return inFlightTuples.size();
}
- public Queue<RootTupleInfo> getImmediateAcks() {
+ Queue<RootTupleInfo> getImmediateAcks() {
return immediateAcks;
}
- public RootTupleInfo retireInFlight(long rootId) {
+ RootTupleInfo retireInFlight(long rootId) {
return inFlightTuples.remove(rootId);
}
- public List<RootTupleInfo> retireExpired(long timeout) {
- List<RootTupleInfo> retval = new ArrayList<RootTupleInfo>();
+ List<RootTupleInfo> retireExpired(long timeout) {
+ List<RootTupleInfo> retval = new ArrayList<>();
long curTime = System.nanoTime();
// The LinkedHashMap is ordered by insertion order, i.e. ordered by time
@@ -186,81 +133,41 @@
return retval;
}
- // Clean the internal state of BoltOutputCollectorImpl
- public void clear() {
- outputter.clear();
- }
-
/////////////////////////////////////////////////////////
// Following private methods are internal implementations
/////////////////////////////////////////////////////////
private List<Integer> admitSpoutTuple(String streamId, List<Object> tuple, Object messageId) {
// No need to send tuples if it is already terminated
- if (helper.isTerminatedComponent()) {
+ if (getPhysicalPlanHelper().isTerminatedComponent()) {
return null;
}
// Start construct the data tuple
- HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
- // set the key. This is mostly ignored
- bldr.setKey(0);
-
- // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
- List<Integer> customGroupingTargetTaskIds = null;
- if (!helper.isCustomGroupingEmpty()) {
- customGroupingTargetTaskIds =
- helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
- if (customGroupingTargetTaskIds != null) {
- // It is a CustomStreamGrouping
- bldr.addAllDestTaskIds(customGroupingTargetTaskIds);
- }
- }
-
- // Invoke user-defined emit task hook
- helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
+ HeronTuples.HeronDataTuple.Builder bldr = initTupleBuilder(streamId, tuple);
if (messageId != null) {
RootTupleInfo tupleInfo = new RootTupleInfo(streamId, messageId);
- if (ackingEnabled) {
+ if (ackEnabled) {
// This message is rooted
- HeronTuples.RootId.Builder rtbldr = EstablishRootId(tupleInfo);
+ HeronTuples.RootId.Builder rtbldr = establishRootId(tupleInfo);
bldr.addRoots(rtbldr);
} else {
immediateAcks.offer(tupleInfo);
}
}
- long tupleSizeInBytes = 0;
- long startTime = System.nanoTime();
-
- // Serialize it
- for (Object obj : tuple) {
- byte[] b = serializer.serialize(obj);
- ByteString bstr = ByteString.copyFrom(b);
- bldr.addValues(bstr);
- tupleSizeInBytes += b.length;
- }
-
- long latency = System.nanoTime() - startTime;
- spoutMetrics.serializeDataTuple(streamId, latency);
-
- // submit to outputter
- outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
- totalTuplesEmitted++;
- spoutMetrics.emittedTuple(streamId);
+ sendTuple(bldr, streamId, tuple);
// TODO:- remove this after changing the api
return null;
}
- private HeronTuples.RootId.Builder EstablishRootId(RootTupleInfo tupleInfo) {
+ private HeronTuples.RootId.Builder establishRootId(RootTupleInfo tupleInfo) {
// This message is rooted
long rootId = keyGenerator.next();
HeronTuples.RootId.Builder rtbldr = HeronTuples.RootId.newBuilder();
- rtbldr.setTaskid(helper.getMyTaskId());
+ rtbldr.setTaskid(getPhysicalPlanHelper().getMyTaskId());
rtbldr.setKey(rootId);
inFlightTuples.put(rootId, tupleInfo);
return rtbldr;
diff --git a/heron/instance/src/java/com/twitter/heron/instance/util/JvmVersion.java b/heron/instance/src/java/com/twitter/heron/instance/util/JvmVersion.java
new file mode 100644
index 0000000..249899d
--- /dev/null
+++ b/heron/instance/src/java/com/twitter/heron/instance/util/JvmVersion.java
@@ -0,0 +1,26 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.instance.util;
+
+/**
+ * When run, prints to stdout the version of the java vm this class is being run in. This is used
+ * by the executor to get just the java version of the VM for easy and consistent parsing.
+ */
+public final class JvmVersion {
+ private JvmVersion() { }
+
+ public static void main(String[] args) {
+ System.out.print(System.getProperty("java.version"));
+ }
+}
diff --git a/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java b/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
index 3072be1..10f2e02 100644
--- a/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
+++ b/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
@@ -67,6 +67,8 @@
private PhysicalPlanHelper helper;
+ private long lastNotConnectedLogTime = 0;
+
public StreamManagerClient(NIOLooper s, String streamManagerHost, int streamManagerPort,
String topologyName, String topologyId,
PhysicalPlans.Instance instance,
@@ -240,6 +242,8 @@
}
private void readStreamMessageIfNeeded() {
+ final long lastNotConnectedLogThrottleSeconds = 5;
+
// If client is not connected, just return
if (isConnected()) {
if (isInQueuesAvailable() || helper == null) {
@@ -249,7 +253,13 @@
stopReading();
}
} else {
- LOG.info("Stop reading due to not yet connected to Stream Manager.");
+ long now = System.currentTimeMillis();
+ if (now - lastNotConnectedLogTime > lastNotConnectedLogThrottleSeconds * 1000) {
+ LOG.info(String.format("Stop reading due to not yet connected to Stream Manager. This "
+ + "message is throttled to emit no more than once every %d seconds.",
+ lastNotConnectedLogThrottleSeconds));
+ lastNotConnectedLogTime = now;
+ }
}
}
diff --git a/heron/instance/src/python/network/metricsmgr_client.py b/heron/instance/src/python/network/metricsmgr_client.py
index ddf2861..9f092b6 100644
--- a/heron/instance/src/python/network/metricsmgr_client.py
+++ b/heron/instance/src/python/network/metricsmgr_client.py
@@ -72,6 +72,7 @@
Log.error("Error connecting to Metrics Manager with status: %s" % str(status))
retry_interval = float(self.sys_config[constants.INSTANCE_RECONNECT_METRICSMGR_INTERVAL_SEC])
self.looper.register_timer_task_in_sec(self.start_connect, retry_interval)
+ return
self._send_register_req()
def on_response(self, status, context, response):
diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD
index f61fcaf..c7432ae 100644
--- a/heron/instance/tests/java/BUILD
+++ b/heron/instance/tests/java/BUILD
@@ -7,6 +7,7 @@
"//heron/common/src/java:common-java",
"//heron/instance/src/java:instance-java",
"//third_party/java:junit4",
+ "//third_party/java:kryo",
]
java_library(
diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/AbstractTupleRoutingTest.java b/heron/instance/tests/java/com/twitter/heron/grouping/AbstractTupleRoutingTest.java
new file mode 100644
index 0000000..ddf7bf9
--- /dev/null
+++ b/heron/instance/tests/java/com/twitter/heron/grouping/AbstractTupleRoutingTest.java
@@ -0,0 +1,277 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.grouping;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.common.basics.Communicator;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.common.basics.SlaveLooper;
+import com.twitter.heron.common.basics.SysUtils;
+import com.twitter.heron.common.basics.WakeableLooper;
+import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
+import com.twitter.heron.instance.InstanceControlMsg;
+import com.twitter.heron.instance.Slave;
+import com.twitter.heron.proto.system.HeronTuples;
+import com.twitter.heron.proto.system.Metrics;
+import com.twitter.heron.proto.system.PhysicalPlans;
+import com.twitter.heron.resource.Constants;
+import com.twitter.heron.resource.TestBolt;
+import com.twitter.heron.resource.TestSpout;
+import com.twitter.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Abstract test to verify that tuples can be routed according to custom logic. Specific tests
+ * should extend this and override the initSpout(..), initBoltA(..) and/or initBoltB(..) methods as
+ * necessary to achieve the desired routing logic.
+ */
+public abstract class AbstractTupleRoutingTest {
+ private WakeableLooper testLooper;
+ private SlaveLooper slaveLooper;
+ private PhysicalPlans.PhysicalPlan physicalPlan;
+ private Communicator<HeronTuples.HeronTupleSet> outStreamQueue;
+ private Communicator<HeronTuples.HeronTupleSet> inStreamQueue;
+ private Communicator<InstanceControlMsg> inControlQueue;
+ private ExecutorService threadsPool;
+ private volatile int tupleReceived;
+ private volatile StringBuilder groupingInitInfo;
+ private Slave slave;
+
+ // Test component info. Topology is SPOUT -> BOLT_A -> BOLT_B
+ protected enum Component {
+ SPOUT("test-spout", "spout-id"),
+ BOLT_A("test-bolt-a", "bolt-a-id"),
+ BOLT_B("test-bolt-b", "bolt-b-id");
+
+ private String name;
+ private String id;
+
+ Component(String name, String instanceId) {
+ this.name = name;
+ this.id = instanceId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getInstanceId() {
+ return id;
+ }
+ }
+
+ @Before
+ public void before() throws Exception {
+ UnitTestHelper.addSystemConfigToSingleton();
+
+ tupleReceived = 0;
+ groupingInitInfo = new StringBuilder();
+
+ testLooper = new SlaveLooper();
+ slaveLooper = new SlaveLooper();
+ outStreamQueue = new Communicator<>(slaveLooper, testLooper);
+ outStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
+ inStreamQueue = new Communicator<>(testLooper, slaveLooper);
+ inStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
+ inControlQueue = new Communicator<>(testLooper, slaveLooper);
+
+ Communicator<Metrics.MetricPublisherPublishMessage> slaveMetricsOut =
+ new Communicator<>(slaveLooper, testLooper);
+ slaveMetricsOut.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
+
+ slave = new Slave(slaveLooper, inStreamQueue, outStreamQueue, inControlQueue, slaveMetricsOut);
+ threadsPool = Executors.newSingleThreadExecutor();
+
+ threadsPool.execute(slave);
+ }
+
+ @After
+ public void after() throws Exception {
+ UnitTestHelper.clearSingletonRegistry();
+
+ tupleReceived = 0;
+ groupingInitInfo = null;
+
+ if (testLooper != null) {
+ testLooper.exitLoop();
+ }
+ if (slaveLooper != null) {
+ slaveLooper.exitLoop();
+ }
+ if (threadsPool != null) {
+ threadsPool.shutdownNow();
+ }
+ physicalPlan = null;
+ testLooper = null;
+ slaveLooper = null;
+ outStreamQueue = null;
+ inStreamQueue = null;
+
+ slave = null;
+ threadsPool = null;
+ }
+
+ protected String getInitInfoKey(String componentName) {
+ return "routing-init-info+" + componentName;
+ }
+
+ /**
+ * Test that tuple routing occurs using round robin
+ */
+ @Test
+ public void testRoundRobinRouting() throws Exception {
+ physicalPlan = constructPhysicalPlan();
+
+ PhysicalPlanHelper physicalPlanHelper =
+ new PhysicalPlanHelper(physicalPlan, getComponentToVerify().getInstanceId());
+ InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder()
+ .setNewPhysicalPlanHelper(physicalPlanHelper)
+ .build();
+
+ inControlQueue.offer(instanceControlMsg);
+
+ SingletonRegistry.INSTANCE.registerSingleton(
+ getInitInfoKey(getComponentToVerify().getName()), groupingInitInfo);
+
+ final int expectedTuplesValidated = 10;
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < Constants.RETRY_TIMES; i++) {
+ if (outStreamQueue.size() != 0) {
+ HeronTuples.HeronTupleSet set = outStreamQueue.poll();
+
+ assertTrue(set.isInitialized());
+ assertFalse(set.hasControl());
+ assertTrue(set.hasData());
+
+ HeronTuples.HeronDataTupleSet dataTupleSet = set.getData();
+ assertEquals(dataTupleSet.getStream().getId(), "default");
+ assertEquals(dataTupleSet.getStream().getComponentName(),
+ getComponentToVerify().getName());
+
+ for (HeronTuples.HeronDataTuple dataTuple : dataTupleSet.getTuplesList()) {
+ List<Integer> destTaskIds = dataTuple.getDestTaskIdsList();
+ assertEquals(1, destTaskIds.size());
+ assertEquals((Integer) tupleReceived, destTaskIds.get(0));
+ tupleReceived++;
+ }
+ }
+ if (tupleReceived == expectedTuplesValidated) {
+ assertEquals(getExpectedComponentInitInfo(), groupingInitInfo.toString());
+ testLooper.exitLoop();
+ break;
+ }
+ SysUtils.sleep(Constants.RETRY_INTERVAL_MS);
+ }
+ }
+ };
+
+ testLooper.addTasksOnWakeup(task);
+ testLooper.loop();
+ assertEquals(expectedTuplesValidated, tupleReceived);
+ }
+
+ private PhysicalPlans.PhysicalPlan constructPhysicalPlan() {
+ PhysicalPlans.PhysicalPlan.Builder physicalPlanBuilder
+ = PhysicalPlans.PhysicalPlan.newBuilder();
+
+ // Set topology protobuf
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ initSpout(topologyBuilder, Component.SPOUT.getName());
+ initBoltA(topologyBuilder, Component.BOLT_A.getName(), Component.SPOUT.getName());
+ initBoltB(topologyBuilder, Component.BOLT_B.getName(), Component.BOLT_A.getName());
+
+ Config conf = new Config();
+ conf.setTeamEmail("some-team@company.com");
+ conf.setTeamName("some-team");
+ conf.setTopologyProjectName("heron-integration-test");
+ conf.setNumStmgrs(1);
+ conf.setMaxSpoutPending(100);
+ conf.setEnableAcking(false);
+
+ TopologyAPI.Topology topology = topologyBuilder.createTopology()
+ .setName("topology-name")
+ .setConfig(conf)
+ .setState(TopologyAPI.TopologyState.RUNNING)
+ .getTopology();
+
+ physicalPlanBuilder.setTopology(topology);
+
+ // Set instances
+ int taskId = 0;
+ for (Component component : Component.values()) {
+ addComponent(physicalPlanBuilder, component, taskId++);
+ }
+
+ // Set stream mgr
+ PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
+ stmgr.setId("stream-manager-id");
+ stmgr.setHostName("127.0.0.1");
+ stmgr.setDataPort(8888);
+ stmgr.setLocalEndpoint("endpoint");
+ physicalPlanBuilder.addStmgrs(stmgr);
+
+ return physicalPlanBuilder.build();
+ }
+
+ private void addComponent(PhysicalPlans.PhysicalPlan.Builder builder,
+ Component component, int taskId) {
+ PhysicalPlans.InstanceInfo.Builder instanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
+ instanceInfo.setComponentName(component.getName());
+ instanceInfo.setTaskId(taskId);
+ instanceInfo.setComponentIndex(0);
+
+ PhysicalPlans.Instance.Builder instance = PhysicalPlans.Instance.newBuilder();
+ instance.setInstanceId(component.getInstanceId());
+ instance.setStmgrId("stream-manager-id");
+ instance.setInfo(instanceInfo);
+
+ builder.addInstances(instance);
+ }
+
+ protected void initSpout(TopologyBuilder topologyBuilder, String spoutId) {
+ topologyBuilder.setSpout(spoutId, new TestSpout(), 1);
+ }
+
+ protected void initBoltA(TopologyBuilder topologyBuilder,
+ String boltId, String upstreamComponentId) {
+ topologyBuilder.setBolt(boltId, new TestBolt(), 1)
+ .shuffleGrouping(upstreamComponentId);
+ }
+
+ protected void initBoltB(TopologyBuilder topologyBuilder,
+ String boltId, String upstreamComponentId) {
+ topologyBuilder.setBolt(boltId, new TestBolt(), 1)
+ .shuffleGrouping(upstreamComponentId);
+ }
+
+ protected abstract Component getComponentToVerify();
+
+ protected abstract String getExpectedComponentInitInfo();
+}
diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java b/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java
index 82099e4..515fee4 100644
--- a/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java
+++ b/heron/instance/tests/java/com/twitter/heron/grouping/CustomGroupingTest.java
@@ -1,257 +1,74 @@
-// Copyright 2016 Twitter. All rights reserved.
+// Copyright 2017 Twitter. All rights reserved.
//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
package com.twitter.heron.grouping;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.grouping.CustomStreamGrouping;
import com.twitter.heron.api.topology.TopologyBuilder;
import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.basics.SlaveLooper;
-import com.twitter.heron.common.basics.SysUtils;
-import com.twitter.heron.common.basics.WakeableLooper;
-import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
-import com.twitter.heron.instance.InstanceControlMsg;
-import com.twitter.heron.instance.Slave;
-import com.twitter.heron.proto.system.HeronTuples;
-import com.twitter.heron.proto.system.Metrics;
-import com.twitter.heron.proto.system.PhysicalPlans;
-import com.twitter.heron.resource.Constants;
import com.twitter.heron.resource.TestBolt;
-import com.twitter.heron.resource.TestSpout;
-import com.twitter.heron.resource.UnitTestHelper;
-public class CustomGroupingTest {
- private static final String SPOUT_INSTANCE_ID = "spout-id";
- private static final String CUSTOM_GROUPING_INFO = "custom-grouping-info-in-prepare";
+/**
+ * Tests custom grouping by using round robin grouping from SPOUT to BOLT_A
+ */
+public class CustomGroupingTest extends AbstractTupleRoutingTest {
- private WakeableLooper testLooper;
- private SlaveLooper slaveLooper;
- private PhysicalPlans.PhysicalPlan physicalPlan;
- private Communicator<HeronTuples.HeronTupleSet> outStreamQueue;
- private Communicator<HeronTuples.HeronTupleSet> inStreamQueue;
- private Communicator<InstanceControlMsg> inControlQueue;
- private ExecutorService threadsPool;
- private Communicator<Metrics.MetricPublisherPublishMessage> slaveMetricsOut;
- private volatile int tupleReceived;
- private volatile StringBuilder customGroupingInfoInPrepare;
- private Slave slave;
+ @Override
+ protected void initBoltA(TopologyBuilder topologyBuilder,
+ String boltId, String upstreamComponentId) {
+ final CustomStreamGrouping myCustomGrouping =
+ new MyRoundRobinCustomGrouping(getInitInfoKey(upstreamComponentId));
- @Before
- public void before() throws Exception {
- UnitTestHelper.addSystemConfigToSingleton();
-
- tupleReceived = 0;
- customGroupingInfoInPrepare = new StringBuilder();
-
- testLooper = new SlaveLooper();
- slaveLooper = new SlaveLooper();
- outStreamQueue = new Communicator<HeronTuples.HeronTupleSet>(slaveLooper, testLooper);
- outStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
- inStreamQueue = new Communicator<HeronTuples.HeronTupleSet>(testLooper, slaveLooper);
- inStreamQueue.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
- inControlQueue = new Communicator<InstanceControlMsg>(testLooper, slaveLooper);
-
- slaveMetricsOut =
- new Communicator<Metrics.MetricPublisherPublishMessage>(slaveLooper, testLooper);
- slaveMetricsOut.init(Constants.QUEUE_BUFFER_SIZE, Constants.QUEUE_BUFFER_SIZE, 0.5);
-
- slave = new Slave(slaveLooper, inStreamQueue, outStreamQueue, inControlQueue, slaveMetricsOut);
- threadsPool = Executors.newSingleThreadExecutor();
-
- threadsPool.execute(slave);
+ topologyBuilder.setBolt(boltId, new TestBolt(), 1)
+ .customGrouping(upstreamComponentId, myCustomGrouping);
}
- @After
- public void after() throws Exception {
- UnitTestHelper.clearSingletonRegistry();
-
- tupleReceived = 0;
- customGroupingInfoInPrepare = null;
-
- if (testLooper != null) {
- testLooper.exitLoop();
- }
- if (slaveLooper != null) {
- slaveLooper.exitLoop();
- }
- if (threadsPool != null) {
- threadsPool.shutdownNow();
- }
- physicalPlan = null;
- testLooper = null;
- slaveLooper = null;
- outStreamQueue = null;
- inStreamQueue = null;
-
- slave = null;
- threadsPool = null;
+ @Override
+ protected Component getComponentToVerify() {
+ return Component.SPOUT;
}
- /**
- * Test custom grouping
- */
- @Test
- public void testCustomGrouping() throws Exception {
- final MyCustomGrouping myCustomGrouping = new MyCustomGrouping();
- final String expectedCustomGroupingStringInPrepare = "test-spout+test-spout+default+[1]";
-
- physicalPlan = constructPhysicalPlan(myCustomGrouping);
-
- PhysicalPlanHelper physicalPlanHelper = new PhysicalPlanHelper(physicalPlan, SPOUT_INSTANCE_ID);
- InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder().
- setNewPhysicalPlanHelper(physicalPlanHelper).
- build();
-
- inControlQueue.offer(instanceControlMsg);
-
- SingletonRegistry.INSTANCE.registerSingleton(CUSTOM_GROUPING_INFO, customGroupingInfoInPrepare);
-
- Runnable task = new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < Constants.RETRY_TIMES; i++) {
- if (outStreamQueue.size() != 0) {
- HeronTuples.HeronTupleSet set = outStreamQueue.poll();
-
- Assert.assertTrue(set.isInitialized());
- Assert.assertFalse(set.hasControl());
- Assert.assertTrue(set.hasData());
-
- HeronTuples.HeronDataTupleSet dataTupleSet = set.getData();
- Assert.assertEquals(dataTupleSet.getStream().getId(), "default");
- Assert.assertEquals(dataTupleSet.getStream().getComponentName(), "test-spout");
-
- for (HeronTuples.HeronDataTuple dataTuple : dataTupleSet.getTuplesList()) {
- List<Integer> destTaskIds = dataTuple.getDestTaskIdsList();
- Assert.assertEquals(destTaskIds.size(), 1);
- Assert.assertEquals(destTaskIds.get(0), (Integer) tupleReceived);
- tupleReceived++;
- }
- }
- if (tupleReceived == 10) {
- Assert.assertEquals(expectedCustomGroupingStringInPrepare,
- customGroupingInfoInPrepare.toString());
- testLooper.exitLoop();
- break;
- }
- SysUtils.sleep(Constants.RETRY_INTERVAL_MS);
- }
- }
- };
-
- testLooper.addTasksOnWakeup(task);
- testLooper.loop();
+ @Override
+ protected String getExpectedComponentInitInfo() {
+ return "test-spout+test-spout+default+[1]";
}
- private PhysicalPlans.PhysicalPlan constructPhysicalPlan(MyCustomGrouping myCustomGrouping) {
- PhysicalPlans.PhysicalPlan.Builder pPlan = PhysicalPlans.PhysicalPlan.newBuilder();
-
- // Set topology protobuf
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- topologyBuilder.setSpout("test-spout", new TestSpout(), 1);
- // Here we need case switch to corresponding grouping
- topologyBuilder.setBolt("test-bolt", new TestBolt(), 1).
- customGrouping("test-spout", myCustomGrouping);
-
- Config conf = new Config();
- conf.setTeamEmail("streaming-compute@twitter.com");
- conf.setTeamName("stream-computing");
- conf.setTopologyProjectName("heron-integration-test");
- conf.setNumStmgrs(1);
- conf.setMaxSpoutPending(100);
- conf.setEnableAcking(false);
-
- TopologyAPI.Topology fTopology =
- topologyBuilder.createTopology().
- setName("topology-name").
- setConfig(conf).
- setState(TopologyAPI.TopologyState.RUNNING).
- getTopology();
-
- pPlan.setTopology(fTopology);
-
- // Set instances
- // Construct the spoutInstance
- PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
- spoutInstanceInfo.setComponentName("test-spout");
- spoutInstanceInfo.setTaskId(0);
- spoutInstanceInfo.setComponentIndex(0);
-
- PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder();
- spoutInstance.setInstanceId("spout-id");
- spoutInstance.setStmgrId("stream-manager-id");
- spoutInstance.setInfo(spoutInstanceInfo);
-
- // Construct the boltInstanceInfo
- PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
- boltInstanceInfo.setComponentName("test-bolt");
- boltInstanceInfo.setTaskId(1);
- boltInstanceInfo.setComponentIndex(0);
-
- PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder();
- boltInstance.setInstanceId("bolt-id");
- boltInstance.setStmgrId("stream-manager-id");
- boltInstance.setInfo(boltInstanceInfo);
-
- pPlan.addInstances(spoutInstance);
- pPlan.addInstances(boltInstance);
-
- // Set stream mgr
- PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
- stmgr.setId("stream-manager-id");
- stmgr.setHostName("127.0.0.1");
- stmgr.setDataPort(8888);
- stmgr.setLocalEndpoint("endpoint");
- pPlan.addStmgrs(stmgr);
-
- return pPlan.build();
- }
-
- private static class MyCustomGrouping implements CustomStreamGrouping {
+ private static final class MyRoundRobinCustomGrouping implements CustomStreamGrouping {
private static final long serialVersionUID = -4141962710451507976L;
private volatile int emitted = 0;
+ private final String initInfoKey;
+
+ private MyRoundRobinCustomGrouping(String initInfoKey) {
+ super();
+ this.initInfoKey = initInfoKey;
+ }
@Override
- public void prepare(
- TopologyContext context,
- String component,
- String streamId,
- List<Integer> targetTasks) {
+ public void prepare(TopologyContext context, String component,
+ String streamId, List<Integer> targetTasks) {
- StringBuilder customGroupingInfoInPrepare =
- (StringBuilder) SingletonRegistry.INSTANCE.getSingleton(CUSTOM_GROUPING_INFO);
- customGroupingInfoInPrepare.append(context.getThisComponentId() + "+" + component
- + "+" + streamId + "+" + targetTasks.toString());
+ ((StringBuilder) SingletonRegistry.INSTANCE.getSingleton(initInfoKey))
+ .append(String.format("%s+%s+%s+%s",
+ context.getThisComponentId(), component, streamId, targetTasks.toString()));
}
@Override
public List<Integer> chooseTasks(List<Object> values) {
- List<Integer> res = new ArrayList<Integer>();
+ List<Integer> res = new ArrayList<>();
res.add(emitted);
emitted++;
return res;
diff --git a/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java b/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java
index 172dd35..3ce2919 100644
--- a/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java
+++ b/heron/instance/tests/java/com/twitter/heron/instance/bolt/BoltInstanceTest.java
@@ -30,7 +30,7 @@
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.serializer.KryoSerializer;
+import com.twitter.heron.api.serializer.JavaSerializer;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.basics.SlaveLooper;
@@ -81,7 +81,7 @@
@BeforeClass
public static void beforeClass() throws Exception {
- serializer = new KryoSerializer();
+ serializer = new JavaSerializer();
serializer.initialize(null);
}
diff --git a/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java b/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java
index 5d46514..1c0794f 100644
--- a/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java
+++ b/heron/instance/tests/java/com/twitter/heron/instance/spout/SpoutInstanceTest.java
@@ -32,7 +32,7 @@
import org.junit.Test;
import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.serializer.KryoSerializer;
+import com.twitter.heron.api.serializer.JavaSerializer;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.basics.SingletonRegistry;
import com.twitter.heron.common.basics.SlaveLooper;
@@ -95,7 +95,7 @@
@BeforeClass
public static void beforeClass() throws Exception {
- serializer = new KryoSerializer();
+ serializer = new JavaSerializer();
serializer.initialize(null);
}
diff --git a/heron/metricscachemgr/src/java/BUILD b/heron/metricscachemgr/src/java/BUILD
index 31664db..dd031be 100644
--- a/heron/metricscachemgr/src/java/BUILD
+++ b/heron/metricscachemgr/src/java/BUILD
@@ -13,7 +13,7 @@
"//heron/api/src/java:api-java",
"//heron/common/src/java:common-java",
"//heron/scheduler-core/src/java:scheduler-java",
- "//heron/statemgrs/src/java:localfs-statemgr-java",
+ "//heron/statemgrs/src/java:statemgrs-java",
"//heron/metricsmgr/src/thrift:thrift_scribe_java",
"//heron/metricsmgr/src/java:metricsmgr-java",
"//third_party/java:cli",
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
index 7e3bfd5..f608439 100644
--- a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
@@ -158,6 +158,12 @@
any(String.class), eq(TOPOLOGY_NAME),
eq(mockStateMgr), any(NetworkUtils.TunnelConfig.class));
+ // reactivation won't happen since topology state is still running due to mock state manager
+ PowerMockito.doNothing().when(TMasterUtils.class, "transitionTopologyState",
+ eq(TOPOLOGY_NAME), eq(TMasterUtils.TMasterCommand.ACTIVATE), eq(mockStateMgr),
+ eq(TopologyAPI.TopologyState.PAUSED), eq(TopologyAPI.TopologyState.RUNNING),
+ any(NetworkUtils.TunnelConfig.class));
+
spyUpdateManager.updateTopology(currentProtoPlan, proposedProtoPlan);
verify(spyUpdateManager).deactivateTopology(eq(mockStateMgr), eq(testTopology));
diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java b/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java
index 9f5b49c..fc4fd57 100644
--- a/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java
+++ b/heron/simulator/src/java/com/twitter/heron/simulator/instance/BoltOutputCollectorImpl.java
@@ -14,24 +14,11 @@
package com.twitter.heron.simulator.instance;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
import com.twitter.heron.api.bolt.IOutputCollector;
import com.twitter.heron.api.serializer.IPluggableSerializer;
-import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.common.basics.Communicator;
import com.twitter.heron.common.utils.metrics.BoltMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
-import com.twitter.heron.common.utils.tuple.TupleImpl;
import com.twitter.heron.proto.system.HeronTuples;
/**
@@ -52,220 +39,14 @@
* 2. Pack the tuple and submit the OutgoingTupleCollection's addDataTuple
* 3. Update the metrics
*/
-public class BoltOutputCollectorImpl implements IOutputCollector {
- private static final Logger LOG = Logger.getLogger(BoltOutputCollectorImpl.class.getName());
-
- private final IPluggableSerializer serializer;
- private final OutgoingTupleCollection outputter;
-
- // Reference to update the bolt metrics
- private final BoltMetrics boltMetrics;
- private PhysicalPlanHelper helper;
-
- private final boolean ackEnabled;
+public class BoltOutputCollectorImpl
+ extends com.twitter.heron.instance.bolt.BoltOutputCollectorImpl
+ implements IOutputCollector {
public BoltOutputCollectorImpl(IPluggableSerializer serializer,
PhysicalPlanHelper helper,
Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
BoltMetrics boltMetrics) {
-
- if (helper.getMyBolt() == null) {
- throw new RuntimeException(helper.getMyTaskId() + " is not a bolt ");
- }
-
- this.serializer = serializer;
- this.boltMetrics = boltMetrics;
- updatePhysicalPlanHelper(helper);
-
- Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
- if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
- && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
- this.ackEnabled = Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
- } else {
- this.ackEnabled = false;
- }
-
- this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
- }
-
- public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
- this.helper = physicalPlanHelper;
- }
-
- /////////////////////////////////////////////////////////
- // Following public methods are overrides OutputCollector
- /////////////////////////////////////////////////////////
-
- @Override
- public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- return admitBoltTuple(streamId, anchors, tuple);
- }
-
- @Override
- public void emitDirect(
- int taskId,
- String streamId,
- Collection<Tuple> anchors,
- List<Object> tuple) {
- throw new RuntimeException("emitDirect not supported");
- }
-
- @Override
- public void reportError(Throwable error) {
- LOG.log(Level.SEVERE, "Reporting an error in topology code ", error);
- }
-
- @Override
- public void ack(Tuple input) {
- admitAckTuple(input);
- }
-
- @Override
- public void fail(Tuple input) {
- admitFailTuple(input);
- }
-
- /////////////////////////////////////////////////////////
- // Following public methods are used for querying or
- // interacting internal state of the BoltOutputCollectorImpl
- /////////////////////////////////////////////////////////
-
- // Return true we could offer item to outQueue
- public boolean isOutQueuesAvailable() {
- return outputter.isOutQueuesAvailable();
- }
-
- // Return the total data emitted in bytes
- public long getTotalDataEmittedInBytes() {
- return outputter.getTotalDataEmittedInBytes();
- }
-
- // Flush the tuples to next stage
- public void sendOutTuples() {
- outputter.sendOutTuples();
- }
-
- // Clean the internal state of BoltOutputCollectorImpl
- public void clear() {
- outputter.clear();
- }
-
- /////////////////////////////////////////////////////////
- // Following private methods are internal implementations
- /////////////////////////////////////////////////////////
-
- private List<Integer> admitBoltTuple(
- String streamId,
- Collection<Tuple> anchors,
- List<Object> tuple) {
- // First check whether this tuple is sane
- helper.checkOutputSchema(streamId, tuple);
-
- // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
- List<Integer> customGroupingTargetTaskIds =
- helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
- // Invoke user-defined emit task hook
- helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
-
- // Start construct the data tuple
- HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
- // set the key. This is mostly ignored
- bldr.setKey(0);
-
- if (customGroupingTargetTaskIds != null) {
- // It is a CustomStreamGrouping
- for (Integer taskId : customGroupingTargetTaskIds) {
- bldr.addDestTaskIds(taskId);
- }
- }
-
- // Set the anchors for a tuple
- if (anchors != null) {
- // This message is rooted
- Set<HeronTuples.RootId> mergedRoots = new HashSet<HeronTuples.RootId>();
- for (Tuple tpl : anchors) {
- if (tpl instanceof TupleImpl) {
- TupleImpl t = (TupleImpl) tpl;
- mergedRoots.addAll(t.getRoots());
- }
- }
- for (HeronTuples.RootId rt : mergedRoots) {
- bldr.addRoots(rt);
- }
- }
-
- long tupleSizeInBytes = 0;
-
- long startTime = System.nanoTime();
-
- // Serialize it
- for (Object obj : tuple) {
- byte[] b = serializer.serialize(obj);
- ByteString bstr = ByteString.copyFrom(b);
- bldr.addValues(bstr);
- tupleSizeInBytes += b.length;
- }
-
- long latency = System.nanoTime() - startTime;
- boltMetrics.serializeDataTuple(streamId, latency);
-
- // submit to outputter
- outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
-
- // Update metrics
- boltMetrics.emittedTuple(streamId);
-
- // TODO:- remove this after changing the api
- return null;
- }
-
- private void admitAckTuple(Tuple tuple) {
- if (tuple instanceof TupleImpl) {
- TupleImpl tuplImpl = (TupleImpl) tuple;
- if (ackEnabled) {
- HeronTuples.AckTuple.Builder bldr = HeronTuples.AckTuple.newBuilder();
- bldr.setAckedtuple(tuplImpl.getTupleKey());
-
- long tupleSizeInBytes = 0;
-
- for (HeronTuples.RootId rt : tuplImpl.getRoots()) {
- bldr.addRoots(rt);
- tupleSizeInBytes += rt.getSerializedSize();
- }
- outputter.addAckTuple(bldr, tupleSizeInBytes);
- }
- long latency = System.nanoTime() - tuplImpl.getCreationTime();
-
- // Invoke user-defined boltAck task hook
- helper.getTopologyContext().invokeHookBoltAck(tuple, latency);
-
- boltMetrics.ackedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
- }
- }
-
- private void admitFailTuple(Tuple tuple) {
- if (tuple instanceof TupleImpl) {
- TupleImpl tuplImpl = (TupleImpl) tuple;
- if (ackEnabled) {
- HeronTuples.AckTuple.Builder bldr = HeronTuples.AckTuple.newBuilder();
- bldr.setAckedtuple(tuplImpl.getTupleKey());
-
- long tupleSizeInBytes = 0;
-
- for (HeronTuples.RootId rt : tuplImpl.getRoots()) {
- bldr.addRoots(rt);
- tupleSizeInBytes += rt.getSerializedSize();
- }
- outputter.addFailTuple(bldr, tupleSizeInBytes);
- }
- long latency = System.nanoTime() - tuplImpl.getCreationTime();
-
- // Invoke user-defined boltFail task hook
- helper.getTopologyContext().invokeHookBoltFail(tuple, latency);
-
- boltMetrics.failedTuple(tuple.getSourceStreamId(), tuple.getSourceComponent(), latency);
- }
+ super(serializer, helper, streamOutQueue, boltMetrics);
}
}
diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java b/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java
index f34d9c3..394f29d 100644
--- a/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java
+++ b/heron/simulator/src/java/com/twitter/heron/simulator/instance/OutgoingTupleCollection.java
@@ -14,10 +14,7 @@
package com.twitter.heron.simulator.instance;
-import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.common.basics.Communicator;
-import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.proto.system.HeronTuples;
/**
@@ -28,144 +25,10 @@
* <p>
* In fact, when talking about to send out tuples, we mean we push them to the out queues.
*/
-public class OutgoingTupleCollection {
- protected final String componentName;
- // We have just one outQueue responsible for both control tuples and data tuples
- private final Communicator<HeronTuples.HeronTupleSet> outQueue;
- private final SystemConfig systemConfig;
+public class OutgoingTupleCollection extends com.twitter.heron.instance.OutgoingTupleCollection {
- private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;
- private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
-
- // Total data emitted in bytes for the entire life
- private long totalDataEmittedInBytes;
-
- // Current size in bytes for data types to pack into the HeronTupleSet
- private long currentDataTupleSizeInBytes;
- // Maximum data tuple size in bytes we can put in one HeronTupleSet
- private long maxDataTupleSizeInBytes;
-
- private int dataTupleSetCapacity;
- private int controlTupleSetCapacity;
-
- public OutgoingTupleCollection(
- String componentName,
- Communicator<HeronTuples.HeronTupleSet> outQueue) {
- this.outQueue = outQueue;
- this.componentName = componentName;
- this.systemConfig =
- (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
-
- // Initialize the values in constructor
- this.totalDataEmittedInBytes = 0;
- this.currentDataTupleSizeInBytes = 0;
-
- // Read the config values
- this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
- this.maxDataTupleSizeInBytes = systemConfig.getInstanceSetDataTupleSizeBytes();
- this.controlTupleSetCapacity = systemConfig.getInstanceSetControlTupleCapacity();
- }
-
- public void sendOutTuples() {
- flushRemaining();
- }
-
- public void addDataTuple(
- String streamId,
- HeronTuples.HeronDataTuple.Builder newTuple,
- long tupleSizeInBytes) {
- if (currentDataTuple == null
- || !currentDataTuple.getStream().getId().equals(streamId)
- || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
- || currentDataTupleSizeInBytes >= maxDataTupleSizeInBytes) {
- initNewDataTuple(streamId);
- }
- currentDataTuple.addTuples(newTuple);
-
- currentDataTupleSizeInBytes += tupleSizeInBytes;
- totalDataEmittedInBytes += tupleSizeInBytes;
- }
-
- public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
- if (currentControlTuple == null
- || currentControlTuple.getFailsCount() > 0
- || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
- initNewControlTuple();
- }
- currentControlTuple.addAcks(newTuple);
-
- // Add the size of data in bytes ready to send out
- totalDataEmittedInBytes += tupleSizeInBytes;
- }
-
- public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
- if (currentControlTuple == null
- || currentControlTuple.getAcksCount() > 0
- || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
- initNewControlTuple();
- }
- currentControlTuple.addFails(newTuple);
-
- // Add the size of data in bytes ready to send out
- totalDataEmittedInBytes += tupleSizeInBytes;
- }
-
- private void initNewDataTuple(String streamId) {
- flushRemaining();
-
- // Reset the set for data tuple
- currentDataTupleSizeInBytes = 0;
-
- TopologyAPI.StreamId.Builder sbldr = TopologyAPI.StreamId.newBuilder();
- sbldr.setId(streamId);
- sbldr.setComponentName(componentName);
- currentDataTuple = HeronTuples.HeronDataTupleSet.newBuilder();
- currentDataTuple.setStream(sbldr);
- }
-
- private void initNewControlTuple() {
- flushRemaining();
- currentControlTuple = HeronTuples.HeronControlTupleSet.newBuilder();
- }
-
- private void flushRemaining() {
- if (currentDataTuple != null) {
- HeronTuples.HeronTupleSet.Builder bldr = HeronTuples.HeronTupleSet.newBuilder();
- bldr.setData(currentDataTuple);
-
- pushTupleToQueue(bldr, outQueue);
-
- currentDataTuple = null;
- }
- if (currentControlTuple != null) {
- HeronTuples.HeronTupleSet.Builder bldr = HeronTuples.HeronTupleSet.newBuilder();
- bldr.setControl(currentControlTuple);
- pushTupleToQueue(bldr, outQueue);
-
- currentControlTuple = null;
- }
- }
-
- private void pushTupleToQueue(HeronTuples.HeronTupleSet.Builder bldr,
- Communicator<HeronTuples.HeronTupleSet> out) {
- // The Communicator has un-bounded capacity so the offer will always be successful
- out.offer(bldr.build());
- }
-
- // Return true we could offer item to outQueue
- public boolean isOutQueuesAvailable() {
- return outQueue.size() < outQueue.getExpectedAvailableCapacity();
- }
-
- public long getTotalDataEmittedInBytes() {
- return totalDataEmittedInBytes;
- }
-
- // Clean the internal state of OutgoingTupleCollection
- public void clear() {
- currentControlTuple = null;
- currentDataTuple = null;
-
- outQueue.clear();
+ public OutgoingTupleCollection(String componentName,
+ Communicator<HeronTuples.HeronTupleSet> outQueue) {
+ super(componentName, outQueue);
}
}
diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java b/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java
index 9867576..4619ff4 100644
--- a/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java
+++ b/heron/simulator/src/java/com/twitter/heron/simulator/instance/SpoutOutputCollectorImpl.java
@@ -14,25 +14,11 @@
package com.twitter.heron.simulator.instance;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.protobuf.ByteString;
-
-import com.twitter.heron.api.Config;
import com.twitter.heron.api.serializer.IPluggableSerializer;
import com.twitter.heron.api.spout.ISpoutOutputCollector;
import com.twitter.heron.common.basics.Communicator;
-import com.twitter.heron.common.utils.metrics.SpoutMetrics;
+import com.twitter.heron.common.utils.metrics.ComponentMetrics;
import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
-import com.twitter.heron.common.utils.misc.TupleKeyGenerator;
import com.twitter.heron.proto.system.HeronTuples;
/**
@@ -48,219 +34,14 @@
* 3. Maintain some statistics, for instance, total tuples emitted.
* <p>
*/
-public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
- private static final Logger LOG = Logger.getLogger(SpoutOutputCollectorImpl.class.getName());
-
- // Map from tuple key to composite object with insertion-order, i.e. ordered by time
- private final LinkedHashMap<Long, RootTupleInfo> inFlightTuples;
-
- private final TupleKeyGenerator keyGenerator;
-
- private final SpoutMetrics spoutMetrics;
- private PhysicalPlanHelper helper;
-
- private final boolean ackingEnabled;
- // When acking is not enabled, if the spout does an emit with a anchor
- // we need to ack it immediately. This keeps the list of those
- private final Queue<RootTupleInfo> immediateAcks;
-
- private final IPluggableSerializer serializer;
- private final OutgoingTupleCollection outputter;
-
- private long totalTuplesEmitted;
+public class SpoutOutputCollectorImpl
+ extends com.twitter.heron.instance.spout.SpoutOutputCollectorImpl
+ implements ISpoutOutputCollector {
public SpoutOutputCollectorImpl(IPluggableSerializer serializer,
PhysicalPlanHelper helper,
Communicator<HeronTuples.HeronTupleSet> streamOutQueue,
- SpoutMetrics spoutMetrics) {
- if (helper.getMySpout() == null) {
- throw new RuntimeException(helper.getMyTaskId() + " is not a spout ");
- }
-
- this.serializer = serializer;
- this.helper = helper;
- this.spoutMetrics = spoutMetrics;
- this.keyGenerator = new TupleKeyGenerator();
- updatePhysicalPlanHelper(helper);
-
- // with default capacity, load factor and insertion order
- inFlightTuples = new LinkedHashMap<Long, RootTupleInfo>();
-
- Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
- if (config.containsKey(Config.TOPOLOGY_ENABLE_ACKING)
- && config.get(Config.TOPOLOGY_ENABLE_ACKING) != null) {
- this.ackingEnabled =
- Boolean.parseBoolean(config.get(Config.TOPOLOGY_ENABLE_ACKING).toString());
- } else {
- this.ackingEnabled = false;
- }
-
- if (!ackingEnabled) {
- immediateAcks = new ArrayDeque<RootTupleInfo>();
- } else {
- immediateAcks = null;
- }
-
- this.outputter = new OutgoingTupleCollection(helper.getMyComponent(), streamOutQueue);
- }
-
- public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
- this.helper = physicalPlanHelper;
- }
-
- /////////////////////////////////////////////////////////
- // Following public methods are overrides OutputCollector
- /////////////////////////////////////////////////////////
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
- return admitSpoutTuple(streamId, tuple, messageId);
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
- throw new RuntimeException("emitDirect Not implemented");
- }
-
- // Log the report error and also send the stack trace to metrics manager.
- @Override
- public void reportError(Throwable error) {
- LOG.log(Level.SEVERE, "Reporting an error in topology code ", error);
- }
-
-
- /////////////////////////////////////////////////////////
- // Following public methods are used for querying or
- // interacting internal state of the BoltOutputCollectorImpl
- /////////////////////////////////////////////////////////
-
- // Return true we could offer item to outQueue
- public boolean isOutQueuesAvailable() {
- return outputter.isOutQueuesAvailable();
- }
-
- // Return the total data emitted in bytes
- public long getTotalDataEmittedInBytes() {
- return outputter.getTotalDataEmittedInBytes();
- }
-
- // Flush the tuples to next stage
- public void sendOutTuples() {
- outputter.sendOutTuples();
- }
-
- public long getTotalTuplesEmitted() {
- return totalTuplesEmitted;
- }
-
- public int numInFlight() {
- return inFlightTuples.size();
- }
-
- public Queue<RootTupleInfo> getImmediateAcks() {
- return immediateAcks;
- }
-
- public RootTupleInfo retireInFlight(long rootId) {
- return inFlightTuples.remove(rootId);
- }
-
- public List<RootTupleInfo> retireExpired(long timeout) {
- List<RootTupleInfo> retval = new ArrayList<RootTupleInfo>();
- long curTime = System.nanoTime();
-
- // The LinkedHashMap is ordered by insertion order, i.e. ordered by time
- // So we want need to iterate from the start and remove all items until
- // we meet the RootTupleInfo no need to expire
- Iterator<RootTupleInfo> iterator = inFlightTuples.values().iterator();
- while (iterator.hasNext()) {
- RootTupleInfo rootTupleInfo = iterator.next();
- if (rootTupleInfo.isExpired(curTime, timeout)) {
- retval.add(rootTupleInfo);
- iterator.remove();
- } else {
- break;
- }
- }
-
- return retval;
- }
-
- // Clean the internal state of BoltOutputCollectorImpl
- public void clear() {
- outputter.clear();
- }
-
- /////////////////////////////////////////////////////////
- // Following private methods are internal implementations
- /////////////////////////////////////////////////////////
-
- private List<Integer> admitSpoutTuple(String streamId, List<Object> tuple, Object messageId) {
- // First check whether this tuple is sane
- helper.checkOutputSchema(streamId, tuple);
-
- // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping
- List<Integer> customGroupingTargetTaskIds =
- helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
-
- // Invoke user-defined emit task hook
- helper.getTopologyContext().invokeHookEmit(tuple, streamId, customGroupingTargetTaskIds);
-
- // Start construct the data tuple
- HeronTuples.HeronDataTuple.Builder bldr = HeronTuples.HeronDataTuple.newBuilder();
-
- // set the key. This is mostly ignored
- bldr.setKey(0);
-
- if (customGroupingTargetTaskIds != null) {
- // It is a CustomStreamGrouping
- for (Integer taskId : customGroupingTargetTaskIds) {
- bldr.addDestTaskIds(taskId);
- }
- }
-
- if (messageId != null) {
- RootTupleInfo tupleInfo = new RootTupleInfo(streamId, messageId);
- if (ackingEnabled) {
- // This message is rooted
- HeronTuples.RootId.Builder rtbldr = EstablishRootId(tupleInfo);
- bldr.addRoots(rtbldr);
- } else {
- immediateAcks.offer(tupleInfo);
- }
- }
-
- long tupleSizeInBytes = 0;
-
- long startTime = System.nanoTime();
-
- // Serialize it
- for (Object obj : tuple) {
- byte[] b = serializer.serialize(obj);
- ByteString bstr = ByteString.copyFrom(b);
- bldr.addValues(bstr);
- tupleSizeInBytes += b.length;
- }
-
- long latency = System.nanoTime() - startTime;
- spoutMetrics.serializeDataTuple(streamId, latency);
-
- // submit to outputter
- outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
- totalTuplesEmitted++;
- spoutMetrics.emittedTuple(streamId);
-
- // TODO:- remove this after changing the api
- return null;
- }
-
- private HeronTuples.RootId.Builder EstablishRootId(RootTupleInfo tupleInfo) {
- // This message is rooted
- long rootId = keyGenerator.next();
- HeronTuples.RootId.Builder rtbldr = HeronTuples.RootId.newBuilder();
- rtbldr.setTaskid(helper.getMyTaskId());
- rtbldr.setKey(rootId);
- inFlightTuples.put(rootId, tupleInfo);
- return rtbldr;
+ ComponentMetrics spoutMetrics) {
+ super(serializer, helper, streamOutQueue, spoutMetrics);
}
}
diff --git a/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java b/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java
index 160e990..c81b93c 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/utils/ShellUtils.java
@@ -130,7 +130,7 @@
final StringBuilder builder = outputBuilder == null ? new StringBuilder() : outputBuilder;
// Log the command for debugging
- LOG.log(Level.INFO, "Running synced process: ``{0}''''", String.join(" ", cmdline));
+ LOG.log(Level.INFO, "Running synced process: ``{0}''''", joinString(cmdline));
ProcessBuilder pb = getProcessBuilder(isInheritIO, cmdline, workingDirectory, envs);
/* combine input stream and error stream into stderr because
1. this preserves order of process's stdout/stderr message
@@ -195,7 +195,7 @@
private static Process runASyncProcess(String[] command, File workingDirectory,
Map<String, String> envs, String logFileUuid, boolean logStderr) {
- LOG.log(Level.INFO, "Running async process: ``{0}''''", String.join(" ", command));
+ LOG.log(Level.INFO, "Running async process: ``{0}''''", joinString(command));
// the log file can help people to find out what happened between pb.start()
// and the async process started
@@ -315,4 +315,15 @@
return ret == 0;
}
+ // java 7 compatible version of String.join(" ", array), available in java 8
+ private static String joinString(String[] array) {
+ StringBuilder sb = new StringBuilder();
+ for (String value : array) {
+ if (sb.length() > 0) {
+ sb.append(" ");
+ }
+ sb.append(value);
+ }
+ return sb.toString();
+ }
}
diff --git a/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java b/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java
index 3102153..8de6623 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/utils/TMasterUtils.java
@@ -136,9 +136,10 @@
}
if (state == expectedState) {
- throw new TMasterException(String.format(
- "Topology %s command received topology '%s' but already in state %s",
+ LOG.warning(String.format(
+ "Topology %s command received but topology '%s' already in state %s",
topologyStateControlCommand, topologyName, state));
+ return;
}
if (state != startState) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index fcba33d..1ff249f 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -238,11 +238,27 @@
sp_string instance_id = instance_info_[task_id]->instance_->instance_id();
LOG(INFO) << "Instance " << instance_id << " closed connection";
- instance_info_[task_id]->set_connection(NULL);
+ // Remove the connection from active instances
active_instances_.erase(_conn);
+ // Remove from instance info
+ instance_info_[task_id]->set_connection(NULL);
+ delete instance_info_[task_id];
+ instance_info_.erase(task_id);
+
+ // Clean the instance_metric_map
+ auto immiter = instance_metric_map_.find(instance_id);
+ if (immiter != instance_metric_map_.end()) {
+ metrics_manager_client_->unregister_metric(MakeBackPressureCompIdMetricName(instance_id));
+ delete instance_metric_map_[instance_id];
+ instance_metric_map_.erase(instance_id);
+ }
+
+ // Clean the connection_buffer_metric_map_
auto qmmiter = connection_buffer_metric_map_.find(instance_id);
if (qmmiter != connection_buffer_metric_map_.end()) {
+ metrics_manager_client_->unregister_metric(MakeQueueSizeCompIdMetricName(instance_id));
+ delete connection_buffer_metric_map_[instance_id];
connection_buffer_metric_map_.erase(instance_id);
}
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 6a30dee..ee6a48a 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -139,7 +139,6 @@
typedef std::unordered_map<Connection*, sp_int32> ConnectionTaskIdMap;
ConnectionTaskIdMap active_instances_;
// map of task id to InstanceData
- // Once populated, will not change
typedef std::unordered_map<sp_int32, InstanceData*> TaskIdInstanceDataMap;
TaskIdInstanceDataMap instance_info_;
diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD
index 93f6526..c9ed8e2 100644
--- a/heron/storm/src/java/BUILD
+++ b/heron/storm/src/java/BUILD
@@ -1,28 +1,31 @@
package(default_visibility = ["//visibility:public"])
+load("/tools/rules/jarjar_rules", "jarjar_binary")
+
storm_deps_files = [
"//heron/api/src/java:api-java",
"//heron/common/src/java:basics-java",
"//heron/simulator/src/java:simulator-java",
"@com_googlecode_json_simple_json_simple//jar",
- "//third_party/java:kryo",
+ "//third_party/java:kryo-neverlink",
]
+# Kryo is bundled here for integration test
java_library(
name='storm-compatibility-java',
srcs = glob(["**/*.java"]),
- deps = storm_deps_files,
+ deps = storm_deps_files + ["//third_party/java:kryo"],
)
java_binary(
- name = "storm-compatibility-unshaded",
+ name="storm-compatibility-unshaded",
srcs = glob(["**/*.java"]),
deps = storm_deps_files,
)
-genrule(
+jarjar_binary(
name = "heron-storm",
- srcs = [":storm-compatibility-unshaded_deploy.jar"],
- outs = ["heron-storm.jar"],
- cmd = "cp $< $@",
+ src = ":storm-compatibility-unshaded_deploy.jar",
+ shade = "shade.conf",
+ deps = ["@org_sonatype_plugins_jarjar_maven_plugin//jar"]
)
diff --git a/heron/storm/src/java/shade.conf b/heron/storm/src/java/shade.conf
new file mode 100644
index 0000000..930aa16
--- /dev/null
+++ b/heron/storm/src/java/shade.conf
@@ -0,0 +1,3 @@
+rule com.google.** org.apache.storm.__shaded__.@0
+rule org.yaml.snakeyaml** org.apache.storm.__shaded__.@0
+rule org.json.simple** org.apache.storm.__shaded__.@0
diff --git a/heron/tools/cli/src/python/result.py b/heron/tools/cli/src/python/result.py
index e1643dc..6c6e669 100644
--- a/heron/tools/cli/src/python/result.py
+++ b/heron/tools/cli/src/python/result.py
@@ -64,16 +64,20 @@
msg = msg[:-1]
log_f(msg)
+ @staticmethod
+ def _do_print(f, msg):
+ if msg:
+ if msg[-1] == '\n':
+ msg = msg[:-1]
+ print >> f, msg
+
def _log_context(self):
# render context only after process exits
assert self.status is not None
- if self.status == Status.Ok or self.status == Status.DryRun:
+ if self.status in [Status.Ok, Status.DryRun]:
self._do_log(Log.info, self.succ_context)
- elif self.status == Status.HeronError:
+ elif self.status in [Status.HeronError, Status.InvocationError]:
self._do_log(Log.error, self.err_context)
- elif self.status == Status.InvocationError:
- # invocation error has no context
- pass
else:
raise RuntimeError(
"Unknown status type of value %d. Expected value: %s", self.status.value, list(Status))
@@ -98,8 +102,8 @@
class SimpleResult(Result):
"""Simple result: result that already and only
contains status of the result"""
- def __init__(self, status):
- super(SimpleResult, self).__init__(status)
+ def __init__(self, *args):
+ super(SimpleResult, self).__init__(*args)
def render(self):
self._log_context()
@@ -127,7 +131,7 @@
if retcode is not None and status_type(retcode) == Status.InvocationError:
self._do_log(Log.error, stderr_line)
else:
- print >> sys.stderr, stderr_line,
+ self._do_print(sys.stderr, stderr_line)
def renderProcessStdOut(self, stdout):
""" render stdout of shelled-out process
@@ -147,10 +151,13 @@
self._do_log(Log.error, stdout)
# No need to prefix [INFO] here. We want to display dry-run response in a clean way
elif self.status == Status.DryRun:
- print >> sys.stdout, stdout,
+ self._do_print(sys.stdout, stdout)
+ elif self.status == Status.InvocationError:
+ self._do_print(sys.stdout, stdout)
else:
raise RuntimeError(
- "Unknown status type of value %d. Expected value: %s", self.status.value, list(Status))
+ "Unknown status type of value %d. Expected value: %s" % \
+ (self.status.value, list(Status)))
def render(self):
while True:
diff --git a/heron/tools/cli/src/python/submit.py b/heron/tools/cli/src/python/submit.py
index 9d0ba76..9711109 100644
--- a/heron/tools/cli/src/python/submit.py
+++ b/heron/tools/cli/src/python/submit.py
@@ -193,8 +193,8 @@
result.render(res)
if not res.is_successful():
- err_context = "Failed to create topology definition \
- file when executing class '%s' of file '%s'" % (main_class, topology_file)
+ err_context = ("Failed to create topology definition " \
+ "file when executing class '%s' of file '%s'") % (main_class, topology_file)
res.add_context(err_context)
return res
@@ -234,11 +234,11 @@
tmp_dir,
java_defines)
- res.render()
+ result.render(res)
if not res.is_successful():
- err_context = "Failed to create topology definition \
- file when executing class '%s' of file '%s'" % (main_class, topology_file)
+ err_context = ("Failed to create topology definition " \
+ "file when executing class '%s' of file '%s'") % (main_class, topology_file)
res.add_context(err_context)
return res
@@ -256,11 +256,10 @@
res = execute.heron_pex(
topology_file, topology_class_name, tuple(unknown_args))
- res.render()
-
+ result.render(res)
if not res.is_successful():
- err_context = "Failed to create topology definition \
- file when executing class '%s' of file '%s'" % (topology_class_name, topology_file)
+ err_context = ("Failed to create topology definition " \
+ "file when executing class '%s' of file '%s'") % (topology_class_name, topology_file)
res.add_context(err_context)
return res
@@ -287,7 +286,7 @@
# check to see if the topology file exists
if not os.path.isfile(topology_file):
- err_context = "Topology jar|tar|pex file '%s' does not exist" % topology_file
+ err_context = "Topology file '%s' does not exist" % topology_file
return SimpleResult(Status.InvocationError, err_context)
# check if it is a valid file type
diff --git a/heron/tools/common/src/python/access/heron_api.py b/heron/tools/common/src/python/access/heron_api.py
index 3b0ddeb..3654682 100644
--- a/heron/tools/common/src/python/access/heron_api.py
+++ b/heron/tools/common/src/python/access/heron_api.py
@@ -746,6 +746,7 @@
'''
components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
+ result = {}
futures = []
for comp in components:
query = self.get_query(metric, comp, instance)
diff --git a/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java b/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
index f3a8320..dfaa034 100644
--- a/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
+++ b/integration-test/src/java/com/twitter/heron/integration_test/core/IntegrationTestBolt.java
@@ -81,7 +81,6 @@
@Override
public void execute(Tuple tuple) {
- tuplesReceived++;
String streamID = tuple.getSourceStreamId();
LOG.info("Received a tuple: " + tuple + " ; from: " + streamID);
@@ -105,6 +104,7 @@
"Received a terminal, need to receive %s more", terminalsToReceive));
}
} else {
+ tuplesReceived++;
currentTupleProcessing = tuple;
delegateBolt.execute(tuple);
// We ack only the tuples in user's logic
diff --git a/integration-test/src/python/test_runner/resources/test.json b/integration-test/src/python/test_runner/resources/test.json
index 7d7d273..021d91f 100644
--- a/integration-test/src/python/test_runner/resources/test.json
+++ b/integration-test/src/python/test_runner/resources/test.json
@@ -58,6 +58,11 @@
"expectedResultRelativePath" : "bolt_double_emit_tuples/resources/BoltDoubleEmitTuples.json"
},
{
+ "topologyName" : "IntegrationTest_MultiSpoutsMultiTasks",
+ "classPath" : "multi_spouts_multi_tasks.MultiSpoutsMultiTasks",
+ "expectedResultRelativePath" : "multi_spouts_multi_tasks/resources/MultiSpoutsMultiTasks.json"
+ },
+ {
"topologyName" : "IntegrationTest_OneBoltMultiTasks",
"classPath" : "one_bolt_multi_tasks.OneBoltMultiTasks",
"expectedResultRelativePath" : "one_bolt_multi_tasks/resources/OneBoltMultiTasks.json"
diff --git a/release/maven/heron-api.pom.template b/release/maven/heron-no-kryo.template.pom
similarity index 91%
rename from release/maven/heron-api.pom.template
rename to release/maven/heron-no-kryo.template.pom
index 08ad86c..31c0014 100644
--- a/release/maven/heron-api.pom.template
+++ b/release/maven/heron-no-kryo.template.pom
@@ -4,11 +4,11 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.twitter.heron</groupId>
- <artifactId>heron-api</artifactId>
+ <artifactId>ARTIFACT_ID</artifactId>
<version>VERSION</version>
<packaging>jar</packaging>
- <name>heron-api</name>
- <description>Heron API</description>
+ <name>NAME</name>
+ <description>DESCRIPTION</description>
<url>http://www.heronstreaming.io</url>
<licenses>
<license>
diff --git a/release/maven/heron-spi.pom.template b/release/maven/heron-spi.pom.template
deleted file mode 100644
index e747db6..0000000
--- a/release/maven/heron-spi.pom.template
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.twitter.heron</groupId>
- <artifactId>heron-spi</artifactId>
- <version>VERSION</version>
- <packaging>jar</packaging>
- <name>heron-spi</name>
- <description>Heron SPI</description>
- <url>http://www.heronstreaming.io</url>
- <licenses>
- <license>
- <name>The Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
- <developers>
- <developer>
- <name>Heron Users</name>
- <email>heron-users@googlegroups.com</email>
- <organization>Twitter</organization>
- <organizationUrl>http://www.twitter.com</organizationUrl>
- </developer>
- </developers>
- <scm>
- <connection>scm:git:git@github.com:twitter/heron.git</connection>
- <developerConnection>scm:git:git@github.com:twitter/heron.git</developerConnection>
- <url>git@github.com:twitter/heron.git</url>
- </scm>
-</project>
\ No newline at end of file
diff --git a/release/maven/heron-storm.pom.template b/release/maven/heron-with-kryo.template.pom
similarity index 77%
rename from release/maven/heron-storm.pom.template
rename to release/maven/heron-with-kryo.template.pom
index 9370907..8ca2a9c 100644
--- a/release/maven/heron-storm.pom.template
+++ b/release/maven/heron-with-kryo.template.pom
@@ -4,11 +4,11 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.twitter.heron</groupId>
- <artifactId>heron-storm</artifactId>
+ <artifactId>ARTIFACT_ID</artifactId>
<version>VERSION</version>
<packaging>jar</packaging>
- <name>heron-storm</name>
- <description>Heron Storm</description>
+ <name>NAME</name>
+ <description>DESCRIPTION</description>
<url>http://www.heronstreaming.io</url>
<licenses>
<license>
@@ -16,6 +16,14 @@
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
+ <dependencies>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <version>3.0.3</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
<developers>
<developer>
<name>Heron Users</name>
diff --git a/release/maven/maven-pom-version.sh b/release/maven/maven-pom-version.sh
index 0ff139e..5dec908 100755
--- a/release/maven/maven-pom-version.sh
+++ b/release/maven/maven-pom-version.sh
@@ -25,6 +25,24 @@
exit 1
fi
-cat ./maven/heron-api.pom.template | sed "s/VERSION/$1/g" >> ./heron-api-$1.pom
-cat ./maven/heron-storm.pom.template | sed "s/VERSION/$1/g" >> ./heron-storm-$1.pom
-cat ./maven/heron-spi.pom.template | sed "s/VERSION/$1/g" >> ./heron-spi-$1.pom
+cat ./maven/heron-no-kryo.template.pom | \
+ sed "s/VERSION/$1/g" | \
+ sed "s/ARTIFACT_ID/heron-api/g" | \
+ sed "s/NAME/heron-api/g" | \
+ sed "s/DESCRIPTION/Heron API/g" \
+ >> ./heron-api-$1.pom
+
+cat ./maven/heron-no-kryo.template.pom | \
+ sed "s/VERSION/$1/g" | \
+ sed "s/ARTIFACT_ID/heron-spi/g" | \
+ sed "s/NAME/heron-spi/g" | \
+ sed "s/DESCRIPTION/Heron SPI/g" \
+ >> ./heron-spi-$1.pom
+
+cat ./maven/heron-with-kryo.template.pom | \
+ sed "s/VERSION/$1/g" | \
+ sed "s/ARTIFACT_ID/heron-storm/g" | \
+ sed "s/NAME/heron-storm/g" | \
+ sed "s/DESCRIPTION/Heron Storm/g" \
+ >> ./heron-storm-$1.pom
+
diff --git a/scripts/resources/idea/copyright/heron.xml b/scripts/resources/idea/copyright/heron.xml
index 617bc67..413af1c 100644
--- a/scripts/resources/idea/copyright/heron.xml
+++ b/scripts/resources/idea/copyright/heron.xml
@@ -1,6 +1,6 @@
<component name="CopyrightManager">
<copyright>
<option name="myName" value="heron" />
- <option name="notice" value="// Copyright &#36;today.year Twitter. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License" />
+ <option name="notice" value="// Copyright &#36;today.year Twitter. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License." />
</copyright>
</component>
\ No newline at end of file
diff --git a/third_party/java/BUILD b/third_party/java/BUILD
index 3864fa0..22caa0d 100644
--- a/third_party/java/BUILD
+++ b/third_party/java/BUILD
@@ -99,6 +99,19 @@
],
)
+# This version is needed for dependents that don't want
+# kryo to be included in the generated binary
+# (e.g. //heron/storm/src/java:heron-storm)
+java_library(
+ name = "kryo-neverlink",
+ srcs = [ "Empty.java" ],
+ exports = [ "@com_esotericsoftware_kryo//jar" ],
+ deps = [
+ "@com_esotericsoftware_kryo//jar",
+ ],
+ neverlink = 1,
+)
+
java_library(
name = "yarn",
srcs = [ "Empty.java" ],
diff --git a/website/config.yaml b/website/config.yaml
index 56ba56d..ac3aa68 100755
--- a/website/config.yaml
+++ b/website/config.yaml
@@ -20,7 +20,7 @@
author: Twitter, Inc.
description: A realtime, distributed, fault-tolerant stream processing engine from Twitter
versions:
- heron: 0.14.5
+ heron: 0.14.7
bazel: 0.3.1
assets:
favicon: