blob: 8e77ef7740cd07e07c3e1b0aa0428b2346d110d5 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.giraph.bsp;
import java.util.Collection;
import java.util.List;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.messages.PartitionSplitInfo;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.worker.WorkerAggregatorHandler;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.worker.WorkerInputSplitsHandler;
import org.apache.giraph.worker.WorkerObserver;
* All workers should have access to this centralized service to
* execute the following methods.
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
public interface CentralizedServiceWorker<I extends WritableComparable,
V extends Writable, E extends Writable>
extends CentralizedService<I, V, E>, PartitionSplitInfo<I> {
* Setup (must be called prior to any other function)
* @return Finished superstep stats for the input superstep
FinishedSuperstepStats setup();
* Get the worker information
* @return Worker information
WorkerInfo getWorkerInfo();
* Get the worker client (for instantiating WorkerClientRequestProcessor
* instances.
* @return Worker client
WorkerClient<I, V, E> getWorkerClient();
* Get the worker context.
* @return worker's WorkerContext
WorkerContext getWorkerContext();
* Get the observers for this Worker.
* @return array of WorkerObservers.
WorkerObserver[] getWorkerObservers();
* Get the partition store for this worker.
* The partitions contain the vertices for
* this worker and can be used to run compute() for the vertices or do
* checkpointing.
* @return The partition store for this worker.
PartitionStore<I, V, E> getPartitionStore();
* Both the vertices and the messages need to be checkpointed in order
* for them to be used. This is done after all messages have been
* delivered, but prior to a superstep starting.
void storeCheckpoint() throws IOException;
* Load the vertices, edges, messages from the beginning of a superstep.
* Will load the vertex partitions as designated by the master and set the
* appropriate superstep.
* @param superstep which checkpoint to use
* @return Graph-wide vertex and edge counts
* @throws IOException
VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
* Take all steps prior to actually beginning the computation of a
* superstep.
* @return Collection of all the partition owners from the master for this
* superstep.
Collection<? extends PartitionOwner> startSuperstep();
* Worker is done with its portion of the superstep. Report the
* worker level statistics after the computation.
* @param partitionStatsList All the partition stats for this worker
* @param superstepTimerContext superstep timer context only given when the
* function needs to stop the timer, otherwise null.
* @return Stats of the superstep completion
FinishedSuperstepStats finishSuperstep(
List<PartitionStats> partitionStatsList,
GiraphTimerContext superstepTimerContext);
* Get the partition id that a vertex id would belong to.
* @param vertexId Vertex id
* @return Partition id
int getPartitionId(I vertexId);
* Whether a partition with given id exists on this worker.
* @param partitionId Partition id
* @return True iff this worker has the specified partition
boolean hasPartition(Integer partitionId);
* Every client will need to get a partition owner from a vertex id so that
* they know which worker to sent the request to.
* @param vertexId Vertex index to look for
* @return PartitionOnwer that should contain this vertex if it exists
PartitionOwner getVertexPartitionOwner(I vertexId);
* Get all partition owners.
* @return Iterable through partition owners
Iterable<? extends PartitionOwner> getPartitionOwners();
* If desired by the user, vertex partitions are redistributed among
* workers according to the chosen WorkerGraphPartitioner.
* @param masterSetPartitionOwners Partition owner info passed from the
* master.
void exchangeVertexPartitions(
Collection<? extends PartitionOwner> masterSetPartitionOwners);
* Get the GraphTaskManager that this service is using. Vertices need to know
* this.
* @return the GraphTaskManager instance for this compute node
GraphTaskManager<I, V, E> getGraphTaskManager();
* Operations that will be called if there is a failure by a worker.
void failureCleanup();
* Get server data
* @return Server data
ServerData<I, V, E> getServerData();
* Get worker aggregator handler
* @return Worker aggregator handler
WorkerAggregatorHandler getAggregatorHandler();
* Final preparation for superstep, called after startSuperstep and
* potential loading from checkpoint, right before the computation started
* TODO how to avoid this additional function
void prepareSuperstep();
* Get the superstep output class
* @return SuperstepOutput
SuperstepOutput<I, V, E> getSuperstepOutput();
* Clean up the service (no calls may be issued after this)
* @param finishedSuperstepStats Finished supestep stats
* @throws IOException
* @throws InterruptedException
void cleanup(FinishedSuperstepStats finishedSuperstepStats)
throws IOException, InterruptedException;
* Loads Global stats from zookeeper.
* @return global stats stored in zookeeper for
* previous superstep.
GlobalStats getGlobalStats();
* Get input splits handler used during input
* @return Input splits handler
WorkerInputSplitsHandler getInputSplitsHandler();
* Received addresses and partitions assignments from master.
* @param addressesAndPartitions Addresses and partitions assignment
void addressesAndPartitionsReceived(
AddressesAndPartitionsWritable addressesAndPartitions);
* Store the counter values in the zookeeper after every superstep
* and also after all supersteps are done. This is called before closing
* the zookeeper. We need to call this method after calling cleanup on the
* worker, since some counters are updated during cleanup
* @param allSuperstepsDone boolean value whether all the supersteps
* are completed
void storeCountersInZooKeeper(boolean allSuperstepsDone);
* Close zookeeper
void closeZooKeeper();