updated for recipes changes
diff --git a/modules/core/src/main/java/webindex/core/IndexClient.java b/modules/core/src/main/java/webindex/core/IndexClient.java
index ec7861f..f22a346 100644
--- a/modules/core/src/main/java/webindex/core/IndexClient.java
+++ b/modules/core/src/main/java/webindex/core/IndexClient.java
@@ -14,12 +14,11 @@
package webindex.core;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.function.Consumer;
import com.google.gson.Gson;
import org.apache.accumulo.core.client.Connector;
@@ -248,10 +247,10 @@
return links;
}
- public static Collection<Mutation> genDomainMutations(DomainUpdate update, long seq) {
+ public static void genDomainMutations(DomainUpdate update, long seq, Consumer<Mutation> consumer) {
Map<RowColumn, Bytes> oldData = genDomainData(update.getDomain(), update.getOldPageCount());
Map<RowColumn, Bytes> newData = genDomainData(update.getDomain(), update.getNewPageCount());
- return AccumuloExporter.generateMutations(seq, oldData, newData);
+ AccumuloExporter.generateMutations(seq, oldData, newData, consumer);
}
public static Map<RowColumn, Bytes> genDomainData(String domain, Long pageCount) {
@@ -262,37 +261,33 @@
Bytes.of(pageCount + ""));
}
- public static Collection<Mutation> genPageMutations(PageUpdate update, long seq) {
- int listSize = update.getAddedLinks().size() + update.getDeletedLinks().size() + 1;
- ArrayList<Mutation> mutations = new ArrayList<>(listSize);
-
+ public static void genPageMutations(PageUpdate update, long seq, Consumer<Mutation> consumer) {
Mutation jsonMutation = new Mutation("p:" + update.getUri());
if (update.getJson().equals(Page.DELETE_JSON)) {
jsonMutation.putDelete(Constants.PAGE, Constants.CUR, seq);
} else {
jsonMutation.put(Constants.PAGE, Constants.CUR, seq, update.getJson());
}
- mutations.add(jsonMutation);
+ consumer.accept(jsonMutation);
// invert links on export
for (Link link : update.getAddedLinks()) {
Mutation m = new Mutation("p:" + link.getUri());
m.put(Constants.INLINKS, update.getUri(), seq, link.getAnchorText());
- mutations.add(m);
+ consumer.accept(m);
}
for (Link link : update.getDeletedLinks()) {
Mutation m = new Mutation("p:" + link.getUri());
m.putDelete(Constants.INLINKS, update.getUri(), seq);
- mutations.add(m);
+ consumer.accept(m);
}
- return mutations;
}
- public static Collection<Mutation> genUriMutations(UriUpdate update, long seq) {
+ public static void genUriMutations(UriUpdate update, long seq, Consumer<Mutation> consumer) {
Map<RowColumn, Bytes> oldData = genUriData(update.getUri(), update.getOldInfo());
Map<RowColumn, Bytes> newData = genUriData(update.getUri(), update.getNewInfo());
- return AccumuloExporter.generateMutations(seq, oldData, newData);
+ AccumuloExporter.generateMutations(seq, oldData, newData, consumer);
}
public static Map<RowColumn, Bytes> genUriData(String uri, UriInfo info) {
diff --git a/modules/data/src/main/java/webindex/data/fluo/IndexExporter.java b/modules/data/src/main/java/webindex/data/fluo/IndexExporter.java
index 7e05e35..13a517d 100644
--- a/modules/data/src/main/java/webindex/data/fluo/IndexExporter.java
+++ b/modules/data/src/main/java/webindex/data/fluo/IndexExporter.java
@@ -14,7 +14,7 @@
package webindex.data.fluo;
-import java.util.Collection;
+import java.util.function.Consumer;
import org.apache.accumulo.core.data.Mutation;
import org.apache.fluo.api.metrics.Meter;
@@ -48,22 +48,23 @@
}
@Override
- protected Collection<Mutation> translate(SequencedExport<String, IndexUpdate> export) {
+ protected void translate(SequencedExport<String, IndexUpdate> export, Consumer<Mutation> consumer) {
if (export.getValue() instanceof DomainUpdate) {
domainsExported.mark();
- return IndexClient.genDomainMutations((DomainUpdate) export.getValue(), export.getSequence());
+ IndexClient.genDomainMutations((DomainUpdate) export.getValue(), export.getSequence(),
+ consumer);
} else if (export.getValue() instanceof PageUpdate) {
pagesExported.mark();
- return IndexClient.genPageMutations((PageUpdate) export.getValue(), export.getSequence());
+ IndexClient.genPageMutations((PageUpdate) export.getValue(), export.getSequence(), consumer);
} else if (export.getValue() instanceof UriUpdate) {
linksExported.mark();
- return IndexClient.genUriMutations((UriUpdate) export.getValue(), export.getSequence());
+ IndexClient.genUriMutations((UriUpdate) export.getValue(), export.getSequence(), consumer);
+ } else {
+ String msg =
+ "An object with an IndexUpdate class (" + export.getValue().getClass().toString()
+ + ") was placed on the export queue";
+ log.error(msg);
+ throw new IllegalStateException(msg);
}
-
- String msg =
- "An object with an IndexUpdate class (" + export.getValue().getClass().toString()
- + ") was placed on the export queue";
- log.error(msg);
- throw new IllegalStateException(msg);
}
}