blob: 4598d4ced623d3601448cc72ef383436851d897c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.comm;
import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.EdgeStore;
import org.apache.giraph.edge.EdgeStoreFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
import org.apache.giraph.ooc.data.DiskBackedMessageStore;
import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.SimplePartitionStore;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
/**
* Anything that the server stores
*
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
*/
@SuppressWarnings("rawtypes")
public class ServerData<I extends WritableComparable,
V extends Writable, E extends Writable> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(ServerData.class);
/** Configuration */
private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Partition store for this worker. */
private volatile PartitionStore<I, V, E> partitionStore;
/** Edge store for this worker. */
private final EdgeStore<I, V, E> edgeStore;
/** Message store factory */
private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
messageStoreFactory;
/**
* Message store for incoming messages (messages which will be consumed
* in the next super step)
*/
private volatile MessageStore<I, Writable> incomingMessageStore;
/**
* Message store for current messages (messages which we received in
* previous super step and which will be consumed in current super step)
*/
private volatile MessageStore<I, Writable> currentMessageStore;
/**
* Map of partition ids to vertex mutations from other workers. These are
* mutations that should be applied before execution of *current* super step.
* (accesses to keys should be thread-safe as multiple threads may resolve
* mutations of different partitions at the same time)
*/
private ConcurrentMap<Integer,
ConcurrentMap<I, VertexMutations<I, V, E>>>
oldPartitionMutations = Maps.newConcurrentMap();
/**
* Map of partition ids to vertex mutations from other workers. These are
* mutations that are coming from other workers as the execution goes on in a
* super step. These mutations should be applied in the *next* super step.
* (this should be thread-safe)
*/
private ConcurrentMap<Integer,
ConcurrentMap<I, VertexMutations<I, V, E>>>
partitionMutations = Maps.newConcurrentMap();
/**
* Holds aggregators which current worker owns from current superstep
*/
private final OwnerAggregatorServerData ownerAggregatorData;
/**
* Holds old aggregators from previous superstep
*/
private final AllAggregatorServerData allAggregatorData;
/** Service worker */
private final CentralizedServiceWorker<I, V, E> serviceWorker;
/** Store for current messages from other workers to this worker */
private volatile List<Writable> currentWorkerToWorkerMessages =
Collections.synchronizedList(new ArrayList<Writable>());
/** Store for message from other workers to this worker for next superstep */
private volatile List<Writable> incomingWorkerToWorkerMessages =
Collections.synchronizedList(new ArrayList<Writable>());
/** Job context (for progress) */
private final Mapper<?, ?, ?, ?>.Context context;
/** Out-of-core engine */
private final OutOfCoreEngine oocEngine;
/**
 * Build every store this worker's server side needs.
 *
 * <p>The message store factory, an edge store and an in-memory partition
 * store are always created. If
 * {@code GiraphConstants.USE_OUT_OF_CORE_GRAPH} is enabled in the
 * configuration, an {@link OutOfCoreEngine} is created and the partition and
 * edge stores are wrapped in their disk-backed variants; otherwise the
 * in-memory stores are used directly and the engine stays {@code null}.
 *
 * @param service Service worker
 * @param workerServer Worker server
 * @param conf Configuration
 * @param context Mapper context
 */
public ServerData(
    CentralizedServiceWorker<I, V, E> service,
    WorkerServer workerServer,
    ImmutableClassesGiraphConfiguration<I, V, E> conf,
    Mapper<?, ?, ?, ?>.Context context) {
  this.serviceWorker = service;
  this.conf = conf;
  this.context = context;
  // Needs serviceWorker and conf to be set already.
  this.messageStoreFactory = createMessageStoreFactory();

  EdgeStoreFactory<I, V, E> factoryForEdges = conf.createEdgeStoreFactory();
  factoryForEdges.initialize(service, conf, context);
  EdgeStore<I, V, E> memoryEdges = factoryForEdges.newStore();
  PartitionStore<I, V, E> memoryPartitions =
      new SimplePartitionStore<I, V, E>(conf, context);

  boolean outOfCore = GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf);
  if (!outOfCore) {
    oocEngine = null;
    partitionStore = memoryPartitions;
    edgeStore = memoryEdges;
  } else {
    // Disk-backed stores delegate to the in-memory ones created above.
    oocEngine = new OutOfCoreEngine(conf, service, workerServer);
    partitionStore = new DiskBackedPartitionStore<I, V, E>(
        memoryPartitions, conf, context, oocEngine);
    edgeStore =
        new DiskBackedEdgeStore<I, V, E>(memoryEdges, conf, oocEngine);
  }

  ownerAggregatorData = new OwnerAggregatorServerData(context);
  allAggregatorData = new AllAggregatorServerData(context, conf);
}
/**
 * Instantiate and initialize the message store factory configured through
 * {@code MESSAGE_STORE_FACTORY_CLASS}.
 *
 * <p>Reads the {@code serviceWorker} and {@code conf} fields, so both must
 * be assigned before this is called.
 *
 * @return Message store factory
 */
