blob: 31874685aef9a971adbb83d7bce995447b83d7d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.ooc;
import com.sun.management.GarbageCollectionNotificationInfo;
import com.yammer.metrics.core.Gauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.ooc.data.MetaPartitionManager;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
import org.apache.giraph.ooc.policy.OutOfCoreOracle;
import org.apache.giraph.utils.AdjustableSemaphore;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.log4j.Logger;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.google.common.base.Preconditions.checkState;
/**
* Class to represent an out-of-core engine.
*/
public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
/**
* Number of 'units of processing' after which an active thread should
* check in with the out-of-core engine in order to re-claim its permission to
* stay active. For a compute thread, the 'unit of processing' is processing
* of one vertex, and for an input thread, the 'unit of processing' is reading
* a row of input data. The value evaluates to 1023.
* NOTE(review): the (2^k - 1) form suggests callers use it as a bit mask --
* confirm against the call sites.
*/
public static final int CHECK_IN_INTERVAL = (1 << 10) - 1;
/** Name of the metric reporting the percentage of the graph kept in memory */
public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
/** Class logger. */
private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
/**
* When getting partitions, how many milliseconds to wait before re-checking
* if no partition was available in memory
*/
private static final long MSEC_TO_WAIT = 10000;
/** Service worker */
private final CentralizedServiceWorker<?, ?, ?> service;
/**
* Flow control used in sending requests. Not set by the constructor; it is
* only assigned through {@link #setFlowControl(FlowControl)}.
*/
private FlowControl flowControl;
/** Scheduler for IO threads */
private final OutOfCoreIOScheduler ioScheduler;
/** Data structure to keep meta partition information */
private final MetaPartitionManager metaPartitionManager;
/** Out-of-core oracle (brain of out-of-core mechanism) */
private final OutOfCoreOracle oracle;
/**
* Whether the job should fail due to IO threads terminating because of
* exceptions. Set by {@link #failTheJob()} and polled by
* {@link #getNextPartition()} each time its wait returns.
*/
private volatile boolean jobFailed = false;
/** IO statistics collector */
private final OutOfCoreIOStatistics statistics;
/**
* Global lock for entire superstep. This lock helps to avoid overlapping of
* out-of-core decisions (what to do next to help the out-of-core mechanism)
* with out-of-core operations (actual IO operations).
*/
private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
/** Data accessor object (DAO) used as persistence layer in out-of-core */
private final OutOfCoreDataAccessor dataAccessor;
/** Callable factory for IO threads */
private final OutOfCoreIOCallableFactory oocIOCallableFactory;
/**
* Dummy object to wait on until a partition becomes available in memory
* for processing. Waiters are notified when a load-partition IO command
* completes.
*/
private final Object partitionAvailable = new Object();
/** How many compute threads do we have? */
private int numComputeThreads;
/**
* How many threads (input/compute) are processing data? Starts as the number
* of input-split threads and is switched to the number of compute threads in
* {@link #startIteration()}.
*/
private volatile int numProcessingThreads;
/** Semaphore used for controlling number of active threads at each moment */
private final AdjustableSemaphore activeThreadsPermit;
/**
* Generally, the logic in Giraph for change of the superstep happens in the
* following order:
* (1) Compute threads are done processing all partitions
* (2) Superstep number increases
* (3) New message store is created and message stores are prepared
* (4) Iteration over partitions starts
* Note that there are other operations happening at the same time as well as
* the above operations, but the above operations are the ones which may
* interfere with out-of-core operations. The goal of `superstepLock` is to
* isolate operations 2, 3, and 4 from the rest of computations and IO
* operations. Specifically, increasing the superstep counter (operation 2)
* should be exclusive and no IO operation should happen at the same time.
* This is due to the fact that the prefetching mechanism uses the superstep
* counter as a means to identify which data should be read. That being said,
* the superstep counter should be cached in the out-of-core engine, and all
* IO operations and out-of-core logic should access the superstep counter
* through this cached value. The cached value is refreshed from the service
* worker in {@link #reset()}.
*/
private long superstep;
/**
* Generally, the logic of a graph computation happens in the following order
* with respect to the `startIteration` and `reset` methods:
* ...
* startIteration (for moving edges)
* ...
* reset (to prepare messages/partitions for superstep 0)
* ...
* startIteration (superstep 0)
* ...
* reset (to prepare messages/partitions for superstep 1)
* ...
*
* However, in the unit tests, we usually consider only one superstep (usually
* INPUT_SUPERSTEP), and we move through partitions multiple times. The
* out-of-core mechanism works only if partitions are reset in a proper way.
* So, we keep the following flag to reset partitions if necessary. It is set
* by `reset` and cleared at the end of `startIteration`.
*/
private boolean resetDone;
/**
 * Builds the out-of-core engine together with its collaborators. The data
 * accessor and the oracle are instantiated reflectively from the classes
 * named in the configuration; the IO callable factory, IO scheduler, meta
 * partition manager and IO statistics are sized by the number of accessor
 * threads reported by the data accessor. The engine starts in the input
 * superstep with one permit per input-split thread, and registers itself
 * as a superstep-reset observer with the metrics system.
 *
 * @param conf Configuration
 * @param service Service worker
 * @throws IllegalStateException if the data accessor or the oracle cannot
 *         be instantiated reflectively
 */
