/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.bsp;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.messages.PartitionSplitInfo;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.worker.WorkerAggregatorHandler;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.worker.WorkerInputSplitsHandler;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
* All workers should have access to this centralized service to
* execute the following methods.
*
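* <p>A minimal sketch of the per-superstep call order a worker task might
* follow, using only methods declared on this interface; error handling,
* checkpointing, and master coordination are omitted, and {@code service}
* and {@code partitionStatsList} are assumed to be supplied by the caller:
* <pre>{@code
* FinishedSuperstepStats inputStats = service.setup();
* Collection<? extends PartitionOwner> owners = service.startSuperstep();
* service.exchangeVertexPartitions(owners);
* service.prepareSuperstep();
* // ... run compute() over the partitions in getPartitionStore() ...
* FinishedSuperstepStats stats =
*     service.finishSuperstep(partitionStatsList, null);
* }</pre>
*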
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
public interface CentralizedServiceWorker<I extends WritableComparable,
V extends Writable, E extends Writable>
extends CentralizedService<I, V, E>, PartitionSplitInfo<I> {
/**
* Setup (must be called prior to any other function)
*
* @return Finished superstep stats for the input superstep
*/
FinishedSuperstepStats setup();
/**
* Get the worker information
*
* @return Worker information
*/
WorkerInfo getWorkerInfo();
/**
* Get the worker client (for instantiating WorkerClientRequestProcessor
* instances).
*
* @return Worker client
*/
WorkerClient<I, V, E> getWorkerClient();
/**
* Get the worker context.
*
* @return worker's WorkerContext
*/
WorkerContext getWorkerContext();
/**
* Get the observers for this Worker.
*
* @return array of WorkerObservers.
*/
WorkerObserver[] getWorkerObservers();
/**
* Get the partition store for this worker.
* The partitions contain the vertices for
* this worker and can be used to run compute() for the vertices or do
* checkpointing.
*
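* <p>A minimal sketch of a local-partition check before touching the
* store, using only methods declared on this interface ({@code service}
* and {@code vertexId} are assumed to be in scope):
* <pre>{@code
* int partitionId = service.getPartitionId(vertexId);
* if (service.hasPartition(partitionId)) {
*   PartitionStore<I, V, E> store = service.getPartitionStore();
*   // ... operate on the locally held partition ...
* }
* }</pre>
*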
* @return The partition store for this worker.
*/
PartitionStore<I, V, E> getPartitionStore();
/**
* Both the vertices and the messages need to be checkpointed for the
* checkpoint to be usable on recovery. This is done after all messages have been
* delivered, but prior to a superstep starting.
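*
* <p>A minimal sketch of the store/restore pairing; the decision of when
* to checkpoint or restart normally comes from the master, and
* {@code superstep} is assumed to be supplied by it:
* <pre>{@code
* service.storeCheckpoint();  // persist vertices and pending messages
* // ... later, on recovery ...
* VertexEdgeCount counts = service.loadCheckpoint(superstep);
* }</pre>
*
* @throws IOException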
*/
void storeCheckpoint() throws IOException;
/**
* Load the vertices, edges, and messages from a checkpoint taken at the
* beginning of a superstep.
* Will load the vertex partitions as designated by the master and set the
* appropriate superstep.
*
* @param superstep which checkpoint to use
* @return Graph-wide vertex and edge counts
* @throws IOException
*/
VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
/**
* Take all steps prior to actually beginning the computation of a
* superstep.
*
* @return Collection of all the partition owners from the master for this
* superstep.
*/
Collection<? extends PartitionOwner> startSuperstep();
/**
* Worker is done with its portion of the superstep. Report the
* worker-level statistics after the computation.
*
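* <p>A minimal sketch, assuming {@code timerContext} was obtained earlier
* when the superstep timer was started; pass it so this call can stop the
* timer, or pass null when no timer is being tracked:
* <pre>{@code
* FinishedSuperstepStats stats =
*     service.finishSuperstep(partitionStatsList, timerContext);
* }</pre>
*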
* @param partitionStatsList All the partition stats for this worker
* @param superstepTimerContext Superstep timer context, given only when
* the function needs to stop the timer; otherwise null
* @return Stats of the superstep completion
*/
FinishedSuperstepStats finishSuperstep(
List<PartitionStats> partitionStatsList,
GiraphTimerContext superstepTimerContext);
/**
* Get the partition id that a vertex id would belong to.
*
* @param vertexId Vertex id
* @return Partition id
*/
@Override
int getPartitionId(I vertexId);
/**
* Whether a partition with the given id exists on this worker.
*
* @param partitionId Partition id
* @return True iff this worker has the specified partition
*/
boolean hasPartition(Integer partitionId);
/**
* Every client will need to get a partition owner from a vertex id so that
* it knows which worker to send the request to.
*
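* <p>A minimal routing sketch, assuming the returned {@code PartitionOwner}
* exposes its {@code WorkerInfo} and that the actual send happens elsewhere:
* <pre>{@code
* PartitionOwner owner = service.getVertexPartitionOwner(vertexId);
* WorkerInfo target = owner.getWorkerInfo();
* // ... direct the request for vertexId to the target worker ...
* }</pre>
*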
* @param vertexId Vertex index to look for
* @return PartitionOwner that should contain this vertex if it exists
*/
PartitionOwner getVertexPartitionOwner(I vertexId);
/**
* Get all partition owners.
*
* @return Iterable through partition owners
*/
Iterable<? extends PartitionOwner> getPartitionOwners();
/**
* If desired by the user, vertex partitions are redistributed among
* workers according to the chosen WorkerGraphPartitioner.
*
* @param masterSetPartitionOwners Partition owner info passed from the
* master.
*/
void exchangeVertexPartitions(
Collection<? extends PartitionOwner> masterSetPartitionOwners);
/**
* Get the GraphTaskManager that this service is using. Vertices need to know
* this.
*
* @return the GraphTaskManager instance for this compute node
*/
GraphTaskManager<I, V, E> getGraphTaskManager();
/**
* Cleanup operations that will be called if a worker fails.
*/
void failureCleanup();
/**
* Get server data
*
* @return Server data
*/
ServerData<I, V, E> getServerData();
/**
* Get worker aggregator handler
*
* @return Worker aggregator handler
*/
WorkerAggregatorHandler getAggregatorHandler();
/**
* Final preparation for a superstep, called after startSuperstep and
* potential loading from a checkpoint, right before the computation starts.
* TODO how to avoid this additional function
*/
void prepareSuperstep();
/**
* Get the superstep output handler
*
* @return SuperstepOutput
*/
SuperstepOutput<I, V, E> getSuperstepOutput();
/**
* Clean up the service (no calls may be issued after this)
*
* @param finishedSuperstepStats Finished superstep stats
* @throws IOException
* @throws InterruptedException
*/
void cleanup(FinishedSuperstepStats finishedSuperstepStats)
throws IOException, InterruptedException;
/**
* Loads global stats from ZooKeeper.
*
* @return Global stats stored in ZooKeeper for the
* previous superstep
*/
GlobalStats getGlobalStats();
/**
* Get the input splits handler used during input
*
* @return Input splits handler
*/
WorkerInputSplitsHandler getInputSplitsHandler();
/**
* Called when addresses and partition assignments are received from the
* master.
*
* @param addressesAndPartitions Addresses and partition assignments
*/
void addressesAndPartitionsReceived(
AddressesAndPartitionsWritable addressesAndPartitions);
/**
* Store the counter values in ZooKeeper after every superstep
* and also after all supersteps are done. This is called before closing
* ZooKeeper. This method needs to be called after calling cleanup on the
* worker, since some counters are updated during cleanup
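*
* <p>A minimal sketch of the shutdown order this implies (checked
* exceptions elided): cleanup first, then persist the counters, then
* close the connection:
* <pre>{@code
* service.cleanup(finishedSuperstepStats);
* service.storeCountersInZooKeeper(true);  // all supersteps are done
* service.closeZooKeeper();
* }</pre>
*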
* @param allSuperstepsDone Whether all the supersteps
* have completed
*/
void storeCountersInZooKeeper(boolean allSuperstepsDone);
/**
* Close the ZooKeeper connection
*/
void closeZooKeeper();
}