| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.giraph.graph; |
| |
| import net.iharder.Base64; |
| import org.apache.giraph.bsp.ApplicationState; |
| import org.apache.giraph.bsp.BspInputFormat; |
| import org.apache.giraph.bsp.CentralizedService; |
| import org.apache.giraph.bsp.CentralizedServiceMaster; |
| import org.apache.giraph.bsp.SuperstepState; |
| import org.apache.giraph.graph.GraphMapper.MapFunctions; |
| import org.apache.giraph.zk.BspEvent; |
| import org.apache.giraph.zk.PredicateLock; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.mapred.RunningJob; |
| import org.apache.hadoop.mapreduce.Counter; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.log4j.Logger; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.ZooDefs.Ids; |
| import org.json.JSONArray; |
| import org.json.JSONException; |
| import org.json.JSONObject; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| |
| import org.apache.giraph.graph.partition.MasterGraphPartitioner; |
| import org.apache.giraph.graph.partition.PartitionOwner; |
| import org.apache.giraph.graph.partition.PartitionStats; |
| import org.apache.giraph.graph.partition.PartitionUtils; |
| import org.apache.giraph.utils.WritableUtils; |
| |
| /** |
| * ZooKeeper-based implementation of {@link CentralizedServiceMaster}. The |
| * master coordinates the workers: creating input splits, coordinating each |
| * superstep, finalizing checkpoints, and cleaning up the job. |
| */ |
| @SuppressWarnings("rawtypes") |
| public class BspServiceMaster< |
| I extends WritableComparable, |
| V extends Writable, |
| E extends Writable, M extends Writable> |
| extends BspService<I, V, E, M> |
| implements CentralizedServiceMaster<I, V, E, M> { |
| /** Class logger */ |
| private static final Logger LOG = Logger.getLogger(BspServiceMaster.class); |
| /** Superstep counter */ |
| private Counter superstepCounter = null; |
| /** Vertex counter */ |
| private Counter vertexCounter = null; |
| /** Finished vertex counter */ |
| private Counter finishedVertexCounter = null; |
| /** Edge counter */ |
| private Counter edgeCounter = null; |
| /** Sent messages counter */ |
| private Counter sentMessagesCounter = null; |
| /** Workers on this superstep */ |
| private Counter currentWorkersCounter = null; |
| /** Current master task partition */ |
| private Counter currentMasterTaskPartitionCounter = null; |
| /** Last checkpointed superstep */ |
| private Counter lastCheckpointedSuperstepCounter = null; |
| /** Am I the master? */ |
| private boolean isMaster = false; |
| /** Max number of workers */ |
| private final int maxWorkers; |
| /** Min number of workers */ |
| private final int minWorkers; |
| /** Min % responded workers */ |
| private final float minPercentResponded; |
| /** Poll period in msecs */ |
| private final int msecsPollPeriod; |
| /** Max number of poll attempts */ |
| private final int maxPollAttempts; |
| /** Min number of long tails before printing */ |
| private final int partitionLongTailMinPrint; |
| /** Last finalized checkpoint */ |
| private long lastCheckpointedSuperstep = -1; |
| /** State of the superstep changed */ |
| private final BspEvent superstepStateChanged = |
| new PredicateLock(); |
| /** Master graph partitioner */ |
| private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner; |
| /** All the partition stats from the last superstep */ |
| private final List<PartitionStats> allPartitionStatsList = |
| new ArrayList<PartitionStats>(); |
| /** Counter group name for the Giraph statistics */ |
| public static final String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats"; |
| /** Aggregator writer */ |
| public AggregatorWriter aggregatorWriter; |
| |
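| /** |
| * Constructor for the master BSP service. |
| * |
| * @param serverPortList ZooKeeper server:port list (passed through to |
| * {@link BspService}) |
| * @param sessionMsecTimeout ZooKeeper session timeout in milliseconds |
| * @param context Mapper context of this task |
| * @param graphMapper Graph mapper that owns this service |
| */ |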
| public BspServiceMaster( |
| String serverPortList, |
| int sessionMsecTimeout, |
| Mapper<?, ?, ?, ?>.Context context, |
| GraphMapper<I, V, E, M> graphMapper) { |
| super(serverPortList, sessionMsecTimeout, context, graphMapper); |
| registerBspEvent(superstepStateChanged); |
| |
| maxWorkers = |
| getConfiguration().getInt(GiraphJob.MAX_WORKERS, -1); |
| minWorkers = |
| getConfiguration().getInt(GiraphJob.MIN_WORKERS, -1); |
| minPercentResponded = |
| getConfiguration().getFloat(GiraphJob.MIN_PERCENT_RESPONDED, |
| 100.0f); |
| msecsPollPeriod = |
| getConfiguration().getInt(GiraphJob.POLL_MSECS, |
| GiraphJob.POLL_MSECS_DEFAULT); |
| maxPollAttempts = |
| getConfiguration().getInt(GiraphJob.POLL_ATTEMPTS, |
| GiraphJob.POLL_ATTEMPTS_DEFAULT); |
| partitionLongTailMinPrint = getConfiguration().getInt( |
| GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT, |
| GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT); |
| masterGraphPartitioner = |
| getGraphPartitionerFactory().createMasterGraphPartitioner(); |
| } |
| |
| @Override |
| public void setJobState(ApplicationState state, |
| long applicationAttempt, |
| long desiredSuperstep) { |
| JSONObject jobState = new JSONObject(); |
| try { |
| jobState.put(JSONOBJ_STATE_KEY, state.toString()); |
| jobState.put(JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt); |
| jobState.put(JSONOBJ_SUPERSTEP_KEY, desiredSuperstep); |
| } catch (JSONException e) { |
| throw new RuntimeException("setJobState: Couldn't put " + |
| state.toString(), e); |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("setJobState: " + jobState.toString() + " on superstep " + |
| getSuperstep()); |
| } |
| try { |
| getZkExt().createExt(MASTER_JOB_STATE_PATH + "/jobState", |
| jobState.toString().getBytes(), |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT_SEQUENTIAL, |
| true); |
| } catch (KeeperException.NodeExistsException e) { |
| throw new IllegalStateException( |
| "setJobState: Impossible that " + |
| MASTER_JOB_STATE_PATH + " already exists!", e); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "setJobState: Unknown KeeperException for " + |
| MASTER_JOB_STATE_PATH, e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "setJobState: Unknown InterruptedException for " + |
| MASTER_JOB_STATE_PATH, e); |
| } |
| |
| if (state == ApplicationState.FAILED) { |
| failJob(); |
| } |
| } |
| |
| /** |
| * Master uses this to calculate the {@link VertexInputFormat} |
| * input splits. The splits are later written to ZooKeeper by |
| * {@link #createInputSplits()}. |
| * |
| * @param numWorkers Number of available workers |
| * @return List of generated input splits (possibly only a sample) |
| */ |
| private List<InputSplit> generateInputSplits(int numWorkers) { |
| VertexInputFormat<I, V, E, M> vertexInputFormat = |
| BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration()); |
| List<InputSplit> splits; |
| try { |
| splits = vertexInputFormat.getSplits(getContext(), numWorkers); |
| float samplePercent = |
| getConfiguration().getFloat( |
| GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT, |
| GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT); |
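| // If a sample percentage was configured, process only that fraction of |
| // the generated splits (typically used for testing on a subset of the |
| // input). |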
| if (samplePercent != GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) { |
| int lastIndex = (int) (samplePercent * splits.size() / 100f); |
| List<InputSplit> sampleSplits = splits.subList(0, lastIndex); |
| LOG.warn("generateInputSplits: Using sampling - Processing " + |
| "only " + sampleSplits.size() + " instead of " + |
| splits.size() + " expected splits."); |
| return sampleSplits; |
| } else { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("generateInputSplits: Got " + splits.size() + |
| " input splits for " + numWorkers + " workers"); |
| } |
| return splits; |
| } |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "generateInputSplits: Got IOException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "generateInputSplits: Got InterruptedException", e); |
| } |
| } |
| |
| /** |
| * Kill the job when it cannot be salvaged. |
| */ |
| private void failJob() { |
| LOG.fatal("failJob: Killing job " + getJobId()); |
| try { |
| @SuppressWarnings("deprecation") |
| org.apache.hadoop.mapred.JobClient jobClient = |
| new org.apache.hadoop.mapred.JobClient( |
| (org.apache.hadoop.mapred.JobConf) |
| getConfiguration()); |
| @SuppressWarnings("deprecation") |
| org.apache.hadoop.mapred.JobID jobId = |
| org.apache.hadoop.mapred.JobID.forName(getJobId()); |
| RunningJob job = jobClient.getJob(jobId); |
| job.killJob(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Parse the {@link WorkerInfo} objects from a ZooKeeper path |
| * (and children). |
| * |
| * @param workerInfosPath Path where all the workers are children |
| * @param watch Watch or not? |
| * @return List of workers in that path |
| */ |
| private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath, |
| boolean watch) { |
| List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>(); |
| List<String> workerInfoPathList; |
| try { |
| workerInfoPathList = |
| getZkExt().getChildrenExt(workerInfosPath, watch, false, true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "getWorkerInfosFromPath: Got KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "getWorkerInfosFromPath: Got InterruptedException", e); |
| } |
| for (String workerInfoPath : workerInfoPathList) { |
| WorkerInfo workerInfo = new WorkerInfo(); |
| WritableUtils.readFieldsFromZnode( |
| getZkExt(), workerInfoPath, true, null, workerInfo); |
| workerInfoList.add(workerInfo); |
| } |
| return workerInfoList; |
| } |
| |
| /** |
| * Get the healthy and unhealthy {@link WorkerInfo} objects for |
| * a superstep |
| * |
| * @param superstep superstep to check |
| * @param healthyWorkerInfoList filled in with current data |
| * @param unhealthyWorkerInfoList filled in with current data |
| */ |
| private void getAllWorkerInfos( |
| long superstep, |
| List<WorkerInfo> healthyWorkerInfoList, |
| List<WorkerInfo> unhealthyWorkerInfoList) { |
| String healthyWorkerInfoPath = |
| getWorkerInfoHealthyPath(getApplicationAttempt(), superstep); |
| String unhealthyWorkerInfoPath = |
| getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep); |
| |
| try { |
| getZkExt().createOnceExt(healthyWorkerInfoPath, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "getAllWorkerInfos: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "getAllWorkerInfos: InterruptedException", e); |
| } |
| |
| try { |
| getZkExt().createOnceExt(unhealthyWorkerInfoPath, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "getAllWorkerInfos: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "getAllWorkerInfos: InterruptedException", e); |
| } |
| |
| List<WorkerInfo> currentHealthyWorkerInfoList = |
| getWorkerInfosFromPath(healthyWorkerInfoPath, true); |
| List<WorkerInfo> currentUnhealthyWorkerInfoList = |
| getWorkerInfosFromPath(unhealthyWorkerInfoPath, false); |
| |
| healthyWorkerInfoList.clear(); |
| if (currentHealthyWorkerInfoList != null) { |
| for (WorkerInfo healthyWorkerInfo : |
| currentHealthyWorkerInfoList) { |
| healthyWorkerInfoList.add(healthyWorkerInfo); |
| } |
| } |
| |
| unhealthyWorkerInfoList.clear(); |
| if (currentUnhealthyWorkerInfoList != null) { |
| for (WorkerInfo unhealthyWorkerInfo : |
| currentUnhealthyWorkerInfoList) { |
| unhealthyWorkerInfoList.add(unhealthyWorkerInfo); |
| } |
| } |
| } |
| |
| /** |
| * Check all the {@link WorkerInfo} objects to ensure that a minimum |
| * number of good workers exists out of the total that have reported. |
| * |
| * @return List of healthy workers such that the minimum has been |
| * met, otherwise null |
| */ |
| private List<WorkerInfo> checkWorkers() { |
| boolean failJob = true; |
| int pollAttempt = 0; |
| List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>(); |
| List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>(); |
| int totalResponses = -1; |
| while (pollAttempt < maxPollAttempts) { |
| getAllWorkerInfos( |
| getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList); |
| totalResponses = healthyWorkerInfoList.size() + |
| unhealthyWorkerInfoList.size(); |
| if ((totalResponses * 100.0f / maxWorkers) >= |
| minPercentResponded) { |
| failJob = false; |
| break; |
| } |
| getContext().setStatus(getGraphMapper().getMapFunctions() + " " + |
| "checkWorkers: Only found " + |
| totalResponses + |
| " responses of " + maxWorkers + |
| " needed to start superstep " + |
| getSuperstep()); |
| if (getWorkerHealthRegistrationChangedEvent().waitMsecs( |
| msecsPollPeriod)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("checkWorkers: Got event that health " + |
| "registration changed, not using poll attempt"); |
| } |
| getWorkerHealthRegistrationChangedEvent().reset(); |
| continue; |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("checkWorkers: Only found " + totalResponses + |
| " responses of " + maxWorkers + |
| " needed to start superstep " + |
| getSuperstep() + ". Sleeping for " + |
| msecsPollPeriod + " msecs and used " + pollAttempt + |
| " of " + maxPollAttempts + " attempts."); |
| // Find the missing workers if there are only a few |
| if ((maxWorkers - totalResponses) <= |
| partitionLongTailMinPrint) { |
| Set<Integer> partitionSet = new TreeSet<Integer>(); |
| for (WorkerInfo workerInfo : healthyWorkerInfoList) { |
| partitionSet.add(workerInfo.getPartitionId()); |
| } |
| for (WorkerInfo workerInfo : unhealthyWorkerInfoList) { |
| partitionSet.add(workerInfo.getPartitionId()); |
| } |
| for (int i = 1; i <= maxWorkers; ++i) { |
| if (partitionSet.contains(i)) { |
| continue; |
| } else if (i == getTaskPartition()) { |
| continue; |
| } else { |
| LOG.info("checkWorkers: No response from " + |
| "partition " + i + " (could be master)"); |
| } |
| } |
| } |
| } |
| ++pollAttempt; |
| } |
| if (failJob) { |
| LOG.error("checkWorkers: Did not receive enough processes in " + |
| "time (only " + totalResponses + " of " + |
| minWorkers + " required). This occurs if you do not " + |
| "have enough map tasks available simultaneously on " + |
| "your Hadoop instance to fulfill the number of " + |
| "requested workers."); |
| return null; |
| } |
| |
| if (healthyWorkerInfoList.size() < minWorkers) { |
| LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() + |
| " available when " + minWorkers + " are required."); |
| return null; |
| } |
| |
| getContext().setStatus(getGraphMapper().getMapFunctions() + " " + |
| "checkWorkers: Done - Found " + totalResponses + |
| " responses of " + maxWorkers + " needed to start superstep " + |
| getSuperstep()); |
| |
| return healthyWorkerInfoList; |
| } |
| |
| @Override |
| public int createInputSplits() { |
| // Only the 'master' should be doing this. Wait until the number of |
| // processes that have reported health exceeds the minimum percentage. |
| // If the minimum percentage is not met, fail the job. Otherwise |
| // generate the input splits |
| try { |
| if (getZkExt().exists(INPUT_SPLIT_PATH, false) != null) { |
| LOG.info(INPUT_SPLIT_PATH + |
| " already exists, no need to create"); |
| return Integer.parseInt( |
| new String( |
| getZkExt().getData(INPUT_SPLIT_PATH, false, null))); |
| } |
| } catch (KeeperException.NoNodeException e) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("createInputSplits: Need to create the " + |
| "input splits at " + INPUT_SPLIT_PATH); |
| } |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "createInputSplits: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "createInputSplits: InterruptedException", e); |
| } |
| |
| // When creating znodes, in case the master has already run, resume |
| // where it left off. |
| List<WorkerInfo> healthyWorkerInfoList = checkWorkers(); |
| if (healthyWorkerInfoList == null) { |
| setJobState(ApplicationState.FAILED, -1, -1); |
| return -1; |
| } |
| |
| // Note that the input splits may only be a sample if |
| // INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100 |
| List<InputSplit> splitList = |
| generateInputSplits(healthyWorkerInfoList.size()); |
| if (healthyWorkerInfoList.size() > splitList.size()) { |
| LOG.warn("createInputSplits: Number of inputSplits=" |
| + splitList.size() + " < " + |
| healthyWorkerInfoList.size() + |
| "=number of healthy processes, " + |
| "some workers will not be used"); |
| } |
| String inputSplitPath = null; |
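| // Serialize each input split (its class name followed by its data) into |
| // its own znode so that workers can reconstruct and load it. |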
| for (int i = 0; i < splitList.size(); ++i) { |
| try { |
| ByteArrayOutputStream byteArrayOutputStream = |
| new ByteArrayOutputStream(); |
| DataOutput outputStream = |
| new DataOutputStream(byteArrayOutputStream); |
| InputSplit inputSplit = splitList.get(i); |
| Text.writeString(outputStream, |
| inputSplit.getClass().getName()); |
| ((Writable) inputSplit).write(outputStream); |
| inputSplitPath = INPUT_SPLIT_PATH + "/" + i; |
| getZkExt().createExt(inputSplitPath, |
| byteArrayOutputStream.toByteArray(), |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("createInputSplits: Created input split " + |
| "with index " + i + " serialized as " + |
| byteArrayOutputStream.toString()); |
| } |
| } catch (KeeperException.NodeExistsException e) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("createInputSplits: Node " + |
| inputSplitPath + " already exists."); |
| } |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "createInputSplits: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "createInputSplits: InterruptedException", e); |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "createInputSplits: IOException", e); |
| } |
| } |
| |
| // Let workers know they can start trying to load the input splits |
| try { |
| getZkExt().create(INPUT_SPLITS_ALL_READY_PATH, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| } catch (KeeperException.NodeExistsException e) { |
| LOG.info("createInputSplits: Node " + |
| INPUT_SPLITS_ALL_READY_PATH + " already exists."); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "createInputSplits: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "createInputSplits: InterruptedException", e); |
| } |
| |
| return splitList.size(); |
| } |
| |
| /** |
| * Read the finalized checkpoint file and associated metadata files for the |
| * checkpoint. Modifies the {@link PartitionOwner} objects to get the |
| * checkpoint prefixes. It is an optimization to prevent all workers from |
| * searching all the files. Also reads in the aggregator data from the |
| * finalized checkpoint file and sets it in ZooKeeper. |
| * |
| * @param superstep Checkpoint set to examine. |
| * @param partitionOwners Partition owners to modify with checkpoint |
| * prefixes |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws KeeperException |
| */ |
| private void prepareCheckpointRestart( |
| long superstep, |
| Collection<PartitionOwner> partitionOwners) |
| throws IOException, KeeperException, InterruptedException { |
| FileSystem fs = getFs(); |
| List<Path> validMetadataPathList = new ArrayList<Path>(); |
| String finalizedCheckpointPath = |
| getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX; |
| DataInputStream finalizedStream = |
| fs.open(new Path(finalizedCheckpointPath)); |
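| // The finalized checkpoint file format (written by finalizeCheckpoint()) |
| // is: number of file prefixes, each used file prefix, the aggregator |
| // data length, and the aggregator data as a serialized JSON byte array. |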
| int prefixFileCount = finalizedStream.readInt(); |
| for (int i = 0; i < prefixFileCount; ++i) { |
| String metadataFilePath = |
| finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX; |
| validMetadataPathList.add(new Path(metadataFilePath)); |
| } |
| |
| // Set the merged aggregator data if it exists. |
| int aggregatorDataSize = finalizedStream.readInt(); |
| if (aggregatorDataSize > 0) { |
| byte [] aggregatorZkData = new byte[aggregatorDataSize]; |
| int actualDataRead = |
| finalizedStream.read(aggregatorZkData, 0, aggregatorDataSize); |
| if (actualDataRead != aggregatorDataSize) { |
| throw new RuntimeException( |
| "prepareCheckpointRestart: Only read " + actualDataRead + |
| " of " + aggregatorDataSize + " aggregator bytes from " + |
| finalizedCheckpointPath); |
| } |
| String mergedAggregatorPath = |
| getMergedAggregatorPath(getApplicationAttempt(), superstep - 1); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("prepareCheckpointRestart: Reloading merged " + |
| "aggregator " + "data '" + |
| Arrays.toString(aggregatorZkData) + |
| "' to previous checkpoint in path " + |
| mergedAggregatorPath); |
| } |
| if (getZkExt().exists(mergedAggregatorPath, false) == null) { |
| getZkExt().createExt(mergedAggregatorPath, |
| aggregatorZkData, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| } |
| else { |
| getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1); |
| } |
| } |
| finalizedStream.close(); |
| |
| Map<Integer, PartitionOwner> idOwnerMap = |
| new HashMap<Integer, PartitionOwner>(); |
| for (PartitionOwner partitionOwner : partitionOwners) { |
| if (idOwnerMap.put(partitionOwner.getPartitionId(), |
| partitionOwner) != null) { |
| throw new IllegalStateException( |
| "prepareCheckpointRestart: Duplicate partition " + |
| partitionOwner); |
| } |
| } |
| // Reading the metadata files. Simply assign each partition owner |
| // the correct file prefix based on the partition id. |
| for (Path metadataPath : validMetadataPathList) { |
| String checkpointFilePrefix = metadataPath.toString(); |
| checkpointFilePrefix = |
| checkpointFilePrefix.substring( |
| 0, |
| checkpointFilePrefix.length() - |
| CHECKPOINT_METADATA_POSTFIX.length()); |
| DataInputStream metadataStream = fs.open(metadataPath); |
| long partitions = metadataStream.readInt(); |
| for (long i = 0; i < partitions; ++i) { |
| long dataPos = metadataStream.readLong(); |
| int partitionId = metadataStream.readInt(); |
| PartitionOwner partitionOwner = idOwnerMap.get(partitionId); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("prepareCheckpointRestart: File " + metadataPath + |
| " with position " + dataPos + |
| ", partition id = " + partitionId + |
| " assigned to " + partitionOwner); |
| } |
| partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix); |
| } |
| metadataStream.close(); |
| } |
| } |
| |
| @Override |
| public void setup() { |
| // Might have to manually load a checkpoint. |
| // In that case, the input splits are not set; they will be faked by |
| // the checkpoint files, with each checkpoint file acting as an input |
| // split. |
| superstepCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Superstep"); |
| vertexCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate vertices"); |
| finishedVertexCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate finished vertices"); |
| edgeCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Aggregate edges"); |
| sentMessagesCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Sent messages"); |
| currentWorkersCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Current workers"); |
| currentMasterTaskPartitionCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Current master task partition"); |
| lastCheckpointedSuperstepCounter = getContext().getCounter( |
| GIRAPH_STATS_COUNTER_GROUP_NAME, "Last checkpointed superstep"); |
| if (getRestartedSuperstep() != UNSET_SUPERSTEP) { |
| superstepCounter.increment(getRestartedSuperstep()); |
| } |
| } |
| |
| @Override |
| public boolean becomeMaster() { |
| // Create my bid to become the master, then wait until either my bid |
| // wins the election (return true) or the job finishes (return false). |
| String myBid = null; |
| try { |
| myBid = |
| getZkExt().createExt(MASTER_ELECTION_PATH + |
| "/" + getHostnamePartitionId(), |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.EPHEMERAL_SEQUENTIAL, |
| true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "becomeMaster: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "becomeMaster: InterruptedException", e); |
| } |
| while (true) { |
| JSONObject jobState = getJobState(); |
| try { |
| if ((jobState != null) && |
| ApplicationState.valueOf( |
| jobState.getString(JSONOBJ_STATE_KEY)) == |
| ApplicationState.FINISHED) { |
| LOG.info("becomeMaster: Job is finished, " + |
| "give up trying to be the master!"); |
| isMaster = false; |
| return isMaster; |
| } |
| } catch (JSONException e) { |
| throw new IllegalStateException( |
| "becomeMaster: Couldn't get state from " + jobState, e); |
| } |
| try { |
| List<String> masterChildArr = |
| getZkExt().getChildrenExt( |
| MASTER_ELECTION_PATH, true, true, true); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("becomeMaster: First child is '" + |
| masterChildArr.get(0) + "' and my bid is '" + |
| myBid + "'"); |
| } |
| if (masterChildArr.get(0).equals(myBid)) { |
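| // Hadoop counters can only be incremented, so set the absolute |
| // value by adding the difference from the current value. |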
| currentMasterTaskPartitionCounter.increment( |
| getTaskPartition() - |
| currentMasterTaskPartitionCounter.getValue()); |
| aggregatorWriter = |
| BspUtils.createAggregatorWriter(getConfiguration()); |
| try { |
| aggregatorWriter.initialize(getContext(), |
| getApplicationAttempt()); |
| } catch (IOException e) { |
| throw new IllegalStateException("becomeMaster: " + |
| "Couldn't initialize aggregatorWriter", e); |
| } |
| LOG.info("becomeMaster: I am now the master!"); |
| isMaster = true; |
| return isMaster; |
| } |
| LOG.info("becomeMaster: Waiting to become the master..."); |
| getMasterElectionChildrenChangedEvent().waitForever(); |
| getMasterElectionChildrenChangedEvent().reset(); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "becomeMaster: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "becomeMaster: InterruptedException", e); |
| } |
| } |
| } |
| |
| /** |
| * Collect and aggregate the worker statistics for a particular superstep. |
| * |
| * @param superstep Superstep to aggregate on |
| * @return Global statistics aggregated on all worker statistics |
| */ |
| private GlobalStats aggregateWorkerStats(long superstep) { |
| Class<? extends Writable> partitionStatsClass = |
| masterGraphPartitioner.createPartitionStats().getClass(); |
| GlobalStats globalStats = new GlobalStats(); |
| // Get the stats from all of the workers' finished znodes |
| String workerFinishedPath = |
| getWorkerFinishedPath(getApplicationAttempt(), superstep); |
| List<String> workerFinishedPathList = null; |
| try { |
| workerFinishedPathList = |
| getZkExt().getChildrenExt( |
| workerFinishedPath, false, false, true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "aggregateWorkerStats: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "aggregateWorkerStats: InterruptedException", e); |
| } |
| |
| allPartitionStatsList.clear(); |
| for (String finishedPath : workerFinishedPathList) { |
| JSONObject workerFinishedInfoObj = null; |
| try { |
| byte [] zkData = |
| getZkExt().getData(finishedPath, false, null); |
| workerFinishedInfoObj = new JSONObject(new String(zkData)); |
| List<? extends Writable> writableList = |
| WritableUtils.readListFieldsFromByteArray( |
| Base64.decode(workerFinishedInfoObj.getString( |
| JSONOBJ_PARTITION_STATS_KEY)), |
| partitionStatsClass, |
| getConfiguration()); |
| for (Writable writable : writableList) { |
| globalStats.addPartitionStats((PartitionStats) writable); |
| globalStats.addMessageCount( |
| workerFinishedInfoObj.getLong( |
| JSONOBJ_NUM_MESSAGES_KEY)); |
| allPartitionStatsList.add((PartitionStats) writable); |
| } |
| } catch (JSONException e) { |
| throw new IllegalStateException( |
| "aggregateWorkerStats: JSONException", e); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "aggregateWorkerStats: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "aggregateWorkerStats: InterruptedException", e); |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "aggregateWorkerStats: IOException", e); |
| } |
| } |
| |
| if (LOG.isInfoEnabled()) { |
| LOG.info("aggregateWorkerStats: Aggregation found " + globalStats + |
| " on superstep = " + getSuperstep()); |
| } |
| return globalStats; |
| } |
| |
| /** |
| * Get the aggregator values for a particular superstep, |
| * aggregate and save them. Does nothing on the INPUT_SUPERSTEP. |
| * |
| * @param superstep superstep to check |
| */ |
| private void collectAndProcessAggregatorValues(long superstep) { |
| if (superstep == INPUT_SUPERSTEP) { |
| // Nothing to collect on the input superstep |
| return; |
| } |
| Map<String, Aggregator<? extends Writable>> aggregatorMap = |
| new TreeMap<String, Aggregator<? extends Writable>>(); |
| String workerFinishedPath = |
| getWorkerFinishedPath(getApplicationAttempt(), superstep); |
| List<String> hostnameIdPathList = null; |
| try { |
| hostnameIdPathList = |
| getZkExt().getChildrenExt( |
| workerFinishedPath, false, false, true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: InterruptedException", e); |
| } |
| |
| for (String hostnameIdPath : hostnameIdPathList) { |
| JSONObject workerFinishedInfoObj = null; |
| JSONArray aggregatorArray = null; |
| try { |
| byte [] zkData = |
| getZkExt().getData(hostnameIdPath, false, null); |
| workerFinishedInfoObj = new JSONObject(new String(zkData)); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: InterruptedException", |
| e); |
| } catch (JSONException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: JSONException", e); |
| } |
| try { |
| aggregatorArray = workerFinishedInfoObj.getJSONArray( |
| JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY); |
| } catch (JSONException e) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("collectAndProcessAggregatorValues: " + |
| "No aggregators" + " for " + hostnameIdPath); |
| } |
| continue; |
| } |
| for (int i = 0; i < aggregatorArray.length(); ++i) { |
| try { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("collectAndProcessAggregatorValues: " + |
| "Getting aggregators from " + |
| aggregatorArray.getJSONObject(i)); |
| } |
| String aggregatorName = |
| aggregatorArray.getJSONObject(i).getString( |
| AGGREGATOR_NAME_KEY); |
| String aggregatorClassName = |
| aggregatorArray.getJSONObject(i).getString( |
| AGGREGATOR_CLASS_NAME_KEY); |
| @SuppressWarnings("unchecked") |
| Aggregator<Writable> aggregator = |
| (Aggregator<Writable>) aggregatorMap.get(aggregatorName); |
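| // The first value seen for an aggregator initializes it; subsequent |
| // values from other workers are aggregated into it. |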
| boolean firstTime = false; |
| if (aggregator == null) { |
| @SuppressWarnings("unchecked") |
| Aggregator<Writable> aggregatorWritable = |
| (Aggregator<Writable>) getAggregator(aggregatorName); |
| aggregator = aggregatorWritable; |
| if (aggregator == null) { |
| @SuppressWarnings("unchecked") |
| Class<? extends Aggregator<Writable>> aggregatorClass = |
| (Class<? extends Aggregator<Writable>>) |
| Class.forName(aggregatorClassName); |
| aggregator = registerAggregator( |
| aggregatorName, |
| aggregatorClass); |
| } |
| aggregatorMap.put(aggregatorName, aggregator); |
| firstTime = true; |
| } |
| Writable aggregatorValue = |
| aggregator.createAggregatedValue(); |
| InputStream input = |
| new ByteArrayInputStream( |
| Base64.decode( |
| aggregatorArray.getJSONObject(i). |
| getString(AGGREGATOR_VALUE_KEY))); |
| aggregatorValue.readFields(new DataInputStream(input)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("collectAndProcessAggregatorValues: " + |
| "aggregator value size=" + input.available() + |
| " for aggregator=" + aggregatorName + |
| " value=" + aggregatorValue); |
| } |
| if (firstTime) { |
| aggregator.setAggregatedValue(aggregatorValue); |
| } else { |
| aggregator.aggregate(aggregatorValue); |
| } |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: " + |
| "IOException when reading aggregator data " + |
| aggregatorArray, e); |
| } catch (JSONException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: " + |
| "JSONException when reading aggregator data " + |
| aggregatorArray, e); |
| } catch (ClassNotFoundException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: " + |
| "ClassNotFoundException when reading aggregator data " + |
| aggregatorArray, e); |
| } catch (InstantiationException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: " + |
| "InstantiationException when reading aggregator data " + |
| aggregatorArray, e); |
| } catch (IllegalAccessException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: " + |
| "IllegalAccessException when reading aggregator data " + |
| aggregatorArray, e); |
| } |
| } |
| } |
| if (aggregatorMap.size() > 0) { |
| String mergedAggregatorPath = |
| getMergedAggregatorPath(getApplicationAttempt(), superstep); |
| byte [] zkData = null; |
| JSONArray aggregatorArray = new JSONArray(); |
| for (Map.Entry<String, Aggregator<? extends Writable>> entry : |
| aggregatorMap.entrySet()) { |
| try { |
| ByteArrayOutputStream outputStream = |
| new ByteArrayOutputStream(); |
| DataOutput output = new DataOutputStream(outputStream); |
| entry.getValue().getAggregatedValue().write(output); |
| |
| JSONObject aggregatorObj = new JSONObject(); |
| aggregatorObj.put(AGGREGATOR_NAME_KEY, |
| entry.getKey()); |
| aggregatorObj.put( |
| AGGREGATOR_VALUE_KEY, |
| Base64.encodeBytes(outputStream.toByteArray())); |
| aggregatorArray.put(aggregatorObj); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("collectAndProcessAggregatorValues: " + |
| "Trying to add aggregatorObj " + |
| aggregatorObj + "(" + |
| entry.getValue().getAggregatedValue() + |
| ") to merged aggregator path " + |
| mergedAggregatorPath); |
| } |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: " + |
| "IOException", e); |
| } catch (JSONException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: JSONException", e); |
| } |
| } |
| try { |
| zkData = aggregatorArray.toString().getBytes(); |
| getZkExt().createExt(mergedAggregatorPath, |
| zkData, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| } catch (KeeperException.NodeExistsException e) { |
| LOG.warn("collectAndProcessAggregatorValues: " + |
| mergedAggregatorPath + |
| " already exists!"); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "collectAndProcessAggregatorValues: InterruptedException", |
| e); |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("collectAndProcessAggregatorValues: Finished " + |
| "loading " + |
| mergedAggregatorPath+ " with aggregator values " + |
| aggregatorArray); |
| } |
| } |
| } |
| |
| /** |
| * Finalize the checkpoint file prefixes by taking the chosen workers and |
| * writing them to a finalized file. Also write out the master |
| * aggregated aggregator array from the previous superstep. |
| * |
| * @param superstep superstep to finalize |
| * @param chosenWorkerInfoList list of chosen workers that will be finalized |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws KeeperException |
| */ |
| private void finalizeCheckpoint( |
| long superstep, |
| List<WorkerInfo> chosenWorkerInfoList) |
| throws IOException, KeeperException, InterruptedException { |
| Path finalizedCheckpointPath = |
| new Path(getCheckpointBasePath(superstep) + |
| CHECKPOINT_FINALIZED_POSTFIX); |
| try { |
| getFs().delete(finalizedCheckpointPath, false); |
| } catch (IOException e) { |
| LOG.warn("finalizeCheckpoint: Failed to remove old file " + |
| finalizedCheckpointPath, e); |
| } |
| |
| // Format: |
| // <number of files> |
| // <used file prefix 0><used file prefix 1>... |
| // <aggregator data length><aggregators as a serialized JSON byte array> |
| FSDataOutputStream finalizedOutputStream = |
| getFs().create(finalizedCheckpointPath); |
| finalizedOutputStream.writeInt(chosenWorkerInfoList.size()); |
| for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { |
| String chosenWorkerInfoPrefix = |
| getCheckpointBasePath(superstep) + "." + |
| chosenWorkerInfo.getHostnameId(); |
| finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix); |
| } |
| String mergedAggregatorPath = |
| getMergedAggregatorPath(getApplicationAttempt(), superstep - 1); |
| if (getZkExt().exists(mergedAggregatorPath, false) != null) { |
| byte [] aggregatorZkData = |
| getZkExt().getData(mergedAggregatorPath, false, null); |
| finalizedOutputStream.writeInt(aggregatorZkData.length); |
| finalizedOutputStream.write(aggregatorZkData); |
| } |
| else { |
| finalizedOutputStream.writeInt(0); |
| } |
| finalizedOutputStream.close(); |
| lastCheckpointedSuperstep = superstep; |
| lastCheckpointedSuperstepCounter.increment(superstep - |
| lastCheckpointedSuperstepCounter.getValue()); |
| } |
| |
| /** |
| * Assign the partitions for this superstep. If there are changes, |
| * the workers will know how to do the exchange. If this was a restarted |
| * superstep, then make sure to provide information on where to find the |
| * checkpoint file. |
| * |
| * @param allPartitionStatsList All partition stats |
| * @param chosenWorkerInfoList All the chosen worker infos |
| * @param masterGraphPartitioner Master graph partitioner |
| */ |
| private void assignPartitionOwners( |
| List<PartitionStats> allPartitionStatsList, |
| List<WorkerInfo> chosenWorkerInfoList, |
| MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner) { |
| Collection<PartitionOwner> partitionOwners; |
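| // On the input superstep or a restarted superstep, create the initial |
| // partition owners; otherwise let the partitioner generate changed |
| // owners based on the previous superstep's partition stats. |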
| if (getSuperstep() == INPUT_SUPERSTEP || |
| getSuperstep() == getRestartedSuperstep()) { |
| partitionOwners = |
| masterGraphPartitioner.createInitialPartitionOwners( |
| chosenWorkerInfoList, maxWorkers); |
| if (partitionOwners.isEmpty()) { |
| throw new IllegalStateException( |
| "assignAndExchangePartitions: No partition owners set"); |
| } |
| } else { |
| partitionOwners = |
| masterGraphPartitioner.generateChangedPartitionOwners( |
| allPartitionStatsList, |
| chosenWorkerInfoList, |
| maxWorkers, |
| getSuperstep()); |
| |
| PartitionUtils.analyzePartitionStats(partitionOwners, |
| allPartitionStatsList); |
| } |
| |
| // If restarted, prepare the checkpoint restart |
| if (getRestartedSuperstep() == getSuperstep()) { |
| try { |
| prepareCheckpointRestart(getSuperstep(), partitionOwners); |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "assignPartitionOwners: IOException on preparing", e); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "assignPartitionOwners: KeeperException on preparing", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "assignPartitionOwners: InterruptedException on preparing", |
| e); |
| } |
| } |
| |
| // There will be some exchange of partitions |
| if (!partitionOwners.isEmpty()) { |
| String vertexExchangePath = |
| getPartitionExchangePath(getApplicationAttempt(), |
| getSuperstep()); |
| try { |
| getZkExt().createOnceExt(vertexExchangePath, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "assignPartitionOwners: KeeperException creating " + |
| vertexExchangePath, e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "assignPartitionOwners: InterruptedException creating " + |
| vertexExchangePath, e); |
| } |
| } |
| |
| // Workers are waiting for these assignments |
| String partitionAssignmentsPath = |
| getPartitionAssignmentsPath(getApplicationAttempt(), |
| getSuperstep()); |
| WritableUtils.writeListToZnode( |
| getZkExt(), |
| partitionAssignmentsPath, |
| -1, |
| new ArrayList<Writable>(partitionOwners)); |
| } |
| |
| /** |
| * Check whether the workers chosen for this superstep are still alive |
| * |
| * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper |
| * @param chosenWorkerInfoList List of the chosen workers to check |
| * @return true if they are all alive, false otherwise. |
| * @throws InterruptedException |
| * @throws KeeperException |
| */ |
| private boolean superstepChosenWorkerAlive( |
| String chosenWorkerInfoHealthPath, |
| List<WorkerInfo> chosenWorkerInfoList) |
| throws KeeperException, InterruptedException { |
| List<WorkerInfo> chosenWorkerInfoHealthyList = |
| getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false); |
| Set<WorkerInfo> chosenWorkerInfoHealthySet = |
| new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList); |
| boolean allChosenWorkersHealthy = true; |
| for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { |
| if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) { |
| allChosenWorkersHealthy = false; |
| LOG.error("superstepChosenWorkerAlive: Missing chosen " + |
| "worker " + chosenWorkerInfo + |
| " on superstep " + getSuperstep()); |
| } |
| } |
| return allChosenWorkersHealthy; |
| } |
| |
| @Override |
| public void restartFromCheckpoint(long checkpoint) { |
| // Process: |
| // 1. Remove all old input split data |
| // 2. Increase the application attempt and set to the correct checkpoint |
| // 3. Send command to all workers to restart their tasks |
| try { |
| getZkExt().deleteExt(INPUT_SPLIT_PATH, -1, true); |
| } catch (InterruptedException e) { |
| throw new RuntimeException( |
| "restartFromCheckpoint: InterruptedException", e); |
| } catch (KeeperException e) { |
| throw new RuntimeException( |
| "restartFromCheckpoint: KeeperException", e); |
| } |
| setApplicationAttempt(getApplicationAttempt() + 1); |
| setCachedSuperstep(checkpoint); |
| setRestartedSuperstep(checkpoint); |
| setJobState(ApplicationState.START_SUPERSTEP, |
| getApplicationAttempt(), |
| checkpoint); |
| } |
| |
| /** |
| * Only get the finalized checkpoint files |
| */ |
| public static class FinalizedCheckpointPathFilter implements PathFilter { |
| @Override |
| public boolean accept(Path path) { |
| return path.getName().endsWith( |
| BspService.CHECKPOINT_FINALIZED_POSTFIX); |
| } |
| } |
| |
| @Override |
| public long getLastGoodCheckpoint() throws IOException { |
| // Find the last good checkpoint if none have been written to the |
| // knowledge of this master |
| if (lastCheckpointedSuperstep == -1) { |
| FileStatus[] fileStatusArray = |
| getFs().listStatus(new Path(CHECKPOINT_BASE_PATH), |
| new FinalizedCheckpointPathFilter()); |
| if (fileStatusArray == null) { |
| return -1; |
| } |
| Arrays.sort(fileStatusArray); |
| lastCheckpointedSuperstep = getCheckpoint( |
| fileStatusArray[fileStatusArray.length - 1].getPath()); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("getLastGoodCheckpoint: Found last good checkpoint " + |
| lastCheckpointedSuperstep + " from " + |
| fileStatusArray[fileStatusArray.length - 1]. |
| getPath().toString()); |
| } |
| } |
| return lastCheckpointedSuperstep; |
| } |
| |
| /** |
| * Wait for a set of workers to signal that they are done with the |
| * barrier. |
| * |
| * @param finishedWorkerPath Path to where the workers will register their |
| * hostname and id |
| * @param workerInfoList List of the workers to wait for |
| * @param event Event to wait on between checks of the finished workers |
| * @return True if barrier was successful, false if there was a worker |
| * failure |
| */ |
| private boolean barrierOnWorkerList(String finishedWorkerPath, |
| List<WorkerInfo> workerInfoList, |
| BspEvent event) { |
| try { |
| getZkExt().createOnceExt(finishedWorkerPath, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "barrierOnWorkerList: KeeperException - Couldn't create " + |
| finishedWorkerPath, e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "barrierOnWorkerList: InterruptedException - Couldn't create " + |
| finishedWorkerPath, e); |
| } |
| List<String> hostnameIdList = |
| new ArrayList<String>(workerInfoList.size()); |
| for (WorkerInfo workerInfo : workerInfoList) { |
| hostnameIdList.add(workerInfo.getHostnameId()); |
| } |
| String workerInfoHealthyPath = |
| getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep()); |
| List<String> finishedHostnameIdList; |
| long nextInfoMillis = System.currentTimeMillis(); |
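| // Poll until all chosen workers have registered under the finished |
| // path, logging progress at INFO level at most every 30 seconds. |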
| while (true) { |
| try { |
| finishedHostnameIdList = |
| getZkExt().getChildrenExt(finishedWorkerPath, |
| true, |
| false, |
| false); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "barrierOnWorkerList: KeeperException - Couldn't get " + |
| "children of " + finishedWorkerPath, e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "barrierOnWorkerList: InterruptedException - Couldn't get " + |
| "children of " + finishedWorkerPath, e); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("barrierOnWorkerList: Got finished worker list = " + |
| finishedHostnameIdList + ", size = " + |
| finishedHostnameIdList.size() + |
| ", worker list = " + |
| workerInfoList + ", size = " + |
| workerInfoList.size() + |
| " from " + finishedWorkerPath); |
| } |
| |
| if (LOG.isInfoEnabled() && |
| (System.currentTimeMillis() > nextInfoMillis)) { |
| nextInfoMillis = System.currentTimeMillis() + 30000; |
| LOG.info("barrierOnWorkerList: " + |
| finishedHostnameIdList.size() + |
| " out of " + workerInfoList.size() + |
| " workers finished on superstep " + |
| getSuperstep() + " on path " + finishedWorkerPath); |
| } |
| getContext().setStatus(getGraphMapper().getMapFunctions() + " - " + |
| finishedHostnameIdList.size() + |
| " finished out of " + |
| workerInfoList.size() + |
| " on superstep " + getSuperstep()); |
| if (finishedHostnameIdList.containsAll(hostnameIdList)) { |
| break; |
| } |
| |
| // Wait for a signal, but no more than 60 seconds, before |
| // checking progress again. |
| event.waitMsecs(60 * 1000); |
| event.reset(); |
| getContext().progress(); |
| |
| // Did a worker die? |
| try { |
| if ((getSuperstep() > 0) && |
| !superstepChosenWorkerAlive( |
| workerInfoHealthyPath, |
| workerInfoList)) { |
| return false; |
| } |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "barrierOnWorkerList: KeeperException - " + |
| "Couldn't get " + workerInfoHealthyPath, e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "barrierOnWorkerList: InterruptedException - " + |
| "Couldn't get " + workerInfoHealthyPath, e); |
| } |
| } |
| |
| return true; |
| } |
| |
| |
| @Override |
| public SuperstepState coordinateSuperstep() throws |
| KeeperException, InterruptedException { |
| // 1. Get chosen workers and set up watches on them. |
| // 2. Assign partitions to the workers |
| // (possibly reloading from a superstep) |
| // 3. Wait for all workers to complete |
| // 4. Collect and process aggregators |
| // 5. Create superstep finished node |
| // 6. If the checkpoint frequency is met, finalize the checkpoint |
| List<WorkerInfo> chosenWorkerInfoList = checkWorkers(); |
| if (chosenWorkerInfoList == null) { |
| LOG.fatal("coordinateSuperstep: Not enough healthy workers for " + |
| "superstep " + getSuperstep()); |
| setJobState(ApplicationState.FAILED, -1, -1); |
| } else { |
| for (WorkerInfo workerInfo : chosenWorkerInfoList) { |
| String workerInfoHealthyPath = |
| getWorkerInfoHealthyPath(getApplicationAttempt(), |
| getSuperstep()) + "/" + |
| workerInfo.getHostnameId(); |
| if (getZkExt().exists(workerInfoHealthyPath, true) == null) { |
| LOG.warn("coordinateSuperstep: Chosen worker " + |
| workerInfoHealthyPath + |
| " is no longer valid, failing superstep"); |
| } |
| } |
| } |
| |
| currentWorkersCounter.increment(chosenWorkerInfoList.size() - |
| currentWorkersCounter.getValue()); |
| assignPartitionOwners(allPartitionStatsList, |
| chosenWorkerInfoList, |
| masterGraphPartitioner); |
| |
| if (getSuperstep() == INPUT_SUPERSTEP) { |
| // Coordinate the workers finishing sending their vertices to the |
| // correct workers and signal when everything is done. |
| if (!barrierOnWorkerList(INPUT_SPLIT_DONE_PATH, |
| chosenWorkerInfoList, |
| getInputSplitsDoneStateChangedEvent())) { |
| throw new IllegalStateException( |
| "coordinateSuperstep: Worker failed during input split " + |
| "(currently not supported)"); |
| } |
| try { |
| getZkExt().create(INPUT_SPLITS_ALL_DONE_PATH, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| } catch (KeeperException.NodeExistsException e) { |
| LOG.info("coordinateSuperstep: Node " + |
| INPUT_SPLITS_ALL_DONE_PATH + " already exists."); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "coordinateSuperstep: KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "coordinateSuperstep: InterruptedException", e); |
| } |
| } |
| |
| String finishedWorkerPath = |
| getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()); |
| if (!barrierOnWorkerList(finishedWorkerPath, |
| chosenWorkerInfoList, |
| getSuperstepStateChangedEvent())) { |
| return SuperstepState.WORKER_FAILURE; |
| } |
| |
| collectAndProcessAggregatorValues(getSuperstep()); |
| GlobalStats globalStats = aggregateWorkerStats(getSuperstep()); |
| |
| // Let everyone know the aggregated application state through the |
| // superstep finishing znode. |
| String superstepFinishedNode = |
| getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep()); |
| WritableUtils.writeToZnode( |
| getZkExt(), superstepFinishedNode, -1, globalStats); |
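| // Hadoop counters can only be incremented, so add the delta from the |
| // current value so each counter reflects the aggregated totals. |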
| vertexCounter.increment( |
| globalStats.getVertexCount() - |
| vertexCounter.getValue()); |
| finishedVertexCounter.increment( |
| globalStats.getFinishedVertexCount() - |
| finishedVertexCounter.getValue()); |
| edgeCounter.increment( |
| globalStats.getEdgeCount() - |
| edgeCounter.getValue()); |
| sentMessagesCounter.increment( |
| globalStats.getMessageCount() - |
| sentMessagesCounter.getValue()); |
| |
| // Finalize the valid checkpoint file prefixes and possibly |
| // the aggregators. |
| if (checkpointFrequencyMet(getSuperstep())) { |
| try { |
| finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList); |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "coordinateSuperstep: IOException on finalizing checkpoint", |
| e); |
| } |
| } |
| |
| // Clean up the old supersteps (always keep this one) |
| long removeableSuperstep = getSuperstep() - 1; |
| if ((getConfiguration().getBoolean( |
| GiraphJob.KEEP_ZOOKEEPER_DATA, |
| GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT) == false) && |
| (removeableSuperstep >= 0)) { |
| String oldSuperstepPath = |
| getSuperstepPath(getApplicationAttempt()) + "/" + |
| (removeableSuperstep); |
| try { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("coordinateSuperstep: Cleaning up old Superstep " + |
| oldSuperstepPath); |
| } |
| getZkExt().deleteExt(oldSuperstepPath, |
| -1, |
| true); |
| } catch (KeeperException.NoNodeException e) { |
| LOG.warn("coordinateSuperstep: Already cleaned up " + |
| oldSuperstepPath); |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "coordinateSuperstep: KeeperException on " + |
| "finalizing checkpoint", e); |
| } |
| } |
| incrCachedSuperstep(); |
| // The superstep counter starts at zero, so there is no need to |
| // increment it when moving to superstep 0 |
| if (getSuperstep() > 0) { |
| superstepCounter.increment(1); |
| } |
| SuperstepState superstepState; |
| if ((globalStats.getFinishedVertexCount() == |
| globalStats.getVertexCount()) && |
| globalStats.getMessageCount() == 0) { |
| superstepState = SuperstepState.ALL_SUPERSTEPS_DONE; |
| } else { |
| superstepState = SuperstepState.THIS_SUPERSTEP_DONE; |
| } |
| try { |
| aggregatorWriter.writeAggregator(getAggregatorMap(), |
| (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ? |
| AggregatorWriter.LAST_SUPERSTEP : getSuperstep()); |
| } catch (IOException e) { |
| throw new IllegalStateException( |
| "coordinateSuperstep: IOException while " + |
| "writing aggregators data", e); |
| } |
| |
| return superstepState; |
| } |
| |
| /** |
| * Need to clean up ZooKeeper nicely. Make sure all the masters and workers |
| * have reported ending their ZooKeeper connections. |
| */ |
| private void cleanUpZooKeeper() { |
| try { |
| getZkExt().createExt(CLEANED_UP_PATH, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| } catch (KeeperException.NodeExistsException e) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("cleanUpZooKeeper: Node " + CLEANED_UP_PATH + |
| " already exists, no need to create."); |
| } |
| } catch (KeeperException e) { |
| throw new IllegalStateException( |
| "cleanupZooKeeper: Got KeeperException", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException( |
| "cleanupZooKeeper: Got InterruptedException", e); |
| } |
| // Need to wait for the number of workers and masters to complete |
| int maxTasks = BspInputFormat.getMaxTasks(getConfiguration()); |
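| // When a task runs both the master and worker roles, it registers two |
| // cleaned-up znodes (one per role), so double the expected count. |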
| if ((getGraphMapper().getMapFunctions() == MapFunctions.ALL) || |
| (getGraphMapper().getMapFunctions() == |
| MapFunctions.ALL_EXCEPT_ZOOKEEPER)) { |
| maxTasks *= 2; |
| } |
| List<String> cleanedUpChildrenList = null; |
| while (true) { |
| try { |
| cleanedUpChildrenList = |
| getZkExt().getChildrenExt( |
| CLEANED_UP_PATH, true, false, true); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("cleanUpZooKeeper: Got " + |
| cleanedUpChildrenList.size() + " of " + |
| maxTasks + " desired children from " + |
| CLEANED_UP_PATH); |
| } |
| if (cleanedUpChildrenList.size() == maxTasks) { |
| break; |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("cleanUpZooKeeper: Waiting for the " + |
| "children of " + CLEANED_UP_PATH + |
| " to change since only got " + |
| cleanedUpChildrenList.size() + " nodes."); |
| } |
| } |
| catch (Exception e) { |
| // We are in the cleanup phase -- just log the error |
| LOG.error("cleanUpZooKeeper: Got exception, but will continue", |
| e); |
| return; |
| } |
| |
| getCleanedUpChildrenChangedEvent().waitForever(); |
| getCleanedUpChildrenChangedEvent().reset(); |
| } |
| |
| // At this point, all processes have acknowledged the cleanup, |
| // and the master can do any final cleanup |
| try { |
| if (getConfiguration().getBoolean( |
| GiraphJob.KEEP_ZOOKEEPER_DATA, |
| GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT) == false) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("cleanupZooKeeper: Removing the following path " + |
| "and all children - " + BASE_PATH); |
| } |
| getZkExt().deleteExt(BASE_PATH, -1, true); |
| } |
| } catch (Exception e) { |
| LOG.error("cleanupZooKeeper: Failed to do cleanup of " + |
| BASE_PATH, e); |
| } |
| } |
| |
| @Override |
| public void cleanup() throws IOException { |
| // All master processes should denote they are done by adding special |
| // znode. Once the number of znodes equals the number of partitions |
| // for workers and masters, the master will clean up the ZooKeeper |
| // znodes associated with this job. |
| String cleanedUpPath = CLEANED_UP_PATH + "/" + |
| getTaskPartition() + MASTER_SUFFIX; |
| try { |
| String finalFinishedPath = |
| getZkExt().createExt(cleanedUpPath, |
| null, |
| Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT, |
| true); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("cleanup: Notifying master it's okay to cleanup with " + |
| finalFinishedPath); |
| } |
| } catch (KeeperException.NodeExistsException e) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("cleanup: Couldn't create finished node '" + |
| cleanedUpPath + "' since it already exists"); |
| } |
| } catch (KeeperException e) { |
| LOG.error("cleanup: Got KeeperException, continuing", e); |
| } catch (InterruptedException e) { |
| LOG.error("cleanup: Got InterruptedException, continuing", e); |
| } |
| |
| if (isMaster) { |
| cleanUpZooKeeper(); |
| // If desired, cleanup the checkpoint directory |
| if (getConfiguration().getBoolean( |
| GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, |
| GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) { |
| boolean success = |
| getFs().delete(new Path(CHECKPOINT_BASE_PATH), true); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("cleanup: Removed HDFS checkpoint directory (" + |
| CHECKPOINT_BASE_PATH + ") with return = " + |
| success + " since this job succeeded "); |
| } |
| } |
| aggregatorWriter.close(); |
| } |
| |
| try { |
| getZkExt().close(); |
| } catch (InterruptedException e) { |
| // cleanup phase -- just log the error |
| LOG.error("cleanup: ZooKeeper failed to close", e); |
| } |
| } |
| |
| /** |
| * Event that the master watches that denotes if a worker has done something |
| * that changes the state of a superstep (either a worker completed or died) |
| * |
| * @return Event that denotes a superstep state change |
| */ |
| final public BspEvent getSuperstepStateChangedEvent() { |
| return superstepStateChanged; |
| } |
| |
| /** |
| * Check whether a worker failure should cause the current superstep to |
| * fail; if so, signal that the superstep state has changed. |
| * |
| * @param failedWorkerPath Full path to the failed worker |
| */ |
| final private void checkHealthyWorkerFailure(String failedWorkerPath) { |
| if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) { |
| return; |
| } |
| |
| Collection<PartitionOwner> partitionOwners = |
| masterGraphPartitioner.getCurrentPartitionOwners(); |
| String hostnameId = |
| getHealthyHostnameIdFromPath(failedWorkerPath); |
| for (PartitionOwner partitionOwner : partitionOwners) { |
| WorkerInfo workerInfo = partitionOwner.getWorkerInfo(); |
| WorkerInfo previousWorkerInfo = |
| partitionOwner.getPreviousWorkerInfo(); |
| if (workerInfo.getHostnameId().equals(hostnameId) || |
| ((previousWorkerInfo != null) && |
| previousWorkerInfo.getHostnameId().equals(hostnameId))) { |
| LOG.warn("checkHealthyWorkerFailure: " + |
| "at least one healthy worker went down " + |
| "for superstep " + getSuperstep() + " - " + |
| hostnameId + ", will try to restart from " + |
| "checkpointed superstep " + |
| lastCheckpointedSuperstep); |
| superstepStateChanged.signal(); |
| } |
| } |
| } |
| |
| @Override |
| public boolean processEvent(WatchedEvent event) { |
| boolean foundEvent = false; |
| if (event.getPath().contains(WORKER_HEALTHY_DIR) && |
| (event.getType() == EventType.NodeDeleted)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("processEvent: Healthy worker died (node deleted) " + |
| "in " + event.getPath()); |
| } |
| checkHealthyWorkerFailure(event.getPath()); |
| superstepStateChanged.signal(); |
| foundEvent = true; |
| } else if (event.getPath().contains(WORKER_FINISHED_DIR) && |
| event.getType() == EventType.NodeChildrenChanged) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("processEvent: Worker finished (node change) " + |
| "event - superstepStateChanged signaled"); |
| } |
| superstepStateChanged.signal(); |
| foundEvent = true; |
| } |
| |
| return foundEvent; |
| } |
| } |