public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
    CentralizedServiceWorker<?, ?, ?> service) {
  this.service = service;

  // Persistence layer: needs a constructor taking the configuration.
  Class<? extends OutOfCoreDataAccessor> accessorType =
      GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
  try {
    Constructor<? extends OutOfCoreDataAccessor> accessorCtor =
        accessorType.getConstructor(
            ImmutableClassesGiraphConfiguration.class);
    this.dataAccessor = accessorCtor.newInstance(conf);
  } catch (NoSuchMethodException | InstantiationException |
      InvocationTargetException | IllegalAccessException e) {
    throw new IllegalStateException("OutOfCoreEngine: caught exception " +
        "while creating the data accessor instance!", e);
  }

  // IO-side collaborators. They receive `this`, so the creation order is
  // kept exactly as is.
  int ioThreadCount = dataAccessor.getNumAccessorThreads();
  this.oocIOCallableFactory =
      new OutOfCoreIOCallableFactory(this, ioThreadCount);
  this.ioScheduler = new OutOfCoreIOScheduler(conf, this, ioThreadCount);
  this.metaPartitionManager = new MetaPartitionManager(ioThreadCount, this);
  this.statistics = new OutOfCoreIOStatistics(conf, ioThreadCount);

  // Oracle: a non-zero partition limit forces the fixed-partitions policy.
  int partitionLimit = GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
  Class<? extends OutOfCoreOracle> oracleType =
      GiraphConstants.OUT_OF_CORE_ORACLE.get(conf);
  boolean fixedPolicyRequested = partitionLimit != 0;
  if (fixedPolicyRequested && oracleType != FixedPartitionsOracle.class) {
    LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " +
        "but the out-of-core oracle used is not tailored for fixed " +
        "out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
    oracleType = FixedPartitionsOracle.class;
  }
  try {
    Constructor<? extends OutOfCoreOracle> oracleCtor =
        oracleType.getConstructor(
            ImmutableClassesGiraphConfiguration.class,
            OutOfCoreEngine.class);
    this.oracle = oracleCtor.newInstance(conf, this);
  } catch (NoSuchMethodException | IllegalAccessException |
      InstantiationException | InvocationTargetException e) {
    throw new IllegalStateException("OutOfCoreEngine: caught exception " +
        "while creating the oracle!", e);
  }

  this.numComputeThreads = conf.getNumComputeThreads();
  // Only input threads process data at the start of the execution, so they
  // define the initial number of permits.
  this.numProcessingThreads = conf.getNumInputSplitsThreads();
  this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
  this.superstep = BspService.INPUT_SUPERSTEP;
  this.resetDone = false;
  GiraphMetrics.get().addSuperstepResetObserver(this);
}
/**
* Initialize/Start the out-of-core engine: first initializes the data
* accessor, then asks the IO callable factory to create its callable.
*/
public void initialize() {
// The persistence layer is initialized before the IO callable is created.
// NOTE(review): presumably because IO threads use the accessor as soon as
// they start -- confirm before reordering.
dataAccessor.initialize();
oocIOCallableFactory.createCallable();
}
/**
* Shutdown/Stop the out-of-core engine. Components are shut down in this
* order: IO scheduler, IO callable factory, data accessor.
*/
public void shutdown() {
if (LOG.isInfoEnabled()) {
LOG.info("shutdown: out-of-core engine shutting down, signalling IO " +
"threads to shutdown");
}
// NOTE(review): the order looks deliberate (stop scheduling, then stop the
// IO threads, then close the persistence layer) -- confirm before
// reordering.
ioScheduler.shutdown();
oocIOCallableFactory.shutdown();
dataAccessor.shutdown();
}
/**
 * Exposes the server data of the service worker this engine is attached
 * to. The call is delegated to the service worker on every invocation.
 *
 * @return ServerData held by the service worker
 */
