Merge pull request #8 from mikewalch/indexing
Closes #4 - Accumulo indexes are now updated using Fluo IndexExporter.
diff --git a/.gitignore b/.gitignore
index b422b3a..abdc4a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,5 +4,5 @@
.classpath
.project
.settings
-target
-data
+target/
+/data/
diff --git a/docs/observer-design.md b/docs/observer-design.md
index 80a47ca..8aa224b 100644
--- a/docs/observer-design.md
+++ b/docs/observer-design.md
@@ -18,22 +18,19 @@
d:com.c domain pagecount 2
d:com.d domain pagecount 1
p:com.a/page1 page cur {"outlinkcount": 2, "outlinks":[c.com, b.com]}
- page score 1
+ incount 0
p:com.b inlinks com.a/pag1 anchorText
page cur {"outlinkcount": 2, "outlinks":[c.com/page1, c.com]}
- page incount 1
- page score 2
+ incount 1
p:com.c inlinks com.a/page1 anchorText
com.b anchorText
com.d anchorText
page incount 3
- score 3
p:com.c/page1 inlinks com.b anchorText
page incount 1
- score 1
p:com.d page cur {"outlinkcount": 1, "outlinks":[c.com]}
- page score 1
-
+ incount 0
+
Resulting Accumulo Table
@@ -49,94 +46,20 @@
d:com.d domain pagecount 1
rank 1:com.d 1
p:com.a/page1 page cur {"outlinkcount": 2, "outlinks":[c.com, b.com]}
- page score 1
+ incount 0
p:com.b inlinks com.a/pag1 anchorText
page cur {"outlinkcount": 2, "outlinks":[c.com/page1, c.com]}
- page incount 1
- page score 2
+ incount 1
p:com.c inlinks com.a/page1 anchorText
com.b anchorText
com.d anchorText
page incount 3
- score 3
p:com.c/page1 inlinks com.b anchorText
page incount 1
- score 1
p:com.d page cur {"outlinkcount": 1, "outlinks":[c.com]}
- page score 1
- ti:3:com.c incount rank 3
- ti:2:com.b 2
- ti:1:com.c/page1 1
- tp:2:com.c pagecount rank 2
- tp:1:com.a 1
- tp:1.com.b 1
- ts:3:com.c score rank 3
- ts:2:com.b 2
- ts:1:com.c/page1 1
- ts:1:com.a/page1 1
- ts:1:com.d 1
-
-Below are available operations:
-
- get(row, col) -> value
- set(row, col, value)
- del(row, col)
-
-PageLoader is called with `pageUri` & `pageJson`
-
- curJson = get(pageUri, page:cur)
- if curJson != pageJson
- set(pageUri, page:new, pageJson)
-
-PageObserver watches `page:new` is called with `pageUri`
-
- curJson = get(pageUri, page:cur)
- newJson = get(pageUri, page:new)
-
- newLinks,delLinks = compare(curJson, newJson)
-
- for link in newLinks:
- set(linkUri, inlinks-update:pageUri, [add,anchorText])
-
- for link in delLinks
- set(linkUri, inlinks-update:pageUri, [del])
-
- if newLinks.isNotEmpty() or delLinks.isNotEmpty():
- set(linkUri, inlinks-changed:ntfy, "")
-
- if curJson == null:
- pageScore = get(pageUri, stats:pagescore).toInteger(0)
-
- if pageScore != 0:
- del(pageDomain, rank:pageScore:pageUri)
-
- set(pageUri, stats:pagescore, pageScore+1)
- set(pageDomain, rank:pageScore+1:pageUri)
-
- set(pageUri, page:cur, newJson)
- del(pageUri, page:new)
-
-InlinksObserver weakly watches `update:inlinks` is called with `pageUri`
-
- List<Update> updates = get(pageUri, update:inlinks)
-
- change = 0
-
- for update in updates:
- if update.action == del:
- change--
- del(pageUri, inlinks:update.linkUri)
- elif update.action == add:
- change++
- set(pageUri, inlinks:update.linkUri, update.anchorText)
-
- if change != 0:
- curCount = get(pageUri, stats:inlinkcount)
- curScore = get(pageUri, stats:pagescore).toInteger(0)
-
- if pageScore != 0:
- del(pageDomain, rank:curScore:pageUri)
-
- set(pageDomain, rank:curScore+change:pageUri)
- set(pageUri, stats:inlinkcount, curCount+change)
- set(pageUri, stats:pagescore, curScore+change)
+ incount 0
+ t:incount rank 3:com.c 3
+ 2:com.b 2
+ 1:com.c/page1 1
+ 0:com.a/page1 0
+ 0:com.d 0
diff --git a/modules/core/src/main/java/io/fluo/webindex/core/Constants.java b/modules/core/src/main/java/io/fluo/webindex/core/Constants.java
index e351deb..056a501 100644
--- a/modules/core/src/main/java/io/fluo/webindex/core/Constants.java
+++ b/modules/core/src/main/java/io/fluo/webindex/core/Constants.java
@@ -28,7 +28,6 @@
// Column Qualifiers
// for page
public static final String INCOUNT = "incount";
- public static final String SCORE = "score";
public static final String NEW = "new";
public static final String CUR = "cur";
// for domains
diff --git a/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java b/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java
index a6b5b77..4adf55c 100644
--- a/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java
+++ b/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java
@@ -14,20 +14,20 @@
package io.fluo.webindex.core.models;
+import java.io.Serializable;
import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Set;
import io.fluo.webindex.core.DataUtil;
-public class Page {
+public class Page implements Serializable {
public static Page EMPTY = new Page();
private String url = "";
private String domain;
private Long numInbound;
private Long numOutbound = new Long(0);
- private Long score;
private String crawlDate;
private String server;
private String title;
@@ -59,10 +59,26 @@
return outboundLinks;
}
- public void addOutboundLink(String url, String anchorText) {
- if (outboundLinks.add(new Link(url, anchorText))) {
+ /**
+ * @return True if page did not already contain link
+ */
+ public boolean addOutboundLink(String url, String anchorText) {
+ boolean added = outboundLinks.add(new Link(url, anchorText));
+ if (added) {
numOutbound++;
}
+ return added;
+ }
+
+ /**
+ * @return True if link was removed
+ */
+ public boolean removeOutboundLink(String url) {
+ boolean removed = outboundLinks.remove(new Link(url));
+ if (removed) {
+ numOutbound--;
+ }
+ return removed;
}
public boolean isEmpty() {
@@ -89,14 +105,6 @@
return numOutbound;
}
- public Long getScore() {
- return score;
- }
-
- public void setScore(Long score) {
- this.score = score;
- }
-
public String getCrawlDate() {
return crawlDate;
}
@@ -113,7 +121,7 @@
this.title = title;
}
- public class Link {
+ public class Link implements Serializable {
private String url;
private String anchorText;
@@ -123,6 +131,10 @@
this.anchorText = anchorText;
}
+ public Link(String url) {
+ this(url, "");
+ }
+
public String getUrl() {
return url;
}
diff --git a/modules/data/pom.xml b/modules/data/pom.xml
index 1390510..a97764a 100644
--- a/modules/data/pom.xml
+++ b/modules/data/pom.xml
@@ -75,6 +75,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -110,6 +116,12 @@
<groupId>io.fluo</groupId>
<artifactId>fluo-mini</artifactId>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Test Dependencies -->
<dependency>
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/Init.java b/modules/data/src/main/java/io/fluo/webindex/data/Init.java
index 4b0107e..54951f1 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/Init.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/Init.java
@@ -17,16 +17,17 @@
import java.util.LinkedList;
import java.util.List;
+import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.data.RowColumn;
-import io.fluo.webindex.core.Constants;
+import io.fluo.mapreduce.FluoKeyValue;
+import io.fluo.mapreduce.FluoKeyValueGenerator;
import io.fluo.webindex.core.DataConfig;
+import io.fluo.webindex.core.models.Page;
import io.fluo.webindex.data.spark.IndexEnv;
import io.fluo.webindex.data.spark.IndexStats;
import io.fluo.webindex.data.spark.IndexUtil;
import io.fluo.webindex.data.util.WARCFileInputFormat;
-import io.fluo.mapreduce.FluoKeyValue;
-import io.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.archive.io.ArchiveReader;
@@ -48,28 +50,16 @@
private static final Logger log = LoggerFactory.getLogger(Init.class);
private static IndexEnv env;
- public static void loadAccumulo(JavaPairRDD<RowColumn, Long> sortedCounts) throws Exception {
-
- JavaPairRDD<RowColumn, Long> filteredSortedCounts =
- sortedCounts.filter(t -> !(t._1().getRow().toString().startsWith("d:") && t._1()
- .getColumn().getFamily().toString().equals(Constants.PAGES)));
-
+ public static void loadAccumulo(JavaPairRDD<RowColumn, Bytes> linkIndex) throws Exception {
JavaPairRDD<Key, Value> accumuloData =
- filteredSortedCounts.mapToPair(new PairFunction<Tuple2<RowColumn, Long>, Key, Value>() {
+ linkIndex.mapToPair(new PairFunction<Tuple2<RowColumn, Bytes>, Key, Value>() {
@Override
- public Tuple2<Key, Value> call(Tuple2<RowColumn, Long> tuple) throws Exception {
+ public Tuple2<Key, Value> call(Tuple2<RowColumn, Bytes> tuple) throws Exception {
RowColumn rc = tuple._1();
String row = rc.getRow().toString();
String cf = rc.getColumn().getFamily().toString();
String cq = rc.getColumn().getQualifier().toString();
- byte[] val = tuple._2().toString().getBytes();
- if (cf.equals(Constants.INLINKS)
- || (cf.equals(Constants.PAGE) && cq.startsWith(Constants.CUR))) {
- if (tuple._2() > 1) {
- log.info("Found key {} with count of {}", tuple._1(), tuple._2().toString());
- }
- val = rc.getColumn().getVisibility().toArray();
- }
+ byte[] val = tuple._2().toArray();
return new Tuple2<>(new Key(new Text(row), new Text(cf), new Text(cq)), new Value(val));
}
});
@@ -89,26 +79,17 @@
TextOutputFormat.class);
}
- public static void loadFluo(JavaPairRDD<RowColumn, Long> sortedCounts) throws Exception {
+ public static void loadFluo(JavaPairRDD<RowColumn, Bytes> linkIndex) throws Exception {
JavaPairRDD<Key, Value> fluoData =
- sortedCounts.flatMapToPair(new PairFlatMapFunction<Tuple2<RowColumn, Long>, Key, Value>() {
+ linkIndex.flatMapToPair(new PairFlatMapFunction<Tuple2<RowColumn, Bytes>, Key, Value>() {
@Override
- public Iterable<Tuple2<Key, Value>> call(Tuple2<RowColumn, Long> tuple) throws Exception {
+ public Iterable<Tuple2<Key, Value>> call(Tuple2<RowColumn, Bytes> tuple) throws Exception {
List<Tuple2<Key, Value>> output = new LinkedList<>();
RowColumn rc = tuple._1();
String row = rc.getRow().toString();
String cf = rc.getColumn().getFamily().toString();
String cq = rc.getColumn().getQualifier().toString();
- byte[] val = tuple._2().toString().getBytes();
-
- if (cf.equals(Constants.RANK) || cq.equals(Constants.RANK)
- || (row.startsWith("d:") && cf.equals(Constants.PAGES))) {
- return output;
- }
- if (cf.equals(Constants.INLINKS)
- || (cf.equals(Constants.PAGE) && cq.startsWith(Constants.CUR))) {
- val = rc.getColumn().getVisibility().toArray();
- }
+ byte[] val = tuple._2().toArray();
FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
fkvg.setRow(row).setColumn(new Column(cf, cq)).setValue(val);
for (FluoKeyValue kv : fkvg.getKeyValues()) {
@@ -144,16 +125,16 @@
env.getSparkCtx().newAPIHadoopFile(dataConfig.watDataDir, WARCFileInputFormat.class,
Text.class, ArchiveReader.class, new Configuration());
- JavaPairRDD<RowColumn, Long> sortedLinkCounts = IndexUtil.createLinkCounts(stats, archives);
+ JavaRDD<Page> pages = IndexUtil.createPages(archives);
- JavaPairRDD<RowColumn, Long> sortedTopCounts =
- IndexUtil.createSortedTopCounts(sortedLinkCounts);
+ JavaPairRDD<RowColumn, Bytes> linkIndex = IndexUtil.createLinkIndex(stats, pages);
// Load intermediate results into Fluo
- loadFluo(sortedTopCounts);
+ JavaPairRDD<RowColumn, Bytes> linkIndexNoRank = IndexUtil.filterRank(linkIndex);
+ loadFluo(linkIndexNoRank);
// Load final indexes into Accumulo
- loadAccumulo(sortedTopCounts);
+ loadAccumulo(linkIndex);
// For testing, Load into HDFS
// loadHDFS(sortedTopCounts);
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java b/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java
index 221899f..576156c 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java
@@ -19,7 +19,7 @@
import io.fluo.api.config.FluoConfiguration;
import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.recipes.accumulo.export.AccumuloExporter;
+import io.fluo.api.data.Bytes;
import io.fluo.recipes.accumulo.export.TableInfo;
import io.fluo.recipes.export.ExportQueue;
import io.fluo.recipes.transaction.TxLog;
@@ -41,9 +41,9 @@
appConfig.addObserver(new ObserverConfiguration(InlinksObserver.class.getName()));
ExportQueue.configure(appConfig, new ExportQueue.Options(IndexExporter.QUEUE_ID,
- IndexExporter.class, String.class, TxLog.class, numExportBuckets));
+ IndexExporter.class, Bytes.class, TxLog.class, numExportBuckets));
- AccumuloExporter.setExportTableInfo(appConfig.getAppConfiguration(), IndexExporter.QUEUE_ID,
+ IndexExporter.setExportTableInfo(appConfig.getAppConfiguration(), IndexExporter.QUEUE_ID,
exportTable);
}
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java b/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java
index d8fc838..4b2ccae 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java
@@ -22,13 +22,13 @@
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.data.RowColumn;
+import io.fluo.mapreduce.FluoEntryInputFormat;
import io.fluo.webindex.core.Constants;
import io.fluo.webindex.core.DataConfig;
import io.fluo.webindex.core.DataUtil;
import io.fluo.webindex.data.spark.IndexEnv;
import io.fluo.webindex.data.spark.IndexUtil;
import io.fluo.webindex.data.util.LinkUtil;
-import io.fluo.mapreduce.FluoEntryInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -105,16 +105,14 @@
String cq = rc.getColumn().getQualifier().toString();
Bytes v = kvTuple._2();
if (row.startsWith("p:") && cf.equals(Constants.PAGE)
- && (cq.equals(Constants.INCOUNT) || cq.equals(Constants.SCORE))) {
+ && cq.equals(Constants.INCOUNT)) {
String pageUri = row.substring(2);
Long num = Long.parseLong(v.toString());
Column rankCol =
new Column(Constants.RANK, String.format("%s:%s",
IndexUtil.revEncodeLong(num), pageUri));
- if (cq.equals(Constants.SCORE)) {
- String domain = "d:" + LinkUtil.getReverseTopPrivate(DataUtil.toUrl(pageUri));
- retval.add(new Tuple2<>(new RowColumn(domain, rankCol), v));
- }
+ String domain = "d:" + LinkUtil.getReverseTopPrivate(DataUtil.toUrl(pageUri));
+ retval.add(new Tuple2<>(new RowColumn(domain, rankCol), v));
retval.add(new Tuple2<>(new RowColumn("t:" + cq, rankCol), v));
}
return retval;
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java b/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java
index e1162ba..3be8a79 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java
@@ -14,49 +14,113 @@
package io.fluo.webindex.data.fluo;
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.data.RowColumn;
import io.fluo.recipes.accumulo.export.AccumuloExporter;
+import io.fluo.recipes.transaction.LogEntry;
import io.fluo.recipes.transaction.TxLog;
import io.fluo.webindex.core.Constants;
+import io.fluo.webindex.data.spark.IndexUtil;
+import io.fluo.webindex.data.util.FluoConstants;
import org.apache.accumulo.core.data.Mutation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class IndexExporter extends AccumuloExporter<String, TxLog> {
+public class IndexExporter extends AccumuloExporter<Bytes, TxLog> {
private static final Logger log = LoggerFactory.getLogger(IndexExporter.class);
- public static final String QUEUE_ID = "peq";
+ public static final String QUEUE_ID = "webIndexQ";
+
+ public static void deleteRankIndex(Map<Bytes, Mutation> mutations, Bytes row, String pageUri,
+ long seq, Long prev) {
+ if (prev != null) {
+ Mutation m = mutations.computeIfAbsent(row, k -> new Mutation(k.toArray()));
+ String cf = String.format("%s:%s", IndexUtil.revEncodeLong(prev), pageUri);
+ m.putDelete(Constants.RANK.getBytes(), cf.getBytes(), seq);
+ log.debug("Deleted rank index for row {} cf {} seq {}", row.toString(), cf, seq);
+ }
+ }
+
+ public static void updateRankIndex(Map<Bytes, Mutation> mutations, Bytes row, String pageUri,
+ long seq, Long prev, Long cur) {
+ if (!cur.equals(prev)) {
+ deleteRankIndex(mutations, row, pageUri, seq, prev);
+ Mutation m = mutations.computeIfAbsent(row, k -> new Mutation(k.toArray()));
+ String cf = String.format("%s:%s", IndexUtil.revEncodeLong(cur), pageUri);
+ m.put(Constants.RANK.getBytes(), cf.getBytes(), seq, cur.toString().getBytes());
+ log.debug("Adding rank index for row {} cf {} seq {} val {}", row.toString(), cf, seq,
+ cur.toString());
+ }
+ }
@Override
- protected List<Mutation> convert(String key, long seq, TxLog txLog) {
- Mutation m = new Mutation(key);
- boolean modified = false;
- for (TxLog.LogEntry entry : txLog.getLogEntries()) {
+ protected Collection<Mutation> convert(Bytes key, long seq, TxLog txLog) {
+ Map<Bytes, Mutation> mutations = new HashMap<>();
+ Map<RowColumn, Bytes> getMap = txLog.getOperationMap(LogEntry.Operation.GET);
+ for (LogEntry entry : txLog.getLogEntries()) {
+ LogEntry.Operation op = entry.getOp();
+ Bytes row = entry.getRow();
+ Column col = entry.getColumn();
Bytes fam = entry.getColumn().getFamily();
Bytes qual = entry.getColumn().getQualifier();
Bytes val = entry.getValue();
- if (fam.toString().equals(Constants.PAGE) || fam.toString().equals(Constants.INLINKS)) {
- log.info("{} {} row {} col {} val {}", seq, entry.getType(), entry.getRow(),
- entry.getColumn(), entry.getValue());
- switch (entry.getType()) {
- case DELETE:
- m.putDelete(fam.toArray(), qual.toArray());
- case SET:
- m.put(fam.toArray(), qual.toArray(), val.toArray());
- default:
- break;
+ log.debug("{} {} row {} col {} val {}", seq, entry.getOp(), entry.getRow(),
+ entry.getColumn(), entry.getValue());
+
+ if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) {
+ Mutation m = mutations.computeIfAbsent(row, k -> new Mutation(k.toArray()));
+ if (entry.getOp().equals(LogEntry.Operation.DELETE)) {
+ m.putDelete(fam.toArray(), qual.toArray(), seq);
+ } else {
+ m.put(fam.toArray(), qual.toArray(), seq, val.toArray());
}
- modified = true;
+ }
+
+ if (col.equals(FluoConstants.PAGE_INCOUNT_COL)
+ && (op.equals(LogEntry.Operation.SET) || op.equals(LogEntry.Operation.DELETE))) {
+
+ String pageUri = row.toString().substring(2);
+ Long prev = null;
+ Bytes prevGet = getMap.get(new RowColumn(row, FluoConstants.PAGE_INCOUNT_COL));
+ if (prevGet != null) {
+ prev = Long.parseLong(prevGet.toString());
+ }
+
+ // update domain counts
+ Bytes domainRow = PageObserver.getDomainRow(row);
+ if (!domainRow.equals(Bytes.EMPTY)) {
+ if (op.equals(LogEntry.Operation.SET)) {
+ Long cur = Long.parseLong(val.toString());
+ updateRankIndex(mutations, domainRow, pageUri, seq, prev, cur);
+ } else {
+ deleteRankIndex(mutations, domainRow, pageUri, seq, prev);
+ }
+ }
+
+ // update total counts
+ Bytes totalRow = Bytes.of("t:" + Constants.INCOUNT);
+ if (op.equals(LogEntry.Operation.SET)) {
+ Long cur = Long.parseLong(val.toString());
+ updateRankIndex(mutations, totalRow, pageUri, seq, prev, cur);
+ } else {
+ deleteRankIndex(mutations, totalRow, pageUri, seq, prev);
+ }
}
}
- if (modified) {
- return Collections.singletonList(m);
- }
- return Collections.emptyList();
+ return mutations.values();
+ }
+
+ public static Predicate<LogEntry> getFilter() {
+ return le -> le.getColumn().getFamily().toString().equals(Constants.PAGE)
+ || le.getColumn().getFamily().toString().equals(Constants.INLINKS)
+ || le.getColumn().equals(FluoConstants.PAGECOUNT_COL);
}
}
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java b/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java
index 510ff2c..8707aac 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java
@@ -36,7 +36,7 @@
public class InlinksObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(InlinksObserver.class);
- private ExportQueue<String, TxLog> exportQueue;
+ private ExportQueue<Bytes, TxLog> exportQueue;
@Override
public void init(Context context) throws Exception {
@@ -46,7 +46,7 @@
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
- RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx);
+ RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx, IndexExporter.getFilter());
TypedTransactionBase ttx = FluoConstants.TYPEL.wrap(rtx);
String pageUri = row.toString().substring(2);
@@ -81,29 +81,32 @@
}
}
- Long incount = ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong(0) + change;
- if (incount <= 0) {
- if (incount < 0) {
- log.error("Incount for {} is negative: {}", row, incount);
- }
- ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).delete();
- } else {
- ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).set(incount);
+ boolean knownPage = true;
+ Long prevCount = ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong();
+ if (prevCount == null) {
+ knownPage = false;
+ prevCount = new Long(0);
}
- Long score = ttx.get().row(row).col(FluoConstants.PAGE_SCORE_COL).toLong(0) + change;
- if (score <= 0) {
- if (score < 0) {
- log.error("Score for {} is negative: {}", row, incount);
+ Long curCount = prevCount + change;
+ if (curCount <= 0) {
+ if (curCount < 0) {
+ log.error("Incount for {} is negative: {}", row, curCount);
}
- ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).delete();
+ ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).delete();
+ if (knownPage) {
+ PageObserver.updateDomainPageCount(ttx, row, -1);
+ }
} else {
- ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).set(score);
+ ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).set(curCount);
+ if (!knownPage) {
+ PageObserver.updateDomainPageCount(ttx, row, 1);
+ }
}
TxLog txLog = rtx.getTxLog();
- if (!txLog.getLogEntries().isEmpty()) {
- exportQueue.add(tx, row.toString(), txLog);
+ if (!txLog.isEmpty()) {
+ exportQueue.add(tx, row, txLog);
}
}
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java b/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java
index 2dc8039..a5834e2 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java
@@ -14,6 +14,7 @@
package io.fluo.webindex.data.fluo;
+import java.text.ParseException;
import java.util.Collections;
import java.util.Set;
@@ -27,8 +28,10 @@
import io.fluo.recipes.export.ExportQueue;
import io.fluo.recipes.transaction.RecordingTransactionBase;
import io.fluo.recipes.transaction.TxLog;
+import io.fluo.webindex.core.DataUtil;
import io.fluo.webindex.core.models.Page;
import io.fluo.webindex.data.util.FluoConstants;
+import io.fluo.webindex.data.util.LinkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +39,7 @@
private static final Logger log = LoggerFactory.getLogger(PageObserver.class);
private static final Gson gson = new Gson();
- private ExportQueue<String, TxLog> exportQueue;
+ private ExportQueue<Bytes, TxLog> exportQueue;
@Override
public void init(Context context) throws Exception {
@@ -46,7 +49,7 @@
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
- RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx);
+ RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx, IndexExporter.getFilter());
TypedTransactionBase ttx = FluoConstants.TYPEL.wrap(rtx);
String nextJson = ttx.get().row(row).col(FluoConstants.PAGE_NEW_COL).toString("");
if (nextJson.isEmpty()) {
@@ -59,16 +62,22 @@
if (!curJson.isEmpty()) {
Page curPage = gson.fromJson(curJson, Page.class);
curLinks = curPage.getOutboundLinks();
- } else {
- Long score = ttx.get().row(row).col(FluoConstants.PAGE_SCORE_COL).toLong(0);
- Long newScore = score + 1;
- ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).set(newScore);
+ }
+
+ if (curJson.isEmpty() && !nextJson.equals("delete")) {
+ Long incount = ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong();
+ if (incount == null) {
+ ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).set(new Long(0));
+ }
+ updateDomainPageCount(ttx, row, 1);
}
Page nextPage;
if (nextJson.equals("delete")) {
ttx.mutate().row(row).col(FluoConstants.PAGE_CUR_COL).delete();
- ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).delete();
+ ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong(); // get for indexing
+ ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).delete();
+ updateDomainPageCount(ttx, row, -1);
nextPage = Page.EMPTY;
} else {
ttx.mutate().row(row).col(FluoConstants.PAGE_CUR_COL).set(nextJson);
@@ -97,8 +106,8 @@
ttx.mutate().row(row).col(FluoConstants.PAGE_NEW_COL).delete();
TxLog txLog = rtx.getTxLog();
- if (!txLog.getLogEntries().isEmpty()) {
- exportQueue.add(tx, row.toString(), txLog);
+ if (!txLog.isEmpty()) {
+ exportQueue.add(tx, row, txLog);
}
}
@@ -106,4 +115,29 @@
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoConstants.PAGE_NEW_COL, NotificationType.STRONG);
}
+
+ public static void updateDomainPageCount(TypedTransactionBase ttx, Bytes pageRow, long change) {
+ Bytes domainRow = getDomainRow(pageRow);
+ if (!domainRow.equals(Bytes.EMPTY)) {
+ Long prevCount = ttx.get().row(domainRow).col(FluoConstants.PAGECOUNT_COL).toLong(0);
+ Long curCount = prevCount + change;
+ if (curCount == 0) {
+ ttx.mutate().row(domainRow).col(FluoConstants.PAGECOUNT_COL).delete();
+ log.debug("Deleted pagecount for {}", domainRow);
+ } else {
+ ttx.mutate().row(domainRow).col(FluoConstants.PAGECOUNT_COL).set(curCount);
+ log.debug("Updated pagecount for {} from {} to {}", domainRow, prevCount, curCount);
+ }
+ }
+ }
+
+ public static Bytes getDomainRow(Bytes pageRow) {
+ String pageUri = pageRow.toString().substring(2);
+ try {
+ String pageDomain = LinkUtil.getReverseTopPrivate(DataUtil.toUrl(pageUri));
+ return Bytes.of("d:" + pageDomain);
+ } catch (ParseException e) {
+ return Bytes.EMPTY;
+ }
+ }
}
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java
index 836f5e6..1f2588b 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java
@@ -21,9 +21,9 @@
import io.fluo.api.config.FluoConfiguration;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.RowColumn;
-import io.fluo.webindex.core.DataConfig;
import io.fluo.core.util.AccumuloUtil;
import io.fluo.core.util.SpanUtil;
+import io.fluo.webindex.core.DataConfig;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java
index 025a337..6e6004a 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java
@@ -19,6 +19,7 @@
import java.util.Set;
import com.google.gson.Gson;
+import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.data.RowColumn;
import io.fluo.webindex.core.Constants;
@@ -35,7 +36,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.api.java.function.PairFunction;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import scala.Tuple2;
@@ -44,8 +45,11 @@
private static Gson gson = new Gson();
- public static JavaPairRDD<RowColumn, Long> createLinkCounts(IndexStats stats,
- JavaPairRDD<Text, ArchiveReader> archives) {
+ private static void addRCV(List<Tuple2<RowColumn, Long>> tuples, String r, Column c, Long v) {
+ tuples.add(new Tuple2<>(new RowColumn(r, c), v));
+ }
+
+ public static JavaRDD<Page> createPages(JavaPairRDD<Text, ArchiveReader> archives) {
JavaRDD<ArchiveRecord> records =
archives.flatMap(new FlatMapFunction<Tuple2<Text, ArchiveReader>, ArchiveRecord>() {
@@ -56,58 +60,49 @@
});
JavaRDD<Page> pages = records.map(r -> ArchiveUtil.buildPageIgnoreErrors(r));
+ return pages;
+ }
- JavaRDD<RowColumn> links = pages.flatMap(new FlatMapFunction<Page, RowColumn>() {
- @Override
- public Iterable<RowColumn> call(Page page) throws Exception {
- if (page.isEmpty()) {
- stats.addEmpty(1);
- return new ArrayList<>();
- }
- stats.addPage(1);
- Set<Page.Link> links = page.getOutboundLinks();
- stats.addExternalLinks(links.size());
+ public static JavaPairRDD<RowColumn, Bytes> createLinkIndex(IndexStats stats, JavaRDD<Page> pages) {
- List<RowColumn> retval = new ArrayList<>();
- String pageUri = page.getUri();
- String pageDomain = LinkUtil.getReverseTopPrivate(page.getUrl());
- if (links.size() > 0) {
- retval.add(new RowColumn("p:" + pageUri, FluoConstants.PAGE_SCORE_COL));
- retval.add(new RowColumn("p:" + pageUri, new Column(Constants.PAGE, Constants.CUR, gson
- .toJson(page))));
- retval.add(new RowColumn("d:" + pageDomain, new Column(Constants.PAGES, pageUri)));
- }
- for (Page.Link link : links) {
- String linkUri = link.getUri();
- String linkDomain = LinkUtil.getReverseTopPrivate(link.getUrl());
- retval.add(new RowColumn("p:" + linkUri, FluoConstants.PAGE_INCOUNT_COL));
- retval.add(new RowColumn("p:" + linkUri, FluoConstants.PAGE_SCORE_COL));
- retval.add(new RowColumn("p:" + linkUri, new Column(Constants.INLINKS, pageUri, link
- .getAnchorText())));
- retval.add(new RowColumn("d:" + linkDomain, new Column(Constants.PAGES, linkUri)));
- }
- return retval;
- }
- });
- links.persist(StorageLevel.DISK_ONLY());
+ JavaPairRDD<RowColumn, Long> links =
+ pages.flatMapToPair(new PairFlatMapFunction<Page, RowColumn, Long>() {
+ @Override
+ public Iterable<Tuple2<RowColumn, Long>> call(Page page) throws Exception {
+ if (page.isEmpty()) {
+ stats.addEmpty(1);
+ return new ArrayList<>();
+ }
+ stats.addPage(1);
+ Set<Page.Link> links = page.getOutboundLinks();
+ stats.addExternalLinks(links.size());
- final Long one = new Long(1);
- JavaPairRDD<RowColumn, Long> ones = links.mapToPair(s -> new Tuple2<>(s, one));
+ List<Tuple2<RowColumn, Long>> ret = new ArrayList<>();
+ String pageUri = page.getUri();
+ String pageDomain = LinkUtil.getReverseTopPrivate(page.getUrl());
+ final Long one = new Long(1);
+ if (links.size() > 0) {
+ addRCV(ret, "p:" + pageUri, FluoConstants.PAGE_INCOUNT_COL, new Long(0));
+ addRCV(ret, "p:" + pageUri,
+ new Column(Constants.PAGE, Constants.CUR, gson.toJson(page)), one);
+ addRCV(ret, "d:" + pageDomain, new Column(Constants.PAGES, pageUri), new Long(0));
+ }
+ for (Page.Link link : links) {
+ String linkUri = link.getUri();
+ String linkDomain = LinkUtil.getReverseTopPrivate(link.getUrl());
+ addRCV(ret, "p:" + linkUri, FluoConstants.PAGE_INCOUNT_COL, one);
+ addRCV(ret, "p:" + linkUri,
+ new Column(Constants.INLINKS, pageUri, link.getAnchorText()), one);
+ addRCV(ret, "d:" + linkDomain, new Column(Constants.PAGES, linkUri), one);
+ }
+ return ret;
+ }
+ });
- JavaPairRDD<RowColumn, Long> linkCounts = ones.reduceByKey((i1, i2) -> i1 + i2);
+ JavaPairRDD<RowColumn, Long> linkCounts = links.reduceByKey((i1, i2) -> i1 + i2);
JavaPairRDD<RowColumn, Long> sortedLinkCounts = linkCounts.sortByKey();
- return sortedLinkCounts;
- }
-
- public static String revEncodeLong(Long num) {
- Lexicoder<Long> lexicoder = new ReverseLexicoder<>(new ULongLexicoder());
- return Hex.encodeHexString(lexicoder.encode(num));
- }
-
- public static JavaPairRDD<RowColumn, Long> createSortedTopCounts(
- JavaPairRDD<RowColumn, Long> sortedLinkCounts) {
final Long one = new Long(1);
JavaPairRDD<RowColumn, Long> topCounts =
sortedLinkCounts
@@ -115,8 +110,8 @@
@Override
public Iterable<Tuple2<RowColumn, Long>> call(Tuple2<RowColumn, Long> t)
throws Exception {
- List<Tuple2<RowColumn, Long>> retval = new ArrayList<>();
- retval.add(t);
+ List<Tuple2<RowColumn, Long>> ret = new ArrayList<>();
+ ret.add(t);
RowColumn rc = t._1();
String row = rc.getRow().toString();
@@ -125,16 +120,19 @@
Long val = t._2();
if (row.startsWith("d:") && (cf.equals(Constants.PAGES))) {
- retval.add(new Tuple2<>(new RowColumn(row, new Column(Constants.RANK, String
- .format("%s:%s", revEncodeLong(val), cq))), val));
- retval.add(new Tuple2<>(new RowColumn(row, new Column(Constants.DOMAIN,
- Constants.PAGECOUNT)), one));
+ addRCV(ret, row,
+ new Column(Constants.RANK, String.format("%s:%s", revEncodeLong(val), cq)),
+ val);
+ addRCV(ret, row, new Column(Constants.DOMAIN, Constants.PAGECOUNT), one);
} else if ((row.startsWith("p:") && cf.equals(Constants.PAGE))
- && (cq.equals(Constants.INCOUNT) || cq.equals(Constants.SCORE))) {
- retval.add(new Tuple2<>(new RowColumn("t:" + cq, new Column(Constants.RANK,
- String.format("%s:%s", revEncodeLong(val), row.substring(2)))), val));
+ && (cq.equals(Constants.INCOUNT))) {
+ addRCV(
+ ret,
+ "t:" + cq,
+ new Column(Constants.RANK, String.format("%s:%s", revEncodeLong(val),
+ row.substring(2))), val);
}
- return retval;
+ return ret;
}
});
@@ -142,6 +140,37 @@
JavaPairRDD<RowColumn, Long> sortedTopCounts = reducedTopCounts.sortByKey();
- return sortedTopCounts;
+ JavaPairRDD<RowColumn, Long> filteredTopCounts =
+ sortedTopCounts.filter(t -> !(t._1().getRow().toString().startsWith("d:") && t._1()
+ .getColumn().getFamily().toString().equals(Constants.PAGES)));
+
+ JavaPairRDD<RowColumn, Bytes> linkIndex =
+ filteredTopCounts.mapToPair(new PairFunction<Tuple2<RowColumn, Long>, RowColumn, Bytes>() {
+ @Override
+ public Tuple2<RowColumn, Bytes> call(Tuple2<RowColumn, Long> tuple) throws Exception {
+ RowColumn rc = tuple._1();
+ String cf = rc.getColumn().getFamily().toString();
+ String cq = rc.getColumn().getQualifier().toString();
+ byte[] val = tuple._2().toString().getBytes();
+ if (cf.equals(Constants.INLINKS)
+ || (cf.equals(Constants.PAGE) && cq.startsWith(Constants.CUR))) {
+ val = rc.getColumn().getVisibility().toArray();
+ }
+ return new Tuple2<>(new RowColumn(rc.getRow(), new Column(cf, cq)), Bytes.of(val));
+ }
+ });
+
+ return linkIndex;
+ }
+
+ public static JavaPairRDD<RowColumn, Bytes> filterRank(JavaPairRDD<RowColumn, Bytes> linkIndex) {
+ JavaPairRDD<RowColumn, Bytes> filteredLinkIndex =
+ linkIndex.filter(t -> !t._1().getColumn().getFamily().toString().equals(Constants.RANK));
+ return filteredLinkIndex;
+ }
+
+ public static String revEncodeLong(Long num) {
+ Lexicoder<Long> lexicoder = new ReverseLexicoder<>(new ULongLexicoder());
+ return Hex.encodeHexString(lexicoder.encode(num));
}
}
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java b/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java
index e1541bc..148c336 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java
@@ -26,7 +26,8 @@
public static final Column PAGE_NEW_COL = new Column(Constants.PAGE, Constants.NEW);
public static final Column PAGE_CUR_COL = new Column(Constants.PAGE, Constants.CUR);
public static final Column PAGE_INCOUNT_COL = new Column(Constants.PAGE, Constants.INCOUNT);
- public static final Column PAGE_SCORE_COL = new Column(Constants.PAGE, Constants.SCORE);
+
+ public static final Column PAGECOUNT_COL = new Column(Constants.DOMAIN, Constants.PAGECOUNT);
public static final String INLINKS_UPDATE = "inlinks-update";
public static final String INLINKS_CHANGE = "inlinks-change";
diff --git a/modules/data/src/test/java/io/fluo/webindex/data/IndexIT.java b/modules/data/src/test/java/io/fluo/webindex/data/IndexIT.java
new file mode 100644
index 0000000..df8d940
--- /dev/null
+++ b/modules/data/src/test/java/io/fluo/webindex/data/IndexIT.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2015 Fluo authors (see AUTHORS)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.fluo.webindex.data;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import io.fluo.api.client.FluoAdmin;
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.client.LoaderExecutor;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.FluoConfiguration;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.data.RowColumn;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.mini.MiniFluo;
+import io.fluo.recipes.accumulo.export.TableInfo;
+import io.fluo.webindex.core.Constants;
+import io.fluo.webindex.core.models.Page;
+import io.fluo.webindex.data.fluo.PageUpdate;
+import io.fluo.webindex.data.spark.IndexStats;
+import io.fluo.webindex.data.spark.IndexUtil;
+import io.fluo.webindex.data.util.ArchiveUtil;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.archive.io.ArchiveReader;
+import org.archive.io.ArchiveRecord;
+import org.archive.io.warc.WARCReaderFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class IndexIT {
+
+ private static final Logger log = LoggerFactory.getLogger(IndexIT.class);
+
+ public static TemporaryFolder folder = new TemporaryFolder();
+ public static MiniAccumuloCluster cluster;
+ private static MiniFluo miniFluo;
+ private static FluoConfiguration config;
+ private static final PasswordToken password = new PasswordToken("secret");
+ private static AtomicInteger tableCounter = new AtomicInteger(1);
+ private String exportTable;
+ private transient JavaSparkContext sc;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ folder.create();
+ MiniAccumuloConfig cfg =
+ new MiniAccumuloConfig(folder.newFolder("miniAccumulo"), new String(password.getPassword()));
+ cluster = new MiniAccumuloCluster(cfg);
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ cluster.stop();
+ folder.delete();
+ }
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", getClass().getSimpleName());
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ }
+
+ @Before
+ public void setUpFluo() throws Exception {
+ config = new FluoConfiguration();
+ config.setMiniStartAccumulo(false);
+ config.setApplicationName("lit");
+ config.setAccumuloInstance(cluster.getInstanceName());
+ config.setAccumuloUser("root");
+ config.setAccumuloPassword("secret");
+ config.setInstanceZookeepers(cluster.getZooKeepers() + "/fluo");
+ config.setAccumuloZookeepers(cluster.getZooKeepers());
+ config.setAccumuloTable("data" + tableCounter.getAndIncrement());
+ config.setWorkerThreads(5);
+
+ // create and configure export table
+ exportTable = "export" + tableCounter.getAndIncrement();
+ cluster.getConnector("root", "secret").tableOperations().create(exportTable);
+
+ PrintProps.configureApplication(config,
+ new TableInfo(cluster.getInstanceName(), cluster.getZooKeepers(), "root", "secret",
+ exportTable), 5);
+
+ FluoFactory.newAdmin(config).initialize(
+ new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true));
+
+ miniFluo = FluoFactory.newMiniFluo(config);
+ }
+
+
+ @After
+ public void tearDownFluo() throws Exception {
+ if (miniFluo != null) {
+ miniFluo.close();
+ }
+ }
+
+ public static Map<String, Page> readPages(File input) throws Exception {
+ Map<String, Page> pageMap = new HashMap<>();
+ ArchiveReader ar = WARCReaderFactory.get(input);
+ for (ArchiveRecord r : ar) {
+ Page p = ArchiveUtil.buildPage(r);
+ if (p.isEmpty() || p.getOutboundLinks().isEmpty()) {
+ continue;
+ }
+ pageMap.put(p.getUrl(), p);
+ }
+ ar.close();
+ return pageMap;
+ }
+
+ private void assertOutput(Collection<Page> pages, FluoClient client) throws Exception {
+ JavaRDD<Page> pagesRDD = sc.parallelize(new ArrayList<>(pages));
+ Assert.assertEquals(pages.size(), pagesRDD.count());
+
+ // Create expected output using spark
+ IndexStats stats = new IndexStats(sc);
+ JavaPairRDD<RowColumn, Bytes> linkIndex = IndexUtil.createLinkIndex(stats, pagesRDD);
+ List<Tuple2<RowColumn, Bytes>> linkIndexList = linkIndex.collect();
+ JavaPairRDD<RowColumn, Bytes> linkIndexNoRank = IndexUtil.filterRank(linkIndex);
+
+ // Compare against actual
+ boolean foundDiff = false;
+ foundDiff |= diffExportTable(linkIndexList);
+ foundDiff |= diffFluoTable(client, linkIndexNoRank.collect());
+ if (foundDiff) {
+ dump(client);
+ dumpExportTable();
+ print(linkIndexList);
+ }
+ Assert.assertFalse(foundDiff);
+ }
+
+ @Test
+ public void testIndexing() throws Exception {
+
+ Map<String, Page> pages = readPages(new File("src/test/resources/wat-18.warc"));
+
+ try (FluoClient client = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+
+ try (LoaderExecutor le = client.newLoaderExecutor()) {
+ for (Page page : pages.values()) {
+ log.debug("Loading page {} with {} links", page.getUrl(), page.getOutboundLinks().size());
+ le.execute(PageUpdate.updatePage(page));
+ }
+ }
+ miniFluo.waitForObservers();
+
+ assertOutput(pages.values(), client);
+
+ String deleteUrl = "http://1000games.me/games/gametion/";
+ log.info("Deleting page {}", deleteUrl);
+ try (LoaderExecutor le = client.newLoaderExecutor()) {
+ le.execute(PageUpdate.deletePage(deleteUrl));
+ }
+ miniFluo.waitForObservers();
+
+ int numPages = pages.size();
+ Assert.assertNotNull(pages.remove(deleteUrl));
+ Assert.assertEquals(numPages-1, pages.size());
+ assertOutput(pages.values(), client);
+
+ String updateUrl = "http://100zone.blogspot.com/2013/03/please-memp3-4shared.html";
+ Page updatePage = pages.get(updateUrl);
+ long numLinks = updatePage.getNumOutbound();
+ Assert.assertTrue(updatePage.addOutboundLink("http://example.com", "Example"));
+ Assert.assertEquals(numLinks+1, (long)updatePage.getNumOutbound());
+ Assert.assertTrue(updatePage.removeOutboundLink("http://www.blogger.com"));
+ Assert.assertEquals(numLinks, (long)updatePage.getNumOutbound());
+
+ try (LoaderExecutor le = client.newLoaderExecutor()) {
+ le.execute(PageUpdate.updatePage(updatePage));
+ }
+ miniFluo.waitForObservers();
+
+ Assert.assertNotNull(pages.put(updateUrl, updatePage));
+ assertOutput(pages.values(), client);
+ }
+ }
+
+ private void print(List<Tuple2<RowColumn, Bytes>> linkIndex) {
+ System.out.println("== link index start ==");
+ linkIndex.forEach(t -> System.out.println("rc " + t._1().toString() + " val "
+ + t._2().toString()));
+ System.out.println("== link index end ==");
+ }
+
+ private void dump(FluoClient client) throws Exception {
+ try (Snapshot s = client.newSnapshot()) {
+ RowIterator iter = s.get(new ScannerConfiguration());
+
+ System.out.println("== fluo snapshot start ==");
+ while (iter.hasNext()) {
+ Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
+ ColumnIterator citer = rowEntry.getValue();
+ while (citer.hasNext()) {
+ Map.Entry<Column, Bytes> colEntry = citer.next();
+ System.out.println(rowEntry.getKey() + " " + colEntry.getKey() + "\t"
+ + colEntry.getValue());
+ }
+ }
+ System.out.println("=== fluo snapshot end ===");
+ }
+ }
+
+ private boolean diffFluoTable(FluoClient client, List<Tuple2<RowColumn, Bytes>> linkIndex)
+ throws Exception {
+ boolean retval = false;
+ try (Snapshot s = client.newSnapshot()) {
+ RowIterator iter = s.get(new ScannerConfiguration());
+ Iterator<Tuple2<RowColumn, Bytes>> indexIter = linkIndex.iterator();
+
+ while (iter.hasNext()) {
+ Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
+ ColumnIterator citer = rowEntry.getValue();
+ while (citer.hasNext() && indexIter.hasNext()) {
+ Map.Entry<Column, Bytes> colEntry = citer.next();
+ Tuple2<RowColumn, Bytes> indexEntry = indexIter.next();
+ RowColumn rc = indexEntry._1();
+ Column col = colEntry.getKey();
+
+ retval |= diff("row", rc.getRow().toString(), rowEntry.getKey().toString());
+ retval |= diff("fam", rc.getColumn().getFamily().toString(), col.getFamily().toString());
+ retval |=
+ diff("qual", rc.getColumn().getQualifier().toString(), col.getQualifier().toString());
+ if (!col.getQualifier().toString().equals(Constants.CUR)) {
+ retval |= diff("val", indexEntry._2().toString(), colEntry.getValue().toString());
+ }
+
+ if (retval) {
+ return true;
+ }
+ log.debug("Verified row {} cf {} cq {} val {}", rc.getRow().toString(), rc.getColumn()
+ .getFamily().toString(), rc.getColumn().getQualifier().toString(), indexEntry._2()
+ .toString());
+ }
+ if (citer.hasNext()) {
+ log.error("A column iterator still has more data");
+ return true;
+ }
+ }
+ if (iter.hasNext() || indexIter.hasNext()) {
+ log.error("An iterator still has more data");
+ return true;
+ }
+ log.debug("No difference found");
+ return false;
+ }
+ }
+
+ private void dumpExportTable() throws Exception {
+ Connector conn = cluster.getConnector("root", "secret");
+ Scanner scanner = conn.createScanner(exportTable, Authorizations.EMPTY);
+ Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator();
+
+ System.out.println("== accumulo export table start ==");
+ while (iterator.hasNext()) {
+ Map.Entry<Key, Value> entry = iterator.next();
+ System.out.println(entry.getKey() + " " + entry.getValue());
+ }
+ System.out.println("== accumulo export table end ==");
+ }
+
+ private boolean diff(String dataType, String expected, String actual) {
+ if (!expected.equals(actual)) {
+ log.error("Difference found in {} - expected {} actual {}", dataType, expected, actual);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean diffExportTable(List<Tuple2<RowColumn, Bytes>> linkIndex) throws Exception {
+ Connector conn = cluster.getConnector("root", "secret");
+ Scanner scanner = conn.createScanner(exportTable, Authorizations.EMPTY);
+ Iterator<Map.Entry<Key, Value>> exportIter = scanner.iterator();
+ Iterator<Tuple2<RowColumn, Bytes>> indexIter = linkIndex.iterator();
+
+ boolean retval = false;
+
+ while (exportIter.hasNext() && indexIter.hasNext()) {
+ Tuple2<RowColumn, Bytes> indexEntry = indexIter.next();
+ Map.Entry<Key, Value> exportEntry = exportIter.next();
+ Key key = exportEntry.getKey();
+ RowColumn rc = indexEntry._1();
+ Column col = rc.getColumn();
+
+ retval |= diff("row", rc.getRow().toString(), key.getRow().toString());
+ retval |= diff("fam", col.getFamily().toString(), key.getColumnFamily().toString());
+ retval |= diff("qual", col.getQualifier().toString(), key.getColumnQualifier().toString());
+ if (!col.getQualifier().toString().equals(Constants.CUR)) {
+ retval |= diff("val", indexEntry._2().toString(), exportEntry.getValue().toString());
+ }
+
+ if (retval) {
+ return true;
+ }
+ log.debug("Verified row {} cf {} cq {} val {}", rc.getRow().toString(), rc.getColumn()
+ .getFamily().toString(), rc.getColumn().getQualifier().toString(), indexEntry._2()
+ .toString());
+ }
+
+ if (exportIter.hasNext() || indexIter.hasNext()) {
+ log.error("An iterator still has more data");
+ return true;
+ }
+
+ log.debug("No difference found");
+ return false;
+ }
+}
diff --git a/modules/data/src/test/java/io/fluo/webindex/data/LoadIT.java b/modules/data/src/test/java/io/fluo/webindex/data/LoadIT.java
deleted file mode 100644
index e262990..0000000
--- a/modules/data/src/test/java/io/fluo/webindex/data/LoadIT.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Copyright 2015 Fluo authors (see AUTHORS)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package io.fluo.webindex.data;
-
-import java.io.File;
-import java.text.ParseException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.fluo.api.client.FluoAdmin;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.client.LoaderExecutor;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Column;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-import io.fluo.api.mini.MiniFluo;
-import io.fluo.recipes.accumulo.export.TableInfo;
-import io.fluo.webindex.core.models.Page;
-import io.fluo.webindex.data.fluo.PageUpdate;
-import io.fluo.webindex.data.util.ArchiveUtil;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.accumulo.minicluster.MiniAccumuloConfig;
-import org.archive.io.ArchiveReader;
-import org.archive.io.ArchiveRecord;
-import org.archive.io.warc.WARCReaderFactory;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LoadIT {
-
- private static final Logger log = LoggerFactory.getLogger(LoadIT.class);
-
- public static TemporaryFolder folder = new TemporaryFolder();
- public static MiniAccumuloCluster cluster;
- private static MiniFluo miniFluo;
- private static FluoConfiguration config;
- private static final PasswordToken password = new PasswordToken("secret");
- private static AtomicInteger tableCounter = new AtomicInteger(1);
- private String exportTable;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- folder.create();
- MiniAccumuloConfig cfg =
- new MiniAccumuloConfig(folder.newFolder("miniAccumulo"), new String(password.getPassword()));
- cluster = new MiniAccumuloCluster(cfg);
- cluster.start();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- cluster.stop();
- folder.delete();
- }
-
- @Before
- public void setUpFluo() throws Exception {
- config = new FluoConfiguration();
- config.setMiniStartAccumulo(false);
- config.setApplicationName("lit");
- config.setAccumuloInstance(cluster.getInstanceName());
- config.setAccumuloUser("root");
- config.setAccumuloPassword("secret");
- config.setInstanceZookeepers(cluster.getZooKeepers() + "/fluo");
- config.setAccumuloZookeepers(cluster.getZooKeepers());
- config.setAccumuloTable("data" + tableCounter.getAndIncrement());
- config.setWorkerThreads(5);
-
- // create and configure export table
- exportTable = "export" + tableCounter.getAndIncrement();
- cluster.getConnector("root", "secret").tableOperations().create(exportTable);
-
- PrintProps.configureApplication(config,
- new TableInfo(cluster.getInstanceName(), cluster.getZooKeepers(), "root", "secret",
- exportTable), 5);
-
- FluoFactory.newAdmin(config).initialize(
- new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true));
-
- miniFluo = FluoFactory.newMiniFluo(config);
- }
-
-
- @After
- public void tearDownFluo() throws Exception {
- if (miniFluo != null) {
- miniFluo.close();
- }
- }
-
- @Test
- public void testLoad() throws Exception {
-
- try (FluoClient client = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-
- ArchiveReader ar = WARCReaderFactory.get(new File("src/test/resources/wat-18.warc"));
-
- try (LoaderExecutor le = client.newLoaderExecutor()) {
- Iterator<ArchiveRecord> records = ar.iterator();
- while (records.hasNext()) {
- try {
- ArchiveRecord r = records.next();
- Page p = ArchiveUtil.buildPage(r);
- if (p.isEmpty() || p.getOutboundLinks().isEmpty()) {
- continue;
- }
- log.info("Loading page {} with {} links", p.getUrl(), p.getOutboundLinks().size());
- le.execute(PageUpdate.updatePage(p));
- } catch (ParseException e) {
- log.debug("Parse exception occurred", e);
- }
- }
- }
- ar.close();
- miniFluo.waitForObservers();
- dump(client);
- dumpExportTable();
-
- String url = "http://1000games.me/games/gametion/";
- log.info("Deleting page {}", url);
- try (LoaderExecutor le = client.newLoaderExecutor()) {
- le.execute(PageUpdate.deletePage(url));
- }
-
- miniFluo.waitForObservers();
- dump(client);
- dumpExportTable();
- }
- }
-
- private void dump(FluoClient client) throws Exception {
- try (Snapshot s = client.newSnapshot()) {
- RowIterator iter = s.get(new ScannerConfiguration());
-
- System.out.println("== snapshot start ==");
- while (iter.hasNext()) {
- Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
- ColumnIterator citer = rowEntry.getValue();
- while (citer.hasNext()) {
- Map.Entry<Column, Bytes> colEntry = citer.next();
- System.out.println(rowEntry.getKey() + " " + colEntry.getKey() + "\t"
- + colEntry.getValue());
- }
- }
- System.out.println("=== snapshot end ===");
- }
- }
-
- private void dumpExportTable() throws Exception {
- Connector conn = cluster.getConnector("root", "secret");
- Scanner scanner = conn.createScanner(exportTable, Authorizations.EMPTY);
- Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator();
-
- System.out.println("== export table start ==");
- while (iterator.hasNext()) {
- Map.Entry<Key, Value> entry = iterator.next();
- System.out.println(entry.getKey() + " " + entry.getValue());
- }
- System.out.println("== export table end ==");
-
- }
-}
diff --git a/modules/data/src/test/resources/log4j.properties b/modules/data/src/test/resources/log4j.properties
index d0ad78b..4577b43 100644
--- a/modules/data/src/test/resources/log4j.properties
+++ b/modules/data/src/test/resources/log4j.properties
@@ -15,18 +15,22 @@
log4j.rootLogger=INFO, CA
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n
+log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c] %-5p: %m%n
-log4j.logger.org.apache.curator=ERROR
+log4j.logger.akka=WARN
log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR
log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off
log4j.logger.org.apache.accumulo.core.util.shell.Shell=FATAL
log4j.logger.org.apache.commons.vfs2.impl.DefaultFileSystemManager=WARN
+log4j.logger.org.apache.curator=ERROR
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.hadoop.conf=ERROR
log4j.logger.org.apache.hadoop.mapred=ERROR
log4j.logger.org.apache.hadoop.mapreduce=ERROR
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
+log4j.logger.org.apache.spark=WARN
log4j.logger.org.apache.zookeeper.ClientCnxn=FATAL
log4j.logger.org.apache.zookeeper.ZooKeeper=WARN
-log4j.logger.io.fluo=INFO
+log4j.logger.org.spark-project=WARN
+log4j.logger.io.fluo=WARN
+log4j.logger.Remoting=WARN
diff --git a/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java b/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java
index 23f7506..d8a7268 100644
--- a/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java
+++ b/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java
@@ -133,7 +133,6 @@
private Page getPage(String url) {
Page page = null;
Long incount = (long) 0;
- Long score = (long) 0;
try {
Scanner scanner = conn.createScanner(dataConfig.accumuloIndexTable, Authorizations.EMPTY);
scanner.setRange(Range.exact("p:" + DataUtil.toUri(url), Constants.PAGE));
@@ -144,9 +143,6 @@
case Constants.INCOUNT:
incount = getLongValue(entry);
break;
- case Constants.SCORE:
- score = getLongValue(entry);
- break;
case Constants.CUR:
page = gson.fromJson(entry.getValue().toString(), Page.class);
break;
@@ -163,7 +159,6 @@
page = new Page(url);
}
page.setNumInbound(incount);
- page.setScore(score);
page.setDomain(WebUtil.getDomain(page.getUrl()));
return page;
}
@@ -265,7 +260,7 @@
@DefaultValue("0") @QueryParam("pageNum") Integer pageNum) {
TopResults results = new TopResults();
- if (resultType.equals(Constants.INCOUNT) || resultType.equals(Constants.SCORE)) {
+ if (resultType.equals(Constants.INCOUNT)) {
results.setResultType(resultType);
results.setPageNum(pageNum);
try {
diff --git a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl
index 26485f9..4a91471 100644
--- a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl
+++ b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl
@@ -15,7 +15,7 @@
</div>
<div class="row">
<div class="col-md-6 col-md-offset-3" style="margin-top: 20px">
- <p><b>View top pages by:</b> <a href="/top?resultType=score">Score</a> <a href="/top?resultType=incount">Inbound Links</a></p>
+ <p><b>View top pages by:</b> <a href="/top?resultType=incount">Inbound Links</a></p>
</div>
</div>
<#include "common/footer.ftl">
diff --git a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl
index ec377ef..5db7610 100644
--- a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl
+++ b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl
@@ -8,7 +8,6 @@
</#if>
<tr><td>URL<td>${page.url?html} - <a href="${page.url?html}">Go to page</a></tr>
<tr><td>Domain<td><a href="pages?domain=${page.domain?url}">${page.domain?html}</a></tr>
- <tr><td>Page Score<td>${page.score}</tr>
<tr><td>Inbound links<td><a href="/links?url=${page.url?url}&linkType=in">${page.numInbound}</a></tr>
<#if page.crawlDate??>
<tr><td>Outbound links<td><a href="/links?url=${page.url?url}&linkType=out">${page.numOutbound}</a></tr>
diff --git a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl
index 498d03a..dcc88a8 100644
--- a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl
+++ b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl
@@ -28,7 +28,7 @@
<div class="row">
<div class="col-md-12">
<table class="table table-striped">
- <thead><th>Score</th><th>URL</th></thead>
+ <thead><th>Inbound Links</th><th>URL</th></thead>
<#list pages.pages as page>
<tr>
<td>${page.score?html}</td>
diff --git a/pom.xml b/pom.xml
index 97874b9..a2a1344 100644
--- a/pom.xml
+++ b/pom.xml
@@ -341,6 +341,25 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <io.fluo.it.instance.name>it-instance-maven</io.fluo.it.instance.name>
+ <io.fluo.it.instance.clear>false</io.fluo.it.instance.clear>
+ </systemPropertyVariables>
+ </configuration>
+ <executions>
+ <execution>
+ <id>run-integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<profiles>