// blob: 889c8caf30ac9cb84b0dfae4e686fd725bc1f49f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.storage.accumulo;
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.algebra.AbstractAggregateOperator;
import org.eclipse.rdf4j.query.algebra.ExtensionElem;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
import com.google.common.base.Preconditions;
/**
 * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for
 * creating, deleting, and interacting with tables where PeriodicQuery results are stored.
 */
public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage {

    private static final PcjTables pcjTables = new PcjTables();
    private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory();

    private final String ryaInstance;
    private final Connector accumuloConn;
    // Authorizations of the connecting user, looked up once at construction time.
    private final Authorizations auths;
    private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
    private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();

    /**
     * Creates a AccumuloPeriodicQueryResultStorage Object.
     *
     * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance
     * @param ryaInstance - Rya Instance name for connecting to Rya
     * @throws NullPointerException if either argument is null
     * @throws RuntimeException if the authorizations of the connecting user cannot be read
     */
    public AccumuloPeriodicQueryResultStorage(final Connector accumuloConn, final String ryaInstance) {
        this.accumuloConn = Preconditions.checkNotNull(accumuloConn);
        this.ryaInstance = Preconditions.checkNotNull(ryaInstance);
        final String user = accumuloConn.whoami();
        try {
            this.auths = accumuloConn.securityOperations().getUserAuthorizations(user);
        } catch (AccumuloException | AccumuloSecurityException e) {
            // Keep the original exception as the cause so the failure can be diagnosed.
            throw new RuntimeException("Unable to access authorizations for user: " + user, e);
        }
    }

    /**
     * Creates a periodic query table for the given SPARQL using a newly generated query id.
     *
     * @param sparql - SPARQL of the periodic query (not null)
     * @return the generated query id
     * @throws PeriodicQueryStorageException if the query cannot be parsed or the table cannot be created
     */
    @Override
    public String createPeriodicQuery(final String sparql) throws PeriodicQueryStorageException {
        Preconditions.checkNotNull(sparql);
        final String queryId = pcjIdFactory.nextId();
        return createPeriodicQuery(queryId, sparql);
    }

    /**
     * Creates a periodic query table for the given query id and SPARQL. The variable order
     * of the table is the periodic bin id followed by the query's non-aggregation variables.
     *
     * @param queryId - id of the periodic query (not null)
     * @param sparql - SPARQL of the periodic query (not null)
     * @return the supplied query id
     * @throws PeriodicQueryStorageException if the query cannot be parsed or the table cannot be created
     */
    @Override
    public String createPeriodicQuery(final String queryId, final String sparql) throws PeriodicQueryStorageException {
        Preconditions.checkNotNull(queryId);
        Preconditions.checkNotNull(sparql);
        final Set<String> bindingNames;
        try {
            bindingNames = new AggregateVariableRemover().getNonAggregationVariables(sparql);
        } catch (final MalformedQueryException e) {
            throw new PeriodicQueryStorageException(e.getMessage(), e);
        }
        // The periodic bin id must lead the variable order so results can be
        // addressed by a row prefix built from the bin id alone.
        final List<String> varOrderList = new ArrayList<>();
        varOrderList.add(PeriodicQueryResultStorage.PeriodicBinId);
        varOrderList.addAll(bindingNames);
        createPeriodicQuery(queryId, sparql, new VariableOrder(varOrderList));
        return queryId;
    }

    /**
     * Creates a periodic query table for the given query id, SPARQL, and variable order.
     *
     * @param queryId - id of the periodic query (not null)
     * @param sparql - SPARQL of the periodic query (not null)
     * @param order - variable order of the table; its first entry must be the periodic bin id (not null)
     * @throws IllegalArgumentException if the order is empty or does not start with the periodic bin id
     * @throws PeriodicQueryStorageException if the table cannot be created
     */
    @Override
    public void createPeriodicQuery(final String queryId, final String sparql, final VariableOrder order) throws PeriodicQueryStorageException {
        Preconditions.checkNotNull(sparql);
        Preconditions.checkNotNull(queryId);
        Preconditions.checkNotNull(order);
        // Reject an empty order through the same argument check instead of letting
        // get(0) fail with an IndexOutOfBoundsException.
        Preconditions.checkArgument(
                !order.getVariableOrders().isEmpty()
                        && PeriodicQueryResultStorage.PeriodicBinId.equals(order.getVariableOrders().get(0)),
                "periodicBinId binding name must occur first in VariableOrder.");
        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
        final Set<VariableOrder> varOrders = new HashSet<>();
        varOrders.add(order);
        try {
            pcjTables.createPcjTable(accumuloConn, tableName, varOrders, sparql);
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage(), e);
        }
    }

    /**
     * Fetches the metadata stored for the periodic query's table.
     *
     * @param queryId - id of the periodic query
     * @return metadata built from the PCJ metadata of the query's table
     * @throws PeriodicQueryStorageException if the metadata cannot be read
     */
    @Override
    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(final String queryId) throws PeriodicQueryStorageException {
        try {
            return new PeriodicQueryStorageMetadata(
                    pcjTables.getPcjMetadata(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId)));
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage(), e);
        }
    }

    /**
     * Adds results to the periodic query's table.
     *
     * @param queryId - id of the periodic query
     * @param results - binding sets to add; each must contain the periodic bin id binding
     * @throws IllegalArgumentException if any result lacks the periodic bin id binding
     * @throws PeriodicQueryStorageException if the results cannot be written
     */
    @Override
    public void addPeriodicQueryResults(final String queryId, final Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException {
        results.forEach(x -> Preconditions.checkArgument(x.hasBinding(PeriodicQueryResultStorage.PeriodicBinId),
                "BindingSet must contain periodBinId binding."));
        try {
            pcjTables.addResults(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId), results);
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage(), e);
        }
    }

    /**
     * Deletes the results of the periodic query whose rows start with the given bin id.
     *
     * @param queryId - id of the periodic query
     * @param binId - periodic bin id whose results are deleted
     * @throws PeriodicQueryStorageException if the deletion fails or the deleter cannot be closed
     */
    @Override
    public void deletePeriodicQueryResults(final String queryId, final long binId) throws PeriodicQueryStorageException {
        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
        BatchDeleter deleter = null;
        try {
            final Text prefix = getRowPrefix(binId);
            deleter = accumuloConn.createBatchDeleter(tableName, auths, 1, new BatchWriterConfig());
            deleter.setRanges(Collections.singleton(Range.prefix(prefix)));
            deleter.delete();
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage(), e);
        } finally {
            if (deleter != null) {
                try {
                    deleter.close();
                } catch (final Exception e) {
                    throw new PeriodicQueryStorageException(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Purges the periodic query's table via {@code PcjTables.purgePcjTable}.
     *
     * @param queryId - id of the periodic query
     * @throws PeriodicQueryStorageException if the purge fails
     */
    public void deletePeriodicQueryResults(final String queryId) throws PeriodicQueryStorageException {
        try {
            pcjTables.purgePcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage(), e);
        }
    }

    /**
     * Drops the periodic query's table via {@code PcjTables.dropPcjTable}.
     *
     * @param queryId - id of the periodic query
     * @throws PeriodicQueryStorageException if the table cannot be dropped
     */
    @Override
    public void deletePeriodicQuery(final String queryId) throws PeriodicQueryStorageException {
        try {
            pcjTables.dropPcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage(), e);
        }
    }

    /**
     * Lists the results stored for the periodic query, optionally restricted to one bin.
     *
     * @param queryId - id of the periodic query (not null)
     * @param binId - when present, only rows prefixed with this bin id are scanned (not null)
     * @return an iterator over the scanned binding sets; the caller is responsible for closing it
     * @throws PeriodicQueryStorageException if the metadata cannot be read or the scan cannot be set up
     */
    @Override
    public CloseableIterator<BindingSet> listResults(final String queryId, final Optional<Long> binId)
            throws PeriodicQueryStorageException {
        requireNonNull(queryId);
        requireNonNull(binId);
        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);

        // The variable order recorded in the table's metadata names the column
        // family that holds the binding sets.
        final PeriodicQueryStorageMetadata metadata = getPeriodicQueryMetadata(queryId);
        final VariableOrder varOrder = metadata.getVariableOrder();

        Scanner scanner = null;
        try {
            // Fetch only the Binding Sets whose Variable Order matches the selected one.
            scanner = accumuloConn.createScanner(tableName, auths);
            scanner.fetchColumnFamily(new Text(varOrder.toString()));
            if (binId.isPresent()) {
                scanner.setRange(Range.prefix(getRowPrefix(binId.get())));
            }
            return new AccumuloValueBindingSetIterator(scanner);
        } catch (final Exception e) {
            // The scanner was never handed to the caller, so release it here.
            if (scanner != null) {
                scanner.close();
            }
            throw new PeriodicQueryStorageException(String.format("PCJ Table does not exist for name '%s'.", tableName), e);
        }
    }

    /**
     * Builds the row prefix for a bin id by serializing a binding set that holds only the
     * periodic bin id (as an xsd:long literal) with a variable order of just that binding.
     *
     * @param binId - periodic bin id
     * @return the serialized prefix
     * @throws BindingSetConversionException if the binding set cannot be serialized
     */
    private Text getRowPrefix(final long binId) throws BindingSetConversionException {
        final ValueFactory vf = SimpleValueFactory.getInstance();
        final QueryBindingSet bs = new QueryBindingSet();
        bs.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(Long.toString(binId), XMLSchema.LONG));
        return new Text(converter.convert(bs, new VariableOrder(PeriodicQueryResultStorage.PeriodicBinId)));
    }

    /**
     * Lists the names of the tables whose name starts with this Rya instance name followed by
     * the periodic table suffix.
     *
     * @return the matching table names, in the order the table listing returns them
     */
    @Override
    public List<String> listPeriodicTables() {
        final List<String> periodicTables = new ArrayList<>();
        final String periodicPrefix = ryaInstance + PeriodicQueryTableNameFactory.PeriodicTableSuffix;
        boolean foundInstance = false;
        for (final String tableName : accumuloConn.tableOperations().list()) {
            if (tableName.startsWith(ryaInstance)) {
                // This table is part of the target Rya instance.
                foundInstance = true;
                if (tableName.startsWith(periodicPrefix)) {
                    periodicTables.add(tableName);
                }
            } else if (foundInstance) {
                // We have encountered the first table name that does not start
                // with the rya instance name after those that do. Because the
                // list is sorted, there can't be any more pcj tables for the
                // target instance in the list.
                break;
            }
        }
        return periodicTables;
    }

    /**
     * Visitor that removes aggregate variables from the binding names of a parsed SPARQL
     * query, so that only non-aggregation variables are used to build the Accumulo row.
     * Aggregation values change as they are updated, whereas the non-aggregation values do
     * not; keeping aggregates out of the row lets an updated aggregation overwrite the
     * existing entry for the same key rather than create a new one.
     */
    static class AggregateVariableRemover extends AbstractQueryModelVisitor<RuntimeException> {
        private Set<String> bindingNames;

        /**
         * Parses the query and returns its binding names with aggregate bindings removed.
         *
         * @param sparql - SPARQL query to inspect
         * @return the binding names of the query that are not bound to an aggregate
         * @throws MalformedQueryException if the SPARQL cannot be parsed
         */
        public Set<String> getNonAggregationVariables(final String sparql) throws MalformedQueryException {
            final TupleExpr te = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
            bindingNames = te.getBindingNames();
            te.visit(this);
            return bindingNames;
        }

        /**
         * Drops the name of any extension element whose expression is an aggregate operator.
         */
        @Override
        public void meet(final ExtensionElem node) {
            if (node.getExpr() instanceof AbstractAggregateOperator) {
                bindingNames.remove(node.getName());
            }
        }
    }
}