public ServerData getServerData() {
  return service.getServerData();
}
/**
 * Exposes the service worker that was handed to the constructor. The
 * return type is intentionally left raw so existing callers keep compiling.
 *
 * @return CentralizedServiceWorker this engine belongs to
 */
public CentralizedServiceWorker getServiceWorker() {
  return service;
}
/**
 * Exposes the scheduler that hands IO commands to the IO threads.
 *
 * @return OutOfCoreIOScheduler created by this engine
 */
public OutOfCoreIOScheduler getIOScheduler() {
  return ioScheduler;
}
/**
 * Exposes the bookkeeping structure that tracks meta information of
 * partitions.
 *
 * @return MetaPartitionManager created by this engine
 */
public MetaPartitionManager getMetaPartitionManager() {
  return metaPartitionManager;
}
/**
 * Exposes the global superstep lock, so that out-of-core decisions and
 * actual IO operations can be kept from overlapping.
 *
 * @return read/write lock used for global superstep lock
 */
public ReadWriteLock getSuperstepLock() {
  return superstepLock;
}
/**
 * Exposes the collector of IO statistics.
 *
 * @return IO statistics collector created by this engine
 */
public OutOfCoreIOStatistics getIOStatistics() {
  return statistics;
}
/**
 * Exposes the oracle that drives the out-of-core decisions.
 *
 * @return out-of-core oracle instantiated by the constructor
 */
public OutOfCoreOracle getOracle() {
  return oracle;
}
/**
 * Get the id of the next partition to process in the current iteration over
 * all the partitions. While no partition is available, the calling thread
 * waits on the partition-available monitor, waking up at least every
 * {@code MSEC_TO_WAIT} milliseconds to re-check. If all partitions are
 * already processed, this method returns null.
 *
 * @return id of a partition to process. 'null' if all partitions are
 *         processed in current iteration over partitions.
 * @throws IllegalStateException if the calling thread is interrupted while
 *         waiting; the interrupt status is restored before throwing
 * @throws RuntimeException if the job has been flagged as failed by an
 *         out-of-core IO thread
 */
public Integer getNextPartition() {
  Integer partitionId;
  synchronized (partitionAvailable) {
    while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
      try {
        if (LOG.isInfoEnabled()) {
          LOG.info("getNextPartition: waiting until a partition becomes " +
              "available!");
        }
        // Timed wait, so that a job failure is noticed even if nobody
        // notifies this monitor.
        partitionAvailable.wait(MSEC_TO_WAIT);
      } catch (InterruptedException e) {
        // Restore the interrupt status for code further up the stack and
        // keep the original exception as the cause.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("getNextPartition: caught " +
            "InterruptedException while waiting to retrieve a partition to " +
            "process", e);
      }
      if (jobFailed) {
        throw new RuntimeException("Job Failed due to a failure in an " +
            "out-of-core IO thread!");
      }
    }
    // NOTE(review): this comparison relies on NO_PARTITION_TO_PROCESS being
    // a primitive so that partitionId is unboxed -- confirm.
    if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
      // Wake up the other waiters so they can re-check for themselves.
      partitionAvailable.notifyAll();
      partitionId = null;
    }
  }
  return partitionId;
}
/**
 * Tells the out-of-core engine that a partition has been fully processed.
 * The partition is marked as processed in the meta partition manager and
 * the event is logged at info level.
 *
 * @param partitionId id of the partition whose processing is done
 */
public void doneProcessingPartition(int partitionId) {
  metaPartitionManager.setPartitionIsProcessed(partitionId);
  if (!LOG.isInfoEnabled()) {
    return;
  }
  LOG.info("doneProcessingPartition: processing partition " + partitionId +
      " is done!");
}
/**
 * Notify out-of-core engine that iteration cycle over all partitions is about
 * to begin. If {@link #reset()} has not been called since the previous
 * iteration, partitions are reset here under the superstep write lock. When
 * leaving the input superstep, the number of processing threads is switched
 * from the number of input threads to the number of compute threads and the
 * permits of the active-thread semaphore are rescaled accordingly.
 */
