blob: 002dc85e7def91fa4110346ff3a719bc8f19e22d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.ooc.policy;
import com.sun.management.GarbageCollectionNotificationInfo;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
import org.apache.giraph.ooc.command.StorePartitionIOCommand;
import org.apache.log4j.Logger;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkState;
/** Oracle for fixed out-of-core mechanism */
public class FixedPartitionsOracle implements OutOfCoreOracle {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(FixedPartitionsOracle.class);
/** Maximum number of partitions to be kept in memory */
private final int maxPartitionsInMemory;
/**
* Number of partitions to be added (loaded) or removed (stored) to/from
* memory. Each outstanding load partition counts +1 and each outstanding
* store partition counts -1 toward this counter.
*/
private final AtomicInteger deltaNumPartitionsInMemory =
new AtomicInteger(0);
/** Out-of-core engine */
private final OutOfCoreEngine oocEngine;
/**
* Constructor
*
* @param conf configuration
* @param oocEngine out-of-core engine
*/
public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
OutOfCoreEngine oocEngine) {
this.maxPartitionsInMemory =
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
this.oocEngine = oocEngine;
}
@Override
public IOAction[] getNextIOActions() {
int numPartitionsInMemory =
oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
int numPartialPartitionsInMemory =
oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
if (LOG.isDebugEnabled()) {
LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
" partitions entirely in memory and " + numPartialPartitionsInMemory +
" partitions partially in memory, " +
deltaNumPartitionsInMemory.get() + " to be loaded");
}
checkState(numPartitionsInMemory >= 0);
checkState(numPartialPartitionsInMemory >= 0);
int numPartitions =
numPartitionsInMemory + deltaNumPartitionsInMemory.get();
// Fixed out-of-core policy:
// - if the number of partitions in memory is less than the max number of
// partitions in memory, we should load a partition to memory. This
// basically means we are prefetching partition's data either for the
// current superstep, or for the next superstep.
// - if the number of partitions in memory is equal to the the max number
// of partitions in memory, we do a 'soft store', meaning, we store
// processed partition to disk only if there is an unprocessed partition
// on disk. This basically makes room for unprocessed partitions on disk
// to be prefetched.
// - if the number of partitions in memory is more than the max number of
// partitions in memory, we do a 'hard store', meaning we store a
// partition to disk, regardless of its processing state.
if (numPartitions < maxPartitionsInMemory) {
return new IOAction[]{
IOAction.LOAD_PARTITION,
IOAction.STORE_MESSAGES_AND_BUFFERS};
} else if (numPartitions > maxPartitionsInMemory) {
LOG.warn("getNextIOActions: number of partitions in memory passed the " +
"specified threshold!");
return new IOAction[]{
IOAction.STORE_PARTITION,
IOAction.STORE_MESSAGES_AND_BUFFERS};
} else {
return new IOAction[]{
IOAction.STORE_MESSAGES_AND_BUFFERS,
IOAction.LOAD_TO_SWAP_PARTITION};
}
}
@Override
public boolean approve(IOCommand command) {
int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
.getNumInMemoryPartitions();
// If loading a partition result in having more partition in memory, the
// command should be denied. Also, if number of partitions in memory is
// already less than the max number of partitions, any command for storing
// a partition should be denied.
if (command instanceof LoadPartitionIOCommand &&
numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
maxPartitionsInMemory) {
deltaNumPartitionsInMemory.getAndDecrement();
return false;
} else if (command instanceof StorePartitionIOCommand &&
numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
maxPartitionsInMemory) {
deltaNumPartitionsInMemory.getAndIncrement();
return false;
}
return true;
}
@Override
public void commandCompleted(IOCommand command) {
if (command instanceof LoadPartitionIOCommand) {
deltaNumPartitionsInMemory.getAndDecrement();
} else if (command instanceof StorePartitionIOCommand) {
deltaNumPartitionsInMemory.getAndIncrement();
}
}
@Override
public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
@Override
public void shutdown() { }
}