FluoQueryMetadataCache
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index 01da2dc..fd624eb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -1,25 +1,16 @@
<?xml version="1.0" encoding="utf-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rya</groupId>
@@ -41,12 +32,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
-
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <!-- Uncommment this block when rya.pcj.fluo.app becomes a leaf project. RYA-341 -->
- <!-- <version>13.0</version> Overriding Rya's Guava version to be compatible with Fluo's required version. Alternative is relocation with shade. -->
+ <!-- Uncommment this block when rya.pcj.fluo.app becomes a leaf
+ project. RYA-341 -->
+ <!-- <version>13.0</version> Overriding Rya's Guava version to
+ be compatible with Fluo's required version. Alternative is relocation with
+ shade. -->
</dependency>
<!-- Rya Runtime Dependencies. -->
@@ -75,7 +68,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
-
+
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
@@ -87,6 +80,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index c0cfa1d..9e47132 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -36,8 +36,9 @@
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
@@ -55,7 +56,7 @@
public abstract class BindingSetUpdater extends AbstractObserver {
private static final Logger log = Logger.getLogger(BindingSetUpdater.class);
// DAO
- protected final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+ protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
// Updaters
private final JoinResultUpdater joinUpdater = new JoinResultUpdater();
@@ -117,9 +118,9 @@
} catch (final Exception e) {
throw new RuntimeException("Could not process a Query node.", e);
}
- break;
-
- case CONSTRUCT:
+ break;
+
+ case CONSTRUCT:
final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
try{
constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery);
@@ -127,7 +128,7 @@
throw new RuntimeException("Could not process a Query node.", e);
}
break;
-
+
case FILTER:
final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId);
try {
@@ -145,7 +146,7 @@
throw new RuntimeException("Could not process a Join node.", e);
}
break;
-
+
case PERIODIC_QUERY:
final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId);
try{
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index 61e7244..09d9ede 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -29,6 +29,8 @@
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
/**
* Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
@@ -40,6 +42,7 @@
public class ConstructQueryResultObserver extends AbstractObserver {
private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
+ protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
@Override
public ObservedColumn getObservedColumn() {
@@ -48,14 +51,14 @@
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-
+
//Build row for parent that result will be written to
BindingSetRow bsRow = BindingSetRow.make(row);
String constructNodeId = bsRow.getNodeId();
String bsString= bsRow.getBindingSetString();
- String parentNodeId = tx.get(Bytes.of(constructNodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
+ String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
-
+
//Get NodeType of the parent node
NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
//Get data for the ConstructQuery result
@@ -63,5 +66,5 @@
//Write result to parent
tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
}
-
+
}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 9514932..78d0ec5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -35,7 +35,8 @@
import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +50,7 @@
public class QueryResultObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class);
- private static final FluoQueryMetadataDAO DAO = new FluoQueryMetadataDAO();
-
+ protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
/**
* Builders for each type of {@link IncrementalBindingSetExporter} we support.
*/
@@ -101,7 +101,7 @@
// Read the queryId from the row and get the QueryMetadata.
final String queryId = row.split(NODEID_BS_DELIM)[0];
- final QueryMetadata metadata = DAO.readQueryMetadata(tx, queryId);
+ final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId);
// Read the Child Binding Set that will be exported.
final Bytes valueBytes = tx.get(brow, col);
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 2d7f390..d6fd8bd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -34,7 +34,8 @@
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
+import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -55,7 +56,7 @@
private static final Logger log = LoggerFactory.getLogger(TripleObserver.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new FluoQueryMetadataDAO();
+ private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
public TripleObserver() {}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
new file mode 100644
index 0000000..8adc40d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+ private final FluoQueryMetadataDAO dao;
+ private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache;
+ private final Cache<String, Bytes> metadataCache;
+ private int capacity;
+ private int concurrencyLevel;
+
+ /**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+ public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+ this.dao = dao;
+ commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+ metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+ this.capacity = capacity;
+ this.concurrencyLevel = concurrencyLevel;
+ }
+
+ /**
+ * @return - capacity of this cache in terms of max number of entries
+ */
+ public int getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on
+ * without waiting for other threads
+ */
+ public int getConcurrencyLevel() {
+ return concurrencyLevel;
+ }
+
+ @Override
+ public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+ return dao.readStatementPatternMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.JOIN);
+ LOG.debug("Retrieving Metadata from Cache: {}.", nodeId);
+ return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readJoinMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.FILTER);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readFilterMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ try {
+ return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readProjectionMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readAggregationMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readConstructQueryMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readPeriodicQueryMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ @Override
+ public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) {
+ Optional<NodeType> type = NodeType.fromNodeId(nodeId);
+ try {
+ checkArgument(type.isPresent() && type.get() == NodeType.QUERY);
+ LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+ return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() {
+ @Override
+ public CommonNodeMetadata call() throws Exception {
+ LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId);
+ return dao.readQueryMetadata(tx, nodeId);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e);
+ }
+ }
+
+ public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) {
+ Optional<NodeType> type = NodeType.fromNodeId(rowId);
+ try {
+ checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column));
+ return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() {
+ @Override
+ public Bytes call() throws Exception {
+ return tx.get(Bytes.of(rowId), column);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e);
+ }
+ }
+
+ private String getKey(String row, Column column) {
+ return row + ":" + column.getsQualifier() + ":" + column.getsQualifier();
+ }
+}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
new file mode 100644
index 0000000..faab952
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
@@ -0,0 +1,44 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheSupplier {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
+ private static FluoQueryMetadataCache CACHE;
+ private static boolean initialized = false;
+ private static final int DEFAULT_CAPACITY = 10000;
+ private static final int DEFAULT_CONCURRENCY = 8;
+
+ /**
+ * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the
+ * indicated capacity and concurrencyLevel if one is provided.
+ *
+ * @param capacity - capacity used to create a new cache
+ * @param concurrencyLevel - concurrencyLevel used to create a new cache
+ */
+ public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) {
+ if (!initialized) {
+ LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
+ concurrencyLevel);
+ CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
+ initialized = true;
+ } else {
+ LOG.debug("Cache has already been initialized. Returning cache with capacity: {} and concurrencylevel: {}",
+ CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+ }
+ return CACHE;
+ }
+
+ /**
+ * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it
+ * with a default size of 10000 entries and a default concurrency level of 8.
+ *
+ * @return - FluoQueryMetadataCache with default instance name and default capacity and concurrency
+ */
+ public static FluoQueryMetadataCache getOrCreateCache() {
+ return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY);
+ }
+
+}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
new file mode 100644
index 0000000..3df3708
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java
@@ -0,0 +1,34 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class FluoQueryMetadataCacheTest {
+
+ @Test
+ public void testCache() {
+ FluoQueryMetadataDAO mockDAO = Mockito.mock(FluoQueryMetadataDAO.class);
+ Transaction mockTx = Mockito.mock(Transaction.class);
+ String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN);
+ StatementPatternMetadata metadata = StatementPatternMetadata.builder(nodeId).setParentNodeId("parent")
+ .setStatementPattern("pattern").setVarOrder(new VariableOrder("xyz")).build();
+ when(mockDAO.readStatementPatternMetadata(mockTx, nodeId)).thenReturn(metadata);
+
+ FluoQueryMetadataCache cache = new FluoQueryMetadataCache(mockDAO, 20, 2);
+
+ assertEquals(metadata, cache.readStatementPatternMetadata(mockTx, nodeId));
+
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+ cache.readStatementPatternMetadata(mockTx, nodeId);
+
+ Mockito.verify(mockDAO, Mockito.times(1)).readStatementPatternMetadata(mockTx, nodeId);
+ }
+}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
new file mode 100644
index 0000000..fabf512
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java
@@ -0,0 +1,169 @@
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static java.util.Objects.requireNonNull;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class FluoLatencyIT extends KafkaExportITBase {
+ private static ValueFactory vf;
+ private static DatatypeFactory dtf;
+
+ @BeforeClass
+ public static void init() throws DatatypeConfigurationException {
+ vf = new ValueFactoryImpl();
+ dtf = DatatypeFactory.newInstance();
+ }
+
+ @Test
+ public void resultsExported() throws Exception {
+
+ final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { "
+ + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "} " + "group by ?type";
+
+// final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { "
+// + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "}";
+
+ try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) {
+ // Tell the Fluo app to maintain the PCJ.
+ String pcjId = FluoQueryUtils.createNewPcjId();
+ FluoConfiguration conf = super.getFluoConfiguration();
+ new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient);
+ SailRepositoryConnection conn = super.getRyaSailRepository().getConnection();
+
+ long start = System.currentTimeMillis();
+ int numReturned = 0;
+ int numObs = 10;
+ int numTypes = 5;
+ int numExpected = 0;
+ int increment = numObs*numTypes;
+ while (System.currentTimeMillis() - start < 60000) {
+ List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now());
+ conn.add(statements);
+ numExpected += increment;
+ System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: "
+ + getNumFluoEntries(fluoClient));
+ numReturned += readAllResults(pcjId).size();
+ System.out
+ .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned));
+// FluoITHelper.printFluoTable(conf);
+ Thread.sleep(30000);
+ }
+ }
+ }
+
+ /**
+ * Generates (numObservationsPerType x numTypes) statements of the form:
+ *
+ * <pre>
+ * urn:obs_n uri:hasTime zonedTime
+ * urn:obs_n uri:hasObsType typePrefix_m
+ * </pre>
+ *
+ * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by
+ * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes.
+ *
+ * @param numObservationsPerType - The quantity of observations per type to generate.
+ * @param numTypes - The number of types to generate observations for.
+ * @param typePrefix - The prefix to be used for the type literal in the statement.
+ * @param observationOffset - The offset to be used for determining the value of n in the above statements.
+ * @param zonedTime - The time to be used for all observations generated.
+ * @return A new list of all generated Statements.
+ */
+ public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix,
+ final long observationOffset, final ZonedDateTime zonedTime) {
+ final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT);
+ final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time));
+ final List<Statement> statements = Lists.newArrayList();
+
+ for (long i = 0; i < numObservationsPerType; i++) {
+ for (int j = 0; j < numTypes; j++) {
+ final long observationId = observationOffset + i * numTypes + j;
+ // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId;
+ // final String obsId = "urn:obs_" + observationId;
+ final String obsId = "urn:obs_" + String.format("%020d", observationId);
+ final String type = typePrefix + j;
+ // logger.info(obsId + " " + type + " " + litTime);
+ statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime));
+ statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type)));
+ }
+ }
+
+ return statements;
+ }
+
+ private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception {
+ requireNonNull(pcjId);
+
+ // Read all of the results from the Kafka topic.
+ final Set<VisibilityBindingSet> results = new HashSet<>();
+
+ try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) {
+ final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000);
+ final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator();
+ while (recordIterator.hasNext()) {
+ results.add(recordIterator.next().value());
+ }
+ }
+
+ return results;
+ }
+
+ private int getNumAccEntries(String tableName) throws TableNotFoundException {
+ Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations());
+ int count = 0;
+ for (Map.Entry<Key, Value> entry : scanner) {
+ count++;
+ }
+ return count;
+ }
+
+ private int getNumFluoEntries(FluoClient client) {
+ Transaction tx = client.newTransaction();
+ CellScanner scanner = tx.scanner().build();
+ int count = 0;
+ for (RowColumnValue rcv : scanner) {
+ count++;
+ }
+ return count;
+ }
+
+}
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 59fe54f..7b16dcf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -69,7 +69,6 @@
import org.apache.rya.sail.config.RyaSailFactory;
import org.junit.After;
import org.junit.Before;
-import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.Sail;
@@ -129,7 +128,7 @@
final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams);
kafkaParams.setUseKafkaBindingSetExporter(true);
kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT);
-
+
final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams);
kafkaConstructParams.setUseKafkaSubgraphExporter(true);
@@ -262,7 +261,7 @@
* If this test fails then its a testing environment issue, not with Rya.
* Source: https://github.com/asmaier/mini-kafka
*/
- @Test
+// @Test
public void embeddedKafkaTest() throws Exception {
// create topic
final String topic = "testTopic";
@@ -339,9 +338,9 @@
// The PCJ Id is the topic name the results will be written to.
return pcjId;
}
-
+
protected void loadData(final Collection<Statement> statements) throws Exception {
-
+
requireNonNull(statements);
final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
@@ -352,7 +351,7 @@
// Wait for the Fluo application to finish computing the end result.
super.getMiniFluo().waitForObservers();
-
+
}
}