public void startIteration() {
  if (!resetDone) {
    superstepLock.writeLock().lock();
    try {
      metaPartitionManager.resetPartitions();
    } finally {
      // Release the lock even if resetting throws, so that IO threads are
      // never left blocked on a lock that nobody will release.
      superstepLock.writeLock().unlock();
    }
  }
  if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
      numProcessingThreads != numComputeThreads) {
    // This method is only executed by the main thread, and at this point
    // no other input/compute thread is alive. So, all the permits in
    // `activeThreadsPermit` are available. However, now that we are changing
    // the maximum number of active threads, we need to adjust the number
    // of available permits on `activeThreadsPermit`.
    activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() *
        numComputeThreads / numProcessingThreads);
    numProcessingThreads = numComputeThreads;
  }
  if (LOG.isInfoEnabled()) {
    LOG.info("startIteration: with " +
        metaPartitionManager.getNumInMemoryPartitions() +
        " partitions in memory and " +
        activeThreadsPermit.availablePermits() + " active threads");
  }
  // The next iteration has to reset partitions itself unless reset() is
  // called in between.
  resetDone = false;
}
/**
 * Retrieve a particular partition. After this method is complete the
 * requested partition should be in memory. If the partition is on disk, a
 * load command is handed to the IO scheduler and the calling thread waits
 * until the partition is no longer reported as being on disk, waking up at
 * least every {@code MSEC_TO_WAIT} milliseconds to re-check.
 *
 * @param partitionId id of the partition to retrieve
 * @throws IllegalStateException if the calling thread is interrupted while
 *         waiting; the interrupt status is restored before throwing
 * @throws RuntimeException if the job has been flagged as failed by an
 *         out-of-core IO thread while waiting
 */
public void retrievePartition(int partitionId) {
  if (!metaPartitionManager.isPartitionOnDisk(partitionId)) {
    return;
  }
  ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
      superstep));
  synchronized (partitionAvailable) {
    while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
      try {
        if (LOG.isInfoEnabled()) {
          LOG.info("retrievePartition: waiting until partition " +
              partitionId + " becomes available");
        }
        // Timed wait: failTheJob() does not notify this monitor, so an
        // untimed wait would block forever once the IO thread that should
        // load the partition has died.
        partitionAvailable.wait(MSEC_TO_WAIT);
      } catch (InterruptedException e) {
        // Restore the interrupt status for code further up the stack and
        // keep the original exception as the cause.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("retrievePartition: caught " +
            "InterruptedException while waiting to retrieve partition " +
            partitionId, e);
      }
      // Same failure check as in getNextPartition().
      if (jobFailed) {
        throw new RuntimeException("Job Failed due to a failure in an " +
            "out-of-core IO thread!");
      }
    }
  }
}
/**
 * Called by an IO thread once it has finished executing an IO command. The
 * oracle is always informed; for load-partition commands, every thread
 * waiting for a partition is woken up as well.
 *
 * @param command the IO command that is completed
 */
public void ioCommandCompleted(IOCommand command) {
  oracle.commandCompleted(command);
  if (!(command instanceof LoadPartitionIOCommand)) {
    return;
  }
  // A partition may have just arrived in memory: wake up the compute
  // threads that are waiting for one to process.
  synchronized (partitionAvailable) {
    partitionAvailable.notifyAll();
  }
}
/**
 * Marks the job as failed. The flag is only raised here; threads waiting
 * for a partition see it the next time they re-check it.
 */
public void failTheJob() {
  jobFailed = true;
}
/**
 * Changes how many processing threads are allowed to be active at the same
 * time. The new limit is the given fraction of the current number of
 * processing threads, truncated to an integer. Keeping this limit up to
 * date is the responsibility of the out-of-core oracle.
 *
 * @param fraction the fraction of processing threads to remain active. This
 *                 number is in range [0, 1]
 * @throws IllegalStateException if the fraction is outside [0, 1]
 */
