blob: b49a50b6263a3d29cd9d0e5ce515707c7ec89298 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.graph;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.graph.IDSkippingIterator.Strategy;
/**
 * {@link VerticesInfo} implementation that keeps the vertices in files on the
 * local file system.
 * <p>
 * Two kinds of files are written below a per-attempt root directory:
 * <ul>
 * <li>one "static" file that holds, per vertex, the vertex ID, the number of
 * edges and the destination ID of every edge;</li>
 * <li>one "soft" file per step that holds, per vertex, the (nullable) vertex
 * value, whatever {@code Vertex#writeState} emits, the number of edges and
 * the (nullable) value of every edge.</li>
 * </ul>
 * The start position of every vertex record in these files is kept in offset
 * arrays, so a record is located again by seeking to the stored position.
 */
@SuppressWarnings("rawtypes")
public final class DiskVerticesInfo<V extends WritableComparable, E extends Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
// Configuration key for the base directory of the vertex files; init falls
// back to "/tmp/graph/" when the key is not set.
public static final String DISK_VERTICES_PATH_KEY = "hama.disk.vertices.path";
// Marker byte written in front of a value that is null.
private static final byte NULL = 0;
// Marker byte written in front of a value that is present.
private static final byte NOT_NULL = 1;
// Writer for the static file; created in init, closed in finishAdditions.
private FSDataOutputStream staticGraphPartsDos;
// Reader for the static file; (re)opened by skippingIterator.
private FSDataInputStream staticGraphPartsDis;
// Writer for the soft file that is filled while vertices are added.
private FSDataOutputStream softGraphPartsDos;
// Reader for the soft file that is currently iterated.
private FSDataInputStream softGraphPartsDis;
// Writer for the soft file created by startSuperstep and filled by
// finishVertexComputation.
private FSDataOutputStream softGraphPartsNextIterationDos;
// One bit per vertex position. NOTE: despite the name, serializeSoft stores
// vertex.isHalted() here and fill reads the bit back as the halted flag.
private BitSet activeVertices;
// Record offsets in the soft file that is currently read.
private long[] softValueOffsets;
// Record offsets in the soft file that is currently written.
private long[] softValueOffsetsNextIteration;
// Record offsets in the static file.
private long[] staticOffsets;
// Soft offsets collected while vertices are added; set to null by
// finishAdditions after they were copied into the arrays.
private ArrayList<Long> tmpSoftOffsets;
// Static offsets collected while vertices are added; set to null by
// finishAdditions after they were copied into the array.
private ArrayList<Long> tmpStaticOffsets;
// Number of vertices added through addVertex.
private int size;
// Becomes true in finishAdditions; addVertex rejects calls afterwards.
private boolean lockedAdditions = false;
// Per-attempt directory (ends with "/") that contains all files of this
// instance; deleted recursively in cleanup.
private String rootPath;
// Single vertex object that is refilled and handed out by the iterator.
private Vertex<V, E, M> cachedVertexInstance;
// Step counter used to name the soft files; incremented by finishSuperstep.
private int currentStep = 0;
// Position of the next vertex written by finishVertexComputation; reset to 0
// by startSuperstep.
private int index = 0;
// Configuration handed to init; used to obtain the local file system.
private Configuration conf;
// Runner handed to init; assigned to the cached vertex instance.
private GraphJobRunner<V, E, M> runner;
// Full path of the static file.
private String staticFile;
/**
 * Prepares the on-disk storage for the given task attempt.
 * <p>
 * Remembers the runner and configuration, resets the temporary offset lists
 * and creates an empty static file and an empty soft file for the current
 * step inside {@code <base>/<jobId>/<attemptId>/}. The base directory is read
 * from {@link #DISK_VERTICES_PATH_KEY} and defaults to {@code /tmp/graph/}.
 *
 * @param runner the runner that is later assigned to the cached vertex.
 * @param conf the configuration used to look up the base directory and the
 *          local file system.
 * @param attempt the task attempt whose job ID and attempt ID name the
 *          directory.
 * @throws IOException if the directory or one of the files cannot be created.
 */
