| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.indexing; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.commons.codec.binary.StringUtils; |
| import org.apache.hadoop.io.Text; |
| import org.eclipse.rdf4j.model.IRI; |
| import org.eclipse.rdf4j.model.Resource; |
| import org.eclipse.rdf4j.model.Statement; |
| import org.eclipse.rdf4j.model.ValueFactory; |
| import org.eclipse.rdf4j.model.impl.SimpleValueFactory; |
| |
| /** |
| * Store and format the various temporal index keys. |
| * Row Keys are in these two forms, where [x] denotes x is optional: |
| * rowkey = contraintPrefix datetime |
| * rowkey = datetime 0x/00 uniquesuffix |
| * contraintPrefix = 0x/00 hash([subject][predicate]) |
| * uniquesuffix = some bytes to make it unique, like hash(statement). |
| * |
| * The instance is in one of two modes depending on the constructor: |
| * storage mode -- construct with a triple statement, get an iterator of keys to store. |
| * query mode -- construct with a statement and query constraints, get the key prefix to search. |
| * |
| * this has the flavor of an immutable object |
| * This is independent of the underlying database engine |
| * |
| * @author David.Lotts |
| * |
| */ |
| public class KeyParts implements Iterable<KeyParts> { |
| private static final ValueFactory VF = SimpleValueFactory.getInstance(); |
| |
| private static final String CQ_S_P_AT = "spo"; |
| private static final String CQ_P_AT = "po"; |
| private static final String CQ_S_AT = "so"; |
| private static final String CQ_O_AT = "o"; |
| public static final String CQ_BEGIN = "begin"; |
| public static final String CQ_END = "end"; |
| |
| public static final byte[] HASH_PREFIX = new byte[] {0}; |
| public static final byte[] HASH_PREFIX_FOLLOWING = new byte[] {1}; |
| |
| public final Text cf; |
| public final Text cq; |
| public final Text constraintPrefix; // subject and/or predicate |
| final Text storeKey; // subject and/or predicate |
| final private TemporalInstant instant; |
| final private Statement statement; |
| final private boolean queryMode; |
| KeyParts(final Text constraintPrefix, final TemporalInstant instant, final String cf, final String cq) { |
| queryMode = true; // query mode |
| storeKey = null; |
| statement = null; |
| this.constraintPrefix = constraintPrefix; |
| this.instant = instant; |
| this.cf = new Text(cf); |
| this.cq = new Text(cq); |
| } |
| |
| /** |
| * this is the value to index. |
| * @return |
| */ |
| public Value getValue() { |
| assert statement!=null; |
| return new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement))); |
| } |
| |
| public KeyParts(final Statement statement, final TemporalInstant instant2) { |
| queryMode = false; // store mode |
| storeKey = null; |
| constraintPrefix = null; |
| this.statement = statement; |
| instant = instant2; |
| cf = null; |
| cq = null; |
| } |
| |
| private KeyParts(final Text keyText, final Text cf, final Text cq, final Statement statement) { |
| queryMode = false; // store mode |
| constraintPrefix = null; |
| this.statement = statement; |
| instant = null; |
| storeKey = keyText; |
| this.cf = cf; |
| this.cq = cq; |
| } |
| |
| @Override |
| public Iterator<KeyParts> iterator() { |
| final String[] strategies = new String[] { |
| CQ_O_AT, CQ_S_P_AT, CQ_P_AT, CQ_S_AT |
| } ; // CQ_END? |
| assert !queryMode : "iterator for queryMode is not immplemented" ; |
| if (queryMode) { |
| return null; |
| } |
| |
| // if (!queryMode) |
| return new Iterator<KeyParts>() { |
| int nextStrategy = 0; |
| |
| @Override |
| public boolean hasNext() { |
| return nextStrategy < strategies.length; |
| } |
| |
| @Override |
| public KeyParts next() { |
| assert(statement!=null); |
| Text keyText = new Text(); |
| // increment++ the next strategy AFTER getting the value |
| switch (nextStrategy++) { |
| case 0: // index o+hash(p+s) |
| assert (CQ_O_AT.equals(strategies[0])); |
| keyText = new Text(instant.getAsKeyBytes()); |
| KeyParts.appendUniqueness(statement, keyText); |
| return new KeyParts(keyText, new Text(StatementSerializer.writeContext(statement)), new Text(CQ_O_AT), statement); |
| case 1:// index hash(s+p)+o |
| assert (CQ_S_P_AT.equals(strategies[1])); |
| KeyParts.appendSubjectPredicate(statement, keyText); |
| KeyParts.appendInstant(instant, keyText); |
| // appendUniqueness -- Not needed since it is already unique. |
| return new KeyParts(keyText, new Text(StatementSerializer.writeContext(statement)), new Text(CQ_S_P_AT), statement); |
| case 2: // index hash(p)+o |
| assert (CQ_P_AT.equals(strategies[2])); |
| KeyParts.appendPredicate(statement, keyText); |
| KeyParts.appendInstant(instant, keyText); |
| KeyParts.appendUniqueness(statement, keyText); |
| return new KeyParts(keyText, new Text(StatementSerializer.writeContext(statement)), new Text(CQ_P_AT), statement); |
| case 3: // index hash(s)+o |
| assert (CQ_S_AT.equals(strategies[3])); |
| KeyParts.appendSubject(statement, keyText); |
| KeyParts.appendInstant(instant, keyText); |
| KeyParts.appendUniqueness(statement, keyText); |
| return new KeyParts(keyText, new Text(StatementSerializer.writeContext(statement)), new Text(CQ_S_AT), statement); |
| } |
| throw new Error("Next passed end? No such nextStrategy="+(nextStrategy-1)); |
| |
| } |
| |
| @Override |
| public void remove() { |
| throw new Error("Remove not Implemented."); |
| } |
| }; |
| } |
| |
| public byte[] getStoreKey() { |
| assert !queryMode : "must be in store Mode, store keys are not initialized."; |
| return storeKey.copyBytes(); |
| } |
| |
| /** |
| * Query key is the prefix plus the datetime, but no uniqueness at the end. |
| * @return the row key for range queries. |
| */ |
| public Text getQueryKey() { |
| return getQueryKey(instant); |
| } |
| |
| /** |
| * Query key is the prefix plus the datetime, but no uniqueness at the end. |
| * |
| * @return the row key for range queries. |
| */ |
| public Text getQueryKey(final TemporalInstant theInstant) { |
| assert queryMode : "must be in query Mode, query keys are not initialized."; |
| final Text keyText = new Text(); |
| if (constraintPrefix != null) { |
| appendBytes(constraintPrefix.copyBytes(), keyText); |
| } |
| appendInstant(theInstant, keyText); |
| return keyText; |
| } |
| |
| @Override |
| public String toString() { |
| return "KeyParts [contraintPrefix=" + toHumanString(constraintPrefix) + ", instant=" + toHumanString(instant.getAsKeyBytes()) + ", cf=" + cf + ", cq=" + cq + "]"; |
| } |
| private static void appendSubject(final Statement statement, final Text keyText) { |
| final Value statementValue = new Value(StatementSerializer.writeSubject(statement).getBytes(StandardCharsets.UTF_8)); |
| final byte[] hashOfValue = uniqueFromValueForKey(statementValue); |
| appendBytes(HASH_PREFIX, keyText); // prefix the hash with a zero byte. |
| appendBytes(hashOfValue, keyText); |
| } |
| |
| private static void appendPredicate(final Statement statement, final Text keyText) { |
| final Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writePredicate(statement))); |
| final byte[] hashOfValue = uniqueFromValueForKey(statementValue); |
| appendBytes(HASH_PREFIX, keyText); // prefix the hash with a zero byte. |
| appendBytes(hashOfValue, keyText); |
| } |
| |
| private static void appendInstant(final TemporalInstant instant, final Text keyText) { |
| final byte[] bytes = instant.getAsKeyBytes(); |
| appendBytes(bytes, keyText); |
| } |
| |
| private static void appendSubjectPredicate(final Statement statement, final Text keyText) { |
| final Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeSubjectPredicate(statement))); |
| final byte[] hashOfValue = uniqueFromValueForKey(statementValue); |
| appendBytes(HASH_PREFIX, keyText); // prefix the hash with a zero byte. |
| appendBytes(hashOfValue, keyText); |
| } |
| |
| /** |
| * Append any byte array to a row key. |
| * @param bytes append this |
| * @param keyText text to append to |
| */ |
| private static void appendBytes(final byte[] bytes, final Text keyText) { |
| keyText.append(bytes, 0, bytes.length); |
| } |
| |
| /** |
| * Get a collision unlikely hash string and append to the key, |
| * so that if two keys have the same value, then they will be the same, |
| * if two different values that occur at the same time there keys are different. |
| * If the application uses a very large number of statements at the exact same time, |
| * the md5 value might be upgraded to for example sha-1 to avoid collisions. |
| * @param statement |
| * @param keyText |
| */ |
| public static void appendUniqueness(final Statement statement, final Text keyText) { |
| keyText.append(HASH_PREFIX, 0, 1); // delimiter |
| final Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement))); |
| final byte[] hashOfValue = Md5Hash.md5Binary(statementValue); |
| keyText.append(hashOfValue, 0, hashOfValue.length); |
| } |
| /** |
| * Get a collision unlikely hash string to append to the key, |
| * so that if two keys have the same value, then they will be the same, |
| * if two different values that occur at the same time there keys are different. |
| * @param value |
| * @return |
| */ |
| private static byte[] uniqueFromValueForKey(final Value value) { |
| return Md5Hash.md5Binary(value); |
| } |
| |
| /** |
| * List all the index keys to find for any query. Set the strategy via the column qualifier, ex: CQ_S_P_AT. |
| * Column Family (CF) is the context/named-graph. |
| * @param queryInstant |
| * @param contraints |
| * @return |
| */ |
| static public List<KeyParts> keyPartsForQuery(final TemporalInstant queryInstant, final StatementConstraints contraints) { |
| final List<KeyParts> keys = new LinkedList<KeyParts>(); |
| final IRI urlNull = VF.createIRI("urn:null"); |
| final Resource currentContext = contraints.getContext(); |
| final boolean hasSubj = contraints.hasSubject(); |
| if (contraints.hasPredicates()) { |
| for (final IRI nextPredicate : contraints.getPredicates()) { |
| final Text contraintPrefix = new Text(); |
| final Statement statement = VF.createStatement(hasSubj ? contraints.getSubject() : urlNull, nextPredicate, urlNull, contraints.getContext()); |
| if (hasSubj) { |
| appendSubjectPredicate(statement, contraintPrefix); |
| } else { |
| appendPredicate(statement, contraintPrefix); |
| } |
| keys.add(new KeyParts(contraintPrefix, queryInstant, (currentContext==null)?"":currentContext.toString(), hasSubj?CQ_S_P_AT:CQ_P_AT )); |
| } |
| } |
| else if (contraints.hasSubject()) { // and no predicates |
| final Text contraintPrefix = new Text(); |
| final Statement statement = VF.createStatement(contraints.getSubject(), urlNull, urlNull); |
| appendSubject(statement, contraintPrefix); |
| keys.add( new KeyParts(contraintPrefix, queryInstant, (currentContext==null)?"":currentContext.toString(), CQ_S_AT) ); |
| } |
| else { |
| // No constraints except possibly a context/named-graph, handled by the CF |
| keys.add( new KeyParts(null, queryInstant, (currentContext==null)?"":currentContext.toString(), CQ_O_AT) ); |
| } |
| return keys; |
| } |
| /** |
| * convert a non-utf8 byte[] and text and value to string and show unprintable bytes as {xx} where x is hex. |
| * @param value |
| * @return Human readable representation. |
| */ |
| public static String toHumanString(final Value value) { |
| return toHumanString(value==null?null:value.get()); |
| } |
| public static String toHumanString(final Text text) { |
| return toHumanString(text==null?null:text.copyBytes()); |
| } |
| public static String toHumanString(final byte[] bytes) { |
| if (bytes==null) { |
| return "{null}"; |
| } |
| final StringBuilder sb = new StringBuilder(); |
| for (final byte b : bytes) { |
| if ((b > 0x7e) || (b < 32)) { |
| sb.append("{"); |
| sb.append(Integer.toHexString( b & 0xff )); // Lop off the sign extended ones. |
| sb.append("}"); |
| } else if (b == '{'||b == '}') { // Escape the literal braces. |
| sb.append("{"); |
| sb.append((char)b); |
| sb.append("}"); |
| } else { |
| sb.append((char)b); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| } |