blob: 730e07a2f69ce187fe0438634285007a07aaf805 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.graph;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.util.WritableUtils;
/**
* Stores the vertices into a memory-based tree map. This implementation allows
* the runtime graph modification and random access by vertex ID.
*
* But it might be inefficient in memory usage.
*
* @param <V> Vertex ID object type
* @param <E> Edge cost object type
* @param <M> Vertex value object type
*/
public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
private final ConcurrentHashMap<V, byte[]> vertices = new ConcurrentHashMap<V, byte[]>();
private GraphJobRunner<V, E, M> runner;
private HamaConfiguration conf;
private AtomicInteger activeVertices = new AtomicInteger(0);
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
this.runner = runner;
this.conf = conf;
}
@Override
public void put(Vertex<V, E, M> vertex) throws IOException {
if (!vertices.containsKey(vertex.getVertexID())) {
if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
vertices.putIfAbsent(vertex.getVertexID(),
WritableUtils.serialize(vertex));
} else {
vertices.putIfAbsent(vertex.getVertexID(),
WritableUtils.unsafeSerialize(vertex));
}
} else {
Vertex<V, E, M> v = this.get(vertex.getVertexID());
for (Edge<V, E> e : vertex.getEdges()) {
v.addEdge(e);
}
if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
vertices.put(vertex.getVertexID(), WritableUtils.serialize(v));
} else {
vertices.put(vertex.getVertexID(), WritableUtils.unsafeSerialize(v));
}
}
}
@Override
public void remove(V vertexID) throws UnsupportedOperationException {
vertices.remove(vertexID);
}
public void clear() {
vertices.clear();
}
@Override
public int size() {
return vertices.size();
}
@Override
public Vertex<V, E, M> get(V vertexID) throws IOException {
Vertex<V, E, M> v = GraphJobRunner
.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
WritableUtils.deserialize(vertices.get(vertexID), v);
} else {
WritableUtils.unsafeDeserialize(vertices.get(vertexID), v);
}
v.setRunner(runner);
return v;
}
@Override
public Iterator<Vertex<V, E, M>> iterator() {
final Iterator<byte[]> it = vertices.values().iterator();
return new Iterator<Vertex<V, E, M>>() {
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Vertex<V, E, M> next() {
Vertex<V, E, M> v = GraphJobRunner
.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
WritableUtils.deserialize(it.next(), v);
} else {
WritableUtils.unsafeDeserialize(it.next(), v);
}
v.setRunner(runner);
return v;
}
@Override
public void remove() {
it.remove();
}
};
}
@Override
public Set<V> keySet() {
return vertices.keySet();
}
@Override
public void finishVertexComputation(Vertex<V, E, M> vertex)
throws IOException {
incrementCount();
vertex.setComputed();
if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
} else {
vertices.put(vertex.getVertexID(), WritableUtils.unsafeSerialize(vertex));
}
}
public void incrementCount() {
activeVertices.incrementAndGet();
}
@Override
public void finishAdditions() {
}
@Override
public void finishRemovals() {
}
@Override
public void startSuperstep() throws IOException {
}
@Override
public void finishSuperstep() throws IOException {
activeVertices.set(0);
}
public int getActiveVerticesNum() {
return activeVertices.get();
}
}