blob: 37e8d95a9f354a3fe9a773cdc514ef7d88d322f6 [file] [log] [blame]
package org.apache.rya.accumulo.mr;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.resolver.RyaTripleContext;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.api.resolver.triple.TripleRowResolverException;
/**
* Basic {@link WritableComparable} for using Rya data with Hadoop.
* RyaStatementWritable wraps a {@link RyaStatement}, which in turn represents a
* statement as a collection of {@link org.apache.rya.api.domain.RyaIRI} and
* {@link org.apache.rya.api.domain.RyaType} objects.
* <p>
* This class is mutable, like all {@link org.apache.hadoop.io.Writable}s. When
* used as Mapper or Reducer input, the Hadoop framework will typically reuse
* the same object to load the next record. However, loading the next record
* will create a new RyaStatement internally. Therefore, if a statement must be
* stored for any length of time, be sure to extract the internal RyaStatement.
*/
public class RyaStatementWritable implements WritableComparable<RyaStatementWritable> {
    private RyaTripleContext ryaContext;
    private RyaStatement ryaStatement;

    /**
     * Instantiates a RyaStatementWritable with the default RyaTripleContext.
     * @param conf Unused.
     */
    public RyaStatementWritable(Configuration conf) {
        this();
    }

    /**
     * Instantiates a RyaStatementWritable with a given context.
     * @param ryaContext Context used for reading and writing the statement.
     */
    public RyaStatementWritable(RyaTripleContext ryaContext) {
        this.ryaContext = ryaContext;
    }

    /**
     * Instantiates a RyaStatementWritable with the default RyaTripleContext,
     * obtained from a fresh {@link AccumuloRdfConfiguration}.
     */
    public RyaStatementWritable() {
        this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
    }

    /**
     * Instantiates a RyaStatementWritable with a given statement and context.
     * @param ryaStatement The statement (triple) represented by this object.
     * @param ryaContext Context used for reading and writing the statement.
     */
    public RyaStatementWritable(RyaStatement ryaStatement, RyaTripleContext ryaContext) {
        this(ryaContext);
        this.ryaStatement = ryaStatement;
    }

    /**
     * Gets the contained RyaStatement.
     * @return The statement represented by this RyaStatementWritable; may be
     *         null if none has been set or read yet.
     */
    public RyaStatement getRyaStatement() {
        return ryaStatement;
    }

    /**
     * Sets the contained RyaStatement.
     * @param ryaStatement The statement to be represented by this
     *        RyaStatementWritable.
     */
    public void setRyaStatement(RyaStatement ryaStatement) {
        this.ryaStatement = ryaStatement;
    }

    /**
     * Comparison method for natural ordering. Compares based on the logical
     * triple (the s/p/o/context information in the underlying RyaStatement)
     * and then by the metadata contained in the RyaStatement if the triples are
     * the same. A writable with a null statement sorts after one with a
     * non-null statement.
     * @param other The writable to compare against; must not be null.
     * @return Zero if both RyaStatementWritables contain equivalent statements
     *         or both have null statements; otherwise, an integer whose sign
     *         corresponds to a consistent ordering.
     * @throws NullPointerException if {@code other} is null.
     */
    @Override
    public int compareTo(RyaStatementWritable other) {
        CompareToBuilder builder = new CompareToBuilder();
        RyaStatement rsThis = this.getRyaStatement();
        RyaStatement rsOther = other.getRyaStatement(); // should throw NPE if other is null, as per Comparable contract
        // CompareToBuilder orders false before true, so null statements sort last.
        builder.append(rsThis == null, rsOther == null);
        if (rsThis != null && rsOther != null) {
            builder.append(rsThis.getSubject(), rsOther.getSubject());
            builder.append(rsThis.getPredicate(), rsOther.getPredicate());
            builder.append(rsThis.getObject(), rsOther.getObject());
            builder.append(rsThis.getContext(), rsOther.getContext());
            builder.append(rsThis.getQualifer(), rsOther.getQualifer());
            builder.append(rsThis.getColumnVisibility(), rsOther.getColumnVisibility());
            builder.append(rsThis.getValue(), rsOther.getValue());
            builder.append(rsThis.getTimestamp(), rsOther.getTimestamp());
        }
        return builder.toComparison();
    }

    /**
     * Returns a hash based on the hashCode method in RyaStatement.
     * @return A hash that should be consistent for equivalent RyaStatements;
     *         zero when the statement is null.
     */
    @Override
    public int hashCode() {
        if (ryaStatement == null) {
            return 0;
        }
        return ryaStatement.hashCode();
    }

