Merge pull request #8 from mikewalch/indexing

Closes #4 - Accumulo indexes are now updated using Fluo IndexExporter.
diff --git a/.gitignore b/.gitignore
index b422b3a..abdc4a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,5 +4,5 @@
 .classpath
 .project
 .settings
-target
-data
+target/
+/data/
diff --git a/docs/observer-design.md b/docs/observer-design.md
index 80a47ca..8aa224b 100644
--- a/docs/observer-design.md
+++ b/docs/observer-design.md
@@ -18,22 +18,19 @@
     d:com.c           domain    pagecount       2
     d:com.d           domain    pagecount       1
     p:com.a/page1     page      cur             {"outlinkcount": 2, "outlinks":[c.com, b.com]}
-                      page      score           1
+                                incount         0
     p:com.b           inlinks   com.a/pag1      anchorText
                       page      cur             {"outlinkcount": 2, "outlinks":[c.com/page1, c.com]}
-                      page      incount         1
-                      page      score           2
+                                incount         1
     p:com.c           inlinks   com.a/page1     anchorText
                                 com.b           anchorText
                                 com.d           anchorText
                       page      incount         3
-                                score           3
     p:com.c/page1     inlinks   com.b           anchorText
                       page      incount         1
-                                score           1
     p:com.d           page      cur             {"outlinkcount": 1, "outlinks":[c.com]}
-                      page      score           1
-
+                                incount         0
+    
 
 Resulting Accumulo Table
 
@@ -49,94 +46,20 @@
     d:com.d           domain    pagecount       1
                       rank      1:com.d         1
     p:com.a/page1     page      cur             {"outlinkcount": 2, "outlinks":[c.com, b.com]}
-                      page      score           1
+                                incount         0
     p:com.b           inlinks   com.a/pag1      anchorText
                       page      cur             {"outlinkcount": 2, "outlinks":[c.com/page1, c.com]}
-                      page      incount         1
-                      page      score           2
+                                incount         1
     p:com.c           inlinks   com.a/page1     anchorText
                                 com.b           anchorText
                                 com.d           anchorText
                       page      incount         3
-                                score           3
     p:com.c/page1     inlinks   com.b           anchorText
                       page      incount         1
-                                score           1
     p:com.d           page      cur             {"outlinkcount": 1, "outlinks":[c.com]}
-                      page      score           1
-    ti:3:com.c        incount   rank            3
-    ti:2:com.b                                  2
-    ti:1:com.c/page1                            1
-    tp:2:com.c        pagecount rank            2
-    tp:1:com.a                                  1
-    tp:1.com.b                                  1
-    ts:3:com.c        score     rank            3
-    ts:2:com.b                                  2
-    ts:1:com.c/page1                            1
-    ts:1:com.a/page1                            1
-    ts:1:com.d                                  1
-
-Below are available operations:
-
-    get(row, col) -> value
-    set(row, col, value)
-    del(row, col)
-
-PageLoader is called with `pageUri` & `pageJson`
-
-    curJson = get(pageUri, page:cur)
-    if curJson != pageJson
-      set(pageUri, page:new, pageJson)
-
-PageObserver watches `page:new` is called with `pageUri`
-
-    curJson = get(pageUri, page:cur)
-    newJson = get(pageUri, page:new)
-
-    newLinks,delLinks = compare(curJson, newJson)
-
-    for link in newLinks:
-      set(linkUri, inlinks-update:pageUri, [add,anchorText])
-
-    for link in delLinks
-      set(linkUri, inlinks-update:pageUri, [del])
-      
-    if newLinks.isNotEmpty() or delLinks.isNotEmpty():
-      set(linkUri, inlinks-changed:ntfy, "")
-
-    if curJson == null:
-      pageScore = get(pageUri, stats:pagescore).toInteger(0)
-
-      if pageScore != 0:
-        del(pageDomain, rank:pageScore:pageUri)
-      
-      set(pageUri, stats:pagescore, pageScore+1)
-      set(pageDomain, rank:pageScore+1:pageUri)
-
-    set(pageUri, page:cur, newJson)
-    del(pageUri, page:new)
-
-InlinksObserver weakly watches `update:inlinks` is called with `pageUri`
-
-    List<Update> updates = get(pageUri, update:inlinks)
-
-    change = 0
-
-    for update in updates:
-      if update.action == del:
-        change--
-        del(pageUri, inlinks:update.linkUri)
-      elif update.action == add:
-        change++
-        set(pageUri, inlinks:update.linkUri, update.anchorText)
-
-    if change != 0:
-      curCount = get(pageUri, stats:inlinkcount)
-      curScore = get(pageUri, stats:pagescore).toInteger(0)
-
-      if pageScore != 0:
-        del(pageDomain, rank:curScore:pageUri)
-
-      set(pageDomain, rank:curScore+change:pageUri)
-      set(pageUri, stats:inlinkcount, curCount+change)
-      set(pageUri, stats:pagescore, curScore+change)
+                                incount         0
+    t:incount         rank      3:com.c         3
+                                2:com.b         2
+                                1:com.c/page1   1
+                                0:com.a/page1   0
+                                0:com.d         0
diff --git a/modules/core/src/main/java/io/fluo/webindex/core/Constants.java b/modules/core/src/main/java/io/fluo/webindex/core/Constants.java
index e351deb..056a501 100644
--- a/modules/core/src/main/java/io/fluo/webindex/core/Constants.java
+++ b/modules/core/src/main/java/io/fluo/webindex/core/Constants.java
@@ -28,7 +28,6 @@
   // Column Qualifiers
   // for page
   public static final String INCOUNT = "incount";
-  public static final String SCORE = "score";
   public static final String NEW = "new";
   public static final String CUR = "cur";
   // for domains
diff --git a/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java b/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java
index a6b5b77..4adf55c 100644
--- a/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java
+++ b/modules/core/src/main/java/io/fluo/webindex/core/models/Page.java
@@ -14,20 +14,20 @@
 
 package io.fluo.webindex.core.models;
 
+import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.util.HashSet;
 import java.util.Set;
 
 import io.fluo.webindex.core.DataUtil;
 
