GIRAPH-1086: Use pool of byte arrays with InMemoryDataAccessor
Summary: Keep a pool of reusable byte arrays in InMemoryDataAccessor, to save on byte array creation and initialization.
Test Plan: Ran a job using InMemoryDataAccessor and confirmed that its performance improved.
Differential Revision: https://reviews.facebook.net/D60621
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
index 4eca0f1..58fbcb6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
@@ -20,6 +20,8 @@
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.io.BigDataInput;
import org.apache.giraph.utils.io.BigDataOutput;
@@ -27,18 +29,20 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
/**
* Implementation of data accessor which keeps all the data serialized but in
* memory. Useful to keep the number of used objects under control.
- *
- * TODO currently doesn't reuse any of the byte arrays so could cause more GCs
*/
public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
/** Configuration */
private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+ /** Factory for data outputs */
+ private final PooledBigDataOutputFactory outputFactory;
/** DataInputOutput for each DataIndex used */
- private final ConcurrentHashMap<DataIndex, BigDataOutput> data;
+ private final ConcurrentHashMap<
+ DataIndex, PooledBigDataOutputFactory.Output> data;
/**
* Constructor
@@ -48,6 +52,7 @@
public InMemoryDataAccessor(
ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
this.conf = conf;
+ outputFactory = new PooledBigDataOutputFactory(conf);
data = new ConcurrentHashMap<>();
}
@@ -78,9 +83,9 @@
DataIndex index, boolean shouldAppend) throws IOException {
// Don't need to worry about synchronization here since only one thread
// can deal with one index
- BigDataOutput output = data.get(index);
+ PooledBigDataOutputFactory.Output output = data.get(index);
if (output == null || !shouldAppend) {
- output = new BigDataOutput(conf);
+ output = outputFactory.createOutput();
data.put(index, output);
}
return new InMemoryDataOutputWrapper(output);
@@ -94,7 +99,7 @@
/**
* {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
*/
- public class InMemoryDataOutputWrapper implements DataOutputWrapper {
+ public static class InMemoryDataOutputWrapper implements DataOutputWrapper {
/** Output to write data to */
private final BigDataOutput output;
/** Size of output at the moment it was created */
@@ -150,9 +155,90 @@
@Override
public long finalizeInput(boolean deleteOnClose) {
if (deleteOnClose) {
- data.remove(index);
+ data.remove(index).returnData();
}
return input.getPos();
}
}
+
+ /**
+ * Factory for big data outputs backed by a shared pool of byte arrays
+ */
+ private static class PooledBigDataOutputFactory {
+ /** Maximum number of byte arrays to keep in the pool */
+ public static final IntConfOption BYTE_ARRAY_POOL_SIZE =
+ new IntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024,
+ "How big pool of byte arrays to keep");
+ /** Size in bytes of each newly allocated byte array */
+ public static final IntConfOption BYTE_ARRAY_SIZE =
+ new IntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21,
+ "How big byte arrays to make");
+
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
+ /** Bounded pool of reusable byte[], shared by all created outputs */
+ private final LinkedBlockingDeque<byte[]> byteArrayPool;
+ /** Size of new byte arrays; also the max size of a single data output */
+ private final int byteArraySize;
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration to read pool size and byte array size from
+ */
+ public PooledBigDataOutputFactory(
+ ImmutableClassesGiraphConfiguration conf) {
+ this.conf = conf;
+ byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf));
+ byteArraySize = BYTE_ARRAY_SIZE.get(conf);
+ }
+
+ /**
+ * Create new output which takes its byte arrays from the pool
+ *
+ * @return Output to write to
+ */
+ public Output createOutput() {
+ return new Output(conf);
+ }
+
+ /**
+ * BigDataOutput which reuses byte arrays from the enclosing factory's pool
+ */
+ private class Output extends BigDataOutput {
+ /**
+ * Constructor
+ *
+ * @param conf Configuration
+ */
+ public Output(ImmutableClassesGiraphConfiguration conf) {
+ super(conf);
+ }
+
+ /**
+ * Offer all byte arrays of this output to the pool (dropped if it is full).
+ * Can't use the same instance after this call anymore.
+ */
+ protected void returnData() {
+ if (dataOutputs != null) {
+ for (ExtendedDataOutput dataOutput : dataOutputs) {
+ byteArrayPool.offer(dataOutput.getByteArray());
+ }
+ }
+ byteArrayPool.offer(currentDataOutput.getByteArray());
+ }
+
+ @Override
+ protected ExtendedDataOutput createOutput(int size) {
+ byte[] data = byteArrayPool.pollLast();
+ return conf.createExtendedDataOutput(
+ data == null ? new byte[byteArraySize] : data, 0);
+ }
+
+ @Override
+ protected int getMaxSize() {
+ return byteArraySize;
+ }
+ }
+ }
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
index 9e84ebc..094e4a5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
@@ -50,11 +50,11 @@
private static final int SIZE_DELTA = 100;
/** Data output which we are currently writing to */
- private ExtendedDataOutput currentDataOutput;
+ protected ExtendedDataOutput currentDataOutput;
/** List of filled outputs, will be null until we get a lot of data */
- private List<ExtendedDataOutput> dataOutputs;
+ protected List<ExtendedDataOutput> dataOutputs;
/** Configuration */
- private final ImmutableClassesGiraphConfiguration conf;
+ protected final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
@@ -75,7 +75,26 @@
ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
dataOutputs = null;
- currentDataOutput = conf.createExtendedDataOutput(initialSize);
+ currentDataOutput = createOutput(initialSize);
+ }
+
+ /**
+ * Get max size for single data output; subclasses may override it
+ *
+ * @return Max size for single data output
+ */
+ protected int getMaxSize() {
+ return MAX_SIZE;
+ }
+
+ /**
+ * Create a data output; used for the first and all following data outputs
+ *
+ * @param size Size of data output to create
+ * @return Created data output
+ */
+ protected ExtendedDataOutput createOutput(int size) {
+ return conf.createExtendedDataOutput(size);
}
/**
@@ -85,16 +104,25 @@
* @return DataOutput which data should be written to
*/
private ExtendedDataOutput getDataOutputToWriteTo() {
- if (currentDataOutput.getPos() + SIZE_DELTA < MAX_SIZE) {
- return currentDataOutput;
- } else {
+ return getDataOutputToWriteTo(SIZE_DELTA);
+ }
+
+ /**
+ * Get DataOutput to write to. If current position plus additionalSize
+ * reaches max size, current DataOutput is stored and a new one is created.
+ *
+ * @param additionalSize How many additional bytes we need space for
+ * @return DataOutput which data should be written to
+ */
+ private ExtendedDataOutput getDataOutputToWriteTo(int additionalSize) {
+ if (currentDataOutput.getPos() + additionalSize >= getMaxSize()) {
if (dataOutputs == null) {
- dataOutputs = new ArrayList<ExtendedDataOutput>(1);
+ dataOutputs = new ArrayList<>(1);
}
dataOutputs.add(currentDataOutput);
- currentDataOutput = conf.createExtendedDataOutput(MAX_SIZE);
- return currentDataOutput;
+ currentDataOutput = createOutput(getMaxSize());
}
+ return currentDataOutput;
}
/**
@@ -147,12 +175,12 @@
@Override
public void write(byte[] b) throws IOException {
- getDataOutputToWriteTo().write(b);
+ getDataOutputToWriteTo(b.length + SIZE_DELTA).write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
- getDataOutputToWriteTo().write(b, off, len);
+ getDataOutputToWriteTo(len + SIZE_DELTA).write(b, off, len);
}
@Override