public void updateActiveThreadsFraction(double fraction) {
  checkState(0 <= fraction && fraction <= 1);
  final int permits = (int) (fraction * numProcessingThreads);
  if (LOG.isInfoEnabled()) {
    LOG.info("updateActiveThreadsFraction: updating the number of active " +
        "threads to " + permits);
  }
  activeThreadsPermit.setMaxPermits(permits);
}
/**
 * A processing thread would check in with out-of-core engine every once in a
 * while to make sure that it can still remain active. The thread gives its
 * permit back and immediately tries to acquire one again. It is the
 * responsibility of the out-of-core oracle to update the number of active
 * threads in a way that the computation never fails, and yet achieve the
 * optimal performance it can achieve.
 *
 * @throws IllegalStateException if the calling thread is interrupted while
 *         re-acquiring its permit; the interrupt status is restored before
 *         throwing
 */
public void activeThreadCheckIn() {
  activeThreadsPermit.release();
  try {
    activeThreadsPermit.acquire();
  } catch (InterruptedException e) {
    LOG.error("activeThreadCheckIn: exception while acquiring a permit to " +
        "remain an active thread");
    // Restore the interrupt status so the interruption stays visible to
    // code further up the stack.
    Thread.currentThread().interrupt();
    throw new IllegalStateException(e);
  }
}
/**
 * Notify the out-of-core engine that a processing (input/compute) thread has
 * started. The calling thread acquires a permit from the active-thread
 * semaphore before it returns.
 *
 * @throws IllegalStateException if the calling thread is interrupted while
 *         acquiring its permit; the interrupt status is restored before
 *         throwing
 */
public void processingThreadStart() {
  try {
    activeThreadsPermit.acquire();
  } catch (InterruptedException e) {
    LOG.error("processingThreadStart: exception while acquiring a permit to" +
        " start the processing thread!");
    // Restore the interrupt status so the interruption stays visible to
    // code further up the stack.
    Thread.currentThread().interrupt();
    throw new IllegalStateException(e);
  }
}
/**
 * Tells the out-of-core engine that a processing (input/compute) thread is
 * done. The thread's permit is returned to the active-thread semaphore.
 */
public void processingThreadFinish() {
  activeThreadsPermit.release();
}
/**
* Reset partitions and messages meta data. Also, refresh the cached value of
* the superstep counter from the service worker, and record that the reset
* has been done so that the next call to `startIteration` does not reset
* partitions again.
*/
public void reset() {
metaPartitionManager.resetPartitions();
metaPartitionManager.resetMessages();
// NOTE(review): the cached superstep is refreshed only after the meta data
// is reset -- confirm whether the reset calls depend on the old value
// before reordering.
superstep = service.getSuperstep();
resetDone = true;
}
/**
 * Returns the superstep counter as cached by this engine. The cached value
 * is refreshed from the service worker in {@link #reset()}.
 *
 * @return cached value of the superstep counter
 */
public long getSuperstep() {
  return superstep;
}
/**
 * Tells the out-of-core engine that a garbage collection has just finished.
 * The notification is passed on to the out-of-core oracle unchanged.
 *
 * @param info GC information
 */
public void gcCompleted(GarbageCollectionNotificationInfo info) {
  oracle.gcCompleted(info);
}
/**
 * Registers, on the given superstep metrics registry, a gauge that reports
 * the lowest fraction of the graph kept in memory as a percentage.
 *
 * @param superstepMetrics metrics registry of the new superstep
 */
@Override
public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
  Gauge<Double> inMemoryPercentage = new Gauge<Double>() {
    @Override
    public Double value() {
      return metaPartitionManager.getLowestGraphFractionInMemory() * 100;
    }
  };
  superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, inMemoryPercentage);
}
/**
 * Get the flow control used in sending requests.
 *
 * @return flow control last passed to
 *         {@link #setFlowControl(FlowControl)}, or null if none has been
 *         set yet
 */
public FlowControl getFlowControl() {
  return flowControl;
}
/**
 * Set the flow control used in sending requests.
 *
 * @param flowControl flow control to be returned by
 *                    {@link #getFlowControl()} from now on
 */
public void setFlowControl(FlowControl flowControl) {
  this.flowControl = flowControl;
}
/**
 * Get the data accessor object (DAO) used as persistence layer in
 * out-of-core.
 *
 * @return data accessor instantiated by the constructor
 */
public OutOfCoreDataAccessor getDataAccessor() {
  return dataAccessor;
}
}