blob: 1e3b250579d9241b9777369ff7a2072ff321aa57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.fluo.app.util;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Bytes.BytesBuilder;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.eclipse.rdf4j.model.Value;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.hash.Hashing;
/**
* This class adds or removes a hash to or from the rowId for sharding purposes. When creating a sharded row key, it
* takes the form: node_prefix:shardId:nodeId//Binding_values. For example, the row key generated from nodeId = SP_123,
* varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob)
* indicates the shard id hash generated from the Binding value "uri:Bob".
*
*/
public class BindingHashShardingFunction {
private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
private static final int HASH_LEN = 4;
/**
* Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For
* example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
* SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
* Binding value "uri:Bob".
*
* @param nodeId - Node Id with type and UUID
* @param varOrder - VarOrder used to order BindingSet values
* @param bs - BindingSet with partially formed query values
* @return - serialized Bytes rowId for storing BindingSet results in Fluo
*/
public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
checkNotNull(nodeId);
checkNotNull(varOrder);
checkNotNull(bs);
String[] rowPrefixAndId = nodeId.split("_");
Preconditions.checkArgument(rowPrefixAndId.length == 2);
String prefix = rowPrefixAndId[0];
String id = rowPrefixAndId[1];
String firstBindingString = "";
Bytes rowSuffix = Bytes.of(id);
if (varOrder.getVariableOrders().size() > 0) {
VariableOrder first = new VariableOrder(varOrder.getVariableOrders().get(0));
firstBindingString = BS_CONVERTER.convert(bs, first);
rowSuffix = RowKeyUtil.makeRowKey(id, varOrder, bs);
}
BytesBuilder builder = Bytes.builder();
builder.append(Bytes.of(prefix + ":"));
builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + firstBindingString)));
builder.append(":");
builder.append(rowSuffix);
return builder.toBytes();
}
/**
* Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For
* example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
* SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
* Binding value "uri:Bob".
*
* @param nodeId - Node Id with type and UUID
* @param firstBsVal - String representation of the first BsValue
* @return - serialized Bytes prefix for scanning rows
*/
public static Bytes getShardedScanPrefix(String nodeId, Value firstBsVal) {
checkNotNull(nodeId);
checkNotNull(firstBsVal);
final RyaType ryaValue = RdfToRyaConversions.convertValue(firstBsVal);
final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
return getShardedScanPrefix(nodeId, bindingString);
}
/**
* Generates a sharded rowId from the indicated nodeId and bindingString. For example, the row key generated from
* nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be
* SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the
* Binding value "uri:Bob".
*
* @param nodeId - NodeId with tyep and UUID
* @param bindingString - String representation of BindingSet values, as formed by {@link BindingSetStringConverter}
* @return - serialized, sharded Bytes prefix
*/
public static Bytes getShardedScanPrefix(String nodeId, String bindingString) {
checkNotNull(nodeId);
checkNotNull(bindingString);
String[] rowPrefixAndId = nodeId.split("_");
Preconditions.checkArgument(rowPrefixAndId.length == 2);
String prefix = rowPrefixAndId[0];
String id = rowPrefixAndId[1];
BytesBuilder builder = Bytes.builder();
builder.append(prefix + ":");
builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + bindingString)));
builder.append(":");
builder.append(id);
builder.append(NODEID_BS_DELIM);
builder.append(bindingString);
return builder.toBytes();
}
private static boolean hasHash(Bytes prefixBytes, Bytes row) {
for (int i = prefixBytes.length() + 1; i < prefixBytes.length() + HASH_LEN; i++) {
byte b = row.byteAt(i);
boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9');
if (!isAlphaNum) {
return false;
}
}
if (row.byteAt(prefixBytes.length()) != ':' || row.byteAt(prefixBytes.length() + HASH_LEN + 1) != ':') {
return false;
}
return true;
}
/**
* @return Returns input with prefix and hash stripped from beginning.
*/
public static Bytes removeHash(Bytes prefixBytes, Bytes row) {
checkNotNull(prefixBytes);
checkNotNull(row);
checkArgument(row.length() >= prefixBytes.length() + 6, "Row is shorter than expected " + row);
checkArgument(row.subSequence(0, prefixBytes.length()).equals(prefixBytes),
"Row does not have expected prefix " + row);
checkArgument(hasHash(prefixBytes, row), "Row does not have expected hash " + row);
BytesBuilder builder = Bytes.builder();
builder.append(prefixBytes);
builder.append("_");
builder.append(row.subSequence(prefixBytes.length() + 6, row.length()));
return builder.toBytes();
}
private static String genHash(Bytes row) {
int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
hash = hash & 0x7fffffff;
// base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
// nice for debugging.
String hashString = Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
hashString = hashString.substring(hashString.length() - HASH_LEN);
return hashString;
}
}