private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
createMessageStoreFactory() {
  Class<? extends MessageStoreFactory> factoryClass =
      MESSAGE_STORE_FACTORY_CLASS.get(conf);
  // Created reflectively, so only the raw factory type is known here.
  MessageStoreFactory factory = ReflectionUtils.newInstance(factoryClass);
  factory.initialize(serviceWorker, conf);
  return factory;
}
/**
 * Get this worker's out-of-core engine.
 *
 * @return The out-of-core engine, or {@code null} when the out-of-core graph
 *         option was not enabled at construction time
 */
public OutOfCoreEngine getOocEngine() {
  return this.oocEngine;
}
/**
 * Get this worker's edge store (disk-backed when the out-of-core engine is
 * in use, in-memory otherwise).
 *
 * @return The edge store
 */
public EdgeStore<I, V, E> getEdgeStore() {
  return this.edgeStore;
}
/**
 * Get this worker's partition store (disk-backed when the out-of-core engine
 * is in use, in-memory otherwise).
 *
 * @return The partition store
 */
public PartitionStore<I, V, E> getPartitionStore() {
  return this.partitionStore;
}
/**
 * Get the store collecting messages that arrive during the current
 * superstep and will be consumed in the next one.
 *
 * <p>The store is not created by the constructor; it is {@code null} until
 * {@link #prepareSuperstep()} has run.
 *
 * @param <M> Message data
 * @return Incoming message store
 */
public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
  // The store is held as MessageStore<I, Writable>; the caller picks M.
  @SuppressWarnings("unchecked")
  MessageStore<I, M> typedStore = (MessageStore<I, M>) incomingMessageStore;
  return typedStore;
}
/**
 * Get the store holding messages received during the previous superstep,
 * which are consumed in the current one.
 *
 * <p>The store is not created by the constructor; it is {@code null} until
 * {@link #prepareSuperstep()} has run.
 *
 * @param <M> Message data
 * @return Current message store
 */
public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
  // The store is held as MessageStore<I, Writable>; the caller picks M.
  @SuppressWarnings("unchecked")
  MessageStore<I, M> typedStore = (MessageStore<I, M>) currentMessageStore;
  return typedStore;
}
/**
 * Throw away both message stores and build fresh ones.
 * Whatever the existing stores still hold is cleared first; new stores are
 * then created by {@link #prepareSuperstep()}.
 */
public void resetMessageStores() {
  MessageStore<I, Writable> staleCurrent = currentMessageStore;
  if (staleCurrent != null) {
    staleCurrent.clearAll();
    currentMessageStore = null;
  }

  MessageStore<I, Writable> staleIncoming = incomingMessageStore;
  if (staleIncoming != null) {
    staleIncoming.clearAll();
    incomingMessageStore = null;
  }

  // With both fields null, this creates a new current and incoming store.
  prepareSuperstep();
}
/**
 * Prepare for next superstep.
 *
 * <p>The messages of the finished superstep are cleared, the store that was
 * collecting incoming messages becomes the current store, and a fresh
 * incoming store is created. If there is no incoming store yet (first call,
 * or after {@link #resetMessageStores()}), a new current store is created
 * as well. The worker-to-worker message lists are rotated the same way.
 *
 * <p>When the out-of-core engine is enabled, newly created stores are
 * wrapped in {@link DiskBackedMessageStore} and the swap is performed while
 * holding the engine's superstep write lock. The lock is always released,
 * even if resetting the engine fails.
 */
