blob: ab4bd55fba20201d8f925b18c9b55e0daeab3f4d [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.rya.indexing.accumulo.entity;
import static java.util.Objects.requireNonNull;
import static org.apache.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
import static org.apache.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
import static org.apache.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTES;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.resolver.RyaContext;
import org.apache.rya.api.resolver.RyaTypeResolverException;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.openrdf.model.URI;
public class EntityCentricIndex extends AbstractAccumuloIndexer {
private static final Logger logger = Logger.getLogger(EntityCentricIndex.class);
private static final String TABLE_SUFFIX = "EntityCentricIndex";
private static final String OBJECT = "object";
private static final String SUBJECT = "subject";
private AccumuloRdfConfiguration conf;
private BatchWriter writer;
private boolean isInit = false;
private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException,
TableExistsException {
ConfigUtils.createTableIfNotExists(conf, getTableName());
public Configuration getConf() {
return conf;
//initialization occurs in setConf because index is created using reflection
public void setConf(final Configuration conf) {
if (conf instanceof AccumuloRdfConfiguration) {
this.conf = (AccumuloRdfConfiguration) conf;
} else {
this.conf = new AccumuloRdfConfiguration(conf);
if (!isInit) {
try {
isInit = true;
} catch (final AccumuloException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
} catch (final AccumuloSecurityException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
} catch (final TableNotFoundException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
} catch (final TableExistsException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
} catch (final IOException e) {
logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
throw new RuntimeException(e);
* Get the Accumulo table used by this index.
* @return table used by instances of this index
public String getTableName() {
return getTableName(conf);
* Get the Accumulo table that will be used by this index.
* @param conf - The Rya configuration that specifies which instance of Rya
* the table name will be built for. (not null)
* @return table name guaranteed to be used by instances of this index
public static String getTableName(final Configuration conf) {
return ConfigUtils.getTablePrefix(conf) + TABLE_SUFFIX;
* Make the Accumulo table name used by this indexer for a specific instance of Rya.
* @param ryaInstanceName - The name of the Rya instance the table name is for. (not null)
* @return The Accumulo table name used by this indexer for a specific instance of Rya.
public static String makeTableName(final String ryaInstanceName) {
return ryaInstanceName + TABLE_SUFFIX;
public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException {
try {
this.writer = writer.getBatchWriter(getTableName());
} catch (final AccumuloException e) {
throw new IOException(e);
} catch (final AccumuloSecurityException e) {
throw new IOException(e);
} catch (final TableNotFoundException e) {
throw new IOException(e);
public void storeStatement(final RyaStatement stmt) throws IOException {
Preconditions.checkNotNull(writer, "BatchWriter not Set");
try {
for (final TripleRow row : serializeStatement(stmt)) {
} catch (final MutationsRejectedException e) {
throw new IOException(e);
} catch (final RyaTypeResolverException e) {
throw new IOException(e);
public void deleteStatement(final RyaStatement stmt) throws IOException {
Preconditions.checkNotNull(writer, "BatchWriter not Set");
try {
for (final TripleRow row : serializeStatement(stmt)) {
} catch (final MutationsRejectedException e) {
throw new IOException(e);
} catch (final RyaTypeResolverException e) {
throw new IOException(e);
protected Mutation deleteMutation(final TripleRow tripleRow) {
final Mutation m = new Mutation(new Text(tripleRow.getRow()));
final byte[] columnFamily = tripleRow.getColumnFamily();
final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
final byte[] columnQualifier = tripleRow.getColumnQualifier();
final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
final byte[] columnVisibility = tripleRow.getColumnVisibility();
final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp());
return m;
public static Collection<Mutation> createMutations(final RyaStatement stmt) throws RyaTypeResolverException{
final Collection<Mutation> m = Lists.newArrayList();
for (final TripleRow tr : serializeStatement(stmt)){
return m;
private static Mutation createMutation(final TripleRow tripleRow) {
final Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
final byte[] columnVisibility = tripleRow.getColumnVisibility();
final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
final Long timestamp = tripleRow.getTimestamp();
final byte[] value = tripleRow.getValue();
final Value v = value == null ? EMPTY_VALUE : new Value(value);
final byte[] columnQualifier = tripleRow.getColumnQualifier();
final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
final byte[] columnFamily = tripleRow.getColumnFamily();
final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
mutation.put(cfText, cqText, cv, timestamp, v);
return mutation;
private static List<TripleRow> serializeStatement(final RyaStatement stmt) throws RyaTypeResolverException {
final RyaURI subject = stmt.getSubject();
final RyaURI predicate = stmt.getPredicate();
final RyaType object = stmt.getObject();
final RyaURI context = stmt.getContext();
final Long timestamp = stmt.getTimestamp();
final byte[] columnVisibility = stmt.getColumnVisibility();
final byte[] value = stmt.getValue();
assert subject != null && predicate != null && object != null;
final byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes(StandardCharsets.UTF_8);
final byte[] subjBytes = subject.getData().getBytes(StandardCharsets.UTF_8);
final byte[] predBytes = predicate.getData().getBytes(StandardCharsets.UTF_8);
final byte[][] objBytes = RyaContext.getInstance().serializeType(object);
return Lists.newArrayList(new TripleRow(subjBytes,
Bytes.concat(cf, DELIM_BYTES,
OBJECT.getBytes(StandardCharsets.UTF_8), DELIM_BYTES,
objBytes[0], objBytes[1]),
new TripleRow(objBytes[0],
Bytes.concat(cf, DELIM_BYTES,
SUBJECT.getBytes(StandardCharsets.UTF_8), DELIM_BYTES,
subjBytes, objBytes[1]),
* Deserialize a row from the entity-centric index.
* @param key Row key, contains statement data
* @param value Row value
* @return The statement represented by the row
* @throws IOException if edge direction can't be extracted as expected.
* @throws RyaTypeResolverException if a type error occurs deserializing the statement's object.
public static RyaStatement deserializeStatement(final Key key, final Value value) throws RyaTypeResolverException, IOException {
assert key != null;
assert value != null;
final byte[] entityBytes = key.getRowData().toArray();
final byte[] predicateBytes = key.getColumnFamilyData().toArray();
final byte[] data = key.getColumnQualifierData().toArray();
final long timestamp = key.getTimestamp();
final byte[] columnVisibility = key.getColumnVisibilityData().toArray();
final byte[] valueBytes = value.get();
// main entity is either the subject or object
// data contains: column family , var name of other node , data of other node + datatype of object
int split = Bytes.indexOf(data, DELIM_BYTES);
final byte[] columnFamily = Arrays.copyOf(data, split);
final byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
final String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split), StandardCharsets.UTF_8);
final byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length);
split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
final byte[] otherNodeData = Arrays.copyOf(otherNodeBytes, split);
final byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes, split, otherNodeBytes.length);
byte[] objectBytes;
RyaURI subject;
final RyaURI predicate = new RyaURI(new String(predicateBytes, StandardCharsets.UTF_8));
RyaType object;
RyaURI context = null;
// Expect either:, otherNodeVar="object", otherNodeBytes={, object.datatype}
// or:, otherNodeVar="subject", otherNodeBytes={, object.datatype}
switch (otherNodeVar) {
subject = new RyaURI(new String(otherNodeData, StandardCharsets.UTF_8));
objectBytes = Bytes.concat(entityBytes, typeBytes);
case OBJECT:
subject = new RyaURI(new String(entityBytes, StandardCharsets.UTF_8));
objectBytes = Bytes.concat(otherNodeData, typeBytes);
throw new IOException("Failed to deserialize entity-centric index row. "
+ "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'");
object = RyaContext.getInstance().deserialize(objectBytes);
if (columnFamily != null && columnFamily.length > 0) {
context = new RyaURI(new String(columnFamily, StandardCharsets.UTF_8));
return new RyaStatement(subject, predicate, object, context,
null, columnVisibility, valueBytes, timestamp);
* Return the RyaType of the Entity Centric Index row.
* @param key Row key, contains statement data
* @param value Row value
* @return The statement represented by the row
* @throws IOException if edge direction can't be extracted as expected.
* @throws RyaTypeResolverException if a type error occurs deserializing the statement's object.
public static RyaType getRyaType(final Key key, final Value value) throws RyaTypeResolverException, IOException {
assert key != null;
assert value != null;
final byte[] entityBytes = key.getRowData().toArray();
final byte[] data = key.getColumnQualifierData().toArray();
// main entity is either the subject or object
// data contains: column family , var name of other node , data of other node + datatype of object
int split = Bytes.indexOf(data, DELIM_BYTES);
final byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
final String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split), StandardCharsets.UTF_8);
final byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length);
split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
final byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes, split, otherNodeBytes.length);
byte[] objectBytes;
RyaURI subject;
RyaType object;
RyaType type = null;
switch (otherNodeVar) {
objectBytes = Bytes.concat(entityBytes, typeBytes);
object = RyaContext.getInstance().deserialize(objectBytes);
type = object;
case OBJECT:
subject = new RyaURI(new String(entityBytes, StandardCharsets.UTF_8));
type = subject;
throw new IOException("Failed to deserialize entity-centric index row. "
+ "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'");
return type;
public void init() {
public void setConnector(final Connector connector) {
public void destroy() {
public void purge(final RdfCloudTripleStoreConfiguration configuration) {
public void dropAndDestroy() {
public Set<URI> getIndexablePredicates() {
return null;