fixes #500 Made scanning for notifications scalable
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
index a9f5fd4..9af6feb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
@@ -16,7 +16,6 @@
package org.apache.fluo.core.impl;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.core.worker.finder.hash.ScanTask;
/**
* Contains implementation-related Fluo properties that should not be exposed in the API in
@@ -28,13 +27,16 @@
public static final String ORACLE_PORT_PROP = FLUO_IMPL_PREFIX + ".oracle.port";
public static final String WORKER_FINDER_PROP = FLUO_IMPL_PREFIX + ".worker.finder";
+ public static final String WORKER_PARTITION_GROUP_SIZE = FLUO_IMPL_PREFIX
+ + ".worker.finder.partition.groupSize";
+ public static final int WORKER_PARTITION_GROUP_SIZE_DEFAULT = 7;
public static final String METRICS_RESERVOIR_PROP = FLUO_IMPL_PREFIX + ".metrics.reservoir";
- public static final String MIN_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
- + ScanTask.class.getSimpleName() + ".minSleep";
- public static final int MIN_SLEEP_TIME_DEFAULT = 5000;
- public static final String MAX_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
- + ScanTask.class.getSimpleName() + ".maxSleep";
- public static final int MAX_SLEEP_TIME_DEFAULT = 5 * 60 * 1000;
+ public static final String NTFY_FINDER_MIN_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
+ + ".worker.finder.minSleep";
+ public static final int NTFY_FINDER_MIN_SLEEP_TIME_DEFAULT = 5000;
+ public static final String NTFY_FINDER_MAX_SLEEP_TIME_PROP = FLUO_IMPL_PREFIX
+ + ".worker.finder.maxSleep";
+ public static final int NTFY_FINDER_MAX_SLEEP_TIME_DEFAULT = 5 * 60 * 1000;
// Time period that each client will update ZK with their oldest active timestamp
// If period is too short, Zookeeper may be overloaded. If too long, garbage collection
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
index 92b0382..3938c07 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationFinderFactory.java
@@ -17,13 +17,13 @@
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
-import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
+import org.apache.fluo.core.worker.finder.hash.PartitionNotificationFinder;
public class NotificationFinderFactory {
public static NotificationFinder newNotificationFinder(FluoConfiguration conf) {
String clazz =
conf.getString(FluoConfigurationImpl.WORKER_FINDER_PROP,
- HashNotificationFinder.class.getName());
+ PartitionNotificationFinder.class.getName());
try {
return Class.forName(clazz).asSubclass(NotificationFinder.class).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 56aa78d..3b6bba5 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -16,13 +16,17 @@
package org.apache.fluo.core.worker;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import com.codahale.metrics.Gauge;
import org.apache.fluo.api.data.Column;
@@ -64,8 +68,12 @@
// little utility class that tracks all notifications in queue
private class NotificationTracker {
private Map<RowColumn, Future<?>> queuedWork = new HashMap<>();
+ private Set<RowColumn> recentlyDeleted = new HashSet<>();
private long sizeInBytes = 0;
+ private Map<Long, Predicate<RowColumn>> memoryPredicates = new HashMap<>();
+ private Predicate<RowColumn> memoryPredicate = rc -> false;
private static final long MAX_SIZE = 1 << 24;
+ private long nextSessionId = 0;
private long size(RowColumn rowCol) {
Column col = rowCol.getColumn();
@@ -75,7 +83,7 @@
public synchronized boolean add(RowColumn rowCol, Future<?> task) {
- if (queuedWork.containsKey(rowCol)) {
+ if (queuedWork.containsKey(rowCol) || recentlyDeleted.contains(rowCol)) {
return false;
}
@@ -98,6 +106,9 @@
public synchronized void remove(RowColumn rowCol) {
if (queuedWork.remove(rowCol) != null) {
+ if (memoryPredicate.test(rowCol)) {
+ recentlyDeleted.add(rowCol);
+ }
sizeInBytes -= size(rowCol);
notify();
}
@@ -123,6 +134,34 @@
return true;
}
+ private void resetMemoryPredicate() {
+ memoryPredicate = null;
+ for (Predicate<RowColumn> p : this.memoryPredicates.values()) {
+ if (memoryPredicate == null) {
+ memoryPredicate = p;
+ } else {
+ memoryPredicate = p.or(memoryPredicate);
+ }
+ }
+ }
+
+ public synchronized long beginAddingNotifications(Predicate<RowColumn> memoryPredicate) {
+ long sessionId = nextSessionId++;
+ this.memoryPredicates.put(sessionId, Objects.requireNonNull(memoryPredicate));
+ resetMemoryPredicate();
+ return sessionId;
+ }
+
+ public synchronized void finishAddingNotifications(long sessionId) {
+ this.memoryPredicates.remove(sessionId);
+ if (memoryPredicates.size() == 0) {
+ recentlyDeleted.clear();
+ memoryPredicate = rc -> false;
+ } else {
+ resetMemoryPredicate();
+ }
+ }
+
}
private static class NotificationProcessingTask implements Runnable {
@@ -176,25 +215,47 @@
}
}
- public boolean addNotification(final NotificationFinder notificationFinder,
- final Notification notification) {
+ public class Session implements AutoCloseable {
+ private long id;
- WorkTaskAsync workTask =
- new WorkTaskAsync(this, notificationFinder, env, notification, observers);
- FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);
-
- if (!tracker.add(notification.getRowColumn(), ft)) {
- return false;
+ public Session(Predicate<RowColumn> memoryPredicate) {
+ this.id = tracker.beginAddingNotifications(memoryPredicate);
}
- try {
- executor.execute(ft);
- } catch (RejectedExecutionException rje) {
- tracker.remove(notification.getRowColumn());
- throw rje;
+ public boolean addNotification(final NotificationFinder notificationFinder,
+ final Notification notification) {
+
+ WorkTaskAsync workTask =
+ new WorkTaskAsync(NotificationProcessor.this, notificationFinder, env, notification,
+ observers);
+ FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);
+
+ if (!tracker.add(notification.getRowColumn(), ft)) {
+ return false;
+ }
+
+ try {
+ executor.execute(ft);
+ } catch (RejectedExecutionException rje) {
+ tracker.remove(notification.getRowColumn());
+ throw rje;
+ }
+
+ return true;
}
- return true;
+ public void close() {
+ tracker.finishAddingNotifications(id);
+ }
+ }
+
+ /**
+ * Starts a session for adding notifications. During this session, any notifications that are
+ * deleted and match the predicate will be remembered. These remembered notifications can not be
+ * added again while the session is active.
+ */
+ public Session beginAddingNotifications(Predicate<RowColumn> memoryPredicate) {
+ return new Session(memoryPredicate);
}
public void requeueNotification(final NotificationFinder notificationFinder,
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/TabletInfoCache.java b/modules/core/src/main/java/org/apache/fluo/core/worker/TabletInfoCache.java
deleted file mode 100644
index 4d17965..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/TabletInfoCache.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.worker;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Range;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.hadoop.io.Text;
-
-public class TabletInfoCache<T, S extends Supplier<T>> {
- private static final long CACHE_TIME = 5 * 60 * 1000;
-
- private final Environment env;
- private List<TabletInfo<T>> cachedTablets;
- private long listSplitsTime = 0;
- private S supplier;
-
- public static class TabletInfo<T> {
- private final Text start;
- private final Text end;
- private T data;
-
- TabletInfo(Text start, Text end, T data) {
- this.start = start;
- this.end = end;
- this.data = data;
- }
-
- private int hashCode(Text t) {
- if (t == null) {
- return 0;
- }
- return t.hashCode();
- }
-
- @Override
- public int hashCode() {
- return hashCode(start) + hashCode(end);
- }
-
- private boolean equals(Text t1, Text t2) {
- if (t1 == t2) {
- return true;
- }
-
- if (t1 == null || t2 == null) {
- return false;
- }
-
- return t1.equals(t2);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof TabletInfo) {
- @SuppressWarnings("rawtypes")
- TabletInfo oti = (TabletInfo) o;
-
- if (equals(start, oti.start)) {
- return equals(end, oti.end);
- }
-
- return false;
- }
-
- return false;
- }
-
- public Text getStart() {
- return start;
- }
-
- public Text getEnd() {
- return end;
- }
-
- public T getData() {
- return data;
- }
-
- public Range getRange() {
- return new Range(start, false, end, true);
- }
- }
-
- public TabletInfoCache(Environment env, S supplier) {
- this.env = env;
- this.supplier = supplier;
- }
-
- private List<TabletInfo<T>> listSplits() throws TableNotFoundException,
- AccumuloSecurityException, AccumuloException {
- List<Text> splits =
- new ArrayList<>(env.getConnector().tableOperations().listSplits(env.getTable()));
- Collections.sort(splits);
-
- List<TabletInfo<T>> tablets = new ArrayList<>(splits.size() + 1);
- for (int i = 0; i < splits.size(); i++) {
- tablets
- .add(new TabletInfo<>(i == 0 ? null : splits.get(i - 1), splits.get(i), supplier.get()));
- }
-
- tablets.add(new TabletInfo<>(splits.size() == 0 ? null : splits.get(splits.size() - 1), null,
- supplier.get()));
- listSplitsTime = System.currentTimeMillis();
- return tablets;
- }
-
- public synchronized List<TabletInfo<T>> getTablets() throws Exception {
- if (cachedTablets == null) {
- cachedTablets = listSplits();
- } else if (System.currentTimeMillis() - listSplitsTime > CACHE_TIME) {
- List<TabletInfo<T>> tablets = listSplits();
- Map<TabletInfo<T>, TabletInfo<T>> oldTablets = new HashMap<>();
- for (TabletInfo<T> tabletInfo : cachedTablets) {
- oldTablets.put(tabletInfo, tabletInfo);
- }
-
- List<TabletInfo<T>> newTablets = new ArrayList<>(tablets.size());
-
- for (TabletInfo<T> tabletInfo : tablets) {
- TabletInfo<T> oldTI = oldTablets.get(tabletInfo);
- if (oldTI != null) {
- newTablets.add(oldTI);
- } else {
- newTablets.add(tabletInfo);
- }
- }
-
- cachedTablets = newTablets;
- }
-
- return Collections.unmodifiableList(cachedTablets);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
deleted file mode 100644
index e18ec2f..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.worker.finder.hash;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.accumulo.core.data.ArrayByteSequence;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
-import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
-import org.apache.fluo.accumulo.util.NotificationUtil;
-import org.apache.fluo.accumulo.util.ZookeeperPath;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.impl.Notification;
-import org.apache.fluo.core.util.ByteUtil;
-import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.NotificationProcessor;
-import org.apache.fluo.core.worker.TxResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HashNotificationFinder implements NotificationFinder {
-
- private NotificationProcessor notificationProcessor;
- private CuratorFramework curator;
- private List<String> finders = Collections.emptyList();
- private int updates = 0;
- private ModulusParams modParams;
- private Environment env;
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- private Thread scanThread;
- private PathChildrenCache childrenCache;
- private PersistentEphemeralNode myESNode;
-
- private static final Logger log = LoggerFactory.getLogger(HashNotificationFinder.class);
-
- static class ModParamsChangedException extends RuntimeException {
- private static final long serialVersionUID = 1L;
- }
-
- private class FindersListener implements PathChildrenCacheListener {
-
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- switch (event.getType()) {
- case CHILD_ADDED:
- case CHILD_REMOVED:
- if (!stopped.get()) {
- updateFinders();
- }
- break;
- case CHILD_UPDATED:
- log.warn("unexpected event " + event);
- break;
- default:
- break;
-
- }
- }
-
- }
-
- private synchronized void updateFinders() {
-
- String me = myESNode.getActualPath();
- while (me == null) {
- UtilWaitThread.sleep(100);
- me = myESNode.getActualPath();
- }
- me = ZKPaths.getNodeFromPath(me);
-
- List<String> children = new ArrayList<>();
- for (ChildData childData : childrenCache.getCurrentData()) {
- children.add(ZKPaths.getNodeFromPath(childData.getPath()));
- }
-
- Collections.sort(children);
-
- if (!finders.equals(children)) {
- int index = children.indexOf(me);
- if (index == -1) {
- this.modParams = null;
- finders = Collections.emptyList();
- log.debug("Did not find self in list of finders " + me);
- } else {
- updates++;
- this.modParams = new ModulusParams(children.indexOf(me), children.size(), updates);
- finders = children;
- log.debug("updated modulus params " + modParams.remainder + " " + modParams.divisor);
- }
- }
- }
-
- synchronized ModulusParams getModulusParams() {
- return modParams;
- }
-
- @Override
- public void init(Environment env, NotificationProcessor notificationProcessor) {
- Preconditions.checkState(this.notificationProcessor == null);
-
- this.notificationProcessor = notificationProcessor;
-
- this.env = env;
- this.curator = env.getSharedResources().getCurator();
-
- try {
- myESNode =
- new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL, ZookeeperPath.FINDERS
- + "/f-", new byte[0]);
- myESNode.start();
- myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);
-
- childrenCache =
- new PathChildrenCache(env.getSharedResources().getCurator(), ZookeeperPath.FINDERS, false);
- childrenCache.getListenable().addListener(new FindersListener());
- childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
-
- updateFinders();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void start() {
- scanThread = new Thread(new ScanTask(this, env, stopped));
- scanThread.setName(getClass().getSimpleName() + " " + ScanTask.class.getSimpleName());
- scanThread.setDaemon(true);
- scanThread.start();
- }
-
- @Override
- public void stop() {
- stopped.set(true);
- try {
- childrenCache.close();
- } catch (IOException e1) {
- log.warn("Failed to close children cache", e1);
- }
-
- try {
- myESNode.close();
- } catch (IOException e1) {
- log.warn("Failed to close ephemeral node", e1);
- }
-
- scanThread.interrupt();
- try {
- scanThread.join();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void failedToProcess(Notification notification, TxResult status) {}
-
- NotificationProcessor getWorkerQueue() {
- return notificationProcessor;
- }
-
- @VisibleForTesting
- static boolean shouldProcess(Notification notification, int divisor, int remainder) {
- byte[] cfcq = NotificationUtil.encodeCol(notification.getColumn());
- return NotificationHashFilter.accept(ByteUtil.toByteSequence(notification.getRow()),
- new ArrayByteSequence(cfcq), divisor, remainder);
- }
-
- @Override
- public boolean shouldProcess(Notification notification) {
- ModulusParams mp = getModulusParams();
- return shouldProcess(notification, mp.divisor, mp.remainder);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ParitionManager.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ParitionManager.java
new file mode 100644
index 0000000..3ebb01e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ParitionManager.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
+import org.apache.fluo.accumulo.util.NotificationUtil;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.impl.FluoConfigurationImpl;
+import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.util.ByteUtil;
+import org.apache.fluo.core.util.FluoThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * This class manages partitioning of notifications across workers coordinating in ZooKeeper.
+ * Workers are divided into groups. Each group is given a subset of the Accumulo table. All workers
+ * in a group scan that subset and use hash partitioning to equally divide notifications.
+ *
+ * <p>
+ * Grouping workers was a compromise between every worker scanning the entire table OR each worker
+ * having a dedicated part of a table. This scheme allows multiple workers to share popular parts of
+ * a table. However, it limits the number of workers that will scan a portion of a table for
+ * notifications. This limitation is important for scaling: even if there are 1,000 workers, there
+ * will never be more than 7 to 13 workers scanning a portion of the table.
+ */
+public class ParitionManager {
+
+ private static final Logger log = LoggerFactory.getLogger(ParitionManager.class);
+
+ private final PathChildrenCache childrenCache;
+ private final PersistentEphemeralNode myESNode;
+ private final int groupSize;
+ private long paritionSetTime;
+ private PartitionInfo partitionInfo;
+ private final ScheduledExecutorService schedExecutor;
+
+ private CuratorFramework curator;
+
+ private Environment env;
+
+ private final long minSleepTime;
+ private final long maxSleepTime;
+ private long retrySleepTime;
+
+ private static final long STABILIZE_TIME = TimeUnit.SECONDS.toMillis(60);
+
+ private class FindersListener implements PathChildrenCacheListener {
+
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case CHILD_REMOVED:
+ case CHILD_UPDATED:
+ scheduleUpdate();
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ static PartitionInfo getGroupInfo(String me, SortedSet<String> children,
+ Collection<TableRange> tablets, int groupSize) {
+
+ int numGroups = Math.max(1, children.size() / groupSize);
+ int[] groupSizes = new int[numGroups];
+ int count = 0;
+ int myGroupId = -1;
+ int myId = -1;
+
+ for (String child : children) {
+ if (child.equals(me)) {
+ myGroupId = count;
+ myId = groupSizes[count];
+ }
+ groupSizes[count]++;
+ count = (count + 1) % numGroups;
+ }
+
+ List<TableRange> rangesCopy = new ArrayList<>(tablets);
+ Collections.sort(rangesCopy);
+
+ // The behavior of Random with a given seed and shuffle are the same across different versions
+ // of java. Both specify the algorithms in their javadoc and are meant to behave the same across
+ // versions. This is important because different workers may be running different versions of
+ // Java, but all workers need to do the same shuffle.
+ //
+ // Did try to use hashing to partition the tablets among groups, but it was slightly uneven. One
+ // group having 10% more tablets would lead to uneven utilization.
+ Collections.shuffle(rangesCopy, new Random(42));
+
+ List<TableRange> groupsTablets = new ArrayList<>();
+
+ count = 0;
+ for (TableRange tr : rangesCopy) {
+ if (count == myGroupId) {
+ groupsTablets.add(tr);
+ }
+ count = (count + 1) % numGroups;
+ }
+
+ return new PartitionInfo(myId, myGroupId, groupSizes[myGroupId], numGroups, children.size(),
+ groupsTablets);
+ }
+
+ private void updatePartitionInfo() {
+ try {
+ String me = myESNode.getActualPath();
+ while (me == null) {
+ Thread.sleep(100);
+ me = myESNode.getActualPath();
+ }
+ me = ZKPaths.getNodeFromPath(me);
+
+ byte[] zkSplitData = null;
+ SortedSet<String> children = new TreeSet<>();
+ Set<String> groupSizes = new HashSet<>();
+ for (ChildData childData : childrenCache.getCurrentData()) {
+ String node = ZKPaths.getNodeFromPath(childData.getPath());
+ if (node.equals("splits")) {
+ zkSplitData = childData.getData();
+ } else {
+ children.add(node);
+ groupSizes.add(new String(childData.getData(), UTF_8));
+ }
+ }
+
+ if (zkSplitData == null) {
+ log.info("Did not find splits in zookeeper, will retry later.");
+ setPartitionInfo(null); // disable this worker from processing notifications
+ scheduleRetry();
+ return;
+ }
+
+ if (!children.contains(me)) {
+ log.warn("Did not see self (" + me
+ + "), cannot gather tablet and notification partitioning info.");
+ setPartitionInfo(null); // disable this worker from processing notifications
+ scheduleRetry();
+ return;
+ }
+
+ // ensure all workers agree on the group size
+ if (groupSizes.size() != 1 || !groupSizes.contains(groupSize + "")) {
+ log.warn("Group size disagreement " + groupSize + " " + groupSizes
+ + ", cannot gather tablet and notification partitioning info.");
+ setPartitionInfo(null); // disable this worker from processing notifications
+ scheduleRetry();
+ return;
+ }
+
+ List<Bytes> zkSplits = new ArrayList<>();
+ SerializedSplits.deserialize(zkSplits::add, zkSplitData);
+
+ Collection<TableRange> tableRanges = TableRange.toTabletRanges(zkSplits);
+ PartitionInfo newPI = getGroupInfo(me, children, tableRanges, groupSize);
+
+ setPartitionInfo(newPI);
+ } catch (InterruptedException e) {
+ log.debug("Interrupted while gathering tablet and notification partitioning info.", e);
+ } catch (Exception e) {
+ log.warn("Problem gathering tablet and notification partitioning info.", e);
+ setPartitionInfo(null); // disable this worker from processing notifications
+ scheduleRetry();
+ }
+ }
+
+ private synchronized void scheduleRetry() {
+ schedExecutor.schedule(this::updatePartitionInfo, retrySleepTime, TimeUnit.MILLISECONDS);
+ retrySleepTime =
+ Math.min(maxSleepTime,
+ (long) (1.5 * retrySleepTime) + (long) (retrySleepTime * Math.random()));
+ }
+
+ private synchronized void scheduleUpdate() {
+ schedExecutor.schedule(this::updatePartitionInfo, 0, TimeUnit.MILLISECONDS);
+ }
+
+ private class CheckTabletsTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+
+ String me = myESNode.getActualPath();
+ while (me == null) {
+ Thread.sleep(100);
+ me = myESNode.getActualPath();
+ }
+ me = ZKPaths.getNodeFromPath(me);
+
+ String me2 = me;
+ boolean imFirst =
+ childrenCache.getCurrentData().stream().map(ChildData::getPath)
+ .map(ZKPaths::getNodeFromPath).sorted().findFirst().map(s -> s.equals(me2))
+ .orElse(false);
+
+ if (imFirst) {
+
+ ChildData childData = childrenCache.getCurrentData(ZookeeperPath.FINDERS + "/splits");
+ if (childData == null) {
+ byte[] currSplitData = SerializedSplits.serializeTableSplits(env);
+
+ curator.create().forPath(ZookeeperPath.FINDERS + "/splits", currSplitData);
+ } else {
+ HashSet<Bytes> zkSplits = new HashSet<>();
+ SerializedSplits.deserialize(zkSplits::add, childData.getData());
+
+ HashSet<Bytes> currentSplits = new HashSet<>();
+ byte[] currSplitData = SerializedSplits.serializeTableSplits(env);
+ SerializedSplits.deserialize(currentSplits::add, currSplitData);
+
+ if (!currentSplits.equals(zkSplits)) {
+ curator.setData().forPath(ZookeeperPath.FINDERS + "/splits", currSplitData);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.debug("Interrupted while checking table split points.", e);
+ } catch (Exception e) {
+ log.warn("Failed to checking table split points", e);
+ }
+ }
+ }
+
+ ParitionManager(Environment env, long minSleepTime, long maxSleepTime) {
+ try {
+ this.curator = env.getSharedResources().getCurator();
+ this.env = env;
+
+ this.minSleepTime = minSleepTime;
+ this.maxSleepTime = maxSleepTime;
+ this.retrySleepTime = minSleepTime;
+
+ groupSize =
+ env.getConfiguration().getInt(FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE,
+ FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE_DEFAULT);
+
+ myESNode =
+ new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL, ZookeeperPath.FINDERS
+ + "/f-", ("" + groupSize).getBytes(UTF_8));
+ myESNode.start();
+ myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);
+
+ childrenCache = new PathChildrenCache(curator, ZookeeperPath.FINDERS, true);
+ childrenCache.getListenable().addListener(new FindersListener());
+ childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
+
+ schedExecutor =
+ Executors.newScheduledThreadPool(1,
+ new FluoThreadFactory("Fluo worker partition manager"));
+ schedExecutor.scheduleWithFixedDelay(new CheckTabletsTask(), 0, maxSleepTime,
+ TimeUnit.MILLISECONDS);
+
+ scheduleUpdate();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void setPartitionInfo(PartitionInfo pi) {
+ synchronized (this) {
+ if (!Objects.equals(pi, this.partitionInfo)) {
+ log.debug("Updated finder partition info : " + pi);
+ this.paritionSetTime = System.nanoTime();
+ this.partitionInfo = pi;
+ this.notifyAll();
+ }
+
+ if (pi != null) {
+ retrySleepTime = minSleepTime;
+ }
+ }
+ }
+
+ private long getTimeSincePartitionChange() {
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - paritionSetTime);
+ }
+
+ synchronized PartitionInfo waitForPartitionInfo() throws InterruptedException {
+ while (partitionInfo == null
+ || getTimeSincePartitionChange() < Math.min(maxSleepTime, STABILIZE_TIME)) {
+ wait(minSleepTime);
+ }
+
+ return partitionInfo;
+ }
+
+ synchronized PartitionInfo getPartitionInfo() {
+ if (getTimeSincePartitionChange() < Math.min(maxSleepTime, STABILIZE_TIME)) {
+ return null;
+ }
+
+ return partitionInfo;
+ }
+
+ public void stop() {
+ try {
+ myESNode.close();
+ } catch (IOException e) {
+ log.debug("Error closing finder ephemeral node", e);
+ }
+ try {
+ childrenCache.close();
+ } catch (IOException e) {
+ log.debug("Error closing finder children cache", e);
+ }
+
+ schedExecutor.shutdownNow();
+ }
+
+ @VisibleForTesting
+ static boolean shouldProcess(Notification notification, int divisor, int remainder) {
+ byte[] cfcq = NotificationUtil.encodeCol(notification.getColumn());
+ return NotificationHashFilter.accept(ByteUtil.toByteSequence(notification.getRow()),
+ new ArrayByteSequence(cfcq), divisor, remainder);
+ }
+
+ public boolean shouldProcess(Notification notification) {
+ PartitionInfo pi = getPartitionInfo();
+ if (pi == null) {
+ return false;
+ }
+
+ return pi.getMyGroupsRanges().getContaining(notification.getRow()) != null
+ && shouldProcess(notification, pi.getMyGroupSize(), pi.getMyIdInGroup());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
new file mode 100644
index 0000000..e33b28b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfo.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.List;
+
+/**
+ * Describes how notification scanning is partitioned from one worker's point of view: the group
+ * the worker is in, its position in that group, and the table ranges assigned to the group.
+ */
+class PartitionInfo {
+
+  private final int myGroupId;
+  private final int myIdInGroup;
+  private final int numGroups;
+  private final int myGroupSize;
+  private final int numWorkers;
+  private final RangeSet myGroupsRanges;
+
+  PartitionInfo(int myId, int myGroupId, int myGroupSize, int totalGroups, int totalWorkers,
+      List<TableRange> groupsRanges) {
+    this.myIdInGroup = myId;
+    this.myGroupId = myGroupId;
+    this.myGroupSize = myGroupSize;
+    this.numGroups = totalGroups;
+    this.numWorkers = totalWorkers;
+    this.myGroupsRanges = new RangeSet(groupsRanges);
+  }
+
+  /** @return the id of the group this worker is in. */
+  public int getMyGroupId() {
+    return myGroupId;
+  }
+
+  /** @return the id of this worker within its group. */
+  public int getMyIdInGroup() {
+    return myIdInGroup;
+  }
+
+  /** @return the total number of worker groups. */
+  public int getNumGroups() {
+    return numGroups;
+  }
+
+  /** @return the number of workers in the group this worker is in. */
+  public int getMyGroupSize() {
+    return myGroupSize;
+  }
+
+  /** @return the total number of workers. */
+  public int getNumWorkers() {
+    return numWorkers;
+  }
+
+  /** @return the table ranges associated with the group this worker is in. */
+  public RangeSet getMyGroupsRanges() {
+    return myGroupsRanges;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof PartitionInfo) {
+      PartitionInfo other = (PartitionInfo) o;
+      return other.myGroupId == myGroupId && other.myIdInGroup == myIdInGroup
+          && other.numGroups == numGroups && other.myGroupSize == myGroupSize
+          && other.numWorkers == numWorkers && other.myGroupsRanges.equals(myGroupsRanges);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    // Built only from values equals() compares. The range count stands in for the RangeSet so
+    // the result agrees with equals() without depending on how RangeSet hashes itself.
+    return java.util.Objects.hash(myGroupId, myIdInGroup, numGroups, myGroupSize, numWorkers,
+        myGroupsRanges.size());
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "workers:%d groups:%d groupSize:%d groupId:%d idInGroup:%d #tablets:%d", numWorkers,
+        numGroups, myGroupSize, myGroupId, myIdInGroup, myGroupsRanges.size());
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ModulusParams.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfoChangedException.java
similarity index 82%
rename from modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ModulusParams.java
rename to modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfoChangedException.java
index cbad8b9..35c3f9b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ModulusParams.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionInfoChangedException.java
@@ -15,14 +15,6 @@
package org.apache.fluo.core.worker.finder.hash;
-class ModulusParams {
- int remainder;
- int divisor;
- int update;
-
- ModulusParams(int r, int d, int u) {
- this.remainder = r;
- this.divisor = d;
- this.update = u;
- }
+public class PartitionInfoChangedException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java
new file mode 100644
index 0000000..c940850
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionNotificationFinder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.impl.FluoConfigurationImpl;
+import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.worker.NotificationFinder;
+import org.apache.fluo.core.worker.NotificationProcessor;
+import org.apache.fluo.core.worker.TxResult;
+
+/** Notification finder that scans on a daemon thread and filters using partition info. */
+public class PartitionNotificationFinder implements NotificationFinder {
+
+  private ParitionManager paritionManager;
+  private Thread scanThread;
+  private NotificationProcessor processor;
+  private Environment env;
+  private AtomicBoolean stopped;
+
+  @Override
+  public void init(Environment env, NotificationProcessor processor) {
+    this.processor = processor;
+    this.env = env;
+    this.stopped = new AtomicBoolean(false);
+  }
+
+  @Override
+  public void start() {
+    long minSleepTime =
+        env.getConfiguration().getInt(FluoConfigurationImpl.NTFY_FINDER_MIN_SLEEP_TIME_PROP,
+            FluoConfigurationImpl.NTFY_FINDER_MIN_SLEEP_TIME_DEFAULT);
+    long maxSleepTime =
+        env.getConfiguration().getInt(FluoConfigurationImpl.NTFY_FINDER_MAX_SLEEP_TIME_PROP,
+            FluoConfigurationImpl.NTFY_FINDER_MAX_SLEEP_TIME_DEFAULT);
+
+    paritionManager = new ParitionManager(env, minSleepTime, maxSleepTime);
+    scanThread =
+        new Thread(new ScanTask(this, processor, paritionManager, env, stopped, minSleepTime,
+            maxSleepTime));
+    scanThread.setName(getClass().getSimpleName() + " " + ScanTask.class.getSimpleName());
+    scanThread.setDaemon(true);
+    scanThread.start();
+  }
+
+  @Override
+  public void stop() {
+    stopped.set(true);
+    scanThread.interrupt();
+    try {
+      scanThread.join();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } finally {
+      // Always release the partition manager, even if waiting for the scan thread was cut short.
+      paritionManager.stop();
+    }
+  }
+
+  @Override
+  public boolean shouldProcess(Notification notification) {
+    return paritionManager.shouldProcess(notification);
+  }
+
+  @Override
+  public void failedToProcess(Notification notification, TxResult status) {}
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
new file mode 100644
index 0000000..40dc5e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/RangeSet.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * A set of {@link TableRange}s indexed by end row, so the range containing a row can be found
+ * with a single sorted-map lookup. A range with a null end row is held separately.
+ */
+public class RangeSet {
+  private final TreeMap<Bytes, TableRange> tmap = new TreeMap<>();
+  private TableRange lastRange;
+
+  /**
+   * @param ranges the ranges to index; a range with a null end row becomes the last range
+   */
+  public RangeSet(List<TableRange> ranges) {
+    for (TableRange tablet : ranges) {
+      if (tablet.getEndRow() == null) {
+        lastRange = tablet;
+      } else {
+        tmap.put(tablet.getEndRow(), tablet);
+      }
+    }
+  }
+
+  /**
+   * @return the range in this set that contains {@code row}, or null if no checked range does
+   */
+  public TableRange getContaining(Bytes row) {
+    // Only one candidate is checked: the range with the smallest end row >= row, or the last
+    // range when no end row is >= row.
+    Entry<Bytes, TableRange> entry = tmap.ceilingEntry(row);
+    TableRange candidate = entry != null ? entry.getValue() : lastRange;
+    return candidate != null && candidate.contains(row) ? candidate : null;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof RangeSet) {
+      RangeSet ots = (RangeSet) o;
+      return tmap.equals(ots.tmap) && Objects.equals(lastRange, ots.lastRange);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    // equals() is overridden, so hashCode() must agree with it: hash the same two fields.
+    return 31 * tmap.hashCode() + Objects.hashCode(lastRange);
+  }
+
+  /** @return the number of ranges in this set, counting the last range if present. */
+  public int size() {
+    return tmap.size() + (lastRange == null ? 0 : 1);
+  }
+
+  /**
+   * Passes every range to {@code trc}: the last range first (if present), then the remaining
+   * ranges in end row order.
+   */
+  public void forEach(Consumer<TableRange> trc) {
+    if (lastRange != null) {
+      trc.accept(lastRange);
+    }
+    tmap.values().forEach(trc);
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index e0fa791..1732a29 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -17,11 +17,14 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
@@ -31,12 +34,11 @@
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.fluo.core.worker.TabletInfoCache;
-import org.apache.fluo.core.worker.TabletInfoCache.TabletInfo;
-import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder.ModParamsChangedException;
+import org.apache.fluo.core.worker.NotificationFinder;
+import org.apache.fluo.core.worker.NotificationProcessor;
+import org.apache.fluo.core.worker.NotificationProcessor.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,91 +46,101 @@
private static final Logger log = LoggerFactory.getLogger(ScanTask.class);
- private final HashNotificationFinder hwf;
+ private final NotificationFinder finder;
+ private final ParitionManager partitionManager;
+ private final NotificationProcessor proccessor;
private final Random rand = new Random();
private final AtomicBoolean stopped;
- private final TabletInfoCache<TabletData, Supplier<TabletData>> tabletInfoCache;
+ private final Map<TableRange, TabletData> rangeData;
private final Environment env;
- static long STABILIZE_TIME = 10 * 1000;
-
private long minSleepTime;
private long maxSleepTime;
- ScanTask(HashNotificationFinder hashWorkFinder, Environment env, AtomicBoolean stopped) {
- this.hwf = hashWorkFinder;
- this.tabletInfoCache = new TabletInfoCache<>(env, new Supplier<TabletData>() {
- @Override
- public TabletData get() {
- return new TabletData();
- }
- });
+ ScanTask(NotificationFinder finder, NotificationProcessor proccessor,
+ ParitionManager partitionManager, Environment env, AtomicBoolean stopped, long minSleepTime,
+ long maxSleepTime) {
+ this.finder = finder;
+ this.rangeData = new HashMap<>();
+
this.env = env;
this.stopped = stopped;
- minSleepTime =
- env.getConfiguration().getInt(FluoConfigurationImpl.MIN_SLEEP_TIME_PROP,
- FluoConfigurationImpl.MIN_SLEEP_TIME_DEFAULT);
- maxSleepTime =
- env.getConfiguration().getInt(FluoConfigurationImpl.MAX_SLEEP_TIME_PROP,
- FluoConfigurationImpl.MAX_SLEEP_TIME_DEFAULT);
+ this.proccessor = proccessor;
+ this.partitionManager = partitionManager;
+
+ this.minSleepTime = minSleepTime;
+ this.maxSleepTime = maxSleepTime;
}
@Override
public void run() {
- int qSize = hwf.getWorkerQueue().size();
+
+ List<TableRange> ranges = new ArrayList<>();
+ Set<TableRange> rangeSet = new HashSet<>();
+
+ int qSize = proccessor.size();
while (!stopped.get()) {
try {
+ PartitionInfo partition = partitionManager.waitForPartitionInfo();
- while (hwf.getWorkerQueue().size() > qSize / 2 && !stopped.get()) {
+ while (proccessor.size() > qSize / 2 && !stopped.get()) {
UtilWaitThread.sleep(50, stopped);
}
- // break scan work into a lot of ranges that are randomly ordered. This has a few benefits.
- // Ensures different workers are scanning different tablets.
- // Allows checking local state more frequently in the case where work is not present in many
- // tablets. Allows less frequent scanning of tablets that are
- // usually empty.
- List<TabletInfo<TabletData>> tablets = new ArrayList<>(tabletInfoCache.getTablets());
- Collections.shuffle(tablets, rand);
+ ranges.clear();
+ rangeSet.clear();
+ partition.getMyGroupsRanges().forEach(t -> {
+ ranges.add(t);
+ rangeSet.add(t);
+ });
+ Collections.shuffle(ranges, rand);
+ rangeData.keySet().retainAll(rangeSet);
long minRetryTime = maxSleepTime + System.currentTimeMillis();
int notifications = 0;
int tabletsScanned = 0;
try {
- for (TabletInfo<TabletData> tabletInfo : tablets) {
- if (System.currentTimeMillis() >= tabletInfo.getData().retryTime) {
+ for (TableRange tabletRange : ranges) {
+ TabletData tabletData = rangeData.computeIfAbsent(tabletRange, tr -> new TabletData());
+ if (System.currentTimeMillis() >= tabletData.retryTime) {
int count = 0;
- ModulusParams modParams = hwf.getModulusParams();
- if (modParams != null) {
- // notifications could have been asynchronously queued for deletion. Let that happen
- // 1st before scanning
- env.getSharedResources().getBatchWriter().waitForAsyncFlush();
- count = scan(modParams, tabletInfo.getRange());
- tabletsScanned++;
+ PartitionInfo pi = partitionManager.getPartitionInfo();
+ if (partition.equals(pi)) {
+ try (Session session =
+ proccessor.beginAddingNotifications(rc -> tabletRange.contains(rc.getRow()))) {
+ // Notifications could have been asynchronously queued for deletion.
+ // Let those queued deletions be flushed first,
+ // before scanning for notifications.
+ env.getSharedResources().getBatchWriter().waitForAsyncFlush();
+
+ count = scan(session, partition, tabletRange.getRange());
+ tabletsScanned++;
+ }
+ } else {
+ break;
}
- tabletInfo.getData().updateScanCount(count, maxSleepTime);
+ tabletData.updateScanCount(count, maxSleepTime);
notifications += count;
if (stopped.get()) {
break;
}
}
- minRetryTime = Math.min(tabletInfo.getData().retryTime, minRetryTime);
+ minRetryTime = Math.min(tabletData.retryTime, minRetryTime);
}
- } catch (ModParamsChangedException mpce) {
- hwf.getWorkerQueue().clear();
- waitForFindersToStabilize();
+ } catch (PartitionInfoChangedException mpce) {
+ // nothing to do
}
long sleepTime = Math.max(minSleepTime, minRetryTime - System.currentTimeMillis());
- qSize = hwf.getWorkerQueue().size();
+ qSize = proccessor.size();
log.debug("Scanned {} of {} tablets, added {} new notifications (total queued {})",
- tabletsScanned, tablets.size(), notifications, qSize);
+ tabletsScanned, ranges.size(), notifications, qSize);
if (!stopped.get()) {
UtilWaitThread.sleep(sleepTime, stopped);
@@ -141,7 +153,6 @@
log.error("Error while looking for notifications", e);
}
}
-
}
}
@@ -157,7 +168,7 @@
return wasInt;
}
- private int scan(ModulusParams lmp, Range range) throws TableNotFoundException {
+ private int scan(Session session, PartitionInfo pi, Range range) throws TableNotFoundException {
Scanner scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
scanner.setRange(range);
@@ -165,38 +176,24 @@
Notification.configureScanner(scanner);
IteratorSetting iterCfg = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
- NotificationHashFilter.setModulusParams(iterCfg, lmp.divisor, lmp.remainder);
+ NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(), pi.getMyIdInGroup());
scanner.addScanIterator(iterCfg);
int count = 0;
for (Entry<Key, Value> entry : scanner) {
- if (lmp.update != hwf.getModulusParams().update) {
- throw new HashNotificationFinder.ModParamsChangedException();
+ if (!pi.equals(partitionManager.getPartitionInfo())) {
+ throw new PartitionInfoChangedException();
}
if (stopped.get()) {
return count;
}
- if (hwf.getWorkerQueue().addNotification(hwf, Notification.from(entry.getKey()))) {
+ if (session.addNotification(finder, Notification.from(entry.getKey()))) {
count++;
}
}
return count;
}
-
- private void waitForFindersToStabilize() {
- ModulusParams lmp = hwf.getModulusParams();
- long startTime = System.currentTimeMillis();
-
- while (System.currentTimeMillis() - startTime < STABILIZE_TIME) {
- UtilWaitThread.sleep(500, stopped);
- ModulusParams lmp2 = hwf.getModulusParams();
- if (lmp.update != lmp2.update) {
- startTime = System.currentTimeMillis();
- lmp = lmp2;
- }
- }
- }
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
new file mode 100644
index 0000000..53ad7ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/SerializedSplits.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.util.ByteUtil;
+
+/** Gzip-compressed, size-capped serialization of table split points. */
+public class SerializedSplits {
+
+  static final int MAX_SIZE = 1 << 18;
+
+  /** Decodes {@code serializedSplits}, handing each split to {@code splitConsumer} in order. */
+  public static void deserialize(Consumer<Bytes> splitConsumer, byte[] serializedSplits) {
+    // try-with-resources closes the gzip stream (and its inflater) even when decoding fails
+    try (DataInputStream dis =
+        new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(serializedSplits)))) {
+      int numSplits = dis.readInt();
+      BytesBuilder builder = Bytes.builder();
+      for (int i = 0; i < numSplits; i++) {
+        int len = dis.readInt();
+        builder.setLength(0);
+        builder.append(dis, len);
+        splitConsumer.accept(builder.toBytes());
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static byte[] serializeInternal(List<Bytes> splits) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (DataOutputStream dos =
+        new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(baos), 1 << 16))) {
+      dos.writeInt(splits.size());
+      for (Bytes split : splits) {
+        dos.writeInt(split.length());
+        split.writeTo(dos);
+      }
+    }
+    // read the bytes only after close(), which flushes the buffer and finishes the gzip data
+    return baos.toByteArray();
+  }
+
+  /**
+   * Sorts and serializes {@code splits}. While the result is larger than {@link #MAX_SIZE},
+   * every other split is dropped and the remainder is serialized again.
+   */
+  public static byte[] serialize(Collection<Bytes> splits) {
+    List<Bytes> splitsCopy = new ArrayList<>(splits);
+    Collections.sort(splitsCopy);
+
+    try {
+      byte[] serialized = serializeInternal(splitsCopy);
+      while (serialized.length > MAX_SIZE) {
+        List<Bytes> thinned = new ArrayList<>(splitsCopy.size() / 2 + 1);
+        // A lone split cannot be thinned: keeping index 0 would reproduce the same list and loop
+        // forever, so in that case fall through with an empty list instead.
+        if (splitsCopy.size() > 1) {
+          for (int i = 0; i < splitsCopy.size(); i += 2) {
+            thinned.add(splitsCopy.get(i));
+          }
+        }
+        splitsCopy = thinned;
+        serialized = serializeInternal(splitsCopy);
+      }
+      return serialized;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /** Serializes the split points that Accumulo currently lists for the environment's table. */
+  public static byte[] serializeTableSplits(Environment env) {
+    List<Bytes> splits;
+    try {
+      splits =
+          env.getConnector().tableOperations().listSplits(env.getTable()).stream()
+              .map(ByteUtil::toBytes).collect(Collectors.toList());
+    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
+      throw new RuntimeException(e);
+    }
+    return serialize(splits);
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
new file mode 100644
index 0000000..30a97f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.core.util.ByteUtil;
+import org.apache.hadoop.io.Text;
+
+import static java.util.stream.Collectors.toList;
+
+/** A table row range (prevEndRow, endRow]; a null bound leaves that side unbounded. */
+public class TableRange implements Comparable<TableRange> {
+  private final Bytes prevEndRow;
+  private final Bytes endRow;
+  private final int hc;
+
+  public TableRange(Bytes per, Bytes er) {
+    this.prevEndRow = per;
+    this.endRow = er;
+    // both bounds are final, so the hash code is computed once here and reused
+    this.hc = Objects.hash(this.prevEndRow, this.endRow);
+  }
+
+  @Override
+  public int hashCode() {
+    return hc;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TableRange)) {
+      return false;
+    }
+    TableRange that = (TableRange) o;
+    return Objects.equals(prevEndRow, that.prevEndRow) && Objects.equals(endRow, that.endRow);
+  }
+
+  public Bytes getPrevEndRow() {
+    return prevEndRow;
+  }
+
+  public Bytes getEndRow() {
+    return endRow;
+  }
+
+  /** @return true if {@code row} is greater than prevEndRow and not greater than endRow. */
+  public boolean contains(Bytes row) {
+    boolean afterStart = prevEndRow == null || row.compareTo(prevEndRow) > 0;
+    return afterStart && (endRow == null || row.compareTo(endRow) <= 0);
+  }
+
+  @Override
+  public String toString() {
+    return getPrevEndRow() + " " + getEndRow();
+  }
+
+  /**
+   * Turns split rows into consecutive ranges: one range ending at each row, in sorted order,
+   * followed by a final range that has no end row.
+   */
+  public static Collection<TableRange> toTabletRanges(Collection<Bytes> rows) {
+    List<Bytes> sortedRows = rows.stream().sorted().collect(toList());
+    List<TableRange> tablets = new ArrayList<>(sortedRows.size() + 1);
+    Bytes prev = null;
+    for (Bytes row : sortedRows) {
+      tablets.add(new TableRange(prev, row));
+      prev = row;
+    }
+    // prev is now the last split row, or still null when there were no rows at all
+    tablets.add(new TableRange(prev, null));
+    return tablets;
+  }
+
+  /** @return this range as an Accumulo Range: start row exclusive, end row inclusive. */
+  public Range getRange() {
+    Text startRow = Optional.ofNullable(prevEndRow).map(ByteUtil::toText).orElse(null);
+    Text stopRow = Optional.ofNullable(endRow).map(ByteUtil::toText).orElse(null);
+    return new Range(startRow, false, stopRow, true);
+  }
+
+  @Override
+  public int compareTo(TableRange o) {
+    Bytes mine = getEndRow();
+    Bytes theirs = o.getEndRow();
+    if (Objects.equals(mine, theirs)) {
+      return 0; // also covers both end rows being null
+    }
+    if (mine == null) {
+      return 1; // a null end row sorts after any non-null end row
+    }
+    return theirs == null ? -1 : mine.compareTo(theirs);
+  }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java
index 6790e54..b3496dd 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/HashTest.java
@@ -63,7 +63,7 @@
byte[] cfcq = NotificationUtil.encodeCol(col);
Key k = new Key(row, ColumnConstants.NOTIFY_CF.toArray(), cfcq, new byte[0], 6);
boolean accept = NotificationHashFilter.accept(k, 7, 3);
- Assert.assertEquals(accept, HashNotificationFinder.shouldProcess(Notification.from(k), 7, 3));
+ Assert.assertEquals(accept, ParitionManager.shouldProcess(Notification.from(k), 7, 3));
return accept;
}
}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
new file mode 100644
index 0000000..8a02edc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IntSummaryStatistics;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.toList;
+
+public class PartitionManagerTest {
+
+ @Test
+ public void testGrouping() {
+ IntFunction<String> nff = i -> String.format("f-%04d", i);
+ // Exercise every combination of split count, worker count, and group size.
+ for (int numSplits : new int[] {1, 10, 100, 1000}) {
+ for (int numWorkers : new int[] {1, 5, 10, 11, 100}) {
+ for (int groupSize : new int[] {1, 2, 3, 5, 7, 13, 17, 19, 43, 73, 97}) {
+ int expectedGroups = Math.max(1, numWorkers / groupSize);
+ int maxGroupSize =
+ Math.min(numWorkers,
+ groupSize + (int) Math.ceil((numWorkers % groupSize) / (double) expectedGroups));
+ // Worker names are zero-padded (f-0000, f-0001, ...), kept in a sorted set.
+ TreeSet<String> children = new TreeSet<>();
+
+ IntStream.range(0, numWorkers).mapToObj(nff).forEach(children::add);
+
+ Collection<Bytes> rows =
+ IntStream.iterate(0, i -> i + 1000).limit(numSplits)
+ .mapToObj(i -> String.format("r%06d", i)).map(Bytes::of).collect(toList());
+ Collection<TableRange> tablets = TableRange.toTabletRanges(rows);
+
+ Set<String> idCombos = new HashSet<>();
+ Map<Integer, RangeSet> groupTablets = new HashMap<>();
+ // Compute and check the partition info as seen by each worker in turn.
+ for (int i = 0; i < numWorkers; i++) {
+ String me = nff.apply(i);
+ PartitionInfo pi = ParitionManager.getGroupInfo(me, children, tablets, groupSize);
+ Assert.assertEquals(expectedGroups, pi.getNumGroups());
+ Assert.assertTrue(pi.getMyGroupSize() >= Math.min(numWorkers, groupSize)
+ && pi.getMyGroupSize() <= maxGroupSize);
+ Assert.assertEquals(numWorkers, pi.getNumWorkers());
+ Assert.assertTrue(pi.getMyIdInGroup() >= 0 && pi.getMyIdInGroup() < maxGroupSize);
+ Assert.assertTrue(pi.getMyGroupId() >= 0 && pi.getMyGroupId() < expectedGroups);
+ // Each (group id, id in group) pair must be handed out to exactly one worker.
+ Assert.assertFalse(idCombos.contains(pi.getMyGroupId() + ":" + pi.getMyIdInGroup()));
+ idCombos.add(pi.getMyGroupId() + ":" + pi.getMyIdInGroup());
+ // Every worker in a group must be given the same set of ranges.
+ if (!groupTablets.containsKey(pi.getMyGroupId())) {
+ groupTablets.put(pi.getMyGroupId(), pi.getMyGroupsRanges());
+ } else {
+ Assert.assertEquals(groupTablets.get(pi.getMyGroupId()), pi.getMyGroupsRanges());
+ }
+ }
+
+ Assert.assertEquals(numWorkers, idCombos.size());
+
+ // check that the tablets for each group are disjoint and that the union of the tablets
+ // for each group has all tablets
+ HashSet<TableRange> allTabletsFromGroups = new HashSet<>();
+
+ for (RangeSet tabletSet : groupTablets.values()) {
+ tabletSet.forEach(tr -> {
+ Assert.assertFalse(allTabletsFromGroups.contains(tr));
+ allTabletsFromGroups.add(tr);
+ });
+ }
+
+ Assert.assertEquals(new HashSet<>(tablets), allTabletsFromGroups);
+
+ // check that all groups have about the same number of tablets
+ IntSummaryStatistics summaryStats =
+ groupTablets.values().stream().mapToInt(RangeSet::size).summaryStatistics();
+ Assert.assertTrue(summaryStats.getMax() - summaryStats.getMin() < 2);
+ }
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java
new file mode 100644
index 0000000..13aa941
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/SerializedSplitsTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Round-trip tests for {@link SerializedSplits#serialize} and
+ * {@link SerializedSplits#deserialize}.
+ */
+public class SerializedSplitsTest {
+
+  /** Spacing between consecutive generated split points. */
+  private static final int STEP = 13;
+
+  /** Encodes {@code value} as the zero-padded, eight-digit hex row used for test splits. */
+  private static Bytes toSplit(int value) {
+    return Bytes.of(String.format("%08x", value));
+  }
+
+  /** Parses a split created by {@link #toSplit(int)} back into its integer value. */
+  private static int toInt(Bytes split) {
+    return Integer.parseInt(split.toString(), 16);
+  }
+
+  /**
+   * Serializes one million splits and checks what this test expects of the result: the data
+   * fits in {@link SerializedSplits#MAX_SIZE}, and deserializing it yields fewer splits that
+   * are evenly spaced, are multiples of the original step, and stay within the original bounds.
+   */
+  @Test
+  public void testLotsOfSplits() {
+    List<Bytes> original =
+        IntStream.iterate(0, i -> i + STEP).limit(1_000_000)
+            .mapToObj(SerializedSplitsTest::toSplit).collect(toList());
+
+    byte[] data = SerializedSplits.serialize(original);
+    Assert.assertTrue(data.length <= SerializedSplits.MAX_SIZE);
+
+    List<Bytes> restored = new ArrayList<>();
+    SerializedSplits.deserialize(restored::add, data);
+
+    Assert.assertTrue(restored.size() > 10_000);
+    Assert.assertTrue(restored.size() < original.size());
+
+    // the gap between the first two restored splits is the gap every later pair must match
+    int expectedDiff = toInt(restored.get(1)) - toInt(restored.get(0));
+    Assert.assertTrue(expectedDiff > STEP);
+    Assert.assertEquals(0, expectedDiff % STEP);
+
+    // check that splits are evenly spaced and that each one is a multiple of the step
+    int prev = toInt(restored.get(0));
+    Assert.assertEquals(0, prev % STEP);
+    for (Bytes split : restored.subList(1, restored.size())) {
+      int curr = toInt(split);
+      Assert.assertEquals(expectedDiff, curr - prev);
+      Assert.assertEquals(0, curr % STEP);
+      prev = curr;
+    }
+
+    // the restored splits must not reach outside the first and last original split
+    Bytes lastOriginal = original.get(original.size() - 1);
+    Bytes lastRestored = restored.get(restored.size() - 1);
+    Assert.assertTrue(original.get(0).compareTo(restored.get(0)) <= 0);
+    Assert.assertTrue(lastOriginal.compareTo(lastRestored) >= 0);
+  }
+
+  /** Serializes a thousand splits and expects deserialization to return exactly the same set. */
+  @Test
+  public void testSimple() {
+    Set<Bytes> original =
+        IntStream.iterate(0, i -> i + STEP).limit(1_000)
+            .mapToObj(SerializedSplitsTest::toSplit).collect(toSet());
+
+    byte[] data = SerializedSplits.serialize(original);
+
+    Set<Bytes> restored = new HashSet<>();
+    SerializedSplits.deserialize(restored::add, data);
+
+    Assert.assertEquals(original, restored);
+  }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
new file mode 100644
index 0000000..c73044a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker.finder.hash;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.fluo.api.data.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link TableRange}: row containment, conversion of split points into ranges,
+ * ordering, and conversion to an Accumulo {@link Range}.
+ */
+public class TableRangeTest {
+
+  /** Asserts that {@code range} contains every one of {@code rows}. */
+  private static void assertContains(TableRange range, String... rows) {
+    for (String row : rows) {
+      Assert.assertTrue("expected row to be inside range: " + row,
+          range.contains(Bytes.of(row)));
+    }
+  }
+
+  /** Asserts that {@code range} contains none of {@code rows}. */
+  private static void assertExcludes(TableRange range, String... rows) {
+    for (String row : rows) {
+      Assert.assertFalse("expected row to be outside range: " + row,
+          range.contains(Bytes.of(row)));
+    }
+  }
+
+  /**
+   * Checks {@link TableRange#contains} and the end-row getters for every combination of bounded
+   * and unbounded ({@code null}) ends. The previous end row is expected to be excluded from the
+   * range and the end row included.
+   */
+  @Test
+  public void testBasic() {
+    TableRange unbounded = new TableRange(null, null);
+    assertContains(unbounded, "a", "z");
+    Assert.assertNull(unbounded.getEndRow());
+    Assert.assertNull(unbounded.getPrevEndRow());
+
+    TableRange noLowerBound = new TableRange(null, Bytes.of("ma"));
+    assertContains(noLowerBound, "a", "ma");
+    assertExcludes(noLowerBound, "maa", "z");
+    Assert.assertNull(noLowerBound.getPrevEndRow());
+    Assert.assertEquals(Bytes.of("ma"), noLowerBound.getEndRow());
+
+    TableRange noUpperBound = new TableRange(Bytes.of("la"), null);
+    assertExcludes(noUpperBound, "a", "la");
+    assertContains(noUpperBound, "laa", "z");
+    Assert.assertEquals(Bytes.of("la"), noUpperBound.getPrevEndRow());
+    Assert.assertNull(noUpperBound.getEndRow());
+
+    TableRange bounded = new TableRange(Bytes.of("la"), Bytes.of("ma"));
+    assertExcludes(bounded, "a", "la", "maa", "z");
+    assertContains(bounded, "laa", "ma");
+    Assert.assertEquals(Bytes.of("la"), bounded.getPrevEndRow());
+    Assert.assertEquals(Bytes.of("ma"), bounded.getEndRow());
+  }
+
+  /**
+   * Checks that {@link TableRange#toTabletRanges} turns three split points into the four
+   * adjoining ranges they delimit, and an empty split list into a single range with no bounds.
+   */
+  @Test
+  public void testMultiple() {
+    Bytes sp1 = Bytes.of("e1");
+    Bytes sp2 = Bytes.of("m1");
+    Bytes sp3 = Bytes.of("r1");
+
+    // the splits are supplied unsorted; the expected ranges follow their sorted order
+    Collection<TableRange> trc1 =
+        new HashSet<>(TableRange.toTabletRanges(Arrays.asList(sp2, sp3, sp1)));
+
+    Assert.assertEquals(4, trc1.size());
+    Assert.assertTrue(trc1.contains(new TableRange(null, sp1)));
+    Assert.assertTrue(trc1.contains(new TableRange(sp1, sp2)));
+    Assert.assertTrue(trc1.contains(new TableRange(sp2, sp3)));
+    Assert.assertTrue(trc1.contains(new TableRange(sp3, null)));
+
+    Collection<TableRange> trc2 = new HashSet<>(TableRange.toTabletRanges(Collections.emptyList()));
+    Assert.assertEquals(1, trc2.size());
+    Assert.assertTrue(trc2.contains(new TableRange(null, null)));
+  }
+
+  /**
+   * Checks that {@link TableRange#compareTo} orders adjoining ranges by position, gives the
+   * opposite sign when the operands are swapped, and returns zero for the same instance and for
+   * ranges with equal bounds.
+   */
+  @Test
+  public void testCompare() {
+    Bytes sp1 = Bytes.of("e1");
+    Bytes sp2 = Bytes.of("m1");
+
+    TableRange tr1 = new TableRange(null, sp1);
+    TableRange tr2 = new TableRange(sp1, sp2);
+    TableRange tr3 = new TableRange(sp2, null);
+
+    Assert.assertTrue(tr1.compareTo(tr2) < 0);
+    Assert.assertTrue(tr2.compareTo(tr1) > 0);
+
+    Assert.assertTrue(tr2.compareTo(tr3) < 0);
+    Assert.assertTrue(tr3.compareTo(tr2) > 0);
+
+    Assert.assertTrue(tr1.compareTo(tr3) < 0);
+    Assert.assertTrue(tr3.compareTo(tr1) > 0);
+
+    // assertEquals reports the actual compareTo result if one of these fails
+    Assert.assertEquals(0, tr1.compareTo(tr1));
+    Assert.assertEquals(0, tr2.compareTo(tr2));
+    Assert.assertEquals(0, tr3.compareTo(tr3));
+
+    Assert.assertEquals(0, tr1.compareTo(new TableRange(null, sp1)));
+    Assert.assertEquals(0, tr2.compareTo(new TableRange(sp1, sp2)));
+    Assert.assertEquals(0, tr3.compareTo(new TableRange(sp2, null)));
+
+    Assert.assertEquals(0, new TableRange(null, null).compareTo(new TableRange(null, null)));
+  }
+
+  /**
+   * Checks that {@link TableRange#getRange} yields an Accumulo range that excludes the previous
+   * end row and includes the end row, for every combination of {@code null} and set bounds.
+   */
+  @Test
+  public void testToRange() {
+    for (String prev : new String[] {null, "foo"}) {
+      for (String end : new String[] {null, "zoo"}) {
+        Bytes prevEndRow = prev == null ? null : Bytes.of(prev);
+        Bytes endRow = end == null ? null : Bytes.of(end);
+        Assert.assertEquals(new Range(prev, false, end, true),
+            new TableRange(prevEndRow, endRow).getRange());
+      }
+    }
+  }
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index 9db0ce4..66791b5 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -28,7 +28,7 @@
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.observer.Observers;
import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
+import org.apache.fluo.core.worker.finder.hash.PartitionNotificationFinder;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.mini.MiniFluoImpl;
@@ -171,11 +171,11 @@
try (Environment env = new Environment(config)) {
- NotificationFinder nf1 = new HashNotificationFinder();
+ NotificationFinder nf1 = new PartitionNotificationFinder();
nf1.init(env, ((MiniFluoImpl) miniFluo).getNotificationProcessor());
nf1.start();
- NotificationFinder nf2 = new HashNotificationFinder();
+ NotificationFinder nf2 = new PartitionNotificationFinder();
nf2.init(env, ((MiniFluoImpl) miniFluo).getNotificationProcessor());
nf2.start();
diff --git a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
index 4e798ab..6d1ce5d 100644
--- a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
+++ b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
@@ -84,8 +84,8 @@
startMiniAccumulo();
}
- config.setProperty(FluoConfigurationImpl.MIN_SLEEP_TIME_PROP, 50);
- config.setProperty(FluoConfigurationImpl.MAX_SLEEP_TIME_PROP, 100);
+ config.setProperty(FluoConfigurationImpl.NTFY_FINDER_MIN_SLEEP_TIME_PROP, 50);
+ config.setProperty(FluoConfigurationImpl.NTFY_FINDER_MAX_SLEEP_TIME_PROP, 100);
env = new Environment(config);