blob: 17a19705a4bf71c965d9958ac94172c2c5a5dccd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework.piece;
import java.util.Iterator;
import java.util.List;
import org.apache.giraph.block_app.framework.api.BlockMasterApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.PieceCount;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.MessageClasses;
import org.apache.giraph.function.Consumer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.collect.Iterators;
/**
* Parent of all Pieces; contains the comprehensive list of methods a Piece
* can support. Specific subclasses should be extended directly, rather than
* this class, to simplify usage - most frequently, for example, the Piece
* class.
*
* Single unit of execution, capturing:
* - sending and then receiving messages from vertices
* - sending data to be aggregated from workers to master
* - sending values from master, via aggregators, to workers
* - sending and receiving worker messages
*
*
* Order of execution is:
* - On master, once at the start of the application
* -- registerAggregators (deprecated, use registerReducers instead)
*
* - After masterCompute of previous piece, on master:
* -- registerReducers
*
* - Send logic on workers:
* -- getVertexSender once per worker thread, and on the object returned by it:
* --- vertexSend on each vertex
* --- postprocess on each worker thread
* -- workerContextSend per worker
*
* - Logic on master:
* -- masterCompute
*
* - Receive logic on workers:
* -- workerContextReceive per worker
* -- getVertexReceiver once per worker thread, and on the object returned by it:
* --- vertexReceive on each vertex
* --- postprocess on each worker thread
*
* And before everything, during initialization, registerAggregators.
*
* Only masterCompute and registerReducers/registerAggregators should modify
* the Piece, all of the worker methods should treat Piece as read-only.
*
* Each Piece should be an encapsulated unit of execution. The vertex value
* should be used as the single implicit "communication" channel between
* different Pieces; all other dependencies should be explicitly defined and
* passed through the constructor, via interfaces (as explained below).
* I.e. the state of the vertex value is the invariant that Pieces act upon.
* Best is not to depend on explicit vertex value class, but on interface that
* provides all needed functions, so that pieces can be freely combined,
* as long as vertex value implements appropriate ones.
* Similarly, use most abstract class you need - if Piece doesn't depend
* on edge value, don't use NullWritable, but Writable. Or if it doesn't
* depend on ExecutionStage, use Object for it.
*
* All other external dependencies should be explicitly passed through the
* constructor, via interfaces.
*
* All Pieces will be created within one context - on the master.
* They are then going to be replicated across all workers, and across all
* threads within each worker, and will see everything that happens in global
* context (masterCompute) before them, including any state master has.
* Through ObjectHolder/ObjectTransfer, you can pass data between Pieces in
* global context, and from global context to worker functions of a Piece
* that happens in the future.
*
* VertexReceiver of previous Piece and VertexSender of next Piece live in
* the same context: vertexSend of the next Piece is executed immediately
* after vertexReceive of the previous Piece, before vertexReceive is
* called on the next vertex.
* This detail allows them to depend on each other externally through
* memory-only mediator objects, like ObjectTransfer.
*
* All other logic lives in different contexts - specifically the
* VertexSender and VertexReceiver of the same Piece, or workerContextSend
* and the VertexSender of the same Piece - and cannot interact with each
* other except by changing the state of the graph or by using the global
* communication api.
*
* All methods on this class (or objects it returns) will be called serially,
* so there is no need for any Thread synchronization.
* To achieve that, each Thread will have a complete deep copy of the Piece,
* so all static fields must be written to be Thread safe!
* (i.e. either immutable, or have synchronized/locked access to them)
*
* @param <I> Vertex id type
* @param <V> Vertex value type
* @param <E> Edge value type
* @param <M> Message type
* @param <WV> Worker value type
* @param <WM> Worker message type
* @param <S> Execution stage type
*/
@SuppressWarnings({ "rawtypes" })
public abstract class AbstractPiece<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable, WV,
    WM extends Writable, S> implements Block {

  // =====================================================================
  // Overridable functions
  // =====================================================================

  // Wrapped hook:
  //   registerReducers(CreateReducersApi reduceApi, S executionStage)

  /**
   * Wraps registerReducers, adding automatic handling of reducers.
   * Only for internal use.
   *
   * @param masterApi master api
   * @param executionStage execution stage seen by this piece
   */
  public abstract void wrappedRegisterReducers(
      BlockMasterApi masterApi, S executionStage);

  // Wrapped hook:
  //   getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi,
  //       S executionStage)

  /**
   * Wraps getVertexSender, adding automatic handling of reducers.
   *
   * Only for Framework internal use.
   *
   * @param workerApi worker send api
   * @param executionStage execution stage seen by this piece
   * @return wrapped vertex sender
   */
  public abstract InnerVertexSender getWrappedVertexSender(
      final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);

  /**
   * Override to perform computation in the worker context during the send
   * phase. Does nothing by default.
   *
   * Called once per worker, after all vertices have been processed with
   * getVertexSender.
   *
   * @param workerContextApi worker context send api
   * @param executionStage execution stage seen by this piece
   * @param workerValue worker value
   */
  public void workerContextSend(
      BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
      WV workerValue) {
    // Intentionally empty: no worker context send logic by default.
  }

  /**
   * Called on master, after the send phase and before the receive phase.
   * Does nothing by default.
   *
   * Overrides can:
   * - read aggregators sent from workers
   * - do global processing
   * - send data to workers through aggregators
   *
   * @param masterApi master api
   * @param executionStage execution stage seen by this piece
   */
  public void masterCompute(BlockMasterApi masterApi, S executionStage) {
    // Intentionally empty: no master logic by default.
  }

  /**
   * Override to perform computation in the worker context during the
   * receive phase. Does nothing by default.
   *
   * Called once per worker, before any vertex is processed with
   * getVertexReceiver.
   *
   * @param workerContextApi worker context receive api
   * @param executionStage execution stage seen by this piece
   * @param workerValue worker value
   * @param workerMessages worker messages
   */
  public void workerContextReceive(
      BlockWorkerContextReceiveApi workerContextApi, S executionStage,
      WV workerValue, List<WM> workerMessages) {
    // Intentionally empty: no worker context receive logic by default.
  }

  /**
   * Override to do vertex receive processing.
   *
   * Creates the handler that defines what is executed on a worker for each
   * vertex during the receive phase. This logic is executed last.
   *
   * Called once per thread on each worker, in parallel, each call made on
   * that thread's own copy of the Piece object.
   *
   * If the returned object implements the Postprocessor interface, its
   * postprocess() function is called once, after all vertices the
   * corresponding thread had to process are done.
   *
   * @param workerApi worker receive api
   * @param executionStage execution stage seen by this piece
   * @return vertex receiver; {@code null} by default
   */
  public VertexReceiver<I, V, E, M> getVertexReceiver(
      BlockWorkerReceiveApi<I> workerApi, S executionStage) {
    // No receiver unless a subclass overrides this method.
    return null;
  }

  /**
   * Returns the MessageClasses definition for messages sent by this Piece.
   *
   * @param conf configuration
   * @return message classes of this Piece
   */
  public abstract MessageClasses<I, M> getMessageClasses(
      ImmutableClassesGiraphConfiguration conf);

  /**
   * Override to provide a different execution stage for the Pieces that
   * come after this one.
   *
   * Execution stage should be immutable, so this function should return a
   * new object whenever it needs to return a different value.
   *
   * The result affects only the pieces that come after this piece; it is
   * not applied to the execution stage this piece itself sees.
   *
   * @param executionStage execution stage seen by this piece
   * @return execution stage for subsequent pieces; by default the given
   *         stage, unchanged
   */
  public S nextExecutionStage(S executionStage) {
    return executionStage;
  }

  /**
   * Override to register any aggregators this piece may use.
   * Does nothing by default.
   *
   * @param masterApi master api
   * @throws InstantiationException declared for overriding implementations
   * @throws IllegalAccessException declared for overriding implementations
   * @deprecated Use registerReducers instead.
   */
  @Deprecated
  public void registerAggregators(BlockMasterApi masterApi)
      throws InstantiationException, IllegalAccessException {
    // Intentionally empty: nothing to register by default.
  }

  // =====================================================================
  // Inner classes
  // =====================================================================

  /** Inner class to provide clean use without specifying types */
  public abstract class InnerVertexSender
      implements VertexSender<I, V, E>, VertexPostprocessor {
    /** No post-processing by default. */
    @Override
    public void postprocess() {
    }
  }

  /** Inner class to provide clean use without specifying types */
  public abstract class InnerVertexReceiver
      implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
    /** No post-processing by default. */
    @Override
    public void postprocess() {
    }
  }

  // =====================================================================
  // Internal implementation
  // =====================================================================

  /** A Piece, viewed as a Block, iterates over exactly one Piece: itself. */
  @Override
  public final Iterator<AbstractPiece> iterator() {
    final AbstractPiece onlyPiece = this;
    return Iterators.singletonIterator(onlyPiece);
  }

  /** The only possible piece of a Piece is the Piece itself. */
  @Override
  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
    consumer.apply(this);
  }

  /** A Piece always counts as exactly one piece. */
  @Override
  public PieceCount getPieceCount() {
    return new PieceCount(1);
  }

  /**
   * Returns the simple class name, falling back to the fully qualified
   * class name when the simple name is empty (e.g. for anonymous classes).
   */
  @Override
  public String toString() {
    final Class<?> pieceClass = getClass();
    final String simpleName = pieceClass.getSimpleName();
    return simpleName.isEmpty() ? pieceClass.getName() : simpleName;
  }

  // hashCode and equals are final and based on reference identity, so that
  // subclasses cannot redefine when two Pieces are considered equal.

  @Override
  public final int hashCode() {
    return System.identityHashCode(this);
  }

  @Override
  public final boolean equals(Object obj) {
    return this == obj;
  }
}