blob: 3dc1019c2969a0af0614cb23d3c42ea7fba094c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.ooc;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
import org.apache.giraph.ooc.command.StoreDataBufferIOCommand;
import org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand;
import org.apache.giraph.ooc.command.StorePartitionIOCommand;
import org.apache.giraph.ooc.command.WaitIOCommand;
import org.apache.giraph.ooc.policy.OutOfCoreOracle;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Representation of IO thread scheduler for out-of-core mechanism
*/
public class OutOfCoreIOScheduler {
/**
* If an IO thread does not have any command to do, it waits for certain a
* period and check back again to see if there exist any command to perform.
* This constant determines this wait period in milliseconds.
*/
public static final IntConfOption OOC_WAIT_INTERVAL =
new IntConfOption("giraph.oocWaitInterval", 1000,
"Duration (in milliseconds) which IO threads in out-of-core " +
"mechanism would wait until a command becomes available");
/** Class logger. */
private static final Logger LOG =
Logger.getLogger(OutOfCoreIOScheduler.class);
/** Out-of-core engine */
private final OutOfCoreEngine oocEngine;
/** How much an IO thread should wait if there is no IO command */
private final int waitInterval;
/**
* Queue of IO commands for loading partitions to memory. Load commands are
* urgent and should be done once loading data is a viable IO command.
*/
private final List<Queue<IOCommand>> threadLoadCommandQueue;
/** Whether IO threads should terminate */
private volatile boolean shouldTerminate;
/**
* Constructor
*
* @param conf configuration
* @param oocEngine out-of-core engine
* @param numDisks number of disks (IO threads)
*/
OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf,
OutOfCoreEngine oocEngine, int numDisks) {
this.oocEngine = oocEngine;
this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
threadLoadCommandQueue = new ArrayList<>(numDisks);
for (int i = 0; i < numDisks; ++i) {
threadLoadCommandQueue.add(
new ConcurrentLinkedQueue<IOCommand>());
}
shouldTerminate = false;
}
/**
* Generate and return the next appropriate IO command for a given thread
*
* @param threadId id of the thread ready to execute the next IO command
* @return next IO command to be executed by the given thread
*/
public IOCommand getNextIOCommand(int threadId) {
if (shouldTerminate) {
return null;
}
IOCommand command = null;
do {
if (command != null && LOG.isInfoEnabled()) {
LOG.info("getNextIOCommand: command " + command + " was proposed to " +
"the oracle, but got denied. Generating another command!");
}
OutOfCoreOracle.IOAction[] actions =
oocEngine.getOracle().getNextIOActions();
if (LOG.isDebugEnabled()) {
LOG.debug("getNextIOCommand: actions are " + Arrays.toString(actions));
}
// Check whether there are any urgent outstanding load requests
if (!threadLoadCommandQueue.get(threadId).isEmpty()) {
// Check whether loading a partition is a viable (allowed) action to do
boolean canLoad = false;
for (OutOfCoreOracle.IOAction action : actions) {
if (action == OutOfCoreOracle.IOAction.LOAD_PARTITION ||
action == OutOfCoreOracle.IOAction.LOAD_UNPROCESSED_PARTITION ||
action == OutOfCoreOracle.IOAction.LOAD_TO_SWAP_PARTITION ||
action == OutOfCoreOracle.IOAction.URGENT_LOAD_PARTITION) {
canLoad = true;
break;
}
}
if (canLoad) {
command = threadLoadCommandQueue.get(threadId).poll();
checkNotNull(command);
if (oocEngine.getOracle().approve(command)) {
return command;
} else {
// Loading is not viable at this moment. We should put the command
// back in the load queue and wait until loading becomes viable.
threadLoadCommandQueue.get(threadId).offer(command);
}
}
}
command = null;
for (OutOfCoreOracle.IOAction action : actions) {
Integer partitionId;
switch (action) {
case STORE_MESSAGES_AND_BUFFERS:
partitionId = oocEngine.getMetaPartitionManager()
.getOffloadPartitionBufferId(threadId);
if (partitionId != null) {
command = new StoreDataBufferIOCommand(oocEngine, partitionId,
StoreDataBufferIOCommand.DataBufferType.PARTITION);
} else {
partitionId = oocEngine.getMetaPartitionManager()
.getOffloadMessageBufferId(threadId);
if (partitionId != null) {
command = new StoreDataBufferIOCommand(oocEngine, partitionId,
StoreDataBufferIOCommand.DataBufferType.MESSAGE);
} else {
partitionId = oocEngine.getMetaPartitionManager()
.getOffloadMessageId(threadId);
if (partitionId != null) {
command = new StoreIncomingMessageIOCommand(oocEngine,
partitionId);
}
}
}
break;
case STORE_PROCESSED_PARTITION:
partitionId = oocEngine.getMetaPartitionManager()
.getOffloadPartitionId(threadId);
if (partitionId != null &&
oocEngine.getMetaPartitionManager()
.isPartitionProcessed(partitionId)) {
command = new StorePartitionIOCommand(oocEngine, partitionId);
}
break;
case STORE_PARTITION:
partitionId = oocEngine.getMetaPartitionManager()
.getOffloadPartitionId(threadId);
if (partitionId != null) {
command = new StorePartitionIOCommand(oocEngine, partitionId);
}
break;
case LOAD_UNPROCESSED_PARTITION:
partitionId = oocEngine.getMetaPartitionManager()
.getLoadPartitionId(threadId);
if (partitionId != null &&
!oocEngine.getMetaPartitionManager()
.isPartitionProcessed(partitionId)) {
command = new LoadPartitionIOCommand(oocEngine, partitionId,
oocEngine.getSuperstep());
}
break;
case LOAD_TO_SWAP_PARTITION:
partitionId = oocEngine.getMetaPartitionManager()
.getLoadPartitionId(threadId);
if (partitionId != null &&
!oocEngine.getMetaPartitionManager()
.isPartitionProcessed(partitionId) &&
oocEngine.getMetaPartitionManager().hasProcessedOnMemory()) {
command = new LoadPartitionIOCommand(oocEngine, partitionId,
oocEngine.getSuperstep());
}
break;
case LOAD_PARTITION:
partitionId = oocEngine.getMetaPartitionManager()
.getLoadPartitionId(threadId);
if (partitionId != null) {
if (oocEngine.getMetaPartitionManager()
.isPartitionProcessed(partitionId)) {
command = new LoadPartitionIOCommand(oocEngine, partitionId,
oocEngine.getSuperstep() + 1);
} else {
command = new LoadPartitionIOCommand(oocEngine, partitionId,
oocEngine.getSuperstep());
}
}
break;
case URGENT_LOAD_PARTITION:
// Do nothing
break;
default:
throw new IllegalStateException("getNextIOCommand: the IO action " +
"is not defined!");
}
if (command != null) {
break;
}
}
if (command == null) {
command = new WaitIOCommand(oocEngine, waitInterval);
}
} while (!oocEngine.getOracle().approve(command));
return command;
}
/**
* Notify IO scheduler that the IO command is completed
*
* @param command completed command
*/
public void ioCommandCompleted(IOCommand command) {
oocEngine.ioCommandCompleted(command);
}
/**
* Add an IO command to the scheduling queue of the IO scheduler
*
* @param ioCommand IO command to add to the scheduler
*/
public void addIOCommand(IOCommand ioCommand) {
if (ioCommand instanceof LoadPartitionIOCommand) {
int ownerThread = oocEngine.getMetaPartitionManager()
.getOwnerThreadId(ioCommand.getPartitionId());
threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
} else {
throw new IllegalStateException("addIOCommand: IO command type is not " +
"supported for addition");
}
}
/**
* Shutdown/Terminate the IO scheduler, and notify all IO threads to halt
*/
public void shutdown() {
shouldTerminate = true;
if (LOG.isInfoEnabled()) {
LOG.info("shutdown: OutOfCoreIOScheduler shutting down!");
}
}
}