/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.edge;
import com.google.common.collect.MapMaker;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ProgressCounter;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ThreadLocalProgressCounter;
import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.utils.VertexIdEdgeIterator;
import org.apache.giraph.utils.VertexIdEdges;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.checkState;
/**
* Basic implementation of an edge store; extend this to easily define simple
* and primitive edge stores.
*
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
* @param <K> Key corresponding to Vertex id
* @param <Et> Entry type
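*
* <p>Illustrative usage sketch only; the wiring below is hypothetical and
* not the actual worker code. During edge input, incoming edges are
* buffered per partition, and once input finishes they are attached to
* their source vertices:
* <pre>{@code
* // buffer a batch of incoming edges for a partition (input threads)
* edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
* // after edge input completes, move buffered edges onto the vertices
* edgeStore.moveEdgesToVertices();
* }</pre>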
*/
public abstract class AbstractEdgeStore<I extends WritableComparable,
V extends Writable, E extends Writable, K, Et>
extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
implements EdgeStore<I, V, E> {
/** Used to keep track of progress during the move-edges process */
public static final ThreadLocalProgressCounter PROGRESS_COUNTER =
new ThreadLocalProgressCounter();
/** Class logger */
private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
/** Service worker. */
protected CentralizedServiceWorker<I, V, E> service;
/** Giraph configuration. */
protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** Progressable to report progress. */
protected Progressable progressable;
/** Map used to temporarily store incoming edges. */
protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
/**
* Whether the chosen {@link OutEdges} implementation allows for Edge
* reuse.
*/
protected boolean reuseEdgeObjects;
/**
* Whether the {@link OutEdges} class used during input is different
* from the one used during computation.
*/
protected boolean useInputOutEdges;
/** Whether edges have been spilled to disk */
private volatile boolean hasEdgesOnDisk = false;
/** Callback deciding whether missing source vertices should be created */
private CreateSourceVertexCallback<I> createSourceVertexCallback;
/**
* Constructor.
*
* @param service Service worker
* @param configuration Configuration
* @param progressable Progressable
*/
public AbstractEdgeStore(
CentralizedServiceWorker<I, V, E> service,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
Progressable progressable) {
this.service = service;
this.configuration = configuration;
this.progressable = progressable;
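// Size the map's concurrency level to match the number of Netty
// server threads that may insert edges in parallel.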
transientEdges = new MapMaker().concurrencyLevel(
configuration.getNettyServerExecutionConcurrency()).makeMap();
reuseEdgeObjects = configuration.reuseEdgeObjects();
useInputOutEdges = configuration.useInputOutEdges();
createSourceVertexCallback =
GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
.newInstance(configuration);
}
/**
* Get the vertex id for a given entry.
*
* @param entry map entry holding the vertex id key
* @param representativeVertexId reusable vertex id object that
*        implementations may fill in and return
* @return vertex id
*/
protected abstract I getVertexId(Et entry, I representativeVertexId);
/**
* Create a new vertex id from a given entry.
*
* @param entry map entry holding the vertex id key
* @return new vertex id
*/
protected abstract I createVertexId(Et entry);
/**
* Get the map of out-edges for a given partition.
*
* @param partitionId id of the partition
* @return map of out-edges for vertices in the partition
*/
protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId);
/**
* Get the out-edges stored in a given entry.
*
* @param entry map entry holding a vertex's out-edges
* @return out-edges for that vertex
*/
protected abstract OutEdges<I, E> getPartitionEdges(Et entry);
/**
* Writes the given key to the output
*
* @param key input key to be written
* @param output output to write the key to
*/
protected abstract void writeVertexKey(K key, DataOutput output)
throws IOException;
/**
* Reads the given key from the input
*
* @param input input to read the key from
* @return Key read from the input
*/
protected abstract K readVertexKey(DataInput input) throws IOException;
/**
* Get iterator for partition edges
*
* @param partitionEdges map of out-edges for vertices in a partition
* @return iterator
*/
protected abstract Iterator<Et>
getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
@Override
public boolean hasEdgesForPartition(int partitionId) {
return transientEdges.containsKey(partitionId);
}
@Override
public void writePartitionEdgeStore(int partitionId, DataOutput output)
throws IOException {
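// Spilled format: number of vertices, then for each vertex its key
// followed by its serialized OutEdges. readPartitionEdgeStore reads
// back exactly this layout.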
Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
if (edges != null) {
output.writeInt(edges.size());
if (edges.size() > 0) {
hasEdgesOnDisk = true;
}
for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
writeVertexKey(edge.getKey(), output);
edge.getValue().write(output);
}
}
}
@Override
public void readPartitionEdgeStore(int partitionId, DataInput input)
throws IOException {
checkState(!transientEdges.containsKey(partitionId),
"readPartitionEdgeStore: reading a partition that is already present in" +
" the edge store (impossible)");
Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
int numEntries = input.readInt();
for (int i = 0; i < numEntries; ++i) {
K vertexKey = readVertexKey(input);
OutEdges<I, E> edges = configuration.createAndInitializeInputOutEdges();
edges.readFields(input);
partitionEdges.put(vertexKey, edges);
}
}
/**
* Get out-edges for a given vertex
*
* @param vertexIdEdgeIterator vertex Id Edge iterator
* @param partitionEdgesIn map of out-edges for vertices in a partition
* @return out-edges for the vertex
*/
protected abstract OutEdges<I, E> getVertexOutEdges(
VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
Map<K, OutEdges<I, E>> partitionEdgesIn);
@Override
public void addPartitionEdges(
int partitionId, VertexIdEdges<I, E> edges) {
Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
VertexIdEdgeIterator<I, E> vertexIdEdgeIterator =
edges.getVertexIdEdgeIterator();
while (vertexIdEdgeIterator.hasNext()) {
vertexIdEdgeIterator.next();
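// If the OutEdges implementation can safely reuse edge objects (it
// copies whatever data it needs), hand it the iterator's recycled
// edge; otherwise release the current edge so the iterator allocates
// a fresh object and the stored reference stays valid.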
Edge<I, E> edge = reuseEdgeObjects ?
vertexIdEdgeIterator.getCurrentEdge() :
vertexIdEdgeIterator.releaseCurrentEdge();
OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator,
partitionEdges);
synchronized (outEdges) {
outEdges.add(edge);
}
}
}
/**
* Convert the input edges to the {@link OutEdges} data structure used
* for computation (if different).
*
* @param inputEdges Input edges
* @return Compute edges
*/
private OutEdges<I, E> convertInputToComputeEdges(
OutEdges<I, E> inputEdges) {
if (!useInputOutEdges) {
return inputEdges;
} else {
return configuration.createAndInitializeOutEdges(inputEdges);
}
}
@Override
public void moveEdgesToVertices() {
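// Even if the in-memory map is empty, edges may have been spilled to
// disk by the out-of-core mechanism and still need to be moved.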
if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
if (LOG.isInfoEnabled()) {
LOG.info("moveEdgesToVertices: No edges to move");
}
return;
}
if (LOG.isInfoEnabled()) {
LOG.info("moveEdgesToVertices: Moving incoming edges to " +
"vertices. Using " + createSourceVertexCallback);
}
service.getPartitionStore().startIteration();
int numThreads = configuration.getNumInputSplitsThreads();
CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
@Override
public Callable<Void> newCallable(int callableId) {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
I representativeVertexId = configuration.createVertexId();
OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
if (oocEngine != null) {
oocEngine.processingThreadStart();
}
ProgressCounter numVerticesProcessed = PROGRESS_COUNTER.get();
while (true) {
Partition<I, V, E> partition =
service.getPartitionStore().getNextPartition();
if (partition == null) {
break;
}
Map<K, OutEdges<I, E>> partitionEdges =
transientEdges.remove(partition.getId());
if (partitionEdges == null) {
service.getPartitionStore().putPartition(partition);
continue;
}
Iterator<Et> iterator =
getPartitionEdgesIterator(partitionEdges);
// process all vertices in given partition
int count = 0;
while (iterator.hasNext()) {
// If the out-of-core mechanism is in use, check whether this thread
// can stay active or whether it should temporarily suspend and stop
// processing and generating more data for the moment.
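// CHECK_IN_INTERVAL is assumed to be of the form 2^n - 1, so the
// bitwise AND below acts as a cheap "every 2^n iterations" check.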
if (oocEngine != null &&
(++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
oocEngine.activeThreadCheckIn();
}
Et entry = iterator.next();
I vertexId = getVertexId(entry, representativeVertexId);
OutEdges<I, E> outEdges = convertInputToComputeEdges(
getPartitionEdges(entry));
Vertex<I, V, E> vertex = partition.getVertex(vertexId);
// If the source vertex doesn't exist, create it. Otherwise,
// just set the edges.
if (vertex == null) {
if (createSourceVertexCallback
.shouldCreateSourceVertex(vertexId)) {
// Create the vertex only if the configured callback allows it
vertex = configuration.createVertex();
vertex.initialize(createVertexId(entry),
configuration.createVertexValue(), outEdges);
partition.putVertex(vertex);
}
} else {
// A vertex may initially exist with or without edges; optimize the
// common case of no pre-existing edges by setting them directly.
if (vertex.getNumEdges() == 0) {
vertex.setEdges(outEdges);
} else {
for (Edge<I, E> edge : outEdges) {
vertex.addEdge(edge);
}
}
if (vertex instanceof Trimmable) {
((Trimmable) vertex).trim();
}
// Some Partition implementations (e.g. ByteArrayPartition)
// require us to put back the vertex after modifying it.
partition.saveVertex(vertex);
}
numVerticesProcessed.inc();
iterator.remove();
}
// Some PartitionStore implementations
// (e.g. DiskBackedPartitionStore) require us to put back the
// partition after modifying it.
service.getPartitionStore().putPartition(partition);
}
if (oocEngine != null) {
oocEngine.processingThreadFinish();
}
return null;
}
};
}
};
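// Run the callable on numThreads threads (named move-edges-<n>),
// reporting progress while waiting for all of them to finish.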
ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
"move-edges-%d", progressable);
// remove all entries
transientEdges.clear();
if (LOG.isInfoEnabled()) {
LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
"vertices.");
}
}
}