GIRAPH-1086: Use pool of byte arrays with InMemoryDataAccessor

Summary: Have a pool of byte arrays with InMemoryDataAccessor, to save on byte array creation and initialization.

Test Plan: Improved performance of a job using InMemoryDataAccessor

Differential Revision: https://reviews.facebook.net/D60621
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
index 4eca0f1..58fbcb6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java
@@ -20,6 +20,8 @@
 
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.io.BigDataInput;
 import org.apache.giraph.utils.io.BigDataOutput;
 
@@ -27,18 +29,20 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
 
 /**
  * Implementation of data accessor which keeps all the data serialized but in
  * memory. Useful to keep the number of used objects under control.
- *
- * TODO currently doesn't reuse any of the byte arrays so could cause more GCs
  */
 public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
+  /** Factory for data outputs */
+  private final PooledBigDataOutputFactory outputFactory;
   /** DataInputOutput for each DataIndex used */
-  private final ConcurrentHashMap<DataIndex, BigDataOutput> data;
+  private final ConcurrentHashMap<
+      DataIndex, PooledBigDataOutputFactory.Output> data;
 
   /**
    * Constructor
@@ -48,6 +52,7 @@
   public InMemoryDataAccessor(
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
     this.conf = conf;
+    outputFactory = new PooledBigDataOutputFactory(conf);
     data = new ConcurrentHashMap<>();
   }
 
@@ -78,9 +83,9 @@
       DataIndex index, boolean shouldAppend) throws IOException {
     // Don't need to worry about synchronization here since only one thread
     // can deal with one index
-    BigDataOutput output = data.get(index);
+    PooledBigDataOutputFactory.Output output = data.get(index);
     if (output == null || !shouldAppend) {
-      output = new BigDataOutput(conf);
+      output = outputFactory.createOutput();
       data.put(index, output);
     }
     return new InMemoryDataOutputWrapper(output);
@@ -94,7 +99,7 @@
   /**
    * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
    */
-  public class InMemoryDataOutputWrapper implements DataOutputWrapper {
+  public static class InMemoryDataOutputWrapper implements DataOutputWrapper {
     /** Output to write data to */
     private final BigDataOutput output;
     /** Size of output at the moment it was created */
@@ -150,9 +155,90 @@
     @Override
     public long finalizeInput(boolean deleteOnClose) {
       if (deleteOnClose) {
-        data.remove(index);
+        data.remove(index).returnData();
       }
       return input.getPos();
     }
   }
+
+  /**
+   * Factory for pooled big data outputs
+   */
+  private static class PooledBigDataOutputFactory {
+    /** How big pool of byte arrays to keep */
+    public static final IntConfOption BYTE_ARRAY_POOL_SIZE =
+        new IntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024,
+            "How big pool of byte arrays to keep");
+    /** How big byte arrays to make */
+    public static final IntConfOption BYTE_ARRAY_SIZE =
+        new IntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21,
+            "How big byte arrays to make");
+
+    /** Configuration */
+    private final ImmutableClassesGiraphConfiguration conf;
+    /** Pool of reusable byte[] */
+    private final LinkedBlockingDeque<byte[]> byteArrayPool;
+    /** How big byte arrays to make */
+    private final int byteArraySize;
+
+    /**
+     * Constructor
+     *
+     * @param conf Configuration
+     */
+    public PooledBigDataOutputFactory(
+        ImmutableClassesGiraphConfiguration conf) {
+      this.conf = conf;
+      byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf));
+      byteArraySize = BYTE_ARRAY_SIZE.get(conf);
+    }
+
+    /**
+     * Create new output to write to
+     *
+     * @return Output to write to
+     */
+    public Output createOutput() {
+      return new Output(conf);
+    }
+
+    /**
+     * Implementation of BigDataOutput
+     */
+    private class Output extends BigDataOutput {
+      /**
+       * Constructor
+       *
+       * @param conf Configuration
+       */
+      public Output(ImmutableClassesGiraphConfiguration conf) {
+        super(conf);
+      }
+
+      /**
+       * Return all data structures related to this data output.
+       * Can't use the same instance after this call anymore.
+       */
+      protected void returnData() {
+        if (dataOutputs != null) {
+          for (ExtendedDataOutput dataOutput : dataOutputs) {
+            byteArrayPool.offer(dataOutput.getByteArray());
+          }
+        }
+        byteArrayPool.offer(currentDataOutput.getByteArray());
+      }
+
+      @Override
+      protected ExtendedDataOutput createOutput(int size) {
+        byte[] data = byteArrayPool.pollLast();
+        return conf.createExtendedDataOutput(
+            data == null ? new byte[byteArraySize] : data, 0);
+      }
+
+      @Override
+      protected int getMaxSize() {
+        return byteArraySize;
+      }
+    }
+  }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
index 9e84ebc..094e4a5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
@@ -50,11 +50,11 @@
   private static final int SIZE_DELTA = 100;
 
   /** Data output which we are currently writing to */
-  private ExtendedDataOutput currentDataOutput;
+  protected ExtendedDataOutput currentDataOutput;
   /** List of filled outputs, will be null until we get a lot of data */
-  private List<ExtendedDataOutput> dataOutputs;
+  protected List<ExtendedDataOutput> dataOutputs;
   /** Configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
+  protected final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor
@@ -75,7 +75,26 @@
       ImmutableClassesGiraphConfiguration conf) {
     this.conf = conf;
     dataOutputs = null;
-    currentDataOutput = conf.createExtendedDataOutput(initialSize);
+    currentDataOutput = createOutput(initialSize);
+  }
+
+  /**
+   * Get max size for single data output
+   *
+   * @return Max size for single data output
+   */
+  protected int getMaxSize() {
+    return MAX_SIZE;
+  }
+
+  /**
+   * Create next data output
+   *
+   * @param size Size of data output to create
+   * @return Created data output
+   */
+  protected ExtendedDataOutput createOutput(int size) {
+    return conf.createExtendedDataOutput(size);
   }
 
   /**
@@ -85,16 +104,25 @@
    * @return DataOutput which data should be written to
    */
   private ExtendedDataOutput getDataOutputToWriteTo() {
-    if (currentDataOutput.getPos() + SIZE_DELTA < MAX_SIZE) {
-      return currentDataOutput;
-    } else {
+    return getDataOutputToWriteTo(SIZE_DELTA);
+  }
+
+  /**
+   * Get DataOutput which data should be written to. If current DataOutput is
+   * full it will create a new one.
+   *
+   * @param additionalSize How many additional bytes we need space for
+   * @return DataOutput which data should be written to
+   */
+  private ExtendedDataOutput getDataOutputToWriteTo(int additionalSize) {
+    if (currentDataOutput.getPos() + additionalSize >= getMaxSize()) {
       if (dataOutputs == null) {
-        dataOutputs = new ArrayList<ExtendedDataOutput>(1);
+        dataOutputs = new ArrayList<>(1);
       }
       dataOutputs.add(currentDataOutput);
-      currentDataOutput = conf.createExtendedDataOutput(MAX_SIZE);
-      return currentDataOutput;
+      currentDataOutput = createOutput(getMaxSize());
     }
+    return currentDataOutput;
   }
 
   /**
@@ -147,12 +175,12 @@
 
   @Override
   public void write(byte[] b) throws IOException {
-    getDataOutputToWriteTo().write(b);
+    getDataOutputToWriteTo(b.length + SIZE_DELTA).write(b);
   }
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
-    getDataOutputToWriteTo().write(b, off, len);
+    getDataOutputToWriteTo(len + SIZE_DELTA).write(b, off, len);
   }
 
   @Override