blob: d054be33c3806b33971deb08d4d560bfadbe9f84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.worker;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.requests.AskForInputSplitRequest;
import org.apache.giraph.io.InputType;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Requests splits from master and keeps track of them
*/
public class WorkerInputSplitsHandler {
/** Worker info of this worker */
private final WorkerInfo workerInfo;
/** Task id of master */
private final int masterTaskId;
/** Worker client, used for communication */
private final WorkerClient workerClient;
/** Map with currently available splits received from master */
private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits;
/**
* Constructor
*
* @param workerInfo Worker info of this worker
* @param masterTaskId Task id of master
* @param workerClient Worker client, used for communication
*/
public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId,
WorkerClient workerClient) {
this.workerInfo = workerInfo;
this.masterTaskId = masterTaskId;
this.workerClient = workerClient;
availableInputSplits = new EnumMap<>(InputType.class);
for (InputType inputType : InputType.values()) {
availableInputSplits.put(
inputType, new LinkedBlockingQueue<byte[]>());
}
}
/**
* Called when an input split has been received from master, adding it to
* the map
*
* @param splitType Type of split
* @param serializedInputSplit Split
*/
public void receivedInputSplit(InputType splitType,
byte[] serializedInputSplit) {
try {
availableInputSplits.get(splitType).put(serializedInputSplit);
} catch (InterruptedException e) {
throw new IllegalStateException("Interrupted", e);
}
}
/**
* Try to reserve an InputSplit for loading. While InputSplits exists that
* are not finished, wait until they are.
*
* NOTE: iterations on the InputSplit list only halt for each worker when it
* has scanned the entire list once and found every split marked RESERVED.
* When a worker fails, its Ephemeral RESERVED znodes will disappear,
* allowing other iterating workers to claim it's previously read splits.
* Only when the last worker left iterating on the list fails can a danger
* of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
* causes job failure, this is OK. As the failure model evolves, this
* behavior might need to change. We could add watches on
* inputSplitFinishedNodes and stop iterating only when all these nodes
* have been created.
*
* @param splitType Type of split
* @param isFirstSplit Whether this is the first split input thread reads
* @return reserved InputSplit or null if no unfinished InputSplits exist
*/
public byte[] reserveInputSplit(InputType splitType, boolean isFirstSplit) {
// Send request
workerClient.sendWritableRequest(masterTaskId,
new AskForInputSplitRequest(
splitType, workerInfo.getTaskId(), isFirstSplit));
try {
// Wait for some split to become available
byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
return serializedInputSplit.length == 0 ? null : serializedInputSplit;
} catch (InterruptedException e) {
throw new IllegalStateException("Interrupted", e);
}
}
}