[NEMO-388] Off-heap memory management (reuse ByteBuffer) (#223)

JIRA: [NEMO-388: Off-heap memory management (reuse ByteBuffer)](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-388)

**Major changes:**
- New executor option: `max_offheap_mb` and `chunk_size_kb`
- `MemoryPoolAssigner` class is involved in block creation to use off-heap memory
- `ByteBuffer` is now reused, wrapped by `MemoryChunk`

**Tests for the changes:**
- `MemoryChunk` put/get tests included
- `MemoryPoolAssigner` tests included

**Other comments:**
- Design doc attached in JIRA will be edited soon
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 6725e65..e371293 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -416,6 +416,8 @@
     cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
     cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
     cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
+    cl.registerShortNameOfClass(JobConf.MaxOffheapMb.class);
+    cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class);
     cl.processCommandLine(args);
     return confBuilder.build();
   }
diff --git a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java b/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
deleted file mode 100644
index 8a74fb2..0000000
--- a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.common;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * This class is a customized output stream implementation backed by
- * {@link ByteBuffer}, which utilizes off heap memory when writing the data.
- * Memory is allocated when needed by the specified {@code pageSize}.
- * Deletion of {@code dataList}, which is the memory this outputstream holds, occurs
- * when the corresponding block is deleted.
- * TODO #388: Off-heap memory management (reuse ByteBuffer) - implement reuse.
- */
-public final class DirectByteBufferOutputStream extends OutputStream {
-
-  private LinkedList<ByteBuffer> dataList = new LinkedList<>();
-  private static final int DEFAULT_PAGE_SIZE = 32768; //32KB
-  private final int pageSize;
-  private ByteBuffer currentBuf;
-
-  /**
-   * Default constructor.
-   * Sets the {@code pageSize} as default size of 4096 bytes.
-   */
-  public DirectByteBufferOutputStream() {
-    this(DEFAULT_PAGE_SIZE);
-  }
-
-  /**
-   * Constructor which sets {@code pageSize} as specified {@code size}.
-   * Note that the {@code pageSize} has trade-off between memory fragmentation and
-   * native memory (de)allocation overhead.
-   *
-   * @param size should be a power of 2 and greater than or equal to 4096.
-   */
-  public DirectByteBufferOutputStream(final int size) {
-    if (size < DEFAULT_PAGE_SIZE || (size & (size - 1)) != 0) {
-      throw new IllegalArgumentException("Invalid pageSize");
-    }
-    this.pageSize = size;
-    newLastBuffer();
-    currentBuf = dataList.getLast();
-  }
-
-  /**
-   * Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
-   */
-  // TODO #388: Off-heap memory management (reuse ByteBuffer)
-  private void newLastBuffer() {
-    dataList.addLast(ByteBuffer.allocateDirect(pageSize));
-  }
-
-  /**
-   * Writes the specified byte to this output stream.
-   *
-   * @param   b   the byte to be written.
-   */
-  @Override
-  public void write(final int b) {
-    if (currentBuf.remaining() <= 0) {
-      newLastBuffer();
-      currentBuf = dataList.getLast();
-    }
-    currentBuf.put((byte) b);
-  }
-
-  /**
-   * Writes {@code b.length} bytes from the specified byte array to this output stream.
-   *
-   * @param b the byte to be written.
-   */
-  @Override
-  public void write(final byte[] b) {
-    write(b, 0, b.length);
-  }
-
-  /**
-   * Writes {@code len} bytes from the specified byte array
-   * starting at offset {@code off} to this output stream.
-   *
-   * @param   b     the data.
-   * @param   off   the start offset in the data.
-   * @param   len   the number of bytes to write.
-   */
-  @Override
-  public void write(final byte[] b, final int off, final int len) {
-    int byteToWrite = len;
-    int offset = off;
-    while (byteToWrite > 0) {
-      if (currentBuf.remaining() <= 0) {
-        newLastBuffer();
-        currentBuf = dataList.getLast();
-      }
-      final int bufRemaining = currentBuf.remaining();
-      if (bufRemaining < byteToWrite) {
-        currentBuf.put(b, offset, bufRemaining);
-        offset += bufRemaining;
-        byteToWrite -= bufRemaining;
-      } else {
-        currentBuf.put(b, offset, byteToWrite);
-        offset += byteToWrite;
-        byteToWrite = 0;
-      }
-    }
-  }
-
-
-  /**
-   * Creates a byte array that contains the whole content currently written in this output stream.
-   *
-   * USED BY TESTS ONLY.
-   * @return the current contents of this output stream, as byte array.
-   */
-  @VisibleForTesting
-  byte[] toByteArray() {
-    if (dataList.isEmpty()) {
-      final byte[] byteArray = new byte[0];
-      return byteArray;
-    }
-
-    ByteBuffer lastBuf = dataList.getLast();
-    // pageSize equals the size of the data filled in the ByteBuffers
-    // except for the last ByteBuffer. The size of the data in the
-    // ByteBuffer can be obtained by calling ByteBuffer.position().
-    final int arraySize = pageSize * (dataList.size() - 1) + lastBuf.position();
-    final byte[] byteArray = new byte[arraySize];
-    int start = 0;
-
-    for (final ByteBuffer buffer : dataList) {
-      // We use duplicated buffer to read the data so that there is no complicated
-      // alteration of position and limit when switching between read and write mode.
-      final ByteBuffer dupBuffer = buffer.duplicate();
-      dupBuffer.flip();
-      final int byteToWrite = dupBuffer.remaining();
-      dupBuffer.get(byteArray, start, byteToWrite);
-      start += byteToWrite;
-    }
-
-    return byteArray;
-  }
-
-  /**
-   * Returns the list of {@code ByteBuffer}s that contains the written data.
-   * List of flipped and duplicated {@link ByteBuffer}s are returned which has independent
-   * position and limit, to reduce erroneous data read/write.
-   * This function has to be called when intended to read from the start of the list of
-   * {@link ByteBuffer}s, not for additional write.
-   *
-   * @return the {@code LinkedList} of {@code ByteBuffer}s.
-   */
-  public List<ByteBuffer> getDirectByteBufferList() {
-    List<ByteBuffer> result = new ArrayList<>(dataList.size());
-    for (final ByteBuffer buffer : dataList) {
-      final ByteBuffer dupBuffer = buffer.duplicate();
-      dupBuffer.flip();
-      result.add(dupBuffer);
-    }
-    return result;
-  }
-
-  /**
-   * Returns the size of the data written in this output stream.
-   *
-   * @return the size of the data
-   */
-  public int size() {
-    return pageSize * (dataList.size() - 1) + dataList.getLast().position();
-  }
-
-  /**
-   * Closing this output stream has no effect.
-   */
-  public void close() {
-  }
-}
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 23cf597..485d35c 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -288,6 +288,23 @@
   public final class PartitionTransportClientNumThreads implements Name<Integer> {
   }
 
+  /**
+   * Maximum off-heap memory size in the executor.
+   */
+  // TODO #397: Separation of JVM heap region and off-heap memory region
+  @NamedParameter(doc = "The maximum off-heap memory that can be allocated",
+    short_name = "max_offheap_mb", default_value = "8192")
+  public final class MaxOffheapMb implements Name<Integer> {
+  }
+
+  /**
+   * MemoryChunk size in the memory pool.
+   */
+  @NamedParameter(doc = "The memory chunk size in the memory pool of the executor",
+    short_name = "chunk_size_kb", default_value = "32")
+  public final class ChunkSizeKb implements Name<Integer> {
+  }
+
   //////////////////////////////// Intermediate Configurations
 
   /**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 01ba02a..fade5b0 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -20,9 +20,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.DefaultFileRegion;
-import io.netty.channel.FileRegion;
+import io.netty.channel.*;
 import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.data.FileArea;
@@ -38,6 +36,8 @@
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static io.netty.buffer.Unpooled.wrappedBuffer;
@@ -144,12 +144,19 @@
      * Writes {@link SerializedPartition}.
      *
      * @param serializedPartition {@link SerializedPartition} to write.
+     * @param releaseOnComplete   wheter to release the partition upon completion.
      * @return {@code this}
      * @throws IOException when an exception has been set or this stream was closed
      */
