Merge branch 'fluo-839'
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 3b6bba5..c821684 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -95,7 +95,7 @@
         }
       }
 
-      if (queuedWork.containsKey(rowCol)) {
+      if (queuedWork.containsKey(rowCol) || recentlyDeleted.contains(rowCol)) {
         return false;
       }
 
@@ -164,7 +164,7 @@
 
   }
 
-  private static class NotificationProcessingTask implements Runnable {
+  private class NotificationProcessingTask implements Runnable {
 
     Notification notification;
     NotificationFinder notificationFinder;
@@ -184,6 +184,8 @@
         // notification should be processed.
         if (notificationFinder.shouldProcess(notification)) {
           workTask.run();
+        } else {
+          notificationProcessed(notification);
         }
       } catch (Exception e) {
         log.error("Failed to process work " + Hex.encNonAscii(notification), e);
@@ -192,7 +194,7 @@
 
   }
 
-  private static class FutureNotificationTask extends FutureTask<Void> implements
+  private class FutureNotificationTask extends FutureTask<Void> implements
       Comparable<FutureNotificationTask> {
 
     private final Notification notification;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index 76c6c66..4f028c1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -85,7 +85,13 @@
         atx = new TracingTransaction(atx, notification, observer.getClass(), observerId);
       }
 
-      observer.process(atx, notification.getRow(), notification.getColumn());
+      try {
+        observer.process(atx, notification.getRow(), notification.getColumn());
+      } catch (Exception e) {
+        notificationFinder.failedToProcess(notification, TxResult.ERROR);
+        notificationProcessor.notificationProcessed(notification);
+        throw e;
+      }
 
       CommitManager commitManager = env.getSharedResources().getCommitManager();
       commitManager.beginCommit(atx, observerId, new WorkTaskCommitObserver());
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index 1732a29..35f3215 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -100,30 +100,30 @@
         rangeData.keySet().retainAll(rangeSet);
 
         long minRetryTime = maxSleepTime + System.currentTimeMillis();
-        int notifications = 0;
+        ScanCounts ntfyCounts = new ScanCounts();
         int tabletsScanned = 0;
         try {
           for (TableRange tabletRange : ranges) {
             TabletData tabletData = rangeData.computeIfAbsent(tabletRange, tr -> new TabletData());
             if (System.currentTimeMillis() >= tabletData.retryTime) {
-              int count = 0;
+              ScanCounts counts;
               PartitionInfo pi = partitionManager.getPartitionInfo();
               if (partition.equals(pi)) {
                 try (Session session =
                     proccessor.beginAddingNotifications(rc -> tabletRange.contains(rc.getRow()))) {
                   // notifications could have been asynchronously queued for deletion. Let that
-                  // happen
-                  // 1st before scanning
+                  // happen 1st before scanning
                   env.getSharedResources().getBatchWriter().waitForAsyncFlush();
 
-                  count = scan(session, partition, tabletRange.getRange());
+                  counts = scan(session, partition, tabletRange.getRange());
                   tabletsScanned++;
                 }
               } else {
                 break;
               }
-              tabletData.updateScanCount(count, maxSleepTime);
-              notifications += count;
+              tabletData.updateScanCount(counts.added, maxSleepTime);
+              ntfyCounts.added += counts.added;
+              ntfyCounts.seen += counts.seen;
               if (stopped.get()) {
                 break;
               }
@@ -139,8 +139,8 @@
 
         qSize = proccessor.size();
 
-        log.debug("Scanned {} of {} tablets, added {} new notifications (total queued {})",
-            tabletsScanned, ranges.size(), notifications, qSize);
+        log.debug("Scanned {} of {} tablets. Notifications added: {} seen: {} queued: {}",
+            tabletsScanned, ranges.size(), ntfyCounts.added, ntfyCounts.seen, qSize);
 
         if (!stopped.get()) {
           UtilWaitThread.sleep(sleepTime, stopped);
@@ -168,7 +168,13 @@
     return wasInt;
   }
 
-  private int scan(Session session, PartitionInfo pi, Range range) throws TableNotFoundException {
+  private static class ScanCounts {
+    int seen = 0;
+    int added = 0;
+  }
+
+  private ScanCounts scan(Session session, PartitionInfo pi, Range range)
+      throws TableNotFoundException {
     Scanner scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
 
     scanner.setRange(range);
@@ -179,7 +185,7 @@
     NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
     scanner.addScanIterator(iterCfg);
 
-    int count = 0;
+    ScanCounts counts = new ScanCounts();
 
     for (Entry<Key, Value> entry : scanner) {
       if (!pi.equals(partitionManager.getPartitionInfo())) {
@@ -187,13 +193,15 @@
       }
 
       if (stopped.get()) {
-        return count;
+        return counts;
       }
 
+      counts.seen++;
+
       if (session.addNotification(finder, Notification.from(entry.getKey()))) {
-        count++;
+        counts.added++;
       }
     }
-    return count;
+    return counts;
   }
 }