    /**
     * Tests for equality using the equals method in RyaStatement.
     * @param o Object to compare with
     * @return true if both objects are RyaStatementWritables containing
     *         equivalent RyaStatements (or both contain null statements).
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (!(o instanceof RyaStatementWritable)) {
            return false;
        }
        RyaStatement rsThis = this.getRyaStatement();
        RyaStatement rsOther = ((RyaStatementWritable) o).getRyaStatement();
        if (rsThis == null) {
            return rsOther == null;
        }
        return rsThis.equals(rsOther);
    }

    /**
     * Serializes this RyaStatementWritable. The triple is written as its SPO
     * row, column family and column qualifier, followed by the statement's
     * column visibility, value and an optional timestamp.
     * @param dataOutput An output stream for serialized statement data.
     * @throws IOException if the RyaStatement is null, has no SPO
     *         serialization, or otherwise can't be serialized.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        if (ryaStatement == null) {
            throw new IOException("Rya Statement is null");
        }
        try {
            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(ryaStatement);
            TripleRow tripleRow = map == null ? null : map.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
            if (tripleRow == null) {
                // Fail with an IOException rather than an NPE so Hadoop reports it as a write failure.
                throw new IOException("Rya Statement has no SPO serialization");
            }
            write(dataOutput, tripleRow.getRow());
            write(dataOutput, tripleRow.getColumnFamily());
            write(dataOutput, tripleRow.getColumnQualifier());
            write(dataOutput, ryaStatement.getColumnVisibility());
            write(dataOutput, ryaStatement.getValue());
            Long timestamp = ryaStatement.getTimestamp();
            boolean hasTimestamp = timestamp != null;
            dataOutput.writeBoolean(hasTimestamp);
            if (hasTimestamp) {
                dataOutput.writeLong(timestamp);
            }
        } catch (TripleRowResolverException e) {
            throw new IOException(e);
        }
    }

    /**
     * Write part of a statement to an output stream. The field is encoded as
     * a presence flag, followed (when present) by its length and its bytes.
     * @param dataOutput Stream for writing serialized statements.
     * @param row Individual field to write, as a byte array; may be null.
     * @throws IOException if writing to the stream fails.
     */
    protected void write(DataOutput dataOutput, byte[] row) throws IOException {
        boolean present = row != null;
        dataOutput.writeBoolean(present);
        if (present) {
            dataOutput.writeInt(row.length);
            dataOutput.write(row);
        }
    }

    /**
     * Read part of a statement from an input stream, in the format produced by
     * {@link #write(DataOutput, byte[])}.
     * @param dataInput Stream for reading serialized statements.
     * @return The next individual field, as a byte array, or null if the field
     *         was written as absent.
     * @throws IOException if reading from the stream fails or the encoded
     *         length is negative (corrupt input).
     */
    protected byte[] read(DataInput dataInput) throws IOException {
        if (!dataInput.readBoolean()) {
            return null;
        }
        int len = dataInput.readInt();
        if (len < 0) {
            // Report corrupt input as an IOException instead of NegativeArraySizeException.
            throw new IOException("Invalid serialized field length: " + len);
        }
        byte[] bytes = new byte[len];
        dataInput.readFully(bytes);
        return bytes;
    }

    /**
     * Loads a RyaStatementWritable by reading data from an input stream.
     * Creates a new RyaStatement and assigns it to this RyaStatementWritable.
     * @param dataInput A stream containing serialized statement data.
     * @throws IOException if reading from the stream fails or the data cannot
     *         be deserialized into a RyaStatement.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        byte[] row = read(dataInput);
        byte[] columnFamily = read(dataInput);
        byte[] columnQualifier = read(dataInput);
        byte[] columnVisibility = read(dataInput);
        byte[] value = read(dataInput);
        Long timestamp = null;
        if (dataInput.readBoolean()) {
            timestamp = dataInput.readLong();
        }
        try {
            RyaStatement statement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
                    new TripleRow(row, columnFamily, columnQualifier));
            statement.setColumnVisibility(columnVisibility);
            statement.setValue(value);
            statement.setTimestamp(timestamp);
            // Publish only a fully populated statement.
            ryaStatement = statement;
        } catch (TripleRowResolverException e) {
            throw new IOException(e);
        }
    }
}