fixes #500 Made scanning for notifications scalable
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
index a9f5fd4..9af6feb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
@@ -16,7 +16,6 @@
 package org.apache.fluo.core.impl;
 
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.core.worker.finder.hash.ScanTask;
 
 /**
  * Contains implementation-related Fluo properties that should not be exposed in the API in
@@ -28,13 +27,16 @@
 
   public static final String ORACLE_PORT_PROP = FLUO_IMPL_PREFIX + ".oracle.port";
   public static final String WORKER_FINDER_PROP = FLUO_IMPL_PREFIX + ".worker.finder";
+  public static final String WORKER_PARTITION_GROUP_SIZE = FLUO_IMPL_PREFIX
+      + ".worker.finder.partition.groupSize";
+  public static final int WORKER_PARTITION_GROUP_SIZE_DEFAULT = 7;
   public static final String METRICS_RESERVOIR_PROP = FLUO_IMPL_PREFIX + ".metrics.reservoir";
-  public static final String MIN_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
-      + ScanTask.class.getSimpleName() + ".minSleep";
-  public static final int MIN_SLEEP_TIME_DEFAULT = 5000;
-  public static final String MAX_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
-      + ScanTask.class.getSimpleName() + ".maxSleep";
-  public static final int MAX_SLEEP_TIME_DEFAULT = 5 * 60 * 1000;
+  public static final String NTFY_FINDER_MIN_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
+      + ".worker.finder.minSleep";
+  public static final int NTFY_FINDER_MIN_SLEEP_TIME_DEFAULT = 5000;
+  public static final String NTFY_FINDER_MAX_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
+      + ".worker.finder.maxSleep";
+  public static final int NTFY_FINDER_MAX_SLEEP_TIME_DEFAULT = 5 * 60 * 1000;
 
   // Time period that each client will update ZK with their oldest active timestamp
   // If period is too short, Zookeeper may be overloaded. If too long, garbage collection
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
index 92b0382..3938c07 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
@@ -17,13 +17,13 @@
 
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.core.impl.FluoConfigurationImpl;
-import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
+import org.apache.fluo.core.worker.finder.hash.PartitionNotificationFinder;
 
 public class NotificationFinderFactory {
   public static NotificationFinder newNotificationFinder(FluoConfiguration conf) {
     String clazz =
         conf.getString(FluoConfigurationImpl.WORKER_FINDER_PROP,
-            HashNotificationFinder.class.getName());
+            PartitionNotificationFinder.class.getName());
     try {
       return Class.forName(clazz).asSubclass(NotificationFinder.class).newInstance();
     } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 56aa78d..3b6bba5 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -16,13 +16,17 @@
 package org.apache.fluo.core.worker;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import com.codahale.metrics.Gauge;
 import org.apache.fluo.api.data.Column;
@@ -64,8 +68,12 @@
   // little utility class that tracks all notifications in queue
   private class NotificationTracker {
     private Map<RowColumn, Future<?>> queuedWork = new HashMap<>();
+    private Set<RowColumn> recentlyDeleted = new HashSet<>();
     private long sizeInBytes = 0;
+    private Map<Long, Predicate<RowColumn>> memoryPredicates = new HashMap<>();
+    private Predicate<RowColumn> memoryPredicate = rc -> false;
     private static final long MAX_SIZE = 1 << 24;
+    private long nextSessionId = 0;
 
     private long size(RowColumn rowCol) {
       Column col = rowCol.getColumn();
@@ -75,7 +83,7 @@
 
     public synchronized boolean add(RowColumn rowCol, Future<?> task) {
 
-      if (queuedWork.containsKey(rowCol)) {
+      if (queuedWork.containsKey(rowCol) || recentlyDeleted.contains(rowCol)) {
         return false;
       }
 
@@ -98,6 +106,9 @@
 
     public synchronized void remove(RowColumn rowCol) {
       if (queuedWork.remove(rowCol) != null) {
+        if (memoryPredicate.test(rowCol)) {
+          recentlyDeleted.add(rowCol);
+        }
         sizeInBytes -= size(rowCol);
         notify();
       }
@@ -123,6 +134,34 @@
       return true;
     }
 
+    private void resetMemoryPredicate() {
+      memoryPredicate = null;
+      for (Predicate<RowColumn> p : this.memoryPredicates.values()) {
+        if (memoryPredicate == null) {
+          memoryPredicate = p;
+        } else {
+          memoryPredicate = p.or(memoryPredicate);
+        }
+      }
+    }
+
+    public synchronized long beginAddingNotifications(Predicate<RowColumn> memoryPredicate) {
+      long sessionId = nextSessionId++;
+      this.memoryPredicates.put(sessionId, Objects.requireNonNull(memoryPredicate));
+      resetMemoryPredicate();
+      return sessionId;
+    }
+
+    public synchronized void finishAddingNotifications(long sessionId) {
+      this.memoryPredicates.remove(sessionId);
+      if (memoryPredicates.size() == 0) {
+        recentlyDeleted.clear();
+        memoryPredicate = rc -> false;
+      } else {
+        resetMemoryPredicate();
+      }
+    }
+
   }
 
   private static class NotificationProcessingTask implements Runnable {
@@ -176,25 +215,47 @@
     }
   }
 
-  public boolean addNotification(final NotificationFinder notificationFinder,
-      final Notification notification) {
+  public class Session implements AutoCloseable {
+    private long id;
 
-    WorkTaskAsync workTask =
-        new WorkTaskAsync(this, notificationFinder, env, notification, observers);
-    FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);
-
-    if (!tracker.add(notification.getRowColumn(), ft)) {
-      return false;
+    public Session(Predicate<RowColumn> memoryPredicate) {
+      this.id = tracker.beginAddingNotifications(memoryPredicate);
     }
 
-    try {
-      executor.execute(ft);
-    } catch (RejectedExecutionException rje) {
-      tracker.remove(notification.getRowColumn());
-      throw rje;
+    public boolean addNotification(final NotificationFinder notificationFinder,
+        final Notification notification) {
+
+      WorkTaskAsync workTask =
+          new WorkTaskAsync(NotificationProcessor.this, notificationFinder, env, notification,
+              observers);
+      FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);
+
+      if (!tracker.add(notification.getRowColumn(), ft)) {
+        return false;
+      }
+
+      try {
+        executor.execute(ft);
+      } catch (RejectedExecutionException rje) {
+        tracker.remove(notification.getRowColumn());
+        throw rje;
+      }
+
+      return true;
     }
 
-    return true;
+    public void close() {
+      tracker.finishAddingNotifications(id);
+    }
+  }
+
+  /**
+   * Starts a session for adding notifications. During this session, any notifications that are
+   * deleted and match the predicate will be remembered. These remembered notifications can not be
+   * added again while the session is active.
+   */
+  public Session beginAddingNotifications(Predicate<RowColumn> memoryPredicate) {
+    return new Session(memoryPredicate);
   }
 
   public void requeueNotification(final NotificationFinder notificationFinder,
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/TabletInfoCache.java b/modules/core/src/main/java/org/apache/fluo/core/worker/TabletInfoCache.java
deleted file mode 100644
index 4d17965..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/TabletInfoCache.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.worker;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Range;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.hadoop.io.Text;
-
-public class TabletInfoCache<T, S extends Supplier<T>> {
-  private static final long CACHE_TIME = 5 * 60 * 1000;
-
-  private final Environment env;
-  private List<TabletInfo<T>> cachedTablets;
-  private long listSplitsTime = 0;
-  private S supplier;
-
-  public static class TabletInfo<T> {
-    private final Text start;
-    private final Text end;
-    private T data;
-
-    TabletInfo(Text start, Text end, T data) {
-      this.start = start;
-      this.end = end;
-      this.data = data;
-    }
-
-    private int hashCode(Text t) {
-      if (t == null) {
-        return 0;
-      }
-      return t.hashCode();
-    }
-
-    @Override
-    public int hashCode() {
-      return hashCode(start) + hashCode(end);
-    }
-
-    private boolean equals(Text t1, Text t2) {
-      if (t1 == t2) {
-        return true;
-      }
-
-      if (t1 == null || t2 == null) {
-        return false;
-      }
-
-      return t1.equals(t2);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof TabletInfo) {
-        @SuppressWarnings("rawtypes")
-        TabletInfo oti = (TabletInfo) o;
-
-        if (equals(start, oti.start)) {
-          return equals(end, oti.end);
-        }
-
-        return false;
-      }
-
-      return false;
-    }
-
-    public Text getStart() {
-      return start;
-    }
-
-    public Text getEnd() {
-      return end;
-    }
-
-    public T getData() {
-      return data;
-    }
-
-    public Range getRange() {
-      return new Range(start, false, end, true);
-    }
-  }
-
-  public TabletInfoCache(Environment env, S supplier) {
-    this.env = env;
-    this.supplier = supplier;
-  }
-
-  private List<TabletInfo<T>> listSplits() throws TableNotFoundException,
-      AccumuloSecurityException, AccumuloException {
-    List<Text> splits =
-        new ArrayList<>(env.getConnector().tableOperations().listSplits(env.getTable()));
-    Collections.sort(splits);
-
-    List<TabletInfo<T>> tablets = new ArrayList<>(splits.size() + 1);
-    for (int i = 0; i < splits.size(); i++) {
-      tablets
-          .add(new TabletInfo<>(i == 0 ? null : splits.get(i - 1), splits.get(i), supplier.get()));
-    }
-
-    tablets.add(new TabletInfo<>(splits.size() == 0 ? null : splits.get(splits.size() - 1), null,
-        supplier.get()));
-    listSplitsTime = System.currentTimeMillis();
-    return tablets;
-  }
-
-  public synchronized List<TabletInfo<T>> getTablets() throws Exception {
-    if (cachedTablets == null) {
-      cachedTablets = listSplits();
-    } else if (System.currentTimeMillis() - listSplitsTime > CACHE_TIME) {
-      List<TabletInfo<T>> tablets = listSplits();
-      Map<TabletInfo<T>, TabletInfo<T>> oldTablets = new HashMap<>();
-      for (TabletInfo<T> tabletInfo : cachedTablets) {
-        oldTablets.put(tabletInfo, tabletInfo);
-      }
-
-      List<TabletInfo<T>> newTablets = new ArrayList<>(tablets.size());
-
-      for (TabletInfo<T> tabletInfo : tablets) {
-        TabletInfo<T> oldTI = oldTablets.get(tabletInfo);
-        if (oldTI != null) {
-          newTablets.add(oldTI);
-        } else {
-          newTablets.add(tabletInfo);
-        }
-      }
-
-      cachedTablets = newTablets;
-    }
-
-    return Collections.unmodifiableList(cachedTablets);
-  }
-}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
deleted file mode 100644
index e18ec2f..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.worker.finder.hash;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.accumulo.core.data.ArrayByteSequence;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
-import org.apache.fluo.accumulo.util.NotificationUtil;
-import org.apache.fluo.accumulo.util.ZookeeperPath;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.impl.Notification;
-import org.apache.fluo.core.util.ByteUtil;
-import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.NotificationProcessor;
-import org.apache.fluo.core.worker.TxResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HashNotificationFinder implements NotificationFinder {
-
-  private NotificationProcessor notificationProcessor;
-  private CuratorFramework curator;
-  private List<String> finders = Collections.emptyList();
-  private int updates = 0;
-  private ModulusParams modParams;
-  private Environment env;
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private Thread scanThread;
-  private PathChildrenCache childrenCache;
-  private PersistentEphemeralNode myESNode;
-
-  private static final Logger log = LoggerFactory.getLogger(HashNotificationFinder.class);
-
-  static class ModParamsChangedException extends RuntimeException {
-    private static final long serialVersionUID = 1L;
-  }
-
-  private class FindersListener implements PathChildrenCacheListener {
-
-    @Override
-    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
-      switch (event.getType()) {
-        case CHILD_ADDED:
-        case CHILD_REMOVED:
-          if (!stopped.get()) {
-            updateFinders();
-          }
-          break;
-        case CHILD_UPDATED:
-          log.warn("unexpected event " + event);
-          break;
-        default:
-          break;
-
-      }
-    }
-
-  }
-
-  private synchronized void updateFinders() {
-
-    String me = myESNode.getActualPath();
-    while (me == null) {
-      UtilWaitThread.sleep(100);
-      me = myESNode.getActualPath();
-    }
-    me = ZKPaths.getNodeFromPath(me);
-
-    List<String> children = new ArrayList<>();
-    for (ChildData childData : childrenCache.getCurrentData()) {
-      children.add(ZKPaths.getNodeFromPath(childData.getPath()));
-    }
-
-    Collections.sort(children);
-
-    if (!finders.equals(children)) {
-      int index = children.indexOf(me);
-      if (index == -1) {
-        this.modParams = null;
-        finders = Collections.emptyList();
-        log.debug("Did not find self in list of finders " + me);
-      } else {
-        updates++;
-        this.modParams = new ModulusParams(children.indexOf(me), children.size(), updates);
-        finders = children;
-        log.debug("updated modulus params " + modParams.remainder + " " + modParams.divisor);
-      }
-    }
-  }
-
-  synchronized ModulusParams getModulusParams() {
-    return modParams;
-  }
-
-  @Override
-  public void init(Environment env, NotificationProcessor notificationProcessor) {
-    Preconditions.checkState(this.notificationProcessor == null);
-
-    this.notificationProcessor = notificationProcessor;
-
-    this.env = env;
-    this.curator = env.getSharedResources().getCurator();
-
-    try {
-      myESNode =
-          new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL, ZookeeperPath.FINDERS
-              + "/f-", new byte[0]);
-      myESNode.start();
-      myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);
-
-      childrenCache =
-          new PathChildrenCache(env.getSharedResources().getCurator(), ZookeeperPath.FINDERS, false);
-      childrenCache.getListenable().addListener(new FindersListener());
-      childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
-
-      updateFinders();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void start() {
-    scanThread = new Thread(new ScanTask(this, env, stopped));
-    scanThread.setName(getClass().getSimpleName() + " " + ScanTask.class.getSimpleName());
-    scanThread.setDaemon(true);
-    scanThread.start();
-  }
-
-  @Override
-  public void stop() {
-    stopped.set(true);
-    try {
-      childrenCache.close();
-    } catch (IOException e1) {
-      log.warn("Failed to close children cache", e1);
-    }
-
-    try {
-      myESNode.close();
-    } catch (IOException e1) {
-      log.warn("Failed to close ephemeral node", e1);
-    }
-
-    scanThread.interrupt();
-    try {
-      scanThread.join();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void failedToProcess(Notification notification, TxResult status) {}
-
-  NotificationProcessor getWorkerQueue() {
-    return notificationProcessor;
-  }
-
-  @VisibleForTesting
-  static boolean shouldProcess(Notification notification, int divisor, int remainder) {
-    byte[] cfcq = NotificationUtil.encodeCol(notification.getColumn());
-    return NotificationHashFilter.accept(ByteUtil.toByteSequence(notification.getRow()),
-        new ArrayByteSequence(cfcq), divisor, remainder);
-  }
-
-  @Override
-  public boolean shouldProcess(Notification notification) {
-    ModulusParams mp = getModulusParams();
-    return shouldProcess(notification, mp.divisor, mp.remainder);
-  }
-}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ParitionManager.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ParitionManager.java
new file mode 100644
index 0000000..3ebb01e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ParitionManager.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
+import org.apache.fluo.accumulo.util.NotificationUtil;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.impl.FluoConfigurationImpl;
+import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.util.ByteUtil;
+import org.apache.fluo.core.util.FluoThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * This class manages partitioning of notifications across workers coordinating in ZooKeeper.
+ * Workers are divided into groups. Each group is given a subset of the Accumulo table. All workers
+ * in a group scan that subset and use hash partitioning to equally divide notifications.
+ *
+ * <p>
+ * Grouping workers was a compromise between every worker scanning the entire table OR each worker
+ * having a dedicated part of a table. This scheme allows multiple workers to share popular parts of
+ * a table. However, it limits the number of workers that will scan a portion of a table for
+ * notifications. This limitation is important for scaling, even if there are 1,000 workers there
+ * will never be more than 7 to 13 workers scanning a portion of the table.
+ */
+public class ParitionManager {
+
+  private static final Logger log = LoggerFactory.getLogger(ParitionManager.class);
+
+  private final PathChildrenCache childrenCache;
+  private final PersistentEphemeralNode myESNode;
+  private final int groupSize;
+  private long paritionSetTime;
+  private PartitionInfo partitionInfo;
+  private final ScheduledExecutorService schedExecutor;
+
+  private CuratorFramework curator;
+
+  private Environment env;
+
+  private final long minSleepTime;
+  private final long maxSleepTime;
+  private long retrySleepTime;
+
+  private static final long STABILIZE_TIME = TimeUnit.SECONDS.toMillis(60);
+
+  private class FindersListener implements PathChildrenCacheListener {
+
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+      switch (event.getType()) {
+        case CHILD_ADDED:
+        case CHILD_REMOVED:
+        case CHILD_UPDATED:
+          scheduleUpdate();
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+  static PartitionInfo getGroupInfo(String me, SortedSet<String> children,
+      Collection<TableRange> tablets, int groupSize) {
+
+    int numGroups = Math.max(1, children.size() / groupSize);
+    int[] groupSizes = new int[numGroups];
+    int count = 0;
+    int myGroupId = -1;
+    int myId = -1;
+
+    for (String child : children) {
+      if (child.equals(me)) {
+        myGroupId = count;
+        myId = groupSizes[count];
+      }
+      groupSizes[count]++;
+      count = (count + 1) % numGroups;
+    }
+
+    List<TableRange> rangesCopy = new ArrayList<>(tablets);
+    Collections.sort(rangesCopy);
+
+    // The behavior of Random with a given seed and shuffle are the same across different versions
+    // of java. Both specify the algorithms in their javadoc and are meant to behave the same across
+    // versions. This is important because different workers may be running different versions of
+    // Java, but all workers need to do the same shuffle.
+    //
+    // Did try to use hashing to partition the tablets among groups, but it was slightly uneven. One
+    // group having a 10% more tablets would lead to uneven utilization.
+    Collections.shuffle(rangesCopy, new Random(42));
+
+    List<TableRange> groupsTablets = new ArrayList<>();
+
+    count = 0;
+    for (TableRange tr : rangesCopy) {
+      if (count == myGroupId) {
+        groupsTablets.add(tr);
+      }
+      count = (count + 1) % numGroups;
+    }
+
+    return new PartitionInfo(myId, myGroupId, groupSizes[myGroupId], numGroups, children.size(),
+        groupsTablets);
+  }
+
+  private void updatePartitionInfo() {
+    try {
+      String me = myESNode.getActualPath();
+      while (me == null) {
+        Thread.sleep(100);
+        me = myESNode.getActualPath();
+      }
+      me = ZKPaths.getNodeFromPath(me);
+
+      byte[] zkSplitData = null;
+      SortedSet<String> children = new TreeSet<>();
+      Set<String> groupSizes = new HashSet<>();
+      for (ChildData childData : childrenCache.getCurrentData()) {
+        String node = ZKPaths.getNodeFromPath(childData.getPath());
+        if (node.equals("splits")) {
+          zkSplitData = childData.getData();
+        } else {
+          children.add(node);
+          groupSizes.add(new String(childData.getData(), UTF_8));
+        }
+      }
+
+      if (zkSplitData == null) {
+        log.info("Did not find splits in zookeeper, will retry later.");
+        setPartitionInfo(null); // disable this worker from processing notifications
+        scheduleRetry();
+        return;
+      }
+
+      if (!children.contains(me)) {
+        log.warn("Did not see self (" + me
+            + "), cannot gather tablet and notification partitioning info.");
+        setPartitionInfo(null); // disable this worker from processing notifications
+        scheduleRetry();
+        return;
+      }
+
+      // ensure all workers agree on the group size
+      if (groupSizes.size() != 1 || !groupSizes.contains(groupSize + "")) {
+        log.warn("Group size disagreement " + groupSize + " " + groupSizes
+            + ", cannot gather tablet and notification partitioning info.");
+        setPartitionInfo(null); // disable this worker from processing notifications
+        scheduleRetry();
+        return;
+      }
+
+      List<Bytes> zkSplits = new ArrayList<>();
+      SerializedSplits.deserialize(zkSplits::add, zkSplitData);
+
+      Collection<TableRange> tableRanges = TableRange.toTabletRanges(zkSplits);
+      PartitionInfo newPI = getGroupInfo(me, children, tableRanges, groupSize);
+
+      setPartitionInfo(newPI);
+    } catch (InterruptedException e) {
+      log.debug("Interrupted while gathering tablet and notification partitioning info.", e);
+    } catch (Exception e) {
+      log.warn("Problem gathering tablet and notification partitioning info.", e);
+      setPartitionInfo(null); // disable this worker from processing notifications
+      scheduleRetry();
+    }
+  }
+
+  private synchronized void scheduleRetry() {
+    schedExecutor.schedule(this::updatePartitionInfo, retrySleepTime, TimeUnit.MILLISECONDS);
+    retrySleepTime =
+        Math.min(maxSleepTime,
+            (long) (1.5 * retrySleepTime) + (long) (retrySleepTime * Math.random()));
+  }
+
+  private synchronized void scheduleUpdate() {
+    schedExecutor.schedule(this::updatePartitionInfo, 0, TimeUnit.MILLISECONDS);
+  }
+
+  private class CheckTabletsTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+
+        String me = myESNode.getActualPath();
+        while (me == null) {
+          Thread.sleep(100);
+          me = myESNode.getActualPath();
+        }
+        me = ZKPaths.getNodeFromPath(me);
+
+        String me2 = me;
+        boolean imFirst =
+            childrenCache.getCurrentData().stream().map(ChildData::getPath)
+                .map(ZKPaths::getNodeFromPath).sorted().findFirst().map(s -> s.equals(me2))
+                .orElse(false);
+
+        if (imFirst) {
+
+          ChildData childData = childrenCache.getCurrentData(ZookeeperPath.FINDERS + "/splits");
+          if (childData == null) {
+            byte[] currSplitData = SerializedSplits.serializeTableSplits(env);
+
+            curator.create().forPath(ZookeeperPath.FINDERS + "/splits", currSplitData);
+          } else {
+            HashSet<Bytes> zkSplits = new HashSet<>();
+            SerializedSplits.deserialize(zkSplits::add, childData.getData());
+
+            HashSet<Bytes> currentSplits = new HashSet<>();
+            byte[] currSplitData = SerializedSplits.serializeTableSplits(env);
+            SerializedSplits.deserialize(currentSplits::add, currSplitData);
+
+            if (!currentSplits.equals(zkSplits)) {
+              curator.setData().forPath(ZookeeperPath.FINDERS + "/splits", currSplitData);
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        log.debug("Interrupted while checking table split points.", e);
+      } catch (Exception e) {
+        log.warn("Failed to checking table split points", e);
+      }
+    }
+  }
+
+  ParitionManager(Environment env, long minSleepTime, long maxSleepTime) {
+    try {
+      this.curator = env.getSharedResources().getCurator();
+      this.env = env;
+
+      this.minSleepTime = minSleepTime;
+      this.maxSleepTime = maxSleepTime;
+      this.retrySleepTime = minSleepTime;
+
+      groupSize =
+          env.getConfiguration().getInt(FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE,
+              FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE_DEFAULT);
+
+      myESNode =
+          new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL, ZookeeperPath.FINDERS
+              + "/f-", ("" + groupSize).getBytes(UTF_8));
+      myESNode.start();
+      myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);
+
+      childrenCache = new PathChildrenCache(curator, ZookeeperPath.FINDERS, true);
+      childrenCache.getListenable().addListener(new FindersListener());
+      childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
+
+      schedExecutor =
+          Executors.newScheduledThreadPool(1,
+              new FluoThreadFactory("Fluo worker partition manager"));
+      schedExecutor.scheduleWithFixedDelay(new CheckTabletsTask(), 0, maxSleepTime,
+          TimeUnit.MILLISECONDS);
+
+      scheduleUpdate();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void setPartitionInfo(PartitionInfo pi) {
+    synchronized (this) {
+      if (!Objects.equals(pi, this.partitionInfo)) {
+        log.debug("Updated finder partition info : " + pi);
+        this.paritionSetTime = System.nanoTime();
+        this.partitionInfo = pi;
+        this.notifyAll();
+      }
+
+      if (pi != null) {
+        retrySleepTime = minSleepTime;
+      }
+    }
+  }
+
+  private long getTimeSincePartitionChange() {
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - paritionSetTime);
+  }
+
+  synchronized PartitionInfo waitForPartitionInfo() throws InterruptedException {
+    while (partitionInfo == null
+        || getTimeSincePartitionChange() < Math.min(maxSleepTime, STABILIZE_TIME)) {
+      wait(minSleepTime);
+    }
+
+    return partitionInfo;
+  }
+
+  synchronized PartitionInfo getPartitionInfo() {
+    if (getTimeSincePartitionChange() < Math.min(maxSleepTime, STABILIZE_TIME)) {
+      return null;
+    }
+
+    return partitionInfo;
+  }
+
+  public void stop() {
+    try {
+      myESNode.close();
+    } catch (IOException e) {
+      log.debug("Error closing finder ephemeral node", e);
+    }
+    try {
+      childrenCache.close();
+    } catch (IOException e) {
+      log.debug("Error closing finder children cache", e);
+    }
+
+    schedExecutor.shutdownNow();
+  }
+
+  @VisibleForTesting
+  static boolean shouldProcess(Notification notification, int divisor, int remainder) {
+    byte[] cfcq = NotificationUtil.encodeCol(notification.getColumn());
+    return NotificationHashFilter.accept(ByteUtil.toByteSequence(notification.getRow()),
+        new ArrayByteSequence(cfcq), divisor, remainder);
+  }
+
+  public boolean shouldProcess(Notification notification) {
+    PartitionInfo pi = getPartitionInfo();
+    if (pi == null) {
+      return false;
+    }
+
+    return pi.getMyGroupsRanges().getContaining(notification.getRow()) != null
+        && shouldProcess(notification, pi.getMyGroupSize(), pi.getMyIdInGroup());
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
new file mode 100644
index 0000000..e33b28b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.List;
+
+class PartitionInfo {
+
+  private final int myGroupId;
+  private final int myIdInGroup;
+  private final int numGroups;
+  private final int myGroupSize;
+  private final int numWorkers;
+  private final RangeSet myGroupsRanges;
+
+  PartitionInfo(int myId, int myGroupId, int myGroupSize, int totalGroups, int totalWorkers,
+      List<TableRange> groupsRanges) {
+    this.myIdInGroup = myId;
+    this.myGroupId = myGroupId;
+    this.myGroupSize = myGroupSize;
+    this.numGroups = totalGroups;
+    this.numWorkers = totalWorkers;
+    this.myGroupsRanges = new RangeSet(groupsRanges);
+  }
+
+  /**
+   * @return The id for the group this worker is in.
+   */
+  public int getMyGroupId() {
+    return myGroupId;
+  }
+
+  /**
+   * @return The id for this worker within its group.
+   */
+  public int getMyIdInGroup() {
+    return myIdInGroup;
+  }
+
+  /**
+   * @return The total number of workers groups there are.
+   */
+  public int getNumGroups() {
+    return numGroups;
+  }
+
+  /**
+   * @return The number of workers in the group this workers is in.
+   */
+  public int getMyGroupSize() {
+    return myGroupSize;
+  }
+
+  /**
+   * @return the total number of workers.
+   */
+  public int getNumWorkers() {
+    return numWorkers;
+  }
+
+  /**
+   * @return the table ranges associated with the group this workers is in.
+   */
+  public RangeSet getMyGroupsRanges() {
+    return myGroupsRanges;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof PartitionInfo) {
+      PartitionInfo other = (PartitionInfo) o;
+      return other.myGroupId == myGroupId && other.myIdInGroup == myIdInGroup
+          && other.numGroups == numGroups && other.myGroupSize == myGroupSize
+          && other.numWorkers == numWorkers && other.myGroupsRanges.equals(myGroupsRanges);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "workers:%d  groups:%d  groupSize:%d  groupId:%d  idInGroup:%d  #tablets:%d", numWorkers,
+        numGroups, myGroupSize, myGroupId, myIdInGroup, myGroupsRanges.size());
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ModulusParams.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfoChangedException.java
similarity index 82%
rename from modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ModulusParams.java
rename to modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfoChangedException.java
index cbad8b9..35c3f9b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ModulusParams.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfoChangedException.java
@@ -15,14 +15,6 @@
 
 package org.apache.fluo.core.worker.finder.hash;
 
-class ModulusParams {
-  int remainder;
-  int divisor;
-  int update;
-
-  ModulusParams(int r, int d, int u) {
-    this.remainder = r;
-    this.divisor = d;
-    this.update = u;
-  }
+public class PartitionInfoChangedException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java
new file mode 100644
index 0000000..c940850
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.impl.FluoConfigurationImpl;
+import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.worker.NotificationFinder;
+import org.apache.fluo.core.worker.NotificationProcessor;
+import org.apache.fluo.core.worker.TxResult;
+
+public class PartitionNotificationFinder implements NotificationFinder {
+
+  private ParitionManager paritionManager;
+  private Thread scanThread;
+  private NotificationProcessor processor;
+  private Environment env;
+  private AtomicBoolean stopped;
+
+  @Override
+  public void init(Environment env, NotificationProcessor processor) {
+    this.processor = processor;
+    this.env = env;
+    this.stopped = new AtomicBoolean(false);
+
+  }
+
+  @Override
+  public void start() {
+    long minSleepTime =
+        env.getConfiguration().getInt(FluoConfigurationImpl.NTFY_FINDER_MIN_SLEEP_TIME_PROP,
+            FluoConfigurationImpl.NTFY_FINDER_MIN_SLEEP_TIME_DEFAULT);
+    long maxSleepTime =
+        env.getConfiguration().getInt(FluoConfigurationImpl.NTFY_FINDER_MAX_SLEEP_TIME_PROP,
+            FluoConfigurationImpl.NTFY_FINDER_MAX_SLEEP_TIME_DEFAULT);
+
+    paritionManager = new ParitionManager(env, minSleepTime, maxSleepTime);
+
+    scanThread =
+        new Thread(new ScanTask(this, processor, paritionManager, env, stopped, minSleepTime,
+            maxSleepTime));
+    scanThread.setName(getClass().getSimpleName() + " " + ScanTask.class.getSimpleName());
+    scanThread.setDaemon(true);
+    scanThread.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped.set(true);
+
+    scanThread.interrupt();
+    try {
+      scanThread.join();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    paritionManager.stop();
+  }
+
+  @Override
+  public boolean shouldProcess(Notification notification) {
+    return paritionManager.shouldProcess(notification);
+  }
+
+  @Override
+  public void failedToProcess(Notification notification, TxResult status) {}
+
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
new file mode 100644
index 0000000..40dc5e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+
+import org.apache.fluo.api.data.Bytes;
+
+public class RangeSet {
+  private TreeMap<Bytes, TableRange> tmap;
+  private TableRange lastRange;
+
+  public RangeSet(List<TableRange> ranges) {
+    tmap = new TreeMap<>();
+
+    for (TableRange tablet : ranges) {
+      if (tablet.getEndRow() == null) {
+        lastRange = tablet;
+      } else {
+        tmap.put(tablet.getEndRow(), tablet);
+      }
+    }
+  }
+
+  public TableRange getContaining(Bytes row) {
+    Entry<Bytes, TableRange> entry = tmap.ceilingEntry(row);
+    if (entry != null) {
+      if (entry.getValue().contains(row)) {
+        return entry.getValue();
+      }
+    } else if (lastRange != null) {
+      if (lastRange.contains(row)) {
+        return lastRange;
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof RangeSet) {
+      RangeSet ots = (RangeSet) o;
+
+      if (tmap.size() != ots.tmap.size()) {
+        return false;
+      }
+
+      for (Entry<Bytes, TableRange> entry : tmap.entrySet()) {
+        TableRange otr = ots.tmap.get(entry.getKey());
+        if (!Objects.equals(entry.getValue(), otr)) {
+          return false;
+        }
+      }
+
+      return Objects.equals(lastRange, ots.lastRange);
+    }
+    return false;
+  }
+
+  public int size() {
+    return tmap.size() + (lastRange == null ? 0 : 1);
+  }
+
+  public void forEach(Consumer<TableRange> trc) {
+    if (lastRange != null) {
+      trc.accept(lastRange);
+    }
+    tmap.values().forEach(trc);
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index e0fa791..1732a29 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -17,11 +17,14 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
@@ -31,12 +34,11 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
 import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.fluo.core.worker.TabletInfoCache;
-import org.apache.fluo.core.worker.TabletInfoCache.TabletInfo;
-import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder.ModParamsChangedException;
+import org.apache.fluo.core.worker.NotificationFinder;
+import org.apache.fluo.core.worker.NotificationProcessor;
+import org.apache.fluo.core.worker.NotificationProcessor.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,91 +46,101 @@
 
   private static final Logger log = LoggerFactory.getLogger(ScanTask.class);
 
-  private final HashNotificationFinder hwf;
+  private final NotificationFinder finder;
+  private final ParitionManager partitionManager;
+  private final NotificationProcessor proccessor;
   private final Random rand = new Random();
   private final AtomicBoolean stopped;
-  private final TabletInfoCache<TabletData, Supplier<TabletData>> tabletInfoCache;
+  private final Map<TableRange, TabletData> rangeData;
   private final Environment env;
 
-  static long STABILIZE_TIME = 10 * 1000;
-
   private long minSleepTime;
   private long maxSleepTime;
 
-  ScanTask(HashNotificationFinder hashWorkFinder, Environment env, AtomicBoolean stopped) {
-    this.hwf = hashWorkFinder;
-    this.tabletInfoCache = new TabletInfoCache<>(env, new Supplier<TabletData>() {
-      @Override
-      public TabletData get() {
-        return new TabletData();
-      }
-    });
+  ScanTask(NotificationFinder finder, NotificationProcessor proccessor,
+      ParitionManager partitionManager, Environment env, AtomicBoolean stopped, long minSleepTime,
+      long maxSleepTime) {
+    this.finder = finder;
+    this.rangeData = new HashMap<>();
+
     this.env = env;
     this.stopped = stopped;
 
-    minSleepTime =
-        env.getConfiguration().getInt(FluoConfigurationImpl.MIN_SLEEP_TIME_PROP,
-            FluoConfigurationImpl.MIN_SLEEP_TIME_DEFAULT);
-    maxSleepTime =
-        env.getConfiguration().getInt(FluoConfigurationImpl.MAX_SLEEP_TIME_PROP,
-            FluoConfigurationImpl.MAX_SLEEP_TIME_DEFAULT);
+    this.proccessor = proccessor;
+    this.partitionManager = partitionManager;
+
+    this.minSleepTime = minSleepTime;
+    this.maxSleepTime = maxSleepTime;
   }
 
   @Override
   public void run() {
 
-    int qSize = hwf.getWorkerQueue().size();
+
+    List<TableRange> ranges = new ArrayList<>();
+    Set<TableRange> rangeSet = new HashSet<>();
+
+    int qSize = proccessor.size();
 
     while (!stopped.get()) {
       try {
+        PartitionInfo partition = partitionManager.waitForPartitionInfo();
 
-        while (hwf.getWorkerQueue().size() > qSize / 2 && !stopped.get()) {
+        while (proccessor.size() > qSize / 2 && !stopped.get()) {
           UtilWaitThread.sleep(50, stopped);
         }
 
-        // break scan work into a lot of ranges that are randomly ordered. This has a few benefits.
-        // Ensures different workers are scanning different tablets.
-        // Allows checking local state more frequently in the case where work is not present in many
-        // tablets. Allows less frequent scanning of tablets that are
-        // usually empty.
-        List<TabletInfo<TabletData>> tablets = new ArrayList<>(tabletInfoCache.getTablets());
-        Collections.shuffle(tablets, rand);
+        ranges.clear();
+        rangeSet.clear();
+        partition.getMyGroupsRanges().forEach(t -> {
+          ranges.add(t);
+          rangeSet.add(t);
+        });
+        Collections.shuffle(ranges, rand);
+        rangeData.keySet().retainAll(rangeSet);
 
         long minRetryTime = maxSleepTime + System.currentTimeMillis();
         int notifications = 0;
         int tabletsScanned = 0;
         try {
-          for (TabletInfo<TabletData> tabletInfo : tablets) {
-            if (System.currentTimeMillis() >= tabletInfo.getData().retryTime) {
+          for (TableRange tabletRange : ranges) {
+            TabletData tabletData = rangeData.computeIfAbsent(tabletRange, tr -> new TabletData());
+            if (System.currentTimeMillis() >= tabletData.retryTime) {
               int count = 0;
-              ModulusParams modParams = hwf.getModulusParams();
-              if (modParams != null) {
-                // notifications could have been asynchronously queued for deletion. Let that happen
-                // 1st before scanning
-                env.getSharedResources().getBatchWriter().waitForAsyncFlush();
-                count = scan(modParams, tabletInfo.getRange());
-                tabletsScanned++;
+              PartitionInfo pi = partitionManager.getPartitionInfo();
+              if (partition.equals(pi)) {
+                try (Session session =
+                    proccessor.beginAddingNotifications(rc -> tabletRange.contains(rc.getRow()))) {
+                  // notifications could have been asynchronously queued for deletion. Let that
+                  // happen
+                  // 1st before scanning
+                  env.getSharedResources().getBatchWriter().waitForAsyncFlush();
+
+                  count = scan(session, partition, tabletRange.getRange());
+                  tabletsScanned++;
+                }
+              } else {
+                break;
               }
-              tabletInfo.getData().updateScanCount(count, maxSleepTime);
+              tabletData.updateScanCount(count, maxSleepTime);
               notifications += count;
               if (stopped.get()) {
                 break;
               }
             }
 
-            minRetryTime = Math.min(tabletInfo.getData().retryTime, minRetryTime);
+            minRetryTime = Math.min(tabletData.retryTime, minRetryTime);
           }
-        } catch (ModParamsChangedException mpce) {
-          hwf.getWorkerQueue().clear();
-          waitForFindersToStabilize();
+        } catch (PartitionInfoChangedException mpce) {
+          // nothing to do
         }
 
         long sleepTime = Math.max(minSleepTime, minRetryTime - System.currentTimeMillis());
 
-        qSize = hwf.getWorkerQueue().size();
+        qSize = proccessor.size();
 
         log.debug("Scanned {} of {} tablets, added {} new notifications (total queued {})",
-            tabletsScanned, tablets.size(), notifications, qSize);
+            tabletsScanned, ranges.size(), notifications, qSize);
 
         if (!stopped.get()) {
           UtilWaitThread.sleep(sleepTime, stopped);
@@ -141,7 +153,6 @@
           log.error("Error while looking for notifications", e);
         }
       }
-
     }
   }
 
@@ -157,7 +168,7 @@
     return wasInt;
   }
 
-  private int scan(ModulusParams lmp, Range range) throws TableNotFoundException {
+  private int scan(Session session, PartitionInfo pi, Range range) throws TableNotFoundException {
     Scanner scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
 
     scanner.setRange(range);
@@ -165,38 +176,24 @@
     Notification.configureScanner(scanner);
 
     IteratorSetting iterCfg = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
-    NotificationHashFilter.setModulusParams(iterCfg, lmp.divisor, lmp.remainder);
+    NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
     scanner.addScanIterator(iterCfg);
 
     int count = 0;
 
     for (Entry<Key, Value> entry : scanner) {
-      if (lmp.update != hwf.getModulusParams().update) {
-        throw new HashNotificationFinder.ModParamsChangedException();
+      if (!pi.equals(partitionManager.getPartitionInfo())) {
+        throw new PartitionInfoChangedException();
       }
 
       if (stopped.get()) {
         return count;
       }
 
-      if (hwf.getWorkerQueue().addNotification(hwf, Notification.from(entry.getKey()))) {
+      if (session.addNotification(finder, Notification.from(entry.getKey()))) {
         count++;
       }
     }
     return count;
   }
-
-  private void waitForFindersToStabilize() {
-    ModulusParams lmp = hwf.getModulusParams();
-    long startTime = System.currentTimeMillis();
-
-    while (System.currentTimeMillis() - startTime < STABILIZE_TIME) {
-      UtilWaitThread.sleep(500, stopped);
-      ModulusParams lmp2 = hwf.getModulusParams();
-      if (lmp.update != lmp2.update) {
-        startTime = System.currentTimeMillis();
-        lmp = lmp2;
-      }
-    }
-  }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
new file mode 100644
index 0000000..53ad7ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.util.ByteUtil;
+
+public class SerializedSplits {
+
+  static final int MAX_SIZE = 1 << 18;
+
+  public static void deserialize(Consumer<Bytes> splitConsumer, byte[] serializedSplits) {
+    try {
+      ByteArrayInputStream bais = new ByteArrayInputStream(serializedSplits);
+      GZIPInputStream gzis = new GZIPInputStream(bais);
+      DataInputStream dis = new DataInputStream(gzis);
+
+      int numSplits = dis.readInt();
+
+      BytesBuilder builder = Bytes.builder();
+
+      for (int i = 0; i < numSplits; i++) {
+        int len = dis.readInt();
+        builder.setLength(0);
+        builder.append(dis, len);
+        splitConsumer.accept(builder.toBytes());
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static byte[] serializeInternal(List<Bytes> splits) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    GZIPOutputStream gzOut = new GZIPOutputStream(baos);
+    BufferedOutputStream bos = new BufferedOutputStream(gzOut, 1 << 16);
+    DataOutputStream dos = new DataOutputStream(bos);
+
+    dos.writeInt(splits.size());
+    for (Bytes split : splits) {
+      dos.writeInt(split.length());
+      split.writeTo(dos);
+    }
+
+    dos.close();
+
+    return baos.toByteArray();
+  }
+
+  public static byte[] serialize(Collection<Bytes> splits) {
+    List<Bytes> splitsCopy = new ArrayList<>(splits);
+    Collections.sort(splitsCopy);
+
+    try {
+      byte[] serialized = serializeInternal(splitsCopy);
+
+      while (serialized.length > MAX_SIZE) {
+        List<Bytes> splitsCopy2 = new ArrayList<>(splitsCopy.size() / 2 + 1);
+        for (int i = 0; i < splitsCopy.size(); i += 2) {
+          splitsCopy2.add(splitsCopy.get(i));
+        }
+
+        splitsCopy = splitsCopy2;
+        serialized = serializeInternal(splitsCopy);
+      }
+
+      return serialized;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public static byte[] serializeTableSplits(Environment env) {
+    List<Bytes> splits;
+    try {
+      splits =
+          env.getConnector().tableOperations().listSplits(env.getTable()).stream()
+              .map(ByteUtil::toBytes).collect(Collectors.toList());
+    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
+      throw new RuntimeException(e);
+    }
+    return serialize(splits);
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
new file mode 100644
index 0000000..30a97f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.core.util.ByteUtil;
+import org.apache.hadoop.io.Text;
+
+import static java.util.stream.Collectors.toList;
+
+public class TableRange implements Comparable<TableRange> {
+  private final Bytes prevEndRow;
+  private final Bytes endRow;
+  private final int hc;
+
+  public TableRange(Bytes per, Bytes er) {
+    this.prevEndRow = per;
+    this.endRow = er;
+    this.hc = Objects.hash(this.prevEndRow, this.endRow);
+  }
+
+  @Override
+  public int hashCode() {
+    return hc;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof TableRange) {
+      TableRange ot = (TableRange) o;
+      return Objects.equals(prevEndRow, ot.prevEndRow) && Objects.equals(endRow, ot.endRow);
+    }
+
+    return false;
+  }
+
+  public Bytes getPrevEndRow() {
+    return prevEndRow;
+  }
+
+  public Bytes getEndRow() {
+    return endRow;
+  }
+
+  public boolean contains(Bytes row) {
+    return (prevEndRow == null || row.compareTo(prevEndRow) > 0)
+        && (endRow == null || row.compareTo(endRow) <= 0);
+  }
+
+  @Override
+  public String toString() {
+    return getPrevEndRow() + " " + getEndRow();
+  }
+
+
+  public static Collection<TableRange> toTabletRanges(Collection<Bytes> rows) {
+    List<Bytes> sortedRows = rows.stream().sorted().collect(toList());
+    List<TableRange> tablets = new ArrayList<>(sortedRows.size() + 1);
+    for (int i = 0; i < sortedRows.size(); i++) {
+      tablets.add(new TableRange(i == 0 ? null : sortedRows.get(i - 1), sortedRows.get(i)));
+    }
+
+    tablets.add(new TableRange(sortedRows.size() == 0 ? null
+        : sortedRows.get(sortedRows.size() - 1), null));
+    return tablets;
+  }
+
+
+
+  public Range getRange() {
+    Text tper = Optional.ofNullable(prevEndRow).map(ByteUtil::toText).orElse(null);
+    Text ter = Optional.ofNullable(endRow).map(ByteUtil::toText).orElse(null);
+    return new Range(tper, false, ter, true);
+  }
+
+  @Override
+  public int compareTo(TableRange o) {
+    if (Objects.equals(getEndRow(), o.getEndRow())) {
+      // this will catch case of both null
+      return 0;
+    }
+
+    if (getEndRow() == null) {
+      return 1;
+    }
+
+    if (o.getEndRow() == null) {
+      return -1;
+    }
+
+    return getEndRow().compareTo(o.getEndRow());
+  }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java
index 6790e54..b3496dd 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java
@@ -63,7 +63,7 @@
     byte[] cfcq = NotificationUtil.encodeCol(col);
     Key k = new Key(row, ColumnConstants.NOTIFY_CF.toArray(), cfcq, new byte[0], 6);
     boolean accept = NotificationHashFilter.accept(k, 7, 3);
-    Assert.assertEquals(accept, HashNotificationFinder.shouldProcess(Notification.from(k), 7, 3));
+    Assert.assertEquals(accept, ParitionManager.shouldProcess(Notification.from(k), 7, 3));
     return accept;
   }
 }
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
new file mode 100644
index 0000000..8a02edc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IntSummaryStatistics;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.toList;
+
+public class PartitionManagerTest {
+
+  @Test
+  public void testGrouping() {
+    IntFunction<String> nff = i -> String.format("f-%04d", i);
+
+    for (int numSplits : new int[] {1, 10, 100, 1000}) {
+      for (int numWorkers : new int[] {1, 5, 10, 11, 100}) {
+        for (int groupSize : new int[] {1, 2, 3, 5, 7, 13, 17, 19, 43, 73, 97}) {
+          int expectedGroups = Math.max(1, numWorkers / groupSize);
+          int maxGroupSize =
+              Math.min(numWorkers,
+                  groupSize + (int) Math.ceil((numWorkers % groupSize) / (double) expectedGroups));
+
+          TreeSet<String> children = new TreeSet<>();
+
+          IntStream.range(0, numWorkers).mapToObj(nff).forEach(children::add);
+
+          Collection<Bytes> rows =
+              IntStream.iterate(0, i -> i + 1000).limit(numSplits)
+                  .mapToObj(i -> String.format("r%06d", i)).map(Bytes::of).collect(toList());
+          Collection<TableRange> tablets = TableRange.toTabletRanges(rows);
+
+          Set<String> idCombos = new HashSet<>();
+          Map<Integer, RangeSet> groupTablets = new HashMap<>();
+
+          for (int i = 0; i < numWorkers; i++) {
+            String me = nff.apply(i);
+            PartitionInfo pi = ParitionManager.getGroupInfo(me, children, tablets, groupSize);
+            Assert.assertEquals(expectedGroups, pi.getNumGroups());
+            Assert.assertTrue(pi.getMyGroupSize() >= Math.min(numWorkers, groupSize)
+                && pi.getMyGroupSize() <= maxGroupSize);
+            Assert.assertEquals(numWorkers, pi.getNumWorkers());
+            Assert.assertTrue(pi.getMyIdInGroup() >= 0 && pi.getMyIdInGroup() < maxGroupSize);
+            Assert.assertTrue(pi.getMyGroupId() >= 0 && pi.getMyGroupId() < expectedGroups);
+
+            Assert.assertFalse(idCombos.contains(pi.getMyGroupId() + ":" + pi.getMyIdInGroup()));
+            idCombos.add(pi.getMyGroupId() + ":" + pi.getMyIdInGroup());
+
+            if (!groupTablets.containsKey(pi.getMyGroupId())) {
+              groupTablets.put(pi.getMyGroupId(), pi.getMyGroupsRanges());
+            } else {
+              Assert.assertEquals(groupTablets.get(pi.getMyGroupId()), pi.getMyGroupsRanges());
+            }
+          }
+
+          Assert.assertEquals(numWorkers, idCombos.size());
+
+          // check that the tablets for each group are disjoint and that the union of the tablets
+          // for each group has all tablets
+          HashSet<TableRange> allTabletsFromGroups = new HashSet<>();
+
+          for (RangeSet tabletSet : groupTablets.values()) {
+            tabletSet.forEach(tr -> {
+              Assert.assertFalse(allTabletsFromGroups.contains(tr));
+              allTabletsFromGroups.add(tr);
+            });
+          }
+
+          Assert.assertEquals(new HashSet<>(tablets), allTabletsFromGroups);
+
+          // check that all groups have about the same number of tablets
+          IntSummaryStatistics summaryStats =
+              groupTablets.values().stream().mapToInt(RangeSet::size).summaryStatistics();
+          Assert.assertTrue(summaryStats.getMax() - summaryStats.getMin() < 2);
+        }
+      }
+    }
+  }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java
new file mode 100644
index 0000000..13aa941
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+public class SerializedSplitsTest {
+  @Test
+  public void testLotsOfSplits() {
+    List<Bytes> splits =
+        IntStream.iterate(0, i -> i + 13).limit(1_000_000).mapToObj(i -> String.format("%08x", i))
+            .map(Bytes::of).collect(toList());
+
+    byte[] data = SerializedSplits.serialize(splits);
+    Assert.assertTrue(data.length <= SerializedSplits.MAX_SIZE);
+
+    List<Bytes> splits2 = new ArrayList<>();
+    SerializedSplits.deserialize(splits2::add, data);
+
+    Assert.assertTrue(splits2.size() > 10_000);
+    Assert.assertTrue(splits2.size() < splits.size());
+
+    int expectedDiff =
+        Integer.parseInt(splits2.get(1).toString(), 16)
+            - Integer.parseInt(splits2.get(0).toString(), 16);
+    Assert.assertTrue(expectedDiff > 13);
+    Assert.assertTrue(expectedDiff % 13 == 0);
+    // check that splits are evenly spaced
+    for (int i = 1; i < splits2.size(); i++) {
+      int sp1 = Integer.parseInt(splits2.get(i - 1).toString(), 16);
+      int sp2 = Integer.parseInt(splits2.get(i).toString(), 16);
+      Assert.assertEquals(expectedDiff, sp2 - sp1);
+      Assert.assertEquals(0, sp1 % 13);
+      Assert.assertEquals(0, sp2 % 13);
+    }
+
+    Assert.assertTrue(splits.get(0).compareTo(splits2.get(0)) <= 0);
+    Assert
+        .assertTrue(splits.get(splits.size() - 1).compareTo(splits2.get(splits2.size() - 1)) >= 0);
+  }
+
+  @Test
+  public void testSimple() {
+    Set<Bytes> splits =
+        IntStream.iterate(0, i -> i + 13).limit(1_000).mapToObj(i -> String.format("%08x", i))
+            .map(Bytes::of).collect(toSet());
+
+    byte[] data = SerializedSplits.serialize(splits);
+
+    HashSet<Bytes> splits2 = new HashSet<>();
+    SerializedSplits.deserialize(splits2::add, data);
+
+    Assert.assertEquals(splits, splits2);
+  }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
new file mode 100644
index 0000000..c73044a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TableRangeTest {
+  @Test
+  public void testBasic() {
+    TableRange tr1 = new TableRange(null, null);
+
+    Assert.assertTrue(tr1.contains(Bytes.of("a")));
+    Assert.assertTrue(tr1.contains(Bytes.of("z")));
+    Assert.assertNull(tr1.getEndRow());
+    Assert.assertNull(tr1.getPrevEndRow());
+
+    TableRange tr2 = new TableRange(null, Bytes.of("ma"));
+    Assert.assertTrue(tr2.contains(Bytes.of("a")));
+    Assert.assertTrue(tr2.contains(Bytes.of("ma")));
+    Assert.assertFalse(tr2.contains(Bytes.of("maa")));
+    Assert.assertFalse(tr2.contains(Bytes.of("z")));
+    Assert.assertNull(tr2.getPrevEndRow());
+    Assert.assertEquals(Bytes.of("ma"), tr2.getEndRow());
+
+    TableRange tr3 = new TableRange(Bytes.of("la"), null);
+    Assert.assertFalse(tr3.contains(Bytes.of("a")));
+    Assert.assertFalse(tr3.contains(Bytes.of("la")));
+    Assert.assertTrue(tr3.contains(Bytes.of("laa")));
+    Assert.assertTrue(tr3.contains(Bytes.of("z")));
+    Assert.assertEquals(Bytes.of("la"), tr3.getPrevEndRow());
+    Assert.assertNull(tr3.getEndRow());
+
+    TableRange tr4 = new TableRange(Bytes.of("la"), Bytes.of("ma"));
+    Assert.assertFalse(tr4.contains(Bytes.of("a")));
+    Assert.assertFalse(tr4.contains(Bytes.of("la")));
+    Assert.assertTrue(tr4.contains(Bytes.of("laa")));
+    Assert.assertTrue(tr4.contains(Bytes.of("ma")));
+    Assert.assertFalse(tr4.contains(Bytes.of("maa")));
+    Assert.assertFalse(tr4.contains(Bytes.of("z")));
+    Assert.assertEquals(Bytes.of("la"), tr4.getPrevEndRow());
+    Assert.assertEquals(Bytes.of("ma"), tr4.getEndRow());
+  }
+
+  @Test
+  public void testMultiple() {
+
+    Bytes sp1 = Bytes.of("e1");
+    Bytes sp2 = Bytes.of("m1");
+    Bytes sp3 = Bytes.of("r1");
+
+    Collection<TableRange> trc1 =
+        new HashSet<>(TableRange.toTabletRanges(Arrays.asList(sp2, sp3, sp1)));
+
+    Assert.assertEquals(4, trc1.size());
+    Assert.assertTrue(trc1.contains(new TableRange(null, sp1)));
+    Assert.assertTrue(trc1.contains(new TableRange(sp1, sp2)));
+    Assert.assertTrue(trc1.contains(new TableRange(sp2, sp3)));
+    Assert.assertTrue(trc1.contains(new TableRange(sp3, null)));
+
+    Collection<TableRange> trc2 = new HashSet<>(TableRange.toTabletRanges(Collections.emptyList()));
+    Assert.assertEquals(1, trc2.size());
+    Assert.assertTrue(trc2.contains(new TableRange(null, null)));
+  }
+
+  @Test
+  public void testCompare() {
+
+    Bytes sp1 = Bytes.of("e1");
+    Bytes sp2 = Bytes.of("m1");
+
+    TableRange tr1 = new TableRange(null, sp1);
+    TableRange tr2 = new TableRange(sp1, sp2);
+    TableRange tr3 = new TableRange(sp2, null);
+
+    Assert.assertTrue(tr1.compareTo(tr2) < 0);
+    Assert.assertTrue(tr2.compareTo(tr1) > 0);
+
+    Assert.assertTrue(tr2.compareTo(tr3) < 0);
+    Assert.assertTrue(tr3.compareTo(tr2) > 0);
+
+    Assert.assertTrue(tr1.compareTo(tr3) < 0);
+    Assert.assertTrue(tr3.compareTo(tr1) > 0);
+
+    Assert.assertTrue(tr1.compareTo(tr1) == 0);
+    Assert.assertTrue(tr2.compareTo(tr2) == 0);
+    Assert.assertTrue(tr3.compareTo(tr3) == 0);
+
+    Assert.assertTrue(tr1.compareTo(new TableRange(null, sp1)) == 0);
+    Assert.assertTrue(tr2.compareTo(new TableRange(sp1, sp2)) == 0);
+    Assert.assertTrue(tr3.compareTo(new TableRange(sp2, null)) == 0);
+
+    Assert.assertTrue(new TableRange(null, null).compareTo(new TableRange(null, null)) == 0);
+  }
+
+  @Test
+  public void testToRange() {
+    for (String prev : new String[] {null, "foo"}) {
+      for (String end : new String[] {null, "zoo"}) {
+        Assert.assertEquals(new Range(prev, false, end, true), new TableRange(prev == null ? null
+            : Bytes.of(prev), end == null ? null : Bytes.of(end)).getRange());
+      }
+    }
+  }
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index 9db0ce4..66791b5 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -28,7 +28,7 @@
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.observer.Observers;
 import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
+import org.apache.fluo.core.worker.finder.hash.PartitionNotificationFinder;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestTransaction;
 import org.apache.fluo.mini.MiniFluoImpl;
@@ -171,11 +171,11 @@
 
     try (Environment env = new Environment(config)) {
 
-      NotificationFinder nf1 = new HashNotificationFinder();
+      NotificationFinder nf1 = new PartitionNotificationFinder();
       nf1.init(env, ((MiniFluoImpl) miniFluo).getNotificationProcessor());
       nf1.start();
 
-      NotificationFinder nf2 = new HashNotificationFinder();
+      NotificationFinder nf2 = new PartitionNotificationFinder();
       nf2.init(env, ((MiniFluoImpl) miniFluo).getNotificationProcessor());
       nf2.start();
 
diff --git a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
index 4e798ab..6d1ce5d 100644
--- a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
+++ b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
@@ -84,8 +84,8 @@
         startMiniAccumulo();
       }
 
-      config.setProperty(FluoConfigurationImpl.MIN_SLEEP_TIME_PROP, 50);
-      config.setProperty(FluoConfigurationImpl.MAX_SLEEP_TIME_PROP, 100);
+      config.setProperty(FluoConfigurationImpl.NTFY_FINDER_MIN_SLEEP_TIME_PROP, 50);
+      config.setProperty(FluoConfigurationImpl.NTFY_FINDER_MAX_SLEEP_TIME_PROP, 100);
 
       env = new Environment(config);