| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.giraph.ooc.data; |
| |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.lang3.tuple.MutablePair; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
| import org.apache.giraph.conf.IntConfOption; |
| import org.apache.giraph.ooc.OutOfCoreEngine; |
| import org.apache.giraph.ooc.persistence.DataIndex; |
| import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry; |
| import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; |
| import org.apache.log4j.Logger; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static org.apache.giraph.conf.GiraphConstants.ONE_MB; |
| |
| /** |
| * This class provides basic operations for data structures that have to |
| * participate in out-of-core mechanism. Essential subclasses of this class are: |
| * - DiskBackedPartitionStore (for partition data) |
| * - DiskBackedMessageStore (for messages) |
| * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP) |
| * Basically, any data structure that may cause OOM to happen can be implemented |
| * as a subclass of this class. |
| * |
| * There are two different terms used in the rest of this class: |
| * - "data store" refers to in-memory representation of data. Usually this is |
| * stored per-partition in in-memory implementations of data structures. For |
 * instance, "data store" of a DiskBackedPartitionStore would be the
 * collection of
| * all partitions kept in the in-memory partition store within the |
| * DiskBackedPartitionStore. |
| * - "raw data buffer" refers to raw data which were supposed to be |
| * de-serialized and added to the data store, but they remain 'as is' in the |
| * memory because their corresponding partition is offloaded to disk and is |
| * not available in the data store. |
| * |
| * @param <T> raw data format of the data store subclassing this class |
| */ |
| public abstract class DiskBackedDataStore<T> { |
| /** |
| * Minimum size of a buffer (in bytes) to flush to disk. This is used to |
| * decide whether vertex/edge buffers are large enough to flush to disk. |
| */ |
| public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH = |
| new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB, |
| "Minimum size of a buffer (in bytes) to flush to disk."); |
| |
| /** Class logger. */ |
| private static final Logger LOG = Logger.getLogger( |
| DiskBackedDataStore.class); |
| /** Out-of-core engine */ |
| protected final OutOfCoreEngine oocEngine; |
| /** |
| * Set containing ids of all partitions where the partition data is in some |
| * file on disk. |
| * Note that the out-of-core mechanism may decide to put the data for a |
| * partition on disk, while the partition data is empty. For instance, at the |
| * beginning of a superstep, out-of-core mechanism may decide to put incoming |
| * messages of a partition on disk, while the partition has not received any |
| * messages. In such scenarios, the "out-of-core mechanism" thinks that the |
| * partition data is on disk, while disk-backed data stores may want to |
| * optimize for IO/metadata accesses and decide not to create/write anything |
| * on files on disk. |
| * In summary, there is a subtle difference between this field and |
| * `hasPartitionOnDisk` field. Basically, this field is used for optimizing |
| * IO (mainly metadata) accesses by disk-backed stores, while |
| * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has |
| * regarding partition storage statuses. Since out-of-core mechanism does not |
| * know about the actual data for a partition, these two fields have to be |
| * separate. |
| */ |
| protected final Set<Integer> hasPartitionDataOnFile = |
| Sets.newConcurrentHashSet(); |
| /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */ |
| private final int minBufferSizeToOffload; |
| /** Set containing ids of all out-of-core partitions */ |
| private final Set<Integer> hasPartitionDataOnDisk = |
| Sets.newConcurrentHashSet(); |
| /** |
| * Map of partition ids to list of raw data buffers. The map will have entries |
| * only for partitions that their in-memory data structures are currently |
| * offloaded to disk. We keep the aggregate size of buffers for each partition |
| * as part of the values in the map to estimate how much memory we can free up |
| * if we offload data buffers of a particular partition to disk. |
| */ |
| private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers = |
| Maps.newConcurrentMap(); |
| /** |
| * Map of partition ids to number of raw data buffers offloaded to disk for |
| * each partition. The map will have entries only for partitions that their |
| * in-memory data structures are currently out of core. It is necessary to |
| * know the number of data buffers on disk for a particular partition when we |
| * are loading all these buffers back in memory. |
| */ |
| private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk = |
| Maps.newConcurrentMap(); |
| /** |
| * Lock to avoid overlapping of read and write on data associated with each |
| * partition. |
| * */ |
| private final ConcurrentMap<Integer, ReadWriteLock> locks = |
| Maps.newConcurrentMap(); |
| |
| /** |
| * Constructor. |
| * |
| * @param conf Configuration |
| * @param oocEngine Out-of-core engine |
| */ |
| DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf, |
| OutOfCoreEngine oocEngine) { |
| this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf); |
| this.oocEngine = oocEngine; |
| } |
| |
| /** |
| * Retrieves a lock for a given partition. If the lock for the given partition |
| * does not exist, creates a new lock. |
| * |
| * @param partitionId id of the partition the lock is needed for |
| * @return lock for a given partition |
| */ |
| private ReadWriteLock getPartitionLock(int partitionId) { |
| ReadWriteLock readWriteLock = locks.get(partitionId); |
| if (readWriteLock == null) { |
| readWriteLock = new ReentrantReadWriteLock(); |
| ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock); |
| if (temp != null) { |
| readWriteLock = temp; |
| } |
| } |
| return readWriteLock; |
| } |
| |
| /** |
| * Adds a data entry for a given partition to the current data store. If data |
| * of a given partition in data store is already offloaded to disk, adds the |
| * data entry to appropriate raw data buffer list. |
| * |
| * @param partitionId id of the partition to add the data entry to |
| * @param entry data entry to add |
| */ |
| protected void addEntry(int partitionId, T entry) { |
| // Addition of data entries to a data store is much more common than |
| // out-of-core operations. Besides, in-memory data store implementations |
| // existing in the code base already account for parallel addition to data |
| // stores. Therefore, using read lock would optimize for parallel addition |
| // to data stores, specially for cases where the addition should happen for |
| // partitions that are entirely in memory. |
| ReadWriteLock rwLock = getPartitionLock(partitionId); |
| rwLock.readLock().lock(); |
| if (hasPartitionDataOnDisk.contains(partitionId)) { |
| List<T> entryList = new ArrayList<>(); |
| entryList.add(entry); |
| int entrySize = entrySerializedSize(entry); |
| MutablePair<Integer, List<T>> newPair = |
| new MutablePair<>(entrySize, entryList); |
| Pair<Integer, List<T>> oldPair = |
| dataBuffers.putIfAbsent(partitionId, newPair); |
| if (oldPair != null) { |
| synchronized (oldPair) { |
| newPair = (MutablePair<Integer, List<T>>) oldPair; |
| newPair.setLeft(oldPair.getLeft() + entrySize); |
| newPair.getRight().add(entry); |
| } |
| } |
| } else { |
| addEntryToInMemoryPartitionData(partitionId, entry); |
| } |
| rwLock.readLock().unlock(); |
| } |
| |
| /** |
| * Loads and assembles all data for a given partition, and put it into the |
| * data store. Returns the number of bytes transferred from disk to memory in |
| * the loading process. |
| * |
| * @param partitionId id of the partition to load and assemble all data for |
| * @return number of bytes loaded from disk to memory |
| * @throws IOException |
| */ |
| public abstract long loadPartitionData(int partitionId) throws IOException; |
| |
| /** |
| * The proxy method that does the actual operation for `loadPartitionData`, |
| * but uses the data index given by the caller. |
| * |
| * @param partitionId id of the partition to load and assemble all data for |
| * @param index data index chain for the data to load |
| * @return number of bytes loaded from disk to memory |
| * @throws IOException |
| */ |
| protected long loadPartitionDataProxy(int partitionId, DataIndex index) |
| throws IOException { |
| long numBytes = 0; |
| ReadWriteLock rwLock = getPartitionLock(partitionId); |
| rwLock.writeLock().lock(); |
| if (hasPartitionDataOnDisk.contains(partitionId)) { |
| int ioThreadId = |
| oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); |
| numBytes += loadInMemoryPartitionData(partitionId, ioThreadId, |
| index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))); |
| hasPartitionDataOnDisk.remove(partitionId); |
| // Loading raw data buffers from disk if there is any and applying those |
| // to already loaded in-memory data. |
| Integer numBuffers = numDataBuffersOnDisk.remove(partitionId); |
| if (numBuffers != null) { |
| checkState(numBuffers > 0); |
| index.addIndex(DataIndex.TypeIndexEntry.BUFFER); |
| OutOfCoreDataAccessor.DataInputWrapper inputWrapper = |
| oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy()); |
| for (int i = 0; i < numBuffers; ++i) { |
| T entry = readNextEntry(inputWrapper.getDataInput()); |
| addEntryToInMemoryPartitionData(partitionId, entry); |
| } |
| numBytes += inputWrapper.finalizeInput(true); |
| index.removeLastIndex(); |
| } |
| index.removeLastIndex(); |
| // Applying in-memory raw data buffers to in-memory partition data. |
| Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId); |
| if (pair != null) { |
| for (T entry : pair.getValue()) { |
| addEntryToInMemoryPartitionData(partitionId, entry); |
| } |
| } |
| } |
| rwLock.writeLock().unlock(); |
| return numBytes; |
| } |
| |
| /** |
| * Offloads partition data of a given partition in the data store to disk, and |
| * returns the number of bytes offloaded from memory to disk. |
| * |
| * @param partitionId id of the partition to offload its data |
| * @return number of bytes offloaded from memory to disk |
| * @throws IOException |
| */ |
| public abstract long offloadPartitionData(int partitionId) throws IOException; |
| |
| /** |
| * The proxy method that does the actual operation for `offloadPartitionData`, |
| * but uses the data index given by the caller. |
| * |
| * @param partitionId id of the partition to offload its data |
| * @param index data index chain for the data to offload |
| * @return number of bytes offloaded from memory to disk |
| * @throws IOException |
| */ |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings( |
| "UL_UNRELEASED_LOCK_EXCEPTION_PATH") |
| protected long offloadPartitionDataProxy( |
| int partitionId, DataIndex index) throws IOException { |
| ReadWriteLock rwLock = getPartitionLock(partitionId); |
| rwLock.writeLock().lock(); |
| hasPartitionDataOnDisk.add(partitionId); |
| rwLock.writeLock().unlock(); |
| int ioThreadId = |
| oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); |
| long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId, |
| index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))); |
| index.removeLastIndex(); |
| return numBytes; |
| } |
| |
| /** |
| * Offloads raw data buffers of a given partition to disk, and returns the |
| * number of bytes offloaded from memory to disk. |
| * |
| * @param partitionId id of the partition to offload its raw data buffers |
| * @return number of bytes offloaded from memory to disk |
| * @throws IOException |
| */ |
| public abstract long offloadBuffers(int partitionId) throws IOException; |
| |
| /** |
| * The proxy method that does the actual operation for `offloadBuffers`, |
| * but uses the data index given by the caller. |
| * |
| * @param partitionId id of the partition to offload its raw data buffers |
| * @param index data index chain for the data to offload its buffers |
| * @return number of bytes offloaded from memory to disk |
| * @throws IOException |
| */ |
| protected long offloadBuffersProxy(int partitionId, DataIndex index) |
| throws IOException { |
| Pair<Integer, List<T>> pair = dataBuffers.get(partitionId); |
| if (pair == null || pair.getLeft() < minBufferSizeToOffload) { |
| return 0; |
| } |
| ReadWriteLock rwLock = getPartitionLock(partitionId); |
| rwLock.writeLock().lock(); |
| pair = dataBuffers.remove(partitionId); |
| rwLock.writeLock().unlock(); |
| checkNotNull(pair); |
| checkState(!pair.getRight().isEmpty()); |
| int ioThreadId = |
| oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); |
| index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)) |
| .addIndex(DataIndex.TypeIndexEntry.BUFFER); |
| OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = |
| oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(), |
| true); |
| for (T entry : pair.getRight()) { |
| writeEntry(entry, outputWrapper.getDataOutput()); |
| } |
| long numBytes = outputWrapper.finalizeOutput(); |
| index.removeLastIndex().removeLastIndex(); |
| int numBuffers = pair.getRight().size(); |
| Integer oldNumBuffersOnDisk = |
| numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers); |
| if (oldNumBuffersOnDisk != null) { |
| numDataBuffersOnDisk.replace(partitionId, |
| oldNumBuffersOnDisk + numBuffers); |
| } |
| return numBytes; |
| } |
| |
| /** |
| * Looks through all partitions that their data is not in the data store (is |
| * offloaded to disk), and sees if any of them has enough raw data buffer in |
| * memory. If so, puts that partition in a list to return. |
| * |
| * @return Set of partition ids of all partition raw buffers where the |
| * aggregate size of buffers are large enough and it is worth flushing |
| * those buffers to disk |
| */ |
| public Set<Integer> getCandidateBuffersToOffload() { |
| Set<Integer> result = new HashSet<>(); |
| for (Map.Entry<Integer, Pair<Integer, List<T>>> entry : |
| dataBuffers.entrySet()) { |
| if (entry.getValue().getLeft() > minBufferSizeToOffload) { |
| result.add(entry.getKey()); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Writes a single raw entry to a given output stream. |
| * |
| * @param entry entry to write to output |
| * @param out output stream to write the entry to |
| * @throws IOException |
| */ |
| protected abstract void writeEntry(T entry, DataOutput out) |
| throws IOException; |
| |
| /** |
| * Reads the next available raw entry from a given input stream. |
| * |
| * @param in input stream to read the entry from |
| * @return entry read from an input stream |
| * @throws IOException |
| */ |
| protected abstract T readNextEntry(DataInput in) throws IOException; |
| |
| /** |
| * Loads data of a partition into data store. Returns number of bytes loaded. |
| * |
| * @param partitionId id of the partition to load its data |
| * @param ioThreadId id of the IO thread performing the load |
| * @param index data index chain for the data to load |
| * @return number of bytes loaded from disk to memory |
| * @throws IOException |
| */ |
| protected abstract long loadInMemoryPartitionData( |
| int partitionId, int ioThreadId, DataIndex index) throws IOException; |
| |
| /** |
| * Offloads data of a partition in data store to disk. Returns the number of |
| * bytes offloaded to disk |
| * |
| * @param partitionId id of the partition to offload to disk |
| * @param ioThreadId id of the IO thread performing the offload |
| * @param index data index chain for the data to offload |
| * @return number of bytes offloaded from memory to disk |
| * @throws IOException |
| */ |
| protected abstract long offloadInMemoryPartitionData( |
| int partitionId, int ioThreadId, DataIndex index) throws IOException; |
| |
| /** |
| * Gets the size of a given entry in bytes. |
| * |
| * @param entry input entry to find its size |
| * @return size of given input entry in bytes |
| */ |
| protected abstract int entrySerializedSize(T entry); |
| |
| /** |
| * Adds a single entry for a given partition to the in-memory data store. |
| * |
| * @param partitionId id of the partition to add the data to |
| * @param entry input entry to add to the data store |
| */ |
| protected abstract void addEntryToInMemoryPartitionData(int partitionId, |
| T entry); |
| } |