/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.storage.accumulo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.math.BigInteger;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.test.accumulo.MiniAccumuloClusterInstance;
import org.apache.rya.test.accumulo.MiniAccumuloSingleton;
import org.apache.rya.test.accumulo.RyaTestInstanceRule;
import org.apache.zookeeper.ClientCnxn;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.base.Optional;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* Performs integration tests using {@link MiniAccumuloCluster} to ensure the
* functions of {@link PcjTables} work within a cluster setting.
*/
public class PcjTablesIT {
private static final String USE_MOCK_INSTANCE = ".useMockInstance";
private static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
private static final String CLOUDBASE_USER = "sc.cloudbase.username";
private static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";
private static final ValueFactory VF = SimpleValueFactory.getInstance();
private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
// The MiniAccumuloCluster is re-used between tests.
private final MiniAccumuloClusterInstance cluster = MiniAccumuloSingleton.getInstance();
// Rya data store and connections.
protected RyaSailRepository ryaRepo = null;
protected RepositoryConnection ryaConn = null;
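// Supplies the Rya instance name each test uses as its table prefix on the shared cluster.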
@Rule
public RyaTestInstanceRule testInstance = new RyaTestInstanceRule();
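// Silence the noisy ZooKeeper client logging that would otherwise flood the test output.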
@BeforeClass
public static void killLoudLogs() {
Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
}
@Before
public void resetTestEnvironment() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, RepositoryException, IOException, InterruptedException {
// Set up the Rya library to use the Mini Accumulo.
ryaRepo = setupRya();
ryaConn = ryaRepo.getConnection();
}
@After
public void shutdownRya() throws IOException, InterruptedException, RepositoryException {
// Stop Rya.
if (ryaRepo != null) {
ryaRepo.shutDown();
}
}
private String getRyaInstanceName() {
return testInstance.getRyaInstanceName();
}
/**
* Sets up the Mini Accumulo instance as a Rya repository.
*
* @return The Rya repository sitting on top of the Mini Accumulo.
*/
private RyaSailRepository setupRya() throws AccumuloException, AccumuloSecurityException, RepositoryException {
// Set up the Rya Repository that will be used to create Repository Connections.
final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
crdfdao.setConnector( cluster.getConnector() );
// Set up Rya configuration values.
final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
conf.setTablePrefix(getRyaInstanceName());
conf.setDisplayQueryPlan(true);
conf.setBoolean(USE_MOCK_INSTANCE, false);
conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, getRyaInstanceName());
conf.set(CLOUDBASE_USER, cluster.getUsername());
conf.set(CLOUDBASE_PASSWORD, cluster.getPassword());
conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName());
crdfdao.setConf(conf);
ryaStore.setRyaDAO(crdfdao);
final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
ryaRepo.initialize();
return ryaRepo;
}
/**
* Ensure that when a new PCJ table is created, it is initialized with the
* correct metadata values.
* <p>
* The method being tested is {@link PcjTables#createPcjTable(Connector, String, Set, String)}
*/
@Test
public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException {
final String sparql =
"SELECT ?name ?age " +
"{" +
"FILTER(?age < 30) ." +
"?name <http://hasAge> ?age." +
"?name <http://playsSport> \"Soccer\" " +
"}";
final Connector accumuloConn = cluster.getConnector();
// Create a PCJ table in the Mini Accumulo.
final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
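// The shift factory builds one variable order per rotation of the variables, here "name;age" and "age;name".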
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
final PcjTables pcjs = new PcjTables();
pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
// Fetch the PcjMetadata and ensure it has the correct values.
final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
// Ensure the metadata matches the expected value.
final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders);
assertEquals(expected, pcjMetadata);
}
/**
* Ensure when results have been written to the PCJ table that they are in Accumulo.
* <p>
* The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)}
*/
@Test
public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
final String sparql =
"SELECT ?name ?age " +
"{" +
"FILTER(?age < 30) ." +
"?name <http://hasAge> ?age." +
"?name <http://playsSport> \"Soccer\" " +
"}";
final Connector accumuloConn = cluster.getConnector();
// Create a PCJ table in the Mini Accumulo.
final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
final PcjTables pcjs = new PcjTables();
pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
// Add a few results to the PCJ table.
final MapBindingSet alice = new MapBindingSet();
alice.addBinding("name", VF.createIRI("http://Alice"));
alice.addBinding("age", VF.createLiteral(BigInteger.valueOf(14)));
final MapBindingSet bob = new MapBindingSet();
bob.addBinding("name", VF.createIRI("http://Bob"));
bob.addBinding("age", VF.createLiteral(BigInteger.valueOf(16)));
final MapBindingSet charlie = new MapBindingSet();
charlie.addBinding("name", VF.createIRI("http://Charlie"));
charlie.addBinding("age", VF.createLiteral(BigInteger.valueOf(12)));
final Set<BindingSet> results = Sets.newHashSet(alice, bob, charlie);
pcjs.addResults(accumuloConn, pcjTableName, Sets.newHashSet(
new VisibilityBindingSet(alice),
new VisibilityBindingSet(bob),
new VisibilityBindingSet(charlie)));
// Make sure the cardinality was updated.
final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
assertEquals(3, metadata.getCardinality());
// Scan Accumulo for the stored results.
final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
// Ensure the expected results match those that were stored.
final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
expectedResults.putAll("name;age", results);
expectedResults.putAll("age;name", results);
assertEquals(expectedResults, fetchedResults);
}
@Test
public void listResults() throws Exception {
final String sparql =
"SELECT ?name ?age " +
"{" +
"FILTER(?age < 30) ." +
"?name <http://hasAge> ?age." +
"?name <http://playsSport> \"Soccer\" " +
"}";
final Connector accumuloConn = cluster.getConnector();
// Create a PCJ table in the Mini Accumulo.
final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
final PcjTables pcjs = new PcjTables();
pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
// Add a few results to the PCJ table.
final MapBindingSet alice = new MapBindingSet();
alice.addBinding("name", VF.createIRI("http://Alice"));
alice.addBinding("age", VF.createLiteral(BigInteger.valueOf(14)));
final MapBindingSet bob = new MapBindingSet();
bob.addBinding("name", VF.createIRI("http://Bob"));
bob.addBinding("age", VF.createLiteral(BigInteger.valueOf(16)));
final MapBindingSet charlie = new MapBindingSet();
charlie.addBinding("name", VF.createIRI("http://Charlie"));
charlie.addBinding("age", VF.createLiteral(BigInteger.valueOf(12)));
pcjs.addResults(accumuloConn, pcjTableName, Sets.newHashSet(
new VisibilityBindingSet(alice),
new VisibilityBindingSet(bob),
new VisibilityBindingSet(charlie)));
// Fetch the Binding Sets that have been stored in the PCJ table.
final Set<BindingSet> results = new HashSet<>();
try (CloseableIterator<BindingSet> resultsIt = pcjs.listResults(accumuloConn, pcjTableName, new Authorizations())) {
while(resultsIt.hasNext()) {
results.add( resultsIt.next() );
}
}
// Verify the fetched results match the expected ones.
final Set<BindingSet> expected = Sets.newHashSet(alice, bob, charlie);
assertEquals(expected, results);
}
/**
* Ensure when results are already stored in Rya, that we are able to populate
* the PCJ table for a new SPARQL query using those results.
* <p>
* The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection)}
*/
@Test
public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
// Load some Triples into Rya.
final Set<Statement> triples = new HashSet<>();
triples.add( VF.createStatement(VF.createIRI("http://Alice"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(14))) );
triples.add( VF.createStatement(VF.createIRI("http://Alice"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
triples.add( VF.createStatement(VF.createIRI("http://Bob"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(16))) );
triples.add( VF.createStatement(VF.createIRI("http://Bob"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
triples.add( VF.createStatement(VF.createIRI("http://Charlie"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(12))) );
triples.add( VF.createStatement(VF.createIRI("http://Charlie"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
triples.add( VF.createStatement(VF.createIRI("http://Eve"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(43))) );
triples.add( VF.createStatement(VF.createIRI("http://Eve"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
for(final Statement triple : triples) {
ryaConn.add(triple);
}
// Create a PCJ table that will include those triples in its results.
final String sparql =
"SELECT ?name ?age " +
"{" +
"FILTER(?age < 30) ." +
"?name <http://hasAge> ?age." +
"?name <http://playsSport> \"Soccer\" " +
"}";
final Connector accumuloConn = cluster.getConnector();
final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
final PcjTables pcjs = new PcjTables();
pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
// Populate the PCJ table using a Rya connection.
pcjs.populatePcj(accumuloConn, pcjTableName, ryaConn);
// Scan Accumulo for the stored results.
final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
// Make sure the cardinality was updated.
final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
assertEquals(3, metadata.getCardinality());
// Ensure the expected results match those that were stored.
final MapBindingSet alice = new MapBindingSet();
alice.addBinding("name", VF.createIRI("http://Alice"));
alice.addBinding("age", VF.createLiteral(BigInteger.valueOf(14)));
final MapBindingSet bob = new MapBindingSet();
bob.addBinding("name", VF.createIRI("http://Bob"));
bob.addBinding("age", VF.createLiteral(BigInteger.valueOf(16)));
final MapBindingSet charlie = new MapBindingSet();
charlie.addBinding("name", VF.createIRI("http://Charlie"));
charlie.addBinding("age", VF.createLiteral(BigInteger.valueOf(12)));
final Set<BindingSet> results = Sets.newHashSet(alice, bob, charlie);
final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
expectedResults.putAll("name;age", results);
expectedResults.putAll("age;name", results);
assertEquals(expectedResults, fetchedResults);
}
/**
* Ensure the method that creates a new PCJ table, scans Rya for matches, and
* stores them in the PCJ table works.
* <p>
* The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)}
*/
@Test
public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException {
// Load some Triples into Rya.
final Set<Statement> triples = new HashSet<>();
triples.add( VF.createStatement(VF.createIRI("http://Alice"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(14))) );
triples.add( VF.createStatement(VF.createIRI("http://Alice"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
triples.add( VF.createStatement(VF.createIRI("http://Bob"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(16))) );
triples.add( VF.createStatement(VF.createIRI("http://Bob"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
triples.add( VF.createStatement(VF.createIRI("http://Charlie"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(12))) );
triples.add( VF.createStatement(VF.createIRI("http://Charlie"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
triples.add( VF.createStatement(VF.createIRI("http://Eve"), VF.createIRI("http://hasAge"), VF.createLiteral(BigInteger.valueOf(43))) );
triples.add( VF.createStatement(VF.createIRI("http://Eve"), VF.createIRI("http://playsSport"), VF.createLiteral("Soccer")) );
for(final Statement triple : triples) {
ryaConn.add(triple);
}
// Create a PCJ table that will include those triples in its results.
final String sparql =
"SELECT ?name ?age " +
"{" +
"FILTER(?age < 30) ." +
"?name <http://hasAge> ?age." +
"?name <http://playsSport> \"Soccer\" " +
"}";
final Connector accumuloConn = cluster.getConnector();
final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
// Create and populate the PCJ table.
final PcjTables pcjs = new PcjTables();
pcjs.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.absent());
// Make sure the cardinality was updated.
final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
assertEquals(3, metadata.getCardinality());
// Scan Accumulo for the stored results.
final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName);
// Ensure the expected results match those that were stored.
final MapBindingSet alice = new MapBindingSet();
alice.addBinding("name", VF.createIRI("http://Alice"));
alice.addBinding("age", VF.createLiteral(BigInteger.valueOf(14)));
final MapBindingSet bob = new MapBindingSet();
bob.addBinding("name", VF.createIRI("http://Bob"));
bob.addBinding("age", VF.createLiteral(BigInteger.valueOf(16)));
final MapBindingSet charlie = new MapBindingSet();
charlie.addBinding("name", VF.createIRI("http://Charlie"));
charlie.addBinding("age", VF.createLiteral(BigInteger.valueOf(12)));
final Set<BindingSet> results = Sets.newHashSet(alice, bob, charlie);
final Multimap<String, BindingSet> expectedResults = HashMultimap.create();
expectedResults.putAll("name;age", results);
expectedResults.putAll("age;name", results);
assertEquals(expectedResults, fetchedResults);
}
@Test
public void listPcjs() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
final Connector accumuloConn = cluster.getConnector();
// Set up the table names that will be used.
final String instance1 = "instance1_";
final String instance2 = "instance2_";
final String instance1_table1 = new PcjTableNameFactory().makeTableName(instance1, "table1");
final String instance1_table2 = new PcjTableNameFactory().makeTableName(instance1, "table2");
final String instance1_table3 = new PcjTableNameFactory().makeTableName(instance1, "table3");
final String instance2_table1 = new PcjTableNameFactory().makeTableName(instance2, "table1");
// Create the PCJ Tables that are in instance 1 and instance 2.
final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
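// The SPARQL text is only stored as PCJ metadata here; this test never evaluates the query.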
final String sparql = "SELECT ?x WHERE { ?x <http://isA> <http://Food> }";
final PcjTables pcjs = new PcjTables();
pcjs.createPcjTable(accumuloConn, instance1_table1, varOrders, sparql);
pcjs.createPcjTable(accumuloConn, instance1_table2, varOrders, sparql);
pcjs.createPcjTable(accumuloConn, instance1_table3, varOrders, sparql);
pcjs.createPcjTable(accumuloConn, instance2_table1, varOrders, sparql);
// Ensure all of the names have been stored for instance 1 and 2.
final Set<String> expected1 = Sets.newHashSet(instance1_table1, instance1_table2, instance1_table3);
final Set<String> instance1Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance1) );
assertEquals(expected1, instance1Tables);
final Set<String> expected2 = Sets.newHashSet(instance2_table1);
final Set<String> instance2Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance2) );
assertEquals(expected2, instance2Tables);
}
@Test
public void purge() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
final String sparql =
"SELECT ?name ?age " +
"{" +
"FILTER(?age < 30) ." +
"?name <http://hasAge> ?age." +
"?name <http://playsSport> \"Soccer\" " +
"}";
final Connector accumuloConn = cluster.getConnector();
// Create a PCJ table in the Mini Accumulo.
final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
final PcjTables pcjs = new PcjTables();
pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
// Add a few results to the PCJ table.
final MapBindingSet alice = new MapBindingSet();
alice.addBinding("name", VF.createIRI("http://Alice"));
alice.addBinding("age", VF.createLiteral(BigInteger.valueOf(14)));
final MapBindingSet bob = new MapBindingSet();
bob.addBinding("name", VF.createIRI("http://Bob"));
bob.addBinding("age", VF.createLiteral(BigInteger.valueOf(16)));
final MapBindingSet charlie = new MapBindingSet();
charlie.addBinding("name", VF.createIRI("http://Charlie"));
charlie.addBinding("age", VF.createLiteral(BigInteger.valueOf(12)));
pcjs.addResults(accumuloConn, pcjTableName, Sets.newHashSet(
new VisibilityBindingSet(alice),
new VisibilityBindingSet(bob),
new VisibilityBindingSet(charlie)));
// Make sure the cardinality was updated.
PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
assertEquals(3, metadata.getCardinality());
// Purge the data.
pcjs.purgePcjTable(accumuloConn, pcjTableName);
// Make sure the cardinality was updated to 0.
metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
assertEquals(0, metadata.getCardinality());
}
@Test
public void dropPcj() throws PCJStorageException, AccumuloException, AccumuloSecurityException {
final Connector accumuloConn = cluster.getConnector();
// Create a PCJ index.
final String tableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "thePcj");
final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") );
final String sparql = "SELECT ?x WHERE { ?x <http://isA> <http://Food> }";
final PcjTables pcjs = new PcjTables();
pcjs.createPcjTable(accumuloConn, tableName, varOrders, sparql);
// Fetch its metadata to show that it has actually been created.
final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, tableName);
assertEquals(expectedMetadata, metadata);
// Drop it.
pcjs.dropPcjTable(accumuloConn, tableName);
// Show the metadata is no longer present.
PCJStorageException tableDoesNotExistException = null;
try {
metadata = pcjs.getPcjMetadata(accumuloConn, tableName);
} catch(final PCJStorageException e) {
tableDoesNotExistException = e;
}
assertNotNull(tableDoesNotExistException);
}
/**
* Scan accumulo for the results that are stored in a PCJ table. The
* multimap stores a set of deserialized binding sets that were in the PCJ
* table for every variable order that is found in the PCJ metadata.
*/
private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException {
final Multimap<String, BindingSet> fetchedResults = HashMultimap.create();
// Get the variable orders the data was written to.
final PcjTables pcjs = new PcjTables();
final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
// Scan Accumulo for the stored results.
for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) {
final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
scanner.fetchColumnFamily( new Text(varOrder.toString()) );
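// Results are stored once per variable order: the column family names the order and the row key holds the serialized binding set.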
for(final Entry<Key, Value> entry : scanner) {
final byte[] serializedResult = entry.getKey().getRow().getBytes();
final BindingSet result = converter.convert(serializedResult, varOrder);
fetchedResults.put(varOrder.toString(), result);
}
}
return fetchedResults;
}
}