Merge branch 'master' into karthik/rmyamlbinary
diff --git a/.travis.yml b/.travis.yml
index a4ef75a..ee0e203 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,7 +11,6 @@
apt:
sources:
- ubuntu-toolchain-r-test
- - george-edison55-precise-backports # cmake 3.2.3 / doxygen 1.8.3
packages:
- gcc-4.8
@@ -23,8 +22,6 @@
- pkg-config
- zip
- zlib1g-dev
- - cmake
- - cmake-data
env:
- CC=gcc-4.8 CXX=g++-4.8 CPP=cpp-4.8 CXXCPP=cpp-4.8
diff --git a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
index 75338db..33bd0a0 100644
--- a/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/api/ComponentJVMOptionsTopology.java
@@ -67,6 +67,11 @@
conf.setContainerRamRequested(ByteAmount.fromGigabytes(2));
conf.setContainerCpuRequested(2);
+ // Specify the amount of RAM padding to add per container.
+ // Note that this config is only treated as a hint;
+ // it is up to the packing algorithm to decide whether to apply it.
+ conf.setContainerRamPadding(ByteAmount.fromGigabytes(2));
+
if (args != null && args.length > 0) {
conf.setNumStmgrs(2);
HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
index a24f013..ba07289 100644
--- a/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/IntegerProcessingTopology.java
@@ -35,8 +35,8 @@
}
// Heron resources to be applied to the topology
- private static final float CPU = 2.0f;
- private static final long GIGABYTES_OF_RAM = 6;
+ private static final float CPU = 1.5f;
+ private static final int GIGABYTES_OF_RAM = 8;
private static final int NUM_CONTAINERS = 2;
/**
@@ -49,7 +49,6 @@
Streamlet<Integer> zeroes = builder.newSource(() -> 0);
builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
- .setNumPartitions(2)
.setName("random-ints")
.map(i -> i + 1)
.setName("add-one")
diff --git a/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
index 8bfa62e..703d205 100644
--- a/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
+++ b/examples/src/java/com/twitter/heron/examples/streamlet/WindowedWordCountTopology.java
@@ -84,7 +84,6 @@
Config config = new Config.Builder()
.setNumContainers(topologyParallelism)
- .useKryoSerializer()
.build();
// Fetches the topology name from the first command-line argument
diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java
index b231dd6..b281e54 100644
--- a/heron/api/src/java/com/twitter/heron/api/Config.java
+++ b/heron/api/src/java/com/twitter/heron/api/Config.java
@@ -275,6 +275,11 @@
*/
public static final String TOPOLOGY_TIMER_EVENTS = "topology.timer.events";
+ /**
+ * Enables remote debugging for Java Heron instances.
+ */
+ public static final String TOPOLOGY_REMOTE_DEBUGGING_ENABLE = "topology.remote.debugging.enable";
+
private static final long serialVersionUID = 2550967708478837032L;
// We maintain a list of all user exposed vars
private static Set<String> apiVars = new HashSet<>();
@@ -310,6 +315,7 @@
apiVars.add(TOPOLOGY_ADDITIONAL_CLASSPATH);
apiVars.add(TOPOLOGY_UPDATE_DEACTIVATE_WAIT_SECS);
apiVars.add(TOPOLOGY_UPDATE_REACTIVATE_WAIT_SECS);
+ apiVars.add(TOPOLOGY_REMOTE_DEBUGGING_ENABLE);
}
public Config() {
@@ -688,4 +694,8 @@
}
timers.put(name, Pair.of(interval, task));
}
+
+ public void setTopologyRemoteDebugging(boolean isOn) {
+ this.put(Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, String.valueOf(isOn));
+ }
}
diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
index d15bd8c..374f0a7 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/WindowedBoltExecutor.java
@@ -52,10 +52,8 @@
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
-import com.twitter.heron.api.utils.TupleUtils;
import com.twitter.heron.api.windowing.Event;
import com.twitter.heron.api.windowing.EvictionPolicy;
-import com.twitter.heron.api.windowing.TimerEvent;
import com.twitter.heron.api.windowing.TimestampExtractor;
import com.twitter.heron.api.windowing.TriggerPolicy;
import com.twitter.heron.api.windowing.TupleWindowImpl;
@@ -221,17 +219,15 @@
maxLagMs = DEFAULT_MAX_LAG_MS;
}
// watermark interval
- int watermarkInterval;
+ long watermarkIntervalMs;
if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
- watermarkInterval = ((Number) topoConf.get(WindowingConfigs
+ watermarkIntervalMs = ((Number) topoConf.get(WindowingConfigs
.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
} else {
- watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
+ watermarkIntervalMs = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
}
- // Use tick tuple to perodically generate watermarks
- Config.setTickTupleFrequencyMs(topoConf, watermarkInterval);
- waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager,
- maxLagMs, getComponentStreams(context));
+ waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkIntervalMs,
+ maxLagMs, getComponentStreams(context), topoConf);
} else {
if (topoConf.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
throw new IllegalArgumentException(
@@ -304,9 +300,8 @@
return new WatermarkTimeTriggerPolicy<>(slidingIntervalDurationMs, manager,
evictionPolicy, manager);
} else {
- // set tick tuple frequency in milliseconds for timer in TimeTriggerPolicy
- Config.setTickTupleFrequencyMs(topoConf, slidingIntervalDurationMs);
- return new TimeTriggerPolicy<>(slidingIntervalDurationMs, manager, evictionPolicy);
+ return new TimeTriggerPolicy<>(slidingIntervalDurationMs, manager,
+ evictionPolicy, topoConf);
}
}
}
@@ -352,30 +347,22 @@
@Override
public void execute(Tuple input) {
- if (TupleUtils.isTick(input)) {
- long currTime = System.currentTimeMillis();
- windowManager.add(new TimerEvent<>(input, currTime));
- if (isTupleTs()) {
- waterMarkEventGenerator.run();
+ if (isTupleTs()) {
+ long ts = timestampExtractor.extractTimestamp(input);
+ if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
+ windowManager.add(input, ts);
+ } else {
+ if (lateTupleStream != null) {
+ windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
+ } else {
+ LOG.info(String.format(
+ "Received a late tuple %s with ts %d. This will not be " + "processed"
+ + ".", input, ts));
+ }
+ windowedOutputCollector.ack(input);
}
} else {
- if (isTupleTs()) {
- long ts = timestampExtractor.extractTimestamp(input);
- if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
- windowManager.add(input, ts);
- } else {
- if (lateTupleStream != null) {
- windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
- } else {
- LOG.info(String.format(
- "Received a late tuple %s with ts %d. This will not be " + "processed"
- + ".", input, ts));
- }
- windowedOutputCollector.ack(input);
- }
- } else {
- windowManager.add(input);
- }
+ windowManager.add(input);
}
}
diff --git a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
index 9a487f4..cf52f7c 100644
--- a/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
+++ b/heron/api/src/java/com/twitter/heron/api/utils/TopologyUtils.java
@@ -290,4 +290,10 @@
throw new IllegalStateException("Failed to find topology defn file");
}
+
+ public static boolean getTopologyRemoteDebuggingEnabled(TopologyAPI.Topology topology) {
+ List<TopologyAPI.Config.KeyValue> topologyConfig = topology.getTopologyConfig().getKvsList();
+ return Boolean.parseBoolean(TopologyUtils.getConfigWithDefault(
+ topologyConfig, Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE, "false"));
+ }
}
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/Event.java b/heron/api/src/java/com/twitter/heron/api/windowing/Event.java
index e26c917..6c227d3 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/Event.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/Event.java
@@ -63,10 +63,4 @@
* @return true if this is a watermark event
*/
boolean isWatermark();
-
- /**
- * If this is a timer event or not. Timer events use Tick Tuples to trigger
- * @return true if this a timer event
- */
- boolean isTimer();
}
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java b/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java
index 182d4b7..f5dc839 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/EventImpl.java
@@ -60,11 +60,6 @@
}
@Override
- public boolean isTimer() {
- return false;
- }
-
- @Override
public String toString() {
return "EventImpl{" + "event=" + event + ", ts=" + ts + '}';
}
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/TimerEvent.java b/heron/api/src/java/com/twitter/heron/api/windowing/TimerEvent.java
deleted file mode 100644
index 027dd24..0000000
--- a/heron/api/src/java/com/twitter/heron/api/windowing/TimerEvent.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2017 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.twitter.heron.api.windowing;
-
-import java.io.Serializable;
-
-/**
- * Timer event used to trigger actions in windowing that needs to occur on a set frequency
- */
-public class TimerEvent<T extends Serializable> extends EventImpl<T> {
- private static final long serialVersionUID = -9174292711796600228L;
-
- public TimerEvent(T event, long ts) {
- super(event, ts);
- }
-
- @Override
- public boolean isTimer() {
- return true;
- }
-
- @Override
- public String toString() {
- return "TimerEvent{} " + super.toString();
- }
-}
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java b/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java
index 19a5560..ba0ac25 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/WaterMarkEventGenerator.java
@@ -34,10 +34,12 @@
package com.twitter.heron.api.windowing;
import java.io.Serializable;
+import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import com.twitter.heron.api.Config;
import com.twitter.heron.api.generated.TopologyAPI;
/**
@@ -47,28 +49,35 @@
* any tuple coming with an earlier timestamp can be considered as late events.
*/
public class WaterMarkEventGenerator<T extends Serializable> {
+ private final Map<TopologyAPI.StreamId, Long> streamToTs;
private final WindowManager<T> windowManager;
+ private final long watermarkIntervalMs;
private final int eventTsLag;
private final Set<TopologyAPI.StreamId> inputStreams;
- private final Map<TopologyAPI.StreamId, Long> streamToTs;
+ private final Map<String, Object> topoConf;
private volatile long lastWaterMarkTs;
- private boolean started = false;
/**
* Creates a new WatermarkEventGenerator.
*
* @param windowManager The window manager this generator will submit watermark events to
* interval
+ * @param watermarkIntervalMs The interval in milliseconds at which watermarks are emitted
* @param eventTsLagMs The max allowed lag behind the last watermark event before an event is
* considered late
* @param inputStreams The input streams this generator is expected to handle
+ * @param topoConf The topology configuration used to register the watermark timer
*/
- public WaterMarkEventGenerator(WindowManager<T> windowManager, int eventTsLagMs,
- Set<TopologyAPI.StreamId> inputStreams) {
- this.windowManager = windowManager;
+ public WaterMarkEventGenerator(WindowManager<T> windowManager, long watermarkIntervalMs,
+ int eventTsLagMs,
+ Set<TopologyAPI.StreamId> inputStreams,
+ Map<String, Object> topoConf) {
streamToTs = new ConcurrentHashMap<>();
+ this.windowManager = windowManager;
+ this.watermarkIntervalMs = watermarkIntervalMs;
this.eventTsLag = eventTsLagMs;
this.inputStreams = inputStreams;
+ this.topoConf = topoConf;
}
/**
/**
@@ -85,12 +94,10 @@
}
public void run() {
- if (started) {
- long waterMarkTs = computeWaterMarkTs();
- if (waterMarkTs > lastWaterMarkTs) {
- this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
- lastWaterMarkTs = waterMarkTs;
- }
+ long waterMarkTs = computeWaterMarkTs();
+ if (waterMarkTs > lastWaterMarkTs) {
+ this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
+ lastWaterMarkTs = waterMarkTs;
}
}
@@ -110,6 +117,11 @@
}
public void start() {
- started = true;
+ Config.registerTopologyTimerEvents(
+ topoConf,
+ "WaterMarkEventGeneratorTimer",
+ Duration.ofMillis(watermarkIntervalMs),
+ () -> run()
+ );
}
}
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java b/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java
index fe05d1b..84ac2ca 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/WindowManager.java
@@ -44,7 +44,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import com.twitter.heron.api.windowing.EvictionPolicy.Action;
@@ -84,7 +83,6 @@
private final List<T> expiredEvents;
private final Set<Event<T>> prevWindowEvents;
private final AtomicInteger eventsSinceLastExpiry;
- private final ReentrantLock lock;
/**
* Constructs a {@link WindowManager}
@@ -100,8 +98,6 @@
expiredEvents = new ArrayList<>();
prevWindowEvents = new HashSet<>();
eventsSinceLastExpiry = new AtomicInteger();
- lock = new ReentrantLock(true);
-
}
/**
@@ -150,8 +146,6 @@
// watermark events are not added to the queue.
if (windowEvent.isWatermark()) {
LOG.fine(String.format("Got watermark event with ts %d", windowEvent.getTimestamp()));
- } else if (windowEvent.isTimer()) {
- LOG.fine(String.format("Got timer event with ts %d", windowEvent.getTimestamp()));
} else {
queue.add(windowEvent);
}
@@ -166,18 +160,14 @@
public boolean onTrigger() {
List<Event<T>> windowEvents = null;
List<T> expired = null;
- try {
- lock.lock();
- /*
- * scan the entire window to handle out of order events in
- * the case of time based windows.
- */
- windowEvents = scanEvents(true);
- expired = new ArrayList<>(expiredEvents);
- expiredEvents.clear();
- } finally {
- lock.unlock();
- }
+
+ /*
+ * scan the entire window to handle out of order events in
+ * the case of time based windows.
+ */
+ windowEvents = scanEvents(true);
+ expired = new ArrayList<>(expiredEvents);
+ expiredEvents.clear();
List<T> events = new ArrayList<>();
List<T> newEvents = new ArrayList<>();
for (Event<T> event : windowEvents) {
@@ -239,25 +229,22 @@
LOG.fine(String.format("Scan events, eviction policy %s", evictionPolicy));
List<T> eventsToExpire = new ArrayList<>();
List<Event<T>> eventsToProcess = new ArrayList<>();
- try {
- lock.lock();
- Iterator<Event<T>> it = queue.iterator();
- while (it.hasNext()) {
- Event<T> windowEvent = it.next();
- Action action = evictionPolicy.evict(windowEvent);
- if (action == EXPIRE) {
- eventsToExpire.add(windowEvent.get());
- it.remove();
- } else if (!fullScan || action == STOP) {
- break;
- } else if (action == PROCESS) {
- eventsToProcess.add(windowEvent);
- }
+
+ Iterator<Event<T>> it = queue.iterator();
+ while (it.hasNext()) {
+ Event<T> windowEvent = it.next();
+ Action action = evictionPolicy.evict(windowEvent);
+ if (action == EXPIRE) {
+ eventsToExpire.add(windowEvent.get());
+ it.remove();
+ } else if (!fullScan || action == STOP) {
+ break;
+ } else if (action == PROCESS) {
+ eventsToProcess.add(windowEvent);
}
- expiredEvents.addAll(eventsToExpire);
- } finally {
- lock.unlock();
}
+ expiredEvents.addAll(eventsToExpire);
+
eventsSinceLastExpiry.set(0);
LOG.fine(String.format("[%d] events expired from window.", eventsToExpire.size()));
if (!eventsToExpire.isEmpty()) {
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java
index 2a60a44..869ee4d 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/evictors/CountEvictionPolicy.java
@@ -76,7 +76,7 @@
@Override
public void track(Event<T> event) {
- if (!event.isWatermark() && !event.isTimer()) {
+ if (!event.isWatermark()) {
currentCount.incrementAndGet();
}
}
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java
index 3b154b9..9fb6791 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/CountTriggerPolicy.java
@@ -65,7 +65,7 @@
@Override
public void track(Event<T> event) {
- if (started && !event.isWatermark() && !event.isTimer()) {
+ if (started && !event.isWatermark()) {
if (currentCount.incrementAndGet() >= count) {
evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
handler.onTrigger();
diff --git a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java
index 3999d06..e07b328 100644
--- a/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java
+++ b/heron/api/src/java/com/twitter/heron/api/windowing/triggers/TimeTriggerPolicy.java
@@ -33,7 +33,10 @@
package com.twitter.heron.api.windowing.triggers;
import java.io.Serializable;
+import java.time.Duration;
+import java.util.Map;
+import com.twitter.heron.api.Config;
import com.twitter.heron.api.windowing.DefaultEvictionContext;
import com.twitter.heron.api.windowing.Event;
import com.twitter.heron.api.windowing.EvictionPolicy;
@@ -49,25 +52,24 @@
private long duration;
private final TriggerHandler handler;
private final EvictionPolicy<T, ?> evictionPolicy;
- private boolean started = false;
+ private Map<String, Object> topoConf;
public TimeTriggerPolicy(long millis, TriggerHandler handler) {
- this(millis, handler, null);
+ this(millis, handler, null, new Config());
}
public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy<T, ?>
- evictionPolicy) {
+ evictionPolicy, Map<String, Object> topoConf) {
this.duration = millis;
this.handler = handler;
this.evictionPolicy = evictionPolicy;
+ this.topoConf = topoConf;
}
@Override
public void track(Event<T> event) {
- if (started && event.isTimer()) {
- triggerTask();
- }
+
}
@Override
@@ -77,7 +79,8 @@
@Override
public void start() {
- started = true;
+ Config.registerTopologyTimerEvents(this.topoConf, "TimeTriggerPolicyTimer",
+ Duration.ofMillis(this.duration), () -> triggerTask());
}
@Override
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Config.java b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
index a6ee37a..5f3590b 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Config.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Config.java
@@ -16,7 +16,6 @@
import java.io.Serializable;
-import com.twitter.heron.common.basics.ByteAmount;
import com.twitter.heron.streamlet.impl.KryoSerializer;
/**
@@ -27,8 +26,11 @@
*/
public final class Config implements Serializable {
private static final long serialVersionUID = 6204498077403076352L;
-
- private com.twitter.heron.api.Config heronConfig;
+ private final com.twitter.heron.api.Config heronConfig;
+ private final int numContainers;
+ private final DeliverySemantics deliverySemantics;
+ private final Serializer serializer;
+ private final Resources resources;
public enum DeliverySemantics {
ATMOST_ONCE,
@@ -36,7 +38,16 @@
EFFECTIVELY_ONCE
}
+ public enum Serializer {
+ KRYO,
+ JAVA
+ }
+
private Config(Builder builder) {
+ numContainers = builder.numContainers;
+ deliverySemantics = builder.deliverySemantics;
+ serializer = builder.serializer;
+ resources = builder.resources;
heronConfig = builder.config;
}
@@ -49,6 +60,22 @@
return heronConfig;
}
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ public DeliverySemantics getDeliverySemantics() {
+ return deliverySemantics;
+ }
+
+ public Serializer getSerializer() {
+ return serializer;
+ }
+
+ public Resources getResources() {
+ return resources;
+ }
+
private static com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics(
DeliverySemantics semantics) {
switch (semantics) {
@@ -65,36 +92,47 @@
public static class Builder {
private com.twitter.heron.api.Config config;
+ private int numContainers;
+ private Serializer serializer;
+ private DeliverySemantics deliverySemantics;
+ private Resources resources;
public Builder() {
+ serializer = Serializer.KRYO;
+ numContainers = 1;
+ deliverySemantics = DeliverySemantics.ATMOST_ONCE;
+ resources = Resources.defaultResources();
config = new com.twitter.heron.api.Config();
}
/**
* Sets the number of containers to run this topology
- * @param numContainers The number of containers to distribute this topology
+ * @param containers The number of containers to distribute this topology
*/
- public Builder setNumContainers(int numContainers) {
- config.setNumStmgrs(numContainers);
+ public Builder setNumContainers(int containers) {
+ numContainers = containers;
+ config.setNumStmgrs(containers);
return this;
}
/**
* Sets resources used per container by this topology
- * @param resources The resource to dedicate per container
+ * @param containerResources The resource to dedicate per container
*/
- public Builder setContainerResources(Resources resources) {
+ public Builder setContainerResources(Resources containerResources) {
+ resources = containerResources;
config.setContainerCpuRequested(resources.getCpu());
- config.setContainerRamRequested(ByteAmount.fromBytes(resources.getRam()));
+ config.setContainerRamRequested(resources.getRam());
return this;
}
/**
* Sets the delivery semantics of the topology
- * @param semantic The delivery semantic to be enforced
+ * @param semantics The delivery semantic to be enforced
*/
- public Builder setDeliverySemantics(DeliverySemantics semantic) {
- config.setTopologyReliabilityMode(Config.translateSemantics(semantic));
+ public Builder setDeliverySemantics(DeliverySemantics semantics) {
+ deliverySemantics = semantics;
+ config.setTopologyReliabilityMode(Config.translateSemantics(semantics));
return this;
}
@@ -108,20 +146,35 @@
return this;
}
- /**
- * Sets the topology to use the Kryo serializer for serializing
- * streamlet elements
- */
- public Builder useKryoSerializer() {
- try {
- config.setSerializationClassName(new KryoSerializer().getClass().getName());
- } catch (NoClassDefFoundError e) {
- throw new RuntimeException("Linking with kryo is needed because useKryoSerializer is used");
+ /**
+ * Applies the selected serializer to the underlying Heron config. Only KRYO sets a
+ * serialization class name; any other value leaves the config unchanged.
+ * @param topologySerializer The serializer selected for this topology
+ */
+ private void applySerializer(Serializer topologySerializer) {
+ if (topologySerializer == Serializer.KRYO) {
+ try {
+ config.setSerializationClassName(KryoSerializer.class.getName());
+ } catch (NoClassDefFoundError e) {
+ // Keep the linkage error as the cause so the missing class shows up in the stack trace
+ throw new RuntimeException(
+ "Linking with Kryo is needed because setTopologySerializer is used", e);
+ }
}
+ }
+
+ /**
+ * Sets the topology to use the specified topologySerializer for serializing
+ * streamlet elements
+ * @param topologySerializer The topologySerializer to be used in this topology
+ */
+ public Builder setTopologySerializer(Serializer topologySerializer) {
+ serializer = topologySerializer;
return this;
}
public Config build() {
+ applySerializer(serializer);
return new Config(this);
}
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
index 1deffcf..2e60151 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Resources.java
@@ -16,6 +16,8 @@
import java.io.Serializable;
+import com.twitter.heron.common.basics.ByteAmount;
+
/**
* Resources needed by the topology are encapsulated in this class.
* Currently we deal with CPU and RAM. Others can be added later.
@@ -23,7 +25,7 @@
public final class Resources implements Serializable {
private static final long serialVersionUID = 630451253428388496L;
private float cpu;
- private long ram;
+ private ByteAmount ram;
private Resources(Builder builder) {
this.cpu = builder.cpu;
@@ -39,10 +41,22 @@
return cpu;
}
- public long getRam() {
+ public ByteAmount getRam() {
return ram;
}
+ public long getRamBytes() {
+ return ram.asBytes();
+ }
+
+ public long getRamMegabytes() {
+ return ram.asMegabytes();
+ }
+
+ public long getRamGigabytes() {
+ return ram.asGigabytes();
+ }
+
@Override
public String toString() {
return String.format("{ CPU: %s RAM: %s }", String.valueOf(cpu), String.valueOf(ram));
@@ -50,11 +64,11 @@
public static class Builder {
private float cpu;
- private long ram;
+ private ByteAmount ram;
public Builder() {
this.cpu = 1.0f;
- this.ram = 104857600;
+ this.ram = ByteAmount.fromBytes(104857600);
}
/**
@@ -62,7 +76,7 @@
* @param nram The number of megabytes of RAM
*/
public Builder setRamInMB(long nram) {
- this.ram = nram * 1024;
+ this.ram = ByteAmount.fromMegabytes(nram);
return this;
}
@@ -71,7 +85,7 @@
* @param nram The number of gigabytes of RAM
*/
public Builder setRamInGB(long nram) {
- this.ram = nram * 1024 * 1024;
+ this.ram = ByteAmount.fromGigabytes(nram);
return this;
}
@@ -89,7 +103,7 @@
* @param containerRam The number of bytes of RAM
*/
public Builder setRam(long containerRam) {
- this.ram = containerRam;
+ this.ram = ByteAmount.fromBytes(containerRam);
return this;
}
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java b/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java
index aec84d7..324d240 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/SerializablePredicate.java
@@ -19,7 +19,7 @@
/**
* All user supplied transformation functions have to be serializable.
- * Thus all Strealmet transformation definitions take Serializable
+ * Thus all Streamlet transformation definitions take Serializable
* Functions as their input. We simply decorate java.util. function
* definitions with a Serializable tag to ensure that any supplied
* lambda functions automatically become serializable.
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Sink.java b/heron/api/src/java/com/twitter/heron/streamlet/Sink.java
index beb3e96..1e2bef1 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Sink.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Sink.java
@@ -18,7 +18,7 @@
/**
* Sink is how Streamlet's end. The put method
- * invokation consumes the tuple into say external database/cache, etc.
+ * invocation consumes the tuple into, say, an external database/cache, etc.
* setup/cleanup is where the sink can do any one time setup work, like
* establishing/closing connection to sources, etc.
*/
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Source.java b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
index fe2330a..b87717b 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Source.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Source.java
@@ -19,7 +19,7 @@
/**
* Source is how Streamlet's originate. The get method
- * invokation returns new element that form the tuples of the streamlet.
+ * invocation returns new elements that form the tuples of the streamlet.
* setup/cleanup is where the generator can do any one time setup work, like
* establishing/closing connection to sources, etc.
*/
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java b/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java
index 882c813..4d66eb8 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/Streamlet.java
@@ -34,7 +34,7 @@
* Streamlet. One can think of a transformation attaching itself to the stream and processing
* each tuple as they go by. Thus the parallelism of any operator is implicitly determined
* by the number of partitions of the stream that it is operating on. If a particular
- * tranformation wants to operate at a different parallelism, one can repartition the
+ * transformation wants to operate at a different parallelism, one can repartition the
* Streamlet before doing the transformation.
*/
@InterfaceStability.Evolving
@@ -180,7 +180,7 @@
T identity, SerializableBiFunction<T, R, ? extends T> reduceFn);
/**
- * Returns a new Streamlet thats the union of this and the ‘other’ streamlet. Essentially
+ * Returns a new Streamlet that is the union of this and the ‘other’ streamlet. Essentially
* the new streamlet will contain tuples belonging to both Streamlets
*/
Streamlet<R> union(Streamlet<? extends R> other);
diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java
index 606ea0c..ca88eb2 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/StreamletImpl.java
@@ -66,7 +66,7 @@
* Streamlet. One can think of a transformation attaching itself to the stream and processing
* each tuple as they go by. Thus the parallelism of any operator is implicitly determined
* by the number of partitions of the stream that it is operating on. If a particular
- * tranformation wants to operate at a different parallelism, one can repartition the
+ * transformation wants to operate at a different parallelism, one can repartition the
* Streamlet before doing the transformation.
*/
public abstract class StreamletImpl<R> implements Streamlet<R> {
@@ -109,8 +109,8 @@
*/
@Override
public Streamlet<R> setName(String sName) {
- if (sName == null || sName.isEmpty()) {
- throw new IllegalArgumentException("Streamlet name cannot be null/empty");
+ if (sName == null || sName.trim().isEmpty()) {
+ throw new IllegalArgumentException("Streamlet name cannot be null/blank");
}
this.name = sName;
return this;
diff --git a/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java b/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java
index 01ef89a..12ceed7 100644
--- a/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java
+++ b/heron/api/tests/java/com/twitter/heron/api/windowing/WaterMarkEventGeneratorTest.java
@@ -23,6 +23,7 @@
import org.junit.Before;
import org.junit.Test;
+import com.twitter.heron.api.Config;
import com.twitter.heron.api.generated.TopologyAPI;
import static org.junit.Assert.*;
@@ -48,8 +49,8 @@
}
};
// set watermark interval to a high value and trigger manually to fix timing issues
- waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5, Collections
- .singleton(streamId("s1")));
+ waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L, 5, Collections
+ .singleton(streamId("s1")), new Config());
waterMarkEventGenerator.start();
}
@@ -77,7 +78,8 @@
Set<TopologyAPI.StreamId> streams = new HashSet<>();
streams.add(streamId("s1"));
streams.add(streamId("s2"));
- waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5, streams);
+ waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L,
+ 5, streams, new Config());
waterMarkEventGenerator.start();
waterMarkEventGenerator.track(streamId("s1"), 100);
diff --git a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java
index a5a2ccd..5c6be6e 100644
--- a/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java
+++ b/heron/api/tests/java/com/twitter/heron/api/windowing/WindowManagerTest.java
@@ -26,6 +26,7 @@
import org.junit.Before;
import org.junit.Test;
+import com.twitter.heron.api.Config;
import com.twitter.heron.api.windowing.evictors.CountEvictionPolicy;
import com.twitter.heron.api.windowing.evictors.TimeEvictionPolicy;
import com.twitter.heron.api.windowing.evictors.WatermarkCountEvictionPolicy;
@@ -225,7 +226,7 @@
* Set it to a large value and trigger manually.
*/
TriggerPolicy<Integer, ?> triggerPolicy = new TimeTriggerPolicy<Integer>(Duration.ofDays(1)
- .toMillis(), windowManager, evictionPolicy);
+ .toMillis(), windowManager, evictionPolicy, new Config());
triggerPolicy.start();
windowManager.setTriggerPolicy(triggerPolicy);
long now = System.currentTimeMillis();
diff --git a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
index 069b6e2..d609787 100644
--- a/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/com/twitter/heron/streamlet/impl/StreamletImplTest.java
@@ -18,11 +18,14 @@
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.junit.Before;
import org.junit.Test;
import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.common.basics.ByteAmount;
+import com.twitter.heron.streamlet.Config;
import com.twitter.heron.streamlet.Context;
import com.twitter.heron.streamlet.Resources;
import com.twitter.heron.streamlet.SerializableTransformer;
@@ -260,15 +263,69 @@
@Test
public void testResourcesBuilder() {
- Resources defaultResoures = Resources.defaultResources();
- assertEquals(0, Float.compare(defaultResoures.getCpu(), 1.0f));
- assertEquals(defaultResoures.getRam(), 104857600);
+ Resources defaultResources = Resources.defaultResources();
+ assertEquals(0, Float.compare(defaultResources.getCpu(), 1.0f));
+ assertEquals(defaultResources.getRam(), ByteAmount.fromMegabytes(100));
- Resources res2 = new Resources.Builder()
+ Resources nonDefaultResources = new Resources.Builder()
.setCpu(5.1f)
.setRamInGB(20)
.build();
- assertEquals(0, Float.compare(res2.getCpu(), 5.1f));
- assertEquals(res2.getRam(), 20 * 1024 * 1024);
+ assertEquals(0, Float.compare(nonDefaultResources.getCpu(), 5.1f));
+ assertEquals(nonDefaultResources.getRam(), ByteAmount.fromGigabytes(20));
}
+
+ @Test
+ public void testConfigBuilder() {
+ Config defaultConfig = Config.defaultConfig();
+ assertEquals(defaultConfig.getDeliverySemantics(), Config.DeliverySemantics.ATMOST_ONCE);
+ assertEquals(defaultConfig.getNumContainers(), 1);
+ assertEquals(0, Float.compare(defaultConfig.getResources().getCpu(), 1.0f));
+ assertEquals(defaultConfig.getResources().getRam(), ByteAmount.fromMegabytes(100));
+ assertEquals(defaultConfig.getSerializer(), Config.Serializer.KRYO);
+
+ Resources nonDefaultResources = new Resources.Builder()
+ .setCpu(3.1f)
+ .setRamInMB(2500)
+ .build();
+
+ Config nonDefaultConfig = new Config.Builder()
+ .setContainerResources(nonDefaultResources)
+ .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
+ .setNumContainers(8)
+ .setTopologySerializer(Config.Serializer.JAVA)
+ .setUserConfig("key", "value")
+ .build();
+ assertEquals(nonDefaultConfig.getNumContainers(), 8);
+ assertEquals(nonDefaultConfig.getDeliverySemantics(),
+ Config.DeliverySemantics.EFFECTIVELY_ONCE);
+ assertEquals(0, Float.compare(nonDefaultConfig.getResources().getCpu(), 3.1f));
+ assertEquals(nonDefaultConfig.getResources().getRam(), ByteAmount.fromMegabytes(2500));
+ assertEquals(nonDefaultConfig.getSerializer(), Config.Serializer.JAVA);
+
+ Config multiSetConfig = new Config.Builder()
+ .setTopologySerializer(Config.Serializer.JAVA)
+ .setTopologySerializer(Config.Serializer.KRYO)
+ .build();
+ assertEquals(multiSetConfig.getSerializer(), Config.Serializer.KRYO);
+ }
+
+ @Test
+ public void testSetNameWithInvalidValues() {
+ Streamlet<Double> sample = StreamletImpl.createSupplierStreamlet(() -> Math.random());
+ Function<String, Streamlet<Double>> function = sample::setName;
+ testByFunction(function, null);
+ testByFunction(function, "");
+ testByFunction(function, " ");
+ }
+
+ private void testByFunction(Function<String, Streamlet<Double>> function, String sName) {
+ try {
+ function.apply(sName);
+ fail("Should have thrown an IllegalArgumentException because streamlet name is invalid");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Streamlet name cannot be null/blank", e.getMessage());
+ }
+ }
+
}
diff --git a/heron/config/src/yaml/conf/aurora/scheduler.yaml b/heron/config/src/yaml/conf/aurora/scheduler.yaml
index a4aab63..24d3f31 100644
--- a/heron/config/src/yaml/conf/aurora/scheduler.yaml
+++ b/heron/config/src/yaml/conf/aurora/scheduler.yaml
@@ -10,6 +10,15 @@
# Invoke the IScheduler as a library directly
heron.scheduler.is.service: False
+####################################################################
+# The following settings are Aurora-specific
+####################################################################
+# The maximum number of attempts (including the first one) when trying to kill an Aurora job
+heron.scheduler.job.max.kill.attempts: 5
+
+# The interval in ms between two consecutive attempts to kill an Aurora job
+heron.scheduler.job.kill.retry.interval.ms: 2000
+
# Aurora Controller Class
# heron.class.scheduler.aurora.controller.cli: False
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index d901d20..23f5fc3 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -43,19 +43,37 @@
Log = log.Log
+# pylint: disable=too-many-lines
+
def print_usage():
print(
- "Usage: ./heron-executor <shardid> <topname> <topid> <topdefnfile>"
- " <state_manager_connection> <state_manager_root> <tmaster_binary> <stmgr_binary>"
- " <metricsmgr_classpath> <instance_jvm_opts_in_base64> <classpath>"
- " <master_port> <tmaster_controller_port> <tmaster_stats_port> <heron_internals_config_file>"
- " <override_config_file> <component_ram_map> <component_jvm_opts_in_base64> <pkg_type>"
- " <topology_bin_file> <heron_java_home> <shell-port> <heron_shell_binary> <metricsmgr_port>"
- " <cluster> <role> <environ> <instance_classpath> <metrics_sinks_config_file>"
- " <scheduler_classpath> <scheduler_port> <python_instance_binary>"
- " <metricscachemgr_classpath> <metricscachemgr_masterport> <metricscachemgr_statsport>"
- " <is_stateful> <ckptmgr_classpath> <ckptmgr_port> <stateful_config_file> "
- " <healthmgr_mode> <healthmgr_classpath> <cpp_instance_binary>")
+ "Usage: ./heron-executor --shard=<shardid> --topology-name=<topname>"
+ " --topology-id=<topid> --topology-defn-file=<topdefnfile>"
+ " --state-manager-connection=<state_manager_connection>"
+ " --state-manager-root=<state_manager_root> --tmaster-binary=<tmaster_binary>"
+ " --stmgr-binary=<stmgr_binary> --metrics-manager-classpath=<metricsmgr_classpath>"
+ " --instance-jvm-opts=<instance_jvm_opts_in_base64> --classpath=<classpath>"
+ " --master-port=<master_port> --tmaster-controller-port=<tmaster_controller_port>"
+ " --tmaster-stats-port=<tmaster_stats_port>"
+ " --heron-internals-config-file=<heron_internals_config_file>"
+ " --override-config-file=<override_config_file> --component-ram-map=<component_ram_map>"
+ " --component-jvm-opts=<component_jvm_opts_in_base64> --pkg-type=<pkg_type>"
+ " --topology-binary-file=<topology_bin_file> --heron-java-home=<heron_java_home>"
+ " --shell-port=<shell-port> --heron-shell-binary=<heron_shell_binary>"
+ " --metrics-manager-port=<metricsmgr_port>"
+ " --cluster=<cluster> --role=<role> --environment=<environ>"
+ " --instance-classpath=<instance_classpath>"
+ " --metrics-sinks-config-file=<metrics_sinks_config_file>"
+ " --scheduler-classpath=<scheduler_classpath> --scheduler-port=<scheduler_port>"
+ " --python-instance-binary=<python_instance_binary>"
+ " --metricscache-manager-classpath=<metricscachemgr_classpath>"
+ " --metricscache-manager-master-port=<metricscachemgr_masterport>"
+ " --metricscache-manager-stats-port=<metricscachemgr_statsport>"
+ " --is-stateful=<is_stateful> --checkpoint-manager-classpath=<ckptmgr_classpath>"
+ " --checkpoint-manager-port=<ckptmgr_port> --stateful-config-file=<stateful_config_file>"
+ " --health-manager-mode=<healthmgr_mode> --health-manager-classpath=<healthmgr_classpath>"
+ " --cpp-instance-binary=<cpp_instance_binary>"
+ " --jvm-remote-debugger-ports=<comma_seperated_port_list>")
def id_map(prefix, container_plans, add_zero_id=False):
ids = {}
@@ -219,7 +237,9 @@
self.health_manager_mode = parsed_args.health_manager_mode
self.health_manager_classpath = '%s:%s'\
% (self.scheduler_classpath, parsed_args.health_manager_classpath)
-
+ self.jvm_remote_debugger_ports = \
+ parsed_args.jvm_remote_debugger_ports.split(",") \
+ if parsed_args.jvm_remote_debugger_ports else None
def __init__(self, args, shell_env):
self.init_parsed_args(args)
@@ -293,6 +313,8 @@
parser.add_argument("--stateful-config-file", required=True)
parser.add_argument("--health-manager-mode", required=True)
parser.add_argument("--health-manager-classpath", required=True)
+ parser.add_argument("--jvm-remote-debugger-ports", required=False,
+ help="ports to be used by a remote debugger for JVM instances")
parsed_args, unknown_args = parser.parse_known_args(args[1:])
@@ -508,6 +530,10 @@
java_version.startswith("1.5"):
java_metasize_param = 'PermSize'
+ if self.jvm_remote_debugger_ports and \
+ (len(instance_info) > len(self.jvm_remote_debugger_ports)):
+ Log.warn("Not enough remote debugger ports for all instances!")
+
for (instance_id, component_name, global_task_id, component_index) in instance_info:
total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
@@ -542,6 +568,12 @@
'-XX:ParallelGCThreads=4',
'-Xloggc:log-files/gc.%s.log' % instance_id]
+ remote_debugger_port = None
+ if self.jvm_remote_debugger_ports:
+ remote_debugger_port = self.jvm_remote_debugger_ports.pop()
+ instance_cmd.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
+ % remote_debugger_port)
+
instance_args = ['-topology_name', self.topology_name,
'-topology_id', self.topology_id,
'-instance_id', instance_id,
@@ -553,10 +585,13 @@
'-metricsmgr_port', self.metrics_manager_port,
'-system_config_file', self.heron_internals_config_file,
'-override_config_file', self.override_config_file]
+ if remote_debugger_port:
+ instance_args += ['-remote_debugger_port', remote_debugger_port]
instance_cmd = instance_cmd + self.instance_jvm_opts.split()
if component_name in self.component_jvm_opts:
instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split()
+
instance_cmd.extend(['-Djava.net.preferIPv4Stack=true',
'-cp',
'%s:%s' % (self.instance_classpath, self.classpath),
diff --git a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
index b8b5be6..7dc3aa0 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
@@ -201,15 +201,13 @@
stmgrPortOption.setRequired(true);
options.addOption(stmgrPortOption);
- Option metricsmgrPortOption
- = new Option(
+ Option metricsmgrPortOption = new Option(
CommandLineOptions.METRICS_MGR_PORT_OPTION, true, "Metrics Manager Port");
metricsmgrPortOption.setType(Integer.class);
metricsmgrPortOption.setRequired(true);
options.addOption(metricsmgrPortOption);
- Option systemConfigFileOption
- = new Option(
+ Option systemConfigFileOption = new Option(
CommandLineOptions.SYSTEM_CONFIG_FILE, true, "Heron Internals Config Filename");
systemConfigFileOption.setType(String.class);
systemConfigFileOption.setRequired(true);
@@ -222,6 +220,11 @@
overrideConfigFileOption.setRequired(true);
options.addOption(overrideConfigFileOption);
+ Option remoteDebuggerPortOption = new Option(
+ CommandLineOptions.REMOTE_DEBUGGER_PORT, true, "Remote Debugger Port");
+ remoteDebuggerPortOption.setType(Integer.class);
+ options.addOption(remoteDebuggerPortOption);
+
CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd = null;
@@ -257,6 +260,12 @@
String overrideConfigFile
= commandLine.getOptionValue(CommandLineOptions.OVERRIDE_CONFIG_FILE);
+ Integer remoteDebuggerPort = null;
+ if (commandLine.hasOption(CommandLineOptions.REMOTE_DEBUGGER_PORT)) {
+ remoteDebuggerPort = Integer.parseInt(
+ commandLine.getOptionValue(CommandLineOptions.REMOTE_DEBUGGER_PORT));
+ }
+
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFile, true)
.putAll(overrideConfigFile, true)
@@ -266,8 +275,13 @@
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
// Create the protobuf Instance
- PhysicalPlans.InstanceInfo instanceInfo = PhysicalPlans.InstanceInfo.newBuilder().
- setTaskId(taskId).setComponentIndex(componentIndex).setComponentName(componentName).build();
+ PhysicalPlans.InstanceInfo.Builder instanceInfoBuilder
+ = PhysicalPlans.InstanceInfo.newBuilder().setTaskId(taskId)
+ .setComponentIndex(componentIndex).setComponentName(componentName);
+ if (remoteDebuggerPort != null) {
+ instanceInfoBuilder.setRemoteDebuggerPort(remoteDebuggerPort);
+ }
+ PhysicalPlans.InstanceInfo instanceInfo = instanceInfoBuilder.build();
PhysicalPlans.Instance instance = PhysicalPlans.Instance.newBuilder().
setInstanceId(instanceId).setStmgrId(streamId).setInfo(instanceInfo).build();
@@ -285,11 +299,17 @@
systemConfig.getHeronLoggingMaximumFiles()));
LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler());
- LOG.info("\nStarting instance " + instanceId + " for topology " + topologyName
+ String logMsg = "\nStarting instance " + instanceId + " for topology " + topologyName
+ " and topologyId " + topologyId + " for component " + componentName
+ " with taskId " + taskId + " and componentIndex " + componentIndex
+ " and stmgrId " + streamId + " and stmgrPort " + streamPort
- + " and metricsManagerPort " + metricsPort);
+ + " and metricsManagerPort " + metricsPort;
+
+ if (remoteDebuggerPort != null) {
+ logMsg += " and remoteDebuggerPort " + remoteDebuggerPort;
+ }
+
+ LOG.info(logMsg);
LOG.info("System Config: " + systemConfig);
diff --git a/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java b/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
index 779771f..395c8c8 100644
--- a/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
+++ b/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -348,6 +349,10 @@
LOG.info("Loops terminated. MetricsCache Manager exits.");
}
+ /**
+ * Starts the state manager client, the metrics cache server and the HTTP server.
+ * @throws Exception
+ */
public void start() throws Exception {
// 1. Do prepare work
// create an instance of state manager
@@ -367,18 +372,23 @@
// initialize the statemgr
statemgr.initialize(config);
- statemgr.setMetricsCacheLocation(metricsCacheLocation, topologyName);
- LOG.info("metricsCacheLocation " + metricsCacheLocation.toString());
- LOG.info("topologyName " + topologyName.toString());
+ Boolean b = statemgr.setMetricsCacheLocation(metricsCacheLocation, topologyName)
+ .get(5000, TimeUnit.MILLISECONDS);
+ if (b != null && b) {
+ LOG.info("metricsCacheLocation " + metricsCacheLocation.toString());
+ LOG.info("topologyName " + topologyName.toString());
- LOG.info("Starting Metrics Cache HTTP Server");
- metricsCacheManagerHttpServer.start();
+ LOG.info("Starting Metrics Cache HTTP Server");
+ metricsCacheManagerHttpServer.start();
- // 2. The MetricsCacheServer would run in the main thread
- // We do it in the final step since it would await the main thread
- LOG.info("Starting Metrics Cache Server");
- metricsCacheManagerServer.start();
- metricsCacheManagerServerLoop.loop();
+ // 2. The MetricsCacheServer would run in the main thread
+ // We do it in the final step since it would await the main thread
+ LOG.info("Starting Metrics Cache Server");
+ metricsCacheManagerServer.start();
+ metricsCacheManagerServerLoop.loop();
+ } else {
+ throw new RuntimeException("Failed to set metricscahe location.");
+ }
} finally {
// 3. Do post work basing on the result
// Currently nothing to do here
diff --git a/heron/proto/physical_plan.proto b/heron/proto/physical_plan.proto
index cc72fe4..e11af00 100644
--- a/heron/proto/physical_plan.proto
+++ b/heron/proto/physical_plan.proto
@@ -33,6 +33,8 @@
required int32 component_index = 2; // specific to this component
required string component_name = 3;
repeated string params = 4;
+ // the port a remote debugger can be attached to for this instance. Currently JVM instances only
+ optional int32 remote_debugger_port = 5;
}
message Instance {
diff --git a/heron/scheduler-core/src/java/BUILD b/heron/scheduler-core/src/java/BUILD
index 31598b9..c8b7545 100644
--- a/heron/scheduler-core/src/java/BUILD
+++ b/heron/scheduler-core/src/java/BUILD
@@ -9,6 +9,7 @@
"//heron/api/src/java:classification",
"@commons_cli_commons_cli//jar",
"@com_google_guava_guava//jar",
+ "@org_apache_commons_commons_lang3//jar",
]
spi_deps_files = [
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
index 0d5f416..0f7b890 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/ExecutorFlag.java
@@ -56,7 +56,8 @@
CheckpointManagerPort("checkpoint-manager-port"),
StatefulConfigFile("stateful-config-file"),
HealthManagerMode("health-manager-mode"),
- HealthManagerClasspath("health-manager-classpath");
+ HealthManagerClasspath("health-manager-classpath"),
+ JvmRemoteDebuggerPorts("jvm-remote-debugger-ports");
private final String name;
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java
index fe4f366..20de1ee 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/RuntimeManagerRunner.java
@@ -150,7 +150,8 @@
if (!schedulerClient.killTopology(killTopologyRequest)) {
throw new TopologyRuntimeManagementException(
- String.format("Failed to kill topology '%s' with scheduler", topologyName));
+ String.format("Failed to kill topology '%s' with scheduler, "
+ + "please re-try the kill command later", topologyName));
}
// clean up the state of the topology in state manager
@@ -176,7 +177,7 @@
if (!changeDetected(currentPlan, changeRequests)) {
throw new TopologyRuntimeManagementException(
String.format("The component parallelism request (%s) is the same as the "
- + "current topology parallelism. Not taking action.", newParallelism));
+ + "current topology parallelism. Not taking action.", newParallelism));
}
PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
@@ -199,7 +200,7 @@
if (!schedulerClient.updateTopology(updateTopologyRequest)) {
throw new TopologyRuntimeManagementException(String.format(
"Failed to update topology with Scheduler, updateTopologyRequest="
- + updateTopologyRequest));
+ + updateTopologyRequest));
}
// Clean the connection when we are done.
@@ -242,25 +243,25 @@
result = statemgr.deletePhysicalPlan(topologyName);
if (result == null || !result) {
throw new TopologyRuntimeManagementException(
- "Failed to clear physical plan. Check whether TMaster set it correctly.");
+ "Failed to clear physical plan. Check whether TMaster set it correctly.");
}
result = statemgr.deleteSchedulerLocation(topologyName);
if (result == null || !result) {
throw new TopologyRuntimeManagementException(
- "Failed to clear scheduler location. Check whether Scheduler set it correctly.");
+ "Failed to clear scheduler location. Check whether Scheduler set it correctly.");
}
result = statemgr.deleteLocks(topologyName);
if (result == null || !result) {
throw new TopologyRuntimeManagementException(
- "Failed to delete locks. It's possible that the topology never created any.");
+ "Failed to delete locks. It's possible that the topology never created any.");
}
result = statemgr.deleteExecutionState(topologyName);
if (result == null || !result) {
throw new TopologyRuntimeManagementException(
- "Failed to clear execution state");
+ "Failed to clear execution state");
}
// Set topology def at last since we determine whether a topology is running
@@ -268,7 +269,7 @@
result = statemgr.deleteTopology(topologyName);
if (result == null || !result) {
throw new TopologyRuntimeManagementException(
- "Failed to clear topology definition");
+ "Failed to clear topology definition");
}
LOG.fine("Cleaned up topology state");
@@ -346,7 +347,7 @@
}
private static boolean changeDetected(PackingPlans.PackingPlan currentProtoPlan,
- Map<String, Integer> changeRequests) {
+ Map<String, Integer> changeRequests) {
PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan);
for (String component : changeRequests.keySet()) {
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
index ea89870..358604a 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
@@ -59,7 +59,8 @@
SCHEDULER_PORT("scheduler", true),
METRICS_CACHE_MASTER_PORT("metrics-cache-m", true),
METRICS_CACHE_STATS_PORT("metrics-cache-s", true),
- CHECKPOINT_MANAGER_PORT("ckptmgr", true);
+ CHECKPOINT_MANAGER_PORT("ckptmgr", true),
+ JVM_REMOTE_DEBUGGER_PORTS("jvm-remote-debugger", false);
private final String name;
private final boolean required;
@@ -230,6 +231,9 @@
ExecutorPort.METRICS_CACHE_STATS_PORT, ports);
String ckptmgrPort = ExecutorPort.getPort(
ExecutorPort.CHECKPOINT_MANAGER_PORT, ports);
+ String remoteDebuggerPorts = ExecutorPort.getPort(
+ ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS, ports
+ );
List<String> commands = new ArrayList<>();
commands.add(createCommandArg(ExecutorFlag.TopologyName, topology.getName()));
@@ -310,6 +314,9 @@
commands.add(createCommandArg(ExecutorFlag.HealthManagerMode, healthMgrMode));
commands.add(createCommandArg(ExecutorFlag.HealthManagerClasspath,
Context.healthMgrClassPath(config)));
+ if (remoteDebuggerPorts != null) {
+ commands.add(createCommandArg(ExecutorFlag.JvmRemoteDebuggerPorts, remoteDebuggerPorts));
+ }
return commands.toArray(new String[commands.size()]);
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java
index 02cd2f3..4c1bcf2 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraContext.java
@@ -22,6 +22,9 @@
public final class AuroraContext extends Context {
public static final String JOB_LINK_TEMPLATE = "heron.scheduler.job.link.template";
public static final String JOB_TEMPLATE = "heron.scheduler.job.template";
+ public static final String JOB_MAX_KILL_ATTEMPTS = "heron.scheduler.job.max.kill.attempts";
+ public static final String JOB_KILL_RETRY_INTERVAL_MS =
+ "heron.scheduler.job.kill.retry.interval.ms";
private AuroraContext() {
}
@@ -34,4 +37,12 @@
return config.getStringValue(JOB_TEMPLATE,
new File(Context.heronConf(config), "heron.aurora").getPath());
}
+
+ public static int getJobMaxKillAttempts(Config config) {
+ return config.getIntegerValue(JOB_MAX_KILL_ATTEMPTS, 5);
+ }
+
+ public static long getJobKillRetryIntervalMs(Config config) {
+ return config.getLongValue(JOB_KILL_RETRY_INTERVAL_MS, 2000);
+ }
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
index 9e4ccaf..daf28c1 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraHeronShellController.java
@@ -69,12 +69,22 @@
return cliController.killJob();
}
+ private StMgr searchContainer(Integer id) {
+ String prefix = "stmgr-" + id;
+ for (StMgr sm : stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrsList()) {
+ if (sm.getId().equals(prefix)) {
+ return sm;
+ }
+ }
+ return null;
+ }
+
// Restart an aurora container
@Override
public boolean restart(Integer containerId) {
// there is no backpressure for container 0, delegate to aurora client
if (containerId == null || containerId == 0) {
- cliController.restart(containerId);
+ return cliController.restart(containerId);
}
if (stateMgrAdaptor == null) {
@@ -82,18 +92,24 @@
return false;
}
- int index = containerId - 1; // stmgr container starts from 1
- StMgr contaienrInfo = stateMgrAdaptor.getPhysicalPlan(topologyName).getStmgrs(index);
- String host = contaienrInfo.getHostName();
- int port = contaienrInfo.getShellPort();
- String url = "http://" + host + ":" + port + "/killexecutor";
+ StMgr sm = searchContainer(containerId);
+ if (sm == null) {
+ LOG.warning("container not found in pplan " + containerId);
+ return false;
+ }
+
+ String url = "http://" + sm.getHostName() + ":" + sm.getShellPort() + "/killexecutor";
String payload = "secret=" + stateMgrAdaptor.getExecutionState(topologyName).getTopologyId();
LOG.info("sending `kill container` to " + url + "; payload: " + payload);
HttpURLConnection con = NetworkUtils.getHttpConnection(url);
try {
- NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes());
- return NetworkUtils.checkHttpResponseCode(con, 200);
+ if (NetworkUtils.sendHttpPostRequest(con, "X", payload.getBytes())) {
+ return NetworkUtils.checkHttpResponseCode(con, 200);
+ } else { // if heron-shell command fails, delegate to aurora client
+ LOG.info("heron-shell killexecutor failed; try aurora client ..");
+ return cliController.restart(containerId);
+ }
} finally {
con.disconnect();
}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
index 914a797..652f829 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
@@ -30,6 +30,7 @@
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.utils.TopologyUtils;
+import com.twitter.heron.api.utils.Utils;
import com.twitter.heron.common.basics.FileUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.UpdateTopologyManager;
@@ -141,7 +142,27 @@
@Override
public boolean onKill(Scheduler.KillTopologyRequest request) {
- return controller.killJob();
+ // The Aurora service can be unavailable or unstable for a while,
+ // so we try to kill the job with multiple attempts
+ int attempts = AuroraContext.getJobMaxKillAttempts(config);
+ long retryIntervalMs = AuroraContext.getJobKillRetryIntervalMs(config);
+ LOG.info("Will try " + attempts + " attempts at interval: " + retryIntervalMs + " ms");
+
+ // First attempt
+ boolean res = controller.killJob();
+ attempts--;
+
+ // Failure retry
+ while (!res && attempts > 0) {
+ LOG.warning("Failed to kill the topology. Will retry in " + retryIntervalMs + " ms...");
+ Utils.sleep(retryIntervalMs);
+
+ // Retry the killJob()
+ res = controller.killJob();
+ attempts--;
+ }
+
+ return res;
}
@Override
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
index c8cb637..7799a77 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -73,6 +73,9 @@
public static final String METRICS_CACHE_MASTER_PORT = "6007";
public static final String METRICS_CACHE_STATS_PORT = "6008";
public static final String CHECKPOINT_MGR_PORT = "6009";
+ // port number to start with when more than one port is needed for remote debugging
+ public static final String JVM_REMOTE_DEBUGGER_PORT = "6010";
+ public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "remote-debugger";
public static final Map<ExecutorPort, String> EXECUTOR_PORTS = new HashMap<>();
static {
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
index dc436b6..21da118 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -15,6 +15,7 @@
package com.twitter.heron.scheduler.kubernetes;
import java.io.IOException;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -23,6 +24,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,6 +34,7 @@
import com.google.common.base.Optional;
import com.google.common.primitives.Ints;
+import com.twitter.heron.api.utils.TopologyUtils;
import com.twitter.heron.common.basics.FileUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.TopologyRuntimeManagementException;
@@ -165,11 +168,17 @@
String[] deploymentConfs =
new String[Ints.checkedCast(Runtime.numContainers(runtimeConfiguration))];
+
for (int i = 0; i < Runtime.numContainers(runtimeConfiguration); i++) {
-
- deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource);
+ Optional<PackingPlan.ContainerPlan> container = packing.getContainer(i);
+ Set<PackingPlan.InstancePlan> instancePlans;
+ if (container.isPresent()) {
+ instancePlans = container.get().getInstances();
+ } else {
+ instancePlans = new HashSet<>();
+ }
+ deploymentConfs[i] = buildKubernetesPodSpec(mapper, i, containerResource, instancePlans);
}
-
return deploymentConfs;
}
@@ -183,7 +192,8 @@
*/
protected String buildKubernetesPodSpec(ObjectMapper mapper,
Integer containerIndex,
- Resource containerResource) {
+ Resource containerResource,
+ Set<PackingPlan.InstancePlan> instancePlans) {
ObjectNode instance = mapper.createObjectNode();
instance.put(KubernetesConstants.API_VERSION, KubernetesConstants.API_VERSION_1);
@@ -192,7 +202,8 @@
instance.set(KubernetesConstants.API_SPEC, getContainerSpec(mapper,
containerIndex,
- containerResource));
+ containerResource,
+ instancePlans));
return instance.toString();
}
@@ -304,7 +315,7 @@
setImagePullPolicyIfPresent(containerInfo);
// Port info -- all the same
- containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+ containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, containerPlan.getInstances()));
// In order for the container to run with the correct index, we're copying the base
// configuration for container with index 0, and replacing the container index with
@@ -350,7 +361,8 @@
*/
protected ObjectNode getContainerSpec(ObjectMapper mapper,
int containerIndex,
- Resource containerResource) {
+ Resource containerResource,
+ Set<PackingPlan.InstancePlan> instancePlans) {
ObjectNode containerSpec = mapper.createObjectNode();
ArrayNode containerList = mapper.createArrayNode();
@@ -382,10 +394,10 @@
setImagePullPolicyIfPresent(containerInfo);
// Port information for this container
- containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper));
+ containerInfo.set(KubernetesConstants.PORTS, getPorts(mapper, instancePlans));
// Heron command for the container
- String[] command = getExecutorCommand(containerIndex);
+ String[] command = getExecutorCommand(containerIndex, instancePlans.size());
ArrayNode commandsArray = mapper.createArrayNode();
for (int i = 0; i < command.length; i++) {
commandsArray.add(command[i]);
@@ -410,12 +422,22 @@
return containerSpec;
}
+ /**
+ * Build the remote debugger port numbers for one container: one port per instance,
+ * counting upwards from KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT.
+ *
+ * @param numberOfInstances how many consecutive ports to produce
+ * @return the port numbers in ascending order; empty when numberOfInstances is not positive
+ */
+ private List<Integer> getRemoteDebuggerPorts(int numberOfInstances) {
+ // the base port is kept as a string constant, so convert it once up front
+ final int firstPort = Integer.parseInt(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT, 10);
+ final List<Integer> debuggerPorts = new LinkedList<>();
+ int offset = 0;
+ while (offset < numberOfInstances) {
+ debuggerPorts.add(firstPort + offset);
+ offset++;
+ }
+ return debuggerPorts;
+ }
+
/**
* Get the ports the container will need to expose so other containers can access its services
*
* @param mapper
*/
- protected ArrayNode getPorts(ObjectMapper mapper) {
+ protected ArrayNode getPorts(ObjectMapper mapper,
+ Set<PackingPlan.InstancePlan> instancePlans) {
ArrayNode ports = mapper.createArrayNode();
for (Map.Entry<ExecutorPort, String> entry
@@ -428,6 +450,18 @@
ports.add(port);
}
+ // When remote debugging is enabled, expose one extra container port per instance so a
+ // debugger can attach to each instance JVM. Ports are named
+ // JVM_REMOTE_DEBUGGER_PORT_NAME-<index>, index counting from 0.
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+ List<Integer> portsForRemoteDebugging = getRemoteDebuggerPorts(instancePlans.size());
+
+ for (int i = 0; i < portsForRemoteDebugging.size(); i++) {
+ ObjectNode port = mapper.createObjectNode();
+ port.put(KubernetesConstants.DOCKER_CONTAINER_PORT, portsForRemoteDebugging.get(i));
+ port.put(KubernetesConstants.PORT_NAME, KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME
+ + "-" + String.valueOf(i));
+ // register the entry in the returned array; without this the node built above is
+ // simply discarded and the debugger ports never appear in the pod spec
+ ports.add(port);
+ }
+ }
+
return ports;
}
@@ -452,10 +486,17 @@
*
* @param containerIndex
*/
- protected String[] getExecutorCommand(int containerIndex) {
+ protected String[] getExecutorCommand(int containerIndex, int numInstances) {
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))) {
+ KubernetesConstants.EXECUTOR_PORTS.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+ String.join(",", getRemoteDebuggerPorts(numInstances)
+ .stream().map(Object::toString)
+ .collect(Collectors.toList())));
+ }
String[] executorCommand =
SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
containerIndex, KubernetesConstants.EXECUTOR_PORTS);
+
String[] command = {
"sh",
"-c",
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index fb98bf2..7b598ce 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -32,9 +32,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.twitter.heron.api.utils.TopologyUtils;
import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.scheduler.UpdateTopologyManager;
+import com.twitter.heron.scheduler.utils.Runtime;
import com.twitter.heron.scheduler.utils.SchedulerUtils;
import com.twitter.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import com.twitter.heron.spi.common.Config;
@@ -80,9 +82,9 @@
* Start executor process via running an async shell process
*/
@VisibleForTesting
- protected Process startExecutorProcess(int container) {
+ protected Process startExecutorProcess(int container, Set<PackingPlan.InstancePlan> instances) {
return ShellUtils.runASyncProcess(
- getExecutorCommand(container),
+ getExecutorCommand(container, instances),
new File(LocalContext.workingDirectory(config)),
Integer.toString(container));
}
@@ -91,25 +93,27 @@
* Start the executor for the given container
*/
@VisibleForTesting
- protected void startExecutor(final int container) {
+ protected void startExecutor(final int container, Set<PackingPlan.InstancePlan> instances) {
LOG.info("Starting a new executor for container: " + container);
// create a process with the executor command and topology working directory
- final Process containerExecutor = startExecutorProcess(container);
+ final Process containerExecutor = startExecutorProcess(container, instances);
// associate the process and its container id
processToContainer.put(containerExecutor, container);
LOG.info("Started the executor for container: " + container);
// add the container for monitoring
- startExecutorMonitor(container, containerExecutor);
+ startExecutorMonitor(container, containerExecutor, instances);
}
/**
* Start the monitor of a given executor
*/
@VisibleForTesting
- protected void startExecutorMonitor(final int container, final Process containerExecutor) {
+ protected void startExecutorMonitor(final int container,
+ final Process containerExecutor,
+ Set<PackingPlan.InstancePlan> instances) {
// add the container for monitoring
Runnable r = new Runnable() {
@Override
@@ -130,7 +134,7 @@
}
LOG.log(Level.INFO, "Trying to restart container {0}", container);
// restart the container
- startExecutor(processToContainer.remove(containerExecutor));
+ startExecutor(processToContainer.remove(containerExecutor), instances);
} catch (InterruptedException e) {
if (!isTopologyKilled) {
LOG.log(Level.SEVERE, "Process is interrupted: ", e);
@@ -142,7 +146,8 @@
monitorService.submit(r);
}
- private String[] getExecutorCommand(int container) {
+
+ private String[] getExecutorCommand(int container, Set<PackingPlan.InstancePlan> instances) {
Map<ExecutorPort, String> ports = new HashMap<>();
for (ExecutorPort executorPort : ExecutorPort.getRequiredPorts()) {
int port = SysUtils.getFreePort();
@@ -152,8 +157,22 @@
ports.put(executorPort, String.valueOf(port));
}
- String[] executorCmd = SchedulerUtils.getExecutorCommand(config, runtime, container, ports);
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtime))
+ && instances != null) {
+ List<String> remoteDebuggingPorts = new LinkedList<>();
+ int portsForRemoteDebugging = instances.size();
+ for (int i = 0; i < portsForRemoteDebugging; i++) {
+ int port = SysUtils.getFreePort();
+ if (port == -1) {
+ throw new RuntimeException("Failed to find available ports for executor");
+ }
+ remoteDebuggingPorts.add(String.valueOf(port));
+ }
+ ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+ String.join(",", remoteDebuggingPorts));
+ }
+ String[] executorCmd = SchedulerUtils.getExecutorCommand(config, runtime, container, ports);
LOG.info("Executor command line: " + Arrays.toString(executorCmd));
return executorCmd;
}
@@ -167,11 +186,11 @@
synchronized (processToContainer) {
LOG.info("Starting executor for TMaster");
- startExecutor(0);
+ startExecutor(0, null);
// for each container, run its own executor
for (PackingPlan.ContainerPlan container : packing.getContainers()) {
- startExecutor(container.getId());
+ startExecutor(container.getId(), container.getInstances());
}
}
@@ -276,7 +295,7 @@
throw new RuntimeException(String.format("Found active container for %s, "
+ "cannot launch a duplicate container.", container.getId()));
}
- startExecutor(container.getId());
+ startExecutor(container.getId(), container.getInstances());
}
}
}
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
index 1e06d6d..5940d8b 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/kubernetes/KubernetesSchedulerTest.java
@@ -69,7 +69,7 @@
public static void beforeClass() throws Exception {
scheduler = Mockito.spy(KubernetesScheduler.class);
Mockito.doReturn(EXECUTOR_CMD).when(scheduler)
- .getExecutorCommand(Mockito.anyInt());
+ .getExecutorCommand(Mockito.anyInt(), Mockito.anyInt());
}
@AfterClass
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
index 484ad8f..badaaa9 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
@@ -25,13 +25,14 @@
import org.junit.Test;
import org.mockito.Mockito;
+import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Key;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.utils.PackingTestUtils;
-
+@SuppressWarnings("unchecked")
public class LocalSchedulerTest {
private static final String TOPOLOGY_NAME = "testTopology";
private static final int MAX_WAITING_SECOND = 10;
@@ -46,6 +47,19 @@
Mockito.when(config.getStringValue(Key.TOPOLOGY_NAME)).thenReturn(TOPOLOGY_NAME);
runtime = Mockito.mock(Config.class);
+
+ scheduler.initialize(config, runtime);
+ Mockito.when(runtime.get(Key.TOPOLOGY_DEFINITION)).thenReturn(TopologyAPI.Topology
+ .newBuilder()
+ .setId("a")
+ .setName("a")
+ .setState(TopologyAPI.TopologyState.RUNNING)
+ .setTopologyConfig(
+ TopologyAPI.Config.newBuilder()
+ .addKvs(TopologyAPI.Config.KeyValue.newBuilder()
+ .setKey(com.twitter.heron.api.Config.TOPOLOGY_REMOTE_DEBUGGING_ENABLE)
+ .setValue("false"))).build());
+
scheduler.initialize(config, runtime);
}
@@ -71,11 +85,14 @@
@Test
public void testOnSchedule() throws Exception {
Mockito.doNothing().
- when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+ Mockito.anySet());
Process[] mockProcesses = new Process[4];
for (int i = 0; i < 4; i++) {
mockProcesses[i] = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i);
+ Set<PackingPlan.InstancePlan> instances
+ = (i == 0) ? null : PackingTestUtils.testContainerPlan(i).getInstances();
+ Mockito.doReturn(mockProcesses[i]).when(scheduler).startExecutorProcess(i, instances);
}
PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
@@ -92,9 +109,12 @@
// id 2 was not in the container plan
continue;
}
- Mockito.verify(scheduler).startExecutor(i);
- Mockito.verify(scheduler).startExecutorProcess(i);
- Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i]);
+
+ Set<PackingPlan.InstancePlan> instances
+ = (i == 0) ? null : PackingTestUtils.testContainerPlan(i).getInstances();
+ Mockito.verify(scheduler).startExecutor(i, instances);
+ Mockito.verify(scheduler).startExecutorProcess(i, instances);
+ Mockito.verify(scheduler).startExecutorMonitor(i, mockProcesses[i], instances);
}
}
@@ -105,12 +125,15 @@
//verify plan is deployed and containers are created
Mockito.doNothing().
- when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class),
+ Mockito.anySet());
Process mockProcessTM = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(0);
+ Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(
+ 0, null);
Process mockProcessWorker1 = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(1);
+ Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(
+ 1, PackingTestUtils.testContainerPlan(1).getInstances());
PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
Set<PackingPlan.ContainerPlan> containers = new HashSet<>();
@@ -118,24 +141,30 @@
Mockito.when(packingPlan.getContainers()).thenReturn(containers);
Assert.assertTrue(scheduler.onSchedule(packingPlan));
- Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(2)).startExecutor(Mockito.anyInt(),
+ Mockito.anySet());
//now verify add container adds new container
Process mockProcessWorker2 = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(3);
+ Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(
+ 3, PackingTestUtils.testContainerPlan(3).getInstances());
containers.clear();
containers.add(PackingTestUtils.testContainerPlan(3));
scheduler.addContainers(containers);
- Mockito.verify(scheduler).startExecutor(3);
+ Mockito.verify(scheduler).startExecutor(3,
+ PackingTestUtils.testContainerPlan(3).getInstances());
Process mockProcess = Mockito.mock(Process.class);
- Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(Mockito.anyInt());
+ Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(
+ Mockito.anyInt(), Mockito.anySet());
containers.clear();
containers.add(PackingTestUtils.testContainerPlan(4));
containers.add(PackingTestUtils.testContainerPlan(5));
scheduler.addContainers(containers);
- Mockito.verify(scheduler).startExecutor(4);
- Mockito.verify(scheduler).startExecutor(5);
+ Mockito.verify(scheduler).startExecutor(4,
+ PackingTestUtils.testContainerPlan(4).getInstances());
+ Mockito.verify(scheduler).startExecutor(5,
+ PackingTestUtils.testContainerPlan(5).getInstances());
}
/**
@@ -147,13 +176,17 @@
//verify plan is deployed and containers are created
Mockito.doNothing().
- when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(),
+ Mockito.any(Process.class), Mockito.anySet());
Process[] processes = new Process[LOCAL_NUM_CONTAINER];
Set<PackingPlan.ContainerPlan> existingContainers = new HashSet<>();
for (int i = 0; i < LOCAL_NUM_CONTAINER; i++) {
processes[i] = Mockito.mock(Process.class);
- Mockito.doReturn(processes[i]).when(scheduler).startExecutorProcess(i);
+ Set<PackingPlan.InstancePlan> instances
+ = (i == 0) ? null : PackingTestUtils.testContainerPlan(i).getInstances();
+ Mockito.doReturn(processes[i]).when(scheduler)
+ .startExecutorProcess(i, instances);
if (i > 0) {
// ignore the container for TMaster. existing containers simulate the containers created
// by packing plan
@@ -165,7 +198,8 @@
Mockito.when(packingPlan.getContainers()).thenReturn(existingContainers);
Assert.assertTrue(scheduler.onSchedule(packingPlan));
verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4, 5);
- Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
Set<PackingPlan.ContainerPlan> containersToRemove = new HashSet<>();
PackingPlan.ContainerPlan containerToRemove =
@@ -175,7 +209,8 @@
verifyIdsOfLaunchedContainers(0, 1, 2, 3, 4);
Mockito.verify(processes[LOCAL_NUM_CONTAINER - 1]).destroy();
// verify no new process restarts
- Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
containersToRemove.clear();
containersToRemove.add(PackingTestUtils.testContainerPlan(1));
@@ -185,7 +220,8 @@
Mockito.verify(processes[1]).destroy();
Mockito.verify(processes[2]).destroy();
// verify no new process restarts
- Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
}
private void verifyIdsOfLaunchedContainers(int... ids) {
@@ -249,17 +285,21 @@
int exitValue = 1;
Process containerExecutor = Mockito.mock(Process.class);
Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
- Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+ Mockito.doNothing().when(scheduler).startExecutor(
+ Mockito.anyInt(), Mockito.anySet());
// Start the process
scheduler.getProcessToContainer().put(containerExecutor, containerId);
- scheduler.startExecutorMonitor(containerId, containerExecutor);
+ scheduler.startExecutorMonitor(
+ containerId, containerExecutor,
+ PackingTestUtils.testContainerPlan(containerId).getInstances());
// Shut down the MonitorService
scheduler.getMonitorService().shutdown();
scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
// The dead process should be restarted
- Mockito.verify(scheduler).startExecutor(containerId);
+ Mockito.verify(scheduler).startExecutor(containerId,
+ PackingTestUtils.testContainerPlan(containerId).getInstances());
Assert.assertFalse(scheduler.isTopologyKilled());
}
@@ -270,20 +310,22 @@
Process containerExecutor = Mockito.mock(Process.class);
Mockito.doReturn(exitValue).when(containerExecutor).exitValue();
- Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt());
+ Mockito.doNothing().when(scheduler).startExecutor(Mockito.anyInt(), Mockito.anySet());
// Set the killed flag and the dead process should not be restarted
scheduler.onKill(Scheduler.KillTopologyRequest.getDefaultInstance());
// Start the process
scheduler.getProcessToContainer().put(containerExecutor, containerId);
- scheduler.startExecutorMonitor(containerId, containerExecutor);
+ scheduler.startExecutorMonitor(containerId, containerExecutor,
+ PackingTestUtils.testContainerPlan(containerId).getInstances());
// Shut down the MonitorService
scheduler.getMonitorService().shutdown();
scheduler.getMonitorService().awaitTermination(MAX_WAITING_SECOND, TimeUnit.SECONDS);
// The dead process should not be restarted
- Mockito.verify(scheduler, Mockito.never()).startExecutor(Mockito.anyInt());
+ Mockito.verify(scheduler, Mockito.never())
+ .startExecutor(Mockito.anyInt(), Mockito.anySet());
Assert.assertTrue(scheduler.isTopologyKilled());
}
}
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
index ee3695f..14a4d60 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/ShellUtilsTest.java
@@ -23,15 +23,12 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
public class ShellUtilsTest {
- private static final Logger LOG = Logger.getLogger(ShellUtilsTest.class.getName());
-
private static String generateRandomLongString(int size) {
StringBuilder builder = new StringBuilder();
Random random = new Random();
diff --git a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
index af22c36..54694e9 100644
--- a/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
+++ b/heron/spi/tests/java/com/twitter/heron/spi/utils/UploaderUtilsTest.java
@@ -14,12 +14,19 @@
package com.twitter.heron.spi.utils;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.fail;
public class UploaderUtilsTest {
@@ -55,4 +62,42 @@
UploaderUtils.generateFilename(topologyName, role, tag, version, extension);
Assert.assertTrue(customizedFilename.endsWith(extension));
}
+
+ /**
+ * Writes known content to a temporary file and checks that
+ * UploaderUtils.copyToOutputStream reproduces it in the supplied stream.
+ */
+ @Test
+ public void testCopyToOutputStream() throws Exception {
+ final String expectedContent = "temp file test content";
+
+ // if creation fails there is nothing to clean up, so it can sit outside the try
+ final File source = File.createTempFile("myTestFile", ".tmp");
+ try {
+ final String sourcePath = source.getAbsolutePath();
+
+ // populate the temp file
+ writeContentToFile(sourcePath, expectedContent);
+
+ // copy it into an in-memory stream and compare with what was written
+ ByteArrayOutputStream copied = new ByteArrayOutputStream();
+ UploaderUtils.copyToOutputStream(sourcePath, copied);
+ Assert.assertEquals(expectedContent, new String(copied.toByteArray()));
+ } finally {
+ // the temp file is removed when the JVM exits
+ source.deleteOnExit();
+ }
+ }
+
+ /**
+ * Copying from the path "invalid_file_name" (presumed not to exist in the working
+ * directory) is expected to raise FileNotFoundException.
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testCopyToOutputStreamWithInvalidFile() throws Exception {
+ final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+ UploaderUtils.copyToOutputStream("invalid_file_name", sink);
+ }
+
+ /**
+ * Writes the given text to the named file; an IOException while writing fails the
+ * running test instead of being propagated.
+ *
+ * @param fileName path of the file to write
+ * @param content text to store in the file
+ */
+ private void writeContentToFile(String fileName, String content) {
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
+ writer.write(content);
+ } catch (IOException ioe) {
+ final String reason = ioe.getMessage();
+ fail("Unexpected IOException has been thrown so unit test fails. Error message: "
+ + reason);
+ }
+ }
}
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index acb58eb..8bbfe30 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -31,6 +31,8 @@
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -337,6 +339,15 @@
public ListenableFuture<Boolean> setMetricsCacheLocation(
TopologyMaster.MetricsCacheLocation location,
String topologyName) {
+ // Register a connection-state listener before creating the node.
+ // NOTE(review): a listener is added on every call to this method; confirm repeated calls
+ // are not expected, or that accumulating listeners is harmless.
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
+ if (arg1 != ConnectionState.CONNECTED) {
+ // any state other than CONNECTED (the initial successful connection) is treated
+ // as unexpected, so fail fast
+ // NOTE(review): confirm an exception thrown from this callback actually reaches the
+ // caller; the callback may be invoked on a Curator-owned thread
+ throw new RuntimeException("Unexpected state change to: " + arg1.name());
+ }
+ }
+ });
return createNode(
StateLocation.METRICSCACHE_LOCATION, topologyName, location.toByteArray(), true);
}
@@ -426,6 +437,7 @@
Config config = Config.newBuilder()
.put(Key.STATEMGR_ROOT_PATH, "/storm/heron/states")
.put(Key.STATEMGR_CONNECTION_STRING, zookeeperHostname)
+ .put(Key.SCHEDULER_IS_SERVICE, false)
.build();
CuratorStateManager stateManager = new CuratorStateManager();
stateManager.doMain(args, config);
diff --git a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java
index 7fc0db8..2d09221 100644
--- a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java
+++ b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemContext.java
@@ -18,8 +18,10 @@
import com.twitter.heron.spi.common.Context;
public class LocalFileSystemContext extends Context {
- public static String fileSystemDirectory(Config config) {
+
+ public static String getFileSystemDirectory(Config config) {
return config.getStringValue(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(),
LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString());
}
+
}
diff --git a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java
index e4ddf69..a7cbebc 100644
--- a/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java
+++ b/heron/uploaders/src/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploader.java
@@ -43,7 +43,7 @@
public void initialize(Config ipconfig) {
this.config = ipconfig;
- this.destTopologyDirectory = LocalFileSystemContext.fileSystemDirectory(config);
+ this.destTopologyDirectory = LocalFileSystemContext.getFileSystemDirectory(config);
// name of the destination file is the same as the base name of the topology package file
String fileName =
diff --git a/heron/uploaders/tests/java/BUILD b/heron/uploaders/tests/java/BUILD
index 5e50861..c94b1c3 100644
--- a/heron/uploaders/tests/java/BUILD
+++ b/heron/uploaders/tests/java/BUILD
@@ -49,6 +49,7 @@
java_tests(
test_classes = [
"com.twitter.heron.uploader.localfs.LocalFileSystemConfigTest",
+ "com.twitter.heron.uploader.localfs.LocalFileSystemContextTest",
"com.twitter.heron.uploader.localfs.LocalFileSystemUploaderTest",
],
runtime_deps = [ ":localfs-tests" ],
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java
index 7209551..5806c2e 100644
--- a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemConfigTest.java
@@ -43,7 +43,7 @@
Config config = Config.toLocalMode(getDefaultConfig());
Assert.assertEquals(
- LocalFileSystemContext.fileSystemDirectory(config),
+ LocalFileSystemContext.getFileSystemDirectory(config),
TokenSub.substitute(config, LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString())
);
}
@@ -59,7 +59,7 @@
.build());
Assert.assertEquals(
- LocalFileSystemContext.fileSystemDirectory(config),
+ LocalFileSystemContext.getFileSystemDirectory(config),
overrideDirectory
);
}
@@ -84,7 +84,7 @@
Assert.assertEquals(
Paths.get(uploader.getTopologyFile()).getParent().toString(),
- LocalFileSystemContext.fileSystemDirectory(config)
+ LocalFileSystemContext.getFileSystemDirectory(config)
);
}
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemContextTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemContextTest.java
new file mode 100644
index 0000000..6ee51ee
--- /dev/null
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemContextTest.java
@@ -0,0 +1,62 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.uploader.localfs;
+
+import java.nio.file.Paths;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Key;
+
+/**
+ * Checks which directory LocalFileSystemContext.getFileSystemDirectory reports, both when
+ * the config sets one explicitly and when it does not.
+ */
+public class LocalFileSystemContextTest {
+
+ /**
+ * With no directory configured, the default location is expected, with its ${...}
+ * tokens still unresolved.
+ */
+ @Test
+ public void testGetDefaultFileSystemDirectory() {
+ // default location, assembled segment by segment
+ final String expected = Paths.get("${HOME}",
+ ".herondata", "repository", "${CLUSTER}", "${ROLE}", "${TOPOLOGY}").toString();
+
+ // note: no file system directory key is put into this config
+ final Config config = Config.newBuilder()
+ .put(Key.CLUSTER, "cluster")
+ .put(Key.ROLE, "role")
+ .put(Key.TOPOLOGY_NAME, "topology")
+ .build();
+
+ Assert.assertEquals(expected, LocalFileSystemContext.getFileSystemDirectory(config));
+ }
+
+ /**
+ * A directory set explicitly in the config is expected to be returned unchanged.
+ */
+ @Test
+ public void testGetFileSystemDirectory() {
+ final String configured = "${HOME}/.herondata/topologies/${CLUSTER}";
+
+ // the same location, assembled segment by segment
+ final String expected = Paths.get("${HOME}",
+ ".herondata", "topologies", "${CLUSTER}").toString();
+
+ final Config config = Config.newBuilder()
+ .put(Key.CLUSTER, "cluster")
+ .put(Key.ROLE, "role")
+ .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), configured)
+ .build();
+
+ // the value supplied by the user should win over the default
+ Assert.assertEquals(expected, LocalFileSystemContext.getFileSystemDirectory(config));
+ }
+
+}
diff --git a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
index 85db4f4..18e9ed1 100644
--- a/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
+++ b/heron/uploaders/tests/java/com/twitter/heron/uploader/localfs/LocalFileSystemUploaderTest.java
@@ -30,13 +30,14 @@
public class LocalFileSystemUploaderTest {
+ private static final String TOPOLOGY_PACKAGE_FILE_NAME = "some-topology.tar";
+
private Config config;
private String fileSystemDirectory;
private String testTopologyDirectory;
@Before
public void before() throws Exception {
-
// form the file system directory using bazel environ files
fileSystemDirectory = Paths.get(System.getenv("JAVA_RUNFILES"), "topologies").toString();
@@ -60,17 +61,17 @@
}
@Test
- public void testUploader() throws Exception {
-
+ public void testUploader() {
// identify the location of the test topology tar file
- String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
- Config newconfig = Config.newBuilder()
+ Config newConfig = Config.newBuilder()
.putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
// create the uploader and load the package
LocalFileSystemUploader uploader = new LocalFileSystemUploader();
- uploader.initialize(newconfig);
+ uploader.initialize(newConfig);
Assert.assertNotNull(uploader.uploadPackage());
// verify if the file exists
@@ -79,33 +80,32 @@
}
@Test(expected = UploaderException.class)
- public void testSourceNotExists() throws Exception {
-
+ public void testSourceNotExists() {
// identify the location of the test topology tar file
String topologyPackage = Paths.get(
testTopologyDirectory, "doesnot-exist-topology.tar").toString();
- Config newconfig = Config.newBuilder()
+ Config newConfig = Config.newBuilder()
.putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
// create the uploader and load the package
LocalFileSystemUploader uploader = new LocalFileSystemUploader();
- uploader.initialize(newconfig);
+ uploader.initialize(newConfig);
uploader.uploadPackage();
}
@Test
- public void testUndo() throws Exception {
-
+ public void testUndo() {
// identify the location of the test topology tar file
- String topologyPackage = Paths.get(testTopologyDirectory, "some-topology.tar").toString();
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
- Config newconfig = Config.newBuilder()
+ Config newConfig = Config.newBuilder()
.putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
// create the uploader and load the package
LocalFileSystemUploader uploader = new LocalFileSystemUploader();
- uploader.initialize(newconfig);
+ uploader.initialize(newConfig);
Assert.assertNotNull(uploader.uploadPackage());
// verify if the file exists
@@ -113,7 +113,95 @@
Assert.assertTrue(new File(destFile).isFile());
// now undo the file
- uploader.undo();
+ Assert.assertTrue(uploader.undo());
Assert.assertFalse(new File(destFile).isFile());
}
+
+ @Test
+ public void testUseDefaultFileSystemDirectoryWhenNotSet() {
+ // identify the location of the test topology tar file
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+ // override the file system directory from the base config with null
+ Config newConfig = Config.newBuilder()
+ .putAll(config)
+ .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+ .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), null)
+ .build();
+
+ // create the uploader
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ uploader.initialize(newConfig);
+
+ // get default file system directory
+ String defaultFileSystemDirectory = LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.getDefaultString();
+
+ String destDirectory = uploader.getTopologyDirectory();
+ String destFile = uploader.getTopologyFile();
+
+ // verify (expected value first) that the default directory is used for directory and file
+ Assert.assertEquals(defaultFileSystemDirectory, destDirectory);
+ Assert.assertTrue(destFile.contains(defaultFileSystemDirectory));
+ }
+
+ @Test
+ public void testUploadPackageWhenTopologyFileAlreadyExists() {
+ // identify the location of the test topology tar file
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+ Config newConfig = Config.newBuilder()
+ .putAll(config).put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage).build();
+
+ // create the uploader and upload the package for the first time
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ uploader.initialize(newConfig);
+ Assert.assertNotNull(uploader.uploadPackage());
+
+ // verify that the uploaded file exists
+ String destFile = uploader.getTopologyFile();
+ Assert.assertTrue(new File(destFile).isFile());
+
+ // upload the same package again while the destination file is already present
+ Assert.assertNotNull(uploader.uploadPackage());
+ String destFile2 = uploader.getTopologyFile();
+ Assert.assertTrue(new File(destFile2).isFile());
+
+ // verify that both uploads resolve to the same destination path
+ Assert.assertEquals(destFile, destFile2);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUploadPackageWhenFileSystemDirectoryIsInvalid() {
+ // identify the location of the test topology tar file
+ String topologyPackage = Paths.get(testTopologyDirectory,
+ TOPOLOGY_PACKAGE_FILE_NAME).toString();
+
+ String invalidFileSystemDirectory = "invalid%path";
+
+ // replace the file system directory from the base config with the invalid one
+ Config newConfig = Config.newBuilder()
+ .putAll(config)
+ .put(Key.TOPOLOGY_PACKAGE_FILE, topologyPackage)
+ .put(LocalFileSystemKey.FILE_SYSTEM_DIRECTORY.value(), invalidFileSystemDirectory).build();
+
+ // initialize and upload; the test passes only if IllegalArgumentException is thrown
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ uploader.initialize(newConfig);
+ uploader.uploadPackage();
+ }
+
+ @Test
+ public void testGetUri() {
+ LocalFileSystemUploader uploader = new LocalFileSystemUploader();
+ Assert.assertEquals("file://testFileName", uploader.getUri("testFileName").toString());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetUriWhenDestFileNameIsInvalid() {
+ // passes only if getUri rejects this file name with IllegalArgumentException
+ new LocalFileSystemUploader().getUri("invalid_%_DestFilePath");
+ }
+
}
diff --git a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
index a0bd2b9..18e25de 100644
--- a/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/backtype/storm/utils/ConfigUtils.java
@@ -48,6 +48,8 @@
doTaskHooksTranslation(heronConfig);
+ doTopologyLevelTranslation(heronConfig);
+
return heronConfig;
}
@@ -113,7 +115,10 @@
}
/**
- * Translate storm config into heron config
+ * Translate storm config into heron config. This function is used by both topology
+ * and component level config translations. Therefore NO config should be generated
+ * when a key does NOT exist if the key is for both topology and component.
+ * Otherwise the component config might overwrite the topology config with a wrong value.
* @param heron the heron config object to receive the results.
*/
private static void doStormTranslation(Config heronConfig) {
@@ -125,20 +130,7 @@
Integer nWorkers = Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_WORKERS));
com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
}
- if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
- Integer nAckers =
- Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
- if (nAckers > 0) {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
+
if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
Integer nSecs =
Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
@@ -162,4 +154,25 @@
com.twitter.heron.api.Config.setDebug(heronConfig, dBg);
}
}
+
+ /**
+ * Translate topology level config: the storm acker count picks the heron reliability mode.
+ * @param heronConfig the heron config object to receive the results.
+ */
+ private static void doTopologyLevelTranslation(Config heronConfig) {
+ if (heronConfig.containsKey(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+ Integer nAckers =
+ Utils.getInt(heronConfig.get(backtype.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+ if (nAckers > 0) {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ }
}
diff --git a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
index abd132a..d7b23a2 100644
--- a/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-compatibility/src/java/org/apache/storm/utils/ConfigUtils.java
@@ -47,6 +47,8 @@
doTaskHooksTranslation(heronConfig);
+ doTopologyLevelTranslation(heronConfig);
+
return heronConfig;
}
@@ -113,7 +115,10 @@
}
/**
- * Translate storm config into heron config
+ * Translate storm config into heron config. This function is used by both topology
+ * and component level config translations. Therefore NO config should be generated
+ * when a key does NOT exist if the key is for both topology and component.
+ * Otherwise the component config might overwrite the topology config with a wrong value.
* @param heron the heron config object to receive the results.
*/
private static void doStormTranslation(Config heronConfig) {
@@ -125,20 +130,6 @@
Integer nWorkers = Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_WORKERS));
com.twitter.heron.api.Config.setNumStmgrs(heronConfig, nWorkers);
}
- if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
- Integer nAckers =
- Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
- if (nAckers > 0) {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
- } else {
- com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
- com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
Integer nSecs =
Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
@@ -166,4 +157,25 @@
(Map) heronConfig.get(org.apache.storm.Config.TOPOLOGY_ENVIRONMENT));
}
}
+
+ /**
+ * Translate topology level config: the storm acker count picks the heron reliability mode.
+ * @param heronConfig the heron config object to receive the results.
+ */
+ private static void doTopologyLevelTranslation(Config heronConfig) {
+ if (heronConfig.containsKey(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS)) {
+ Integer nAckers =
+ Utils.getInt(heronConfig.get(org.apache.storm.Config.TOPOLOGY_ACKER_EXECUTORS));
+ if (nAckers > 0) {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE);
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ } else {
+ com.twitter.heron.api.Config.setTopologyReliabilityMode(heronConfig,
+ com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE);
+ }
+ }
}
diff --git a/website/content/docs/operators/deployment/uploaders/localfs.md b/website/content/docs/operators/deployment/uploaders/localfs.md
index 4e9e2a9..bd5411a 100644
--- a/website/content/docs/operators/deployment/uploaders/localfs.md
+++ b/website/content/docs/operators/deployment/uploaders/localfs.md
@@ -27,7 +27,7 @@
* `heron.uploader.localfs.file.system.directory` --- Provides the name of the directory where
the topology jar should be uploaded. The name of the directory should be unique per cluster
You could use the Heron environment variables `${CLUSTER}` that will be substituted by cluster
-name.
+name. If it is not set, `${HOME}/.herondata/repository/${CLUSTER}/${ROLE}/${TOPOLOGY}` is used as the default.
### Example Local File System Uploader Configuration