@Override
public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
    TaskAttemptID attempt) throws IOException {
  this.runner = runner;
  this.conf = conf;
  tmpSoftOffsets = new ArrayList<Long>();
  tmpStaticOffsets = new ArrayList<Long>();
  String base = conf.get(DISK_VERTICES_PATH_KEY, "/tmp/graph/");
  // a configured directory may come without the trailing separator; without
  // it the job id would be glued onto the last path component
  if (!base.endsWith("/")) {
    base = base + "/";
  }
  rootPath = base + attempt.getJobID().toString() + "/" + attempt.toString()
      + "/";
  LocalFileSystem local = FileSystem.getLocal(conf);
  local.mkdirs(new Path(rootPath));
  // start with a fresh static file
  staticFile = rootPath + "static.graph";
  local.delete(new Path(staticFile), false);
  staticGraphPartsDos = local.create(new Path(staticFile));
  // start with a fresh soft file for the current step
  String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
  local.delete(new Path(softGraphFileName), false);
  softGraphPartsDos = local.create(new Path(softGraphFileName));
}
/**
 * Closes every stream this instance may still hold and removes the
 * per-attempt directory including all files in it.
 *
 * @param conf the configuration used to obtain the local file system.
 * @param attempt the task attempt (not used by this implementation).
 * @throws IOException if the directory cannot be deleted.
 */
@Override
public void cleanup(Configuration conf, TaskAttemptID attempt)
    throws IOException {
  // best-effort close of all streams; the static writer is included because
  // it stays open when finishAdditions was never reached
  IOUtils.cleanup(null, staticGraphPartsDos, softGraphPartsDos,
      softGraphPartsNextIterationDos, staticGraphPartsDis, softGraphPartsDis);
  // rootPath is only set by init; nothing to delete if init never ran
  if (rootPath != null) {
    FileSystem.getLocal(conf).delete(new Path(rootPath), true);
  }
}
/**
 * Appends a vertex to the static file and to the soft file of the initial
 * step and counts it.
 * <p>
 * The static record consists of the vertex ID, the number of edges and the
 * destination ID of every edge. A {@code null} edge list is written to the
 * static file as zero edges.
 *
 * @param vertex the vertex to store.
 * @throws IllegalArgumentException if additions were already locked by
 *           {@link #finishAdditions()}.
 * @throws IOException if writing to one of the files fails.
 */
@Override
public void addVertex(Vertex<V, E, M> vertex) throws IOException {
  // vertices must be added in sorted order to work this out correctly
  checkArgument(!lockedAdditions,
      "Additions are locked now, nobody is allowed to change the structure anymore.");
  // remember where the static record of this vertex starts
  tmpStaticOffsets.add(staticGraphPartsDos.getPos());
  vertex.getVertexID().write(staticGraphPartsDos);
  if (vertex.getEdges() == null) {
    // no edge list at all is stored like an empty edge list
    staticGraphPartsDos.writeInt(0);
  } else {
    staticGraphPartsDos.writeInt(vertex.getEdges().size());
    for (Edge<?, ?> e : vertex.getEdges()) {
      e.getDestinationVertexID().write(staticGraphPartsDos);
    }
  }
  // -1: the vertex has no index yet, its soft offset goes to the temp list
  serializeSoft(vertex, -1, null, softGraphPartsDos);
  size++;
}
/**
 * Removing vertices is not supported by the disk based storage.
 *
 * @param vertexID the ID of the vertex to remove (ignored).
 * @throws UnsupportedOperationException always.
 */
@Override
public void removeVertex(V vertexID) {
  final String reason = "Not yet implemented";
  throw new UnsupportedOperationException(reason);
}
/**
 * Serializes the vertex's soft parts to the given soft file: the nullable
 * vertex value, the state emitted by {@code writeState}, the number of edges
 * and the nullable value of every edge. Nullable values are prefixed with a
 * {@code NULL} / {@code NOT_NULL} marker byte.
 * <p>
 * If the vertex does not have an index yet (e.g. at startup) pass -1 and the
 * record offset is appended to the temporary offset list instead. A
 * {@code null} edge list is written as zero edges.
 *
 * @param vertex the vertex whose soft parts are written.
 * @param index the position of the vertex, or a negative value if it has
 *          none yet.
 * @param softValueOffsets the offset array that receives the record start
 *          when {@code index >= 0}; not touched otherwise.
 * @param softGraphParts the stream of the soft file to write to.
 * @throws IOException if writing fails.
 */
