blob: 2d30bf9bf7901268b192b54b51892352a2296e58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.partition;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
/**
* Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
 * Thread-safe, but expects the caller to synchronize between deletes, adds,
* puts and gets.
*
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class DiskBackedPartitionStore<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable>
    extends PartitionStore<I, V, E, M> {
  /** Class logger. */
  private static final Logger LOG =
      Logger.getLogger(DiskBackedPartitionStore.class);
  /** States the partition can be found in */
  private enum State { ACTIVE, INACTIVE, LOADING, OFFLOADING, ONDISK }
  /** Global lock to the whole partition store */
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  /**
   * Global write lock. Must be held to modify class state for read and write.
   * Conditions are bound to this lock.
   */
  private final Lock wLock = lock.writeLock();
  /** The ids of the partitions contained in the store */
  private final Set<Integer> partitionIds = Sets.newHashSet();
  /** Partitions' states store */
  private final Map<Integer, State> states = Maps.newHashMap();
  /** Current active partitions, which have not been put back yet */
  private final Map<Integer, Partition<I, V, E, M>> active = Maps.newHashMap();
  /**
   * Inactive partitions to re-activate or spill to disk to make space.
   * Linked map: iteration order is insertion order, used for LRU eviction.
   */
  private final Map<Integer, Partition<I, V, E, M>> inactive =
      Maps.newLinkedHashMap();
  /** Ids of partitions stored on disk and number of vertices contained */
  private final Map<Integer, Integer> onDisk = Maps.newHashMap();
  /** Per-partition users counters (clearly only for active partitions) */
  private final Map<Integer, Integer> counters = Maps.newHashMap();
  /** These Conditions are used to wait for partitions' change of state */
  private final Map<Integer, Condition> pending = Maps.newHashMap();
  /**
   * Used to signal threads waiting to load partitions. Can be used when new
   * inactive partitions are available, or when free slots are available.
   */
  private final Condition notEmpty = wLock.newCondition();
  /** Executors for users requests. Uses caller threads */
  private final ExecutorService pool = new DirectExecutorService();
  /** Giraph configuration */
  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
  /** Mapper context */
  private final Context context;
  /** Base paths where the partition files are written to */
  private final String[] basePaths;
  /** Used to hash partition Ids */
  private final HashFunction hasher = Hashing.murmur3_32();
  /** Maximum number of slots */
  private final int maxInMemoryPartitions;
  /** Number of slots used */
  private int inMemoryPartitions;

  /**
   * Constructor
   *
   * @param conf Configuration
   * @param context Context
   */
  public DiskBackedPartitionStore(
      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
      Mapper<?, ?, ?, ?>.Context context) {
    this.conf = conf;
    this.context = context;
    // We must be able to hold at least one partition in memory
    maxInMemoryPartitions = Math.max(MAX_PARTITIONS_IN_MEMORY.get(conf), 1);
    // Take advantage of multiple disks
    String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
    basePaths = new String[userPaths.length];
    // Hoisted out of the loop: the job id is loop-invariant
    String jobId = conf.get("mapred.job.id", "Unknown Job");
    int i = 0;
    for (String path : userPaths) {
      basePaths[i++] = path + "/" + jobId;
    }
  }

  @Override
  public Iterable<Integer> getPartitionIds() {
    try {
      return pool.submit(new Callable<Iterable<Integer>>() {
        @Override
        public Iterable<Integer> call() throws Exception {
          wLock.lock();
          try {
            return Iterables.unmodifiableIterable(partitionIds);
          } finally {
            wLock.unlock();
          }
        }
      }).get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "getPartitionIds: cannot retrieve partition ids", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(
          "getPartitionIds: cannot retrieve partition ids", e);
    }
  }

  @Override
  public boolean hasPartition(final Integer id) {
    try {
      return pool.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          wLock.lock();
          try {
            return partitionIds.contains(id);
          } finally {
            wLock.unlock();
          }
        }
      }).get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "hasPartition: cannot check partition", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(
          "hasPartition: cannot check partition", e);
    }
  }

  @Override
  public int getNumPartitions() {
    try {
      return pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
          wLock.lock();
          try {
            return partitionIds.size();
          } finally {
            wLock.unlock();
          }
        }
      }).get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "getNumPartitions: cannot retrieve partition ids", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(
          "getNumPartitions: cannot retrieve partition ids", e);
    }
  }

  @Override
  public Partition<I, V, E, M> getPartition(Integer id) {
    try {
      return pool.submit(new GetPartition(id)).get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "getPartition: cannot retrieve partition " + id, e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(
          "getPartition: cannot retrieve partition " + id, e);
    }
  }

  @Override
  public void putPartition(Partition<I, V, E, M> partition) {
    Integer id = partition.getId();
    try {
      pool.submit(new PutPartition(id, partition)).get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "putPartition: cannot put back partition " + id, e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(
          "putPartition: cannot put back partition " + id, e);
    }
  }

  @Override
  public void deletePartition(Integer id) {
    try {
      pool.submit(new DeletePartition(id)).get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "deletePartition: cannot delete partition " + id, e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(
          "deletePartition: cannot delete partition " + id, e);
    }
  }

  @Override
  public Partition<I, V, E, M> removePartition(Integer id) {
    Partition<I, V, E, M> partition = getPartition(id);
    // we put it back, so the partition can turn INACTIVE and be deleted.
    putPartition(partition);
    deletePartition(id);
    return partition;
  }

  @Override
  public void addPartition(Partition<I, V, E, M> partition) {
    Integer id = partition.getId();
    try {
      pool.submit(new AddPartition(partition.getId(), partition)).get();
    } catch (InterruptedException e) {
      throw new IllegalStateException(
          "addPartition: cannot add partition " + id, e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(
          "addPartition: cannot add partition " + id, e);
    }
  }

  @Override
  public void shutdown() {
    try {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    } finally {
      // BUGFIX: iterate over the partition ids (keySet), not the vertex
      // counts (values), so the right partition files get deleted.
      for (Integer id : onDisk.keySet()) {
        deletePartitionFiles(id);
      }
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(partitionIds.toString());
    sb.append("\nActive\n");
    for (Entry<Integer, Partition<I, V, E, M>> e : active.entrySet()) {
      sb.append(e.getKey() + ":" + e.getValue() + "\n");
    }
    sb.append("Inactive\n");
    for (Entry<Integer, Partition<I, V, E, M>> e : inactive.entrySet()) {
      sb.append(e.getKey() + ":" + e.getValue() + "\n");
    }
    sb.append("OnDisk\n");
    for (Entry<Integer, Integer> e : onDisk.entrySet()) {
      sb.append(e.getKey() + ":" + e.getValue() + "\n");
    }
    sb.append("Counters\n");
    for (Entry<Integer, Integer> e : counters.entrySet()) {
      sb.append(e.getKey() + ":" + e.getValue() + "\n");
    }
    sb.append("Pending\n");
    for (Entry<Integer, Condition> e : pending.entrySet()) {
      sb.append(e.getKey() + "\n");
    }
    return sb.toString();
  }

  /**
   * Increment the number of active users for a partition. Caller should hold
   * the global write lock.
   *
   * @param id The id of the counter to increment
   * @return The new value
   */
  private Integer incrementCounter(Integer id) {
    Integer count = counters.get(id);
    if (count == null) {
      count = 0;
    }
    counters.put(id, ++count);
    return count;
  }

  /**
   * Decrement the number of active users for a partition. Caller should hold
   * the global write lock.
   *
   * @param id The id of the counter to decrement
   * @return The new value
   */
  private Integer decrementCounter(Integer id) {
    Integer count = counters.get(id);
    if (count == null) {
      throw new IllegalStateException("no counter for partition " + id);
    }
    counters.put(id, --count);
    return count;
  }

  /**
   * Writes vertex data (Id, Vertex Value and halted state) to stream.
   *
   * @param output The output stream
   * @param vertex The vertex to serialize
   * @throws IOException
   */
  private void writeVertexData(
      DataOutput output,
      Vertex<I, V, E, M> vertex)
    throws IOException {
    vertex.getId().write(output);
    vertex.getValue().write(output);
    output.writeBoolean(vertex.isHalted());
  }

  /**
   * Writes vertex edges (Id, Edges) to stream.
   *
   * @param output The output stream
   * @param vertex The vertex to serialize
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  private void writeOutEdges(
      DataOutput output,
      Vertex<I, V, E, M> vertex)
    throws IOException {
    vertex.getId().write(output);
    ((OutEdges<I, E>) vertex.getEdges()).write(output);
  }

  /**
   * Read vertex data from an input and initialize the vertex.
   *
   * @param in The input stream
   * @param vertex The vertex to initialize
   * @throws IOException
   */
  private void readVertexData(DataInput in, Vertex<I, V, E, M> vertex)
    throws IOException {
    I id = conf.createVertexId();
    id.readFields(in);
    V value = conf.createVertexValue();
    value.readFields(in);
    vertex.initialize(id, value);
    if (in.readBoolean()) {
      vertex.voteToHalt();
    } else {
      vertex.wakeUp();
    }
  }

  /**
   * Read vertex edges from an input and set them to the vertex.
   *
   * @param in The input stream
   * @param partition The partition owning the vertex
   * @throws IOException
   */
  private void readOutEdges(DataInput in, Partition<I, V, E, M> partition)
    throws IOException {
    I id = conf.createVertexId();
    id.readFields(in);
    Vertex<I, V, E, M> v = partition.getVertex(id);
    OutEdges<I, E> edges = conf.createOutEdges();
    edges.readFields(in);
    v.setEdges(edges);
  }

  /**
   * Load a partition from disk. It deletes the files after the load,
   * except for the edges, if the graph is static.
   *
   * @param id The id of the partition to load
   * @param numVertices The number of vertices contained on disk
   * @return The partition
   * @throws IOException
   */
  private Partition<I, V, E, M> loadPartition(Integer id, int numVertices)
    throws IOException {
    Partition<I, V, E, M> partition =
        conf.createPartition(id, context);
    File file = new File(getVerticesPath(id));
    DataInputStream inputStream = new DataInputStream(
        new BufferedInputStream(new FileInputStream(file)));
    // try/finally so a deserialization failure does not leak the stream
    try {
      for (int i = 0; i < numVertices; ++i) {
        Vertex<I, V, E, M> vertex = conf.createVertex();
        readVertexData(inputStream, vertex);
        partition.putVertex(vertex);
      }
    } finally {
      inputStream.close();
    }
    file.delete();
    file = new File(getEdgesPath(id));
    inputStream = new DataInputStream(
        new BufferedInputStream(new FileInputStream(file)));
    try {
      for (int i = 0; i < numVertices; ++i) {
        readOutEdges(inputStream, partition);
      }
    } finally {
      inputStream.close();
    }
    /*
     * If the graph is static, keep the edges file around so it does not
     * have to be re-written when the partition is offloaded again.
     */
    if (!conf.isStaticGraph()) {
      file.delete();
    }
    return partition;
  }

  /**
   * Write a partition to disk.
   *
   * @param partition The partition to offload
   * @throws IOException
   */
  private void offloadPartition(Partition<I, V, E, M> partition)
    throws IOException {
    File file = new File(getVerticesPath(partition.getId()));
    file.getParentFile().mkdirs();
    file.createNewFile();
    if (LOG.isInfoEnabled()) {
      LOG.info("offloadPartition: writing partition vertices " +
          partition.getId() + " to " + file.getAbsolutePath());
    }
    DataOutputStream outputStream = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(file)));
    // try/finally so a serialization failure does not leak the stream
    try {
      for (Vertex<I, V, E, M> vertex : partition) {
        writeVertexData(outputStream, vertex);
      }
    } finally {
      outputStream.close();
    }
    file = new File(getEdgesPath(partition.getId()));
    /*
     * Avoid writing back edges if we have already written them once and
     * the graph is not changing.
     */
    if (!conf.isStaticGraph() || !file.exists()) {
      file.createNewFile();
      if (LOG.isInfoEnabled()) {
        LOG.info("offloadPartition: writing partition edges " +
            partition.getId() + " to " + file.getAbsolutePath());
      }
      outputStream = new DataOutputStream(
          new BufferedOutputStream(new FileOutputStream(file)));
      try {
        for (Vertex<I, V, E, M> vertex : partition) {
          writeOutEdges(outputStream, vertex);
        }
      } finally {
        outputStream.close();
      }
    }
  }

  /**
   * Append a partition on disk at the end of the file. Expects the caller
   * to hold the global lock.
   *
   * @param partition The partition
   * @throws IOException
   */
  private void addToOOCPartition(Partition<I, V, E, M> partition)
    throws IOException {
    Integer id = partition.getId();
    Integer count = onDisk.get(id);
    onDisk.put(id, count + (int) partition.getVertexCount());
    File file = new File(getVerticesPath(id));
    // open in append mode so new vertices are added after the existing ones
    DataOutputStream outputStream = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(file, true)));
    try {
      for (Vertex<I, V, E, M> vertex : partition) {
        writeVertexData(outputStream, vertex);
      }
    } finally {
      outputStream.close();
    }
    file = new File(getEdgesPath(id));
    outputStream = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(file, true)));
    try {
      for (Vertex<I, V, E, M> vertex : partition) {
        writeOutEdges(outputStream, vertex);
      }
    } finally {
      outputStream.close();
    }
  }

  /**
   * Delete a partition's files.
   *
   * @param id The id of the partition owning the file.
   */
  public void deletePartitionFiles(Integer id) {
    File file = new File(getVerticesPath(id));
    file.delete();
    file = new File(getEdgesPath(id));
    file.delete();
  }

  /**
   * Get the path and basename of the storage files.
   *
   * @param partitionId The partition
   * @return The path to the given partition
   */
  private String getPartitionPath(Integer partitionId) {
    // hash the id so partitions spread evenly across the available disks
    int hash = hasher.hashInt(partitionId).asInt();
    int idx = Math.abs(hash % basePaths.length);
    return basePaths[idx] + "/partition-" + partitionId;
  }

  /**
   * Get the path to the file where vertices are stored.
   *
   * @param partitionId The partition
   * @return The path to the vertices file
   */
  private String getVerticesPath(Integer partitionId) {
    return getPartitionPath(partitionId) + "_vertices";
  }

  /**
   * Get the path to the file where edges are stored.
   *
   * @param partitionId The partition
   * @return The path to the edges file
   */
  private String getEdgesPath(Integer partitionId) {
    return getPartitionPath(partitionId) + "_edges";
  }

  /**
   * Task that gets a partition from the store
   */
  private class GetPartition implements Callable<Partition<I, V, E, M>> {
    /** Partition id */
    private Integer id;

    /**
     * Constructor
     *
     * @param id Partition id
     */
    public GetPartition(Integer id) {
      this.id = id;
    }

    /**
     * Removes and returns the least recently used entry. Expects the caller
     * to hold the global write lock and inactive to be non-empty.
     *
     * @return The least recently used entry.
     */
    private Entry<Integer, Partition<I, V, E, M>> getLRUEntry() {
      Iterator<Entry<Integer, Partition<I, V, E, M>>> i =
          inactive.entrySet().iterator();
      Entry<Integer, Partition<I, V, E, M>> lruEntry = i.next();
      i.remove();
      return lruEntry;
    }

    @Override
    public Partition<I, V, E, M> call() throws Exception {
      Partition<I, V, E, M> partition = null;
      while (partition == null) {
        wLock.lock();
        try {
          State pState = states.get(id);
          switch (pState) {
          case ONDISK:
            Entry<Integer, Partition<I, V, E, M>> lru = null;
            states.put(id, State.LOADING);
            int numVertices = onDisk.remove(id);
            /*
             * Wait until we have space in memory or inactive data for a switch
             */
            while (inMemoryPartitions >= maxInMemoryPartitions &&
                inactive.size() == 0) {
              notEmpty.await();
            }
            /*
             * we have to make some space first
             */
            if (inMemoryPartitions >= maxInMemoryPartitions) {
              lru = getLRUEntry();
              states.put(lru.getKey(), State.OFFLOADING);
              pending.get(lru.getKey()).signalAll();
            } else { // there is space, just add it to the in-memory partitions
              inMemoryPartitions++;
            }
            /*
             * do IO without contention, the threads interested to these
             * partitions will subscribe to the relative Condition.
             *
             * BUGFIX: re-acquire the lock in a finally block, so that an
             * IOException during the IO does not make the outer finally
             * unlock a lock we do not hold (IllegalMonitorStateException
             * masking the real failure).
             */
            wLock.unlock();
            try {
              if (lru != null) {
                offloadPartition(lru.getValue());
              }
              partition = loadPartition(id, numVertices);
            } finally {
              wLock.lock();
            }
            /*
             * update state and signal the pending threads
             */
            if (lru != null) {
              states.put(lru.getKey(), State.ONDISK);
              onDisk.put(lru.getKey(), (int) lru.getValue().getVertexCount());
              pending.get(lru.getKey()).signalAll();
            }
            active.put(id, partition);
            states.put(id, State.ACTIVE);
            pending.get(id).signalAll();
            incrementCounter(id);
            break;
          case INACTIVE:
            partition = inactive.remove(id);
            active.put(id, partition);
            states.put(id, State.ACTIVE);
            incrementCounter(id);
            break;
          case ACTIVE:
            partition = active.get(id);
            incrementCounter(id);
            break;
          case LOADING:
            pending.get(id).await();
            break;
          case OFFLOADING:
            pending.get(id).await();
            break;
          default:
            throw new IllegalStateException(
                "illegal state " + pState + " for partition " + id);
          }
        } finally {
          wLock.unlock();
        }
      }
      return partition;
    }
  }

  /**
   * Task that puts a partition back to the store
   */
  private class PutPartition implements Callable<Void> {
    /** Partition id */
    private Integer id;

    /**
     * Constructor
     *
     * @param id The partition id
     * @param partition The partition (unused, identified by id)
     */
    public PutPartition(Integer id, Partition<I, V, E, M> partition) {
      this.id = id;
    }

    @Override
    public Void call() throws Exception {
      wLock.lock();
      try {
        // deactivate the partition only when the last user puts it back
        if (decrementCounter(id) == 0) {
          inactive.put(id, active.remove(id));
          states.put(id, State.INACTIVE);
          pending.get(id).signalAll();
          notEmpty.signal();
        }
        return null;
      } finally {
        wLock.unlock();
      }
    }
  }

  /**
   * Task that adds a partition to the store
   */
  private class AddPartition implements Callable<Void> {
    /** Partition id */
    private Integer id;
    /** Partition */
    private Partition<I, V, E, M> partition;

    /**
     * Constructor
     *
     * @param id The partition id
     * @param partition The partition
     */
    public AddPartition(Integer id, Partition<I, V, E, M> partition) {
      this.id = id;
      this.partition = partition;
    }

    @Override
    public Void call() throws Exception {
      wLock.lock();
      try {
        if (partitionIds.contains(id)) {
          Partition<I, V, E, M> existing = null;
          boolean isOOC = false;
          boolean done = false;
          while (!done) {
            State pState = states.get(id);
            switch (pState) {
            case ONDISK:
              isOOC = true;
              done = true;
              break;
            /*
             * just add data to the in-memory partitions,
             * concurrency should be managed by the caller.
             */
            case INACTIVE:
              existing = inactive.get(id);
              done = true;
              break;
            case ACTIVE:
              existing = active.get(id);
              done = true;
              break;
            case LOADING:
              pending.get(id).await();
              break;
            case OFFLOADING:
              pending.get(id).await();
              break;
            default:
              throw new IllegalStateException(
                  "illegal state " + pState + " for partition " + id);
            }
          }
          if (isOOC) {
            addToOOCPartition(partition);
          } else {
            existing.addPartition(partition);
          }
        } else {
          Condition newC = wLock.newCondition();
          pending.put(id, newC);
          partitionIds.add(id);
          if (inMemoryPartitions < maxInMemoryPartitions) {
            inMemoryPartitions++;
            states.put(id, State.INACTIVE);
            inactive.put(id, partition);
            notEmpty.signal();
          } else {
            states.put(id, State.OFFLOADING);
            onDisk.put(id, (int) partition.getVertexCount());
            /*
             * BUGFIX: re-acquire the lock in a finally block, so that an
             * IOException from offloadPartition does not make the outer
             * finally unlock a lock we do not hold.
             */
            wLock.unlock();
            try {
              offloadPartition(partition);
            } finally {
              wLock.lock();
            }
            states.put(id, State.ONDISK);
            newC.signalAll();
          }
        }
        return null;
      } finally {
        wLock.unlock();
      }
    }
  }

  /**
   * Task that deletes a partition from the store
   */
  private class DeletePartition implements Callable<Void> {
    /** Partition id */
    private Integer id;

    /**
     * Constructor
     *
     * @param id The partition id
     */
    public DeletePartition(Integer id) {
      this.id = id;
    }

    @Override
    public Void call() throws Exception {
      boolean done = false;
      wLock.lock();
      try {
        while (!done) {
          State pState = states.get(id);
          switch (pState) {
          case ONDISK:
            onDisk.remove(id);
            deletePartitionFiles(id);
            done = true;
            break;
          case INACTIVE:
            inactive.remove(id);
            inMemoryPartitions--;
            // Clean up files possibly left on disk: for static graphs the
            // edges file is kept around after a load (see loadPartition).
            deletePartitionFiles(id);
            notEmpty.signal();
            done = true;
            break;
          case ACTIVE:
            pending.get(id).await();
            break;
          case LOADING:
            pending.get(id).await();
            break;
          case OFFLOADING:
            pending.get(id).await();
            break;
          default:
            throw new IllegalStateException(
                "illegal state " + pState + " for partition " + id);
          }
        }
        partitionIds.remove(id);
        states.remove(id);
        counters.remove(id);
        pending.remove(id).signalAll();
        return null;
      } finally {
        wLock.unlock();
      }
    }
  }

  /**
   * Direct Executor that executes tasks within the calling threads.
   */
  private class DirectExecutorService extends AbstractExecutorService {
    /** Executor state */
    private volatile boolean shutdown = false;

    /**
     * Constructor
     */
    public DirectExecutorService() { }

    /**
     * Execute the task in the calling thread.
     *
     * @param task Task to execute
     */
    public void execute(Runnable task) {
      task.run();
    }

    /**
     * Shutdown the executor.
     */
    public void shutdown() {
      this.shutdown = true;
    }

    /**
     * Shutdown the executor and return the current queue (empty).
     *
     * @return The list of awaiting tasks
     */
    public List<Runnable> shutdownNow() {
      this.shutdown = true;
      return Collections.emptyList();
    }

    /**
     * Return current shutdown state.
     *
     * @return Shutdown state
     */
    public boolean isShutdown() {
      return shutdown;
    }

    /**
     * Return current termination state.
     *
     * @return Termination state
     */
    public boolean isTerminated() {
      return shutdown;
    }

    /**
     * Do nothing and return shutdown state.
     *
     * @param timeout Timeout
     * @param unit Time unit
     * @return Shutdown state
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
      throws InterruptedException {
      return shutdown;
    }
  }
}