GIRAPH-1085: Add InMemoryDataAccessor
Summary: When we deal with graphs which have a lot of vertices but very little total data associated with them (values + edges), we start experiencing memory problems because too many objects are created, since every vertex has multiple objects associated with it. To solve this problem, we should have a serialized partition representation (the current ByteArrayPartition keeps a byte[] per vertex, not per partition). We can leverage the out-of-core infrastructure and just add a data accessor which is backed not by disk but by in-memory buffers.
Test Plan: Successfully ran a job which was failing without this.
Differential Revision: https://reviews.facebook.net/D60435
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
index e9ab167..c8da9a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
@@ -241,8 +241,9 @@
index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+ DataInput dataInput = inputWrapper.getDataInput();
for (int i = 0; i < numBuffers; ++i) {
- T entry = readNextEntry(inputWrapper.getDataInput());
+ T entry = readNextEntry(dataInput);
addEntryToInMemoryPartitionData(partitionId, entry);
}
numBytes += inputWrapper.finalizeInput(true);
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
new file mode 100644
index 0000000..4eca0f1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.io.BigDataInput;
+import org.apache.giraph.utils.io.BigDataOutput;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of data accessor which keeps all the data serialized but in
+ * memory. Useful to keep the number of used objects under control.
+ *
+ * TODO currently doesn't reuse any of the byte arrays so could cause more GCs
+ */
+public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+ /** DataInputOutput for each DataIndex used */
+ private final ConcurrentHashMap<DataIndex, BigDataOutput> data;
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration
+ */
+ public InMemoryDataAccessor(
+ ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
+ this.conf = conf;
+ data = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void initialize() {
+ // No-op
+ }
+
+ @Override
+ public void shutdown() {
+ // No-op
+ }
+
+ @Override
+ public int getNumAccessorThreads() {
+ return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
+ }
+
+ @Override
+ public DataInputWrapper prepareInput(int threadId,
+ DataIndex index) throws IOException {
+ return new InMemoryDataInputWrapper(
+ new BigDataInput(data.get(index)), index);
+ }
+
+ @Override
+ public DataOutputWrapper prepareOutput(int threadId,
+ DataIndex index, boolean shouldAppend) throws IOException {
+ // Don't need to worry about synchronization here since only one thread
+ // can deal with one index
+ BigDataOutput output = data.get(index);
+ if (output == null || !shouldAppend) {
+ output = new BigDataOutput(conf);
+ data.put(index, output);
+ }
+ return new InMemoryDataOutputWrapper(output);
+ }
+
+ @Override
+ public boolean dataExist(int threadId, DataIndex index) {
+ return data.containsKey(index);
+ }
+
+ /**
+ * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
+ */
+ public class InMemoryDataOutputWrapper implements DataOutputWrapper {
+ /** Output to write data to */
+ private final BigDataOutput output;
+ /** Size of output at the moment it was created */
+ private final long initialSize;
+
+ /**
+ * Constructor
+ *
+ * @param output Output to write data to
+ */
+ public InMemoryDataOutputWrapper(BigDataOutput output) {
+ this.output = output;
+ initialSize = output.getSize();
+ }
+
+ @Override
+ public DataOutput getDataOutput() {
+ return output;
+ }
+
+ @Override
+ public long finalizeOutput() {
+ return output.getSize() - initialSize;
+ }
+ }
+
+ /**
+ * {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor}
+ */
+ public class InMemoryDataInputWrapper implements DataInputWrapper {
+ /** Input to read data from */
+ private final BigDataInput input;
+ /** DataIndex which this wrapper belongs to */
+ private final DataIndex index;
+
+ /**
+ * Constructor
+ *
+ * @param input Input to read data from
+ * @param index DataIndex which this wrapper belongs to
+ */
+ public InMemoryDataInputWrapper(
+ BigDataInput input, DataIndex index) {
+ this.input = input;
+ this.index = index;
+ }
+
+ @Override
+ public DataInput getDataInput() {
+ return input;
+ }
+
+ @Override
+ public long finalizeInput(boolean deleteOnClose) {
+ if (deleteOnClose) {
+ data.remove(index);
+ }
+ return input.getPos();
+ }
+ }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
index d4ddc62..cecb0f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
@@ -82,7 +82,8 @@
/** Interface to wrap <code>DataInput</code> */
interface DataInputWrapper {
/**
- * @return the <code>DataInput</code>
+ * @return the <code>DataInput</code>; must return the same instance on
+ * every call (i.e. must not start reading from the beginning again)
*/
DataInput getDataInput();
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
index c0fff60..9e84ebc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
@@ -125,6 +125,21 @@
return conf;
}
+  /**
+   * Total bytes written: current output plus every output in dataOutputs
+   *
+   * @return Size in bytes
+   */
+  public long getSize() {
+    long totalBytes = 0;
+    if (dataOutputs != null) {
+      for (ExtendedDataOutput previous : dataOutputs) {
+        totalBytes += previous.getPos();
+      }
+    }
+    return totalBytes + currentDataOutput.getPos();
+  }
+
@Override
public void write(int b) throws IOException {
getDataOutputToWriteTo().write(b);