GIRAPH-1163
closes #52
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index aa4ce7b..86c711c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -19,6 +19,7 @@
package org.apache.giraph.graph;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -93,11 +94,12 @@
// CHECKSTYLE: stop IllegalCatch
} catch (RuntimeException e) {
// CHECKSTYLE: resume IllegalCatch
+ byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
graphTaskManager.getJobProgressTracker().logError(
"Exception occurred on mapper " +
graphTaskManager.getConf().getTaskPartition() + ": " +
- ExceptionUtils.getStackTrace(e));
+ ExceptionUtils.getStackTrace(e), exByteArray);
graphTaskManager.zooKeeperCleanup();
graphTaskManager.workerFailureCleanup();
throw new IllegalStateException(
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 1967b44..b0659bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -65,6 +65,7 @@
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.giraph.worker.WorkerProgress;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -1115,8 +1116,9 @@
LOG.fatal(
"uncaughtException: OverrideExceptionHandler on thread " +
t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
- jobProgressTracker.logError(ExceptionUtils.getStackTrace(e));
-
+ byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
+ jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
+ exByteArray);
zooKeeperCleanup();
workerFailureCleanup();
} finally {
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
index e699bfb..6f1258d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -39,7 +39,7 @@
}
@Override
- public void logError(String logLine) {
+ public void logError(String logLine, byte [] exByteArray) {
}
@Override
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index a7ac055..f51d765 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -124,11 +124,12 @@
}
@Override
- public synchronized void logError(final String logLine) {
+ public synchronized void logError(final String logLine,
+ final byte [] exByteArray) {
executeWithRetry(new Runnable() {
@Override
public void run() {
- jobProgressTracker.logError(logLine);
+ jobProgressTracker.logError(logLine, exByteArray);
}
});
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index bb9390e..d7d03d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -224,7 +224,8 @@
}
@Override
- public void logError(String logLine) {
+ public void
+ logError(String logLine, byte [] exByteArray) {
LOG.error(logLine);
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index 92e35b8..a1ad44d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -43,13 +43,17 @@
void logInfo(String logLine);
/**
- * Call this when you want to log an error line from any mapper to command
- * line
+ * Call this when you want to log an error line and exception
+ * object from any mapper to command line
+ *
+ * KryoWritableWrapper.convertFromByteArray can be used to
+ * get exception object back
*
* @param logLine Line to log
+ * @param exByteArray Exception byte array
*/
@ThriftMethod
- void logError(String logLine);
+ void logError(String logLine, byte [] exByteArray);
/**
* Notify that job is failing
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
index f17955b..d80a9a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
@@ -120,4 +120,29 @@
public static <T> T wrapAndCopy(T object) {
return WritableUtils.createCopy(new KryoWritableWrapper<>(object)).get();
}
+
+ /**
+ * Converting the object to byte array.
+ * @param object Object
+ * @param <T> Type
+ * @return byte array
+ */
+ public static <T> byte [] convertToByteArray(T object) {
+ KryoWritableWrapper<T> wrapper =
+ new KryoWritableWrapper<>(object);
+ return WritableUtils.toByteArray(wrapper);
+ }
+
+ /**
+ * Converting from byte array
+ * @param arr byte array
+ * @param <T> type
+ * @return original object
+ */
+ public static <T> T convertFromByteArray(byte [] arr) {
+ KryoWritableWrapper<T> wrapper =
+ new KryoWritableWrapper<>();
+ WritableUtils.fromByteArray(arr, wrapper);
+ return wrapper.get();
+ }
}