Merge branch 'master' into sanjeevk/encryption
diff --git a/.gitignore b/.gitignore
index b65721f..927928b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -93,6 +93,9 @@
## Maven generated files
.classpath.txt
+## Generated by CI jobs run locally
+heron_build.txt
+
## File-based project format
*.ipr
*.iws
diff --git a/.travis.yml b/.travis.yml
index a4ef75a..ee0e203 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,7 +11,6 @@
apt:
sources:
- ubuntu-toolchain-r-test
- - george-edison55-precise-backports # cmake 3.2.3 / doxygen 1.8.3
packages:
- gcc-4.8
@@ -23,8 +22,6 @@
- pkg-config
- zip
- zlib1g-dev
- - cmake
- - cmake-data
env:
- CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8
diff --git a/WORKSPACE b/WORKSPACE
index b2b4485..cb3f559 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -285,6 +285,11 @@
)
maven_jar(
+ name = "org_objectweb_asm",
+ artifact = "org.ow2.asm:asm:5.0.4",
+)
+
+maven_jar(
name = "org_apache_mesos_mesos",
artifact = "org.apache.mesos:mesos:0.22.0",
)
diff --git a/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml b/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml
index e8c952f..9d0ef5c 100644
--- a/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml
+++ b/deploy/kubernetes/gcp/bookkeeper-apiserver.yaml
@@ -1,20 +1,54 @@
##
## Heron API server deployment
##
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ labels:
+ k8s-app: heron-apiserver
+ name: heron-apiserver
+ namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+ name: heron-apiserver
+ labels:
+ app: heron-apiserver
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: cluster-admin
+subjects:
+- kind: ServiceAccount
+ name: heron-apiserver
+ namespace: default
+
+---
+
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: heron-apiserver
+ labels:
+ app: heron-apiserver
+ namespace: default
spec:
+ selector:
+ matchLabels:
+ app: heron-apiserver
replicas: 1
template:
metadata:
labels:
app: heron-apiserver
spec:
+ serviceAccountName: heron-apiserver
containers:
- name: heron-apiserver
- image: heron/heron:0.16.2
+ image: heron/heron:latest
command: ["sh", "-c"]
args:
- >-
@@ -23,7 +57,7 @@
--cluster kubernetes
-D heron.statemgr.connection.string=zookeeper:2181
-D heron.kubernetes.scheduler.uri=http://localhost:8001
- -D heron.executor.docker.image=heron/heron:0.16.2
+ -D heron.executor.docker.image=heron/heron:latest
-D heron.class.uploader=com.twitter.heron.uploader.dlog.DLUploader
-D heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/distributedlog
- name: kubectl-proxy
diff --git a/deploy/kubernetes/gcp/gcs-apiserver.yaml b/deploy/kubernetes/gcp/gcs-apiserver.yaml
index 80091d3..1ed21ad 100644
--- a/deploy/kubernetes/gcp/gcs-apiserver.yaml
+++ b/deploy/kubernetes/gcp/gcs-apiserver.yaml
@@ -1,17 +1,51 @@
##
## Heron API server deployment
##
-apiVersion: apps/v1beta1
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ labels:
+ k8s-app: heron-apiserver
+ name: heron-apiserver
+ namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+ name: heron-apiserver
+ labels:
+ app: heron-apiserver
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: cluster-admin
+subjects:
+- kind: ServiceAccount
+ name: heron-apiserver
+ namespace: default
+
+---
+
+apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: heron-apiserver
+ labels:
+ app: heron-apiserver
+ namespace: default
spec:
+ selector:
+ matchLabels:
+ app: heron-apiserver
replicas: 1
template:
metadata:
labels:
app: heron-apiserver
spec:
+ serviceAccountName: heron-apiserver
volumes:
- name: google-cloud-key
secret:
diff --git a/deploy/kubernetes/general/README.md b/deploy/kubernetes/general/README.md
index 2558641..2132ee5 100644
--- a/deploy/kubernetes/general/README.md
+++ b/deploy/kubernetes/general/README.md
@@ -69,15 +69,20 @@
}
```
-3. Submit an example topology:
+3. Set service_url:
```shell
-$ heron submit kubernetes \
---service-url=http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000 \
-~/.heron/examples/heron-api-examples.jar \
+$ heron config kubernetes \
+set service_url http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000 \
com.twitter.heron.examples.api.AckingTopology acking
```
-4. View heron ui:
+4. Submit an example topology:
+```shell
+$ heron submit kubernetes ~/.heron/examples/heron-api-examples.jar \
+com.twitter.heron.examples.api.AckingTopology acking
+```
+
+5. View heron ui:
```
http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-ui:8889
```
diff --git a/deploy/kubernetes/general/apiserver.yaml b/deploy/kubernetes/general/apiserver.yaml
index e8c952f..a368b92 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -1,20 +1,54 @@
##
## Heron API server deployment
##
-apiVersion: apps/v1beta1
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ labels:
+ k8s-app: heron-apiserver
+ name: heron-apiserver
+ namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+ name: heron-apiserver
+ labels:
+ app: heron-apiserver
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: cluster-admin
+subjects:
+- kind: ServiceAccount
+ name: heron-apiserver
+ namespace: default
+
+---
+
+apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: heron-apiserver
+ labels:
+ app: heron-apiserver
+ namespace: default
spec:
+ selector:
+ matchLabels:
+ app: heron-apiserver
replicas: 1
template:
metadata:
labels:
app: heron-apiserver
spec:
+ serviceAccountName: heron-apiserver
containers:
- name: heron-apiserver
- image: heron/heron:0.16.2
+ image: heron/heron:latest
command: ["sh", "-c"]
args:
- >-
@@ -23,7 +57,7 @@
--cluster kubernetes
-D heron.statemgr.connection.string=zookeeper:2181
-D heron.kubernetes.scheduler.uri=http://localhost:8001
- -D heron.executor.docker.image=heron/heron:0.16.2
+ -D heron.executor.docker.image=heron/heron:latest
-D heron.class.uploader=com.twitter.heron.uploader.dlog.DLUploader
-D heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/distributedlog
- name: kubectl-proxy
diff --git a/deploy/kubernetes/minikube/README.md b/deploy/kubernetes/minikube/README.md
index ba5ec99..31410f6 100644
--- a/deploy/kubernetes/minikube/README.md
+++ b/deploy/kubernetes/minikube/README.md
@@ -68,15 +68,19 @@
}
```
-3. Submit an example topology:
+3. Set service_url:
```shell
-$ heron submit kubernetes \
---service-url=http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000 \
-~/.heron/examples/heron-api-examples.jar \
+$ heron config kubernetes \
+set service_url http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-apiserver:9000
+```
+
+4. Submit an example topology:
+```shell
+$ heron submit kubernetes ~/.heron/examples/heron-api-examples.jar \
com.twitter.heron.examples.api.AckingTopology acking
```
-4. View heron ui:
+5. View heron ui:
```
http://localhost:8001/api/v1/proxy/namespaces/default/services/heron-ui:8889
```
diff --git a/deploy/kubernetes/minikube/apiserver.yaml b/deploy/kubernetes/minikube/apiserver.yaml
index d921af3..e3efd69 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -1,17 +1,51 @@
##
## Heron API server deployment
##
-apiVersion: apps/v1beta1
+
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ labels:
+ k8s-app: heron-apiserver
+ name: heron-apiserver
+ namespace: default
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+ name: heron-apiserver
+ labels:
+ app: heron-apiserver
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: cluster-admin
+subjects:
+- kind: ServiceAccount
+ name: heron-apiserver
+ namespace: default
+
+---
+
+apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: heron-apiserver
+ labels:
+ app: heron-apiserver
spec:
+ selector:
+ matchLabels:
+ app: heron-apiserver
replicas: 1
template:
metadata:
labels:
app: heron-apiserver
spec:
+ serviceAccountName: heron-apiserver
containers:
- name: heron-apiserver
image: heron/heron:latest
diff --git a/examples/src/java/BUILD b/examples/src/java/BUILD
index d8eaeff..3994603 100644
--- a/examples/src/java/BUILD
+++ b/examples/src/java/BUILD
@@ -4,7 +4,7 @@
name='api-examples-unshaded',
srcs = glob(["com/twitter/heron/examples/api/**/*.java"]),
deps = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/simulator/src/java:simulator-java"
],
@@ -24,7 +24,9 @@
deps = [
"//heron/api/src/java:api-java",
"//heron/common/src/java:basics-java",
- "//heron/simulator/src/java:simulator-java"
+ "//heron/simulator/src/java:simulator-java",
+ "//third_party/java:kryo",
+ "@apache_pulsar_client//jar",
],
create_executable = 0,
)
diff --git a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
index 75338db..33bd0a0 100644
--- a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
@@ -67,6 +67,11 @@
conf.setContainerRamRequested(ByteAmount.fromGigabytes(2));
conf.setContainerCpuRequested(2);
+ // Specify the size of ram padding to per container.
+ // Notice, this config will be considered as a hint,
+ // and it's up to the packing algorithm to determine whether to apply this hint
+ conf.setContainerRamPadding(ByteAmount.fromGigabytes(2));
+
if (args != null && args.length > 0) {
conf.setNumStmgrs(2);
HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/FilesystemSinkTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/FilesystemSinkTopology.java
new file mode 100644
index 0000000..635e5b4
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/FilesystemSinkTopology.java
@@ -0,0 +1,141 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Sink;
+
+/**
+ * This topology demonstrates how sinks work in the Heron Streamlet API for Java.
+ * In this case, the sink is a temporary file. Each value that enters the graph
+ * from the source streamlet (an indefinite series of randomly generated
+ * integers) is written to that temporary file.
+ */
+public final class FilesystemSinkTopology {
+ private FilesystemSinkTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(FilesystemSinkTopology.class.getName());
+
+ /**
+ * Implements the Sink interface, which defines what happens when the toSink
+ * method is invoked in a processing graph.
+ */
+ private static class FilesystemSink<T> implements Sink<T> {
+ private static final long serialVersionUID = -96514621878356224L;
+ private Path tempFilePath;
+ private File tempFile;
+
+ FilesystemSink(File f) {
+ this.tempFile = f;
+ }
+
+ /**
+ * The setup function is called before the sink is used. Any complex
+ * instantiation logic for the sink should go here.
+ */
+ public void setup(Context context) {
+ this.tempFilePath = Paths.get(tempFile.toURI());
+ }
+
+ /**
+ * The put function defines how each incoming streamlet element is
+ * actually processed. In this case, each incoming element is converted
+ * to a byte array and written to the temporary file (successful writes
+ * are also logged). Any exceptions are converted to RuntimeExceptions,
+ * which will effectively kill the topology.
+ */
+ public void put(T element) {
+ byte[] bytes = String.format("%s\n", element.toString()).getBytes();
+
+ try {
+ Files.write(tempFilePath, bytes, StandardOpenOption.APPEND);
+ LOG.info(
+ String.format("Wrote %s to %s",
+ new String(bytes),
+ tempFilePath.toAbsolutePath()
+ )
+ );
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Any cleanup logic for the sink can be applied here.
+ */
+ public void cleanup() {
+ }
+ }
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+ // Creates a temporary file to write output into.
+ File file = File.createTempFile("filesystem-sink-example", ".tmp");
+
+ LOG.info(
+ String.format("Ready to write to file %s",
+ file.getAbsolutePath()
+ )
+ );
+
+ processingGraphBuilder
+ .newSource(() -> {
+ // This applies a "brake" that makes the processing graph write
+ // to the temporary file at a reasonable, readable pace.
+ StreamletUtils.sleep(500);
+ return ThreadLocalRandom.current().nextInt(100);
+ })
+ .setName("incoming-integers")
+ // Here, the FilesystemSink implementation of the Sink
+ // interface is passed to the toSink function.
+ .toSink(new FilesystemSink<>(file));
+
+ // The topology's parallelism (the number of containers across which the topology's
+ // processing instance will be split) can be defined via the second command-line
+ // argument (or else the default of 2 will be used).
+ int topologyParallelism = StreamletUtils.getParallelism(args, 2);
+
+ Config config = new Config.Builder()
+ .setNumContainers(topologyParallelism)
+ .build();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/FormattedOutputTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/FormattedOutputTopology.java
new file mode 100644
index 0000000..b36e18b
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/FormattedOutputTopology.java
@@ -0,0 +1,113 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+
+/**
+ * This topology demonstrates the use of consume operations in the Heron
+ * Streamlet API for Java. A consume operation terminates a processing
+ * graph, like a simpler version of a sink (which requires implementing
+ * the Sink interface). Here, the consume operation is used to produce
+ * custom-formatted logging output for a processing graph in which random
+ * sensor readings are fed into the graph every two seconds (a simple
+ * filter is also applied to this source streamlet prior to logging).
+ */
+public final class FormattedOutputTopology {
+ private FormattedOutputTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(FormattedOutputTopology.class.getName());
+
+ /**
+   * A list of devices emitting sensor readings ("device1" through "device99").
+ */
+ private static final List<String> DEVICES = IntStream.range(1, 100)
+ .mapToObj(i -> String.format("device%d", i))
+ .collect(Collectors.toList());
+
+ /**
+ * Sensor readings consist of a device ID, a temperature reading, and
+ * a humidity reading. The temperature and humidity readings are
+ * randomized within a range.
+ */
+ private static class SensorReading implements Serializable {
+ private static final long serialVersionUID = 3418308641606699744L;
+ private String deviceId;
+ private double temperature;
+ private double humidity;
+
+ SensorReading() {
+ // Readings are produced only every two seconds
+ StreamletUtils.sleep(2000);
+ this.deviceId = StreamletUtils.randomFromList(DEVICES);
+ // Each temperature reading is a double between 70 and 100
+ this.temperature = 70 + 30 * new Random().nextDouble();
+ // Each humidity reading is a percentage between 80 and 100
+ this.humidity = (80 + 20 * new Random().nextDouble()) / 100;
+ }
+
+ String getDeviceId() {
+ return deviceId;
+ }
+
+ double getTemperature() {
+ return temperature;
+ }
+
+ double getHumidity() {
+ return humidity;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+ processingGraphBuilder
+ // The source streamlet is an indefinite series of sensor readings
+ // emitted every two seconds
+ .newSource(SensorReading::new)
+ // A simple filter that excludes a percentage of the sensor readings
+ .filter(reading -> reading.getHumidity() < .9 && reading.getTemperature() < 90)
+ // In the consumer operation, each reading is converted to a formatted
+ // string and logged
+ .consume(reading -> LOG.info(
+ String.format("Reading from device %s: (temp: %f, humidity: %f)",
+ reading.getDeviceId(),
+ reading.getTemperature(),
+ reading.getHumidity())
+ ));
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ Config config = Config.defaultConfig();
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/ImpressionsAndClicksTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/ImpressionsAndClicksTopology.java
new file mode 100644
index 0000000..558840b
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/ImpressionsAndClicksTopology.java
@@ -0,0 +1,197 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.JoinType;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+import com.twitter.heron.streamlet.WindowConfig;
+
+/**
+ * This topology demonstrates the use of join operations in the Heron
+ * Streamlet API for Java. Two independent streamlets, one consisting
+ * of ad impressions, the other of ad clicks, is joined together. A join
+ * function then checks if the userId matches on the impression and click.
+ * Finally, a reduce function counts the number of impression/click matches
+ * over the specified time window.
+ */
+public final class ImpressionsAndClicksTopology {
+ private ImpressionsAndClicksTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(ImpressionsAndClicksTopology.class.getName());
+
+ /**
+ * A list of company IDs to be used to generate random clicks and impressions.
+ */
+ private static final List<String> ADS = Arrays.asList(
+ "acme",
+ "blockchain-inc",
+ "omnicorp"
+ );
+
+ /**
+   * A list of 24 active users ("user1" through "user24").
+ */
+ private static final List<String> USERS = IntStream.range(1, 25)
+ .mapToObj(i -> String.format("user%d", i))
+ .collect(Collectors.toList());
+
+ /**
+ * A POJO for incoming ad impressions (generated every 50 milliseconds).
+ */
+ private static class AdImpression implements Serializable {
+ private static final long serialVersionUID = 3283110635310800177L;
+
+ private String adId;
+ private String userId;
+ private String impressionId;
+
+ AdImpression() {
+ this.adId = StreamletUtils.randomFromList(ADS);
+ this.userId = StreamletUtils.randomFromList(USERS);
+ this.impressionId = UUID.randomUUID().toString();
+ LOG.info(String.format("Emitting impression: %s", this));
+ }
+
+ String getAdId() {
+ return adId;
+ }
+
+ String getUserId() {
+ return userId;
+ }
+
+ @Override
+ public String toString() {
+      return String.format("(adId: %s, impressionId: %s)",
+ adId,
+ impressionId
+ );
+ }
+ }
+
+ /**
+ * A POJO for incoming ad clicks (generated every 50 milliseconds).
+ */
+ private static class AdClick implements Serializable {
+ private static final long serialVersionUID = 7202766159176178988L;
+ private String adId;
+ private String userId;
+ private String clickId;
+
+ AdClick() {
+ this.adId = StreamletUtils.randomFromList(ADS);
+ this.userId = StreamletUtils.randomFromList(USERS);
+ this.clickId = UUID.randomUUID().toString();
+ LOG.info(String.format("Emitting click: %s", this));
+ }
+
+ String getAdId() {
+ return adId;
+ }
+
+ String getUserId() {
+ return userId;
+ }
+
+ @Override
+ public String toString() {
+      return String.format("(adId: %s, clickId: %s)",
+ adId,
+ clickId
+ );
+ }
+ }
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+    // A streamlet of AdImpression objects is produced, with one element
+    // emitted per randomly generated ad impression.
+ Streamlet<AdImpression> impressions = processingGraphBuilder
+ .newSource(AdImpression::new);
+
+    // A streamlet of AdClick objects is produced, with one element
+    // emitted per randomly generated ad click.
+ Streamlet<AdClick> clicks = processingGraphBuilder
+ .newSource(AdClick::new);
+
+ /**
+     * Here, the impressions streamlet is joined to the clicks streamlet.
+ */
+ impressions
+ // The join function here essentially provides the reduce function with a streamlet
+ // of KeyValue objects where the userId matches across an impression and a click
+ // (meaning that the user has clicked on the ad).
+ .join(
+ // The other streamlet that's being joined to
+ clicks,
+ // Key extractor for the impressions streamlet
+ impression -> impression.getUserId(),
+ // Key extractor for the clicks streamlet
+ click -> click.getUserId(),
+ // Window configuration for the join operation
+ WindowConfig.TumblingCountWindow(25),
+ // Join type (inner join means that all elements from both streams will be included)
+ JoinType.INNER,
+ // For each element resulting from the join operation, a value of 1 will be provided
+ // if the ad IDs match between the elements (or a value of 0 if they don't).
+ (user1, user2) -> (user1.getAdId().equals(user2.getAdId())) ? 1 : 0
+ )
+ // The reduce function counts the number of ad clicks per user.
+ .reduceByKeyAndWindow(
+ // Key extractor for the reduce operation
+ kv -> String.format("user-%s", kv.getKey().getKey()),
+ // Value extractor for the reduce operation
+ kv -> kv.getValue(),
+ // Window configuration for the reduce operation
+ WindowConfig.TumblingCountWindow(50),
+ // A running cumulative total is calculated for each key
+ (cumulative, incoming) -> cumulative + incoming
+ )
+ // Finally, the consumer operation provides formatted log output
+ .consume(kw -> {
+ LOG.info(String.format("(user: %s, clicks: %d)",
+ kw.getKey().getKey(),
+ kw.getValue()));
+ });
+
+ Config config = Config.defaultConfig();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
new file mode 100644
index 0000000..a24f013
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
@@ -0,0 +1,79 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Resources;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This is a very simple topology that shows a series of streamlet operations
+ * on a source streamlet of random integers (between 1 and 10). First, 1 is added
+ * to each integer. That streamlet is then united with a streamlet that consists
+ * of an indefinite stream of zeroes. At that point, all 2s are excluded from the
+ * streamlet. The final output of the processing graph is then logged.
+ */
+public final class IntegerProcessingTopology {
+ private IntegerProcessingTopology() {
+ }
+
+ // Heron resources to be applied to the topology
+ private static final float CPU = 2.0f;
+ private static final long GIGABYTES_OF_RAM = 6;
+ private static final int NUM_CONTAINERS = 2;
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder builder = Builder.createBuilder();
+
+ Streamlet<Integer> zeroes = builder.newSource(() -> 0);
+
+ builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
+ .setNumPartitions(2)
+ .setName("random-ints")
+ .map(i -> i + 1)
+ .setName("add-one")
+ .union(zeroes)
+ .setName("unify-streams")
+ .filter(i -> i != 2)
+ .setName("remove-twos")
+ .log();
+
+ Resources resources = new Resources.Builder()
+ .setCpu(CPU)
+ .setRamInGB(GIGABYTES_OF_RAM)
+ .build();
+
+ Config config = new Config.Builder()
+ .setNumContainers(NUM_CONTAINERS)
+ .setContainerResources(resources)
+ .build();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, builder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/RepartitionTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/RepartitionTopology.java
new file mode 100644
index 0000000..6a02325
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/RepartitionTopology.java
@@ -0,0 +1,111 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This topology demonstrates the usage of a simple repartitioning algorithm
+ * using the Heron Streamlet API for Java. Normally, streamlet elements are
+ * distributed randomly across downstream instances when processed.
+ * Repartitioning enables you to select which instances (partitions) to send
+ * elements to on the basis of a user-defined logic. Here, a source streamlet
+ * emits an indefinite series of random integers between 0 and 99. The value
+ * of that number then determines to which topology instance (partition) the
+ * element is routed.
+ */
+public final class RepartitionTopology {
+ private RepartitionTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(RepartitionTopology.class.getName());
+
+ /**
+ * The repartition function that determines to which partition each incoming
+ * streamlet element is routed (across 8 possible partitions). Integers from 0 through
+ * 24 are routed to partitions 0 and 1, from 27 through 49 to partitions 2 and 3, from 51
+ * through 74 to partitions 4 and 5, from 77 through 100 to partitions 6 and 7; any other
+ */
+ private static List<Integer> repartitionStreamlet(int incomingInteger, int numPartitions) {
+ List<Integer> partitions;
+
+ if (incomingInteger >= 0 && incomingInteger < 25) {
+ partitions = Arrays.asList(0, 1);
+ } else if (incomingInteger > 26 && incomingInteger < 50) {
+ partitions = Arrays.asList(2, 3);
+ } else if (incomingInteger > 50 && incomingInteger < 75) {
+ partitions = Arrays.asList(4, 5);
+ } else if (incomingInteger > 76 && incomingInteger <= 100) {
+ partitions = Arrays.asList(6, 7);
+ } else {
+ partitions = Arrays.asList(ThreadLocalRandom.current().nextInt(0, 8));
+ }
+
+ String logMessage = String.format("Sending value %d to partitions: %s",
+ incomingInteger,
+ StreamletUtils.intListAsString(partitions));
+
+ LOG.info(logMessage);
+
+ return partitions;
+ }
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+ Streamlet<Integer> randomIntegers = processingGraphBuilder
+ .newSource(() -> {
+ // Random integers are emitted every 50 milliseconds
+ StreamletUtils.sleep(50);
+ return ThreadLocalRandom.current().nextInt(100);
+ })
+ .setNumPartitions(2)
+ .setName("random-integer-source");
+
+ randomIntegers
+ // The specific repartition logic is applied here
+ .repartition(8, RepartitionTopology::repartitionStreamlet)
+ .setName("repartition-incoming-values")
+ // Here, a generic repartition logic is applied (simply
+ // changing the number of partitions without specifying
+ // how repartitioning will take place)
+ .repartition(2)
+ .setName("reduce-partitions-for-logging-operation")
+ .log();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ Config config = Config.defaultConfig();
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/SimplePulsarSourceTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/SimplePulsarSourceTopology.java
new file mode 100644
index 0000000..d66b6ff
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/SimplePulsarSourceTopology.java
@@ -0,0 +1,126 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.logging.Logger;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Source;
+
+/**
+ * This topology demonstrates how sources work in the Heron Streamlet API
+ * for Java. The example source here reads from an Apache Pulsar topic and
+ * injects incoming messages into the processing graph.
+ */
+public final class SimplePulsarSourceTopology {
+ private SimplePulsarSourceTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(SimplePulsarSourceTopology.class.getName());
+
+ private static class PulsarSource implements Source<String> {
+ private static final long serialVersionUID = -3433804102901363106L;
+ private PulsarClient client;
+ private Consumer consumer;
+ private String pulsarConnectionUrl;
+ private String consumeTopic;
+ private String subscription;
+
+ PulsarSource(String url, String topic, String subscription) {
+ this.pulsarConnectionUrl = url;
+ this.consumeTopic = topic;
+ this.subscription = subscription;
+ }
+
+ /**
+ * The setup function defines the instantiation logic for the source.
+ * Here, a Pulsar client and consumer are created that will listen on
+ * the Pulsar topic.
+ */
+ public void setup(Context context) {
+ try {
+ client = PulsarClient.create(pulsarConnectionUrl);
+ consumer = client.subscribe(consumeTopic, subscription);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * The get function defines how elements for the source streamlet are
+ * "gotten." In this case, the Pulsar consumer for the specified topic
+ * listens for incoming messages.
+ */
+ public Collection<String> get() {
+ try {
+ String retval = new String(consumer.receive().getData(), "utf-8");
+ return Collections.singletonList(retval);
+ } catch (PulsarClientException | UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void cleanup() {
+ }
+ }
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+ /**
+ * A Pulsar source is constructed for a specific Pulsar installation, topic, and
+ * subscription.
+ */
+ Source<String> pulsarSource = new PulsarSource(
+ "pulsar://localhost:6650", // Pulsar connection URL
+ "persistent://sample/standalone/ns1/heron-pulsar-test-topic", // Pulsar topic
+ "subscription-1" // Subscription name for the Pulsar topic
+ );
+
+ /**
+ * In this processing graph, the source streamlet consists of messages on a
+ * Pulsar topic. Those messages are simply logged without any processing logic
+ * applied to them.
+ */
+ processingGraphBuilder.newSource(pulsarSource)
+ .setName("incoming-pulsar-messages")
+ .consume(s -> LOG.info(String.format("Message received from Pulsar: \"%s\"", s)));
+
+ Config config = Config.defaultConfig();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/SmartWatchTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/SmartWatchTopology.java
new file mode 100644
index 0000000..afa05bc
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/SmartWatchTopology.java
@@ -0,0 +1,123 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.KeyValue;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.WindowConfig;
+
+/**
+ * This topology shows an example usage of a reduce function. A source streamlet emits a smart watch reading every
+ * second from one of several joggers. Each reading provides a jogger ID and a distance run in feet. The processing
+ * graph converts each reading to a KeyValue object, which is passed to a reduce function that calculates a per-jogger
+ * total number of feet run in the last 10 seconds. The reduced value is then used to provide a per-runner average pace
+ * (feet per minute) and the result is logged using a consume operation (which allows for a formatted log).
+ */
+public final class SmartWatchTopology {
+ private SmartWatchTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(SmartWatchTopology.class.getName());
+
+ private static final List<String> JOGGERS = Arrays.asList(
+ "bill",
+ "ted"
+ );
+
+ private static class SmartWatchReading implements Serializable {
+ private static final long serialVersionUID = -6555650939020508026L;
+ private final String joggerId;
+ private final int feetRun;
+
+ SmartWatchReading() {
+ StreamletUtils.sleep(1000);
+ this.joggerId = StreamletUtils.randomFromList(JOGGERS);
+ this.feetRun = ThreadLocalRandom.current().nextInt(200, 400);
+ }
+
+ String getJoggerId() {
+ return joggerId;
+ }
+
+ int getFeetRun() {
+ return feetRun;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+ processingGraphBuilder.newSource(SmartWatchReading::new)
+ .setName("incoming-watch-readings")
+ .reduceByKeyAndWindow(
+ // Key extractor
+ reading -> reading.getJoggerId(),
+ // Value extractor
+ reading -> reading.getFeetRun(),
+ // The time window (10 seconds of clock time)
+ WindowConfig.TumblingTimeWindow(Duration.ofSeconds(10)),
+ // The reduce function (produces a cumulative sum)
+ (cumulative, incoming) -> cumulative + incoming
+ )
+ .setName("reduce-to-total-distance-per-jogger")
+ .map(keyWindow -> {
+ // The per-key result of the previous reduce step
+ long totalFeetRun = keyWindow.getValue();
+
+ // The amount of time elapsed
+ long startTime = keyWindow.getKey().getWindow().getStartTime();
+ long endTime = keyWindow.getKey().getWindow().getEndTime();
+ long timeLengthMillis = endTime - startTime; // window length in ms; cast to float below for division
+
+ // The feet-per-minute calculation
+ float feetPerMinute = totalFeetRun / (float) (timeLengthMillis / 1000);
+
+ // Reduce to two decimal places
+ String paceString = new DecimalFormat("#.##").format(feetPerMinute);
+
+ // Return a per-jogger average pace
+ return new KeyValue<>(keyWindow.getKey().getKey(), paceString);
+ })
+ .setName("calculate-average-speed")
+ .consume(kv -> {
+ String logMessage = String.format("(runner: %s, avgFeetPerMinute: %s)",
+ kv.getKey(),
+ kv.getValue());
+
+ LOG.info(logMessage);
+ });
+
+ Config config = Config.defaultConfig();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/StreamletCloneTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/StreamletCloneTopology.java
new file mode 100644
index 0000000..334e674
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/StreamletCloneTopology.java
@@ -0,0 +1,158 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Sink;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This topology demonstrates clone operations on streamlets in the Heron
+ * Streamlet API for Java. A clone operation creates multiple identical copies
+ * of a streamlet. Clone operations are especially useful if you want to, for
+ * example, send streamlet elements to separate sinks, as is done here. A
+ * supplier streamlet emits random scores in a game (per player). That initial
+ * streamlet is cloned into two. One of the cloned streams goes to a custom
+ * logging sink while the other goes to a dummy database sink.
+ */
+public final class StreamletCloneTopology {
+ private StreamletCloneTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(StreamletCloneTopology.class.getName());
+
+ /**
+ * A list of players of the game ("player1" through "player99").
+ */
+ private static final List<String> PLAYERS = IntStream.range(1, 100)
+ .mapToObj(i -> String.format("player%d", i))
+ .collect(Collectors.toList());
+
+ /**
+ * A POJO for game scores.
+ */
+ private static class GameScore implements Serializable {
+ private static final long serialVersionUID = 1089454399729015529L;
+ private String playerId;
+ private int score;
+
+ GameScore() {
+ this.playerId = StreamletUtils.randomFromList(PLAYERS);
+ this.score = ThreadLocalRandom.current().nextInt(1000);
+ }
+
+ String getPlayerId() {
+ return playerId;
+ }
+
+ int getScore() {
+ return score;
+ }
+ }
+
+ /**
+ * A phony database sink. This sink doesn't actually interact with a database.
+ * Instead, it logs each incoming score to stdout.
+ */
+ private static class DatabaseSink implements Sink<GameScore> {
+ private static final long serialVersionUID = 5544736723673011054L;
+
+ private void saveToDatabase(GameScore score) {
+ // This is a dummy operation, so no database logic will be implemented here
+ }
+
+ public void setup(Context context) {
+ }
+
+ public void put(GameScore score) {
+ String logMessage = String.format("Saving a score of %d for player %s to the database",
+ score.getScore(),
+ score.getPlayerId());
+ LOG.info(logMessage);
+ saveToDatabase(score);
+ }
+
+ public void cleanup() {
+ }
+ }
+
+ /**
+ * A logging sink that simply prints a formatted log message for each incoming score.
+ */
+ private static class FormattedLogSink implements Sink<GameScore> {
+ private static final long serialVersionUID = 1251089445039059977L;
+ public void setup(Context context) {
+ }
+
+ public void put(GameScore score) {
+ String logMessage = String.format("The current score for player %s is %d",
+ score.getPlayerId(),
+ score.getScore());
+ LOG.info(logMessage);
+ }
+
+ public void cleanup() {
+ }
+ }
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+ /**
+ * A supplier streamlet of random GameScore objects is cloned into two
+ * separate streamlets.
+ */
+ List<Streamlet<GameScore>> splitGameScoreStreamlet = processingGraphBuilder
+ .newSource(GameScore::new)
+ .clone(2);
+
+ /**
+ * Elements in the first cloned streamlet go to the database sink.
+ */
+ splitGameScoreStreamlet.get(0)
+ .toSink(new DatabaseSink());
+
+ /**
+ * Elements in the second cloned streamlet go to the logging sink.
+ */
+ splitGameScoreStreamlet.get(1)
+ .toSink(new FormattedLogSink());
+
+ Config config = Config.defaultConfig();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/TransformsTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/TransformsTopology.java
new file mode 100644
index 0000000..83fddbd
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/TransformsTopology.java
@@ -0,0 +1,122 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.SerializableTransformer;
+
+/**
+ * In this topology, a supplier generates an indefinite series of random integers between 0
+ * and 99. From there, a series of transform operations are applied that ultimately leave
+ * the original value unchanged.
+ */
+public final class TransformsTopology {
+ private TransformsTopology() {
+ }
+
+ private static final Logger LOG = Logger.getLogger(TransformsTopology.class.getName());
+
+ /**
+ * This transformer leaves incoming values unmodified. The Consumer simply accepts incoming
+ * values as-is during the transform phase.
+ */
+ private static class DoNothingTransformer<T> implements SerializableTransformer<T, T> {
+ private static final long serialVersionUID = 3717991700067221067L;
+
+ public void setup(Context context) {
+ }
+
+ /**
+ * Here, the incoming value is accepted as-is and not changed (hence the "do nothing"
+ * in the class name).
+ */
+ public void transform(T in, Consumer<T> consumer) {
+ consumer.accept(in);
+ }
+
+ public void cleanup() {
+ }
+ }
+
+ /**
+ * This transformer increments incoming values by a user-supplied increment (which can also,
+ * of course, be negative).
+ */
+ private static class IncrementTransformer implements SerializableTransformer<Integer, Integer> {
+ private static final long serialVersionUID = -3198491688219997702L;
+ private int increment;
+ private int total;
+
+ IncrementTransformer(int increment) {
+ this.increment = increment;
+ }
+
+ public void setup(Context context) {
+ context.registerMetric("InCrementMetric", 30, () -> total);
+ }
+
+ /**
+ * Here, the incoming value is incremented by the value specified in the
+ * transformer's constructor.
+ */
+ public void transform(Integer in, Consumer<Integer> consumer) {
+ int incrementedValue = in + increment;
+ total += increment;
+ consumer.accept(incrementedValue);
+ }
+
+ public void cleanup() {
+ }
+ }
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder builder = Builder.createBuilder();
+
+ /**
+ * The processing graph consists of a supplier streamlet that emits
+ * random integers between 0 and 99. From there, a series of transformers
+ * is applied. At the end of the graph, the original value is ultimately
+ * unchanged.
+ */
+ builder.newSource(() -> ThreadLocalRandom.current().nextInt(100))
+ .transform(new DoNothingTransformer<>())
+ .transform(new IncrementTransformer(10))
+ .transform(new IncrementTransformer(-7))
+ .transform(new DoNothingTransformer<>())
+ .transform(new IncrementTransformer(-3))
+ .log();
+
+ Config config = Config.defaultConfig();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, builder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
new file mode 100644
index 0000000..8bfa62e
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
@@ -0,0 +1,97 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.WindowConfig;
+
+/**
+ * This topology is an implementation of the classic word count example
+ * for the Heron Streamlet API for Java. A source streamlet emits an
+ * indefinite series of sentences chosen at random from a pre-defined list.
+ * Each sentence is then "flattened" into a list of individual words. A
+ * reduce function keeps a running tally of the number of times each word
+ * is encountered within each time window (in this case a tumbling count
+ * window of 50 operations). The result is then logged.
+ */
+public final class WindowedWordCountTopology {
+ private WindowedWordCountTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(WindowedWordCountTopology.class.getName());
+
+ private static final List<String> SENTENCES = Arrays.asList(
+ "I have nothing to declare but my genius",
+ "You can even",
+ "Compassion is an action word with no boundaries",
+ "To thine own self be true"
+ );
+
+ public static void main(String[] args) throws Exception {
+ Builder processingGraphBuilder = Builder.createBuilder();
+
+ processingGraphBuilder
+ // The origin of the processing graph: an indefinite series of sentences chosen
+ // from the list
+ .newSource(() -> StreamletUtils.randomFromList(SENTENCES))
+ .setName("random-sentences-source")
+ // Each sentence is "flattened" into a Streamlet<String> of individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .setName("flatten-into-individual-words")
+ // The reduce operation performs the per-key (i.e. per-word) sum within each time window
+ .reduceByKeyAndWindow(
+ // The key extractor (the word is left unchanged)
+ word -> word,
+ // Value extractor (the value is always 1)
+ word -> 1,
+ WindowConfig.TumblingCountWindow(50),
+ (x, y) -> x + y
+ )
+ .setName("reduce-operation")
+ // The final output is logged using a user-supplied format
+ .consume(kv -> {
+ String logMessage = String.format("(word: %s, count: %d)",
+ kv.getKey().getKey(),
+ kv.getValue()
+ );
+ LOG.info(logMessage);
+ });
+
+ // The topology's parallelism (the number of containers across which the topology's
+ // processing instance will be split) can be defined via the second command-line
+ // argument (or else the default of 2 will be used).
+ int topologyParallelism = StreamletUtils.getParallelism(args, 2);
+
+ Config config = new Config.Builder()
+ .setNumContainers(topologyParallelism)
+ .useKryoSerializer()
+ .build();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, processingGraphBuilder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WireRequestsTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WireRequestsTopology.java
new file mode 100644
index 0000000..bebfa66
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/WireRequestsTopology.java
@@ -0,0 +1,193 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import com.twitter.heron.examples.streamlet.utils.StreamletUtils;
+import com.twitter.heron.streamlet.Builder;
+import com.twitter.heron.streamlet.Config;
+import com.twitter.heron.streamlet.Runner;
+import com.twitter.heron.streamlet.Streamlet;
+
+/**
+ * This topology demonstrates how different streamlets can be united into one
+ * using union operations. Wire requests (customers asking to wire money) are
+ * created for three different bank branches. Each bank branch is a separate
+ * streamlet. In each branch's streamlet, the request amount is checked to make
+ * sure that no one requests a wire transfer of more than $500. Then, the
+ * wire request streamlets for all three branches are combined into one using a
+ * union operation. Each element in the unified streamlet then passes through a
+ * fraud detection filter that ensures that no "bad" customers are allowed to
+ * make requests.
+ */
+public final class WireRequestsTopology {
+ private WireRequestsTopology() {
+ }
+
+ private static final Logger LOG =
+ Logger.getLogger(WireRequestsTopology.class.getName());
+
+ /**
+ * A list of current customers (some good, some bad).
+ */
+ private static final List<String> CUSTOMERS = Arrays.asList(
+ "honest-tina",
+ "honest-jeff",
+ "scheming-dave",
+ "scheming-linda"
+ );
+
+ /**
+ * A list of bad customers whose requests should be rejected.
+ */
+ private static final List<String> FRAUDULENT_CUSTOMERS = Arrays.asList(
+ "scheming-dave",
+ "scheming-linda"
+ );
+
+ /**
+ * The maximum allowable amount for transfers. Requests at or above this
+ * amount are rejected.
+ */
+ private static final int MAX_ALLOWABLE_AMOUNT = 500;
+
+ /**
+ * A POJO for wire requests.
+ */
+ private static class WireRequest implements Serializable {
+ private static final long serialVersionUID = 1311441220738558016L;
+ private String customerId;
+ private int amount;
+
+ WireRequest(long delay) {
+ // The pace at which requests are generated is throttled. Different
+ // throttles are applied to different bank branches.
+ StreamletUtils.sleep(delay);
+ this.customerId = StreamletUtils.randomFromList(CUSTOMERS);
+ this.amount = ThreadLocalRandom.current().nextInt(1000);
+ LOG.info(String.format("New wire request: %s", this));
+ }
+
+ String getCustomerId() {
+ return customerId;
+ }
+
+ int getAmount() {
+ return amount;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(customer: %s, amount: %d)", customerId, amount);
+ }
+ }
+
+ /**
+ * Each request is checked to make sure that requests from untrustworthy customers
+ * are rejected.
+ */
+ private static boolean fraudDetect(WireRequest request) {
+ String logMessage;
+
+ boolean fraudulent = FRAUDULENT_CUSTOMERS.contains(request.getCustomerId());
+
+ if (fraudulent) {
+ logMessage = String.format("Rejected fraudulent customer %s",
+ request.getCustomerId());
+ LOG.warning(logMessage);
+ } else {
+ logMessage = String.format("Accepted request for $%d from customer %s",
+ request.getAmount(),
+ request.getCustomerId());
+ LOG.info(logMessage);
+ }
+
+ return !fraudulent;
+ }
+
+ /**
+ * Each request is checked so that any request of $500 or more is rejected.
+ */
+ private static boolean checkRequestAmount(WireRequest request) {
+ boolean sufficientBalance = request.getAmount() < MAX_ALLOWABLE_AMOUNT;
+
+ if (!sufficientBalance) {
+ LOG.warning(
+ String.format("Rejected excessive request of $%d",
+ request.getAmount()));
+ }
+
+ return sufficientBalance;
+ }
+
+ /**
+ * All Heron topologies require a main function that defines the topology's behavior
+ * at runtime
+ */
+ public static void main(String[] args) throws Exception {
+ Builder builder = Builder.createBuilder();
+
+ // Requests from the "quiet" bank branch (high throttling).
+ Streamlet<WireRequest> quietBranch = builder.newSource(() -> new WireRequest(20))
+ .setNumPartitions(1)
+ .setName("quiet-branch-requests")
+ .filter(WireRequestsTopology::checkRequestAmount)
+ .setName("quiet-branch-check-balance");
+
+ // Requests from the "medium" bank branch (medium throttling).
+ Streamlet<WireRequest> mediumBranch = builder.newSource(() -> new WireRequest(10))
+ .setNumPartitions(2)
+ .setName("medium-branch-requests")
+ .filter(WireRequestsTopology::checkRequestAmount)
+ .setName("medium-branch-check-balance");
+
+ // Requests from the "busy" bank branch (low throttling).
+ Streamlet<WireRequest> busyBranch = builder.newSource(() -> new WireRequest(5))
+ .setNumPartitions(4)
+ .setName("busy-branch-requests")
+ .filter(WireRequestsTopology::checkRequestAmount)
+ .setName("busy-branch-check-balance");
+
+ // Here, the streamlets for the three bank branches are united into one. The fraud
+ // detection filter then operates on that unified streamlet.
+ quietBranch
+ .union(mediumBranch)
+ .setNumPartitions(2)
+ .setName("union-1")
+ .union(busyBranch)
+ .setName("union-2")
+ .setNumPartitions(4)
+ .filter(WireRequestsTopology::fraudDetect)
+ .setName("all-branches-fraud-detect")
+ .log();
+
+ Config config = new Config.Builder()
+ .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
+ .setNumContainers(2)
+ .build();
+
+ // Fetches the topology name from the first command-line argument
+ String topologyName = StreamletUtils.getTopologyName(args);
+
+ // Finally, the processing graph and configuration are passed to the Runner, which converts
+ // the graph into a Heron topology that can be run in a Heron cluster.
+ new Runner().run(topologyName, config, builder);
+ }
+}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WordCountStreamletTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WordCountStreamletTopology.java
deleted file mode 100644
index 82525a0..0000000
--- a/examples/src/java/com/twitter/heron/examples/streamlet/WordCountStreamletTopology.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright 2017 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.examples.streamlet;
-
-import java.util.Arrays;
-
-import com.twitter.heron.streamlet.Builder;
-import com.twitter.heron.streamlet.Config;
-import com.twitter.heron.streamlet.Runner;
-import com.twitter.heron.streamlet.WindowConfig;
-
-/**
- * This is a topology that does simple word counts.
- * <p>
- * In this topology,
- * 1. The sentence "Mary had a little lamb" is generated over and over again.
- * 2. The flatMap stage splits the sentence into words
- * 3. The map stage 'counts' each word one at a time.
- * 4. The reduceByKeyAndWindow stage assembles a tumbling count window and
- * computes the counts of all words grouped by word.
- */
-public final class WordCountStreamletTopology {
- private WordCountStreamletTopology() {
- }
-
- /**
- * Main method
- */
- public static void main(String[] args) {
- if (args.length < 1) {
- throw new RuntimeException("Specify topology name");
- }
-
- int parallelism = 1;
- if (args.length > 1) {
- parallelism = Integer.parseInt(args[1]);
- }
- Builder builder = Builder.createBuilder();
- builder.newSource(() -> "Mary had a little lamb")
- .flatMap((sentence) -> Arrays.asList(sentence.split("\\s+")))
- .reduceByKeyAndWindow(x -> x, WindowConfig.TumblingCountWindow(10), 0, (x, y) -> x + 1)
- .log();
- Config conf = new Config();
- conf.setNumContainers(parallelism);
- Runner runner = new Runner();
- runner.run(args[0], conf, builder);
- }
-}
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/utils/StreamletUtils.java b/examples/src/java/com/twitter/heron/examples/streamlet/utils/StreamletUtils.java
new file mode 100644
index 0000000..4f01aa4
--- /dev/null
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/utils/StreamletUtils.java
@@ -0,0 +1,69 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.examples.streamlet.utils;
+
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+/**
+ * A collection of helper functions for the Streamlet API example topologies
+ */
+public final class StreamletUtils {
+ private StreamletUtils() {
+ }
+
+ public static void sleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Fetches the topology's name from the first command-line argument or
+ * throws an exception if not present.
+ */
+ public static String getTopologyName(String[] args) throws Exception {
+ if (args.length == 0) {
+ throw new Exception("You must supply a name for the topology");
+ } else {
+ return args[0];
+ }
+ }
+
+ /**
+ * Selects a random item from a list. Used in many example source streamlets.
+ */
+ public static <T> T randomFromList(List<T> ls) {
+ return ls.get(new Random().nextInt(ls.size()));
+ }
+
+ /**
+ * Fetches the topology's parallelism from the second-command-line
+ * argument or defers to a supplied default.
+ */
+ public static int getParallelism(String[] args, int defaultParallelism) {
+ return (args.length > 1) ? Integer.parseInt(args[1]) : defaultParallelism;
+ }
+
+ /**
+ * Converts a list of integers into a comma-separated string.
+ */
+ public static String intListAsString(List<Integer> ls) {
+ return String.join(", ", ls.stream().map(i -> i.toString()).collect(Collectors.toList()));
+ }
+}
diff --git a/heron/api/src/java/BUILD b/heron/api/src/java/BUILD
index 2e13df0..e98fe2e 100644
--- a/heron/api/src/java/BUILD
+++ b/heron/api/src/java/BUILD
@@ -20,17 +20,29 @@
"//heron/common/src/java:basics-java",
]
+# Low Level Api
java_library(
- name = "api-java",
- srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]),
+ name = "api-java-low-level",
+ srcs = glob(["com/twitter/heron/api/**/*.java"]),
javacopts = DOCLINT_HTML_AND_SYNTAX,
deps = api_deps_files,
)
+# Functional Api
+java_library(
+ name = "api-java",
+ srcs = glob(["com/twitter/heron/streamlet/**/*.java"]),
+ javacopts = DOCLINT_HTML_AND_SYNTAX,
+ deps = api_deps_files + [
+ ":api-java-low-level",
+ "//third_party/java:kryo-neverlink",
+ ]
+)
+
java_binary(
name = "api-unshaded",
srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]),
- deps = api_deps_files,
+ deps = api_deps_files + ["//third_party/java:kryo-neverlink"],
)
jarjar_binary(
diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java
index 5c12615..b281e54 100644
--- a/heron/api/src/java/com/twitter/heron/api/Config.java
+++ b/heron/api/src/java/com/twitter/heron/api/Config.java
@@ -34,6 +34,7 @@
package com.twitter.heron.api;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -266,6 +267,19 @@
*/
public static final String TOPOLOGY_ENVIRONMENT = "topology.environment";
+ /**
+ * Timer events registered for a topology.
+ * This is a Map<String, Pair<Duration, Runnable>>.
+ * Where the key is the name and the value contains the frequency of the event
+ * and the task to run.
+ */
+ public static final String TOPOLOGY_TIMER_EVENTS = "topology.timer.events";
+
+ /**
+ * Enable Remote debugging for java heron instances
+ */
+ public static final String TOPOLOGY_REMOTE_DEBUGGING_ENABLE = "topology.remote.debugging.enable";
+
private static final long serialVersionUID = 2550967708478837032L;
// We maintain a list of all user exposed vars
private static Set<String> apiVars = new HashSet<>();
@@ -301,6 +315,7 @@
apiVars.add(TOPOLOGY_ADDITIONAL_CLASSPATH);
apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS);
apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS);
+ apiVars.add(TOPOLOGY_REMOTE_DEBUGGING_ENABLE);
}
public Config() {
@@ -652,4 +667,35 @@
public void setTopologyStatefulStartClean(boolean clean) {
setTopologyStatefulStartClean(this, clean);
}
+
+ /**
+ * Registers a timer event that executes periodically
+ * @param conf the map with the existing topology configs
+ * @param name the name of the timer
+ * @param interval the frequency in which to run the task
+ * @param task the task to run
+ */
+ @SuppressWarnings("unchecked")
+ public static void registerTopologyTimerEvents(Map<String, Object> conf,
+ String name, Duration interval,
+ Runnable task) {
+ if (interval.isZero() || interval.isNegative()) {
+ throw new IllegalArgumentException("Timer duration needs to be positive");
+ }
+ if (!conf.containsKey(Config.TOPOLOGY_TIMER_EVENTS)) {
+ conf.put(Config.TOPOLOGY_TIMER_EVENTS, new HashMap<String, Pair<Duration, Runnable>>());
+ }
+
+ Map<String, Pair<Duration, Runnable>> timers
+ = (Map<String, Pair<Duration, Runnable>>) conf.get(Config.TOPOLOGY_TIMER_EVENTS);
+
+ if (timers.containsKey(name)) {
+ throw new IllegalArgumentException("Timer with name " + name + " already exists");
+ }
+ timers.put(name, Pair.of(interval, task));
+ }
+
+ public void setTopologyRemoteDebugging(boolean isOn) {
+ this.put(Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, String.valueOf(isOn));
+ }
}
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
index 9a487f4..cf52f7c 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
@@ -290,4 +290,10 @@
throw new IllegalStateException("Failed to find topology defn file");
}
+
+ public static boolean getTopologyRemoteDebuggingEnabled(TopologyAPI.Topology topology) {
+ List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+ return Boolean.parseBoolean(TopologyUtils.getConfigWithDefault(
+ topologyConfig, Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, "false"));
+ }
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
index e4aca45..a6ee37a 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
@@ -17,69 +17,39 @@
import java.io.Serializable;
import com.twitter.heron.common.basics.ByteAmount;
+import com.twitter.heron.streamlet.impl.KryoSerializer;
/**
* Config is the way users configure the execution of the topology.
- * Things like tuple delivery semantics, resources used, as well as
- * user defined key/value pairs are passed on to the runner via
+ * Things like streamlet delivery semantics, resources used, as well as
+ * user-defined key/value pairs are passed on to the topology runner via
* this class.
*/
public final class Config implements Serializable {
private static final long serialVersionUID = 6204498077403076352L;
+
private com.twitter.heron.api.Config heronConfig;
+
public enum DeliverySemantics {
ATMOST_ONCE,
ATLEAST_ONCE,
EFFECTIVELY_ONCE
}
- public Config() {
- heronConfig = new com.twitter.heron.api.Config();
+ private Config(Builder builder) {
+ heronConfig = builder.config;
}
- Config(com.twitter.heron.api.Config config) {
- heronConfig = config;
+ public static Config defaultConfig() {
+ return new Builder()
+ .build();
}
com.twitter.heron.api.Config getHeronConfig() {
return heronConfig;
}
- /**
- * Sets the delivery semantics of the topology
- * @param semantic The delivery semantic to be enforced
- */
- public void setDeliverySemantics(DeliverySemantics semantic) {
- heronConfig.setTopologyReliabilityMode(translateSemantics(semantic));
- }
-
- /**
- * Sets the number of containers to run this topology
- * @param numContainers The number of containers to distribute this topology
- */
- public void setNumContainers(int numContainers) {
- heronConfig.setNumStmgrs(numContainers);
- }
-
- /**
- * Sets resources used per container by this topology
- * @param resource The resource per container to dedicate per container
- */
- public void setContainerResources(Resources resource) {
- heronConfig.setContainerCpuRequested(resource.getCpu());
- heronConfig.setContainerRamRequested(ByteAmount.fromBytes(resource.getRam()));
- }
-
- /**
- * Sets some user defined key value mapping
- * @param key The user defined key
- * @param value The user defined object
- */
- public void setUserConfig(String key, Object value) {
- heronConfig.put(key, value);
- }
-
- private com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics(
+ private static com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics(
DeliverySemantics semantics) {
switch (semantics) {
case ATMOST_ONCE:
@@ -92,4 +62,67 @@
return com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
}
}
+
+ public static class Builder {
+ private com.twitter.heron.api.Config config;
+
+ public Builder() {
+ config = new com.twitter.heron.api.Config();
+ }
+
+ /**
+ * Sets the number of containers to run this topology
+ * @param numContainers The number of containers to distribute this topology
+ */
+ public Builder setNumContainers(int numContainers) {
+ config.setNumStmgrs(numContainers);
+ return this;
+ }
+
+ /**
+ * Sets resources used per container by this topology
+ * @param resources The resource to dedicate per container
+ */
+ public Builder setContainerResources(Resources resources) {
+ config.setContainerCpuRequested(resources.getCpu());
+ config.setContainerRamRequested(ByteAmount.fromBytes(resources.getRam()));
+ return this;
+ }
+
+ /**
+ * Sets the delivery semantics of the topology
+ * @param semantic The delivery semantic to be enforced
+ */
+ public Builder setDeliverySemantics(DeliverySemantics semantic) {
+ config.setTopologyReliabilityMode(Config.translateSemantics(semantic));
+ return this;
+ }
+
+ /**
+ * Sets some user-defined key/value mapping
+ * @param key The user-defined key
+ * @param value The user-defined value
+ */
+ public Builder setUserConfig(String key, Object value) {
+ config.put(key, value);
+ return this;
+ }
+
+ /**
+ * Sets the topology to use the Kryo serializer for serializing
+ * streamlet elements
+ */
+ public Builder useKryoSerializer() {
+ try {
+ config.setSerializationClassName(new KryoSerializer().getClass().getName());
+ } catch (NoClassDefFoundError e) {
+ throw new RuntimeException("Linking with kryo is needed because useKryoSerializer is used");
+ }
+ return this;
+ }
+
+ public Config build() {
+ return new Config(this);
+ }
+ }
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Context.java b/heron/api/src/java/com/twitter/heron/streamlet/Context.java
index 8371011..34fcf0e 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Context.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Context.java
@@ -16,6 +16,7 @@
import java.io.Serializable;
import java.util.Map;
+import java.util.function.Supplier;
import com.twitter.heron.api.state.State;
@@ -50,6 +51,14 @@
int getStreamPartition();
/**
+ * Register a metric function. This function will be called
+ * by the system every collectionInterval seconds and the resulting value
+ * will be collected
+ */
+ <T> void registerMetric(String metricName, int collectionInterval,
+ Supplier<T> metricFn);
+
+ /**
* The state where components can store any of their local state
* @return The state interface where users can store their local state
*/
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java b/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java
index 6e9770a..2c3b4e4 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/KeyValue.java
@@ -31,6 +31,10 @@
return new KeyValue<R, T>(k, v);
}
+ KeyValue() {
+ // nothing really
+ }
+
public KeyValue(K k, V v) {
this.key = k;
this.value = v;
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java b/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java
index fb1af85..25c06c4 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/KeyedWindow.java
@@ -25,10 +25,16 @@
private static final long serialVersionUID = 4193319775040181971L;
private T key;
private Window window;
+
+ KeyedWindow() {
+ // nothing really
+ }
+
public KeyedWindow(T key, Window window) {
this.key = key;
this.window = window;
}
+
public T getKey() {
return key;
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
index 967e700..1deffcf 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
@@ -18,13 +18,23 @@
/**
* Resources needed by the topology are encapsulated in this class.
- * Currently we deal with cpu and ram. Others can be added later.
+ * Currently we deal with CPU and RAM. Others can be added later.
*/
public final class Resources implements Serializable {
private static final long serialVersionUID = 630451253428388496L;
private float cpu;
private long ram;
+ private Resources(Builder builder) {
+ this.cpu = builder.cpu;
+ this.ram = builder.ram;
+ }
+
+ public static Resources defaultResources() {
+ return new Builder()
+ .build();
+ }
+
public float getCpu() {
return cpu;
}
@@ -33,31 +43,58 @@
return ram;
}
- public Resources() {
- this.cpu = 1.0f;
- this.ram = 104857600;
- }
-
- public Resources withCpu(float ncpu) {
- this.cpu = ncpu;
- return this;
- }
-
- public Resources withRam(long nram) {
- this.ram = nram;
- return this;
- }
-
- public Resources withRamInMB(long nram) {
- return withRam(nram * 1024 * 1024);
- }
-
- public Resources withRamInGB(long nram) {
- return withRamInMB(nram * 1024);
- }
-
@Override
public String toString() {
- return "{ CPU: " + String.valueOf(cpu) + " RAM: " + String.valueOf(ram) + " }";
+ return String.format("{ CPU: %s RAM: %s }", String.valueOf(cpu), String.valueOf(ram));
+ }
+
+ public static class Builder {
+ private float cpu;
+ private long ram;
+
+ public Builder() {
+ this.cpu = 1.0f;
+ this.ram = 104857600;
+ }
+
+ /**
+ * Sets the RAM to be used by the topology (in megabytes)
+ * @param nram The number of megabytes of RAM
+ */
+ public Builder setRamInMB(long nram) {
+ this.ram = nram * 1024;
+ return this;
+ }
+
+ /**
+ * Sets the RAM to be used by the topology (in gigabytes)
+ * @param nram The number of gigabytes of RAM
+ */
+ public Builder setRamInGB(long nram) {
+ this.ram = nram * 1024 * 1024;
+ return this;
+ }
+
+ /**
+ * Sets the total number of CPUs to be used by the topology
+ * @param containerCpu The number of CPUs (as a float)
+ */
+ public Builder setCpu(float containerCpu) {
+ this.cpu = containerCpu;
+ return this;
+ }
+
+ /**
+ * Sets the RAM to be used by the topology (in bytes)
+ * @param containerRam The number of bytes of RAM
+ */
+ public Builder setRam(long containerRam) {
+ this.ram = containerRam;
+ return this;
+ }
+
+ public Resources build() {
+ return new Resources(this);
+ }
}
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Source.java b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
index 60a4903..fe2330a 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Source.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
@@ -15,6 +15,7 @@
package com.twitter.heron.streamlet;
import java.io.Serializable;
+import java.util.Collection;
/**
* Source is how Streamlet's originate. The get method
@@ -24,6 +25,6 @@
*/
public interface Source<T> extends Serializable {
void setup(Context context);
- T get();
+ Collection<T> get();
void cleanup();
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Window.java b/heron/api/src/java/com/twitter/heron/streamlet/Window.java
index eb14928..fb25e92 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Window.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Window.java
@@ -28,6 +28,10 @@
private long endTimeMs;
private long count;
+ Window() {
+ // nothing really
+ }
+
public Window(long startTimeMs, long endTimeMs, long count) {
this.startTimeMs = startTimeMs;
this.endTimeMs = endTimeMs;
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java
index 31bbdfd..62fd7ba 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/ContextImpl.java
@@ -16,7 +16,9 @@
import java.io.Serializable;
import java.util.Map;
+import java.util.function.Supplier;
+import com.twitter.heron.api.metric.IMetric;
import com.twitter.heron.api.state.State;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.streamlet.Context;
@@ -59,7 +61,26 @@
}
@Override
+ public <T> void registerMetric(String metricName, int collectionInterval,
+ Supplier<T> metricFn) {
+ topologyContext.registerMetric(metricName, new StreamletMetric<T>(metricFn),
+ collectionInterval);
+ }
+
+ @Override
public State<Serializable, Serializable> getState() {
return state;
}
+
+ private class StreamletMetric<T> implements IMetric<T> {
+ private Supplier<T> metricFn;
+ StreamletMetric(Supplier<T> metricFn) {
+ this.metricFn = metricFn;
+ }
+
+ @Override
+ public T getValueAndReset() {
+ return metricFn.get();
+ }
+ }
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/KryoSerializer.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/KryoSerializer.java
new file mode 100644
index 0000000..1a80547
--- /dev/null
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/KryoSerializer.java
@@ -0,0 +1,97 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.streamlet.impl;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.esotericsoftware.kryo.serializers.DefaultSerializers;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+
+import com.twitter.heron.api.serializer.IPluggableSerializer;
+
+/**
+ * KryoSerializer is a wrapper around Heron's IPluggableSerializer.
+ * Streamlet based topologies turning on kryo serialization are based off of it.
+ */
+public class KryoSerializer implements IPluggableSerializer {
+ private Kryo kryo;
+ private Output kryoOut;
+ private Input kryoIn;
+
+ @Override
+ public void initialize(Map<String, Object> config) {
+ kryo = getKryo();
+ kryoOut = new Output(2000, 2000000000);
+ kryoIn = new Input(1);
+ }
+
+ @Override
+ public byte[] serialize(Object object) {
+ kryoOut.clear();
+ kryo.writeClassAndObject(kryoOut, object);
+ return kryoOut.toBytes();
+ }
+
+ @Override
+ public Object deserialize(byte[] input) {
+ kryoIn.setBuffer(input);
+ return kryo.readClassAndObject(kryoIn);
+ }
+
+ private Kryo getKryo() {
+ Kryo k = new Kryo();
+ k.setRegistrationRequired(false);
+ k.setReferences(false);
+ k.register(byte[].class);
+ k.register(ArrayList.class, new ArrayListSerializer());
+ k.register(HashMap.class, new HashMapSerializer());
+ k.register(HashSet.class, new HashSetSerializer());
+ k.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
+ return k;
+ }
+
+ private class ArrayListSerializer extends CollectionSerializer {
+ @Override
+ @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
+ public Collection create(Kryo k, Input input, Class<Collection> type) {
+ return new ArrayList();
+ }
+ }
+
+ private class HashMapSerializer extends MapSerializer {
+ @Override
+ @SuppressWarnings("rawtypes") // extending kryo class signature that takes Map
+ public Map<String, Object> create(Kryo k, Input input, Class<Map> type) {
+ return new HashMap<>();
+ }
+ }
+
+ private class HashSetSerializer extends CollectionSerializer {
+ @Override
+ @SuppressWarnings("rawtypes") // extending Kryo class that uses raw types
+ public Collection create(Kryo k, Input input, Class<Collection> type) {
+ return new HashSet();
+ }
+ }
+}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java
index f459839..2f8919d 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/sources/ComplexSource.java
@@ -15,6 +15,7 @@
package com.twitter.heron.streamlet.impl.sources;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Map;
import com.twitter.heron.api.spout.SpoutOutputCollector;
@@ -58,9 +59,11 @@
@Override
public void nextTuple() {
- R val = generator.get();
+ Collection<R> val = generator.get();
if (val != null) {
- collector.emit(new Values(val));
+ for (R tuple : val) {
+ collector.emit(new Values(tuple));
+ }
}
}
}
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index ddf8e0e..a69bf79 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -1,6 +1,7 @@
load("/tools/rules/heron_deps", "heron_java_api_proto_files")
api_deps_files = [
+ "//heron/api/src/java:api-java-low-level",
"//heron/api/src/java:api-java",
"//heron/common/src/java:utils-java",
"//heron/common/src/java:basics-java",
@@ -31,6 +32,7 @@
"com.twitter.heron.streamlet.impl.operators.JoinOperatorTest",
"com.twitter.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
"com.twitter.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
+ "com.twitter.heron.api.ConfigTest",
"com.twitter.heron.api.HeronSubmitterTest"
],
runtime_deps = [ ":api-tests" ],
diff --git a/heron/api/tests/java/com/twitter/heron/api/ConfigTest.java b/heron/api/tests/java/com/twitter/heron/api/ConfigTest.java
new file mode 100644
index 0000000..9de282a
--- /dev/null
+++ b/heron/api/tests/java/com/twitter/heron/api/ConfigTest.java
@@ -0,0 +1,49 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.api;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class ConfigTest {
+
+ @Test
+ public void testRegisterTimerEvent() {
+ Config conf = new Config();
+ try {
+ Config.registerTopologyTimerEvents(conf, "timer", Duration.ofSeconds(0), () -> {
+ });
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ Config.registerTopologyTimerEvents(conf, "timer", Duration.ofSeconds(-1), () -> {
+ });
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ Config.registerTopologyTimerEvents(conf, "timer", Duration.ofSeconds(1), () -> {
+ });
+ } catch (IllegalArgumentException e) {
+ Assert.fail();
+ }
+ }
+}
diff --git a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
index be8450b..069b6e2 100644
--- a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
@@ -24,6 +24,7 @@
import com.twitter.heron.api.topology.TopologyBuilder;
import com.twitter.heron.streamlet.Context;
+import com.twitter.heron.streamlet.Resources;
import com.twitter.heron.streamlet.SerializableTransformer;
import com.twitter.heron.streamlet.Streamlet;
import com.twitter.heron.streamlet.WindowConfig;
@@ -256,4 +257,18 @@
(JoinStreamlet<String, String, String, String>) fStreamlet.getChildren().get(0);
assertEquals(jStreamlet.getChildren().size(), 0);
}
+
+ @Test
+ public void testResourcesBuilder() {
+ Resources defaultResoures = Resources.defaultResources();
+ assertEquals(0, Float.compare(defaultResoures.getCpu(), 1.0f));
+ assertEquals(defaultResoures.getRam(), 104857600);
+
+ Resources res2 = new Resources.Builder()
+ .setCpu(5.1f)
+ .setRamInGB(20)
+ .build();
+ assertEquals(0, Float.compare(res2.getCpu(), 5.1f));
+ assertEquals(res2.getRam(), 20 * 1024 * 1024);
+ }
}
diff --git a/heron/ckptmgr/src/java/BUILD b/heron/ckptmgr/src/java/BUILD
index 0aa6fda..95e9ba0 100644
--- a/heron/ckptmgr/src/java/BUILD
+++ b/heron/ckptmgr/src/java/BUILD
@@ -28,7 +28,7 @@
deps = [
":ckptmgr-java",
"//heron/spi/src/java:statefulstorage-spi-java",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:network-java",
diff --git a/heron/common/src/java/BUILD b/heron/common/src/java/BUILD
index 118a609..c5e721a 100644
--- a/heron/common/src/java/BUILD
+++ b/heron/common/src/java/BUILD
@@ -46,7 +46,7 @@
deps = heron_java_proto_files() + [
":basics-java",
":config-java",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/api/src/java:classification",
]
)
diff --git a/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java b/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java
index 0870084..8744990 100644
--- a/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java
+++ b/heron/common/src/java/com/twitter/heron/common/basics/WakeableLooper.java
@@ -119,6 +119,16 @@
timers.add(new TimerTask(expiration, task));
}
+ public void registerPeriodicEvent(Duration frequency, Runnable task) {
+ registerTimerEvent(frequency, new Runnable() {
+ @Override
+ public void run() {
+ task.run();
+ registerPeriodicEvent(frequency, task);
+ }
+ });
+ }
+
public void exitLoop() {
exitLoop = true;
wakeUp();
diff --git a/heron/common/tests/java/BUILD b/heron/common/tests/java/BUILD
index f4bc5d5..6ab0757 100644
--- a/heron/common/tests/java/BUILD
+++ b/heron/common/tests/java/BUILD
@@ -3,7 +3,7 @@
srcs = glob(["**/*.java"]),
deps = [
"//heron/proto:proto_topology_java",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:network-java",
diff --git a/heron/config/src/yaml/conf/aurora/heron.aurora b/heron/config/src/yaml/conf/aurora/heron.aurora
index 2136c2f..8ab3df4 100644
--- a/heron/config/src/yaml/conf/aurora/heron.aurora
+++ b/heron/config/src/yaml/conf/aurora/heron.aurora
@@ -22,48 +22,48 @@
command_to_start_executor = \
'{{EXECUTOR_BINARY}}' \
- '--shard={{mesos.instance}}' \
- '--topology-name={{TOPOLOGY_NAME}}' \
- '--topology-id={{TOPOLOGY_ID}}' \
- '--topology-defn-file={{TOPOLOGY_DEFINITION_FILE}}' \
- '--state-manager-connection={{STATEMGR_CONNECTION_STRING}}' \
- '--state-manager-root={{STATEMGR_ROOT_PATH}}' \
- '--tmaster-binary={{TMASTER_BINARY}}' \
- '--stmgr-binary={{STMGR_BINARY}}' \
- '--metrics-manager-classpath="{{METRICSMGR_CLASSPATH}}"' \
- '--instance-jvm-opts={{INSTANCE_JVM_OPTS_IN_BASE64}}' \
- '--classpath="{{TOPOLOGY_CLASSPATH}}"' \
- '--master-port={{thermos.ports[port1]}}' \
- '--tmaster-controller-port={{thermos.ports[port2]}}' \
- '--tmaster-stats-port={{thermos.ports[port3]}}' \
- '--heron-internals-config-file={{SYSTEM_YAML}}' \
- '--override-config-file={{OVERRIDE_YAML}} ' \
- '--component-ram-map={{COMPONENT_RAMMAP}}' \
- '--component-jvm-opts={{COMPONENT_JVM_OPTS_IN_BASE64}}' \
- '--pkg-type={{TOPOLOGY_PACKAGE_TYPE}}' \
- '--topology-binary-file={{TOPOLOGY_BINARY_FILE}}' \
- '--heron-java-home={{JAVA_HOME}}' \
- '--shell-port={{thermos.ports[http]}}' \
- '--heron-shell-binary={{SHELL_BINARY}}' \
- '--metrics-manager-port={{thermos.ports[port4]}}' \
- '--cluster={{CLUSTER}}' \
- '--role={{ROLE}}' \
- '--environment={{ENVIRON}}' \
- '--instance-classpath="{{INSTANCE_CLASSPATH}}"' \
- '--metrics-sinks-config-file={{METRICS_YAML}}' \
- '--scheduler-classpath="{{SCHEDULER_CLASSPATH}}"' \
- '--scheduler-port="{{thermos.ports[scheduler]}}"' \
- '--python-instance-binary={{PYTHON_INSTANCE_BINARY}}' \
- '--cpp-instance-binary={{CPP_INSTANCE_BINARY}}' \
- '--metricscache-manager-classpath={{METRICSCACHEMGR_CLASSPATH}}' \
- '--metricscache-manager-master-port={{thermos.ports[metricscachemgr_masterport]}}' \
- '--metricscache-manager-stats-port={{thermos.ports[metricscachemgr_statsport]}}' \
- '--is-stateful={{IS_STATEFUL_ENABLED}}' \
- '--checkpoint-manager-classpath="{{CKPTMGR_CLASSPATH}}"' \
- '--checkpoint-manager-port={{thermos.ports[ckptmgr_port]}}' \
- '--stateful-config-file={{STATEFUL_CONFIG_YAML}}' \
- '--health-manager-mode={{HEALTHMGR_MODE}}' \
- '--health-manager-classpath={{HEALTHMGR_CLASSPATH}}'
+ ' --shard={{mesos.instance}}' \
+ ' --topology-name={{TOPOLOGY_NAME}}' \
+ ' --topology-id={{TOPOLOGY_ID}}' \
+ ' --topology-defn-file={{TOPOLOGY_DEFINITION_FILE}}' \
+ ' --state-manager-connection={{STATEMGR_CONNECTION_STRING}}' \
+ ' --state-manager-root={{STATEMGR_ROOT_PATH}}' \
+ ' --tmaster-binary={{TMASTER_BINARY}}' \
+ ' --stmgr-binary={{STMGR_BINARY}}' \
+ ' --metrics-manager-classpath="{{METRICSMGR_CLASSPATH}}"' \
+ ' --instance-jvm-opts={{INSTANCE_JVM_OPTS_IN_BASE64}}' \
+ ' --classpath="{{TOPOLOGY_CLASSPATH}}"' \
+ ' --master-port={{thermos.ports[port1]}}' \
+ ' --tmaster-controller-port={{thermos.ports[port2]}}' \
+ ' --tmaster-stats-port={{thermos.ports[port3]}}' \
+ ' --heron-internals-config-file={{SYSTEM_YAML}}' \
+ ' --override-config-file={{OVERRIDE_YAML}} ' \
+ ' --component-ram-map={{COMPONENT_RAMMAP}}' \
+ ' --component-jvm-opts={{COMPONENT_JVM_OPTS_IN_BASE64}}' \
+ ' --pkg-type={{TOPOLOGY_PACKAGE_TYPE}}' \
+ ' --topology-binary-file={{TOPOLOGY_BINARY_FILE}}' \
+ ' --heron-java-home={{JAVA_HOME}}' \
+ ' --shell-port={{thermos.ports[http]}}' \
+ ' --heron-shell-binary={{SHELL_BINARY}}' \
+ ' --metrics-manager-port={{thermos.ports[port4]}}' \
+ ' --cluster={{CLUSTER}}' \
+ ' --role={{ROLE}}' \
+ ' --environment={{ENVIRON}}' \
+ ' --instance-classpath="{{INSTANCE_CLASSPATH}}"' \
+ ' --metrics-sinks-config-file={{METRICS_YAML}}' \
+ ' --scheduler-classpath="{{SCHEDULER_CLASSPATH}}"' \
+ ' --scheduler-port="{{thermos.ports[scheduler]}}"' \
+ ' --python-instance-binary={{PYTHON_INSTANCE_BINARY}}' \
+ ' --cpp-instance-binary={{CPP_INSTANCE_BINARY}}' \
+ ' --metricscache-manager-classpath={{METRICSCACHEMGR_CLASSPATH}}' \
+ ' --metricscache-manager-master-port={{thermos.ports[metricscachemgr_masterport]}}' \
+ ' --metricscache-manager-stats-port={{thermos.ports[metricscachemgr_statsport]}}' \
+ ' --is-stateful={{IS_STATEFUL_ENABLED}}' \
+ ' --checkpoint-manager-classpath="{{CKPTMGR_CLASSPATH}}"' \
+ ' --checkpoint-manager-port={{thermos.ports[ckptmgr_port]}}' \
+ ' --stateful-config-file={{STATEFUL_CONFIG_YAML}}' \
+ ' --health-manager-mode={{HEALTHMGR_MODE}}' \
+ ' --health-manager-classpath={{HEALTHMGR_CLASSPATH}}'
launch_heron_executor = Process(
name = 'launch_heron_executor',
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index d901d20..5a2d0b0 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -43,6 +43,8 @@
Log = log.Log
+# pylint: disable=too-many-lines
+
def print_usage():
print(
"Usage: ./heron-executor <shardid> <topname> <topid> <topdefnfile>"
@@ -55,7 +57,7 @@
" <scheduler_classpath> <scheduler_port> <python_instance_binary>"
" <metricscachemgr_classpath> <metricscachemgr_masterport> <metricscachemgr_statsport>"
" <is_stateful> <ckptmgr_classpath> <ckptmgr_port> <stateful_config_file> "
- " <healthmgr_mode> <healthmgr_classpath> <cpp_instance_binary>")
+ " <healthmgr_mode> <healthmgr_classpath> <cpp_instance_binary> <jvm_remote_debugger_ports>")
def id_map(prefix, container_plans, add_zero_id=False):
ids = {}
@@ -219,7 +221,9 @@
self.health_manager_mode = parsed_args.health_manager_mode
self.health_manager_classpath = '%s:%s'\
% (self.scheduler_classpath, parsed_args.health_manager_classpath)
-
+ self.jvm_remote_debugger_ports = \
+ parsed_args.jvm_remote_debugger_ports.split(",") \
+ if parsed_args.jvm_remote_debugger_ports else None
def __init__(self, args, shell_env):
self.init_parsed_args(args)
@@ -293,6 +297,8 @@
parser.add_argument("--stateful-config-file", required=True)
parser.add_argument("--health-manager-mode", required=True)
parser.add_argument("--health-manager-classpath", required=True)
+ parser.add_argument("--jvm-remote-debugger-ports", required=False,
+ help="ports to be used by a remote debugger for JVM instances")
parsed_args, unknown_args = parser.parse_known_args(args[1:])
@@ -508,6 +514,10 @@
java_version.startswith("1.5"):
java_metasize_param = 'PermSize'
+ if self.jvm_remote_debugger_ports and \
+ (len(instance_info) > len(self.jvm_remote_debugger_ports)):
+ Log.warn("Not enough remote debugger ports for all instances!")
+
for (instance_id, component_name, global_task_id, component_index) in instance_info:
total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
@@ -542,6 +552,12 @@
'-XX:ParallelGCThreads=4',
'-Xloggc:log-files/gc.%s.log' % instance_id]
+ remote_debugger_port = None
+ if self.jvm_remote_debugger_ports:
+ remote_debugger_port = self.jvm_remote_debugger_ports.pop()
+ instance_cmd.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
+ % remote_debugger_port)
+
instance_args = ['-topology_name', self.topology_name,
'-topology_id', self.topology_id,
'-instance_id', instance_id,
@@ -553,10 +569,13 @@
'-metricsmgr_port', self.metrics_manager_port,
'-system_config_file', self.heron_internals_config_file,
'-override_config_file', self.override_config_file]
+ if remote_debugger_port:
+ instance_args += ['-remote_debugger_port', remote_debugger_port]
instance_cmd = instance_cmd + self.instance_jvm_opts.split()
if component_name in self.component_jvm_opts:
instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split()
+
instance_cmd.extend(['-Djava.net.preferIPv4Stack=true',
'-cp',
'%s:%s' % (self.instance_classpath, self.classpath),
diff --git a/heron/healthmgr/src/java/BUILD b/heron/healthmgr/src/java/BUILD
index c232d7b..010a4bb 100644
--- a/heron/healthmgr/src/java/BUILD
+++ b/heron/healthmgr/src/java/BUILD
@@ -5,7 +5,7 @@
load("/tools/rules/heron_deps", "heron_java_proto_files")
healthmgr_deps_files = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/api/src/java:classification",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
diff --git a/heron/instance/src/java/BUILD b/heron/instance/src/java/BUILD
index b465f20..56e2f8d 100644
--- a/heron/instance/src/java/BUILD
+++ b/heron/instance/src/java/BUILD
@@ -7,7 +7,7 @@
instance_deps_files = \
heron_java_proto_files() + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/api/src/java:classification",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
diff --git a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
index b8b5be6..7dc3aa0 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
@@ -201,15 +201,13 @@
stmgrPortOption.setRequired(true);
options.addOption(stmgrPortOption);
- Option metricsmgrPortOption
- = new Option(
+ Option metricsmgrPortOption = new Option(
CommandLineOptions.METRICS_MGR_PORT_OPTION, true, "Metrics Manager Port");
metricsmgrPortOption.setType(Integer.class);
metricsmgrPortOption.setRequired(true);
options.addOption(metricsmgrPortOption);
- Option systemConfigFileOption
- = new Option(
+ Option systemConfigFileOption = new Option(
CommandLineOptions.SYSTEM_CONFIG_FILE, true, "Heron Internals Config Filename");
systemConfigFileOption.setType(String.class);
systemConfigFileOption.setRequired(true);
@@ -222,6 +220,11 @@
overrideConfigFileOption.setRequired(true);
options.addOption(overrideConfigFileOption);
+ Option remoteDebuggerPortOption = new Option(
+ CommandLineOptions.REMOTE_DEBUGGER_PORT, true, "Remote Debugger Port");
+ remoteDebuggerPortOption.setType(Integer.class);
+ options.addOption(remoteDebuggerPortOption);
+
CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd = null;
@@ -257,6 +260,12 @@
String overrideConfigFile
= commandLine.getOptionValue(CommandLineOptions.OVERRIDE_CONFIG_FILE);
+ Integer remoteDebuggerPort = null;
+ if (commandLine.hasOption(CommandLineOptions.REMOTE_DEBUGGER_PORT)) {
+ remoteDebuggerPort = Integer.parseInt(
+ commandLine.getOptionValue(CommandLineOptions.REMOTE_DEBUGGER_PORT));
+ }
+
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFile, true)
.putAll(overrideConfigFile, true)
@@ -266,8 +275,13 @@
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
// Create the protobuf Instance
- PhysicalPlans.InstanceInfo instanceInfo = PhysicalPlans.InstanceInfo.newBuilder().
- setTaskId(taskId).setComponentIndex(componentIndex).setComponentName(componentName).build();
+ PhysicalPlans.InstanceInfo.Builder instanceInfoBuilder
+ = PhysicalPlans.InstanceInfo.newBuilder().setTaskId(taskId)
+ .setComponentIndex(componentIndex).setComponentName(componentName);
+ if (remoteDebuggerPort != null) {
+ instanceInfoBuilder.setRemoteDebuggerPort(remoteDebuggerPort);
+ }
+ PhysicalPlans.InstanceInfo instanceInfo = instanceInfoBuilder.build();
PhysicalPlans.Instance instance = PhysicalPlans.Instance.newBuilder().
setInstanceId(instanceId).setStmgrId(streamId).setInfo(instanceInfo).build();
@@ -285,11 +299,17 @@
systemConfig.getHeronLoggingMaximumFiles()));
LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler());
- LOG.info("\nStarting instance " + instanceId + " for topology " + topologyName
+ String logMsg = "\nStarting instance " + instanceId + " for topology " + topologyName
+ " and topologyId " + topologyId + " for component " + componentName
+ " with taskId " + taskId + " and componentIndex " + componentIndex
+ " and stmgrId " + streamId + " and stmgrPort " + streamPort
- + " and metricsManagerPort " + metricsPort);
+ + " and metricsManagerPort " + metricsPort;
+
+ if (remoteDebuggerPort != null) {
+ logMsg += " and remoteDebuggerPort " + remoteDebuggerPort;
+ }
+
+ LOG.info(logMsg);
LOG.info("System Config: " + systemConfig);
diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
index 3c73e4d..415e460 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltInstance.java
@@ -47,6 +47,7 @@
import com.twitter.heron.common.utils.tuple.TickTuple;
import com.twitter.heron.common.utils.tuple.TupleImpl;
import com.twitter.heron.instance.IInstance;
+import com.twitter.heron.instance.util.InstanceUtils;
import com.twitter.heron.proto.ckptmgr.CheckpointManager;
import com.twitter.heron.proto.system.HeronTuples;
@@ -223,6 +224,7 @@
looper.addTasksOnWakeup(boltTasks);
PrepareTickTupleTimer();
+ InstanceUtils.prepareTimerEvents(looper, helper);
}
@Override
@@ -305,13 +307,9 @@
if (tickTupleFreqMs != null) {
Duration freq = TypeUtils.getDuration(tickTupleFreqMs, ChronoUnit.MILLIS);
- Runnable r = new Runnable() {
- public void run() {
- SendTickTuple();
- }
- };
+ Runnable r = () -> SendTickTuple();
- looper.registerTimerEvent(freq, r);
+ looper.registerPeriodicEvent(freq, r);
}
}
@@ -323,7 +321,5 @@
boltMetrics.executeTuple(t.getSourceStreamId(), t.getSourceComponent(), latency);
collector.sendOutTuples();
- // reschedule ourselves again
- PrepareTickTupleTimer();
}
}
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 7090dbf..7e99117 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -44,6 +44,7 @@
import com.twitter.heron.common.utils.misc.SerializeDeSerializeHelper;
import com.twitter.heron.common.utils.topology.TopologyContextImpl;
import com.twitter.heron.instance.IInstance;
+import com.twitter.heron.instance.util.InstanceUtils;
import com.twitter.heron.proto.ckptmgr.CheckpointManager;
import com.twitter.heron.proto.system.HeronTuples;
@@ -264,6 +265,8 @@
if (enableMessageTimeouts) {
lookForTimeouts();
}
+
+ InstanceUtils.prepareTimerEvents(looper, helper);
}
/**
diff --git a/heron/instance/src/java/com/twitter/heron/instance/util/InstanceUtils.java b/heron/instance/src/java/com/twitter/heron/instance/util/InstanceUtils.java
new file mode 100644
index 0000000..1bd7a8d
--- /dev/null
+++ b/heron/instance/src/java/com/twitter/heron/instance/util/InstanceUtils.java
@@ -0,0 +1,43 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.instance.util;
+
+import java.time.Duration;
+import java.util.Map;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.Pair;
+import com.twitter.heron.common.basics.SlaveLooper;
+import com.twitter.heron.common.utils.misc.PhysicalPlanHelper;
+
+public final class InstanceUtils {
+ private InstanceUtils() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void prepareTimerEvents(SlaveLooper looper, PhysicalPlanHelper helper) {
+ Map<String, Pair<Duration, Runnable>> timerEvents =
+ (Map<String, Pair<Duration, Runnable>>) helper.getTopologyContext()
+ .getTopologyConfig().get(Config.TOPOLOGY_TIMER_EVENTS);
+
+ if (timerEvents != null) {
+ for (Map.Entry<String, Pair<Duration, Runnable>> entry : timerEvents.entrySet()) {
+ Duration duration = entry.getValue().getFirst();
+ Runnable task = entry.getValue().getSecond();
+
+ looper.registerPeriodicEvent(duration, task);
+ }
+ }
+ }
+}
diff --git a/heron/instance/src/java/shade.conf b/heron/instance/src/java/shade.conf
index fb7700c..3a45c40 100644
--- a/heron/instance/src/java/shade.conf
+++ b/heron/instance/src/java/shade.conf
@@ -1,4 +1,3 @@
rule com.google.protobuf** com.twitter.heron.shaded.@0
rule org.yaml.snakeyaml** com.twitter.heron.shaded.@0
-rule com.esotericsoftware.kryo** com.twitter.heron.shaded.@0
rule org.apache.commons** com.twitter.heron.shaded.@0
diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD
index e0e663c..e504bcd 100644
--- a/heron/instance/tests/java/BUILD
+++ b/heron/instance/tests/java/BUILD
@@ -2,7 +2,7 @@
test_deps_files = \
heron_java_proto_files() + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:network-java",
diff --git a/heron/metricscachemgr/src/java/BUILD b/heron/metricscachemgr/src/java/BUILD
index 91a0d29..553fd5e 100644
--- a/heron/metricscachemgr/src/java/BUILD
+++ b/heron/metricscachemgr/src/java/BUILD
@@ -10,7 +10,7 @@
"//heron/spi/src/java:utils-spi-java",
"//heron/spi/src/java:statemgr-spi-java",
"//heron/spi/src/java:packing-spi-java",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:network-java",
diff --git a/heron/metricsmgr/src/java/BUILD b/heron/metricsmgr/src/java/BUILD
index bdff429..3e67d62 100644
--- a/heron/metricsmgr/src/java/BUILD
+++ b/heron/metricsmgr/src/java/BUILD
@@ -9,7 +9,7 @@
exclude = ["**/MetricManager.java"],
),
deps = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:network-java",
@@ -33,7 +33,7 @@
srcs = glob(["**/MetricsManager.java"]),
deps = [
":metricsmgr-java",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:network-java",
diff --git a/heron/metricsmgr/tests/java/BUILD b/heron/metricsmgr/tests/java/BUILD
index af6e540..80ecc2d 100644
--- a/heron/metricsmgr/tests/java/BUILD
+++ b/heron/metricsmgr/tests/java/BUILD
@@ -2,7 +2,7 @@
name = "metricsmgr-tests",
srcs = glob(["**/*.java"]),
deps = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:network-java",
"//heron/common/src/java:config-java",
diff --git a/heron/packing/src/java/BUILD b/heron/packing/src/java/BUILD
index d8df225..2b97922 100644
--- a/heron/packing/src/java/BUILD
+++ b/heron/packing/src/java/BUILD
@@ -15,7 +15,7 @@
roundrobin_deps_files = \
heron_java_proto_files() + \
packing_deps_files + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/spi/src/java:statemgr-spi-java",
"//heron/spi/src/java:utils-spi-java",
]
@@ -23,7 +23,7 @@
binpacking_deps_files = \
heron_java_proto_files() + \
packing_deps_files + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/spi/src/java:statemgr-spi-java",
"//heron/spi/src/java:utils-spi-java",
]
@@ -37,7 +37,7 @@
"//heron/spi/src/java:common-spi-java",
"//heron/spi/src/java:packing-spi-java",
"//heron/spi/src/java:utils-spi-java",
- "//heron/api/src/java:api-java"
+ "//heron/api/src/java:api-java-low-level"
],
)
diff --git a/heron/packing/tests/java/BUILD b/heron/packing/tests/java/BUILD
index ad2851a..ec74b63 100644
--- a/heron/packing/tests/java/BUILD
+++ b/heron/packing/tests/java/BUILD
@@ -20,7 +20,7 @@
heron_java_proto_files() + \
packing_deps_files + \
test_deps_files + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/spi/src/java:utils-spi-java",
]
@@ -28,7 +28,7 @@
heron_java_proto_files() + \
packing_deps_files + \
test_deps_files + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/spi/src/java:utils-spi-java",
]
@@ -36,7 +36,7 @@
heron_java_proto_files() + \
packing_deps_files + \
test_deps_files + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/spi/src/java:utils-spi-java",
]
@@ -54,7 +54,7 @@
"//heron/packing/src/java:utils",
"//heron/spi/src/java:packing-spi-java",
"//third_party/java:junit4",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/proto:proto_topology_java",
"//heron/spi/src/java:common-spi-java",
"//heron/spi/src/java:utils-spi-java",
diff --git a/heron/proto/physical_plan.proto b/heron/proto/physical_plan.proto
index cc72fe4..e11af00 100644
--- a/heron/proto/physical_plan.proto
+++ b/heron/proto/physical_plan.proto
@@ -33,6 +33,8 @@
required int32 component_index = 2; // specific to this component
required string component_name = 3;
repeated string params = 4;
+ // the port a remote debugger can be attached to for this instance. Currently JVM instances only
+ optional int32 remote_debugger_port = 5;
}
message Instance {
diff --git a/heron/scheduler-core/src/java/BUILD b/heron/scheduler-core/src/java/BUILD
index 4176368..c8b7545 100644
--- a/heron/scheduler-core/src/java/BUILD
+++ b/heron/scheduler-core/src/java/BUILD
@@ -9,10 +9,11 @@
"//heron/api/src/java:classification",
"@commons_cli_commons_cli//jar",
"@com_google_guava_guava//jar",
+ "@org_apache_commons_commons_lang3//jar",
]
spi_deps_files = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/spi/src/java:common-spi-java",
"//heron/spi/src/java:statemgr-spi-java",
"//heron/spi/src/java:uploader-spi-java",
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
index 0d5f416..0f7b890 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
@@ -56,7 +56,8 @@
CheckpointManagerPort("checkpoint-manager-port"),
StatefulConfigFile("stateful-config-file"),
HealthManagerMode("health-manager-mode"),
- HealthManagerClasspath("health-manager-classpath");
+ HealthManagerClasspath("health-manager-classpath"),
+ JvmRemoteDebuggerPorts("jvm-remote-debugger-ports");
private final String name;
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
index 8838f7d..358604a 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
@@ -17,8 +17,10 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -39,12 +41,63 @@
import com.twitter.heron.spi.utils.ShellUtils;
public final class SchedulerUtils {
- public static final int PORTS_REQUIRED_FOR_EXECUTOR = 9;
public static final int PORTS_REQUIRED_FOR_SCHEDULER = 1;
public static final String SCHEDULER_COMMAND_LINE_PROPERTIES_OVERRIDE_OPTION = "P";
private static final Logger LOG = Logger.getLogger(SchedulerUtils.class.getName());
+ /**
+ * Enum that defines the type of ports that a heron executor needs
+ */
+
+ public enum ExecutorPort {
+ MASTER_PORT("master", true),
+ TMASTER_CONTROLLER_PORT("tmaster-ctl", true),
+ TMASTER_STATS_PORT("tmaster-stats", true),
+ SHELL_PORT("shell-port", true),
+ METRICS_MANAGER_PORT("metrics-mgr", true),
+ SCHEDULER_PORT("scheduler", true),
+ METRICS_CACHE_MASTER_PORT("metrics-cache-m", true),
+ METRICS_CACHE_STATS_PORT("metrics-cache-s", true),
+ CHECKPOINT_MANAGER_PORT("ckptmgr", true),
+ JVM_REMOTE_DEBUGGER_PORTS("jvm-remote-debugger", false);
+
+ private final String name;
+ private final boolean required;
+
+ ExecutorPort(String name, boolean required) {
+ this.name = name;
+ this.required = required;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
+ public static String getPort(ExecutorPort executorPort,
+ Map<ExecutorPort, String> portMap) {
+ if (!portMap.containsKey(executorPort) && executorPort.isRequired()) {
+ throw new RuntimeException("Required port " + executorPort.getName() + " not provided");
+ }
+
+ return portMap.get(executorPort);
+ }
+
+ public static Set<ExecutorPort> getRequiredPorts() {
+ Set<ExecutorPort> executorPorts = new HashSet<>();
+ for (ExecutorPort executorPort : ExecutorPort.values()) {
+ if (executorPort.isRequired()) {
+ executorPorts.add(executorPort);
+ }
+ }
+ return executorPorts;
+ }
+ }
+
private SchedulerUtils() {
}
@@ -124,50 +177,18 @@
/**
* Utils method to construct the command to start heron-executor
*
- * @param config The static Config
- * @param runtime The runtime Config
- * @param containerIndex the executor/container index
- * @param freePorts list of free ports
- * @return String[] representing the command to start heron-executor
- */
- public static String[] executorCommand(
- Config config,
- Config runtime,
- int containerIndex,
- List<Integer> freePorts) {
- // First let us have some safe checks
- if (freePorts.size() < PORTS_REQUIRED_FOR_EXECUTOR) {
- throw new RuntimeException("Failed to find enough ports for executor");
- }
- for (int port : freePorts) {
- if (port == -1) {
- throw new RuntimeException("Failed to find available ports for executor");
- }
- }
-
- // Convert port to string
- List<String> ports = new LinkedList<>();
- for (int port : freePorts) {
- ports.add(Integer.toString(port));
- }
-
- return getExecutorCommand(config, runtime, containerIndex, ports);
- }
-
- /**
- * Utils method to construct the command to start heron-executor
- *
* @param config The static config
* @param runtime The runtime config
* @param containerIndex the executor/container index
- * @param ports list of free ports in String
+ * @param ports a map of ports to use where the key indicates the port type and the
+ * value is the port
* @return String[] representing the command to start heron-executor
*/
public static String[] getExecutorCommand(
Config config,
Config runtime,
int containerIndex,
- List<String> ports) {
+ Map<ExecutorPort, String> ports) {
List<String> commands = new ArrayList<>();
commands.add(Context.executorBinary(config));
commands.add(createCommandArg(ExecutorFlag.Shard, Integer.toString(containerIndex)));
@@ -184,22 +205,35 @@
*
* @param config The static Config
* @param runtime The runtime Config
- * @param freePorts list of free ports
+ * @param ports a map of ports to use where the key indicates the port type and the
+ * value is the port
* @return String[] representing the arguments to start heron-executor
*/
public static String[] executorCommandArgs(
- Config config, Config runtime, List<String> freePorts) {
+ Config config, Config runtime, Map<ExecutorPort, String> ports) {
TopologyAPI.Topology topology = Runtime.topology(runtime);
- String masterPort = freePorts.get(0);
- String tmasterControllerPort = freePorts.get(1);
- String tmasterStatsPort = freePorts.get(2);
- String shellPort = freePorts.get(3);
- String metricsmgrPort = freePorts.get(4);
- String schedulerPort = freePorts.get(5);
- String metricsCacheMasterPort = freePorts.get(6);
- String metricsCacheStatsPort = freePorts.get(7);
- String ckptmgrPort = freePorts.get(8);
+ String masterPort = ExecutorPort.getPort(
+ ExecutorPort.MASTER_PORT, ports);
+ String tmasterControllerPort = ExecutorPort.getPort(
+ ExecutorPort.TMASTER_CONTROLLER_PORT, ports);
+ String tmasterStatsPort = ExecutorPort.getPort(
+ ExecutorPort.TMASTER_STATS_PORT, ports);
+ String shellPort = ExecutorPort.getPort(
+ ExecutorPort.SHELL_PORT, ports);
+ String metricsmgrPort = ExecutorPort.getPort(
+ ExecutorPort.METRICS_MANAGER_PORT, ports);
+ String schedulerPort = ExecutorPort.getPort(
+ ExecutorPort.SCHEDULER_PORT, ports);
+ String metricsCacheMasterPort = ExecutorPort.getPort(
+ ExecutorPort.METRICS_CACHE_MASTER_PORT, ports);
+ String metricsCacheStatsPort = ExecutorPort.getPort(
+ ExecutorPort.METRICS_CACHE_STATS_PORT, ports);
+ String ckptmgrPort = ExecutorPort.getPort(
+ ExecutorPort.CHECKPOINT_MANAGER_PORT, ports);
+ String remoteDebuggerPorts = ExecutorPort.getPort(
+ ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS, ports
+ );
List<String> commands = new ArrayList<>();
commands.add(createCommandArg(ExecutorFlag.TopologyName, topology.getName()));
@@ -280,6 +314,9 @@
commands.add(createCommandArg(ExecutorFlag.HealthManagerMode, healthMgrMode));
commands.add(createCommandArg(ExecutorFlag.HealthManagerClasspath,
Context.healthMgrClassPath(config)));
+ if (remoteDebuggerPorts != null) {
+ commands.add(createCommandArg(ExecutorFlag.JvmRemoteDebuggerPorts, remoteDebuggerPorts));
+ }
return commands.toArray(new String[commands.size()]);
}
diff --git a/heron/scheduler-core/tests/java/BUILD b/heron/scheduler-core/tests/java/BUILD
index d32a823..b2ebb74 100644
--- a/heron/scheduler-core/tests/java/BUILD
+++ b/heron/scheduler-core/tests/java/BUILD
@@ -4,7 +4,7 @@
"@com_google_guava_guava//jar",
"@commons_io_commons_io//jar",
"//third_party/java:powermock",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:utils-java",
"//heron/scheduler-core/src/java:scheduler-java",
diff --git a/heron/schedulers/src/java/BUILD b/heron/schedulers/src/java/BUILD
index 0236024..d81b523 100644
--- a/heron/schedulers/src/java/BUILD
+++ b/heron/schedulers/src/java/BUILD
@@ -18,7 +18,7 @@
]
api_deps_files = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
]
scheduler_deps_files = \
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
index 9e4ccaf..daf28c1 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
@@ -69,12 +69,22 @@
return cliController.killJob();
}
+ private StMgr searchContainer(Integer id) {
+ String prefix = "stmgr-" + id;
+ for (StMgr sm : stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrsList()) {
+ if (sm.getId().equals(prefix)) {
+ return sm;
+ }
+ }
+ return null;
+ }
+
// Restart an aurora container
@Override
public boolean restart(Integer containerId) {
// there is no backpressure for container 0, delegate to aurora client
if (containerId == null || containerId == 0) {
- cliController.restart(containerId);
+ return cliController.restart(containerId);
}
if (stateMgrAdaptor == null) {
@@ -82,18 +92,24 @@
return false;
}
- int index = containerId - 1; // stmgr container starts from 1
- StMgr contaienrInfo = stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrs(index);
- String host = contaienrInfo.getHostName();
- int port = contaienrInfo.getShellPort();
- String url = "http://" + host + ":" + port + "/killexecutor";
+ StMgr sm = searchContainer(containerId);
+ if (sm == null) {
+ LOG.warning("container not found in pplan " + containerId);
+ return false;
+ }
+
+ String url = "http://" + sm.getHostName() + ":" + sm.getShellPort() + "/killexecutor";
String payload = "secret=" + stateMgrAdaptor.getExecutionState(topologyName).getTopologyId();
LOG.info("sending `kill container` to " + url + "; payload: " + payload);
HttpURLConnection con = NetworkUtils.getHttpConnection(url);
try {
- NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes());
- return NetworkUtils.checkHttpResponseCode(con, 200);
+ if (NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes())) {
+ return NetworkUtils.checkHttpResponseCode(con, 200);
+ } else { // if heron-shell command fails, delegate to aurora client
+ LOG.info("heron-shell killexecutor failed; try aurora client ..");
+ return cliController.restart(containerId);
+ }
} finally {
con.disconnect();
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
index 9c349c6..7799a77 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -16,9 +16,13 @@
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
+
public final class KubernetesConstants {
private KubernetesConstants() {
@@ -60,10 +64,6 @@
public static final String ANNOTATION_PROMETHEUS_PORT = "prometheus.io/port";
public static final String PROMETHEUS_PORT = "8080";
- public static final String[] PORT_NAMES = new String[]{
- "master", "tmaster-ctlr", "tmaster-stats", "shell", "metricsmgr", "scheduler",
- "metrics-cache-m", "metrics-cache-s", "ckptmgr"};
-
public static final String MASTER_PORT = "6001";
public static final String TMASTER_CONTROLLER_PORT = "6002";
public static final String TMASTER_STATS_PORT = "6003";
@@ -73,11 +73,22 @@
public static final String METRICS_CACHE_MASTER_PORT = "6007";
public static final String METRICS_CACHE_STATS_PORT = "6008";
public static final String CHECKPOINT_MGR_PORT = "6009";
+ // port number to start with when more than one port is needed for remote debugging
+ public static final String JVM_REMOTE_DEBUGGER_PORT = "6010";
+ public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "remote-debugger";
- public static final String[] PORT_LIST = new String[]{
- MASTER_PORT, TMASTER_CONTROLLER_PORT, TMASTER_STATS_PORT,
- SHELL_PORT, METRICSMGR_PORT, SCHEDULER_PORT, METRICS_CACHE_MASTER_PORT,
- METRICS_CACHE_STATS_PORT, CHECKPOINT_MGR_PORT };
+ public static final Map<ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>();
+ static {
+ EXECUTOR_PORTS.put(ExecutorPort.MASTER_PORT, MASTER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.TMASTER_CONTROLLER_PORT, TMASTER_CONTROLLER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.TMASTER_STATS_PORT, TMASTER_STATS_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.SHELL_PORT, SHELL_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.METRICS_MANAGER_PORT, METRICSMGR_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.SCHEDULER_PORT, SCHEDULER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_MASTER_PORT, METRICS_CACHE_MASTER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_STATS_PORT, METRICS_CACHE_STATS_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.CHECKPOINT_MANAGER_PORT, CHECKPOINT_MGR_PORT);
+ }
public static final String JOB_LINK =
"/api/v1/proxy/namespaces/kube-system/services/kubernetes-dashboard/#/pod";
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
index ca23d1f..21da118 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -15,14 +15,16 @@
package com.twitter.heron.scheduler.kubernetes;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,12 +34,14 @@
import com.google.common.base.Optional;
import com.google.common.primitives.Ints;
+import com.twitter.heron.api.utils.TopologyUtils;
import com.twitter.heron.common.basics.FileUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.TopologyRuntimeManagementException;
import com.twitter.heron.scheduler.UpdateTopologyManager;
import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Context;
import com.twitter.heron.spi.common.Key;
@@ -164,11 +168,17 @@
String[] deploymentConfs =
new String[Ints.checkedCast(Runtime.numContainers(runtimeConfiguration))];
+
for (int i = 0; i < Runtime.numContainers(runtimeConfiguration); i++) {
-
- deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource);
+ Optional<PackingPlan.ContainerPlan> container = packing.getContainer(i);
+ Set<PackingPlan.InstancePlan> instancePlans;
+ if (container.isPresent()) {
+ instancePlans = container.get().getInstances();
+ } else {
+ instancePlans = new HashSet<>();
+ }
+ deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource, instancePlans);
}
-
return deploymentConfs;
}
@@ -182,7 +192,8 @@
*/
protected String buildKubernetesPodSpec(ObjectMapper mapper,
Integer containerIndex,
- Resource containerResource) {
+ Resource containerResource,
+ Set<PackingPlan.InstancePlan> instancePlans) {
ObjectNode instance = mapper.createObjectNode();
instance.put(KubernetesConstants.API_VERSION, KubernetesConstants.API_VERSION_1);
@@ -191,7 +202,8 @@
instance.set(KubernetesConstants.API_SPEC, getContainerSpec(mapper,
containerIndex,
- containerResource));
+ containerResource,
+ instancePlans));
return instance.toString();
}
@@ -303,7 +315,7 @@
setImagePullPolicyIfPresent(containerInfo);
// Port info -- all the same
- containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+ containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, containerPlan.getInstances()));
// In order for the container to run with the correct index, we're copying the base
// configuration for container with index 0, and replacing the container index with
@@ -349,7 +361,8 @@
*/
protected ObjectNode getContainerSpec(ObjectMapper mapper,
int containerIndex,
- Resource containerResource) {
+ Resource containerResource,
+ Set<PackingPlan.InstancePlan> instancePlans) {
ObjectNode containerSpec = mapper.createObjectNode();
ArrayNode containerList = mapper.createArrayNode();
@@ -381,10 +394,10 @@
setImagePullPolicyIfPresent(containerInfo);
// Port information for this container
- containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+ containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, instancePlans));
// Heron command for the container
- String[] command = getExecutorCommand(containerIndex);
+ String[] command = getExecutorCommand(containerIndex, instancePlans.size());
ArrayNode commandsArray = mapper.createArrayNode();
for (int i = 0; i < command.length; i++) {
commandsArray.add(command[i]);
@@ -409,22 +422,46 @@
return containerSpec;
}
+ private List<Integer> getRemoteDebuggerPorts(int numberOfInstances) {
+ List<Integer> ports = new LinkedList<>();
+ for (int i = 0; i < numberOfInstances; i++) {
+ ports.add(Integer.parseInt(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT, 10) + i);
+ }
+
+ return ports;
+ }
+
/**
* Get the ports the container will need to expose so other containers can access its services
*
* @param mapper
*/
- protected ArrayNode getPorts(ObjectMapper mapper) {
+ protected ArrayNode getPorts(ObjectMapper mapper,
+ Set<PackingPlan.InstancePlan> instancePlans) {
ArrayNode ports = mapper.createArrayNode();
- for (int i = 0; i < KubernetesConstants.PORT_NAMES.length; i++) {
+ for (Map.Entry<ExecutorPort, String> entry
+ : KubernetesConstants.EXECUTOR_PORTS.entrySet()) {
ObjectNode port = mapper.createObjectNode();
+ ExecutorPort portName = entry.getKey();
port.put(KubernetesConstants.DOCKER_CONTAINER_PORT,
- Integer.parseInt(KubernetesConstants.PORT_LIST[i], 10));
- port.put(KubernetesConstants.PORT_NAME, KubernetesConstants.PORT_NAMES[i]);
+ Integer.parseInt(entry.getValue(), 10));
+ port.put(KubernetesConstants.PORT_NAME, portName.getName());
ports.add(port);
}
+ // if the remote debugger is enabled
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+ List<Integer> portsForRemoteDebugging = getRemoteDebuggerPorts(instancePlans.size());
+
+ for (int i = 0; i < portsForRemoteDebugging.size(); i++) {
+ ObjectNode port = mapper.createObjectNode();
+ port.put(KubernetesConstants.DOCKER_CONTAINER_PORT, portsForRemoteDebugging.get(i));
+ port.put(KubernetesConstants.PORT_NAME, KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME
+ + "-" + String.valueOf(i));
+ }
+ }
+
return ports;
}
@@ -449,10 +486,17 @@
*
* @param containerIndex
*/
- protected String[] getExecutorCommand(int containerIndex) {
+ protected String[] getExecutorCommand(int containerIndex, int numInstances) {
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+ KubernetesConstants.EXECUTOR_PORTS.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+ String.join(",", getRemoteDebuggerPorts(numInstances)
+ .stream().map(Object::toString)
+ .collect(Collectors.toList())));
+ }
String[] executorCommand =
SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
- containerIndex, Arrays.asList(KubernetesConstants.PORT_LIST));
+ containerIndex, KubernetesConstants.EXECUTOR_PORTS);
+
String[] command = {
"sh",
"-c",
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index beb0c2d..7b598ce 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -32,10 +32,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.twitter.heron.api.utils.TopologyUtils;
import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.UpdateTopologyManager;
+import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.scheduler.IScalable;
@@ -79,9 +82,9 @@
* Start executor process via running an async shell process
*/
@VisibleForTesting
- protected Process startExecutorProcess(int container) {
+ protected Process startExecutorProcess(int container, Set<PackingPlan.InstancePlan> instances) {
return ShellUtils.runASyncProcess(
- getExecutorCommand(container),
+ getExecutorCommand(container, instances),
new File(LocalContext.workingDirectory(config)),
Integer.toString(container));
}
@@ -90,25 +93,27 @@
* Start the executor for the given container
*/
@VisibleForTesting
- protected void startExecutor(final int container) {
+ protected void startExecutor(final int container, Set<PackingPlan.InstancePlan> instances) {
LOG.info("Starting a new executor for container: " + container);
// create a process with the executor command and topology working directory
- final Process containerExecutor = startExecutorProcess(container);
+ final Process containerExecutor = startExecutorProcess(container, instances);
// associate the process and its container id
processToContainer.put(containerExecutor, container);
LOG.info("Started the executor for container: " + container);
// add the container for monitoring
- startExecutorMonitor(container, containerExecutor);
+ startExecutorMonitor(container, containerExecutor, instances);
}
/**
* Start the monitor of a given executor
*/
@VisibleForTesting
- protected void startExecutorMonitor(final int container, final Process containerExecutor) {
+ protected void startExecutorMonitor(final int container,
+ final Process containerExecutor,
+ Set<PackingPlan.InstancePlan> instances) {
// add the container for monitoring
Runnable r = new Runnable() {
@Override
@@ -129,7 +134,7 @@
}
LOG.log(Level.INFO, "Trying to restart container {0}", container);
// restart the container
- startExecutor(processToContainer.remove(containerExecutor));
+ startExecutor(processToContainer.remove(containerExecutor), instances);
} catch (InterruptedException e) {
if (!isTopologyKilled) {
LOG.log(Level.SEVERE, "Process is interrupted: ", e);
@@ -141,14 +146,33 @@
monitorService.submit(r);
}
- private String[] getExecutorCommand(int container) {
- List<Integer> freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR);
- for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
- freePorts.add(SysUtils.getFreePort());
+
+ private String[] getExecutorCommand(int container, Set<PackingPlan.InstancePlan> instances) {
+ Map<ExecutorPort, String> ports = new HashMap<>();
+ for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
+ int port = SysUtils.getFreePort();
+ if (port == -1) {
+ throw new RuntimeException("Failed to find available ports for executor");
+ }
+ ports.put(executorPort, String.valueOf(port));
}
- String[] executorCmd = SchedulerUtils.executorCommand(config, runtime, container, freePorts);
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtime))
+ && instances != null) {
+ List<String> remoteDebuggingPorts = new LinkedList<>();
+ int portsForRemoteDebugging = instances.size();
+ for (int i = 0; i < portsForRemoteDebugging; i++) {
+ int port = SysUtils.getFreePort();
+ if (port == -1) {
+ throw new RuntimeException("Failed to find available ports for executor");
+ }
+ remoteDebuggingPorts.add(String.valueOf(port));
+ }
+ ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+ String.join(",", remoteDebuggingPorts));
+ }
+ String[] executorCmd = SchedulerUtils.getExecutorCommand(config, runtime, container, ports);
LOG.info("Executor command line: " + Arrays.toString(executorCmd));
return executorCmd;
}
@@ -162,11 +186,11 @@
synchronized (processToContainer) {
LOG.info("Starting executor for TMaster");
- startExecutor(0);
+ startExecutor(0, null);
// for each container, run its own executor
for (PackingPlan.ContainerPlan container : packing.getContainers()) {
- startExecutor(container.getId());
+ startExecutor(container.getId(), container.getInstances());
}
}
@@ -271,7 +295,7 @@
throw new RuntimeException(String.format("Found active container for %s, "
+ "cannot launch a duplicate container.", container.getId()));
}
- startExecutor(container.getId());
+ startExecutor(container.getId(), container.getInstances());
}
}
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java
index 07dcc85..acdfe25 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonConstants.java
@@ -14,6 +14,11 @@
package com.twitter.heron.scheduler.marathon;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
+
public final class MarathonConstants {
private MarathonConstants() {
@@ -50,10 +55,6 @@
public static final String DOCKER_FORCE_PULL = "forcePullImage";
public static final String DOCKER_NETWORK_BRIDGE = "BRIDGE";
- public static final String[] PORT_NAMES = new String[]{
- "master", "tmaster-controller", "tmaster-stats", "shell", "metricsmgr", "scheduler",
- "metrics-cache-master", "metrics-cache-stats", "ckptmgr"};
-
public static final String MASTER_PORT = "$PORT0";
public static final String TMASTER_CONTROLLER_PORT = "$PORT1";
public static final String TMASTER_STATS_PORT = "$PORT2";
@@ -64,10 +65,18 @@
public static final String METRICS_CACHE_STATS_PORT = "$PORT7";
public static final String CKPTMGR_PORT = "$PORT8";
- public static final String[] PORT_LIST = new String[]{
- MASTER_PORT, TMASTER_CONTROLLER_PORT, TMASTER_STATS_PORT,
- SHELL_PORT, METRICSMGR_PORT, SCHEDULER_PORT, METRICS_CACHE_MASTER_PORT,
- METRICS_CACHE_STATS_PORT, CKPTMGR_PORT};
+ public static final Map<ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>();
+ static {
+ EXECUTOR_PORTS.put(ExecutorPort.MASTER_PORT, MASTER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.TMASTER_CONTROLLER_PORT, TMASTER_CONTROLLER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.TMASTER_STATS_PORT, TMASTER_STATS_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.SHELL_PORT, SHELL_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.METRICS_MANAGER_PORT, METRICSMGR_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.SCHEDULER_PORT, SCHEDULER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_MASTER_PORT, METRICS_CACHE_MASTER_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.METRICS_CACHE_STATS_PORT, METRICS_CACHE_STATS_PORT);
+ EXECUTOR_PORTS.put(ExecutorPort.CHECKPOINT_MANAGER_PORT, CKPTMGR_PORT);
+ }
public static final String JOB_LINK = "/ui/#/group/%2F";
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java
index 067f289..38190c8 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/marathon/MarathonScheduler.java
@@ -14,9 +14,9 @@
package com.twitter.heron.scheduler.marathon;
-import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.logging.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -27,6 +27,7 @@
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Context;
import com.twitter.heron.spi.common.Key;
@@ -197,12 +198,13 @@
protected ArrayNode getPorts(ObjectMapper mapper) {
ArrayNode ports = mapper.createArrayNode();
- for (String portName : MarathonConstants.PORT_NAMES) {
+ for (Map.Entry<ExecutorPort, String> entry
+ : MarathonConstants.EXECUTOR_PORTS.entrySet()) {
ObjectNode port = mapper.createObjectNode();
port.put(MarathonConstants.DOCKER_CONTAINER_PORT, 0);
port.put(MarathonConstants.PROTOCOL, MarathonConstants.TCP);
port.put(MarathonConstants.HOST_PORT, 0);
- port.put(MarathonConstants.PORT_NAME, portName);
+ port.put(MarathonConstants.PORT_NAME, entry.getKey().getName());
ports.add(port);
}
@@ -212,7 +214,7 @@
protected String getExecutorCommand(int containerIndex) {
String[] commands = SchedulerUtils.getExecutorCommand(config, runtime,
- containerIndex, Arrays.asList(MarathonConstants.PORT_LIST));
+ containerIndex, MarathonConstants.EXECUTOR_PORTS);
return "cd $MESOS_SANDBOX && " + Joiner.on(" ").join(commands);
}
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java
index b5b3b70..6d6298a 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/MesosScheduler.java
@@ -269,6 +269,6 @@
// Convert them from bytes to MB
container.diskInMB = maxResourceContainer.getDisk().asMegabytes();
container.memInMB = maxResourceContainer.getRam().asMegabytes();
- container.ports = SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR;
+ container.ports = SchedulerUtils.ExecutorPort.getRequiredPorts().size();
}
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java
index 8649178..324dfb6 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTask.java
@@ -15,6 +15,7 @@
package com.twitter.heron.scheduler.mesos.framework;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -25,6 +26,7 @@
import org.apache.mesos.Protos;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.spi.common.Config;
/**
@@ -205,8 +207,19 @@
protected String executorCommand(
Config config, Config runtime, int containerIndex) {
+ Map<ExecutorPort, String> ports = new HashMap<>();
+ ports.put(ExecutorPort.MASTER_PORT, String.valueOf(freePorts.get(0)));
+ ports.put(ExecutorPort.TMASTER_CONTROLLER_PORT, String.valueOf(freePorts.get(1)));
+ ports.put(ExecutorPort.TMASTER_STATS_PORT, String.valueOf(freePorts.get(2)));
+ ports.put(ExecutorPort.SHELL_PORT, String.valueOf(freePorts.get(3)));
+ ports.put(ExecutorPort.METRICS_MANAGER_PORT, String.valueOf(freePorts.get(4)));
+ ports.put(ExecutorPort.SCHEDULER_PORT, String.valueOf(freePorts.get(5)));
+ ports.put(ExecutorPort.METRICS_CACHE_MASTER_PORT, String.valueOf(freePorts.get(6)));
+ ports.put(ExecutorPort.METRICS_CACHE_STATS_PORT, String.valueOf(freePorts.get(7)));
+ ports.put(ExecutorPort.CHECKPOINT_MANAGER_PORT, String.valueOf(freePorts.get(8)));
+
String[] executorCmd =
- SchedulerUtils.executorCommand(config, runtime, containerIndex, freePorts);
+ SchedulerUtils.getExecutorCommand(config, runtime, containerIndex, ports);
return join(executorCmd, " ");
}
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java
index 3bc35ad..6f89900 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/slurm/SlurmScheduler.java
@@ -17,7 +17,9 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -25,6 +27,7 @@
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Context;
import com.twitter.heron.spi.packing.PackingPlan;
@@ -125,12 +128,17 @@
}
protected String[] getExecutorCommand(PackingPlan packing) {
- List<String> freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR);
- for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
- freePorts.add(Integer.toString(SysUtils.getFreePort()));
+ Map<ExecutorPort, String> ports = new HashMap<>();
+ for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
+ int port = SysUtils.getFreePort();
+ if (port == -1) {
+ throw new RuntimeException("Failed to find available ports for executor");
+ }
+ ports.put(executorPort, String.valueOf(port));
}
- String[] executorCmd = SchedulerUtils.executorCommandArgs(this.config, this.runtime, freePorts);
+ String[] executorCmd = SchedulerUtils.executorCommandArgs(this.config, this.runtime,
+ ports);
LOG.log(Level.FINE, "Executor command line: ", Arrays.toString(executorCmd));
return executorCmd;
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java
index f7d09b4..9321c6f 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/yarn/HeronExecutorTask.java
@@ -15,10 +15,9 @@
package com.twitter.heron.scheduler.yarn;
import java.io.File;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -37,6 +36,7 @@
import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.scheduler.utils.SchedulerConfigUtils;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
+import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.scheduler.yarn.HeronConfigurationOptions.Cluster;
import com.twitter.heron.scheduler.yarn.HeronConfigurationOptions.ComponentRamMap;
import com.twitter.heron.scheduler.yarn.HeronConfigurationOptions.Environ;
@@ -156,20 +156,24 @@
verboseMode,
topology);
- List<Integer> freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR);
- for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
- freePorts.add(SysUtils.getFreePort());
- }
-
Config runtime = Config.newBuilder()
.put(Key.COMPONENT_RAMMAP, componentRamMap)
.put(Key.TOPOLOGY_DEFINITION, topology)
.build();
- String[] executorCmd = SchedulerUtils.executorCommand(config,
+ Map<ExecutorPort, String> ports = new HashMap<>();
+ for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
+ int port = SysUtils.getFreePort();
+ if (port == -1) {
+ throw new RuntimeException("Failed to find available ports for executor");
+ }
+ ports.put(executorPort, String.valueOf(port));
+ }
+
+ String[] executorCmd = SchedulerUtils.getExecutorCommand(config,
runtime,
heronExecutorId,
- freePorts);
+ ports);
LOG.info("Executor command line: " + Arrays.toString(executorCmd));
return executorCmd;
diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD
index 1bf3a14..b54f288 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -3,7 +3,7 @@
common_deps_files = [
"@com_google_guava_guava//jar",
"//third_party/java:powermock",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:utils-java",
"//heron/scheduler-core/src/java:scheduler-java",
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
index 1e06d6d..5940d8b 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
@@ -69,7 +69,7 @@
public static void beforeClass() throws Exception {
scheduler = Mockito.spy(KubernetesScheduler.class);
Mockito.doReturn(EXECUTOR_CMD).when(scheduler)
- .getExecutorCommand(Mockito.anyInt());
+ .getExecutorCommand(Mockito.anyInt(), Mockito.anyInt());
}
@AfterClass
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
index 484ad8f..badaaa9 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
@@ -25,13 +25,14 @@
import org.junit.Test;
import org.mockito.Mockito;
+import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Key;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.utils.PackingTestUtils;
-
+@SuppressWarnings("unchecked")
public class LocalSchedulerTest {
private static final String TOPOLOGY_NAME = "testTopology";
private static final int MAX_WAITING_SECOND = 10;
@@ -46,6 +47,19 @@
Mockito.when(config.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
runtime = Mockito.mock(Config.class);
+
+ scheduler.initialize(config, runtime);
+ Mockito.when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(TopologyAPI.Topology
+ .newBuilder()
+ .setId("a")
+ .setName("a")
+ .setState(TopologyAPI.TopologyState.RUNNING)
+ .setTopologyConfig(
+ TopologyAPI.Config.newBuilder()
+ .addKvs(TopologyAPI.Config.KeyValue.newBuilder()
+ .setKey(com.twitter.heron.api.Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE)
+ .setValue("false"))).build());
+
scheduler.initialize(config, runtime);
}
@@ -71,11 +85,14 @@
@Test
public void testOnSchedule() throws Exception {
Mockito.doNothing().
- when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+ Mockito.anySet());
Process[] mockProcesses = new Process[4];
for (int i = 0; i < 4; i++) {
mockProcesses[i] = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i);
+ Set<PackingPlan.InstancePlan> instances
+ = (i == 0) ? null : PackingTestUtils.testContainerPlan(i).getInstances();
+ Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i, instances);
}
PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
@@ -92,9 +109,12 @@
// id 2 was not in the container plan
continue;
}
- Mockito.verify(scheduler).startExecutor(i);
- Mockito.verify(scheduler).startExecutorProcess(i);
- Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i]);
+
+ Set<PackingPlan.InstancePlan> instances
+ = (i == 0) ? null : PackingTestUtils.testContainerPlan(i).getInstances();
+ Mockito.verify(scheduler).startExecutor(i, instances);
+ Mockito.verify(scheduler).startExecutorProcess(i, instances);
+ Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i], instances);
}
}
@@ -105,12 +125,15 @@
//verify plan is deployed and containers are created
Mockito.doNothing().
- when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+ Mockito.anySet());
Process mockProcessTM = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(0);
+ Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(
+ 0, null);
Process mockProcessWorker1 = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(1);
+ Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(
+ 1, PackingTestUtils.testContainerPlan(1).getInstances());
PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
@@ -118,24 +141,30 @@
Mockito.when(packingPlan.getContainers()).thenReturn(containers);
Assert.assertTrue(scheduler.onSchedule(packingPlan));
- Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt(),
+ Mockito.anySet());
//now verify add container adds new container
Process mockProcessWorker2 = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(3);
+ Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(
+ 3, PackingTestUtils.testContainerPlan(3).getInstances());
containers.clear();
containers.add(PackingTestUtils.testContainerPlan(3));
scheduler.addContainers(containers);
- Mockito.verify(scheduler).startExecutor(3);
+ Mockito.verify(scheduler).startExecutor(3,
+ PackingTestUtils.testContainerPlan(3).getInstances());
Process mockProcess = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(Mockito.anyInt());
+ Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(
+ Mockito.anyInt(), Mockito.anySet());
containers.clear();
containers.add(PackingTestUtils.testContainerPlan(4));
containers.add(PackingTestUtils.testContainerPlan(5));
scheduler.addContainers(containers);
- Mockito.verify(scheduler).startExecutor(4);
- Mockito.verify(scheduler).startExecutor(5);
+ Mockito.verify(scheduler).startExecutor(4,
+ PackingTestUtils.testContainerPlan(4).getInstances());
+ Mockito.verify(scheduler).startExecutor(5,
+ PackingTestUtils.testContainerPlan(5).getInstances());
}
/**
@@ -147,13 +176,17 @@
//verify plan is deployed and containers are created
Mockito.doNothing().
- when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(),
+ Mockito.any(Process.class), Mockito.anySet());
Process[] processes = new Process[LOCAL_NUM_CONTAINER];
Set<PackingPlan.ContainerPlan> existingContainers = new HashSet<>();
for (int i = 0; i < LOCAL_NUM_CONTAINER; i++) {
processes[i] = Mockito.mock(Process.class);
- Mockito.doReturn(processes[i]).when(scheduler).startExecutorProcess(i);
+ Set<PackingPlan.InstancePlan> instances
+ = (i == 0) ? null : PackingTestUtils.testContainerPlan(i).getInstances();
+ Mockito.doReturn(processes[i]).when(scheduler)
+ .startExecutorProcess(i, instances);
if (i > 0) {
// ignore the container for TMaster. existing containers simulate the containers created
// by packing plan
@@ -165,7 +198,8 @@
Mockito.when(packingPlan.getContainers()).thenReturn(existingContainers);
Assert.assertTrue(scheduler.onSchedule(packingPlan));
verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4, 5);
- Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
Set<PackingPlan.ContainerPlan> containersToRemove = new HashSet<>();
PackingPlan.ContainerPlan containerToRemove =
@@ -175,7 +209,8 @@
verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4);
Mockito.verify(processes[LOCAL_NUM_CONTAINER - 1]).destroy();
// verify no new process restarts
- Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
containersToRemove.clear();
containersToRemove.add(PackingTestUtils.testContainerPlan(1));
@@ -185,7 +220,8 @@
Mockito.verify(processes[1]).destroy();
Mockito.verify(processes[2]).destroy();
// verify no new process restarts
- Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
}
private void verifyIdsOfLaunchedContainers(int... ids) {
@@ -249,17 +285,21 @@
int exitValue = 1;
Process containerExecutor = Mockito.mock(Process.class);
Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
- Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+ Mockito.doNothing().when(scheduler).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
// Start the process
scheduler.getProcessToContainer().put(containerExecutor, containerId);
- scheduler.startExecutorMonitor(containerId, containerExecutor);
+ scheduler.startExecutorMonitor(
+ containerId, containerExecutor,
+ PackingTestUtils.testContainerPlan(containerId).getInstances());
// Shut down the MonitorService
scheduler.getMonitorService().shutdown();
scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
// The dead process should be restarted
- Mockito.verify(scheduler).startExecutor(containerId);
+ Mockito.verify(scheduler).startExecutor(containerId,
+ PackingTestUtils.testContainerPlan(containerId).getInstances());
Assert.assertFalse(scheduler.isTopologyKilled());
}
@@ -270,20 +310,22 @@
Process containerExecutor = Mockito.mock(Process.class);
Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
- Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+ Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt(), Mockito.anySet());
// Set the killed flag and the dead process should not be restarted
scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance());
// Start the process
scheduler.getProcessToContainer().put(containerExecutor, containerId);
- scheduler.startExecutorMonitor(containerId, containerExecutor);
+ scheduler.startExecutorMonitor(containerId, containerExecutor,
+ PackingTestUtils.testContainerPlan(containerId).getInstances());
// Shut down the MonitorService
scheduler.getMonitorService().shutdown();
scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
// The dead process should not be restarted
- Mockito.verify(scheduler, Mockito.never()).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.never())
+ .startExecutor(Mockito.anyInt(), Mockito.anySet());
Assert.assertTrue(scheduler.isTopologyKilled());
}
}
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java
index 2684b3a..52bcdca 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/MesosSchedulerTest.java
@@ -138,7 +138,7 @@
Assert.assertEquals(CPU, container.cpu, 0.01);
Assert.assertEquals(MEM, ByteAmount.fromMegabytes(((Double) container.memInMB).longValue()));
Assert.assertEquals(DISK, ByteAmount.fromMegabytes(((Double) container.diskInMB).longValue()));
- Assert.assertEquals(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR, container.ports);
+ Assert.assertEquals(SchedulerUtils.ExecutorPort.getRequiredPorts().size(), container.ports);
Assert.assertEquals(2, container.dependencies.size());
Assert.assertTrue(container.dependencies.contains(CORE_PACKAGE_URI));
Assert.assertTrue(container.dependencies.contains(TOPOLOGY_PACKAGE_URI));
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java
index 0365252..8c7de7e 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/mesos/framework/LaunchableTaskTest.java
@@ -110,7 +110,7 @@
container.cpu = CPU;
container.diskInMB = DISK;
container.memInMB = MEM;
- container.ports = SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR;
+ container.ports = SchedulerUtils.ExecutorPort.getRequiredPorts().size();
container.shell = true;
container.retries = Integer.MAX_VALUE;
container.dependencies = new ArrayList<>();
@@ -122,7 +122,7 @@
// List of free ports
List<Integer> freePorts = new ArrayList<>();
- for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
+ for (int i = 0; i < SchedulerUtils.ExecutorPort.getRequiredPorts().size(); i++) {
freePorts.add(i);
}
diff --git a/heron/simulator/src/java/BUILD b/heron/simulator/src/java/BUILD
index 39da8d2..221495d 100644
--- a/heron/simulator/src/java/BUILD
+++ b/heron/simulator/src/java/BUILD
@@ -6,7 +6,7 @@
simulator_deps_files = \
heron_java_proto_files() + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:utils-java",
diff --git a/heron/simulator/tests/java/BUILD b/heron/simulator/tests/java/BUILD
index 25d1497..12db81a 100644
--- a/heron/simulator/tests/java/BUILD
+++ b/heron/simulator/tests/java/BUILD
@@ -4,7 +4,7 @@
name = "simulator-tests",
srcs = glob(["**/*.java"]),
deps = heron_java_proto_files() + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:utils-java",
diff --git a/heron/spi/src/java/BUILD b/heron/spi/src/java/BUILD
index 80283b2..33311ac 100644
--- a/heron/spi/src/java/BUILD
+++ b/heron/spi/src/java/BUILD
@@ -27,7 +27,7 @@
]),
javacopts = DOCLINT_HTML_AND_SYNTAX,
deps = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/api/src/java:classification",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
@@ -58,7 +58,7 @@
"//heron/common/src/java:basics-java",
"//heron/common/src/java:config-java",
"//heron/common/src/java:utils-java",
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"@com_google_guava_guava//jar",
]
@@ -78,7 +78,7 @@
]
packing_deps_files = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
":common-spi-java",
"//heron/api/src/java:classification",
"//heron/common/src/java:basics-java",
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
index ee3695f..14a4d60 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
@@ -23,15 +23,12 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
public class ShellUtilsTest {
- private static final Logger LOG = Logger.getLogger(ShellUtilsTest.class.getName());
-
private static String generateRandomLongString(int size) {
StringBuilder builder = new StringBuilder();
Random random = new Random();
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
index af22c36..54694e9 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
@@ -14,12 +14,19 @@
package com.twitter.heron.spi.utils;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.fail;
public class UploaderUtilsTest {
@@ -55,4 +62,42 @@
UploaderUtils.generateFilename(topologyName, role, tag, version, extension);
Assert.assertTrue(customizedFilename.endsWith(extension));
}
+
+ @Test
+ public void testCopyToOutputStream() throws Exception {
+ String fileContent = "temp file test content";
+ String prefix = "myTestFile";
+ String suffix = ".tmp";
+ File tempFile = null;
+ try {
+ // create temp file
+ tempFile = File.createTempFile(prefix, suffix);
+
+ // write content to temp file
+ writeContentToFile(tempFile.getAbsolutePath(), fileContent);
+
+ // copy file content to output stream
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ UploaderUtils.copyToOutputStream(tempFile.getAbsolutePath(), out);
+ Assert.assertEquals(fileContent, new String(out.toByteArray()));
+ } finally {
+ if (tempFile != null) {
+ tempFile.deleteOnExit();
+ }
+ }
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testCopyToOutputStreamWithInvalidFile() throws Exception {
+ UploaderUtils.copyToOutputStream("invalid_file_name", new ByteArrayOutputStream());
+ }
+
+ private void writeContentToFile(String fileName, String content) {
+ try (BufferedWriter bw = new BufferedWriter(new FileWriter(fileName))) {
+ bw.write(content);
+ } catch (IOException e) {
+ fail("Unexpected IOException has been thrown so unit test fails. Error message: "
+ + e.getMessage());
+ }
+ }
}
diff --git a/heron/statemgrs/src/java/BUILD b/heron/statemgrs/src/java/BUILD
index 82e81d6..4757c4f 100644
--- a/heron/statemgrs/src/java/BUILD
+++ b/heron/statemgrs/src/java/BUILD
@@ -12,11 +12,11 @@
localfs_deps_files = \
common_deps_files + [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/spi/src/java:heron-spi",
]
-
+
zookeeper_deps_files = \
localfs_deps_files + [
"@org_apache_curator_curator_client//jar",
@@ -31,7 +31,7 @@
srcs = glob(
["**/*.java"],
),
- deps = zookeeper_deps_files,
+ deps = zookeeper_deps_files,
)
java_library(
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index acb58eb..d677019 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -426,6 +426,7 @@
Config config = Config.newBuilder()
.put(Key.STATEMGR_ROOT_PATH, "/storm/heron/states")
.put(Key.STATEMGR_CONNECTION_STRING, zookeeperHostname)
+ .put(Key.SCHEDULER_IS_SERVICE, false)
.build();
CuratorStateManager stateManager = new CuratorStateManager();
stateManager.doMain(args, config);
diff --git a/heron/tools/apiserver/src/java/BUILD b/heron/tools/apiserver/src/java/BUILD
index efd161e..91f3ee4 100644
--- a/heron/tools/apiserver/src/java/BUILD
+++ b/heron/tools/apiserver/src/java/BUILD
@@ -6,7 +6,7 @@
"//heron/spi/src/java:heron-spi",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:utils-java",
- "//heron/api/src/java:api-java"
+ "//heron/api/src/java:api-java-low-level"
]
scheduler_deps_files = [
diff --git a/heron/tools/cli/src/python/cdefs.py b/heron/tools/cli/src/python/cdefs.py
index ac4fed0..85e9afd 100644
--- a/heron/tools/cli/src/python/cdefs.py
+++ b/heron/tools/cli/src/python/cdefs.py
@@ -14,7 +14,6 @@
''' cdefs.py '''
import os
-import heron.common.src.python.utils.log as Log
import heron.tools.cli.src.python.cliconfig as cliconfig
import heron.tools.common.src.python.utils.config as config
@@ -50,6 +49,5 @@
'''
config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
if not os.path.isdir(config_path):
- Log.error("Cluster config directory \'%s\' does not exist", config_path)
return False
return True
diff --git a/heron/tools/cli/src/python/main.py b/heron/tools/cli/src/python/main.py
index 426dc94..7db01e9 100644
--- a/heron/tools/cli/src/python/main.py
+++ b/heron/tools/cli/src/python/main.py
@@ -221,6 +221,7 @@
# check if the cluster config directory exists
if not cdefs.check_direct_mode_cluster_definition(cluster, config_path):
+ Log.error("Cluster config directory \'%s\' does not exist", config_path)
return dict()
config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
index 85db4f4..18e9ed1 100644
--- a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
@@ -30,13 +30,14 @@
public class LocalFileSystemUploaderTest {
+ private static final String TOPOLOGY_PACKAGE_FILE_NAME = "some-topology.tar";
+
private Config config;
private String fileSystemDirectory;
private String testTopologyDirectory;
@Before
public void before() throws Exception {
-
// form the file system directory using bazel environ files
fileSystemDirectory = Paths.get(System.getenv("JAVA_RUNFILES"), "topologies").toString();
@@ -60,17 +61,17 @@
}
@Test
- public void testUploader() throws Exception {
-
+ public void testUploader() {
// identify the location of the test topology tar file
- String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
- Config newconfig = Config.newBuilder()
+ Config newConfig = Config.newBuilder()
.putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
// create the uploader and load the package
LocalFileSystemUploader uploader = new LocalFileSystemUploader();
- uploader.initialize(newconfig);
+ uploader.initialize(newConfig);
Assert.assertNotNull(uploader.uploadPackage());
// verify if the file exists
@@ -79,33 +80,32 @@
}
@Test(expected = UploaderException.class)
- public void testSourceNotExists() throws Exception {
-
+ public void testSourceNotExists() {
// identify the location of the test topology tar file
String topologyPackage = Paths.get(
testTopologyDirectory, "doesnot-exist-topology.tar").toString();
- Config newconfig = Config.newBuilder()
+ Config newConfig = Config.newBuilder()
.putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
// create the uploader and load the package
LocalFileSystemUploader uploader = new LocalFileSystemUploader();
- uploader.initialize(newconfig);
+ uploader.initialize(newConfig);
uploader.uploadPackage();
}
@Test
- public void testUndo() throws Exception {
-
+ public void testUndo() {
// identify the location of the test topology tar file
- String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
- Config newconfig = Config.newBuilder()
+ Config newConfig = Config.newBuilder()
.putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
// create the uploader and load the package
LocalFileSystemUploader uploader = new LocalFileSystemUploader();
- uploader.initialize(newconfig);
+ uploader.initialize(newConfig);
Assert.assertNotNull(uploader.uploadPackage());
// verify if the file exists
@@ -113,7 +113,95 @@
Assert.assertTrue(new File(destFile).isFile());
// now undo the file
- uploader.undo();
+ Assert.assertTrue(uploader.undo());
Assert.assertFalse(new File(destFile).isFile());
}
+
+ @Test
+ public void testUseDefaultFileSystemDirectoryWhenNotSet() {
+ // identify the location of the test topology tar file
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+ // set file system directory as null
+ Config newConfig = Config.newBuilder()
+ .putAll(config)
+ .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+ .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), null)
+ .build();
+
+ // create the uploader
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ uploader.initialize(newConfig);
+
+ // get default file system directory
+ String defaultFileSystemDirectory = LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString();
+
+ String destDirectory = uploader.getTopologyDirectory();
+ String destFile = uploader.getTopologyFile();
+
+ // verify usage of default file system directory for destination directory and file
+ Assert.assertEquals(destDirectory, defaultFileSystemDirectory);
+ Assert.assertTrue(destFile.contains(defaultFileSystemDirectory));
+ }
+
+ @Test
+ public void testUploadPackageWhenTopologyFileAlreadyExists() {
+ // identify the location of the test topology tar file
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+ Config newConfig = Config.newBuilder()
+ .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
+
+ // create the uploader and load the package
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ uploader.initialize(newConfig);
+ Assert.assertNotNull(uploader.uploadPackage());
+
+ // verify if the file exists
+ String destFile = uploader.getTopologyFile();
+ Assert.assertTrue(new File(destFile).isFile());
+
+ // load same package again by overriding existing one
+ Assert.assertNotNull(uploader.uploadPackage());
+ String destFile2 = uploader.getTopologyFile();
+ Assert.assertTrue(new File(destFile2).isFile());
+
+ // verify that existing file is overridden
+ Assert.assertEquals(destFile, destFile2);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUploadPackageWhenFileSystemDirectoryIsInvalid() {
+ // identify the location of the test topology tar file
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+ String invalidFileSystemDirectory = "invalid%path";
+
+ // set invalid file system directory
+ Config newConfig = Config.newBuilder()
+ .putAll(config)
+ .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+ .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), invalidFileSystemDirectory).build();
+
+ // create the uploader and load the package
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ uploader.initialize(newConfig);
+ uploader.uploadPackage();
+ }
+
+ @Test
+ public void testGetUri() {
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ Assert.assertEquals(uploader.getUri("testFileName").toString(), "file://testFileName");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetUriWhenDestFileNameIsInvalid() {
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ uploader.getUri("invalid_%_DestFilePath");
+ }
+
}
diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index ca48984..279cc8a 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -14,6 +14,7 @@
),
deps = [
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//storm-compatibility/src/java:storm-compatibility-java",
"//heron/proto:proto_topology_java",
"//third_party/java:jackson",
@@ -31,6 +32,7 @@
),
deps = [
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//storm-compatibility/src/java:storm-compatibility-java",
"//third_party/java:hadoop-core",
"//third_party/java:jackson",
@@ -46,6 +48,7 @@
),
deps = [
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//storm-compatibility/src/java:storm-compatibility-java",
"@com_googlecode_json_simple_json_simple//jar",
"@commons_cli_commons_cli//jar",
@@ -61,6 +64,7 @@
),
deps = [
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//storm-compatibility/src/java:storm-compatibility-java",
":common",
":core"
@@ -74,6 +78,7 @@
),
deps = [
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//storm-compatibility/src/java:storm-compatibility-java",
"@commons_cli_commons_cli//jar",
"@com_googlecode_json_simple_json_simple//jar",
@@ -96,6 +101,7 @@
),
deps = [
"//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//storm-compatibility/src/java:storm-compatibility-java",
":common",
":core"
diff --git a/storm-compatibility-examples/src/java/BUILD b/storm-compatibility-examples/src/java/BUILD
index da237dc..0f5ef31 100644
--- a/storm-compatibility-examples/src/java/BUILD
+++ b/storm-compatibility-examples/src/java/BUILD
@@ -4,7 +4,7 @@
name='heron-storm-compatibility-examples-unshaded',
srcs = glob(["**/*.java"]),
deps = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//storm-compatibility/src/java:storm-compatibility-java",
],
diff --git a/storm-compatibility/src/java/BUILD b/storm-compatibility/src/java/BUILD
index 91ae2d7..e5e3409 100644
--- a/storm-compatibility/src/java/BUILD
+++ b/storm-compatibility/src/java/BUILD
@@ -5,7 +5,7 @@
load("/tools/rules/javadoc", "java_doc")
storm_deps_files = [
- "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
"//heron/common/src/java:basics-java",
"//heron/simulator/src/java:simulator-java",
"//heron/proto:proto_topology_java",
diff --git a/storm-compatibility/src/java/backtype/storm/StormSubmitter.java b/storm-compatibility/src/java/backtype/storm/StormSubmitter.java
index 7d68d5e..c99e058 100644
--- a/storm-compatibility/src/java/backtype/storm/StormSubmitter.java
+++ b/storm-compatibility/src/java/backtype/storm/StormSubmitter.java
@@ -53,7 +53,8 @@
Map stormConfig,
StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
- // First do config translation
+ // First do topology config translation. Bolt and Spout config translation is handled
+ // in their own DeclarerImpl classes.
com.twitter.heron.api.Config heronConfig = ConfigUtils.translateConfig(stormConfig);
// Now submit a heron topology
diff --git a/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java b/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java
index f162588..ae0bc78 100644
--- a/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java
+++ b/storm-compatibility/src/java/backtype/storm/topology/BoltDeclarerImpl.java
@@ -18,11 +18,13 @@
package backtype.storm.topology;
+import java.util.HashMap;
import java.util.Map;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.grouping.CustomStreamGroupingDelegate;
import backtype.storm.tuple.Fields;
+import backtype.storm.utils.ConfigUtils;
import backtype.storm.utils.Utils;
public class BoltDeclarerImpl implements BoltDeclarer {
@@ -35,13 +37,18 @@
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public BoltDeclarer addConfigurations(Map conf) {
- delegate.addConfigurations((Map<String, Object>) conf);
+ // Translate config to heron config and then apply.
+ Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+ delegate.addConfigurations(heronConf);
return this;
}
@Override
public BoltDeclarer addConfiguration(String config, Object value) {
- delegate.addConfiguration(config, value);
+ Map<String, Object> configMap = new HashMap<String, Object>();
+ configMap.put(config, value);
+
+ addConfigurations(configMap);
return this;
}
diff --git a/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java b/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java
index 7cf4092..29c5313 100644
--- a/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java
+++ b/storm-compatibility/src/java/backtype/storm/topology/SpoutDeclarerImpl.java
@@ -18,8 +18,11 @@
package backtype.storm.topology;
+import java.util.HashMap;
import java.util.Map;
+import backtype.storm.utils.ConfigUtils;
+
public class SpoutDeclarerImpl implements SpoutDeclarer {
private com.twitter.heron.api.topology.SpoutDeclarer delegate;
@@ -30,13 +33,18 @@
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public SpoutDeclarer addConfigurations(Map conf) {
- delegate.addConfigurations((Map<String, Object>) conf);
+ // Translate config to heron config and then apply.
+ Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+ delegate.addConfigurations(heronConf);
return this;
}
@Override
public SpoutDeclarer addConfiguration(String config, Object value) {
- delegate.addConfiguration(config, value);
+ Map<String, Object> configMap = new HashMap<String, Object>();
+ configMap.put(config, value);
+
+ addConfigurations(configMap);
return this;
}
diff --git a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
index 25cbf7a..18e25de 100644
--- a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
@@ -39,57 +39,31 @@
@SuppressWarnings({"rawtypes", "unchecked"})
public static Config translateConfig(Map stormConfig) {
Config heronConfig = new Config((Map<String, Object>) stormConfig);
+
// Look at serialization stuff first
doSerializationTranslation(heronConfig);
// Now look at supported apis
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
- heronConfig.put(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
- heronConfig.get(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
- }
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_WORKERS)) {
- Integer nWorkers = Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_WORKERS));
- com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
- }
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
- Integer nAckers =
- Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
- if (nAckers > 0) {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
- Integer nSecs =
- Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
- com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
- }
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
- Integer nPending =
- Utils.getInt(
- heronConfig.get(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
- com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
- }
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
- Integer tSecs =
- Utils.getInt(
- heronConfig.get(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
- com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
- }
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_DEBUG)) {
- Boolean dBg =
- Boolean.parseBoolean(heronConfig.get(backtype.storm.Config.TOPOLOGY_DEBUG).toString());
- com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
- }
+ doStormTranslation(heronConfig);
doTaskHooksTranslation(heronConfig);
+ doTopologyLevelTranslation(heronConfig);
+
+ return heronConfig;
+ }
+
+ /**
+ * Translate storm config to heron config for components
+ * @param stormConfig the storm config
+ * @return a heron config
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static Config translateComponentConfig(Map stormConfig) {
+ Config heronConfig = new Config((Map<String, Object>) stormConfig);
+
+ doStormTranslation(heronConfig);
+
return heronConfig;
}
@@ -139,4 +113,66 @@
heronConfig.setAutoTaskHooks(translationHooks);
}
}
+
+ /**
+ * Translate storm config into heron config. This function is used by both topology
+ * and component level config translations. Therefore NO config should be generated
+ * when a key does NOT exist if the key is for both topology and component.
+ * Otherwise the component config might overwrite the topology config with a wrong value.
+ * @param heronConfig the heron config object to receive the results.
+ */
+ private static void doStormTranslation(Config heronConfig) {
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
+ heronConfig.put(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
+ heronConfig.get(backtype.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
+ }
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_WORKERS)) {
+ Integer nWorkers = Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_WORKERS));
+ com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
+ }
+
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+ Integer nSecs =
+ Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
+ }
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
+ Integer nPending =
+ Utils.getInt(
+ heronConfig.get(backtype.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
+ com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
+ }
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
+ Integer tSecs =
+ Utils.getInt(
+ heronConfig.get(backtype.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
+ com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
+ }
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_DEBUG)) {
+ Boolean dBg =
+ Boolean.parseBoolean(heronConfig.get(backtype.storm.Config.TOPOLOGY_DEBUG).toString());
+ com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
+ }
+ }
+
+ /**
+ * Translate topology config.
+ * @param heronConfig the heron config object to receive the results.
+ */
+ private static void doTopologyLevelTranslation(Config heronConfig) {
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+ Integer nAckers =
+ Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+ if (nAckers > 0) {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ }
}
diff --git a/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java b/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java
index 24cb0d8..4936067 100644
--- a/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java
+++ b/storm-compatibility/src/java/org/apache/storm/StormSubmitter.java
@@ -53,7 +53,8 @@
Map stormConfig,
StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
- // First do config translation
+ // First do topology config translation. Bolt and Spout config translation is handled
+ // in their own DeclarerImpl classes.
com.twitter.heron.api.Config heronConfig = ConfigUtils.translateConfig(stormConfig);
// Now submit a heron topology
diff --git a/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java b/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java
index f4aaa65..8cbf926 100644
--- a/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java
+++ b/storm-compatibility/src/java/org/apache/storm/topology/BoltDeclarerImpl.java
@@ -18,11 +18,13 @@
package org.apache.storm.topology;
+import java.util.HashMap;
import java.util.Map;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.CustomStreamGroupingDelegate;
import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
public class BoltDeclarerImpl implements BoltDeclarer {
@@ -35,13 +37,18 @@
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public BoltDeclarer addConfigurations(Map conf) {
- delegate.addConfigurations((Map<String, Object>) conf);
+ // Translate config to heron config and then apply.
+ Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+ delegate.addConfigurations(heronConf);
return this;
}
@Override
public BoltDeclarer addConfiguration(String config, Object value) {
- delegate.addConfiguration(config, value);
+ Map<String, Object> configMap = new HashMap<String, Object>();
+ configMap.put(config, value);
+
+ addConfigurations(configMap);
return this;
}
diff --git a/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java b/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
index 28dcc90..f6e8675 100644
--- a/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
+++ b/storm-compatibility/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
@@ -18,8 +18,11 @@
package org.apache.storm.topology;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.storm.utils.ConfigUtils;
+
public class SpoutDeclarerImpl implements SpoutDeclarer {
private com.twitter.heron.api.topology.SpoutDeclarer delegate;
@@ -30,13 +33,18 @@
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public SpoutDeclarer addConfigurations(Map conf) {
- delegate.addConfigurations((Map<String, Object>) conf);
+ // Translate config to heron config and then apply.
+ Map<String, Object> heronConf = ConfigUtils.translateComponentConfig(conf);
+ delegate.addConfigurations(heronConf);
return this;
}
@Override
public SpoutDeclarer addConfiguration(String config, Object value) {
- delegate.addConfiguration(config, value);
+ Map<String, Object> configMap = new HashMap<String, Object>();
+ configMap.put(config, value);
+
+ addConfigurations(configMap);
return this;
}
diff --git a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
index 910b2a7..d7b23a2 100644
--- a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
@@ -32,7 +32,7 @@
}
/**
- * Translate storm config to heron config
+ * Translate storm config to heron config for topology
* @param stormConfig the storm config
* @return a heron config
*/
@@ -43,57 +43,26 @@
doSerializationTranslation(heronConfig);
// Now look at supported apis
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
- heronConfig.put(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
- heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
- }
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_WORKERS)) {
- Integer nWorkers = Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_WORKERS));
- com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
- }
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
- Integer nAckers =
- Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
- if (nAckers > 0) {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
- Integer nSecs =
- Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
- com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
- }
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
- Integer nPending =
- Utils.getInt(
- heronConfig.get(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
- com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
- }
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
- Integer tSecs =
- Utils.getInt(
- heronConfig.get(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
- com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
- }
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_DEBUG)) {
- Boolean dBg =
- Boolean.parseBoolean(heronConfig.get(org.apache.storm.Config.TOPOLOGY_DEBUG).toString());
- com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
- }
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT)) {
- com.twitter.heron.api.Config.setEnvironment(heronConfig,
- (Map) heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT));
- }
+ doStormTranslation(heronConfig);
doTaskHooksTranslation(heronConfig);
+ doTopologyLevelTranslation(heronConfig);
+
+ return heronConfig;
+ }
+
+ /**
+ * Translate storm config to heron config for components
+ * @param stormConfig the storm config
+ * @return a heron config
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static Config translateComponentConfig(Map stormConfig) {
+ Config heronConfig = new Config((Map<String, Object>) stormConfig);
+
+ doStormTranslation(heronConfig);
+
return heronConfig;
}
@@ -144,4 +113,69 @@
heronConfig.setAutoTaskHooks(translationHooks);
}
}
+
+ /**
+ * Translate storm config into heron config. This function is used by both topology
+ * and component level config translations. Therefore NO config should be generated
+ * when a key does NOT exist if the key is for both topology and component.
+ * Otherwise the component config might overwrite the topology config with a wrong value.
+ * @param heronConfig the heron config object to receive the results.
+ */
+ private static void doStormTranslation(Config heronConfig) {
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
+ heronConfig.put(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS,
+ heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS).toString());
+ }
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_WORKERS)) {
+ Integer nWorkers = Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_WORKERS));
+ com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
+ }
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+ Integer nSecs =
+ Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ com.twitter.heron.api.Config.setMessageTimeoutSecs(heronConfig, nSecs);
+ }
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING)) {
+ Integer nPending =
+ Utils.getInt(
+ heronConfig.get(org.apache.storm.Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
+ com.twitter.heron.api.Config.setMaxSpoutPending(heronConfig, nPending);
+ }
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS)) {
+ Integer tSecs =
+ Utils.getInt(
+ heronConfig.get(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS).toString());
+ com.twitter.heron.api.Config.setTickTupleFrequency(heronConfig, tSecs);
+ }
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_DEBUG)) {
+ Boolean dBg =
+ Boolean.parseBoolean(heronConfig.get(org.apache.storm.Config.TOPOLOGY_DEBUG).toString());
+ com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
+ }
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT)) {
+ com.twitter.heron.api.Config.setEnvironment(heronConfig,
+ (Map) heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT));
+ }
+ }
+
+ /**
+ * Translate topology config.
+ * @param heronConfig the heron config object to receive the results.
+ */
+ private static void doTopologyLevelTranslation(Config heronConfig) {
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+ Integer nAckers =
+ Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+ if (nAckers > 0) {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ }
}
diff --git a/third_party/java/BUILD b/third_party/java/BUILD
index 9645eea..9d9053f 100644
--- a/third_party/java/BUILD
+++ b/third_party/java/BUILD
@@ -96,6 +96,7 @@
"@com_esotericsoftware_reflectasm//jar",
"@com_esotericsoftware_minlog//jar",
"@org_objenesis_objenesis//jar",
+ "@org_objectweb_asm//jar",
],
)
diff --git a/website/config.yaml b/website/config.yaml
index b616119..a3f6d47 100755
--- a/website/config.yaml
+++ b/website/config.yaml
@@ -20,9 +20,9 @@
author: Twitter, Inc.
description: A realtime, distributed, fault-tolerant stream processing engine from Twitter
versions:
- heron: 0.16.4
+ heron: 0.16.5
bazel: 0.5.4
- heronpy: 0.16.4
+ heronpy: 0.16.5
assets:
favicon:
small: /img/favicon-16x16.png
diff --git a/website/content/docs/operators/deployment/uploaders/localfs.md b/website/content/docs/operators/deployment/uploaders/localfs.md
index 4e9e2a9..bd5411a 100644
--- a/website/content/docs/operators/deployment/uploaders/localfs.md
+++ b/website/content/docs/operators/deployment/uploaders/localfs.md
@@ -27,7 +27,7 @@
* `heron.uploader.localfs.file.system.directory` --- Provides the name of the directory where
the topology jar should be uploaded. The name of the directory should be unique per cluster
You could use the Heron environment variables `${CLUSTER}` that will be substituted by cluster
-name.
+name. If this is not set, `${HOME}/.herondata/repository/${CLUSTER}/${ROLE}/${TOPOLOGY}` will be used as the default.
### Example Local File System Uploader Configuration