-    public ByteOutputStream writeSerializedPartitionBuffer(final SerializedPartition serializedPartition)
+    public ByteOutputStream writeSerializedPartitionBuffer(final SerializedPartition serializedPartition,
+                                                           final boolean releaseOnComplete)
       throws IOException {
-      writeBuffer(serializedPartition.getDirectBufferList());
+      if (releaseOnComplete) {
+        ChannelFutureListener listener = future -> serializedPartition.release();
+        writeBuffer(serializedPartition.getDirectBufferList(), Arrays.asList(listener));
+      } else {
+        writeBuffer(serializedPartition.getDirectBufferList(), Collections.emptyList());
+      }
       return this;
     }
 
@@ -158,23 +165,25 @@
      * to write a data frame.
      *
      * @param bufList       list of {@link ByteBuffer}s to wrap.
+     * @param listeners     to add.
      * @throws IOException  when fails to write the data.
      */
-    public void writeBuffer(final List<ByteBuffer> bufList) throws IOException {
+    void writeBuffer(final List<ByteBuffer> bufList,
+                            final List<ChannelFutureListener> listeners) throws IOException {
       final ByteBuf byteBuf = wrappedBuffer(bufList.toArray(new ByteBuffer[bufList.size()]));
-      writeByteBuf(byteBuf);
+      writeByteBuf(byteBuf, listeners);
     }
 
-
     /**
      * Writes a data frame, from {@link ByteBuf}.
      *
      * @param byteBuf       {@link ByteBuf} to write.
+     * @param listeners     to add.
      * @throws IOException  when fails to write data.
      */
-    private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
+    private void writeByteBuf(final ByteBuf byteBuf, final List<ChannelFutureListener> listeners) throws IOException {
       if (byteBuf.readableBytes() > 0) {
-        writeDataFrame(byteBuf, byteBuf.readableBytes());
+        writeDataFrame(byteBuf, byteBuf.readableBytes(), listeners);
       }
     }
 
@@ -192,7 +201,7 @@
       while (bytesToSend > 0) {
         final long size = Math.min(bytesToSend, DataFrameEncoder.LENGTH_MAX);
         final FileRegion fileRegion = new DefaultFileRegion(FileChannel.open(path), cursor, size);
-        writeDataFrame(fileRegion, size);
+        writeDataFrame(fileRegion, size, Collections.emptyList());
         cursor += size;
         bytesToSend -= size;
       }
@@ -206,7 +215,7 @@
       }
       if (newSubStream) {
         // to emit a frame with new sub-stream flag
-        writeDataFrame(null, 0);
+        writeDataFrame(null, 0, Collections.emptyList());
       }
       closed = true;
     }
@@ -228,7 +237,7 @@
         encoder.encode(element);
         wrapped.close();
 
-        writeByteBuf(byteBuf);
+        writeByteBuf(byteBuf, Collections.emptyList());
       } catch (final IOException e) {
         throw new RuntimeException(e);
       }
@@ -239,15 +248,20 @@
      *
      * @param body   the body or {@code null}
      * @param length the length of the body, in bytes
+     * @param listeners to add.
      * @throws IOException when an exception has been set or this stream was closed
      */
-    private void writeDataFrame(final Object body, final long length) throws IOException {
+    private void writeDataFrame(final Object body, final long length,
+                                final List<ChannelFutureListener> listeners) throws IOException {
       ensureNoException();
       if (closed) {
         throw new IOException("Stream already closed.");
       }
-      channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(), body, length, newSubStream))
+      final ChannelFuture beforeAddingGivenListener = channel
+        .writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(), body, length, newSubStream))
         .addListener(getChannelWriteListener());
+      listeners.forEach(beforeAddingGivenListener::addListener);
+
       newSubStream = false;
     }
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index 8806432..576831f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -38,6 +38,7 @@
 import org.apache.nemo.runtime.executor.bytetransfer.ByteTransfer;
 import org.apache.nemo.runtime.executor.data.block.Block;
 import org.apache.nemo.runtime.executor.data.block.FileBlock;
+import org.apache.nemo.runtime.executor.data.block.SerializedMemoryBlock;
 import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
 import org.apache.nemo.runtime.executor.data.stores.*;
@@ -356,7 +357,13 @@
               final Iterable<SerializedPartition> partitions = optionalBlock.get().readSerializedPartitions(keyRange);
               for (final SerializedPartition partition : partitions) {
                 try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
-                  os.writeSerializedPartitionBuffer(partition);
+                  if (optionalBlock.get().getClass() == SerializedMemoryBlock.class) {
+                    os.writeSerializedPartitionBuffer(partition, false);
+                  } else {
+                    // For NonSerializedMemoryBlock, the serialized partition to be sent is transient and needs
+                    // to be released right after the data transfer.
+                    os.writeSerializedPartitionBuffer(partition, true);
+                  }
                 }
               }
             }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index f0362dc..16df0b6 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -20,7 +20,6 @@
 
 import com.google.common.io.CountingInputStream;
 import org.apache.nemo.common.ByteBufferInputStream;
-import org.apache.nemo.common.DirectByteBufferOutputStream;
 import org.apache.nemo.common.coder.DecoderFactory;
 import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
@@ -32,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -105,16 +103,19 @@
    * @param serializer          the serializer for serialization.
    * @param partitionsToConvert the partitions to convert.
    * @param <K>                 the key type of the partitions.
+   * @param memoryPoolAssigner  the memory pool assigner for DirectByteBufferOutputStream.
    * @return the converted {@link SerializedPartition}s.
    * @throws IOException if fail to convert.
+   * @throws MemoryAllocationException  if fail to allocate memory.
    */
   public static <K extends Serializable> Iterable<SerializedPartition<K>> convertToSerPartitions(
     final Serializer serializer,
-    final Iterable<NonSerializedPartition<K>> partitionsToConvert) throws IOException {
+    final Iterable<NonSerializedPartition<K>> partitionsToConvert,
+    final MemoryPoolAssigner memoryPoolAssigner) throws IOException, MemoryAllocationException {
     final List<SerializedPartition<K>> serializedPartitions = new ArrayList<>();
     for (final NonSerializedPartition<K> partitionToConvert : partitionsToConvert) {
       try (
-        DirectByteBufferOutputStream bytesOutputStream = new DirectByteBufferOutputStream();
+        DirectByteBufferOutputStream bytesOutputStream = new DirectByteBufferOutputStream(memoryPoolAssigner);
         OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers())
       ) {
         serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
@@ -123,10 +124,10 @@
         wrappedStream.close();
         // Note that serializedBytes include invalid bytes.
         // So we have to use it with the actualLength by using size() whenever needed.
-        final List<ByteBuffer> serializedBufList = bytesOutputStream.getDirectByteBufferList();
+        final List<MemoryChunk> serializedBufList = bytesOutputStream.getMemoryChunkList();
         final int actualLength = bytesOutputStream.size();
         serializedPartitions.add(
-          new SerializedPartition<>(partitionToConvert.getKey(), serializedBufList, actualLength));
+          new SerializedPartition<>(partitionToConvert.getKey(), serializedBufList, actualLength, memoryPoolAssigner));
       }
     }
     return serializedPartitions;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStream.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStream.java
