blob: d04712e6c010a95efedfada86a50a9c6c53ee41d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.storage.accumulo.integration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
import org.apache.rya.api.instance.RyaDetails.FreeTextIndexDetails;
import org.apache.rya.api.instance.RyaDetails.JoinSelectivityDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import org.apache.rya.api.instance.RyaDetailsRepository;
import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.test.accumulo.MiniAccumuloClusterInstance;
import org.apache.rya.test.accumulo.MiniAccumuloSingleton;
import org.apache.rya.test.accumulo.RyaTestInstanceRule;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
/**
* Integration tests the methods of {@link AccumuloPcjStorage}.
* </p>
* These tests ensures that the PCJ tables are maintained and that these operations
* also update the Rya instance's details.
*/
public class AccumuloPcjStorageIT {
private static final ValueFactory VF = SimpleValueFactory.getInstance();
@Rule
public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(ryaInstanceName -> {
// Create Rya Details for the instance name.
final MiniAccumuloClusterInstance cluster = MiniAccumuloSingleton.getInstance();
final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(cluster.getConnector(), ryaInstanceName);
final RyaDetails details = RyaDetails.builder()
.setRyaInstanceName(ryaInstanceName)
.setRyaVersion("0.0.0.0")
.setFreeTextDetails(new FreeTextIndexDetails(true))
.setEntityCentricIndexDetails(new EntityCentricIndexDetails(true))
//RYA-215 .setGeoIndexDetails( new GeoIndexDetails(true) )
.setTemporalIndexDetails(new TemporalIndexDetails(true))
.setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
.setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
.setProspectorDetails(new ProspectorDetails(Optional.absent()))
.build();
detailsRepo.initialize(details);
});
/**
* @return The {@link MiniAccumuloClusterInstance} used by the tests.
*/
private MiniAccumuloClusterInstance getClusterInstance() {
return MiniAccumuloSingleton.getInstance();
}
@Test
public void createPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
// Setup the PCJ storage that will be tested against.
final Connector connector = getClusterInstance().getConnector();
final String ryaInstanceName = testInstance.getRyaInstanceName();
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
// Create a PCJ.
final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
// Ensure the Rya details have been updated to include the PCJ's ID.
final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
.getPCJIndexDetails()
.getPCJDetails();
final PCJDetails expectedDetails = PCJDetails.builder()
.setId( pcjId )
.build();
assertEquals(expectedDetails, detailsMap.get(pcjId));
}
}
@Test
public void dropPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
// Setup the PCJ storage that will be tested against.
final Connector connector = getClusterInstance().getConnector();
final String ryaInstanceName = testInstance.getRyaInstanceName();
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
// Create a PCJ.
final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
// Delete the PCJ that was just created.
pcjStorage.dropPcj(pcjId);
// Ensure the Rya details have been updated to no longer include the PCJ's ID.
final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
.getPCJIndexDetails()
.getPCJDetails();
assertFalse( detailsMap.containsKey(pcjId) );
}
}
@Test
public void listPcjs() throws AccumuloException, AccumuloSecurityException, PCJStorageException {
// Setup the PCJ storage that will be tested against.
final Connector connector = getClusterInstance().getConnector();
final String ryaInstanceName = testInstance.getRyaInstanceName();
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
// Create a few PCJs and hold onto their IDs.
final List<String> expectedIds = new ArrayList<>();
String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
expectedIds.add( pcjId );
pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
expectedIds.add( pcjId );
pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
expectedIds.add( pcjId );
// Fetch the PCJ names
final List<String> pcjIds = pcjStorage.listPcjs();
// Ensure the expected IDs match the fetched IDs.
Collections.sort(expectedIds);
Collections.sort(pcjIds);
assertEquals(expectedIds, pcjIds);
}
}
@Test
public void getPcjMetadata() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
// Setup the PCJ storage that will be tested against.
final Connector connector = getClusterInstance().getConnector();
final String ryaInstanceName = testInstance.getRyaInstanceName();
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
// Create a PCJ.
final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
final String pcjId = pcjStorage.createPcj(sparql);
// Fetch the PCJ's metadata.
final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
// Ensure it has the expected values.
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
assertEquals(expectedMetadata, metadata);
}
}
@Test
public void addResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
// Setup the PCJ storage that will be tested against.
final Connector connector = getClusterInstance().getConnector();
final String ryaInstanceName = testInstance.getRyaInstanceName();
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
// Create a PCJ.
final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
final String pcjId = pcjStorage.createPcj(sparql);
// Add some binding sets to it.
final Set<VisibilityBindingSet> results = new HashSet<>();
final MapBindingSet aliceBS = new MapBindingSet();
aliceBS.addBinding("a", VF.createIRI("http://Alice"));
aliceBS.addBinding("b", VF.createIRI("http://Person"));
results.add( new VisibilityBindingSet(aliceBS, "") );
final MapBindingSet charlieBS = new MapBindingSet();
charlieBS.addBinding("a", VF.createIRI("http://Charlie"));
charlieBS.addBinding("b", VF.createIRI("http://Comedian"));
results.add( new VisibilityBindingSet(charlieBS, "") );
pcjStorage.addResults(pcjId, results);
// Make sure the PCJ metadata was updated.
final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders);
assertEquals(expectedMetadata, metadata);
}
}
@Test
public void listResults() throws Exception {
// Setup the PCJ storage that will be tested against.
final Connector connector = getClusterInstance().getConnector();
final String ryaInstanceName = testInstance.getRyaInstanceName();
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
// Create a PCJ.
final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
final String pcjId = pcjStorage.createPcj(sparql);
// Add some binding sets to it.
final Set<VisibilityBindingSet> storedResults = new HashSet<>();
final MapBindingSet aliceBS = new MapBindingSet();
aliceBS.addBinding("a", VF.createIRI("http://Alice"));
aliceBS.addBinding("b", VF.createIRI("http://Person"));
storedResults.add( new VisibilityBindingSet(aliceBS, "") );
final MapBindingSet charlieBS = new MapBindingSet();
charlieBS.addBinding("a", VF.createIRI("http://Charlie"));
charlieBS.addBinding("b", VF.createIRI("http://Comedian"));
storedResults.add( new VisibilityBindingSet(charlieBS, "") );
pcjStorage.addResults(pcjId, storedResults);
// List the results that were stored.
final Set<BindingSet> results = new HashSet<>();
try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
while(resultsIt.hasNext()) {
results.add( resultsIt.next() );
}
}
// The stored results are returned as normal binding sets, so unwrap them.
final Set<BindingSet> expectedResults = storedResults.stream()
.map(visBs -> visBs.getBindingSet() )
.collect(Collectors.toSet());
assertEquals(expectedResults, results);
}
}
@Test
public void purge() throws Exception {
// Setup the PCJ storage that will be tested against.
final Connector connector = getClusterInstance().getConnector();
final String ryaInstanceName = testInstance.getRyaInstanceName();
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
// Create a PCJ.
final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
final String pcjId = pcjStorage.createPcj(sparql);
// Add some binding sets to it.
final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
final MapBindingSet aliceBS = new MapBindingSet();
aliceBS.addBinding("a", VF.createIRI("http://Alice"));
aliceBS.addBinding("b", VF.createIRI("http://Person"));
expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
final MapBindingSet charlieBS = new MapBindingSet();
charlieBS.addBinding("a", VF.createIRI("http://Charlie"));
charlieBS.addBinding("b", VF.createIRI("http://Comedian"));
expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
pcjStorage.addResults(pcjId, expectedResults);
// Purge the PCJ.
pcjStorage.purge(pcjId);
// List the results that were stored.
final Set<BindingSet> results = new HashSet<>();
try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
while(resultsIt.hasNext()) {
results.add( resultsIt.next() );
}
}
assertTrue( results.isEmpty() );
// Make sure the PCJ metadata was updated.
final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
assertEquals(expectedMetadata, metadata);
}
}
}