blob: ac777aa0a7a8b1d524e934c92870a24a78579488 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.accumulo.core.data.Value;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.hadoop.io.Text;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
/**
 * Store and format the various temporal index keys.
 * Row keys take these forms, where [x] denotes that x is optional:
 * rowkey = constraintPrefix datetime [0x/00 uniquesuffix]
 * rowkey = datetime 0x/00 uniquesuffix
 * constraintPrefix = 0x/00 hash([subject][predicate])
 * uniquesuffix = 0x/00 hash(statement), so that equal statements produce equal keys and
 * different statements at the same instant produce different keys.
 *
 * The instance is in one of two modes depending on the constructor:
 * storage mode -- construct with a triple statement, then iterate the keys to store.
 * query mode -- built by {@link #keyPartsForQuery} from a query instant and statement
 * constraints; get the key prefix to search with {@link #getQueryKey()}.
 *
 * This has the flavor of an immutable object.
 * This is independent of the underlying database engine.
 *
 * @author David.Lotts
 */
public class KeyParts implements Iterable<KeyParts> {
    private static final ValueFactory VF = SimpleValueFactory.getInstance();

    // Column qualifiers naming the strategy a row key was built with.
    /** Row key: hash(subject + predicate), then the instant. */
    private static final String CQ_S_P_AT = "spo";
    /** Row key: hash(predicate), then the instant, then a uniqueness suffix. */
    private static final String CQ_P_AT = "po";
    /** Row key: hash(subject), then the instant, then a uniqueness suffix. */
    private static final String CQ_S_AT = "so";
    /** Row key: the instant alone, then a uniqueness suffix. */
    private static final String CQ_O_AT = "o";

    /**
     * Strategies emitted by {@link #iterator()}, in order; one store key per entry.
     * NOTE(review): an older comment asked whether CQ_END belongs here -- confirm.
     */
    private static final String[] STORE_STRATEGIES = {CQ_O_AT, CQ_S_P_AT, CQ_P_AT, CQ_S_AT};

    // Not used inside this class; presumably column qualifiers for interval begin/end keys -- confirm with callers.
    public static final String CQ_BEGIN = "begin";
    public static final String CQ_END = "end";

    /** Delimiter byte written before every hash inside a row key. */
    public static final byte[] HASH_PREFIX = new byte[] {0};
    /** The byte value just after {@link #HASH_PREFIX}; not used here, presumably a scan upper bound -- confirm. */
    public static final byte[] HASH_PREFIX_FOLLOWING = new byte[] {1};

    /** Column family: the context/named-graph. Null for a store-mode instance from the public constructor. */
    public final Text cf;
    /** Column qualifier: the strategy name. Null for a store-mode instance from the public constructor. */
    public final Text cq;
    /** Query mode: hash of the subject and/or predicate constraint, or null when unconstrained. */
    public final Text constraintPrefix;
    /** Row key to store; only set on instances produced by {@link #iterator()}. */
    final Text storeKey;
    private final TemporalInstant instant;
    private final Statement statement;
    private final boolean queryMode;

    /**
     * Query-mode constructor.
     *
     * @param constraintPrefix hashed subject and/or predicate prefix, or null when unconstrained
     * @param instant the instant used to build the query key
     * @param cf column family (the context), "" when no context is constrained
     * @param cq column qualifier naming the strategy to search
     */
    KeyParts(final Text constraintPrefix, final TemporalInstant instant, final String cf, final String cq) {
        queryMode = true;
        storeKey = null;
        statement = null;
        this.constraintPrefix = constraintPrefix;
        this.instant = instant;
        this.cf = new Text(cf);
        this.cq = new Text(cq);
    }

