[NO ISSUE][HYR] Heartbeat processing / NC exceptions
- Run heartbeat processing off of Worker thread, to prevent starvation
- Preserve stacktrace when creating node-scoped HyracksDataException
clones
Change-Id: If8fd35a7fd488bed5f1d5e2146dd48892cb0a7a4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2138
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 4517730..7ae7cbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -38,7 +38,7 @@
return (HyracksDataException) cause;
} else if (cause instanceof Error) {
// don't wrap errors, allow them to propagate
- throw (Error)cause;
+ throw (Error) cause;
} else if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
// TODO(mblow): why not force interrupt on current thread?
LOGGER.log(Level.WARNING,
@@ -78,6 +78,12 @@
super(component, errorCode, message, cause, nodeId, params);
}
+ public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
+ StackTraceElement[] stackTrace, Serializable... params) {
+ super(component, errorCode, message, cause, nodeId, params);
+ setStackTrace(stackTrace);
+ }
+
/**
* @deprecated Error code is needed.
*/
@@ -141,6 +147,6 @@
public static HyracksDataException create(HyracksDataException e, String nodeId) {
return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId,
- e.getParams());
+ e.getStackTrace(), e.getParams());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 350984c..af5c102 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -72,8 +72,7 @@
break;
case NODE_HEARTBEAT:
CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
- ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs, nhf.getNodeId(),
- nhf.getHeartbeatData()));
+ ccs.getExecutor().execute(new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData()));
break;
case NOTIFY_JOBLET_CLEANUP:
CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;