-public class Page {
+public class Page implements Serializable {
 
   public static Page EMPTY = new Page();
   private String url = "";
   private String domain;
   private Long numInbound;
   private Long numOutbound = new Long(0);
-  private Long score;
   private String crawlDate;
   private String server;
   private String title;
@@ -59,10 +59,26 @@
     return outboundLinks;
   }
 
-  public void addOutboundLink(String url, String anchorText) {
-    if (outboundLinks.add(new Link(url, anchorText))) {
+  /**
+   * @return True if page did not already contain link
+   */
+  public boolean addOutboundLink(String url, String anchorText) {
+    boolean added = outboundLinks.add(new Link(url, anchorText));
+    if (added) {
       numOutbound++;
     }
+    return added;
+  }
+
+  /**
+   * @return True if link was removed
+   */
+  public boolean removeOutboundLink(String url) {
+    boolean removed = outboundLinks.remove(new Link(url));
+    if (removed) {
+      numOutbound--;
+    }
+    return removed;
   }
 
   public boolean isEmpty() {
@@ -89,14 +105,6 @@
     return numOutbound;
   }
 
-  public Long getScore() {
-    return score;
-  }
-
-  public void setScore(Long score) {
-    this.score = score;
-  }
-
   public String getCrawlDate() {
     return crawlDate;
   }
@@ -113,7 +121,7 @@
     this.title = title;
   }
 
-  public class Link {
+  public class Link implements Serializable {
 
     private String url;
     private String anchorText;
@@ -123,6 +131,10 @@
       this.anchorText = anchorText;
     }
 
+    public Link(String url) {
+      this(url, "");
+    }
+
     public String getUrl() {
       return url;
     }
diff --git a/modules/data/pom.xml b/modules/data/pom.xml
index 1390510..a97764a 100644
--- a/modules/data/pom.xml
+++ b/modules/data/pom.xml
@@ -75,6 +75,12 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -110,6 +116,12 @@
       <groupId>io.fluo</groupId>
       <artifactId>fluo-mini</artifactId>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- Test Dependencies -->
     <dependency>
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/Init.java b/modules/data/src/main/java/io/fluo/webindex/data/Init.java
index 4b0107e..54951f1 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/Init.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/Init.java
@@ -17,16 +17,17 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
 import io.fluo.api.data.RowColumn;
-import io.fluo.webindex.core.Constants;
+import io.fluo.mapreduce.FluoKeyValue;
+import io.fluo.mapreduce.FluoKeyValueGenerator;
 import io.fluo.webindex.core.DataConfig;
+import io.fluo.webindex.core.models.Page;
 import io.fluo.webindex.data.spark.IndexEnv;
 import io.fluo.webindex.data.spark.IndexStats;
 import io.fluo.webindex.data.spark.IndexUtil;
 import io.fluo.webindex.data.util.WARCFileInputFormat;
-import io.fluo.mapreduce.FluoKeyValue;
-import io.fluo.mapreduce.FluoKeyValueGenerator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.archive.io.ArchiveReader;
@@ -48,28 +50,16 @@
   private static final Logger log = LoggerFactory.getLogger(Init.class);
   private static IndexEnv env;
 
-  public static void loadAccumulo(JavaPairRDD<RowColumn, Long> sortedCounts) throws Exception {
-
-    JavaPairRDD<RowColumn, Long> filteredSortedCounts =
-        sortedCounts.filter(t -> !(t._1().getRow().toString().startsWith("d:") && t._1()
-            .getColumn().getFamily().toString().equals(Constants.PAGES)));
-
+  public static void loadAccumulo(JavaPairRDD<RowColumn, Bytes> linkIndex) throws Exception {
     JavaPairRDD<Key, Value> accumuloData =
-        filteredSortedCounts.mapToPair(new PairFunction<Tuple2<RowColumn, Long>, Key, Value>() {
+        linkIndex.mapToPair(new PairFunction<Tuple2<RowColumn, Bytes>, Key, Value>() {
           @Override
-          public Tuple2<Key, Value> call(Tuple2<RowColumn, Long> tuple) throws Exception {
+          public Tuple2<Key, Value> call(Tuple2<RowColumn, Bytes> tuple) throws Exception {
             RowColumn rc = tuple._1();
             String row = rc.getRow().toString();
             String cf = rc.getColumn().getFamily().toString();
             String cq = rc.getColumn().getQualifier().toString();
-            byte[] val = tuple._2().toString().getBytes();
-            if (cf.equals(Constants.INLINKS)
-                || (cf.equals(Constants.PAGE) && cq.startsWith(Constants.CUR))) {
-              if (tuple._2() > 1) {
-                log.info("Found key {} with count of {}", tuple._1(), tuple._2().toString());
-              }
-              val = rc.getColumn().getVisibility().toArray();
-            }
+            byte[] val = tuple._2().toArray();
             return new Tuple2<>(new Key(new Text(row), new Text(cf), new Text(cq)), new Value(val));
           }
         });
@@ -89,26 +79,17 @@
         TextOutputFormat.class);
   }
 
-  public static void loadFluo(JavaPairRDD<RowColumn, Long> sortedCounts) throws Exception {
+  public static void loadFluo(JavaPairRDD<RowColumn, Bytes> linkIndex) throws Exception {
     JavaPairRDD<Key, Value> fluoData =
-        sortedCounts.flatMapToPair(new PairFlatMapFunction<Tuple2<RowColumn, Long>, Key, Value>() {
+        linkIndex.flatMapToPair(new PairFlatMapFunction<Tuple2<RowColumn, Bytes>, Key, Value>() {
           @Override
-          public Iterable<Tuple2<Key, Value>> call(Tuple2<RowColumn, Long> tuple) throws Exception {
+          public Iterable<Tuple2<Key, Value>> call(Tuple2<RowColumn, Bytes> tuple) throws Exception {
             List<Tuple2<Key, Value>> output = new LinkedList<>();
             RowColumn rc = tuple._1();
             String row = rc.getRow().toString();
             String cf = rc.getColumn().getFamily().toString();
             String cq = rc.getColumn().getQualifier().toString();
-            byte[] val = tuple._2().toString().getBytes();
-
-            if (cf.equals(Constants.RANK) || cq.equals(Constants.RANK)
-                || (row.startsWith("d:") && cf.equals(Constants.PAGES))) {
-              return output;
-            }
-            if (cf.equals(Constants.INLINKS)
-                || (cf.equals(Constants.PAGE) && cq.startsWith(Constants.CUR))) {
-              val = rc.getColumn().getVisibility().toArray();
-            }
+            byte[] val = tuple._2().toArray();
             FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
             fkvg.setRow(row).setColumn(new Column(cf, cq)).setValue(val);
             for (FluoKeyValue kv : fkvg.getKeyValues()) {
@@ -144,16 +125,16 @@
         env.getSparkCtx().newAPIHadoopFile(dataConfig.watDataDir, WARCFileInputFormat.class,
             Text.class, ArchiveReader.class, new Configuration());
 
-    JavaPairRDD<RowColumn, Long> sortedLinkCounts = IndexUtil.createLinkCounts(stats, archives);
+    JavaRDD<Page> pages = IndexUtil.createPages(archives);
 
-    JavaPairRDD<RowColumn, Long> sortedTopCounts =
-        IndexUtil.createSortedTopCounts(sortedLinkCounts);
+    JavaPairRDD<RowColumn, Bytes> linkIndex = IndexUtil.createLinkIndex(stats, pages);
 
     // Load intermediate results into Fluo
-    loadFluo(sortedTopCounts);
+    JavaPairRDD<RowColumn, Bytes> linkIndexNoRank = IndexUtil.filterRank(linkIndex);
+    loadFluo(linkIndexNoRank);
 
     // Load final indexes into Accumulo
-    loadAccumulo(sortedTopCounts);
+    loadAccumulo(linkIndex);
 
     // For testing, Load into HDFS
     // loadHDFS(sortedTopCounts);
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java b/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java
index 221899f..576156c 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/PrintProps.java
@@ -19,7 +19,7 @@
 
 import io.fluo.api.config.FluoConfiguration;
 import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.recipes.accumulo.export.AccumuloExporter;
+import io.fluo.api.data.Bytes;
 import io.fluo.recipes.accumulo.export.TableInfo;
 import io.fluo.recipes.export.ExportQueue;
 import io.fluo.recipes.transaction.TxLog;
@@ -41,9 +41,9 @@
     appConfig.addObserver(new ObserverConfiguration(InlinksObserver.class.getName()));
 
     ExportQueue.configure(appConfig, new ExportQueue.Options(IndexExporter.QUEUE_ID,
-        IndexExporter.class, String.class, TxLog.class, numExportBuckets));
+        IndexExporter.class, Bytes.class, TxLog.class, numExportBuckets));
 
-    AccumuloExporter.setExportTableInfo(appConfig.getAppConfiguration(), IndexExporter.QUEUE_ID,
+    IndexExporter.setExportTableInfo(appConfig.getAppConfiguration(), IndexExporter.QUEUE_ID,
         exportTable);
   }
 
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java b/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java
index d8fc838..4b2ccae 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/Reindex.java
@@ -22,13 +22,13 @@
 import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
 import io.fluo.api.data.RowColumn;
+import io.fluo.mapreduce.FluoEntryInputFormat;
 import io.fluo.webindex.core.Constants;
 import io.fluo.webindex.core.DataConfig;
 import io.fluo.webindex.core.DataUtil;
 import io.fluo.webindex.data.spark.IndexEnv;
 import io.fluo.webindex.data.spark.IndexUtil;
 import io.fluo.webindex.data.util.LinkUtil;
-import io.fluo.mapreduce.FluoEntryInputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -105,16 +105,14 @@
                 String cq = rc.getColumn().getQualifier().toString();
                 Bytes v = kvTuple._2();
                 if (row.startsWith("p:") && cf.equals(Constants.PAGE)
-                    && (cq.equals(Constants.INCOUNT) || cq.equals(Constants.SCORE))) {
+                    && cq.equals(Constants.INCOUNT)) {
                   String pageUri = row.substring(2);
                   Long num = Long.parseLong(v.toString());
                   Column rankCol =
                       new Column(Constants.RANK, String.format("%s:%s",
                           IndexUtil.revEncodeLong(num), pageUri));
-                  if (cq.equals(Constants.SCORE)) {
-                    String domain = "d:" + LinkUtil.getReverseTopPrivate(DataUtil.toUrl(pageUri));
-                    retval.add(new Tuple2<>(new RowColumn(domain, rankCol), v));
-                  }
+                  String domain = "d:" + LinkUtil.getReverseTopPrivate(DataUtil.toUrl(pageUri));
+                  retval.add(new Tuple2<>(new RowColumn(domain, rankCol), v));
                   retval.add(new Tuple2<>(new RowColumn("t:" + cq, rankCol), v));
                 }
                 return retval;
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java b/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java
index e1162ba..3be8a79 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/fluo/IndexExporter.java
@@ -14,49 +14,113 @@
 
 package io.fluo.webindex.data.fluo;
 
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
 
 import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.data.RowColumn;
 import io.fluo.recipes.accumulo.export.AccumuloExporter;
+import io.fluo.recipes.transaction.LogEntry;
 import io.fluo.recipes.transaction.TxLog;
 import io.fluo.webindex.core.Constants;
+import io.fluo.webindex.data.spark.IndexUtil;
+import io.fluo.webindex.data.util.FluoConstants;
 import org.apache.accumulo.core.data.Mutation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class IndexExporter extends AccumuloExporter<String, TxLog> {
+public class IndexExporter extends AccumuloExporter<Bytes, TxLog> {
 
   private static final Logger log = LoggerFactory.getLogger(IndexExporter.class);
-  public static final String QUEUE_ID = "peq";
+  public static final String QUEUE_ID = "webIndexQ";
+
+  public static void deleteRankIndex(Map<Bytes, Mutation> mutations, Bytes row, String pageUri,
+      long seq, Long prev) {
+    if (prev != null) {
+      Mutation m = mutations.computeIfAbsent(row, k -> new Mutation(k.toArray()));
+      String cf = String.format("%s:%s", IndexUtil.revEncodeLong(prev), pageUri);
+      m.putDelete(Constants.RANK.getBytes(), cf.getBytes(), seq);
+      log.debug("Deleted rank index for row {} cf {} seq {}", row.toString(), cf, seq);
+    }
+  }
+
+  public static void updateRankIndex(Map<Bytes, Mutation> mutations, Bytes row, String pageUri,
+      long seq, Long prev, Long cur) {
+    if (!cur.equals(prev)) {
+      deleteRankIndex(mutations, row, pageUri, seq, prev);
+      Mutation m = mutations.computeIfAbsent(row, k -> new Mutation(k.toArray()));
+      String cf = String.format("%s:%s", IndexUtil.revEncodeLong(cur), pageUri);
+      m.put(Constants.RANK.getBytes(), cf.getBytes(), seq, cur.toString().getBytes());
+      log.debug("Adding rank index for row {} cf {} seq {} val {}", row.toString(), cf, seq,
+          cur.toString());
+    }
+  }
 
   @Override
-  protected List<Mutation> convert(String key, long seq, TxLog txLog) {
-    Mutation m = new Mutation(key);
-    boolean modified = false;
-    for (TxLog.LogEntry entry : txLog.getLogEntries()) {
+  protected Collection<Mutation> convert(Bytes key, long seq, TxLog txLog) {
+    Map<Bytes, Mutation> mutations = new HashMap<>();
 
+    Map<RowColumn, Bytes> getMap = txLog.getOperationMap(LogEntry.Operation.GET);
+    for (LogEntry entry : txLog.getLogEntries()) {
+      LogEntry.Operation op = entry.getOp();
+      Bytes row = entry.getRow();
+      Column col = entry.getColumn();
       Bytes fam = entry.getColumn().getFamily();
       Bytes qual = entry.getColumn().getQualifier();
       Bytes val = entry.getValue();
 
-      if (fam.toString().equals(Constants.PAGE) || fam.toString().equals(Constants.INLINKS)) {
-        log.info("{} {} row {} col {} val {}", seq, entry.getType(), entry.getRow(),
-            entry.getColumn(), entry.getValue());
-        switch (entry.getType()) {
-          case DELETE:
-            m.putDelete(fam.toArray(), qual.toArray());
-          case SET:
-            m.put(fam.toArray(), qual.toArray(), val.toArray());
-          default:
-            break;
+      log.debug("{} {} row {} col {} val {}", seq, entry.getOp(), entry.getRow(),
+          entry.getColumn(), entry.getValue());
+
+      if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) {
+        Mutation m = mutations.computeIfAbsent(row, k -> new Mutation(k.toArray()));
+        if (entry.getOp().equals(LogEntry.Operation.DELETE)) {
+          m.putDelete(fam.toArray(), qual.toArray(), seq);
+        } else {
+          m.put(fam.toArray(), qual.toArray(), seq, val.toArray());
         }
-        modified = true;
+      }
+
+      if (col.equals(FluoConstants.PAGE_INCOUNT_COL)
+          && (op.equals(LogEntry.Operation.SET) || op.equals(LogEntry.Operation.DELETE))) {
+
+        String pageUri = row.toString().substring(2);
+        Long prev = null;
+        Bytes prevGet = getMap.get(new RowColumn(row, FluoConstants.PAGE_INCOUNT_COL));
+        if (prevGet != null) {
+          prev = Long.parseLong(prevGet.toString());
+        }
+
+        // update domain counts
+        Bytes domainRow = PageObserver.getDomainRow(row);
+        if (!domainRow.equals(Bytes.EMPTY)) {
+          if (op.equals(LogEntry.Operation.SET)) {
+            Long cur = Long.parseLong(val.toString());
+            updateRankIndex(mutations, domainRow, pageUri, seq, prev, cur);
+          } else {
+            deleteRankIndex(mutations, domainRow, pageUri, seq, prev);
+          }
+        }
+
+        // update total counts
+        Bytes totalRow = Bytes.of("t:" + Constants.INCOUNT);
+        if (op.equals(LogEntry.Operation.SET)) {
+          Long cur = Long.parseLong(val.toString());
+          updateRankIndex(mutations, totalRow, pageUri, seq, prev, cur);
+        } else {
+          deleteRankIndex(mutations, totalRow, pageUri, seq, prev);
+        }
       }
     }
-    if (modified) {
-      return Collections.singletonList(m);
-    }
-    return Collections.emptyList();
+    return mutations.values();
+  }
+
+  public static Predicate<LogEntry> getFilter() {
+    return le -> le.getColumn().getFamily().toString().equals(Constants.PAGE)
+        || le.getColumn().getFamily().toString().equals(Constants.INLINKS)
+        || le.getColumn().equals(FluoConstants.PAGECOUNT_COL);
   }
 }
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java b/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java
index 510ff2c..8707aac 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/fluo/InlinksObserver.java
@@ -36,7 +36,7 @@
 public class InlinksObserver extends AbstractObserver {
 
   private static final Logger log = LoggerFactory.getLogger(InlinksObserver.class);
-  private ExportQueue<String, TxLog> exportQueue;
+  private ExportQueue<Bytes, TxLog> exportQueue;
 
   @Override
   public void init(Context context) throws Exception {
@@ -46,7 +46,7 @@
   @Override
   public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
 
-    RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx);
+    RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx, IndexExporter.getFilter());
     TypedTransactionBase ttx = FluoConstants.TYPEL.wrap(rtx);
 
     String pageUri = row.toString().substring(2);
@@ -81,29 +81,32 @@
       }
     }
 
-    Long incount = ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong(0) + change;
-    if (incount <= 0) {
-      if (incount < 0) {
-        log.error("Incount for {} is negative: {}", row, incount);
-      }
-      ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).delete();
-    } else {
-      ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).set(incount);
+    boolean knownPage = true;
+    Long prevCount = ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong();
+    if (prevCount == null) {
+      knownPage = false;
+      prevCount = new Long(0);
     }
 
-    Long score = ttx.get().row(row).col(FluoConstants.PAGE_SCORE_COL).toLong(0) + change;
-    if (score <= 0) {
-      if (score < 0) {
-        log.error("Score for {} is negative: {}", row, incount);
+    Long curCount = prevCount + change;
+    if (curCount <= 0) {
+      if (curCount < 0) {
+        log.error("Incount for {} is negative: {}", row, curCount);
       }
-      ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).delete();
+      ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).delete();
+      if (knownPage) {
+        PageObserver.updateDomainPageCount(ttx, row, -1);
+      }
     } else {
-      ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).set(score);
+      ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).set(curCount);
+      if (!knownPage) {
+        PageObserver.updateDomainPageCount(ttx, row, 1);
+      }
     }
 
     TxLog txLog = rtx.getTxLog();
-    if (!txLog.getLogEntries().isEmpty()) {
-      exportQueue.add(tx, row.toString(), txLog);
+    if (!txLog.isEmpty()) {
+      exportQueue.add(tx, row, txLog);
     }
   }
 
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java b/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java
index 2dc8039..a5834e2 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/fluo/PageObserver.java
@@ -14,6 +14,7 @@
 
 package io.fluo.webindex.data.fluo;
 
+import java.text.ParseException;
 import java.util.Collections;
 import java.util.Set;
 
@@ -27,8 +28,10 @@
 import io.fluo.recipes.export.ExportQueue;
 import io.fluo.recipes.transaction.RecordingTransactionBase;
 import io.fluo.recipes.transaction.TxLog;
+import io.fluo.webindex.core.DataUtil;
 import io.fluo.webindex.core.models.Page;
 import io.fluo.webindex.data.util.FluoConstants;
+import io.fluo.webindex.data.util.LinkUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +39,7 @@
 
   private static final Logger log = LoggerFactory.getLogger(PageObserver.class);
   private static final Gson gson = new Gson();
-  private ExportQueue<String, TxLog> exportQueue;
+  private ExportQueue<Bytes, TxLog> exportQueue;
 
   @Override
   public void init(Context context) throws Exception {
@@ -46,7 +49,7 @@
   @Override
   public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
 
-    RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx);
+    RecordingTransactionBase rtx = RecordingTransactionBase.wrap(tx, IndexExporter.getFilter());
     TypedTransactionBase ttx = FluoConstants.TYPEL.wrap(rtx);
     String nextJson = ttx.get().row(row).col(FluoConstants.PAGE_NEW_COL).toString("");
     if (nextJson.isEmpty()) {
@@ -59,16 +62,22 @@
     if (!curJson.isEmpty()) {
       Page curPage = gson.fromJson(curJson, Page.class);
       curLinks = curPage.getOutboundLinks();
-    } else {
-      Long score = ttx.get().row(row).col(FluoConstants.PAGE_SCORE_COL).toLong(0);
-      Long newScore = score + 1;
-      ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).set(newScore);
+    }
+
+    if (curJson.isEmpty() && !nextJson.equals("delete")) {
+      Long incount = ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong();
+      if (incount == null) {
+        ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).set(new Long(0));
+      }
+      updateDomainPageCount(ttx, row, 1);
     }
 
     Page nextPage;
     if (nextJson.equals("delete")) {
       ttx.mutate().row(row).col(FluoConstants.PAGE_CUR_COL).delete();
-      ttx.mutate().row(row).col(FluoConstants.PAGE_SCORE_COL).delete();
+      ttx.get().row(row).col(FluoConstants.PAGE_INCOUNT_COL).toLong(); // get for indexing
+      ttx.mutate().row(row).col(FluoConstants.PAGE_INCOUNT_COL).delete();
+      updateDomainPageCount(ttx, row, -1);
       nextPage = Page.EMPTY;
     } else {
       ttx.mutate().row(row).col(FluoConstants.PAGE_CUR_COL).set(nextJson);
@@ -97,8 +106,8 @@
     ttx.mutate().row(row).col(FluoConstants.PAGE_NEW_COL).delete();
 
     TxLog txLog = rtx.getTxLog();
-    if (!txLog.getLogEntries().isEmpty()) {
-      exportQueue.add(tx, row.toString(), txLog);
+    if (!txLog.isEmpty()) {
+      exportQueue.add(tx, row, txLog);
     }
   }
 
@@ -106,4 +115,29 @@
   public ObservedColumn getObservedColumn() {
     return new ObservedColumn(FluoConstants.PAGE_NEW_COL, NotificationType.STRONG);
   }
+
+  public static void updateDomainPageCount(TypedTransactionBase ttx, Bytes pageRow, long change) {
+    Bytes domainRow = getDomainRow(pageRow);
+    if (!domainRow.equals(Bytes.EMPTY)) {
+      Long prevCount = ttx.get().row(domainRow).col(FluoConstants.PAGECOUNT_COL).toLong(0);
+      Long curCount = prevCount + change;
+      if (curCount == 0) {
+        ttx.mutate().row(domainRow).col(FluoConstants.PAGECOUNT_COL).delete();
+        log.debug("Deleted pagecount for {}", domainRow);
+      } else {
+        ttx.mutate().row(domainRow).col(FluoConstants.PAGECOUNT_COL).set(curCount);
+        log.debug("Updated pagecount for {} from {} to {}", domainRow, prevCount, curCount);
+      }
+    }
+  }
+
+  public static Bytes getDomainRow(Bytes pageRow) {
+    String pageUri = pageRow.toString().substring(2);
+    try {
+      String pageDomain = LinkUtil.getReverseTopPrivate(DataUtil.toUrl(pageUri));
+      return Bytes.of("d:" + pageDomain);
+    } catch (ParseException e) {
+      return Bytes.EMPTY;
+    }
+  }
 }
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java
index 836f5e6..1f2588b 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexEnv.java
@@ -21,9 +21,9 @@
 import io.fluo.api.config.FluoConfiguration;
 import io.fluo.api.data.Bytes;
 import io.fluo.api.data.RowColumn;
-import io.fluo.webindex.core.DataConfig;
 import io.fluo.core.util.AccumuloUtil;
 import io.fluo.core.util.SpanUtil;
+import io.fluo.webindex.core.DataConfig;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java
index 025a337..6e6004a 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/spark/IndexUtil.java
@@ -19,6 +19,7 @@
 import java.util.Set;
 
 import com.google.gson.Gson;
+import io.fluo.api.data.Bytes;
 import io.fluo.api.data.Column;
 import io.fluo.api.data.RowColumn;
 import io.fluo.webindex.core.Constants;
@@ -35,7 +36,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.api.java.function.PairFunction;
 import org.archive.io.ArchiveReader;
 import org.archive.io.ArchiveRecord;
 import scala.Tuple2;
@@ -44,8 +45,11 @@
 
   private static Gson gson = new Gson();
 
-  public static JavaPairRDD<RowColumn, Long> createLinkCounts(IndexStats stats,
-      JavaPairRDD<Text, ArchiveReader> archives) {
+  private static void addRCV(List<Tuple2<RowColumn, Long>> tuples, String r, Column c, Long v) {
+    tuples.add(new Tuple2<>(new RowColumn(r, c), v));
+  }
+
+  public static JavaRDD<Page> createPages(JavaPairRDD<Text, ArchiveReader> archives) {
 
     JavaRDD<ArchiveRecord> records =
         archives.flatMap(new FlatMapFunction<Tuple2<Text, ArchiveReader>, ArchiveRecord>() {
@@ -56,58 +60,49 @@
         });
 
     JavaRDD<Page> pages = records.map(r -> ArchiveUtil.buildPageIgnoreErrors(r));
+    return pages;
+  }
 
-    JavaRDD<RowColumn> links = pages.flatMap(new FlatMapFunction<Page, RowColumn>() {
-      @Override
-      public Iterable<RowColumn> call(Page page) throws Exception {
-        if (page.isEmpty()) {
-          stats.addEmpty(1);
-          return new ArrayList<>();
-        }
-        stats.addPage(1);
-        Set<Page.Link> links = page.getOutboundLinks();
-        stats.addExternalLinks(links.size());
+  public static JavaPairRDD<RowColumn, Bytes> createLinkIndex(IndexStats stats, JavaRDD<Page> pages) {
 
-        List<RowColumn> retval = new ArrayList<>();
-        String pageUri = page.getUri();
-        String pageDomain = LinkUtil.getReverseTopPrivate(page.getUrl());
-        if (links.size() > 0) {
-          retval.add(new RowColumn("p:" + pageUri, FluoConstants.PAGE_SCORE_COL));
-          retval.add(new RowColumn("p:" + pageUri, new Column(Constants.PAGE, Constants.CUR, gson
-              .toJson(page))));
-          retval.add(new RowColumn("d:" + pageDomain, new Column(Constants.PAGES, pageUri)));
-        }
-        for (Page.Link link : links) {
-          String linkUri = link.getUri();
-          String linkDomain = LinkUtil.getReverseTopPrivate(link.getUrl());
-          retval.add(new RowColumn("p:" + linkUri, FluoConstants.PAGE_INCOUNT_COL));
-          retval.add(new RowColumn("p:" + linkUri, FluoConstants.PAGE_SCORE_COL));
-          retval.add(new RowColumn("p:" + linkUri, new Column(Constants.INLINKS, pageUri, link
-              .getAnchorText())));
-          retval.add(new RowColumn("d:" + linkDomain, new Column(Constants.PAGES, linkUri)));
-        }
-        return retval;
-      }
-    });
-    links.persist(StorageLevel.DISK_ONLY());
+    JavaPairRDD<RowColumn, Long> links =
+        pages.flatMapToPair(new PairFlatMapFunction<Page, RowColumn, Long>() {
+          @Override
+          public Iterable<Tuple2<RowColumn, Long>> call(Page page) throws Exception {
+            if (page.isEmpty()) {
+              stats.addEmpty(1);
+              return new ArrayList<>();
+            }
+            stats.addPage(1);
+            Set<Page.Link> links = page.getOutboundLinks();
+            stats.addExternalLinks(links.size());
 
-    final Long one = new Long(1);
-    JavaPairRDD<RowColumn, Long> ones = links.mapToPair(s -> new Tuple2<>(s, one));
+            List<Tuple2<RowColumn, Long>> ret = new ArrayList<>();
+            String pageUri = page.getUri();
+            String pageDomain = LinkUtil.getReverseTopPrivate(page.getUrl());
+            final Long one = new Long(1);
+            if (links.size() > 0) {
+              addRCV(ret, "p:" + pageUri, FluoConstants.PAGE_INCOUNT_COL, new Long(0));
+              addRCV(ret, "p:" + pageUri,
+                  new Column(Constants.PAGE, Constants.CUR, gson.toJson(page)), one);
+              addRCV(ret, "d:" + pageDomain, new Column(Constants.PAGES, pageUri), new Long(0));
+            }
+            for (Page.Link link : links) {
+              String linkUri = link.getUri();
+              String linkDomain = LinkUtil.getReverseTopPrivate(link.getUrl());
+              addRCV(ret, "p:" + linkUri, FluoConstants.PAGE_INCOUNT_COL, one);
+              addRCV(ret, "p:" + linkUri,
+                  new Column(Constants.INLINKS, pageUri, link.getAnchorText()), one);
+              addRCV(ret, "d:" + linkDomain, new Column(Constants.PAGES, linkUri), one);
+            }
+            return ret;
+          }
+        });
 
-    JavaPairRDD<RowColumn, Long> linkCounts = ones.reduceByKey((i1, i2) -> i1 + i2);
+    JavaPairRDD<RowColumn, Long> linkCounts = links.reduceByKey((i1, i2) -> i1 + i2);
 
     JavaPairRDD<RowColumn, Long> sortedLinkCounts = linkCounts.sortByKey();
 
-    return sortedLinkCounts;
-  }
-
-  public static String revEncodeLong(Long num) {
-    Lexicoder<Long> lexicoder = new ReverseLexicoder<>(new ULongLexicoder());
-    return Hex.encodeHexString(lexicoder.encode(num));
-  }
-
-  public static JavaPairRDD<RowColumn, Long> createSortedTopCounts(
-      JavaPairRDD<RowColumn, Long> sortedLinkCounts) {
     final Long one = new Long(1);
     JavaPairRDD<RowColumn, Long> topCounts =
         sortedLinkCounts
@@ -115,8 +110,8 @@
               @Override
               public Iterable<Tuple2<RowColumn, Long>> call(Tuple2<RowColumn, Long> t)
                   throws Exception {
-                List<Tuple2<RowColumn, Long>> retval = new ArrayList<>();
-                retval.add(t);
+                List<Tuple2<RowColumn, Long>> ret = new ArrayList<>();
+                ret.add(t);
 
                 RowColumn rc = t._1();
                 String row = rc.getRow().toString();
@@ -125,16 +120,19 @@
                 Long val = t._2();
 
                 if (row.startsWith("d:") && (cf.equals(Constants.PAGES))) {
-                  retval.add(new Tuple2<>(new RowColumn(row, new Column(Constants.RANK, String
-                      .format("%s:%s", revEncodeLong(val), cq))), val));
-                  retval.add(new Tuple2<>(new RowColumn(row, new Column(Constants.DOMAIN,
-                      Constants.PAGECOUNT)), one));
+                  addRCV(ret, row,
+                      new Column(Constants.RANK, String.format("%s:%s", revEncodeLong(val), cq)),
+                      val);
+                  addRCV(ret, row, new Column(Constants.DOMAIN, Constants.PAGECOUNT), one);
                 } else if ((row.startsWith("p:") && cf.equals(Constants.PAGE))
-                    && (cq.equals(Constants.INCOUNT) || cq.equals(Constants.SCORE))) {
-                  retval.add(new Tuple2<>(new RowColumn("t:" + cq, new Column(Constants.RANK,
-                      String.format("%s:%s", revEncodeLong(val), row.substring(2)))), val));
+                    && (cq.equals(Constants.INCOUNT))) {
+                  addRCV(
+                      ret,
+                      "t:" + cq,
+                      new Column(Constants.RANK, String.format("%s:%s", revEncodeLong(val),
+                          row.substring(2))), val);
                 }
-                return retval;
+                return ret;
               }
             });
 
@@ -142,6 +140,37 @@
 
     JavaPairRDD<RowColumn, Long> sortedTopCounts = reducedTopCounts.sortByKey();
 
-    return sortedTopCounts;
+    JavaPairRDD<RowColumn, Long> filteredTopCounts =
+        sortedTopCounts.filter(t -> !(t._1().getRow().toString().startsWith("d:") && t._1()
+            .getColumn().getFamily().toString().equals(Constants.PAGES)));
+
+    JavaPairRDD<RowColumn, Bytes> linkIndex =
+        filteredTopCounts.mapToPair(new PairFunction<Tuple2<RowColumn, Long>, RowColumn, Bytes>() {
+          @Override
+          public Tuple2<RowColumn, Bytes> call(Tuple2<RowColumn, Long> tuple) throws Exception {
+            RowColumn rc = tuple._1();
+            String cf = rc.getColumn().getFamily().toString();
+            String cq = rc.getColumn().getQualifier().toString();
+            byte[] val = tuple._2().toString().getBytes();
+            if (cf.equals(Constants.INLINKS)
+                || (cf.equals(Constants.PAGE) && cq.startsWith(Constants.CUR))) {
+              val = rc.getColumn().getVisibility().toArray();
+            }
+            return new Tuple2<>(new RowColumn(rc.getRow(), new Column(cf, cq)), Bytes.of(val));
+          }
+        });
+
+    return linkIndex;
+  }
+
+  public static JavaPairRDD<RowColumn, Bytes> filterRank(JavaPairRDD<RowColumn, Bytes> linkIndex) {
+    JavaPairRDD<RowColumn, Bytes> filteredLinkIndex =
+        linkIndex.filter(t -> !t._1().getColumn().getFamily().toString().equals(Constants.RANK));
+    return filteredLinkIndex;
+  }
+
+  public static String revEncodeLong(Long num) {
+    Lexicoder<Long> lexicoder = new ReverseLexicoder<>(new ULongLexicoder());
+    return Hex.encodeHexString(lexicoder.encode(num));
   }
 }
diff --git a/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java b/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java
index e1541bc..148c336 100644
--- a/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java
+++ b/modules/data/src/main/java/io/fluo/webindex/data/util/FluoConstants.java
@@ -26,7 +26,8 @@
   public static final Column PAGE_NEW_COL = new Column(Constants.PAGE, Constants.NEW);
   public static final Column PAGE_CUR_COL = new Column(Constants.PAGE, Constants.CUR);
   public static final Column PAGE_INCOUNT_COL = new Column(Constants.PAGE, Constants.INCOUNT);
-  public static final Column PAGE_SCORE_COL = new Column(Constants.PAGE, Constants.SCORE);
+
+  public static final Column PAGECOUNT_COL = new Column(Constants.DOMAIN, Constants.PAGECOUNT);
 
   public static final String INLINKS_UPDATE = "inlinks-update";
   public static final String INLINKS_CHANGE = "inlinks-change";
diff --git a/modules/data/src/test/java/io/fluo/webindex/data/IndexIT.java b/modules/data/src/test/java/io/fluo/webindex/data/IndexIT.java
new file mode 100644
index 0000000..df8d940
--- /dev/null
+++ b/modules/data/src/test/java/io/fluo/webindex/data/IndexIT.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2015 Fluo authors (see AUTHORS)
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.fluo.webindex.data;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import io.fluo.api.client.FluoAdmin;
+import io.fluo.api.client.FluoClient;
+import io.fluo.api.client.FluoFactory;
+import io.fluo.api.client.LoaderExecutor;
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.config.FluoConfiguration;
+import io.fluo.api.config.ScannerConfiguration;
+import io.fluo.api.data.Bytes;
+import io.fluo.api.data.Column;
+import io.fluo.api.data.RowColumn;
+import io.fluo.api.iterator.ColumnIterator;
+import io.fluo.api.iterator.RowIterator;
+import io.fluo.api.mini.MiniFluo;
+import io.fluo.recipes.accumulo.export.TableInfo;
+import io.fluo.webindex.core.Constants;
+import io.fluo.webindex.core.models.Page;
+import io.fluo.webindex.data.fluo.PageUpdate;
+import io.fluo.webindex.data.spark.IndexStats;
+import io.fluo.webindex.data.spark.IndexUtil;
+import io.fluo.webindex.data.util.ArchiveUtil;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.archive.io.ArchiveReader;
+import org.archive.io.ArchiveRecord;
+import org.archive.io.warc.WARCReaderFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class IndexIT {
+
+  private static final Logger log = LoggerFactory.getLogger(IndexIT.class);
+
+  public static TemporaryFolder folder = new TemporaryFolder();
+  public static MiniAccumuloCluster cluster;
+  private static MiniFluo miniFluo;
+  private static FluoConfiguration config;
+  private static final PasswordToken password = new PasswordToken("secret");
+  private static AtomicInteger tableCounter = new AtomicInteger(1);
+  private String exportTable;
+  private transient JavaSparkContext sc;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    folder.create();
+    MiniAccumuloConfig cfg =
+        new MiniAccumuloConfig(folder.newFolder("miniAccumulo"), new String(password.getPassword()));
+    cluster = new MiniAccumuloCluster(cfg);
+    cluster.start();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    cluster.stop();
+    folder.delete();
+  }
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", getClass().getSimpleName());
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+  }
+
+  @Before
+  public void setUpFluo() throws Exception {
+    config = new FluoConfiguration();
+    config.setMiniStartAccumulo(false);
+    config.setApplicationName("lit");
+    config.setAccumuloInstance(cluster.getInstanceName());
+    config.setAccumuloUser("root");
+    config.setAccumuloPassword("secret");
+    config.setInstanceZookeepers(cluster.getZooKeepers() + "/fluo");
+    config.setAccumuloZookeepers(cluster.getZooKeepers());
+    config.setAccumuloTable("data" + tableCounter.getAndIncrement());
+    config.setWorkerThreads(5);
+
+    // create and configure export table
+    exportTable = "export" + tableCounter.getAndIncrement();
+    cluster.getConnector("root", "secret").tableOperations().create(exportTable);
+
+    PrintProps.configureApplication(config,
+        new TableInfo(cluster.getInstanceName(), cluster.getZooKeepers(), "root", "secret",
+            exportTable), 5);
+
+    FluoFactory.newAdmin(config).initialize(
+        new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true));
+
+    miniFluo = FluoFactory.newMiniFluo(config);
+  }
+
+
+  @After
+  public void tearDownFluo() throws Exception {
+    if (miniFluo != null) {
+      miniFluo.close();
+    }
+  }
+
+  public static Map<String, Page> readPages(File input) throws Exception {
+    Map<String, Page> pageMap = new HashMap<>();
+    ArchiveReader ar = WARCReaderFactory.get(input);
+    for (ArchiveRecord r : ar) {
+      Page p = ArchiveUtil.buildPage(r);
+      if (p.isEmpty() || p.getOutboundLinks().isEmpty()) {
+        continue;
+      }
+      pageMap.put(p.getUrl(), p);
+    }
+    ar.close();
+    return pageMap;
+  }
+
+  private void assertOutput(Collection<Page> pages, FluoClient client) throws Exception {
+    JavaRDD<Page> pagesRDD = sc.parallelize(new ArrayList<>(pages));
+    Assert.assertEquals(pages.size(), pagesRDD.count());
+
+    // Create expected output using spark
+    IndexStats stats = new IndexStats(sc);
+    JavaPairRDD<RowColumn, Bytes> linkIndex = IndexUtil.createLinkIndex(stats, pagesRDD);
+    List<Tuple2<RowColumn, Bytes>> linkIndexList = linkIndex.collect();
+    JavaPairRDD<RowColumn, Bytes> linkIndexNoRank = IndexUtil.filterRank(linkIndex);
+
+    // Compare against actual
+    boolean foundDiff = false;
+    foundDiff |= diffExportTable(linkIndexList);
+    foundDiff |= diffFluoTable(client, linkIndexNoRank.collect());
+    if (foundDiff) {
+      dump(client);
+      dumpExportTable();
+      print(linkIndexList);
+    }
+    Assert.assertFalse(foundDiff);
+  }
+
+  @Test
+  public void testIndexing() throws Exception {
+
+    Map<String, Page> pages = readPages(new File("src/test/resources/wat-18.warc"));
+
+    try (FluoClient client = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
+
+      try (LoaderExecutor le = client.newLoaderExecutor()) {
+        for (Page page : pages.values()) {
+          log.debug("Loading page {} with {} links", page.getUrl(), page.getOutboundLinks().size());
+          le.execute(PageUpdate.updatePage(page));
+        }
+      }
+      miniFluo.waitForObservers();
+
+      assertOutput(pages.values(), client);
+
+      String deleteUrl = "http://1000games.me/games/gametion/";
+      log.info("Deleting page {}", deleteUrl);
+      try (LoaderExecutor le = client.newLoaderExecutor()) {
+        le.execute(PageUpdate.deletePage(deleteUrl));
+      }
+      miniFluo.waitForObservers();
+
+      int numPages = pages.size();
+      Assert.assertNotNull(pages.remove(deleteUrl));
+      Assert.assertEquals(numPages-1, pages.size());
+      assertOutput(pages.values(), client);
+
+      String updateUrl = "http://100zone.blogspot.com/2013/03/please-memp3-4shared.html";
+      Page updatePage = pages.get(updateUrl);
+      long numLinks = updatePage.getNumOutbound();
+      Assert.assertTrue(updatePage.addOutboundLink("http://example.com", "Example"));
+      Assert.assertEquals(numLinks+1, (long)updatePage.getNumOutbound());
+      Assert.assertTrue(updatePage.removeOutboundLink("http://www.blogger.com"));
+      Assert.assertEquals(numLinks, (long)updatePage.getNumOutbound());
+
+      try (LoaderExecutor le = client.newLoaderExecutor()) {
+        le.execute(PageUpdate.updatePage(updatePage));
+      }
+      miniFluo.waitForObservers();
+
+      Assert.assertNotNull(pages.put(updateUrl, updatePage));
+      assertOutput(pages.values(), client);
+    }
+  }
+
+  private void print(List<Tuple2<RowColumn, Bytes>> linkIndex) {
+    System.out.println("== link index start ==");
+    linkIndex.forEach(t -> System.out.println("rc " + t._1().toString() + " val "
+        + t._2().toString()));
+    System.out.println("== link index end ==");
+  }
+
+  private void dump(FluoClient client) throws Exception {
+    try (Snapshot s = client.newSnapshot()) {
+      RowIterator iter = s.get(new ScannerConfiguration());
+
+      System.out.println("== fluo snapshot start ==");
+      while (iter.hasNext()) {
+        Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
+        ColumnIterator citer = rowEntry.getValue();
+        while (citer.hasNext()) {
+          Map.Entry<Column, Bytes> colEntry = citer.next();
+          System.out.println(rowEntry.getKey() + " " + colEntry.getKey() + "\t"
+              + colEntry.getValue());
+        }
+      }
+      System.out.println("=== fluo snapshot end ===");
+    }
+  }
+
+  private boolean diffFluoTable(FluoClient client, List<Tuple2<RowColumn, Bytes>> linkIndex)
+      throws Exception {
+    boolean retval = false;
+    try (Snapshot s = client.newSnapshot()) {
+      RowIterator iter = s.get(new ScannerConfiguration());
+      Iterator<Tuple2<RowColumn, Bytes>> indexIter = linkIndex.iterator();
+
+      while (iter.hasNext()) {
+        Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
+        ColumnIterator citer = rowEntry.getValue();
+        while (citer.hasNext() && indexIter.hasNext()) {
+          Map.Entry<Column, Bytes> colEntry = citer.next();
+          Tuple2<RowColumn, Bytes> indexEntry = indexIter.next();
+          RowColumn rc = indexEntry._1();
+          Column col = colEntry.getKey();
+
+          retval |= diff("row", rc.getRow().toString(), rowEntry.getKey().toString());
+          retval |= diff("fam", rc.getColumn().getFamily().toString(), col.getFamily().toString());
+          retval |=
+              diff("qual", rc.getColumn().getQualifier().toString(), col.getQualifier().toString());
+          if (!col.getQualifier().toString().equals(Constants.CUR)) {
+            retval |= diff("val", indexEntry._2().toString(), colEntry.getValue().toString());
+          }
+
+          if (retval) {
+            return true;
+          }
+          log.debug("Verified row {} cf {} cq {} val {}", rc.getRow().toString(), rc.getColumn()
+              .getFamily().toString(), rc.getColumn().getQualifier().toString(), indexEntry._2()
+              .toString());
+        }
+        if (citer.hasNext()) {
+          log.error("An column iterator still has more data");
+          return true;
+        }
+      }
+      if (iter.hasNext() || indexIter.hasNext()) {
+        log.error("An iterator still has more data");
+        return true;
+      }
+      log.debug("No difference found");
+      return false;
+    }
+  }
+
+  private void dumpExportTable() throws Exception {
+    Connector conn = cluster.getConnector("root", "secret");
+    Scanner scanner = conn.createScanner(exportTable, Authorizations.EMPTY);
+    Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator();
+
+    System.out.println("== accumulo export table start ==");
+    while (iterator.hasNext()) {
+      Map.Entry<Key, Value> entry = iterator.next();
+      System.out.println(entry.getKey() + " " + entry.getValue());
+    }
+    System.out.println("== accumulo export table end ==");
+  }
+
+  private boolean diff(String dataType, String expected, String actual) {
+    if (!expected.equals(actual)) {
+      log.error("Difference found in {} - expected {} actual {}", dataType, expected, actual);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean diffExportTable(List<Tuple2<RowColumn, Bytes>> linkIndex) throws Exception {
+    Connector conn = cluster.getConnector("root", "secret");
+    Scanner scanner = conn.createScanner(exportTable, Authorizations.EMPTY);
+    Iterator<Map.Entry<Key, Value>> exportIter = scanner.iterator();
+    Iterator<Tuple2<RowColumn, Bytes>> indexIter = linkIndex.iterator();
+
+    boolean retval = false;
+
+    while (exportIter.hasNext() && indexIter.hasNext()) {
+      Tuple2<RowColumn, Bytes> indexEntry = indexIter.next();
+      Map.Entry<Key, Value> exportEntry = exportIter.next();
+      Key key = exportEntry.getKey();
+      RowColumn rc = indexEntry._1();
+      Column col = rc.getColumn();
+
+      retval |= diff("row", rc.getRow().toString(), key.getRow().toString());
+      retval |= diff("fam", col.getFamily().toString(), key.getColumnFamily().toString());
+      retval |= diff("qual", col.getQualifier().toString(), key.getColumnQualifier().toString());
+      if (!col.getQualifier().toString().equals(Constants.CUR)) {
+        retval |= diff("val", indexEntry._2().toString(), exportEntry.getValue().toString());
+      }
+
+      if (retval) {
+        return true;
+      }
+      log.debug("Verified row {} cf {} cq {} val {}", rc.getRow().toString(), rc.getColumn()
+          .getFamily().toString(), rc.getColumn().getQualifier().toString(), indexEntry._2()
+          .toString());
+    }
+
+    if (exportIter.hasNext() || indexIter.hasNext()) {
+      log.error("An iterator still has more data");
+      return true;
+    }
+
+    log.debug("No difference found");
+    return false;
+  }
+}
diff --git a/modules/data/src/test/java/io/fluo/webindex/data/LoadIT.java b/modules/data/src/test/java/io/fluo/webindex/data/LoadIT.java
deleted file mode 100644
index e262990..0000000
--- a/modules/data/src/test/java/io/fluo/webindex/data/LoadIT.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Copyright 2015 Fluo authors (see AUTHORS)
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package io.fluo.webindex.data;
-
-import java.io.File;
-import java.text.ParseException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import io.fluo.api.client.FluoAdmin;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.client.LoaderExecutor;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Column;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-import io.fluo.api.mini.MiniFluo;
-import io.fluo.recipes.accumulo.export.TableInfo;
-import io.fluo.webindex.core.models.Page;
-import io.fluo.webindex.data.fluo.PageUpdate;
-import io.fluo.webindex.data.util.ArchiveUtil;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.accumulo.minicluster.MiniAccumuloConfig;
-import org.archive.io.ArchiveReader;
-import org.archive.io.ArchiveRecord;
-import org.archive.io.warc.WARCReaderFactory;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LoadIT {
-
-  private static final Logger log = LoggerFactory.getLogger(LoadIT.class);
-
-  public static TemporaryFolder folder = new TemporaryFolder();
-  public static MiniAccumuloCluster cluster;
-  private static MiniFluo miniFluo;
-  private static FluoConfiguration config;
-  private static final PasswordToken password = new PasswordToken("secret");
-  private static AtomicInteger tableCounter = new AtomicInteger(1);
-  private String exportTable;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    folder.create();
-    MiniAccumuloConfig cfg =
-        new MiniAccumuloConfig(folder.newFolder("miniAccumulo"), new String(password.getPassword()));
-    cluster = new MiniAccumuloCluster(cfg);
-    cluster.start();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    cluster.stop();
-    folder.delete();
-  }
-
-  @Before
-  public void setUpFluo() throws Exception {
-    config = new FluoConfiguration();
-    config.setMiniStartAccumulo(false);
-    config.setApplicationName("lit");
-    config.setAccumuloInstance(cluster.getInstanceName());
-    config.setAccumuloUser("root");
-    config.setAccumuloPassword("secret");
-    config.setInstanceZookeepers(cluster.getZooKeepers() + "/fluo");
-    config.setAccumuloZookeepers(cluster.getZooKeepers());
-    config.setAccumuloTable("data" + tableCounter.getAndIncrement());
-    config.setWorkerThreads(5);
-
-    // create and configure export table
-    exportTable = "export" + tableCounter.getAndIncrement();
-    cluster.getConnector("root", "secret").tableOperations().create(exportTable);
-
-    PrintProps.configureApplication(config,
-        new TableInfo(cluster.getInstanceName(), cluster.getZooKeepers(), "root", "secret",
-            exportTable), 5);
-
-    FluoFactory.newAdmin(config).initialize(
-        new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true));
-
-    miniFluo = FluoFactory.newMiniFluo(config);
-  }
-
-
-  @After
-  public void tearDownFluo() throws Exception {
-    if (miniFluo != null) {
-      miniFluo.close();
-    }
-  }
-
-  @Test
-  public void testLoad() throws Exception {
-
-    try (FluoClient client = FluoFactory.newClient(miniFluo.getClientConfiguration())) {
-
-      ArchiveReader ar = WARCReaderFactory.get(new File("src/test/resources/wat-18.warc"));
-
-      try (LoaderExecutor le = client.newLoaderExecutor()) {
-        Iterator<ArchiveRecord> records = ar.iterator();
-        while (records.hasNext()) {
-          try {
-            ArchiveRecord r = records.next();
-            Page p = ArchiveUtil.buildPage(r);
-            if (p.isEmpty() || p.getOutboundLinks().isEmpty()) {
-              continue;
-            }
-            log.info("Loading page {} with {} links", p.getUrl(), p.getOutboundLinks().size());
-            le.execute(PageUpdate.updatePage(p));
-          } catch (ParseException e) {
-            log.debug("Parse exception occurred", e);
-          }
-        }
-      }
-      ar.close();
-      miniFluo.waitForObservers();
-      dump(client);
-      dumpExportTable();
-
-      String url = "http://1000games.me/games/gametion/";
-      log.info("Deleting page {}", url);
-      try (LoaderExecutor le = client.newLoaderExecutor()) {
-        le.execute(PageUpdate.deletePage(url));
-      }
-
-      miniFluo.waitForObservers();
-      dump(client);
-      dumpExportTable();
-    }
-  }
-
-  private void dump(FluoClient client) throws Exception {
-    try (Snapshot s = client.newSnapshot()) {
-      RowIterator iter = s.get(new ScannerConfiguration());
-
-      System.out.println("== snapshot start ==");
-      while (iter.hasNext()) {
-        Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next();
-        ColumnIterator citer = rowEntry.getValue();
-        while (citer.hasNext()) {
-          Map.Entry<Column, Bytes> colEntry = citer.next();
-          System.out.println(rowEntry.getKey() + " " + colEntry.getKey() + "\t"
-              + colEntry.getValue());
-        }
-      }
-      System.out.println("=== snapshot end ===");
-    }
-  }
-
-  private void dumpExportTable() throws Exception {
-    Connector conn = cluster.getConnector("root", "secret");
-    Scanner scanner = conn.createScanner(exportTable, Authorizations.EMPTY);
-    Iterator<Map.Entry<Key, Value>> iterator = scanner.iterator();
-
-    System.out.println("== export table start ==");
-    while (iterator.hasNext()) {
-      Map.Entry<Key, Value> entry = iterator.next();
-      System.out.println(entry.getKey() + " " + entry.getValue());
-    }
-    System.out.println("== export table end ==");
-
-  }
-}
diff --git a/modules/data/src/test/resources/log4j.properties b/modules/data/src/test/resources/log4j.properties
index d0ad78b..4577b43 100644
--- a/modules/data/src/test/resources/log4j.properties
+++ b/modules/data/src/test/resources/log4j.properties
@@ -15,18 +15,22 @@
 log4j.rootLogger=INFO, CA
 log4j.appender.CA=org.apache.log4j.ConsoleAppender
 log4j.appender.CA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n
+log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c] %-5p: %m%n
 
-log4j.logger.org.apache.curator=ERROR
+log4j.logger.akka=WARN
 log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR
 log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off
 log4j.logger.org.apache.accumulo.core.util.shell.Shell=FATAL
 log4j.logger.org.apache.commons.vfs2.impl.DefaultFileSystemManager=WARN
+log4j.logger.org.apache.curator=ERROR
 log4j.logger.org.apache.hadoop=WARN
 log4j.logger.org.apache.hadoop.conf=ERROR
 log4j.logger.org.apache.hadoop.mapred=ERROR
 log4j.logger.org.apache.hadoop.mapreduce=ERROR
 log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
+log4j.logger.org.apache.spark=WARN
 log4j.logger.org.apache.zookeeper.ClientCnxn=FATAL
 log4j.logger.org.apache.zookeeper.ZooKeeper=WARN
-log4j.logger.io.fluo=INFO
+log4j.logger.org.spark-project=WARN
+log4j.logger.io.fluo=WARN
+log4j.logger.Remoting=WARN
diff --git a/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java b/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java
index 23f7506..d8a7268 100644
--- a/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java
+++ b/modules/ui/src/main/java/io/fluo/webindex/ui/WebIndexResources.java
@@ -133,7 +133,6 @@
   private Page getPage(String url) {
     Page page = null;
     Long incount = (long) 0;
-    Long score = (long) 0;
     try {
       Scanner scanner = conn.createScanner(dataConfig.accumuloIndexTable, Authorizations.EMPTY);
       scanner.setRange(Range.exact("p:" + DataUtil.toUri(url), Constants.PAGE));
@@ -144,9 +143,6 @@
           case Constants.INCOUNT:
             incount = getLongValue(entry);
             break;
-          case Constants.SCORE:
-            score = getLongValue(entry);
-            break;
           case Constants.CUR:
             page = gson.fromJson(entry.getValue().toString(), Page.class);
             break;
@@ -163,7 +159,6 @@
       page = new Page(url);
     }
     page.setNumInbound(incount);
-    page.setScore(score);
     page.setDomain(WebUtil.getDomain(page.getUrl()));
     return page;
   }
@@ -265,7 +260,7 @@
       @DefaultValue("0") @QueryParam("pageNum") Integer pageNum) {
 
     TopResults results = new TopResults();
-    if (resultType.equals(Constants.INCOUNT) || resultType.equals(Constants.SCORE)) {
+    if (resultType.equals(Constants.INCOUNT)) {
       results.setResultType(resultType);
       results.setPageNum(pageNum);
       try {
diff --git a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl
index 26485f9..4a91471 100644
--- a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl
+++ b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/home.ftl
@@ -15,7 +15,7 @@
 </div>
 <div class="row">
   <div class="col-md-6 col-md-offset-3" style="margin-top: 20px">
-    <p><b>View top pages by:</b> &nbsp; <a href="/top?resultType=score">Score</a> &nbsp; <a href="/top?resultType=incount">Inbound Links</a></p>
+    <p><b>View top pages by:</b> &nbsp; <a href="/top?resultType=incount">Inbound Links</a></p>
   </div>
 </div>
 <#include "common/footer.ftl">
diff --git a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl
index ec377ef..5db7610 100644
--- a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl
+++ b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/page.ftl
@@ -8,7 +8,6 @@
       </#if>
       <tr><td>URL<td>${page.url?html} &nbsp;- &nbsp;<a href="${page.url?html}">Go to page</a></tr>
       <tr><td>Domain<td><a href="pages?domain=${page.domain?url}">${page.domain?html}</a></tr>
-      <tr><td>Page Score<td>${page.score}</tr>
       <tr><td>Inbound links<td><a href="/links?url=${page.url?url}&linkType=in">${page.numInbound}</a></tr>
       <#if page.crawlDate??>
         <tr><td>Outbound links<td><a href="/links?url=${page.url?url}&linkType=out">${page.numOutbound}</a></tr>
diff --git a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl
index 498d03a..dcc88a8 100644
--- a/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl
+++ b/modules/ui/src/main/resources/io/fluo/webindex/ui/views/pages.ftl
@@ -28,7 +28,7 @@
 <div class="row">
   <div class="col-md-12">
     <table class="table table-striped">
-    <thead><th>Score</th><th>URL</th></thead>
+    <thead><th>Inbound Links</th><th>URL</th></thead>
     <#list pages.pages as page>
       <tr>
         <td>${page.score?html}</td>
diff --git a/pom.xml b/pom.xml
index 97874b9..a2a1344 100644
--- a/pom.xml
+++ b/pom.xml
@@ -341,6 +341,25 @@
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <io.fluo.it.instance.name>it-instance-maven</io.fluo.it.instance.name>
+            <io.fluo.it.instance.clear>false</io.fluo.it.instance.clear>
+          </systemPropertyVariables>
+        </configuration>
+        <executions>
+          <execution>
+            <id>run-integration-tests</id>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
   <profiles>