/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.fluo.api;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Transaction;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.query.BatchRyaQuery;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.calrissian.mango.collect.CloseableIterable;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
* Sets up a new Precomputed Join (PCJ) in Fluo from a SPARQL query.
* <p>
* This is a two-phase process.
* <ol>
* <li>Set up metadata about each node of the query using a single Fluo transaction.</li>
* <li>Scan Rya for binding sets that match each Statement Pattern from the query
* and use a separate Fluo transaction for each batch that is inserted. This
* ensures historic triples will be included in the query's results.</li>
* </ol>
* After the first step is finished, any new Triples that are added to the Fluo
* application will be matched against statement patterns, the final results
* will percolate to the top of the query, and those results will be exported to
* Rya's query system.
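* <p>
* A minimal usage sketch (assumes an already-connected {@link FluoClient};
* {@code sparql} and {@code fluoClient} are illustrative names):
* <pre>{@code
* CreateFluoPcj createPcj = new CreateFluoPcj();
* // Phase one: write the query's metadata to Fluo. From this point on, newly
* // inserted triples are matched incrementally; results export to Kafka.
* FluoQuery fluoQuery = createPcj.createPcj(sparql, fluoClient);
* }</pre>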
*/
@DefaultAnnotation(NonNull.class)
public class CreateFluoPcj {
private static final Logger log = Logger.getLogger(CreateFluoPcj.class);
/**
* The default Statement Pattern batch insert size is 1000.
*/
private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
/**
* The default Join batch size is 5000.
*/
private static final int DEFAULT_JOIN_BATCH_SIZE = 5000;
/**
* The maximum number of binding sets that will be inserted into each Statement
* Pattern's result set per Fluo transaction.
*/
private final int spInsertBatchSize;
/**
* The maximum number of join results that will be processed per transaction.
*/
private final int joinBatchSize;
/**
* Constructs an instance of {@link CreateFluoPcj} that uses
* {@link #DEFAULT_SP_INSERT_BATCH_SIZE} and {@link #DEFAULT_JOIN_BATCH_SIZE}
* as the default batch sizes.
*/
public CreateFluoPcj() {
this(DEFAULT_SP_INSERT_BATCH_SIZE, DEFAULT_JOIN_BATCH_SIZE);
}
/**
* Constructs an instance of {@link CreateFluoPcj}.
*
* @param spInsertBatchSize - The maximum number of binding sets that will be
* inserted into each Statement Pattern's result set per Fluo transaction.
* @param joinBatchSize - The maximum number of join results that will be
* processed per Fluo transaction.
*/
public CreateFluoPcj(final int spInsertBatchSize, final int joinBatchSize) {
checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
checkArgument(joinBatchSize > 0, "The Join batch size '" + joinBatchSize + "' must be greater than 0.");
this.spInsertBatchSize = spInsertBatchSize;
this.joinBatchSize = joinBatchSize;
}
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ. This method
* creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
* inside of Fluo. This method assumes that the user will export the results to Kafka
* according to the Kafka {@link ExportStrategy}.
*
* @param sparql - SPARQL query String to be registered with Fluo. (not null)
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @return The metadata that was written to the Fluo application for the PCJ.
* @throws MalformedQueryException The provided SPARQL query is malformed.
* @throws UnsupportedQueryException The query's structure is not supported by the Fluo application.
*/
public FluoQuery createPcj(final String sparql, final FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
requireNonNull(sparql);
requireNonNull(fluo);
final String pcjId = FluoQueryUtils.createNewPcjId();
return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluo);
}
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides
* no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely
* creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
* inside of Fluo. Results are exported according to the Set of {@link ExportStrategy} enums. If
* the Rya ExportStrategy is specified, care should be taken to verify that the PCJ table exists.
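* <p>
* For example, a sketch that exports to an existing Rya PCJ table (assumes
* {@code pcjId}, {@code sparql}, and {@code fluoClient} are already defined):
* <pre>{@code
* FluoQuery fluoQuery = new CreateFluoPcj().createPcj(
*         pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluoClient);
* }</pre>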
*
* @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
* @param sparql - sparql query String to be registered with Fluo
* @param strategies - ExportStrategies used to specify how final results will be handled
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @return The metadata that was written to the Fluo application for the PCJ.
* @throws MalformedQueryException The provided SPARQL query is malformed.
* @throws UnsupportedQueryException The query's structure is not supported by the Fluo application.
*/
public FluoQuery createPcj(
final String pcjId,
final String sparql,
final Set<ExportStrategy> strategies,
final FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
requireNonNull(pcjId);
requireNonNull(sparql);
requireNonNull(strategies);
requireNonNull(fluo);
final FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId, strategies);
writeFluoQuery(fluo, fluoQuery);
return fluoQuery;
}
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an
* instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
* Results are exported to a PCJ table with the provided pcjId according to the Rya
* {@link ExportStrategy}.
*
* @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
* @param pcjStorage - Provides access to the PCJ index. (not null)
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @return The metadata that was written to the Fluo application for the PCJ.
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
* @throws UnsupportedQueryException The query's structure is not supported by the Fluo application.
*/
public FluoQuery createPcj(
final String pcjId,
final PrecomputedJoinStorage pcjStorage,
final FluoClient fluo) throws MalformedQueryException, PcjException, UnsupportedQueryException {
requireNonNull(pcjId);
requireNonNull(pcjStorage);
requireNonNull(fluo);
// Parse the query's structure for the metadata that will be written to fluo.
final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
final String sparql = pcjMetadata.getSparql();
return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo);
}
private FluoQuery makeFluoQuery(String sparql, String pcjId, Set<ExportStrategy> strategies) throws MalformedQueryException, UnsupportedQueryException {
String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder()
.setExportStrategies(strategies)
.setFluoQueryId(queryId)
.setSparql(sparql)
.setJoinBatchSize(joinBatchSize);
FluoQuery query = builder.build();
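// A periodic query may only be exported with the PERIODIC strategy, and the
// PERIODIC strategy may only be used by a periodic query. Reject any other
// combination before the metadata is written to Fluo.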
if(query.getQueryType() == QueryType.PERIODIC && !Sets.newHashSet(ExportStrategy.PERIODIC).containsAll(strategies)) {
throw new UnsupportedQueryException("Periodic Queries must only utilize the PeriodicExport or the NoOpExport ExportStrategy.");
}
if(query.getQueryType() != QueryType.PERIODIC && strategies.contains(ExportStrategy.PERIODIC)) {
throw new UnsupportedQueryException("Only Periodic Queries can utilize the PeriodicExport ExportStrategy.");
}
return query;
}
private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery) {
try (Transaction tx = fluo.newTransaction()) {
// Write the query's structure to Fluo.
new FluoQueryMetadataDAO().write(tx, fluoQuery);
// Flush the changes to Fluo.
tx.commit();
}
}
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ.
* <p>
* This call scans Rya for Statement Pattern matches and inserts them into
* the Fluo application. It is assumed that results for any query registered
* using this method will be exported to Kafka according to the Kafka {@link ExportStrategy}.
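* <p>
* A sketch of a typical call (assumes the {@code Connector} and {@code FluoClient}
* are already connected; {@code "rya_"} stands in for the Rya instance name):
* <pre>{@code
* String queryId = new CreateFluoPcj().withRyaIntegration(
*         sparql, fluoClient, accumuloConn, "rya_");
* }</pre>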
*
* @param sparql - SPARQL query that will be registered with Fluo. (not null)
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @param accumulo - Accumulo connector for connecting with Accumulo. (not null)
* @param ryaInstance - Name of the Rya instance to connect to. (not null)
* @return The Fluo application's Query ID of the query that was created.
* @throws MalformedQueryException The provided SPARQL query is malformed.
* @throws PcjException A problem occurred while creating the PCJ metadata.
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with Rya.
* @throws UnsupportedQueryException The query's structure is not supported by the Fluo application.
*/
public String withRyaIntegration(
final String sparql,
final FluoClient fluo,
final Connector accumulo,
final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
requireNonNull(sparql);
requireNonNull(fluo);
requireNonNull(accumulo);
requireNonNull(ryaInstance);
// Write the SPARQL query's structure to the Fluo Application.
final FluoQuery fluoQuery = createPcj(sparql, fluo);
// Import results already ingested into Rya that match the query.
importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
// return queryId to the caller for later monitoring from the export.
return fluoQuery.getQueryMetadata().getNodeId();
}
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ.
* <p>
* This call scans Rya for Statement Pattern matches and inserts them into
* the Fluo application. This method does not verify that a PcjTable with the
* given pcjId actually exists, so one should verify that the table exists before
* using the Rya ExportStrategy. Results will be exported according to the Set of
* {@link ExportStrategy} enums.
*
* @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
* @param sparql - SPARQL query that will be registered with Fluo. (not null)
* @param strategies - ExportStrategies used to specify how final results will be handled. (not null)
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @param accumulo - Accumulo connector for connecting with Accumulo. (not null)
* @param ryaInstance - Name of the Rya instance to connect to. (not null)
* @return FluoQuery containing the metadata for the newly registered SPARQL query.
* @throws MalformedQueryException The provided SPARQL query is malformed.
* @throws PcjException A problem occurred while creating the PCJ metadata.
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with Rya.
* @throws UnsupportedQueryException The query's structure is not supported by the Fluo application.
*/
public FluoQuery withRyaIntegration(
final String pcjId,
final String sparql,
final Set<ExportStrategy> strategies,
final FluoClient fluo,
final Connector accumulo,
final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
requireNonNull(pcjId);
requireNonNull(sparql);
requireNonNull(fluo);
requireNonNull(accumulo);
requireNonNull(ryaInstance);
// Write the SPARQL query's structure to the Fluo Application.
final FluoQuery fluoQuery = createPcj(pcjId, sparql, strategies, fluo);
// Import results already ingested into Rya that match the query.
importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
// Return the FluoQuery so the caller can refer to its metadata later.
return fluoQuery;
}
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ.
* <p>
* This call scans Rya for Statement Pattern matches and inserts them into
* the Fluo application. The Fluo application will then maintain the intermediate
* results as new triples are inserted and export any new query results to the
* {@code pcjId} within the provided {@code pcjStorage}. This method requires that a
* PCJ table already exist for the query corresponding to the pcjId. By default, results will be exported
* to this table according to the Rya {@link ExportStrategy}.
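* <p>
* A sketch (assumes {@code pcjStorage} already holds a PCJ table for {@code pcjId}):
* <pre>{@code
* FluoQuery fluoQuery = new CreateFluoPcj().withRyaIntegration(
*         pcjId, pcjStorage, fluoClient, accumuloConn, ryaInstance);
* }</pre>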
*
* @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
* @param pcjStorage - Provides access to the PCJ index. (not null)
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @param accumulo - Accumulo connector for connecting to Accumulo. (not null)
* @param ryaInstance - name of Rya instance to connect to
* @return FluoQuery containing the metadata for the newly registered SPARQL query
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with Rya.
* @throws UnsupportedQueryException The query's structure is not supported by the Fluo application.
*/
public FluoQuery withRyaIntegration(
final String pcjId,
final PrecomputedJoinStorage pcjStorage,
final FluoClient fluo,
final Connector accumulo,
final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
requireNonNull(pcjId);
requireNonNull(pcjStorage);
requireNonNull(fluo);
requireNonNull(accumulo);
requireNonNull(ryaInstance);
// Parse the query's structure for the metadata that will be written to fluo.
final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
final String sparql = pcjMetadata.getSparql();
return withRyaIntegration(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo, accumulo, ryaInstance);
}
private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance)
throws RyaDAOException {
// Collect the query's statement patterns into a single batch query.
final Set<RyaStatement> queryBatch = new HashSet<>();
// Convert each of the query's statement patterns into a RyaStatement
// whose historic matches will be imported into Fluo.
for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) {
// Deserialize the statement pattern and add its RyaStatement form to the batch.
final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern());
queryBatch.add(spToRyaStatement(pattern));
}
// Create AccumuloRyaQueryEngine to query for historic results
final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
conf.setTablePrefix(ryaInstance);
conf.setAuths(getAuths(accumulo));
try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf);
CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) {
final Set<RyaStatement> triplesBatch = new HashSet<>();
// Insert the matching historic triples into Fluo in batches.
for (final RyaStatement ryaStatement : queryIterable) {
if (triplesBatch.size() == spInsertBatchSize) {
writeBatch(fluo, triplesBatch);
triplesBatch.clear();
}
triplesBatch.add(ryaStatement);
}
if (!triplesBatch.isEmpty()) {
writeBatch(fluo, triplesBatch);
triplesBatch.clear();
}
} catch (final IOException e) {
log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e);
}
}
private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) {
checkNotNull(fluo);
checkNotNull(batch);
new InsertTriples().insert(fluo, batch);
}
private static RyaStatement spToRyaStatement(final StatementPattern sp) {
final Value subjVal = sp.getSubjectVar().getValue();
final Value predVal = sp.getPredicateVar().getValue();
final Value objVal = sp.getObjectVar().getValue();
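// Fields that remain null below act as wildcards when this RyaStatement is
// used to query Rya for historic matches.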
RyaURI subjURI = null;
RyaURI predURI = null;
RyaType objType = null;
if(subjVal != null) {
if(!(subjVal instanceof Resource)) {
throw new AssertionError("Subject must be a Resource.");
}
subjURI = RdfToRyaConversions.convertResource((Resource) subjVal);
}
if (predVal != null) {
if(!(predVal instanceof IRI)) {
throw new AssertionError("Predicate must be a URI.");
}
predURI = RdfToRyaConversions.convertURI((IRI) predVal);
}
if (objVal != null ) {
objType = RdfToRyaConversions.convertValue(objVal);
}
return new RyaStatement(subjURI, predURI, objType);
}
private String[] getAuths(final Connector accumulo) {
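// Scan with every authorization granted to the connector's user so that
// historic matches are not hidden by Accumulo visibility labels.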
Authorizations auths;
try {
auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
final List<byte[]> authList = auths.getAuthorizations();
final String[] authArray = new String[authList.size()];
for(int i = 0; i < authList.size(); i++){
authArray[i] = new String(authList.get(i), StandardCharsets.UTF_8);
}
return authArray;
} catch (AccumuloException | AccumuloSecurityException e) {
throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami(), e);
}
}
}