private void serializeSoft(Vertex<V, E, M> vertex, int index,
    long[] softValueOffsets, FSDataOutputStream softGraphParts)
    throws IOException {
  // save the offset at which the soft parts of this vertex start
  if (index >= 0) {
    softValueOffsets[index] = softGraphParts.getPos();
    // the bitset only exists once the setup is finished; the bit stored here
    // is the halted flag of the vertex
    activeVertices.set(index, vertex.isHalted());
  } else {
    tmpSoftOffsets.add(softGraphParts.getPos());
  }
  if (vertex.getValue() == null) {
    softGraphParts.write(NULL);
  } else {
    softGraphParts.write(NOT_NULL);
    vertex.getValue().write(softGraphParts);
  }
  vertex.writeState(softGraphParts);
  if (vertex.getEdges() == null) {
    // no edge list at all is stored like an empty edge list
    softGraphParts.writeInt(0);
  } else {
    softGraphParts.writeInt(vertex.getEdges().size());
    for (Edge<?, ?> e : vertex.getEdges()) {
      if (e.getValue() == null) {
        softGraphParts.write(NULL);
      } else {
        softGraphParts.write(NOT_NULL);
        e.getValue().write(softGraphParts);
      }
    }
  }
}
/**
 * Ends the loading phase: turns the collected offsets into plain arrays,
 * allocates the per-vertex bitset, closes the two writers used while adding
 * vertices and locks the structure against further additions.
 */
@Override
public void finishAdditions() {
  // materialize the offset lists; both soft arrays start out with the same
  // content but are independent arrays
  final long[] softOffsets = copy(tmpSoftOffsets);
  softValueOffsets = softOffsets;
  softValueOffsetsNextIteration = softOffsets.clone();
  staticOffsets = copy(tmpStaticOffsets);
  activeVertices = new BitSet(size);

  // the temporary lists are not needed any longer
  tmpSoftOffsets = null;
  tmpStaticOffsets = null;

  IOUtils.cleanup(null, staticGraphPartsDos, softGraphPartsDos);

  // from now on addVertex refuses to add anything
  lockedAdditions = true;
}
/**
 * Removing vertices is not supported by the disk based storage, so there is
 * nothing that could be finished.
 *
 * @throws UnsupportedOperationException always.
 */
@Override
public void finishRemovals() {
  final String reason = "Not yet implemented";
  throw new UnsupportedOperationException(reason);
}
/**
 * Unboxes the given list into a primitive array of the same length and
 * order.
 *
 * @param lst the list to convert.
 * @return a new array holding the values of the list.
 */
private static long[] copy(ArrayList<Long> lst) {
  final long[] unboxed = new long[lst.size()];
  int pos = 0;
  for (Long boxed : lst) {
    unboxed[pos++] = boxed.longValue();
  }
  return unboxed;
}
/**
 * @return true once {@link #finishAdditions()} has locked the structure.
 */
@Override
public boolean isFinishedAdditions() {
  return this.lockedAdditions;
}
/**
 * Prepares writing the soft parts of the current step: resets the write
 * position, creates a fresh soft file named after the current step and
 * rotates the offset arrays, so that the offsets written last become the ones
 * to read from and a zeroed array of the same length receives the new ones.
 *
 * @throws IOException if the soft file cannot be (re)created.
 */
@Override
public void startSuperstep() throws IOException {
  index = 0;

  final String nextSoftFile = getSoftGraphFileName(rootPath, currentStep);
  final LocalFileSystem localFs = FileSystem.getLocal(conf);
  final Path nextSoftPath = new Path(nextSoftFile);
  // drop a possibly existing file of that name before creating the new one
  localFs.delete(nextSoftPath, true);
  softGraphPartsNextIterationDos = localFs.create(nextSoftPath);

  // what was written last is what gets read now
  final long[] lastWritten = softValueOffsetsNextIteration;
  softValueOffsets = lastWritten;
  softValueOffsetsNextIteration = new long[lastWritten.length];
}
/**
 * Writes the soft parts of the given vertex to the soft file of the next
 * iteration at the current write position and advances that position by one.
 *
 * @param vertex the vertex that has just been processed.
 * @throws IOException if writing fails.
 */
@Override
public void finishVertexComputation(Vertex<V, E, M> vertex)
    throws IOException {
  // claim the slot first so the position advances even if writing fails
  final int slot = index;
  index = slot + 1;
  serializeSoft(vertex, slot, softValueOffsetsNextIteration,
      softGraphPartsNextIterationDos);
}
/**
 * Ends the current step: closes the soft writers and the soft reader and
 * increments the step counter. For every step but the first one the soft
 * file of the previous step is deleted and the soft file of the current step
 * is opened for reading.
 *
 * @throws IOException if deleting or opening a soft file fails.
 */
