package org.apache.rya.accumulo.pig;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.query.strategy.ByteRange;
import org.apache.rya.api.query.strategy.TriplePatternStrategy;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.rdftriplestore.inference.InferenceEngine;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.query.parser.QueryParser;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
/**
* A Pig load function that reads Rya statements matching a single statement
* pattern from Accumulo. The pattern (subject, predicate, object, context) and
* the connection settings are supplied through the location URI; the table
* layout (SPO, POS, or OSP) and the scan range are derived from which parts of
* the pattern are bound. With {@code infer=true}, additional ranges are added
* for rdfs:subClassOf and rdfs:subPropertyOf inferences.
* <p>
* A sketch of the Pig Latin usage (hypothetical connection values):
* <pre>
* raw = LOAD 'accumulo://tablePrefix?instance=myinstance&amp;user=root&amp;password=secret&amp;zookeepers=127.0.0.1:2181&amp;subject=&lt;http://example#subj&gt;&amp;infer=true'
*       USING org.apache.rya.accumulo.pig.StatementPatternStorage();
* </pre>
*/
public class StatementPatternStorage extends AccumuloStorage {
private static final Log logger = LogFactory.getLog(StatementPatternStorage.class);
protected TABLE_LAYOUT layout;
protected String subject = "?s";
protected String predicate = "?p";
protected String object = "?o";
protected String context;
private Value subject_value;
private Value predicate_value;
private Value object_value;
private RyaTripleContext ryaContext;
/**
* whether to turn inferencing on or off
*/
private boolean infer = false;
public StatementPatternStorage() {
if (super.conf != null) {
ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(super.conf));
} else {
ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
}
}
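/** Returns the constant bound to a query variable, or null if the variable is unbound. */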
private Value getValue(Var queryVar) {
return queryVar.hasValue() ? queryVar.getValue() : null;
}
@Override
public void setLocation(String location, Job job) throws IOException {
super.setLocation(location, job);
}
@Override
protected void setLocationFromUri(String uri, Job job) throws IOException {
super.setLocationFromUri(uri, job);
// ex: accumulo://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c&context=c&infer=true
addStatementPatternRange(subject, predicate, object, context);
if (infer) {
addInferredRanges(table, job);
}
if (layout == null || ranges.isEmpty()) {
throw new IllegalArgumentException("No table layout or scan range could be derived from the statement pattern. Check the query parameters.");
}
table = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, table);
tableName = new Text(table);
}
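/**
* Recognizes the statement pattern query parameters (subject, predicate,
* object, context, infer) from the location URI.
*/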
@Override
protected void addLocationFromUriPart(String[] pair) {
if (pair[0].equals("subject")) {
this.subject = pair[1];
} else if (pair[0].equals("predicate")) {
this.predicate = pair[1];
} else if (pair[0].equals("object")) {
this.object = pair[1];
} else if (pair[0].equals("context")) {
this.context = pair[1];
} else if (pair[0].equals("infer")) {
this.infer = Boolean.parseBoolean(pair[1]);
}
}
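/**
* Builds a SPARQL query from the configured pattern strings, parses it, and
* visits the resulting statement pattern to derive the table layout and scan
* range for the Accumulo scan.
*/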
protected void addStatementPatternRange(String subj, String pred, String obj, String ctxt) throws IOException {
logger.info("Adding statement pattern[subject:" + subj + ", predicate:" + pred + ", object:" + obj + ", context:" + ctxt + "]");
StringBuilder sparqlBuilder = new StringBuilder();
sparqlBuilder.append("select * where {\n");
if (ctxt != null) {
/*
 * With a context, the generated query takes the form:
 *   select * where {
 *     GRAPH ?g {
 *       <http://www.example.org/exampleDocument#Monica> ?p ?o.
 *     }
 *   }
 */
sparqlBuilder.append("GRAPH ").append(ctxt).append(" {\n");
}
sparqlBuilder.append(subj).append(" ").append(pred).append(" ").append(obj).append(".\n");
if (ctxt != null) {
sparqlBuilder.append("}\n");
}
sparqlBuilder.append("}\n");
String sparql = sparqlBuilder.toString();
if (logger.isDebugEnabled()) {
logger.debug("Sparql statement range[" + sparql + "]");
}
QueryParser parser = new SPARQLParser();
ParsedQuery parsedQuery = null;
try {
parsedQuery = parser.parseQuery(sparql, null);
} catch (MalformedQueryException e) {
throw new IOException(e);
}
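// Visit each statement pattern in the parsed query: record its layout and
// range, and restrict the scan to the context column family when one is bound.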
parsedQuery.getTupleExpr().visitChildren(new AbstractQueryModelVisitor<IOException>() {
@Override
public void meet(StatementPattern node) throws IOException {
Var subjectVar = node.getSubjectVar();
Var predicateVar = node.getPredicateVar();
Var objectVar = node.getObjectVar();
subject_value = getValue(subjectVar);
predicate_value = getValue(predicateVar);
object_value = getValue(objectVar);
Var contextVar = node.getContextVar();
Map.Entry<TABLE_LAYOUT, Range> temp = createRange(subject_value, predicate_value, object_value);
layout = temp.getKey();
Range range = temp.getValue();
addRange(range);
if (contextVar != null && contextVar.getValue() != null) {
String context_str = contextVar.getValue().stringValue();
addColumnPair(context_str, "");
}
}
});
}
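/**
* Chooses a triple pattern strategy based on which parts of the pattern are
* bound, and returns the matching table layout and Accumulo range. Falls back
* to a full scan of the SPO table when no strategy applies.
*/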
protected Map.Entry<TABLE_LAYOUT, Range> createRange(Value s_v, Value p_v, Value o_v) throws IOException {
RyaURI subject_rya = RdfToRyaConversions.convertResource((Resource) s_v);
RyaURI predicate_rya = RdfToRyaConversions.convertURI((IRI) p_v);
RyaType object_rya = RdfToRyaConversions.convertValue(o_v);
TriplePatternStrategy strategy = ryaContext.retrieveStrategy(subject_rya, predicate_rya, object_rya, null);
if (strategy == null) {
return new RdfCloudTripleStoreUtils.CustomEntry<>(TABLE_LAYOUT.SPO, new Range());
}
Map.Entry<TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subject_rya, predicate_rya, object_rya, null, null);
ByteRange byteRange = entry.getValue();
return new RdfCloudTripleStoreUtils.CustomEntry<>(
entry.getKey(), new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()))
);
}
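/**
* Adds scan ranges for inferred statements: when the predicate is rdf:type,
* each rdfs:subClassOf parent of the object type contributes a range; for any
* other bound predicate, each rdfs:subPropertyOf parent contributes a range.
*/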
protected void addInferredRanges(String tablePrefix, Job job) throws IOException {
logger.info("Adding inferences to statement pattern[subject:" + subject_value + ", predicate:" + predicate_value + ", object:" + object_value + "]");
//inference engine
AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
InferenceEngine inferenceEngine = new InferenceEngine();
try {
AccumuloRdfConfiguration rdfConf = new AccumuloRdfConfiguration(job.getConfiguration());
rdfConf.setTablePrefix(tablePrefix);
ryaDAO.setConf(rdfConf);
try {
if (!mock) {
ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, userP.getBytes(StandardCharsets.UTF_8)));
} else {
ryaDAO.setConnector(new MockInstance(inst).getConnector(user, userP.getBytes(StandardCharsets.UTF_8)));
}
} catch (Exception e) {
throw new IOException(e);
}
ryaDAO.init();
inferenceEngine.setConf(rdfConf);
inferenceEngine.setRyaDAO(ryaDAO);
inferenceEngine.setSchedule(false);
inferenceEngine.init();
// Decide between subClassOf and subPropertyOf expansion.
if (RDF.TYPE.equals(predicate_value)) {
// Try subClassOf expansion.
Collection<IRI> parents = InferenceEngine.findParents(inferenceEngine.getSubClassOfGraph(), (IRI) object_value);
if (parents != null && !parents.isEmpty()) {
// subClassOf relationships found. Add a range for each parent type; the
// queried type itself is already covered by the base pattern range.
for (IRI parent : parents) {
Map.Entry<TABLE_LAYOUT, Range> temp =
createRange(subject_value, predicate_value, parent);
Range range = temp.getValue();
if (logger.isDebugEnabled()) {
logger.debug("Found subClassOf relationship [type:" + object_value + " is subClassOf:" + parent + "]");
}
addRange(range);
}
}
} else if (predicate_value != null) {
// subPropertyOf check. Guard against a null parent set, mirroring the subClassOf branch.
Set<IRI> parents = InferenceEngine.findParents(inferenceEngine.getSubPropertyOfGraph(), (IRI) predicate_value);
if (parents != null) {
for (IRI parent : parents) {
Map.Entry<TABLE_LAYOUT, Range> temp =
createRange(subject_value, parent, object_value);
Range range = temp.getValue();
if (logger.isDebugEnabled()) {
logger.debug("Found subPropertyOf relationship [property:" + predicate_value + " is subPropertyOf:" + parent + "]");
}
addRange(range);
}
}
}
}
} catch (Exception e) {
logger.error("Exception in adding inferred ranges", e);
throw new IOException(e);
} finally {
if (inferenceEngine != null) {
try {
inferenceEngine.destroy();
} catch (InferenceEngineException e) {
logger.error("Exception closing InferenceEngine", e);
}
}
if (ryaDAO != null) {
try {
ryaDAO.destroy();
} catch (RyaDAOException e) {
logger.error("Exception closing ryadao", e);
}
}
}
}
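/**
* Returns the next matching statement as a 7-field tuple: subject, predicate,
* object, context (or null), and the datatypes of the subject, predicate, and
* object. Returns null when the scan is exhausted.
*/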
@Override
public Tuple getNext() throws IOException {
try {
if (reader.nextKeyValue()) {
Key key = reader.getCurrentKey();
// Text.getBytes() exposes the backing array, which can be longer than the
// logical content, so copy only the valid bytes before deserializing.
RyaStatement ryaStatement = ryaContext.deserializeTriple(layout, new TripleRow(toBytes(key.getRow()),
toBytes(key.getColumnFamily()), toBytes(key.getColumnQualifier())));
Tuple tuple = TupleFactory.getInstance().newTuple(7);
tuple.set(0, ryaStatement.getSubject().getData());
tuple.set(1, ryaStatement.getPredicate().getData());
tuple.set(2, ryaStatement.getObject().getData());
tuple.set(3, (ryaStatement.getContext() != null) ? (ryaStatement.getContext().getData()) : (null));
tuple.set(4, ryaStatement.getSubject().getDataType());
tuple.set(5, ryaStatement.getPredicate().getDataType());
tuple.set(6, ryaStatement.getObject().getDataType());
return tuple;
}
} catch (Exception e) {
throw new IOException(e);
}
return null;
}
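/**
* Copies the valid bytes out of a Hadoop Text value; Text.getBytes() can
* return a backing array that is longer than Text.getLength().
*/
private static byte[] toBytes(Text text) {
return Arrays.copyOf(text.getBytes(), text.getLength());
}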
}