public void prepareSuperstep() {
  if (currentMessageStore != null) {
    currentMessageStore.clearAll();
  }

  // First create the necessary in-memory message stores. If out-of-core
  // mechanism is enabled, we wrap the in-memory message stores within
  // disk-backed messages stores.
  MessageStore<I, Writable> nextCurrentMessageStore;
  if (incomingMessageStore != null) {
    nextCurrentMessageStore = incomingMessageStore;
  } else {
    MessageStore<I, Writable> messageStore = messageStoreFactory.newStore(
        conf.getIncomingMessageClasses());
    if (oocEngine == null) {
      nextCurrentMessageStore = messageStore;
    } else {
      nextCurrentMessageStore = new DiskBackedMessageStore<>(
          conf, oocEngine, messageStore,
          conf.getIncomingMessageClasses().useMessageCombiner(),
          serviceWorker.getSuperstep());
    }
  }

  MessageStore<I, Writable> nextIncomingMessageStore;
  MessageStore<I, Writable> outgoingStore = messageStoreFactory.newStore(
      conf.getOutgoingMessageClasses());
  if (oocEngine == null) {
    nextIncomingMessageStore = outgoingStore;
  } else {
    // Messages sent now are consumed one superstep later.
    nextIncomingMessageStore = new DiskBackedMessageStore<>(
        conf, oocEngine, outgoingStore,
        conf.getOutgoingMessageClasses().useMessageCombiner(),
        serviceWorker.getSuperstep() + 1);
  }

  // If out-of-core engine is enabled, we avoid overlapping of out-of-core
  // decisions with change of superstep. This avoidance is done to simplify
  // the design and reduce excessive use of synchronization primitives.
  if (oocEngine != null) {
    oocEngine.getSuperstepLock().writeLock().lock();
  }
  try {
    currentMessageStore = nextCurrentMessageStore;
    incomingMessageStore = nextIncomingMessageStore;
    if (oocEngine != null) {
      oocEngine.reset();
    }
  } finally {
    // Release in finally so a failing reset() cannot leave the write lock
    // held and block every thread waiting on the superstep lock.
    if (oocEngine != null) {
      oocEngine.getSuperstepLock().writeLock().unlock();
    }
  }

  currentMessageStore.finalizeStore();

  currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
  incomingWorkerToWorkerMessages =
      Collections.synchronizedList(new ArrayList<Writable>());
}
/**
 * Get the map, keyed by partition id, of vertex mutations being collected
 * for the next superstep (synchronize on the values).
 *
 * @return Vertex mutations
 */
public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
getPartitionMutations() {
  return this.partitionMutations;
}
/**
 * Get the holder for the aggregators owned by this worker.
 *
 * @return Holder for aggregators which current worker owns
 */
public OwnerAggregatorServerData getOwnerAggregatorData() {
  return this.ownerAggregatorData;
}
/**
 * Get the holder for the aggregators of the previous superstep.
 *
 * @return Holder for aggregators from previous superstep
 */
public AllAggregatorServerData getAllAggregatorData() {
  return this.allAggregatorData;
}
/**
 * Get the service worker this server data was created for.
 *
 * @return CentralizedServiceWorker
 */
public CentralizedServiceWorker<I, V, E> getServiceWorker() {
  return serviceWorker;
}
/**
 * Hand out the worker to worker messages for this superstep and drop the
 * reference to them. Can be called only once per superstep: after this call
 * the current list is {@code null} until {@link #prepareSuperstep()}
 * installs the next one.
 *
 * @return List of messages for this worker
 */
public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
  List<Writable> messages = currentWorkerToWorkerMessages;
  currentWorkerToWorkerMessages = null;
  return messages;
}
/**
 * Store a worker to worker message that this worker will see in the next
 * superstep. Thread-safe (the backing list is a synchronized list).
 *
 * @param message Message received
 */
public void addIncomingWorkerToWorkerMessage(Writable message) {
  List<Writable> nextSuperstepMessages = incomingWorkerToWorkerMessages;
  nextSuperstepMessages.add(message);
}
/**
 * Get worker to worker messages received in previous superstep, without
 * clearing them.
 *
 * @return list of current worker to worker messages; {@code null} once
 *         {@link #getAndClearCurrentWorkerToWorkerMessages()} has been
 *         called for this superstep
 */
public List<Writable> getCurrentWorkerToWorkerMessages() {
  return this.currentWorkerToWorkerMessages;
}
/**
 * Prepare resolving mutation: the mutations collected so far become the
 * ones to resolve, and an empty map starts collecting mutations for the
 * following superstep.
 */
public void prepareResolveMutations() {
  ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
      emptyMutations = Maps.newConcurrentMap();
  oldPartitionMutations = partitionMutations;
  partitionMutations = emptyMutations;
}
/**
 * Resolve mutations specific for a partition. This method is called once
 * per partition, before the computation for that partition starts.
 *
 * <p>Two passes are made. First, the mutations recorded for this partition
 * by {@link #prepareResolveMutations()} are resolved. Second, unless the
 * incoming message classes ask to ignore existing vertices, every message
 * destination that has no vertex in the partition is resolved too.
 *
 * @param partition The partition to resolve mutations for
 */
