Merge branch 'fluo-839'
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 3b6bba5..c821684 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -95,7 +95,7 @@
}
}
- if (queuedWork.containsKey(rowCol)) {
+ if (queuedWork.containsKey(rowCol) || recentlyDeleted.contains(rowCol)) {
return false;
}
@@ -164,7 +164,7 @@
}
- private static class NotificationProcessingTask implements Runnable {
+ private class NotificationProcessingTask implements Runnable {
Notification notification;
NotificationFinder notificationFinder;
@@ -184,6 +184,8 @@
// notification should be processed.
if (notificationFinder.shouldProcess(notification)) {
workTask.run();
+ } else {
+ notificationProcessed(notification);
}
} catch (Exception e) {
log.error("Failed to process work " + Hex.encNonAscii(notification), e);
@@ -192,7 +194,7 @@
}
- private static class FutureNotificationTask extends FutureTask<Void> implements
+ private class FutureNotificationTask extends FutureTask<Void> implements
Comparable<FutureNotificationTask> {
private final Notification notification;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index 76c6c66..4f028c1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -85,7 +85,13 @@
atx = new TracingTransaction(atx, notification, observer.getClass(), observerId);
}
- observer.process(atx, notification.getRow(), notification.getColumn());
+ try {
+ observer.process(atx, notification.getRow(), notification.getColumn());
+ } catch (Exception e) {
+ notificationFinder.failedToProcess(notification, TxResult.ERROR);
+ notificationProcessor.notificationProcessed(notification);
+ throw e;
+ }
CommitManager commitManager = env.getSharedResources().getCommitManager();
commitManager.beginCommit(atx, observerId, new WorkTaskCommitObserver());
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index 1732a29..35f3215 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -100,30 +100,30 @@
rangeData.keySet().retainAll(rangeSet);
long minRetryTime = maxSleepTime + System.currentTimeMillis();
- int notifications = 0;
+ ScanCounts ntfyCounts = new ScanCounts();
int tabletsScanned = 0;
try {
for (TableRange tabletRange : ranges) {
TabletData tabletData = rangeData.computeIfAbsent(tabletRange, tr -> new TabletData());
if (System.currentTimeMillis() >= tabletData.retryTime) {
- int count = 0;
+ ScanCounts counts;
PartitionInfo pi = partitionManager.getPartitionInfo();
if (partition.equals(pi)) {
try (Session session =
proccessor.beginAddingNotifications(rc -> tabletRange.contains(rc.getRow()))) {
// notifications could have been asynchronously queued for deletion. Let that
- // happen
- // 1st before scanning
+ // happen 1st before scanning
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
- count = scan(session, partition, tabletRange.getRange());
+ counts = scan(session, partition, tabletRange.getRange());
tabletsScanned++;
}
} else {
break;
}
- tabletData.updateScanCount(count, maxSleepTime);
- notifications += count;
+ tabletData.updateScanCount(counts.added, maxSleepTime);
+ ntfyCounts.added += counts.added;
+ ntfyCounts.seen += counts.seen;
if (stopped.get()) {
break;
}
@@ -139,8 +139,8 @@
qSize = proccessor.size();
- log.debug("Scanned {} of {} tablets, added {} new notifications (total queued {})",
- tabletsScanned, ranges.size(), notifications, qSize);
+ log.debug("Scanned {} of {} tablets. Notifications added: {} seen: {} queued: {}",
+ tabletsScanned, ranges.size(), ntfyCounts.added, ntfyCounts.seen, qSize);
if (!stopped.get()) {
UtilWaitThread.sleep(sleepTime, stopped);
@@ -168,7 +168,13 @@
return wasInt;
}
- private int scan(Session session, PartitionInfo pi, Range range) throws TableNotFoundException {
+ private static class ScanCounts {
+ int seen = 0;
+ int added = 0;
+ }
+
+ private ScanCounts scan(Session session, PartitionInfo pi, Range range)
+ throws TableNotFoundException {
Scanner scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
scanner.setRange(range);
@@ -179,7 +185,7 @@
NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
scanner.addScanIterator(iterCfg);
- int count = 0;
+ ScanCounts counts = new ScanCounts();
for (Entry<Key, Value> entry : scanner) {
if (!pi.equals(partitionManager.getPartitionInfo())) {
@@ -187,13 +193,15 @@
}
if (stopped.get()) {
- return count;
+ return counts;
}
+ counts.seen++;
+
if (session.addNotification(finder, Notification.from(entry.getKey()))) {
- count++;
+ counts.added++;
}
}
- return count;
+ return counts;
}
}