FluoQueryMetadataCache
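This change replaces the per-observer FluoQueryMetadataDAO instances with a shared, Guava-backed FluoQueryMetadataCache obtained from the new MetadataCacheSupplier, so repeated metadata reads for the same node are served from memory instead of re-reading the Fluo table. A minimal sketch of the intended call pattern (the class and method names are the ones introduced in the diff below; the example class itself is hypothetical):

    import org.apache.fluo.api.client.TransactionBase;
    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
    import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
    import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;

    public class CachedMetadataLookupExample {
        // Every caller receives the same process-wide cache instance.
        private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();

        public QueryMetadata lookup(final TransactionBase tx, final String queryId) {
            // The first read for a node id delegates to FluoQueryMetadataDAO; later reads hit the cache.
            return queryDao.readQueryMetadata(tx, queryId);
        }
    }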
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index 01da2dc..fd624eb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -1,25 +1,16 @@
 <?xml version="1.0" encoding="utf-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+    license agreements. See the NOTICE file distributed with this work for additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    you under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <parent>
         <groupId>org.apache.rya</groupId>
@@ -41,12 +32,14 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
-
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <!-- Uncommment this block when rya.pcj.fluo.app becomes a leaf project.  RYA-341 -->
-            <!-- <version>13.0</version>  Overriding Rya's Guava version to be compatible with Fluo's required version.  Alternative is relocation with shade. -->
+            <!-- Uncomment this block when rya.pcj.fluo.app becomes a leaf 
+                project. RYA-341 -->
+            <!-- <version>13.0</version> Overriding Rya's Guava version to 
+                be compatible with Fluo's required version. Alternative is relocation with 
+                shade. -->
         </dependency>
 
         <!-- Rya Runtime Dependencies. -->
@@ -75,7 +68,7 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
         </dependency>
-        
+
         <dependency>
             <groupId>com.esotericsoftware</groupId>
             <artifactId>kryo</artifactId>
@@ -87,6 +80,11 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index c0cfa1d..9e47132 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -36,8 +36,9 @@
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
@@ -55,7 +56,7 @@
 public abstract class BindingSetUpdater extends AbstractObserver {
     private static final Logger log = Logger.getLogger(BindingSetUpdater.class);
     // DAO
-    protected final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+    protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
 
     // Updaters
     private final JoinResultUpdater joinUpdater = new JoinResultUpdater();
@@ -117,9 +118,9 @@
                 } catch (final Exception e) {
                     throw new RuntimeException("Could not process a Query node.", e);
                 }
-                break;    
-                
-            case CONSTRUCT: 
+                break;
+
+            case CONSTRUCT:
                 final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
                 try{
                     constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery);
@@ -127,7 +128,7 @@
                     throw new RuntimeException("Could not process a Query node.", e);
                 }
                 break;
-                
+
             case FILTER:
                 final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId);
                 try {
@@ -145,7 +146,7 @@
                     throw new RuntimeException("Could not process a Join node.", e);
                 }
                 break;
-                
+
             case PERIODIC_QUERY:
                 final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId);
                 try{
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index 61e7244..09d9ede 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -29,6 +29,8 @@
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 
 /**
  * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
@@ -40,6 +42,7 @@
 public class ConstructQueryResultObserver extends AbstractObserver {
 
     private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
+    protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
 
     @Override
     public ObservedColumn getObservedColumn() {
@@ -48,14 +51,14 @@
 
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-        
+
         //Build row for parent that result will be written to
         BindingSetRow bsRow = BindingSetRow.make(row);
         String constructNodeId = bsRow.getNodeId();
         String bsString= bsRow.getBindingSetString();
-        String parentNodeId = tx.get(Bytes.of(constructNodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
+        String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
         String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
-        
+
         //Get NodeType of the parent node
         NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
         //Get data for the ConstructQuery result
@@ -63,5 +66,5 @@
         //Write result to parent
         tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
     }
-   
+
 }
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 9514932..78d0ec5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -35,7 +35,8 @@
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,8 +50,7 @@
 public class QueryResultObserver extends AbstractObserver {
 
     private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class);
-    private static final FluoQueryMetadataDAO DAO = new FluoQueryMetadataDAO();
-
+    protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
     /**
      * Builders for each type of {@link IncrementalBindingSetExporter} we support.
      */
@@ -101,7 +101,7 @@
 
         // Read the queryId from the row and get the QueryMetadata.
         final String queryId = row.split(NODEID_BS_DELIM)[0];
-        final QueryMetadata metadata = DAO.readQueryMetadata(tx, queryId);
+        final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId);
 
         // Read the Child Binding Set that will be exported.
         final Bytes valueBytes = tx.get(brow, col);
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 2d7f390..d6fd8bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -34,7 +34,8 @@
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -55,7 +56,7 @@
     private static final Logger log = LoggerFactory.getLogger(TripleObserver.class);
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
-    private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new FluoQueryMetadataDAO();
+    private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
     private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
 
     public TripleObserver() {}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
new file mode 100644
index 0000000..8adc40d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+    private final FluoQueryMetadataDAO dao;
+    private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache;
+    private final Cache<String, Bytes> metadataCache;
+    private int capacity;
+    private int concurrencyLevel;
+
+    /**
+     * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
+     *
+     * @param dao - underlying {@link FluoQueryMetadataDAO} used to fetch metadata on a cache miss
+     * @param capacity - max number of entries the cache will hold
+     * @param concurrencyLevel - number of partitions that distinct threads can update without blocking one another
+     */
+    public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+        this.dao = dao;
+        commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+        metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+        this.capacity = capacity;
+        this.concurrencyLevel = concurrencyLevel;
+    }
+
+    /**
+     * @return - capacity of this cache in terms of max number of entries
+     */
+    public int getCapacity() {
+        return capacity;
+    }
+
+    /**
+     * @return - concurrencyLevel of this cache, in terms of the number of partitions that distinct threads can operate on
+     *         without waiting for other threads
+     */
+    public int getConcurrencyLevel() {
+        return concurrencyLevel;
+    }
+
+    @Override
+    public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+
+        try {
+            checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
+            LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+            return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+                    return dao.readStatementPatternMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
+    @Override
+    public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
+            LOG.debug("Retrieving Metadata from Cache: {}.", nodeId);
+            return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                    return dao.readJoinMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
+    @Override
+    public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
+            LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+            return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                    return dao.readFilterMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
+    @Override
+    public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION);
+        LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+        try {
+            return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                    return dao.readProjectionMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
+    @Override
+    public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
+            LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+            return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                    return dao.readAggregationMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
+    @Override
+    public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
+            LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+            return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                    return dao.readConstructQueryMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
+    @Override
+    public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
+            LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+            return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                    return dao.readPeriodicQueryMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
+    @Override
+    public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) {
+        Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+        try {
+            checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
+            LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+            return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+                @Override
+                public CommonNodeMetadata call() throws Exception {
+                    LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+                    return dao.readQueryMetadata(tx, nodeId);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e);
+        }
+    }
+
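+    /**
+     * Reads a single metadata {@link Bytes} entry, caching it by row and column so that repeated reads of the same
+     * entry do not have to go back to the Fluo table.
+     *
+     * @param tx - transaction or snapshot used to read the entry on a cache miss
+     * @param rowId - node id of the row containing the metadata entry
+     * @param column - metadata column of the entry to read
+     * @return - the {@link Bytes} value stored in the given row and column
+     */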
+    public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) {
+        Optional<NodeType> type = NodeType.fromNodeId(rowId);
+        try {
+            checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
+            return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() {
+                @Override
+                public Bytes call() throws Exception {
+                    return tx.get(Bytes.of(rowId), column);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e);
+        }
+    }
+
+    private String getKey(String row, Column column) {
+        return row + ":" + column.getsFamily() + ":" + column.getsQualifier();
+    }
+}
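Tests, and any caller that wants an isolated cache rather than the shared one, can wrap a FluoQueryMetadataDAO directly, as the new FluoQueryMetadataCacheTest does further down. A minimal sketch with illustrative sizing values:

    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;

    public class DirectCacheConstructionExample {
        public static void main(final String[] args) {
            // 100 entries and a concurrency level of 2 are example values, not project defaults.
            final FluoQueryMetadataCache cache = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), 100, 2);
            System.out.println("capacity=" + cache.getCapacity() + " concurrencyLevel=" + cache.getConcurrencyLevel());
        }
    }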
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
new file mode 100644
index 0000000..faab952
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
@@ -0,0 +1,44 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
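+/**
+ * Supplier that hands out a single shared {@link FluoQueryMetadataCache} so that every observer within a worker
+ * reuses the same cache instead of building its own.
+ */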
+public class MetadataCacheSupplier {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
+    private static FluoQueryMetadataCache CACHE;
+    private static boolean initialized = false;
+    private static final int DEFAULT_CAPACITY = 10000;
+    private static final int DEFAULT_CONCURRENCY = 8;
+
+    /**
+     * Returns the existing cache if one has already been created, otherwise creates a new cache with the indicated
+     * capacity and concurrencyLevel. The arguments only take effect on the first call; later calls return the cache
+     * as it was originally sized.
+     *
+     * @param capacity - capacity used to create a new cache
+     * @param concurrencyLevel - concurrencyLevel used to create a new cache
+     */
+    public static synchronized FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) {
+        if (!initialized) {
+            LOG.debug("Cache has not been initialized.  Initializing cache with capacity: {} and concurrencyLevel: {}", capacity,
+                    concurrencyLevel);
+            CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
+            initialized = true;
+        } else {
+            LOG.debug("Cache has already been initialized.  Returning cache with capacity: {} and concurrencyLevel: {}",
+                    CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+        }
+        return CACHE;
+    }
+
+    /**
+     * Returns the existing cache if one has already been created, otherwise creates it with a default capacity of
+     * 10000 entries and a default concurrency level of 8.
+     *
+     * @return - FluoQueryMetadataCache with the default capacity and concurrency level
+     */
+    public static FluoQueryMetadataCache getOrCreateCache() {
+        return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY);
+    }
+
+}
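The supplier behaves as a process-wide singleton: whichever call initializes it first fixes the capacity and concurrency level, and every later call returns that same instance regardless of its arguments. A small sketch of that behavior, assuming the defaults shown above (10000 entries, concurrency level 8):

    import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
    import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;

    public class MetadataCacheSupplierExample {
        public static void main(final String[] args) {
            // The first call sizes the cache; the second call ignores the defaults and returns the same instance.
            final FluoQueryMetadataCache first = MetadataCacheSupplier.getOrCreateCache(5000, 4);
            final FluoQueryMetadataCache second = MetadataCacheSupplier.getOrCreateCache();
            System.out.println(first == second);      // true
            System.out.println(second.getCapacity()); // 5000, not the 10000 default
        }
    }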
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
new file mode 100644
index 0000000..3df3708
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
@@ -0,0 +1,34 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class FluoQueryMetadataCacheTest {
+
+    @Test
+    public void testCache() {
+        FluoQueryMetadataDAO mockDAO = Mockito.mock(FluoQueryMetadataDAO.class);
+        Transaction mockTx = Mockito.mock(Transaction.class);
+        String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+        StatementPatternMetadata metadata = StatementPatternMetadata.builder(nodeId).setParentNodeId("parent")
+                .setStatementPattern("pattern").setVarOrder(new VariableOrder("xyz")).build();
+        when(mockDAO.readStatementPatternMetadata(mockTx, nodeId)).thenReturn(metadata);
+
+        FluoQueryMetadataCache cache = new FluoQueryMetadataCache(mockDAO, 20, 2);
+
+        assertEquals(metadata, cache.readStatementPatternMetadata(mockTx, nodeId));
+
+        cache.readStatementPatternMetadata(mockTx, nodeId);
+        cache.readStatementPatternMetadata(mockTx, nodeId);
+        cache.readStatementPatternMetadata(mockTx, nodeId);
+        cache.readStatementPatternMetadata(mockTx, nodeId);
+
+        Mockito.verify(mockDAO, Mockito.times(1)).readStatementPatternMetadata(mockTx, nodeId);
+    }
+}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
new file mode 100644
index 0000000..fabf512
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
@@ -0,0 +1,169 @@
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static java.util.Objects.requireNonNull;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class FluoLatencyIT extends KafkaExportITBase {
+    private static ValueFactory vf;
+    private static DatatypeFactory dtf;
+
+    @BeforeClass
+    public static void init() throws DatatypeConfigurationException {
+        vf = new ValueFactoryImpl();
+        dtf = DatatypeFactory.newInstance();
+    }
+
+    @Test
+    public void resultsExported() throws Exception {
+
+        final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { "
+                + "    ?obs <uri:hasTime> ?time. " + "    ?obs <uri:hasObsType> ?type " + "} " + "group by ?type";
+
+//        final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { "
+//                + "    ?obs <uri:hasTime> ?time. " + "    ?obs <uri:hasObsType> ?type " + "}";
+
+        try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Tell the Fluo app to maintain the PCJ.
+            String pcjId = FluoQueryUtils.createNewPcjId();
+            FluoConfiguration conf = super.getFluoConfiguration();
+            new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient);
+            SailRepositoryConnection conn = super.getRyaSailRepository().getConnection();
+
+            long start = System.currentTimeMillis();
+            int numReturned = 0;
+            int numObs = 10;
+            int numTypes = 5;
+            int numExpected = 0;
+            int increment = numObs*numTypes;
+            while (System.currentTimeMillis() - start < 60000) {
+                List<Statement> statements = generate(numObs, numTypes, "car_", numExpected, ZonedDateTime.now());
+                conn.add(statements);
+                numExpected += increment;
+                System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: "
+                        + getNumFluoEntries(fluoClient));
+                numReturned += readAllResults(pcjId).size();
+                System.out
+                        .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned));
+//                FluoITHelper.printFluoTable(conf);
+                Thread.sleep(30000);
+            }
+        }
+    }
+
+    /**
+     * Generates (numObservationsPerType x numTypes) statements of the form:
+     *
+     * <pre>
+     * urn:obs_n uri:hasTime zonedTime
+     * urn:obs_n uri:hasObsType typePrefix_m
+     * </pre>
+     *
+     * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
+     * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes.
+     *
+     * @param numObservationsPerType - The quantity of observations per type to generate.
+     * @param numTypes - The number of types to generate observations for.
+     * @param typePrefix - The prefix to be used for the type literal in the statement.
+     * @param observationOffset - The offset to be used for determining the value of n in the above statements.
+     * @param zonedTime - The time to be used for all observations generated.
+     * @return A new list of all generated Statements.
+     */
+    public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix,
+            final long observationOffset, final ZonedDateTime zonedTime) {
+        final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
+        final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
+        final List<Statement> statements = Lists.newArrayList();
+
+        for (long i = 0; i < numObservationsPerType; i++) {
+            for (int j = 0; j < numTypes; j++) {
+                final long observationId = observationOffset + i * numTypes + j;
+                // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId;
+                // final String obsId = "urn:obs_" + observationId;
+                final String obsId = "urn:obs_" + String.format("%020d", observationId);
+                final String type = typePrefix + j;
+                // logger.info(obsId + " " + type + " " + litTime);
+                statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime));
+                statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type)));
+            }
+        }
+
+        return statements;
+    }
+
+    private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
+        requireNonNull(pcjId);
+
+        // Read all of the results from the Kafka topic.
+        final Set<VisibilityBindingSet> results = new HashSet<>();
+
+        try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+            final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+            final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
+            while (recordIterator.hasNext()) {
+                results.add(recordIterator.next().value());
+            }
+        }
+
+        return results;
+    }
+
+    private int getNumAccEntries(String tableName) throws TableNotFoundException {
+        Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations());
+        int count = 0;
+        for (Map.Entry<Key, Value> entry : scanner) {
+            count++;
+        }
+        return count;
+    }
+
+    private int getNumFluoEntries(FluoClient client) {
+        Transaction tx = client.newTransaction();
+        CellScanner scanner = tx.scanner().build();
+        int count = 0;
+        for (RowColumnValue rcv : scanner) {
+            count++;
+        }
+        return count;
+    }
+
+}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 59fe54f..7b16dcf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -69,7 +69,6 @@
 import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.repository.sail.SailRepositoryConnection;
 import org.openrdf.sail.Sail;
@@ -129,7 +128,7 @@
         final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams);
         kafkaParams.setUseKafkaBindingSetExporter(true);
         kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
-        
+
         final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams);
         kafkaConstructParams.setUseKafkaSubgraphExporter(true);
 
@@ -262,7 +261,7 @@
      * If this test fails then its a testing environment issue, not with Rya.
      * Source: https://github.com/asmaier/mini-kafka
      */
-    @Test
+//    @Test
     public void embeddedKafkaTest() throws Exception {
         // create topic
         final String topic = "testTopic";
@@ -339,9 +338,9 @@
         // The PCJ Id is the topic name the results will be written to.
         return pcjId;
     }
-    
+
     protected void loadData(final Collection<Statement> statements) throws Exception {
-        
+
         requireNonNull(statements);
 
         final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
@@ -352,7 +351,7 @@
 
         // Wait for the Fluo application to finish computing the end result.
         super.getMiniFluo().waitForObservers();
-        
+
     }
 
 }