new file mode 100644
index 0000000..dae3859
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStream.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class is a customized output stream implementation backed by
+ * {@link ByteBuffer}, which utilizes off heap memory when writing the data via MemoryPoolAssigner.
+ * Deletion of {@code dataList}, which is the memory this outputstream holds, occurs
+ * when the corresponding block is deleted.
+ */
+public final class DirectByteBufferOutputStream extends OutputStream {
+
+  private LinkedList<MemoryChunk> dataList = new LinkedList<>();
+  private final int chunkSize;
+  private ByteBuffer currentBuf;
+  private final MemoryPoolAssigner memoryPoolAssigner;
+
+  /**
+   * Default constructor.
+   *
+   * @param memoryPoolAssigner  for memory allocation.
+   * @throws MemoryAllocationException  if fails to allocate new memory.
+   */
+  public DirectByteBufferOutputStream(final MemoryPoolAssigner memoryPoolAssigner) throws MemoryAllocationException {
+    this.chunkSize = memoryPoolAssigner.getChunkSize();
+    this.memoryPoolAssigner = memoryPoolAssigner;
+    newLastBuffer();
+    currentBuf = dataList.getLast().getBuffer();
+  }
+
+  /**
+   * Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
+   *
+   * @throws MemoryAllocationException  if fail to allocate memory chunk.
+   */
+  private void newLastBuffer() throws MemoryAllocationException {
+    dataList.addLast(memoryPoolAssigner.allocateChunk());
+  }
+
+  /**
+   * Writes the specified byte to this output stream.
+   *
+   * @param   b   the byte to be written.
+   */
+  @Override
+  public void write(final int b) throws IOException {
+    try {
+      if (currentBuf.remaining() <= 0) {
+        newLastBuffer();
+        currentBuf = dataList.getLast().getBuffer();
+      }
+      currentBuf.put((byte) b);
+    } catch (IllegalStateException e) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } catch (MemoryAllocationException e) {
+      throw new IOException("Failed to allocate memory");
+    }
+  }
+
+  /**
+   * Writes {@code b.length} bytes from the specified byte array to this output stream.
+   *
+   * @param b the byte to be written.
+   */
+  @Override
+  public void write(final byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  /**
+   * Writes {@code len} bytes from the specified byte array
+   * starting at offset {@code off} to this output stream.
+   *
+   * @param   b     the data.
+   * @param   off   the start offset in the data.
+   * @param   len   the number of bytes to write.
+   */
+  @Override
+  public void write(final byte[] b, final int off, final int len) throws IOException {
+    int byteToWrite = len;
+    int offset = off;
+    try {
+      while (byteToWrite > 0) {
+        if (currentBuf.remaining() <= 0) {
+          newLastBuffer();
+          currentBuf = dataList.getLast().getBuffer();
+        }
+        final int bufRemaining = currentBuf.remaining();
+        if (bufRemaining < byteToWrite) {
+          currentBuf.put(b, offset, bufRemaining);
+          offset += bufRemaining;
+          byteToWrite -= bufRemaining;
+        } else {
+          currentBuf.put(b, offset, byteToWrite);
+          offset += byteToWrite;
+          byteToWrite = 0;
+        }
+      }
+    } catch (MemoryAllocationException e) {
+      throw new IOException("Failed to allocate memory");
+    }
+  }
+
+
+  /**
+   * Creates a byte array that contains the whole content currently written in this output stream.
+   *
+   * USED BY TESTS ONLY.
+   * @return the current contents of this output stream, as byte array.
+   */
+  @VisibleForTesting
+  byte[] toByteArray() {
+    if (dataList.isEmpty()) {
+      final byte[] byteArray = new byte[0];
+      return byteArray;
+    }
+    MemoryChunk lastBuf = dataList.getLast();
+    // pageSize equals the size of the data filled in the ByteBuffers
+    // except for the last ByteBuffer. The size of the data in the
+    // ByteBuffer can be obtained by calling MemoryChunk.position().
+    final int arraySize = chunkSize * (dataList.size() - 1) + lastBuf.getBuffer().position();
+    final byte[] byteArray = new byte[arraySize];
+    int start = 0;
+
+    for (final MemoryChunk chunk : dataList) {
+      // We use duplicated buffer to read the data so that there is no complicated
+      // alteration of position and limit when switching between read and write mode.
+      final MemoryChunk dupChunk = chunk.duplicate();
+      final ByteBuffer dupBuffer = dupChunk.getBuffer();
+      dupBuffer.flip();
+      final int byteToWrite = dupBuffer.remaining();
+      dupBuffer.get(byteArray, start, byteToWrite);
+      start += byteToWrite;
+    }
+
+    return byteArray;
+  }
+
+  /**
+   * Returns the list of {@code MemoryChunk}s that contains the written data.
+   * List of flipped and duplicated {@link MemoryChunk}s are returned, which has independent
+   * position and limit, to reduce erroneous data read/write.
+   * This function has to be called when intended to read from the start of the list of
+   * {@link MemoryChunk}s, not for additional write.
+   *
+   * @return the {@code LinkedList} of {@code MemoryChunk}s.
+   */
+  public List<MemoryChunk> getMemoryChunkList() {
+    List<MemoryChunk> result = new LinkedList<>();
+    for (final MemoryChunk chunk: dataList) {
+      final MemoryChunk dupChunk = chunk.duplicate();
+      dupChunk.getBuffer().flip();
+      result.add(dupChunk);
+    }
+    return result;
+  }
+
+  /**
+   * Returns the size of the data written in this output stream.
+   *
+   * @return the size of the data
+   */
+  public int size() {
+    return chunkSize * (dataList.size() - 1) + dataList.getLast().getBuffer().position();
+  }
+
+  /**
+   * Closing this output stream has no effect.
+   */
+  public void close() {
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryAllocationException.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryAllocationException.java
new file mode 100644
index 0000000..0603d74
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryAllocationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.executor.data;
+
+/**
+ * An exception to be thrown when a memory allocation operation is not successful.
+ * Cases where allocation is not successful are when the allocated memory exceeds the
+ * amount specified by the job configuration or
+ */
+public class MemoryAllocationException extends Exception {
+
+  public MemoryAllocationException(final String message) {
+    super(message);
+  }
+
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryChunk.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryChunk.java
new file mode 100644
index 0000000..fe914a6
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryChunk.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+/**
+ * This class represents chunk of memory residing in off-heap region
+ * managed by {@link MemoryPoolAssigner}, which is backed by {@link ByteBuffer}.
+ */
+@NotThreadSafe
+public class MemoryChunk {
+
+  // UNSAFE is used for random access and manipulation of the ByteBuffer.
+  @SuppressWarnings("restriction") // to suppress warnings that are invoked whenever we use UNSAFE.
+  protected static final sun.misc.Unsafe UNSAFE = getUnsafe();
+  @SuppressWarnings("restriction")
+  protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+  private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+  private static final int SHORT_SIZE = 2;
+  private static final int CHAR_SIZE = 2;
+  private static final int INT_SIZE = 4;
+  private static final int LONG_SIZE = 8;
+  private final ByteBuffer buffer;
+  // Since using UNSAFE does not automatically track the address and limit, it should be accessed
+  // through address for data write and get, and addressLimit for sanity checks on the buffer use.
+  private long address;
+  private final long addressLimit;
+  private final int size;
+
+  /**
+   * Creates a new memory chunk that represents the off-heap memory at the absolute address.
+   * This class supports random access and manipulation of the data in the {@code ByteBuffer} using UNSAFE.
+   * For sequential access, ByteBuffer of this class can be accessed and manipulated.
+   *
+   * @param offHeapAddress the address of the off-heap memory, {@link ByteBuffer}, of this MemoryChunk
+   * @param buffer         the off-heap memory of this MemoryChunk
+   */
+  MemoryChunk(final long offHeapAddress, final ByteBuffer buffer) {
+    if (offHeapAddress <= 0) {
+      throw new IllegalArgumentException("negative pointer or size");
+    }
+    if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("MemoryChunk initialized with too large address");
+    }
+    this.buffer = buffer;
+    this.size = buffer.capacity();
+    this.address = offHeapAddress;
+    this.addressLimit = this.address + this.size;
+  }
+
+  /**
+   * Creates a new memory chunk that represents the off-heap memory at the absolute address.
+   *
+   * @param buffer  the off-heap memory of this MemoryChunk
+   */
+  MemoryChunk(final ByteBuffer buffer) {
+    this(getAddress(buffer), buffer);
+  }
+
+  /**
+   * Gets the {@link ByteBuffer} from this MemoryChunk.
+   *
+   * @return  {@link ByteBuffer}
+   */
+  public ByteBuffer getBuffer() {
+    if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    }
+    return buffer;
+  }
+
+  /**
+   * Makes the duplicated instance of this MemoryChunk.
+   *
+   * @return the MemoryChunk with the same content of the caller instance
+   */
+  public MemoryChunk duplicate() {
+    return new MemoryChunk(buffer.duplicate());
+  }
+
+  /**
+   * Frees this MemoryChunk. No further operation possible after calling this method.
+   */
+  public void release() {
+    buffer.clear();
+    address = addressLimit + 1;
+  }
+
+  /**
+   * Reads the byte at the given index.
+   *
+   * @param index from which the byte will be read
+   * @return the byte at the given position
+   */
+  @SuppressWarnings("restriction")
+  public final byte get(final int index) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, 0)) {
+      return UNSAFE.getByte(pos);
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Writes the given byte into this buffer at the given index.
+   *
+   * @param index The position at which the byte will be written.
+   * @param b     The byte value to be written.
+   */
+  @SuppressWarnings("restriction")
+  public final void put(final int index, final byte b) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, 0)) {
+      UNSAFE.putByte(pos, b);
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Copies the data of the MemoryChunk from the specified position to target byte array.
+   *
+   * @param index The position at which the first byte will be read.
+   * @param dst The memory into which the memory will be copied.
+   */
+  public final void get(final int index, final byte[] dst) {
+    get(index, dst, 0, dst.length);
+  }
+
+  /**
+   * Copies all the data from the source byte array into the MemoryChunk
+   * beginning at the specified position.
+   *
+   * @param index the position in MemoryChunk to start copying the data.
+   * @param src the source byte array that holds the data to copy.
+   */
+  public final void put(final int index, final byte[] src) {
+    put(index, src, 0, src.length);
+  }
+
+  /**
+   * Bulk get method using nk.the specified index in the MemoryChunk.
+   *
+   * @param index the index in the MemoryChunk to start copying the data.
+   * @param dst the target byte array to copy the data from MemoryChunk.
+   * @param offset the offset in the destination byte array.
+   * @param length the number of bytes to be copied.
+   */
+  @SuppressWarnings("restriction")
+  public final void get(final int index, final byte[] dst, final int offset, final int length) {
+    if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    final long pos = address + index;
+    if (checkIndex(index, pos, length)) {
+      final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+      UNSAFE.copyMemory(null, pos, dst, arrayAddress, length);
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Bulk put method using the specified index in the MemoryChunk.
+   *
+   * @param index the index in the MemoryChunk to start copying the data.
+   * @param src the source byte array that holds the data to be copied to MemoryChunk.
+   * @param offset the offset in the source byte array.
+   * @param length the number of bytes to be copied.
+   */
+  @SuppressWarnings("restriction")
+  public final void put(final int index, final byte[] src, final int offset, final int length) {
+    if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    final long pos = address + index;
+    if (checkIndex(index, pos, length)) {
+      final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+      UNSAFE.copyMemory(src, arrayAddress, null, pos, length);
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Reads a char value from the given position.
+   *
+   * @param index The position from which the memory will be read.
+   * @return The char value at the given position.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus CHAR_SIZE.
+   */
+  @SuppressWarnings("restriction")
+  public final char getChar(final int index) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, CHAR_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        return UNSAFE.getChar(pos);
+      } else {
+        return Character.reverseBytes(UNSAFE.getChar(pos));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("This MemoryChunk has been freed.");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Writes a char value to the given position.
+   *
+   * @param index The position at which the memory will be written.
+   * @param value The char value to be written.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus CHAR_SIZE.
+   */
+  @SuppressWarnings("restriction")
+  public final void putChar(final int index, final char value) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, CHAR_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        UNSAFE.putChar(pos, value);
+      } else {
+        UNSAFE.putChar(pos, Character.reverseBytes(value));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Reads a short integer value from the given position, composing them into a short value
+   * according to the current byte order.
+   *
+   * @param index The position from which the memory will be read.
+   * @return The short value at the given position.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus SHORT_SIZE.
+   */
+  @SuppressWarnings("restriction")
+  public final short getShort(final int index) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, SHORT_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        return UNSAFE.getShort(pos);
+      } else {
+        return Short.reverseBytes(UNSAFE.getShort(pos));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Writes the given short value into this buffer at the given position, using
+   * the native byte order of the system.
+   *
+   * @param index The position at which the value will be written.
+   * @param value The short value to be written.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus SHORT_SIZE.
+   */
+  @SuppressWarnings("restriction")
+  public final void putShort(final int index, final short value) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, SHORT_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        UNSAFE.putShort(pos, value);
+      } else {
+        UNSAFE.putShort(pos, Short.reverseBytes(value));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Reads an int value from the given position, in the system's native byte order.
+   *
+   * @param index The position from which the value will be read.
+   * @return The int value at the given position.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus INT_SIZE.
+   */
+  public final int getInt(final int index) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, INT_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        return UNSAFE.getInt(pos);
+      } else {
+        return Integer.reverseBytes(UNSAFE.getInt(pos));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Writes the given int value to the given position in the system's native byte order.
+   *
+   * @param index The position at which the value will be written.
+   * @param value The int value to be written.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus INT_SIZE.
+   */
+  public final void putInt(final int index, final int value) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, INT_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        UNSAFE.putInt(pos, value);
+      } else {
+        UNSAFE.putInt(pos, Integer.reverseBytes(value));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Reads a long value from the given position.
+   *
+   * @param index The position from which the value will be read.
+   * @return The long value at the given position.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus LONG_SIZE.
+   */
+  public final long getLong(final int index) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, LONG_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        return UNSAFE.getLong(pos);
+      } else {
+        return Long.reverseBytes(UNSAFE.getLong(pos));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Writes the given long value to the given position in the system's native byte order.
+   *
+   * @param index The position at which the value will be written.
+   * @param value The long value to be written.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus LONG_SIZE.
+   */
+  public final void putLong(final int index, final long value) {
+    final long pos = address + index;
+    if (checkIndex(index, pos, LONG_SIZE)) {
+      if (LITTLE_ENDIAN) {
+        UNSAFE.putLong(pos, value);
+      } else {
+        UNSAFE.putLong(pos, Long.reverseBytes(value));
+      }
+    } else if (address > addressLimit) {
+      throw new IllegalStateException("MemoryChunk has been freed");
+    } else {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+
+  /**
+   * Reads a float value from the given position, in the system's native byte order.
+   *
+   * @param index The position from which the value will be read.
+   * @return The float value at the given position.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of float.
+   */
+  public final float getFloat(final int index) {
+    return Float.intBitsToFloat(getInt(index));
+  }
+
+  /**
+   * Writes the given float value to the given position in the system's native byte order.
+   *
+   * @param index The position at which the value will be written.
+   * @param value The float value to be written.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of float.
+   */
+  public final void putFloat(final int index, final float value) {
+    putInt(index, Float.floatToRawIntBits(value));
+  }
+
+  /**
+   * Reads a double value from the given position, in the system's native byte order.
+   *
+   * @param index The position from which the value will be read.
+   * @return The double value at the given position.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of double.
+   */
+  public final double getDouble(final int index) {
+    return Double.longBitsToDouble(getLong(index));
+  }
+
+  /**
+   * Writes the given double value to the given position in the system's native byte order.
+   *
+   * @param index The position at which the memory will be written.
+   * @param value The double value to be written.
+   *
+   * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of double.
+   */
+  public final void putDouble(final int index, final double value) {
+    putLong(index, Double.doubleToRawLongBits(value));
+  }
+
+  private boolean checkIndex(final int index, final long pos, final int typeSize) throws IndexOutOfBoundsException {
+    if (!(index >= 0 && pos <= addressLimit - typeSize)) {
+      throw new IndexOutOfBoundsException();
+    } else {
+      return true;
+    }
+  }
+
+  @SuppressWarnings("restriction")
+  private static sun.misc.Unsafe getUnsafe() {
+    try {
+      Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+      unsafeField.setAccessible(true);
+      return (sun.misc.Unsafe) unsafeField.get(null);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while trying to access the sun.misc.Unsafe handle.");
+    }
+  }
+
+  private static final Field ADDRESS_FIELD;
+
+  static {
+    try {
+      ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
+      ADDRESS_FIELD.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot initialize MemoryChunk: off-heap memory is incompatible with this JVM.");
+    }
+  }
+
+  private static long getAddress(final ByteBuffer buffer) {
+    if (buffer == null) {
+      throw new NullPointerException("Buffer null");
+    }
+    if (!buffer.isDirect()) {
+      throw new IllegalArgumentException("Cannot initialize from non-direct ByteBuffer.");
+    }
+    try {
+      return (Long) ADDRESS_FIELD.get(buffer);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not access ByteBuffer address.");
+    }
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssigner.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssigner.java
new file mode 100644
index 0000000..02af48a
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssigner.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.nemo.conf.JobConf;
+
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * The MemoryPoolAssigner assigns the memory that Nemo uses for writing data blocks from the {@link MemoryPool}.
+ * Memory is represented in chunks of equal size. Consumers of off-heap memory acquire the memory by requesting
+ * a number of {@link MemoryChunk} they need.
+ *
+ * MemoryPoolAssigner currently supports allocation of off-heap memory only.
+ *
+ * MemoryChunks are allocated on-demand, but if the total allocated memory exceeds the maxOffheapMb,
+ * MemoryAllocationException is thrown and the job fails.
+ * TODO #397: Separation of JVM heap region and off-heap memory region
+ */
+@ThreadSafe
+public class MemoryPoolAssigner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryPoolAssigner.class.getName());
+
+  private final int chunkSize;
+
+  private static final int MIN_CHUNK_SIZE_KB = 4;
+
+  private final MemoryPool memoryPool;
+
+  @Inject
+  public MemoryPoolAssigner(@Parameter(JobConf.MaxOffheapMb.class) final int maxOffheapMb,
+                            @Parameter(JobConf.ChunkSizeKb.class) final int chunkSizeKb) {
+    if (chunkSizeKb < MIN_CHUNK_SIZE_KB) {
+      throw new IllegalArgumentException("Chunk size too small. Minimum chunk size is 4KB");
+    }
+    final long maxNumChunks = (long) maxOffheapMb * 1024 / chunkSizeKb;
+    if (maxNumChunks > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("Too many pages to allocate (exceeds MAX_INT)");
+    }
+    if (maxNumChunks < 1) {
+      throw new IllegalArgumentException("The given amount of memory amounted to less than one chunk.");
+    }
+    this.chunkSize = chunkSizeKb * 1024;
+    this.memoryPool = new MemoryPool((int) maxNumChunks, this.chunkSize);
+  }
+
+  /**
+   * Returns a single {@link MemoryChunk} from {@link MemoryPool}.
+   *
+   * @return a MemoryChunk
+   * @throws MemoryAllocationException if fails to allocate MemoryChunk.
+   */
+  public MemoryChunk allocateChunk() throws MemoryAllocationException {
+    return memoryPool.requestChunkFromPool();
+  }
+
+  /**
+   * Returns all the MemoryChunks in the given List of MemoryChunks.
+   *
+   * @param target  the list of MemoryChunks to be returned to the memory pool.
+   */
+  public void returnChunksToPool(final Iterable<MemoryChunk> target) {
+    for (final MemoryChunk chunk: target) {
+      memoryPool.returnChunkToPool(chunk);
+    }
+  }
+
+  /**
+   * Returns the chunk size of the memory pool.
+   *
+   * @return the chunk size in bytes.
+   */
+  public int getChunkSize() {
+    return chunkSize;
+  }
+
+  /**
+   * Returns the number of chunks in the pool. This is unrecommended since it is very complex to
+   * check the size of {@link ConcurrentLinkedQueue}.
+   *
+   * @return the pool size.
+   */
+  int poolSize() {
+    return memoryPool.size();
+  }
+
+  /**
+   * Memory pool that utilizes off-heap memory.
+   * Supports pre-allocation of memory according to user specification.
+   */
+  @ThreadSafe
+  private class MemoryPool {
+
+    private final ConcurrentLinkedQueue<ByteBuffer> pool;
+    private final int chunkSize;
+    private long maxNumChunks;
+    private long numChunks;
+
+    MemoryPool(final long maxNumChunks, final int chunkSize) {
+      this.chunkSize = chunkSize;
+      this.pool = new ConcurrentLinkedQueue<>();
+      this.maxNumChunks = maxNumChunks;
+    }
+
+    synchronized MemoryChunk allocateNewChunk() throws MemoryAllocationException {
+      if (maxNumChunks <= numChunks) {
+        throw new MemoryAllocationException("Exceeded maximum off-heap memory");
+      }
+      ByteBuffer memory = ByteBuffer.allocateDirect(chunkSize);
+      numChunks += 1;
+      return new MemoryChunk(memory);
+    }
+
+    MemoryChunk requestChunkFromPool() throws MemoryAllocationException {
+      try {
+        // Try to reuse a byte buffer in the current pool
+        // Return the byte buffer if there's one that we can reuse
+        final ByteBuffer byteBufferThatWeCanReuse = pool.remove(); // this can throw NoSuchElementException
+        return new MemoryChunk(byteBufferThatWeCanReuse);
+      } catch (final NoSuchElementException e) {
+        // No more byte buffer to reuse in the current pool
+        // So we try to allocate a new chunk
+        // This method is synchronized :)
+        return allocateNewChunk();
+      } catch (final OutOfMemoryError e) {
+        throw new MemoryAllocationException("Memory allocation failed due to lack of memory");
+      }
+    }
+
+    /**
+     * Returns MemoryChunk back to memory pool.
+     *
+     * @param chunk the target MemoryChunk to be returned to the pool.
+     */
+    void returnChunkToPool(final MemoryChunk chunk) {
+      ByteBuffer buf = chunk.getBuffer();
+      chunk.release();
+      pool.add(buf);
+    }
+
+    int size() {
+      return pool.size();
+    }
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
index 65b167b..a869e8d 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
@@ -19,6 +19,8 @@
 package org.apache.nemo.runtime.executor.data.block;
 
 import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.exception.BlockFetchException;
 import org.apache.nemo.common.exception.BlockWriteException;
@@ -66,6 +68,7 @@
   private final Serializer serializer;
   private final String filePath;
   private final FileMetadata<K> metadata;
+  private final MemoryPoolAssigner memoryPoolAssigner;
 
   /**
    * Constructor.
@@ -74,16 +77,19 @@
    * @param serializer the {@link Serializer}.
    * @param filePath   the path of the file that this block will be stored.
    * @param metadata   the metadata for this block.
+   * @param memoryPoolAssigner  the MemoryPoolAssigner for memory allocation.
    */
   public FileBlock(final String blockId,
                    final Serializer serializer,
                    final String filePath,
-                   final FileMetadata<K> metadata) {
+                   final FileMetadata<K> metadata,
+                   final MemoryPoolAssigner memoryPoolAssigner) {
     this.id = blockId;
     this.nonCommittedPartitionsMap = new HashMap<>();
     this.serializer = serializer;
     this.filePath = filePath;
     this.metadata = metadata;
+    this.memoryPoolAssigner = memoryPoolAssigner;
   }
 
   /**
@@ -103,6 +109,8 @@
         for (final ByteBuffer buffer: serializedPartition.getDirectBufferList()) {
           fileOutputChannel.write(buffer);
         }
+        // after the writing to disk, data in memory is released.
+        serializedPartition.release();
       }
     }
   }
@@ -125,11 +133,11 @@
       try {
         SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
         if (partition == null) {
-          partition = new SerializedPartition<>(key, serializer);
+          partition = new SerializedPartition<>(key, serializer, memoryPoolAssigner);
           nonCommittedPartitionsMap.put(key, partition);
         }
         partition.write(element);
-      } catch (final IOException e) {
+      } catch (final IOException | MemoryAllocationException e) {
         throw new BlockWriteException(e);
       }
     }
@@ -150,9 +158,9 @@
     } else {
       try {
         final Iterable<SerializedPartition<K>> convertedPartitions =
-          DataUtil.convertToSerPartitions(serializer, partitions);
+          DataUtil.convertToSerPartitions(serializer, partitions, memoryPoolAssigner);
         writeSerializedPartitions(convertedPartitions);
-      } catch (final IOException e) {
+      } catch (final IOException | MemoryAllocationException e) {
         throw new BlockWriteException(e);
       }
     }
@@ -251,7 +259,7 @@
                 throw new IOException("The read data size does not match with the partition size.");
               }
               partitionsInRange.add(new SerializedPartition<>(
-                key, serializedData, serializedData.length));
+                key, serializedData, serializedData.length, memoryPoolAssigner));
             } else {
               // Have to skip this partition.
               skipBytes(fileStream, partitionmetadata.getPartitionSize());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index eee0759..7529667 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -19,6 +19,8 @@
 package org.apache.nemo.runtime.executor.data.block;
 
 import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.exception.BlockFetchException;
 import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -45,20 +47,24 @@
   private final Map<K, NonSerializedPartition<K>> nonCommittedPartitionsMap;
   private final Serializer serializer;
   private volatile boolean committed;
+  private final MemoryPoolAssigner memoryPoolAssigner;
 
   /**
    * Constructor.
    *
    * @param blockId    the ID of this block.
    * @param serializer the {@link Serializer}.
+   * @param memoryPoolAssigner  the MemoryPoolAssigner for memory allocation.
    */
   public NonSerializedMemoryBlock(final String blockId,
-                                  final Serializer serializer) {
+                                  final Serializer serializer,
+                                  final MemoryPoolAssigner memoryPoolAssigner) {
     this.id = blockId;
     this.nonSerializedPartitions = new ArrayList<>();
     this.nonCommittedPartitionsMap = new HashMap<>();
     this.serializer = serializer;
     this.committed = false;
+    this.memoryPoolAssigner = memoryPoolAssigner;
   }
 
   /**
@@ -166,8 +172,8 @@
   @Override
   public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws BlockFetchException {
     try {
-      return DataUtil.convertToSerPartitions(serializer, readPartitions(keyRange));
-    } catch (final IOException e) {
+      return DataUtil.convertToSerPartitions(serializer, readPartitions(keyRange), memoryPoolAssigner);
+    } catch (final IOException | MemoryAllocationException e) {
       throw new BlockFetchException(e);
     }
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 6ecc2b9..d4e3185 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -19,6 +19,8 @@
 package org.apache.nemo.runtime.executor.data.block;
 
 import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.exception.BlockFetchException;
 import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -45,20 +47,24 @@
   private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
   private final Serializer serializer;
   private volatile boolean committed;
+  private final MemoryPoolAssigner memoryPoolAssigner;
 
   /**
    * Constructor.
    *
    * @param blockId    the ID of this block.
    * @param serializer the {@link Serializer}.
+   * @param memoryPoolAssigner  the MemoryPoolAssigner for memory allocation.
    */
   public SerializedMemoryBlock(final String blockId,
-                               final Serializer serializer) {
+                               final Serializer serializer,
+                               final MemoryPoolAssigner memoryPoolAssigner) {
     this.id = blockId;
     this.serializedPartitions = new ArrayList<>();
     this.nonCommittedPartitionsMap = new HashMap<>();
     this.serializer = serializer;
     this.committed = false;
+    this.memoryPoolAssigner = memoryPoolAssigner;
   }
 
   /**
@@ -79,11 +85,11 @@
       try {
         SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
         if (partition == null) {
-          partition = new SerializedPartition<>(key, serializer);
+          partition = new SerializedPartition<>(key, serializer, memoryPoolAssigner);
           nonCommittedPartitionsMap.put(key, partition);
         }
         partition.write(element);
-      } catch (final IOException e) {
+      } catch (final IOException | MemoryAllocationException e) {
         throw new BlockWriteException(e);
       }
     }
@@ -102,9 +108,9 @@
     if (!committed) {
       try {
         final Iterable<SerializedPartition<K>> convertedPartitions = DataUtil.convertToSerPartitions(
-          serializer, partitions);
+          serializer, partitions, memoryPoolAssigner);
         writeSerializedPartitions(convertedPartitions);
-      } catch (final IOException e) {
+      } catch (final IOException | MemoryAllocationException e) {
         throw new BlockWriteException(e);
       }
     } else {
@@ -234,4 +240,13 @@
   public synchronized boolean isCommitted() {
     return committed;
   }
+
+  /**
+   * Releases the resource (i.e., off-heap memory) that the block holds.
+   */
+  public void release() {
+    for (SerializedPartition partition: serializedPartitions) {
+      partition.release();
+    }
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
index 070b618..a2abbdb 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -18,7 +18,10 @@
  */
 package org.apache.nemo.runtime.executor.data.partition;
 
-import org.apache.nemo.common.DirectByteBufferOutputStream;
+import org.apache.nemo.runtime.executor.data.DirectByteBufferOutputStream;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryChunk;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
@@ -28,7 +31,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.nemo.runtime.executor.data.DataUtil.buildOutputStream;
@@ -55,7 +58,8 @@
   private final OutputStream wrappedStream;
   @Nullable
   private final EncoderFactory.Encoder encoder;
-  private volatile List<ByteBuffer> dataList;
+  private final MemoryPoolAssigner memoryPoolAssigner;
+  private volatile List<MemoryChunk> dataList;
   private final boolean offheap;
 
   /**
@@ -64,17 +68,22 @@
    *
    * @param key        the key of this partition.
    * @param serializer the serializer to be used to serialize data.
+   * @param memoryPoolAssigner  the memory pool assigner for memory allocation.
    * @throws IOException if fail to chain the output stream.
+   * @throws MemoryAllocationException  if fail to allocate memory.
    */
   public SerializedPartition(final K key,
-                             final Serializer serializer) throws IOException {
+                             final Serializer serializer,
+                             final MemoryPoolAssigner memoryPoolAssigner) throws IOException,
+                                                                          MemoryAllocationException {
     this.key = key;
     this.serializedData = new byte[0];
     this.length = 0;
     this.committed = false;
-    this.bytesOutputStream = new DirectByteBufferOutputStream();
+    this.bytesOutputStream = new DirectByteBufferOutputStream(memoryPoolAssigner);
     this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
     this.encoder = serializer.getEncoderFactory().create(wrappedStream);
+    this.memoryPoolAssigner = memoryPoolAssigner;
     this.offheap = true;
   }
 
@@ -85,10 +94,12 @@
    * @param key            the key.
    * @param serializedData the serialized data.
    * @param length         the length of the actual serialized data. (It can be different with serializedData.length)
+   * @param memoryPoolAssigner the memory pool assigner.
    */
   public SerializedPartition(final K key,
                              final byte[] serializedData,
-                             final int length) {
+                             final int length,
+                             final MemoryPoolAssigner memoryPoolAssigner) {
     this.key = key;
     this.serializedData = serializedData;
     this.length = length;
@@ -96,6 +107,7 @@
     this.bytesOutputStream = null;
     this.wrappedStream = null;
     this.encoder = null;
+    this.memoryPoolAssigner = memoryPoolAssigner;
     this.offheap = false;
   }
 
@@ -103,20 +115,23 @@
    * Creates a serialized {@link Partition} with actual data residing in off-heap region.
    * Data cannot be written to this partition after the construction.
    *
-   * @param key               the key.
-   * @param serializedBufList the serialized data in list list of {@link ByteBuffer}s.
-   * @param length            the length of the actual serialized data. (It can be different with serializedData.length)
+   * @param key                the key.
+   * @param serializedChunkList the serialized data in list list of {@link MemoryChunk}s.
+   * @param length             the length of the actual serialized data.(It can be different with serializedData.length)
+   * @param memoryPoolAssigner  the memory pool assigner.
    */
   public SerializedPartition(final K key,
-                             final List<ByteBuffer> serializedBufList,
-                             final int length) {
+                             final List<MemoryChunk> serializedChunkList,
+                             final int length,
+                             final MemoryPoolAssigner memoryPoolAssigner) {
     this.key = key;
-    this.dataList = serializedBufList;
+    this.dataList = serializedChunkList;
     this.length = length;
     this.committed = true;
     this.bytesOutputStream = null;
     this.wrappedStream = null;
     this.encoder = null;
+    this.memoryPoolAssigner = memoryPoolAssigner;
     this.offheap = true;
   }
 
@@ -150,7 +165,7 @@
       // We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
       // inner buffer directly, which can be an unfinished(not flushed) buffer.
       wrappedStream.close();
-      this.dataList = bytesOutputStream.getDirectByteBufferList();
+      this.dataList = bytesOutputStream.getMemoryChunkList();
       this.length = bytesOutputStream.size();
       this.committed = true;
     }
@@ -199,9 +214,9 @@
     if (!committed) {
       throw new IOException("The partition is not committed yet!");
     } else {
-      List<ByteBuffer> result = new ArrayList<>(dataList.size());
-      for (final ByteBuffer buffer : dataList) {
-        final ByteBuffer dupBuffer = buffer.duplicate();
+      List<ByteBuffer> result = new LinkedList<>();
+      for (final MemoryChunk chunk : dataList) {
+        final ByteBuffer dupBuffer = chunk.duplicate().getBuffer();
         result.add(dupBuffer);
       }
       return result;
@@ -226,4 +241,15 @@
   public boolean isOffheap() {
     return offheap;
   }
+
+  /**
+   * Releases the off-heap memory that this SerializedPartition holds.
+   * TODO #403: Remove 'transient' uses of SerializedPartition.
+   */
+  public void release() {
+    if (!committed) {
+      throw new IllegalStateException("The partition is not committed yet!");
+    }
+    memoryPoolAssigner.returnChunksToPool(dataList);
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java
index 170d98c..880237f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.runtime.executor.data.stores;
 
 import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
 
@@ -28,14 +29,18 @@
  */
 public abstract class AbstractBlockStore implements BlockStore {
   private final SerializerManager serializerManager;
+  private final MemoryPoolAssigner memoryPoolAssigner;
 
   /**
    * Constructor.
    *
    * @param serializerManager the coder manager.
+   * @param memoryPoolAssigner the memory pool assigner.
    */
-  protected AbstractBlockStore(final SerializerManager serializerManager) {
+  AbstractBlockStore(final SerializerManager serializerManager,
+                               final MemoryPoolAssigner memoryPoolAssigner) {
     this.serializerManager = serializerManager;
+    this.memoryPoolAssigner = memoryPoolAssigner;
   }
 
   /**
@@ -44,8 +49,17 @@
    * @param blockId the ID of the block to get the coder.
    * @return the coder.
    */
-  protected final Serializer getSerializerFromWorker(final String blockId) {
+  final Serializer getSerializerFromWorker(final String blockId) {
     final String runtimeEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(blockId);
     return serializerManager.getSerializer(runtimeEdgeId);
   }
+
+  /**
+   * Gets the memory pool assigner for this executor.
+   *
+   * @return the memory pool assigner.
+   */
+  final MemoryPoolAssigner getMemoryPoolAssigner() {
+    return memoryPoolAssigner;
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java
index 2077a05..6ca78b1 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.data.stores;
 
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.exception.BlockFetchException;
 import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.conf.JobConf;
@@ -53,12 +54,14 @@
    * @param volumeDirectory   the remote volume directory which will contain the files.
    * @param jobId             the job id.
    * @param serializerManager the serializer manager.
+   * @param memoryPoolAssigner the memory pool assigner.
    */
   @Inject
   private GlusterFileStore(@Parameter(JobConf.GlusterVolumeDirectory.class) final String volumeDirectory,
                            @Parameter(JobConf.JobId.class) final String jobId,
-                           final SerializerManager serializerManager) {
-    super(serializerManager);
+                           final SerializerManager serializerManager,
+                           final MemoryPoolAssigner memoryPoolAssigner) {
+    super(serializerManager, memoryPoolAssigner);
     this.fileDirectory = volumeDirectory + "/" + jobId;
     new File(fileDirectory).mkdirs();
   }
@@ -70,7 +73,7 @@
     final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
     final RemoteFileMetadata metadata =
       RemoteFileMetadata.create(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory));
-    return new FileBlock<>(blockId, serializer, filePath, metadata);
+    return new FileBlock<>(blockId, serializer, filePath, metadata, getMemoryPoolAssigner());
   }
 
   /**
@@ -151,6 +154,6 @@
     final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
     final RemoteFileMetadata<K> metadata =
       RemoteFileMetadata.open(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory));
-    return new FileBlock<>(blockId, serializer, filePath, metadata);
+    return new FileBlock<>(blockId, serializer, filePath, metadata, getMemoryPoolAssigner());
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java
index 9a7b8f7..5af8faf 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.data.stores;
 
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.executor.data.block.Block;
 
@@ -38,9 +39,11 @@
    * Constructor.
    *
    * @param coderManager the coder manager.
+   * @param memoryPoolAssigner the memory pool assigner.
    */
-  protected LocalBlockStore(final SerializerManager coderManager) {
-    super(coderManager);
+  protected LocalBlockStore(final SerializerManager coderManager,
+                            final MemoryPoolAssigner memoryPoolAssigner) {
+    super(coderManager, memoryPoolAssigner);
     this.blockMap = new ConcurrentHashMap<>();
   }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java
index 0b22dae..efbad9b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.data.stores;
 
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.exception.BlockFetchException;
 import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.conf.JobConf;
@@ -46,11 +47,13 @@
    *
    * @param fileDirectory     the directory which will contain the files.
    * @param serializerManager the serializer manager.
+   * @param memoryPoolAssigner the memory pool assigner.
    */
   @Inject
   private LocalFileStore(@Parameter(JobConf.FileDirectory.class) final String fileDirectory,
-                         final SerializerManager serializerManager) {
-    super(serializerManager);
+                         final SerializerManager serializerManager,
+                         final MemoryPoolAssigner memoryPoolAssigner) {
+    super(serializerManager, memoryPoolAssigner);
     this.fileDirectory = fileDirectory;
     new File(fileDirectory).mkdirs();
   }
@@ -62,7 +65,8 @@
     final Serializer serializer = getSerializerFromWorker(blockId);
     final LocalFileMetadata metadata = new LocalFileMetadata();
 
-    return new FileBlock(blockId, serializer, DataUtil.blockIdToFilePath(blockId, fileDirectory), metadata);
+    return new FileBlock(blockId, serializer, DataUtil.blockIdToFilePath(blockId, fileDirectory),
+                                                                  metadata, getMemoryPoolAssigner());
   }
 
   /**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
index 00346c4..bd87031 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.data.stores;
 
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.executor.data.block.Block;
@@ -38,10 +39,12 @@
    * Constructor.
    *
    * @param serializerManager the serializer manager.
+   * @param memoryPoolAssigner the memory pool assigner.
    */
   @Inject
-  private MemoryStore(final SerializerManager serializerManager) {
-    super(serializerManager);
+  private MemoryStore(final SerializerManager serializerManager,
+                      final MemoryPoolAssigner memoryPoolAssigner) {
+    super(serializerManager, memoryPoolAssigner);
   }
 
   /**
@@ -50,7 +53,7 @@
   @Override
   public NonSerializedMemoryBlock createBlock(final String blockId) {
     final Serializer serializer = getSerializerFromWorker(blockId);
-    return new NonSerializedMemoryBlock(blockId, serializer);
+    return new NonSerializedMemoryBlock(blockId, serializer, getMemoryPoolAssigner());
   }
 
   /**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
index f5298be..de46951 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.data.stores;
 
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
 import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.executor.data.block.Block;
@@ -37,10 +38,12 @@
    * Constructor.
    *
    * @param serializerManager the serializer manager.
+   * @param memoryPoolAssigner the memory pool assigner.
    */
   @Inject
-  private SerializedMemoryStore(final SerializerManager serializerManager) {
-    super(serializerManager);
+  private SerializedMemoryStore(final SerializerManager serializerManager,
+                                final MemoryPoolAssigner memoryPoolAssigner) {
+    super(serializerManager, memoryPoolAssigner);
   }
 
   /**
@@ -49,7 +52,7 @@
   @Override
   public Block createBlock(final String blockId) {
     final Serializer serializer = getSerializerFromWorker(blockId);
-    return new SerializedMemoryBlock(blockId, serializer);
+    return new SerializedMemoryBlock(blockId, serializer, getMemoryPoolAssigner());
   }
 
   /**
@@ -75,6 +78,8 @@
    */
   @Override
   public boolean deleteBlock(final String blockId) {
+    SerializedMemoryBlock block = (SerializedMemoryBlock) getBlockMap().get(blockId);
+    block.release();
     return getBlockMap().remove(blockId) != null;
   }
 }
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java
index 9346803..67d1f5c 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java
@@ -42,6 +42,7 @@
 public final class BlockTest {
   private Serializer serializer;
   private Map<Integer, List<Integer>> testData;
+  private MemoryPoolAssigner memoryPoolAssigner;
 
   /**
    * Generates the test data and serializer.
@@ -52,6 +53,7 @@
   public void setUp() throws Exception {
     serializer = new Serializer<>(IntEncoderFactory.of(), IntDecoderFactory.of(), new ArrayList<>(), new ArrayList<>());
     testData = new HashMap<>();
+    memoryPoolAssigner = new MemoryPoolAssigner(1, 32);
 
     final List<Integer> list1 = Collections.singletonList(1);
     final List<Integer> list2 = Arrays.asList(1, 2);
@@ -69,7 +71,7 @@
    */
   @Test(timeout = 10000)
   public void testNonSerializedMemoryBlock() throws Exception {
-    final Block<Integer> block = new NonSerializedMemoryBlock<>("testBlock", serializer);
+    final Block<Integer> block = new NonSerializedMemoryBlock<>("testBlock", serializer, memoryPoolAssigner);
     testBlock(block);
   }
 
@@ -80,7 +82,7 @@
    */
   @Test(timeout = 10000)
   public void testSerializedMemoryBlock() throws Exception {
-    final Block<Integer> block = new SerializedMemoryBlock<>("testBlock", serializer);
+    final Block<Integer> block = new SerializedMemoryBlock<>("testBlock", serializer, memoryPoolAssigner);
     testBlock(block);
   }
 
@@ -96,7 +98,7 @@
     try {
       new File(tmpDir).mkdirs();
       final LocalFileMetadata<Integer> metadata = new LocalFileMetadata<>();
-      final Block<Integer> block = new FileBlock<>("testBlock", serializer, filePath, metadata);
+      final Block<Integer> block = new FileBlock<>("testBlock", serializer, filePath, metadata, memoryPoolAssigner);
       testBlock(block);
     } finally {
       FileUtils.deleteDirectory(new File(tmpDir));
diff --git a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStreamTest.java
similarity index 77%
rename from common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
rename to runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStreamTest.java
index 564ec5a..3bd5df3 100644
--- a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStreamTest.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.common;
+package org.apache.nemo.runtime.executor.data;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -33,28 +34,29 @@
  */
 public class DirectByteBufferOutputStreamTest {
   private DirectByteBufferOutputStream outputStream;
+  private static final MemoryPoolAssigner memoryPoolAssigner = new MemoryPoolAssigner(1, 32);
 
   @Before
-  public void setup(){
-    outputStream = new DirectByteBufferOutputStream();
+  public void setup() throws MemoryAllocationException {
+    outputStream = new DirectByteBufferOutputStream(memoryPoolAssigner);
   }
 
   @Test
-  public void testSingleWrite() {
+  public void testSingleWrite() throws IOException {
     int value = 1;
     outputStream.write(value);
     assertEquals(value, outputStream.toByteArray()[0]);
   }
 
   @Test
-  public void testWrite(){
+  public void testWrite() throws IOException {
     String value = "value";
     outputStream.write(value.getBytes());
     assertEquals(value, new String(outputStream.toByteArray()));
   }
 
   @Test
-  public void testReWrite() {
+  public void testReWrite() throws IOException {
     String value1 = "value1";
     String value2 = "value2";
     outputStream.write(value1.getBytes());
@@ -64,7 +66,7 @@
   }
 
   @Test
-  public void testReRead() {
+  public void testReRead() throws IOException {
     String value = "value";
     outputStream.write(value.getBytes());
     assertEquals(value, new String(outputStream.toByteArray()));
@@ -72,14 +74,14 @@
   }
 
   @Test
-  public void testLongWrite() {
+  public void testLongWrite() throws IOException {
     String value = RandomStringUtils.randomAlphanumeric(10000);
     outputStream.write(value.getBytes());
     assertEquals(value,new String(outputStream.toByteArray()));
   }
 
   @Test
-  public void testLongReWrite() {
+  public void testLongReWrite() throws IOException {
     String value1 = RandomStringUtils.randomAlphanumeric(10000);
     String value2 = RandomStringUtils.randomAlphanumeric(5000);
     outputStream.write(value1.getBytes());
@@ -89,7 +91,7 @@
   }
 
   @Test
-  public void testLongReRead() {
+  public void testLongReRead() throws IOException {
     String value = RandomStringUtils.randomAlphanumeric(10000);
     outputStream.write(value.getBytes());
     assertEquals(value, new String(outputStream.toByteArray()));
@@ -97,17 +99,17 @@
   }
 
   @Test
-  public void testGetDirectBufferList() {
+  public void testGetMemoryChunkList() throws IOException {
     String value = RandomStringUtils.randomAlphanumeric(10000);
     outputStream.write(value.getBytes());
     byte[] totalOutput = outputStream.toByteArray();
-    List<ByteBuffer> bufList = outputStream.getDirectByteBufferList();
+    List<MemoryChunk> chunkList = outputStream.getMemoryChunkList();
     int offset = 0;
     int byteToRead;
-    for (final ByteBuffer temp : bufList) {
-      byteToRead = temp.remaining();
+    for (final MemoryChunk temp : chunkList) {
+      byteToRead = temp.getBuffer().remaining();
       byte[] output = new byte[byteToRead];
-      temp.get(output, 0, byteToRead);
+      temp.getBuffer().get(output, 0, byteToRead);
       byte[] expected = Arrays.copyOfRange(totalOutput, offset, offset+byteToRead);
       assertEquals(new String(expected), new String(output));
       offset += byteToRead;
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryChunkTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryChunkTest.java
new file mode 100644
index 0000000..f99546a
--- /dev/null
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryChunkTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests basic operations on {@link MemoryChunk}.
+ */
+public class MemoryChunkTest {
+  private MemoryChunk chunk;
+  private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+
+  @Before
+  public void setup() {
+    chunk = new MemoryChunk(buffer);
+  }
+
+  @Test
+  public void testPutAndGet() {
+    byte input = 5;
+    chunk.put(0, input);
+    assertEquals(input, chunk.get(0));
+  }
+
+  @Test
+  public void testPutAndGetByArray() {
+    byte[] input = new byte[1000]; new Random().nextBytes(input);
+    chunk.put(0, input);
+    byte[] output = new byte[1000]; chunk.get(0, output);
+    assertArrayEquals(input, output);
+  }
+
+  @Test
+  public void testBulkPutAndGet() {
+    byte[] input1 = new byte[1000]; new Random().nextBytes(input1);
+    byte[] input2 = new byte[24]; new Random().nextBytes(input2);
+    chunk.put(0, input1);
+    chunk.put(input1.length, input2);
+    byte[] output = new byte[1024]; chunk.get(0, output);
+    byte[] concat = ArrayUtils.addAll(input1, input2);
+    assertArrayEquals(concat, output);
+  }
+
+  @Test
+  public void testPutCharAndGetChar() {
+    char input = 'a';
+    chunk.putChar(0,input);
+    assertEquals(input, chunk.getChar(0));
+  }
+
+  @Test
+  public void testPutShortAndGetShort() {
+    short input = 1;
+    chunk.putShort(0, input);
+    assertEquals(input, chunk.getShort(0));
+  }
+
+  @Test
+  public void testPutIntAndGetInt() {
+    int input = 14;
+    chunk.putInt(0, input);
+    assertEquals(input, chunk.getInt(0));
+  }
+
+  @Test
+  public void testPutLongAndGetLong() {
+    long input = 28;
+    chunk.putLong(0, input);
+    assertEquals(input, chunk.getLong(0));
+  }
+
+  @Test
+  public void testPutFloatAndGetFloat() {
+    float input = 3;
+    chunk.putFloat(0, input);
+    assertEquals(input, chunk.getFloat(0), 0.1);
+  }
+
+  @Test
+  public void testPutDoubleAndGetDouble() {
+    double input = 3.7;
+    chunk.putDouble(0, input);
+    assertEquals(input, chunk.getDouble(0), 0.01);
+  }
+}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssignerTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssignerTest.java
new file mode 100644
index 0000000..b351c44
--- /dev/null
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssignerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class MemoryPoolAssignerTest {
+  private MemoryPoolAssigner memoryPoolAssigner;
+
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryPoolAssignerTest.class.getName());
+
+  private static final int NUM_CONCURRENT_THREADS = 5;
+
+  private static final int MAX_MEM_MB = 1;
+  private static final int CHUNK_SIZE_KB = 32;
+  private static final int MAX_NUM_CHUNKS = MAX_MEM_MB * 1024 / CHUNK_SIZE_KB;
+
+  @Before
+  public void setUp() {
+    this.memoryPoolAssigner = new MemoryPoolAssigner(1, 32);
+  }
+
+  @Test(expected = MemoryAllocationException.class)
+  public void testTooMuchRequest() throws MemoryAllocationException {
+    List<MemoryChunk> chunkList = new LinkedList<>();
+    for (int i = 0; i < MAX_NUM_CHUNKS; i++) {
+      chunkList.add(memoryPoolAssigner.allocateChunk());
+    }
+    memoryPoolAssigner.allocateChunk();
+  }
+
+  @Test
+  public void testConcurrentAllocReturnAlloc() throws InterruptedException {
+    final ExecutorService executor = Executors.newFixedThreadPool(NUM_CONCURRENT_THREADS);
+    for (int i = 0; i < MAX_NUM_CHUNKS; i++) {
+      executor.submit(() -> {
+        try {
+          // We return this one immediately
+          final MemoryChunk toReturnImmediately = memoryPoolAssigner.allocateChunk();
+          memoryPoolAssigner.returnChunksToPool(Arrays.asList(toReturnImmediately));
+
+          // We don't return this one
+          memoryPoolAssigner.allocateChunk();
+        } catch (MemoryAllocationException e) {
+          throw new RuntimeException();
+        }
+      });
+    }
+    executor.shutdown();
+    executor.awaitTermination(30, TimeUnit.SECONDS);
+    assertEquals(0, memoryPoolAssigner.poolSize());
+  }
+
+  @Test
+  public void testConcurrentUniqueAllocations() throws InterruptedException {
+    final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CONCURRENT_THREADS);
+    final ConcurrentLinkedQueue<MemoryChunk> allocatedChunks = new ConcurrentLinkedQueue<>();
+
+    for (int i = 0; i < MAX_NUM_CHUNKS; i++) {
+      executorService.submit(()->{
+        try {
+          allocatedChunks.add(memoryPoolAssigner.allocateChunk());
+        } catch (MemoryAllocationException e) {
+          throw new RuntimeException();
+        }
+      });
+    }
+    executorService.shutdown();
+    executorService.awaitTermination(30, TimeUnit.SECONDS);
+    assertEquals(0, memoryPoolAssigner.poolSize());
+
+    // All chunks should be unique
+    assertEquals(allocatedChunks.size(), new HashSet<>(allocatedChunks).size());
+  }
+}