@Override
public void finishSuperstep() throws IOException {
  IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos,
      softGraphPartsDis);

  final int finishedStep = currentStep;
  // nothing is deleted in the first step
  if (finishedStep > 0) {
    final LocalFileSystem localFs = FileSystem.getLocal(conf);
    final Path previousSoftFile = new Path(getSoftGraphFileName(rootPath,
        finishedStep - 1));
    localFs.delete(previousSoftFile, true);
    final String currentSoftFile = getSoftGraphFileName(rootPath,
        finishedStep);
    softGraphPartsDis = localFs.open(new Path(currentSoftFile));
  }
  currentStep = finishedStep + 1;
}
/**
 * @return the number of vertices that were added through {@code addVertex}.
 */
@Override
public int size() {
  return this.size;
}
/**
 * Iterator over the vertices stored on disk. It does not create vertex
 * objects of its own: {@link #hasNext} loads the next accepted vertex into
 * the shared {@code cachedVertexInstance} and {@link #next()} hands that
 * instance out.
 */
private final class IDSkippingDiskIterator extends
    IDSkippingIterator<V, E, M> {
  // position of the first vertex that has not been examined yet
  int currentIndex = 0;

  /**
   * @return the shared vertex instance as filled by the last successful
   *         {@code hasNext} call.
   */
  @Override
  public Vertex<V, E, M> next() {
    return cachedVertexInstance;
  }

  /**
   * Advances to the next vertex that is accepted by the strategy.
   *
   * @param e the message vertex ID handed to the strategy, may be null.
   * @param strat the strategy that decides whether a vertex is accepted.
   * @return true if an accepted vertex was loaded, false if no vertex is
   *         left or none of the remaining vertices was accepted.
   */
  @Override
  public boolean hasNext(V e,
      org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
    if (currentIndex >= size) {
      return false;
    }
    int following = fill(strat, currentIndex, e);
    if (following == NOT_FOUND) {
      // every remaining vertex was rejected: the cached instance holds no
      // valid vertex, so do not report one
      currentIndex = size;
      return false;
    }
    currentIndex = following;
    return true;
  }
}

/**
 * Reopens the soft file of the previous step (step 0 if there is none) and
 * the static file for reading, makes sure the shared vertex instance exists
 * and has a vertex ID object, and returns a fresh iterator that starts at the
 * first vertex.
 *
 * @return a new iterator over the stored vertices.
 * @throws RuntimeException wrapping the {@link IOException} if one of the
 *           files cannot be opened.
 */
@Override
public IDSkippingIterator<V, E, M> skippingIterator() {
  try {
    // reset
    String softGraphFileName = getSoftGraphFileName(rootPath,
        Math.max(0, currentStep - 1));
    LocalFileSystem local = FileSystem.getLocal(conf);
    // close the files
    IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsDis,
        staticGraphPartsDis, staticGraphPartsDos);
    softGraphPartsDis = local.open(new Path(softGraphFileName));
    staticGraphPartsDis = local.open(new Path(staticFile));
    // ensure the vertex is not null
    if (cachedVertexInstance == null) {
      cachedVertexInstance = GraphJobRunner
          .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
      cachedVertexInstance.runner = runner;
    }
    ensureVertexIDNotNull();
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return new IDSkippingDiskIterator();
}

/** Gives the shared vertex instance a vertex ID object if it has none. */
@SuppressWarnings("unchecked")
private void ensureVertexIDNotNull() {
  if (cachedVertexInstance.getVertexID() == null) {
    cachedVertexInstance.setVertexID((V) GraphJobRunner
        .createVertexIDObject());
  }
}

/** Gives the shared vertex instance a value object if it has none. */
@SuppressWarnings("unchecked")
private void ensureVertexValueNotNull() {
  if (cachedVertexInstance.getValue() == null) {
    cachedVertexInstance.setValue((M) GraphJobRunner.createVertexValue());
  }
}

