// blob: aa0f6155fbea947dac3f906463324a8d740aa6e2
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.accumulo.mr.merge.util;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.query.strategy.ByteRange;
import org.apache.rya.api.query.strategy.TriplePatternStrategy;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.api.utils.NullableStatementImpl;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.Var;
/**
 * A {@link QueryRuleset} that additionally maps rules to ranges in Accumulo tables. Also enables
 * copying one or more entire tables, independent of the query-derived rules.
 */
public class AccumuloQueryRuleset extends QueryRuleset {
    /** Ranges derived from the query rules, grouped by the table layout they apply to. */
    private final Map<TABLE_LAYOUT, List<Range>> tableRanges = new HashMap<>();
    /** Names of tables registered through {@link #addTable(String)} to be selected in full. */
    private final List<String> entireTables = new LinkedList<>();
    /** Supplies the {@link TriplePatternStrategy} used to turn a statement into a range. */
    private final RyaTripleContext ryaContext;

    /**
     * Constructs the ruleset and the associated Ranges, given a Configuration that contains a SPARQL query.
     * @param conf Configuration including the query and connection information.
     * @throws IOException if the range can't be resolved
     * @throws QueryRulesetException if the query can't be translated to valid rules
     */
    public AccumuloQueryRuleset(final RdfCloudTripleStoreConfiguration conf) throws IOException, QueryRulesetException {
        // Extract StatementPatterns and conditions from the query
        super(conf);
        // Turn StatementPatterns into Ranges, grouped by the table layout each one targets
        ryaContext = RyaTripleContext.getInstance(conf);
        for (final CopyRule rule : rules) {
            final Map.Entry<TABLE_LAYOUT, ByteRange> entry = getRange(rule.getStatement());
            // Build the range first so a failure never leaves an empty list behind in the map.
            final Range range = toAccumuloRange(entry.getValue());
            tableRanges.computeIfAbsent(entry.getKey(), layout -> new LinkedList<>()).add(range);
        }
    }

    /**
     * Converts a {@link ByteRange} into an Accumulo {@link Range} over the same start and end rows.
     * @param byteRange the start/end bytes produced by a triple pattern strategy
     * @return the corresponding row Range
     */
    private static Range toAccumuloRange(final ByteRange byteRange) {
        return new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
    }

    /**
     * Turn a single StatementPattern into a table layout and the byte range to scan in it.
     * @param sp the StatementPattern whose subject, predicate, object and (optional) context
     *           values define the range
     * @return the table layout paired with the byte range covering the pattern
     * @throws IOException if the range can't be resolved
     */
    private Map.Entry<TABLE_LAYOUT, ByteRange> getRange(final StatementPattern sp) throws IOException {
        final Var context = sp.getContextVar();
        // Unbound variables have null values; NullableStatementImpl is used to carry them.
        final Statement stmt = new NullableStatementImpl((Resource) sp.getSubjectVar().getValue(),
                (IRI) sp.getPredicateVar().getValue(), sp.getObjectVar().getValue(),
                context == null ? null : (Resource) context.getValue());
        final RyaStatement rs = RdfToRyaConversions.convertStatement(stmt);
        final TriplePatternStrategy strategy = ryaContext.retrieveStrategy(rs);
        // Report an unresolvable pattern through the documented IOException instead of
        // letting a missing strategy or range surface later as a NullPointerException.
        if (strategy == null) {
            throw new IOException("No triple pattern strategy found for statement pattern: " + sp);
        }
        final Map.Entry<TABLE_LAYOUT, ByteRange> entry =
                strategy.defineRange(rs.getSubject(), rs.getPredicate(), rs.getObject(), rs.getContext(), conf);
        if (entry == null || entry.getValue() == null) {
            throw new IOException("Could not resolve a range for statement pattern: " + sp);
        }
        return entry;
    }

    /**
     * Add an instruction to select an entire table, with no restricting rule.
     * @param tableName the full name of the table to select
     */
    public void addTable(final String tableName) {
        entireTables.add(tableName);
    }