public void resolvePartitionMutation(Partition<I, V, E> partition) {
  Integer partitionId = partition.getId();
  VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
  ConcurrentMap<I, VertexMutations<I, V, E>> receivedMutations =
      oldPartitionMutations.get(partitionId);
  boolean ignoreExistingVertices =
      conf.getIncomingMessageClasses().ignoreExistingVertices();

  // Resolve mutations that are explicitly sent for this partition
  if (receivedMutations != null) {
    applyReceivedMutations(partition, partitionId, vertexResolver,
        receivedMutations, ignoreExistingVertices);
  }

  if (!ignoreExistingVertices) {
    resolveMessageOnlyVertices(partition, partitionId, vertexResolver);
  }
}

/**
 * Resolve each explicitly requested vertex mutation of one partition and
 * store the outcome: a non-null result is put into the partition; a null
 * result removes a previously existing vertex (and its messages, unless
 * existing vertices are ignored).
 *
 * @param partition Partition being updated
 * @param partitionId Id of that partition (used for logging)
 * @param resolver Vertex resolver to apply
 * @param mutations Mutations per vertex id for this partition
 * @param ignoreExistingVertices Whether the current message store must be
 *                               left out of the resolution
 */
private void applyReceivedMutations(Partition<I, V, E> partition,
    Integer partitionId, VertexResolver<I, V, E> resolver,
    ConcurrentMap<I, VertexMutations<I, V, E>> mutations,
    boolean ignoreExistingVertices) {
  for (Map.Entry<I, VertexMutations<I, V, E>> mutation :
      mutations.entrySet()) {
    I vertexId = mutation.getKey();
    Vertex<I, V, E> before = partition.getVertex(vertexId);
    VertexMutations<I, V, E> changes = mutation.getValue();
    // The message store is only consulted when existing vertices count.
    boolean hasMessages = !ignoreExistingVertices &&
        getCurrentMessageStore().hasMessagesForVertex(mutation.getKey());
    Vertex<I, V, E> after =
        resolver.resolve(vertexId, before, changes, hasMessages);

    if (LOG.isDebugEnabled()) {
      LOG.debug("resolvePartitionMutations: Resolved vertex index " +
          vertexId + " in partition index " + partitionId +
          " with original vertex " + before +
          ", returned vertex " + after + " on superstep " +
          serviceWorker.getSuperstep() + " with mutations " +
          changes);
    }

    if (after != null) {
      partition.putVertex(after);
    } else if (before != null) {
      partition.removeVertex(vertexId);
      if (!ignoreExistingVertices) {
        getCurrentMessageStore().clearVertexMessages(vertexId);
      }
    }
    context.progress();
  }
}

/**
 * Resolve vertices which are not here in the partition, but have received
 * messages; any vertex the resolver returns is added to the partition.
 *
 * @param partition Partition being updated
 * @param partitionId Id of that partition
 * @param resolver Vertex resolver to apply
 */
private void resolveMessageOnlyVertices(Partition<I, V, E> partition,
    Integer partitionId, VertexResolver<I, V, E> resolver) {
  Iterable<I> destinations = getCurrentMessageStore().
      getPartitionDestinationVertices(partitionId);
  if (Iterables.isEmpty(destinations)) {
    return;
  }
  for (I vertexId : destinations) {
    if (partition.getVertex(vertexId) != null) {
      continue;
    }
    Vertex<I, V, E> created = resolver.resolve(vertexId, null, null, true);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "resolvePartitionMutations: A non-existing vertex has " +
          "message(s). Added vertex index " + vertexId +
          " in partition index " + partitionId +
          ", vertex = " + created + ", on superstep " +
          serviceWorker.getSuperstep());
    }
    if (created != null) {
      partition.putVertex(created);
    }
    context.progress();
  }
}
/**
 * In case of async message store we have to wait for all messages
 * to be processed before going into next superstep. For any other kind of
 * incoming store (or none at all) this does nothing.
 */
public void waitForComplete() {
  MessageStore<I, Writable> incoming = incomingMessageStore;
  if (!(incoming instanceof AsyncMessageStoreWrapper)) {
    return;
  }
  ((AsyncMessageStoreWrapper) incoming).waitToComplete();
}
}