/** Gives the edge a destination vertex ID object if it has none. */
@SuppressWarnings({ "unchecked", "static-method" })
private void ensureEdgeIDNotNull(Edge<V, E> edge) {
  if (edge.getDestinationVertexID() == null) {
    edge.setDestinationVertexID((V) GraphJobRunner.createVertexIDObject());
  }
}

/** Gives the edge a value (cost) object if it has none. */
@SuppressWarnings({ "unchecked", "static-method" })
private void ensureEdgeValueNotNull(Edge<V, E> edge) {
  if (edge.getValue() == null) {
    edge.setCost((E) GraphJobRunner.createEdgeCostObject());
  }
}

/** Returned by {@code fill} when no remaining vertex was accepted. */
private static final int NOT_FOUND = -1;

/**
 * Fills the cachedVertexInstance with the first acceptable item at or after
 * the given index that matches the given messageVertexID if provided.
 *
 * @param strat the strategy that defines if a vertex that is serialized
 *          should be accepted.
 * @param index the index of the vertices to start from; must be smaller
 *          than the number of vertices.
 * @param messageVertexId the message vertex id that can be matched by the
 *          strategy. Can be null as well, this is handled by the strategy.
 * @return the index of the item after the found item, or
 *         {@code NOT_FOUND} if the strategy rejected every vertex from
 *         {@code index} on. In the latter case the cached instance does not
 *         hold a complete vertex.
 * @throws IllegalArgumentException if the edge counts stored in the static
 *           and in the soft file differ.
 * @throws RuntimeException wrapping the {@link IOException} if reading fails.
 */
private int fill(Strategy strat, int index, V messageVertexId) {
  try {
    while (true) {
      // seek until we found something that satisfied our strategy
      staticGraphPartsDis.seek(staticOffsets[index]);
      boolean halted = activeVertices.get(index);
      cachedVertexInstance.setVotedToHalt(halted);
      cachedVertexInstance.getVertexID().readFields(staticGraphPartsDis);
      if (strat.accept(cachedVertexInstance, messageVertexId)) {
        break;
      }
      if (++index >= size) {
        // ran off the end without a match; this must stay distinguishable
        // from "matched the last vertex", which yields index + 1 == size
        return NOT_FOUND;
      }
    }
    softGraphPartsDis.seek(softValueOffsets[index]);
    // setting vertex value null here, because it may be overridden. Messaging
    // is not materializing the message directly- so it is possible for the
    // read fields method to change this object (thus a new object).
    cachedVertexInstance.setValue(null);
    if (softGraphPartsDis.readByte() == NOT_NULL) {
      ensureVertexValueNotNull();
      cachedVertexInstance.getValue().readFields(softGraphPartsDis);
    }
    cachedVertexInstance.readState(softGraphPartsDis);
    int numEdges = staticGraphPartsDis.readInt();
    int softEdges = softGraphPartsDis.readInt();
    if (softEdges != numEdges) {
      throw new IllegalArgumentException(
          "Number of edges seemed to change. This is not possible (yet).");
    }
    // edges could actually be cached, however the local mode is preventing it
    // sometimes as edge destinations are sent and possibly overridden in
    // messages here.
    ArrayList<Edge<V, E>> edges = new ArrayList<Edge<V, E>>();
    // read the static and the soft file in parallel: destinations come from
    // the static file, edge values from the soft file
    for (int i = 0; i < numEdges; i++) {
      Edge<V, E> edge = new Edge<V, E>();
      ensureEdgeIDNotNull(edge);
      edge.getDestinationVertexID().readFields(staticGraphPartsDis);
      if (softGraphPartsDis.readByte() == NOT_NULL) {
        // only allocate a value object when there is a value to read
        ensureEdgeValueNotNull(edge);
        edge.getCost().readFields(softGraphPartsDis);
      } else {
        edge.setCost(null);
      }
      edges.add(edge);
    }
    // make edges unmodifiable
    cachedVertexInstance.setEdges(Collections.unmodifiableList(edges));
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return index + 1;
}
/**
 * Builds the name of the soft file of a step:
 * {@code <root>soft_<step>.graph}.
 *
 * @param root the directory prefix, used as given (no separator is added).
 * @param step the step number that goes into the file name.
 * @return the file name of the soft file for that step.
 */
private static String getSoftGraphFileName(String root, int step) {
  final StringBuilder name = new StringBuilder();
  name.append(root);
  name.append("soft_");
  name.append(step);
  name.append(".graph");
  return name.toString();
}
}