blob: c8d0f794eee3a9b96617403e20057b530e6673bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.ooc.data;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.persistence.DataIndex;
import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Implementation of a message store used for out-of-core mechanism.
*
* @param <I> Vertex id
* @param <M> Message data
*/
public class DiskBackedMessageStore<I extends WritableComparable,
M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
implements MessageStore<I, M> {
/** Class logger. */
private static final Logger LOG =
Logger.getLogger(DiskBackedMessageStore.class);
/** Configuration */
private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
/** In-memory message store */
private final MessageStore<I, M> messageStore;
/** Whether the message store uses message combiner or not */
private final boolean useMessageCombiner;
/** Which superstep this message store is used for */
private final long superstep;
/** Message value class */
private final MessageValueFactory<M> messageValueFactory;
/**
* Type of VertexIdMessage class (container for serialized messages) received
* for a particular message. If we write the received messages to disk before
* adding them to message store, we need this type when we want to read the
* messages back from disk (so that we know how to read the messages from
* disk).
*/
private enum SerializedMessageClass {
/** ByteArrayVertexIdMessages */
BYTE_ARRAY_VERTEX_ID_MESSAGES,
/** ByteArrayOneMEssageToManyIds */
BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
}
/**
* Constructor
*
* @param config Configuration
* @param oocEngine Out-of-core engine
* @param messageStore In-memory message store for which out-of-core message
* store would be wrapper
* @param useMessageCombiner Whether message combiner is used for this message
* store
* @param superstep superstep number this messages store is used for
*/
public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
config,
OutOfCoreEngine oocEngine,
MessageStore<I, M> messageStore,
boolean useMessageCombiner, long superstep) {
super(config, oocEngine);
this.config = config;
this.messageStore = messageStore;
this.useMessageCombiner = useMessageCombiner;
this.superstep = superstep;
this.messageValueFactory = config.createOutgoingMessageValueFactory();
}
@Override
public boolean isPointerListEncoding() {
return messageStore.isPointerListEncoding();
}
@Override
public Iterable<M> getVertexMessages(I vertexId) {
return messageStore.getVertexMessages(vertexId);
}
@Override
public void clearVertexMessages(I vertexId) {
messageStore.clearVertexMessages(vertexId);
}
@Override
public void clearAll() {
messageStore.clearAll();
}
@Override
public boolean hasMessagesForVertex(I vertexId) {
return messageStore.hasMessagesForVertex(vertexId);
}
@Override
public boolean hasMessagesForPartition(int partitionId) {
return messageStore.hasMessagesForPartition(partitionId);
}
@Override
public void addPartitionMessages(
int partitionId, VertexIdMessages<I, M> messages) {
if (useMessageCombiner) {
messageStore.addPartitionMessages(partitionId, messages);
} else {
addEntry(partitionId, messages);
}
}
@Override
public long loadPartitionData(int partitionId)
throws IOException {
if (!useMessageCombiner) {
return loadPartitionDataProxy(partitionId,
new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
.addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
} else {
return 0;
}
}
@Override
public long offloadPartitionData(int partitionId)
throws IOException {
if (!useMessageCombiner) {
return offloadPartitionDataProxy(partitionId,
new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
.addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
} else {
return 0;
}
}
@Override
public long offloadBuffers(int partitionId)
throws IOException {
if (!useMessageCombiner) {
return offloadBuffersProxy(partitionId,
new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
.addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
} else {
return 0;
}
}
@Override
public void finalizeStore() {
messageStore.finalizeStore();
}
@Override
public Iterable<I> getPartitionDestinationVertices(int partitionId) {
return messageStore.getPartitionDestinationVertices(partitionId);
}
@Override
public void clearPartition(int partitionId) {
messageStore.clearPartition(partitionId);
}
@Override
public void writePartition(DataOutput out, int partitionId)
throws IOException {
messageStore.writePartition(out, partitionId);
}
@Override
public void readFieldsForPartition(DataInput in, int partitionId)
throws IOException {
messageStore.readFieldsForPartition(in, partitionId);
}
@Override
protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
throws IOException {
SerializedMessageClass messageClass;
if (messages instanceof ByteArrayVertexIdMessages) {
messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
} else if (messages instanceof ByteArrayOneMessageToManyIds) {
messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
} else {
throw new IllegalStateException("writeEntry: serialized message " +
"type is not supported");
}
out.writeByte(messageClass.ordinal());
messages.write(out);
}
@Override
protected VertexIdMessages<I, M> readNextEntry(DataInput in)
throws IOException {
byte messageType = in.readByte();
SerializedMessageClass messageClass =
SerializedMessageClass.values()[messageType];
VertexIdMessages<I, M> vim;
switch (messageClass) {
case BYTE_ARRAY_VERTEX_ID_MESSAGES:
vim = new ByteArrayVertexIdMessages<>(messageValueFactory);
vim.setConf(config);
break;
case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory);
vim.setConf(config);
break;
default:
throw new IllegalStateException("readNextEntry: unsupported " +
"serialized message type!");
}
vim.readFields(in);
return vim;
}
@Override
protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
DataIndex index) throws IOException {
long numBytes = 0;
if (hasPartitionDataOnFile.remove(partitionId)) {
OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
partitionId);
numBytes = inputWrapper.finalizeInput(true);
}
return numBytes;
}
@Override
protected long offloadInMemoryPartitionData(
int partitionId, int ioThreadId, DataIndex index) throws IOException {
long numBytes = 0;
if (messageStore.hasMessagesForPartition(partitionId)) {
OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
false);
messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
messageStore.clearPartition(partitionId);
numBytes = outputWrapper.finalizeOutput();
hasPartitionDataOnFile.add(partitionId);
}
return numBytes;
}
@Override
protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
return messages.getSerializedSize();
}
@Override
protected void addEntryToInMemoryPartitionData(int partitionId,
VertexIdMessages<I, M>
messages) {
messageStore.addPartitionMessages(partitionId, messages);
}
}