fixes #1026 enable scanning notifications (#1032)
diff --git a/.travis.yml b/.travis.yml
index b071db0..c996f5c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,6 @@
- openjdk8
env:
- ADDITIONAL_MAVEN_OPTS=
- - ADDITIONAL_MAVEN_OPTS=-Daccumulo.version=1.8.1
+ - ADDITIONAL_MAVEN_OPTS=-Daccumulo.version=1.9.0
script:
- mvn clean verify javadoc:jar $ADDITIONAL_MAVEN_OPTS
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index 3ec090f..355e16b 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -19,6 +19,7 @@
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import javax.inject.Provider;
@@ -37,6 +38,7 @@
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.ScanUtil;
+import org.apache.fluo.core.util.ScanUtil.ScanFlags;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,8 +235,13 @@
}
public ScanUtil.ScanOpts getScanOpts() {
- return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
- hexEncNonAscii, scanAccumuloTable, false);
+ EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class);
+
+ ScanUtil.setFlag(flags, help, ScanFlags.HELP);
+ ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX);
+ ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO);
+
+ return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, flags);
}
}
}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
index bcd03c0..3844e4d 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
@@ -17,12 +17,14 @@
import java.io.IOException;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import com.beust.jcommander.Parameter;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.core.client.FluoAdminImpl;
import org.apache.fluo.core.util.ScanUtil;
+import org.apache.fluo.core.util.ScanUtil.ScanFlags;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -59,6 +61,9 @@
description = "Export key/values stored in Accumulo as JSON file.")
public boolean exportAsJson = false;
+ @Parameter(names = "--ntfy", help = true, description = "Scan active notifications")
+ public boolean scanNtfy = false;
+
public String getStartRow() {
return startRow;
}
@@ -90,11 +95,23 @@
throw new IllegalArgumentException(
"Both \"--raw\" and \"--json\" can not be set together.");
}
+
+ if (this.scanAccumuloTable && this.scanNtfy) {
+ throw new IllegalArgumentException(
+ "Both \"--raw\" and \"--ntfy\" can not be set together.");
+ }
}
public ScanUtil.ScanOpts getScanOpts() {
- return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
- hexEncNonAscii, scanAccumuloTable, exportAsJson);
+ EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class);
+
+ ScanUtil.setFlag(flags, help, ScanFlags.HELP);
+ ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX);
+ ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO);
+ ScanUtil.setFlag(flags, exportAsJson, ScanFlags.JSON);
+ ScanUtil.setFlag(flags, scanNtfy, ScanFlags.NTFY);
+
+ return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, flags);
}
public static ScanOptions parse(String[] args) {
@@ -114,14 +131,17 @@
FluoConfiguration config = CommandUtil.resolveFluoConfig();
config.setApplicationName(options.getApplicationName());
options.overrideFluoConfig(config);
- CommandUtil.verifyAppRunning(config);
try {
options.overrideFluoConfig(config);
if (options.scanAccumuloTable) {
config = FluoAdminImpl.mergeZookeeperConfig(config);
ScanUtil.scanAccumulo(options.getScanOpts(), config, System.out);
+ } else if (options.scanNtfy) {
+ config = FluoAdminImpl.mergeZookeeperConfig(config);
+ ScanUtil.scanNotifications(options.getScanOpts(), config, System.out);
} else {
+ CommandUtil.verifyAppRunning(config);
ScanUtil.scanFluo(options.getScanOpts(), config, System.out);
}
} catch (RuntimeException | IOException e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java b/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java
new file mode 100644
index 0000000..04afc13
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.core.impl.Notification;
+
+import static java.util.stream.Collectors.toSet;
+
+public class NotificationScanner implements CellScanner {
+
+ private Iterable<Entry<Key, Value>> scanner;
+ private Predicate<RowColumnValue> filter;
+
+ private static Predicate<RowColumnValue> createColumnFilter(Collection<Column> allColumns) {
+ if (allColumns.size() == 0) {
+ return rcv -> true;
+ } else {
+ Set<Bytes> families = allColumns.stream().filter(col -> !col.isQualifierSet())
+ .map(col -> col.getFamily()).collect(toSet());
+ Set<Column> columns =
+ allColumns.stream().filter(col -> col.isQualifierSet()).collect(toSet());
+
+ if (families.size() == 0) {
+ return rcv -> columns.contains(rcv.getColumn());
+ } else if (columns.size() == 0) {
+ return rcv -> families.contains(rcv.getColumn().getFamily());
+ } else {
+ return rcv -> families.contains(rcv.getColumn().getFamily())
+ || columns.contains(rcv.getColumn());
+ }
+ }
+ }
+
+ NotificationScanner(Scanner scanner, Collection<Column> columns) {
+ this(scanner, createColumnFilter(columns));
+ }
+
+ NotificationScanner(Scanner scanner, Predicate<RowColumnValue> filter) {
+ scanner.clearColumns();
+ Notification.configureScanner(scanner);
+ this.scanner = scanner;
+ this.filter = filter;
+ }
+
+ @VisibleForTesting
+ NotificationScanner(Iterable<Entry<Key, Value>> scanner, Collection<Column> columns) {
+ this.scanner = scanner;
+ this.filter = createColumnFilter(columns);
+ }
+
+ @Override
+ public Iterator<RowColumnValue> iterator() {
+ Iterator<RowColumnValue> iter = Iterators.transform(scanner.iterator(), entry -> {
+ Notification n = Notification.from(entry.getKey());
+ return new RowColumnValue(n.getRow(), n.getColumn(), Bytes.of(entry.getValue().get()));
+ });
+
+ return Iterators.filter(iter, rcv -> filter.test(rcv));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 3cb9d49..a6c8e68 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -20,6 +20,7 @@
import java.text.DateFormat;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -33,6 +34,7 @@
import com.google.gson.JsonIOException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.format.FluoFormatter;
import org.apache.fluo.api.client.FluoClient;
@@ -112,6 +114,31 @@
}
}
+
+ private static void scan(ScanOpts options, PrintStream out, CellScanner cellScanner) {
+ Function<Bytes, String> encoder = getEncoder(options);
+
+ if (options.exportAsJson) {
+ generateJson(cellScanner, encoder, out);
+ } else {
+ for (RowColumnValue rcv : cellScanner) {
+ out.print(encoder.apply(rcv.getRow()));
+ out.print(' ');
+ out.print(encoder.apply(rcv.getColumn().getFamily()));
+ out.print(' ');
+ out.print(encoder.apply(rcv.getColumn().getQualifier()));
+ out.print(' ');
+ out.print(encoder.apply(rcv.getColumn().getVisibility()));
+ out.print("\t");
+ out.print(encoder.apply(rcv.getValue()));
+ out.println();
+ if (out.checkError()) {
+ break;
+ }
+ }
+ }
+ }
+
public static void scanFluo(ScanOpts options, FluoConfiguration sConfig, PrintStream out)
throws IOException {
@@ -121,27 +148,34 @@
Span span = getSpan(options);
Collection<Column> columns = getColumns(options);
CellScanner cellScanner = s.scanner().over(span).fetch(columns).build();
- Function<Bytes, String> encoder = getEncoder(options);
- if (options.exportAsJson) {
- generateJson(cellScanner, encoder, out);
- } else {
- for (RowColumnValue rcv : cellScanner) {
- out.print(encoder.apply(rcv.getRow()));
- out.print(' ');
- out.print(encoder.apply(rcv.getColumn().getFamily()));
- out.print(' ');
- out.print(encoder.apply(rcv.getColumn().getQualifier()));
- out.print(' ');
- out.print(encoder.apply(rcv.getColumn().getVisibility()));
- out.print("\t");
- out.print(encoder.apply(rcv.getValue()));
- out.println();
- if (out.checkError()) {
- break;
- }
- }
- }
+ scan(options, out, cellScanner);
+ }
+ }
+ }
+
+ public static void scanNotifications(ScanOpts options, FluoConfiguration sConfig, PrintStream out)
+ throws IOException {
+
+ Connector conn = AccumuloUtil.getConnector(sConfig);
+
+ Span span = getSpan(options);
+ Collection<Column> columns = getColumns(options);
+
+ Scanner scanner = null;
+ try {
+ scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
+
+ scanner.setRange(SpanUtil.toRange(span));
+
+ NotificationScanner ntfyScanner = new NotificationScanner(scanner, columns);
+
+ scan(options, out, ntfyScanner);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (scanner != null) {
+ scanner.close();
}
}
}
@@ -202,6 +236,18 @@
}
}
+ public static enum ScanFlags {
+ HELP,
+ // hex encode node ascii
+ HEX,
+ // scan accumuo table directly
+ ACCUMULO,
+ // endode output as json
+ JSON,
+ // scan notification
+ NTFY
+ }
+
public static class ScanOpts {
private String startRow;
@@ -209,23 +255,24 @@
private List<String> columns;
private String exactRow;
private String rowPrefix;
- public boolean help;
- public boolean hexEncNonAscii = true;
- public boolean scanAccumuloTable = false;
- public boolean exportAsJson = false;
+ public final boolean help;
+ public final boolean hexEncNonAscii;
+ public final boolean scanAccumuloTable;
+ public final boolean exportAsJson;
+ public final boolean scanNtfy;
public ScanOpts(String startRow, String endRow, List<String> columns, String exactRow,
- String rowPrefix, boolean help, boolean hexEncNonAscii, boolean scanAccumuloTable,
- boolean exportAsJson) {
+ String rowPrefix, EnumSet<ScanFlags> flags) {
this.startRow = startRow;
this.endRow = endRow;
this.columns = columns;
this.exactRow = exactRow;
this.rowPrefix = rowPrefix;
- this.help = help;
- this.hexEncNonAscii = hexEncNonAscii;
- this.scanAccumuloTable = scanAccumuloTable;
- this.exportAsJson = exportAsJson;
+ this.help = flags.contains(ScanFlags.HELP);
+ this.hexEncNonAscii = flags.contains(ScanFlags.HEX);
+ this.scanAccumuloTable = flags.contains(ScanFlags.ACCUMULO);
+ this.exportAsJson = flags.contains(ScanFlags.JSON);
+ this.scanNtfy = flags.contains(ScanFlags.NTFY);
}
public String getStartRow() {
@@ -251,4 +298,10 @@
return columns;
}
}
+
+ public static void setFlag(EnumSet<ScanFlags> flags, boolean b, ScanFlags flag) {
+ if (b) {
+ flags.add(flag);
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java b/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java
new file mode 100644
index 0000000..0228243
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.NotificationUtil;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NotificationScannerTest {
+
+ private static class Data implements Iterable<Entry<Key, Value>> {
+ TreeMap<Key, Value> data = new TreeMap<>();
+
+ void putNtfy(String row, String fam, String qual) {
+ byte[] r = row.getBytes(StandardCharsets.UTF_8);
+ byte[] f = ColumnConstants.NOTIFY_CF.toArray();
+ byte[] q = NotificationUtil.encodeCol(new Column(fam, qual));
+
+ data.put(new Key(r, f, q, new byte[0], 42L), new Value(new byte[0]));
+ }
+
+ @Override
+ public Iterator<Entry<Key, Value>> iterator() {
+ return data.entrySet().iterator();
+ }
+ }
+
+
+ /**
+ * When scanning notifications, column filtering is done on the client side. This test ensures
+ * that filtering works correctly.
+ */
+ @Test
+ public void testColumnFiltering() {
+
+ Data data = new Data();
+ data.putNtfy("r001", "f8", "q2");
+ data.putNtfy("r001", "f9", "q1");
+ data.putNtfy("r002", "f8", "q2");
+ data.putNtfy("r002", "f8", "q3");
+ data.putNtfy("r004", "f9", "q3");
+ data.putNtfy("r004", "f9", "q4");
+
+ HashSet<RowColumnValue> expected = new HashSet<>();
+ expected.add(new RowColumnValue("r001", new Column("f8", "q2"), ""));
+ expected.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+ expected.add(new RowColumnValue("r002", new Column("f8", "q2"), ""));
+ expected.add(new RowColumnValue("r002", new Column("f8", "q3"), ""));
+ expected.add(new RowColumnValue("r004", new Column("f9", "q3"), ""));
+ expected.add(new RowColumnValue("r004", new Column("f9", "q4"), ""));
+
+ NotificationScanner scanner = new NotificationScanner(data, Collections.emptySet());
+ HashSet<RowColumnValue> actual = new HashSet<>();
+ scanner.forEach(actual::add);
+ Assert.assertEquals(expected, actual);
+
+ scanner = new NotificationScanner(data, Arrays.asList(new Column("f9")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ HashSet<RowColumnValue> expected2 = new HashSet<>();
+ expected.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f9"))
+ .forEach(expected2::add);
+ Assert.assertEquals(expected2, actual);
+
+ scanner = new NotificationScanner(data, Arrays.asList(new Column("f9"), new Column("f8")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ Assert.assertEquals(expected, actual);
+
+ scanner = new NotificationScanner(data, Arrays.asList(new Column("f9", "q1")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ expected2.clear();
+ expected2.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+ Assert.assertEquals(expected2, actual);
+
+ scanner =
+ new NotificationScanner(data, Arrays.asList(new Column("f9", "q1"), new Column("f8")));
+ actual.clear();
+ scanner.forEach(actual::add);
+ expected2.clear();
+ expected2.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+ expected2.add(new RowColumnValue("r001", new Column("f8", "q2"), ""));
+ expected2.add(new RowColumnValue("r002", new Column("f8", "q2"), ""));
+ expected2.add(new RowColumnValue("r002", new Column("f8", "q3"), ""));
+ Assert.assertEquals(expected2, actual);
+ }
+}