blob: 751cc6e36f96dc221d23e1580a521eac19af192a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM;
import java.util.Map;
import java.util.Set;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.VarNameUtils;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
/**
* An observer that matches new Triples to the Statement Patterns that are part
* of any PCJ that is being maintained. If the triple matches a pattern, then
* the new result is stored as a binding set for the pattern.
*/
public class TripleObserver extends AbstractObserver {
private static final Logger log = LoggerFactory.getLogger(TripleObserver.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
private final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache();
private final StatementPatternIdCache SP_ID_CACHE = StatementPatternIdCacheSupplier.getOrCreateCache();
private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter();
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoQueryColumns.TRIPLES, NotificationType.STRONG);
}
@Override
public void process(final TransactionBase tx, final Bytes brow, final Column column) {
// Get string representation of triple.
final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow);
log.trace("Transaction ID: {}\nRya Statement: {}\n", tx.getStartTimestamp(), ryaStatement);
log.trace("Beginging to process triple.");
final String triple = IncUpdateDAO.getTripleString(ryaStatement);
Set<String> spIDs = SP_ID_CACHE.getStatementPatternIds(tx);
//see if triple matches conditions of any of the SP
for (String spID: spIDs) {
// Fetch its metadata.
final StatementPatternMetadata spMetadata = QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID);
log.trace("Retrieved metadata: {}", spMetadata);
// Attempt to match the triple against the pattern.
final String pattern = spMetadata.getStatementPattern();
final VariableOrder varOrder = spMetadata.getVariableOrder();
final String bindingSetString = getBindingSet(triple, pattern, varOrder);
log.trace("Created binding set match string: {}", bindingSetString);
// Statement matches to a binding set.
if(bindingSetString.length() != 0) {
// Fetch the triple's visibility label.
final String visibility = tx.gets(brow.toString(), FluoQueryColumns.TRIPLES, "");
//Make BindingSet and sharded row
final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder);
visBindingSet.setVisibility(visibility);
Bytes row = BindingHashShardingFunction.addShard(spID, varOrder, visBindingSet);
// If this is a new Binding Set, then emit it.
if(tx.get(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) {
try {
final Bytes valueBytes = BS_SERDE.serialize(visBindingSet);
log.trace("Transaction ID: {}\nMatched Statement Pattern: {}\nBinding Set: {}\n",
tx.getStartTimestamp(), spID, visBindingSet);
tx.set(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
} catch(final Exception e) {
log.error("Couldn't serialize a Binding Set. This value will be skipped.", e);
}
}
}
}
// Once the triple has been handled, it may be deleted.
tx.delete(brow, column);
}
/**
* Determines whether a triple matches a Statement Pattern. If so, it generates a string representation of a
* BindingSet whose order is determined by varOrder.
*
* @param triple - The triple to consider.
* @param pattern - The pattern the triple must match.
* @param varOrder - The variable order of the Binding Set String that is produced by this method.
* @return The string representation of a Binding Set that is generated by matching the triple to the pattern;
* otherwise an empty string if the pattern couldn't be matched.
*/
private static String getBindingSet(final String triple, final String pattern, final VariableOrder varOrder) {
final String[] patternArray = pattern.split(DELIM);
final String[] tripleArray = triple.split(DELIM);
final String[] varOrderArray = varOrder.toArray();
final Map<String,String> bindingValues = Maps.newHashMap();
if(patternArray.length != 3 || tripleArray.length != 3) {
throw new IllegalArgumentException("Invald number of components");
}
// Extract the binding names and values.
for(int i = 0; i < 3; i ++) {
if(VarNameUtils.isConstant(patternArray[i])) {
// If a constant value does not match, then the triple does not match the pattern.
if(!patternArray[i].substring(7).equals(tripleArray[i])) {
return "";
}
} else{
bindingValues.put(patternArray[i], tripleArray[i]);
}
}
// Create the returned binding set string from the extracted values.
String bindingSetString = "";
for (final String bindingName : varOrderArray) {
if(bindingSetString.length() == 0) {
bindingSetString = bindingValues.get(bindingName);
} else {
bindingSetString = bindingSetString + DELIM + bindingValues.get(bindingName);
}
}
return bindingSetString;
}
}