    /**
     * Returns the value to index: the serialized statement as UTF-8 bytes.
     * Only valid in store mode; the statement is null in query mode.
     *
     * @return the serialized statement
     */
    public Value getValue() {
        assert statement != null;
        return new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement)));
    }

    /**
     * Store-mode constructor. Iterate the result to obtain one key per indexing strategy.
     *
     * @param statement the statement being indexed
     * @param instant2 the instant to index the statement under
     */
    public KeyParts(final Statement statement, final TemporalInstant instant2) {
        queryMode = false;
        storeKey = null;
        constraintPrefix = null;
        this.statement = statement;
        instant = instant2;
        cf = null;
        cq = null;
    }

    /** Store-mode instance holding one finished row key, as produced by {@link #iterator()}. */
    private KeyParts(final Text keyText, final Text cf, final Text cq, final Statement statement) {
        queryMode = false;
        constraintPrefix = null;
        this.statement = statement;
        instant = null;
        storeKey = keyText;
        this.cf = cf;
        this.cq = cq;
    }

    /**
     * Iterates the keys to store for this statement, one per strategy in {@link #STORE_STRATEGIES}.
     * Each element carries its row key, the statement's context as column family, and the strategy
     * name as column qualifier.
     * Not supported in query mode: returns null there (and fails an assertion when -ea is on).
     *
     * @return an iterator over the store keys, or null in query mode
     */
    @Override
    public Iterator<KeyParts> iterator() {
        assert !queryMode : "iterator for queryMode is not immplemented" ;
        if (queryMode) {
            return null;
        }
        return new Iterator<KeyParts>() {
            private int nextStrategy = 0;

            @Override
            public boolean hasNext() {
                return nextStrategy < STORE_STRATEGIES.length;
            }

            @Override
            public KeyParts next() {
                if (!hasNext()) {
                    // Iterator contract: signal exhaustion without advancing the cursor.
                    throw new NoSuchElementException("Next passed end? No such nextStrategy=" + nextStrategy);
                }
                assert (statement != null);
                final String strategy = STORE_STRATEGIES[nextStrategy++];
                final Text keyText = new Text();
                switch (strategy) {
                case CQ_O_AT: // instant + hash(statement)
                    appendInstant(instant, keyText);
                    appendUniqueness(statement, keyText);
                    break;
                case CQ_S_P_AT: // hash(s+p) + instant
                    appendSubjectPredicate(statement, keyText);
                    appendInstant(instant, keyText);
                    // No uniqueness suffix: per the original design the instant is the object ("o"),
                    // so s+p+instant already identifies the statement within its context (the CF).
                    break;
                case CQ_P_AT: // hash(p) + instant + hash(statement)
                    appendPredicate(statement, keyText);
                    appendInstant(instant, keyText);
                    appendUniqueness(statement, keyText);
                    break;
                case CQ_S_AT: // hash(s) + instant + hash(statement)
                    appendSubject(statement, keyText);
                    appendInstant(instant, keyText);
                    appendUniqueness(statement, keyText);
                    break;
                default:
                    throw new IllegalStateException("Unhandled store strategy: " + strategy);
                }
                return new KeyParts(keyText, new Text(StatementSerializer.writeContext(statement)), new Text(strategy), statement);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Remove not Implemented.");
            }
        };
    }

    /**
     * Returns a copy of the row key to store.
     * Only set on instances produced by {@link #iterator()}.
     *
     * @return the row key bytes
     * @throws IllegalStateException if this instance has no store key
     */
    public byte[] getStoreKey() {
        assert !queryMode : "must be in store Mode, store keys are not initialized.";
        if (storeKey == null) {
            throw new IllegalStateException("store key is only set on KeyParts produced by iterator()");
        }
        return storeKey.copyBytes();
    }

    /**
     * Query key is the constraint prefix (if any) plus this instance's instant, with no uniqueness suffix.
     *
     * @return the row key for range queries.
     */
    public Text getQueryKey() {
        return getQueryKey(instant);
    }

    /**
     * Query key is the constraint prefix (if any) plus the given instant, with no uniqueness suffix.
     *
     * @param theInstant the instant to place after the prefix
     * @return the row key for range queries.
     */
    public Text getQueryKey(final TemporalInstant theInstant) {
        assert queryMode : "must be in query Mode, query keys are not initialized.";
        final Text keyText = new Text();
        if (constraintPrefix != null) {
            appendBytes(constraintPrefix.copyBytes(), keyText);
        }
        appendInstant(theInstant, keyText);
        return keyText;
    }

    /**
     * Debug representation. Safe for every mode: instances produced by {@link #iterator()} have a
     * null instant, which is rendered as "{null}" rather than throwing.
     */
    @Override
    public String toString() {
        final byte[] instantBytes = (instant == null) ? null : instant.getAsKeyBytes();
        return "KeyParts [contraintPrefix=" + toHumanString(constraintPrefix) + ", instant=" + toHumanString(instantBytes) + ", cf=" + cf + ", cq=" + cq + "]";
    }

    /** Appends HASH_PREFIX + md5(serialized subject) to the key. */
    private static void appendSubject(final Statement statement, final Text keyText) {
        appendPrefixedHash(StatementSerializer.writeSubject(statement), keyText);
    }

    /** Appends HASH_PREFIX + md5(serialized predicate) to the key. */
    private static void appendPredicate(final Statement statement, final Text keyText) {
        appendPrefixedHash(StatementSerializer.writePredicate(statement), keyText);
    }

    /** Appends the instant's key bytes to the key. */
    private static void appendInstant(final TemporalInstant instant, final Text keyText) {
        appendBytes(instant.getAsKeyBytes(), keyText);
    }

    /** Appends HASH_PREFIX + md5(serialized subject and predicate) to the key. */
    private static void appendSubjectPredicate(final Statement statement, final Text keyText) {
        appendPrefixedHash(StatementSerializer.writeSubjectPredicate(statement), keyText);
    }

    /**
     * Appends a zero delimiter byte followed by the md5 of the UTF-8 bytes of {@code serialized}.
     * Shared by the subject / predicate / subject+predicate prefix builders.
     */
    private static void appendPrefixedHash(final String serialized, final Text keyText) {
        final Value serializedValue = new Value(serialized.getBytes(StandardCharsets.UTF_8));
        final byte[] hashOfValue = uniqueFromValueForKey(serializedValue);
        appendBytes(HASH_PREFIX, keyText); // prefix the hash with a zero byte.
        appendBytes(hashOfValue, keyText);
    }

    /**
     * Append any byte array to a row key.
     * @param bytes append this
     * @param keyText text to append to
     */
    private static void appendBytes(final byte[] bytes, final Text keyText) {
        keyText.append(bytes, 0, bytes.length);
    }

    /**
     * Append a delimiter byte and a collision-unlikely hash of the statement to the key,
     * so that if two keys have the same value they will be the same,
     * and if two different values occur at the same time their keys are different.
     * If the application uses a very large number of statements at the exact same time,
     * the md5 value might be upgraded to, for example, sha-1 to avoid collisions.
     * @param statement the statement whose serialized form is hashed
     * @param keyText the key to append to
     */
    public static void appendUniqueness(final Statement statement, final Text keyText) {
        keyText.append(HASH_PREFIX, 0, 1); // delimiter
        final Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement)));
        final byte[] hashOfValue = Md5Hash.md5Binary(statementValue);
        keyText.append(hashOfValue, 0, hashOfValue.length);
    }

    /**
     * Get a collision-unlikely hash to append to the key,
     * so that if two keys have the same value they will be the same,
     * and if two different values occur at the same time their keys are different.
     * @param value the bytes to hash
     * @return the md5 of the value
     */
    private static byte[] uniqueFromValueForKey(final Value value) {
        return Md5Hash.md5Binary(value);
    }

    /**
     * List all the index keys to find for any query. The strategy is set via the column qualifier, ex: CQ_S_P_AT.
     * The column family (CF) is the context/named-graph, or "" when none is constrained.
     * <ul>
     * <li>predicates constrained: one key per predicate, hashed with the subject when one is given;</li>
     * <li>only a subject constrained: one subject-hash key;</li>
     * <li>otherwise: one unprefixed instant key.</li>
     * </ul>
     * @param queryInstant the instant used by every returned key
     * @param contraints the statement constraints of the query
     * @return the query-mode keys to search
     */
    static public List<KeyParts> keyPartsForQuery(final TemporalInstant queryInstant, final StatementConstraints contraints) {
        final List<KeyParts> keys = new LinkedList<KeyParts>();
        final IRI urlNull = VF.createIRI("urn:null");
        final Resource currentContext = contraints.getContext();
        final String contextCf = (currentContext == null) ? "" : currentContext.toString();
        final boolean hasSubj = contraints.hasSubject();
        if (contraints.hasPredicates()) {
            for (final IRI nextPredicate : contraints.getPredicates()) {
                final Text contraintPrefix = new Text();
                // Placeholder statement: only its subject/predicate feed the hash.
                final Statement statement = VF.createStatement(hasSubj ? contraints.getSubject() : urlNull, nextPredicate, urlNull, currentContext);
                if (hasSubj) {
                    appendSubjectPredicate(statement, contraintPrefix);
                } else {
                    appendPredicate(statement, contraintPrefix);
                }
                keys.add(new KeyParts(contraintPrefix, queryInstant, contextCf, hasSubj ? CQ_S_P_AT : CQ_P_AT));
            }
        } else if (hasSubj) { // and no predicates
            final Text contraintPrefix = new Text();
            final Statement statement = VF.createStatement(contraints.getSubject(), urlNull, urlNull);
            appendSubject(statement, contraintPrefix);
            keys.add(new KeyParts(contraintPrefix, queryInstant, contextCf, CQ_S_AT));
        } else {
            // No constraints except possibly a context/named-graph, handled by the CF
            keys.add(new KeyParts(null, queryInstant, contextCf, CQ_O_AT));
        }
        return keys;
    }

    /**
     * Convert a Value to a string, showing unprintable bytes as {x} where x is lowercase hex.
     * @param value the value, may be null
     * @return Human readable representation, "{null}" for null.
     */
    public static String toHumanString(final Value value) {
        return toHumanString(value == null ? null : value.get());
    }

    /**
     * Convert a Text to a string, showing unprintable bytes as {x} where x is lowercase hex.
     * @param text the text, may be null
     * @return Human readable representation, "{null}" for null.
     */
    public static String toHumanString(final Text text) {
        return toHumanString(text == null ? null : text.copyBytes());
    }

    /**
     * Convert a non-UTF-8 byte[] to a string. Printable ASCII is copied through, literal braces are
     * escaped as "{{}" / "{}}", and every other byte is shown as {x} in unpadded lowercase hex (e.g. {0}, {ff}).
     * @param bytes the bytes, may be null
     * @return Human readable representation, "{null}" for null.
     */
    public static String toHumanString(final byte[] bytes) {
        if (bytes == null) {
            return "{null}";
        }
        final StringBuilder sb = new StringBuilder();
        for (final byte b : bytes) {
            final int unsigned = b & 0xff; // Lop off the sign extension.
            if (unsigned < 32 || unsigned > 0x7e) {
                sb.append('{').append(Integer.toHexString(unsigned)).append('}');
            } else if (unsigned == '{' || unsigned == '}') { // Escape the literal braces.
                sb.append('{').append((char) unsigned).append('}');
            } else {
                sb.append((char) unsigned);
            }
        }
        return sb.toString();
    }
}