    /**
     * Get table names and input configurations for each range
     * @return A Map representing each table and {@link InputTableConfig} needed to get all the rows that match the rules.
     */
    public Map<String, InputTableConfig> getInputConfigs() {
        final Map<String, InputTableConfig> configs = new HashMap<>();
        // One config per layout that has rule-derived ranges, keyed by the layout's table name
        for (final Map.Entry<TABLE_LAYOUT, List<Range>> entry : tableRanges.entrySet()) {
            final String parentTable = RdfCloudTripleStoreUtils.layoutPrefixToTable(entry.getKey(), conf.getTablePrefix());
            final InputTableConfig config = new InputTableConfig();
            config.setRanges(entry.getValue());
            configs.put(parentTable, config);
        }
        // Whole-table entries are added last, so they replace any rule-based config
        // registered above under the same table name.
        for (final String tableName : entireTables) {
            final InputTableConfig config = new InputTableConfig();
            final List<Range> ranges = new LinkedList<>();
            ranges.add(new Range());
            config.setRanges(ranges);
            configs.put(tableName, config);
        }
        return configs;
    }

    /**
     * Get the rules that apply to all statements within a Range. The range may not
     * contain every row relevant to the associated rule(s), but every row within the
     * range is relevant to the rule(s).
     * @param layout Defines which table the range is meant to scan
     * @param range The Range of rows in that table
     * @return Any rules in this ruleset that match the given table and contain the given range
     * @throws IOException if the Range can't be resolved
     */
    public List<CopyRule> getRules(final TABLE_LAYOUT layout, final Range range) throws IOException {
        final List<CopyRule> matchingRules = new LinkedList<>();
        for (final CopyRule rule : rules) {
            // Compare the rule to the given range
            final Map.Entry<TABLE_LAYOUT, ByteRange> entry = getRange(rule.getStatement());
            // If they apply to different tables, they are unrelated.
            if (!entry.getKey().equals(layout)) {
                continue;
            }
            // If the given range is contained in (or equal to) the rule's range, then the
            // rule matches and should be included.
            if (rangeContainsRange(toAccumuloRange(entry.getValue()), range)) {
                matchingRules.add(rule);
            }
        }
        return matchingRules;
    }

    /**
     * Describes the ruleset: the superclass description followed by one line for
     * each table that will be copied in full.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder(super.toString());
        for (final String fullTableName : entireTables) {
            sb.append("\n\tCopy entire table ").append(fullTableName).append("\n");
        }
        return sb.toString();
    }

    /**
     * Determines whether one Range fully encloses another.
     * @param r1 the candidate enclosing range
     * @param r2 the range that must lie within {@code r1}
     * @return true if both endpoints of {@code r2} fall within {@code r1}
     */
    private static boolean rangeContainsRange(final Range r1, final Range r2) {
        // 1. If r1.start is infinite, r1 contains r2.start
        if (!r1.isInfiniteStartKey()) {
            // 2. Otherwise, if r2.start is infinite, r1 can't contain r2
            if (r2.isInfiniteStartKey()) {
                return false;
            }
            final Key start2 = r2.getStartKey();
            // 3. If r2 is inclusive, r1 needs to contain r2's start key.
            if (r2.isStartKeyInclusive()) {
                if (!r1.contains(start2)) {
                    return false;
                }
            }
            // 4. Otherwise, the only failure is if r2's start key comes first (they can be equal)
            else if (start2.compareTo(r1.getStartKey()) < 0) {
                return false;
            }
        }
        // Same reasoning, mirrored, for the end points
        if (!r1.isInfiniteStopKey()) {
            if (r2.isInfiniteStopKey()) {
                return false;
            }
            final Key end2 = r2.getEndKey();
            if (r2.isEndKeyInclusive()) {
                if (!r1.contains(end2)) {
                    return false;
                }
            }
            else if (end2.compareTo(r1.getEndKey()) > 0) {
                return false;
            }
        }
        return true;
    }
}