[NEMO-388] Off-heap memory management (reuse ByteBuffer) (#223)
JIRA: [NEMO-388: Off-heap memory management (reuse ByteBuffer)](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-388)
**Major changes:**
- New executor option: `max_offheap_mb` and `chunk_size_kb`
- `MemoryPoolAssigner` class is involved in block creation to use off-heap memory
- `ByteBuffer` is now reused, wrapped by `MemoryChunk`
**Tests for the changes:**
- `MemoryChunk` put/get tests included
- `MemoryPoolAssigner` tests included
**Other comments:**
- Design doc attached in JIRA will be edited soon
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 6725e65..e371293 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -416,6 +416,8 @@
cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
+ cl.registerShortNameOfClass(JobConf.MaxOffheapMb.class);
+ cl.registerShortNameOfClass(JobConf.ChunkSizeKb.class);
cl.processCommandLine(args);
return confBuilder.build();
}
diff --git a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java b/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
deleted file mode 100644
index 8a74fb2..0000000
--- a/common/src/main/java/org/apache/nemo/common/DirectByteBufferOutputStream.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.common;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * This class is a customized output stream implementation backed by
- * {@link ByteBuffer}, which utilizes off heap memory when writing the data.
- * Memory is allocated when needed by the specified {@code pageSize}.
- * Deletion of {@code dataList}, which is the memory this outputstream holds, occurs
- * when the corresponding block is deleted.
- * TODO #388: Off-heap memory management (reuse ByteBuffer) - implement reuse.
- */
-public final class DirectByteBufferOutputStream extends OutputStream {
-
- private LinkedList<ByteBuffer> dataList = new LinkedList<>();
- private static final int DEFAULT_PAGE_SIZE = 32768; //32KB
- private final int pageSize;
- private ByteBuffer currentBuf;
-
- /**
- * Default constructor.
- * Sets the {@code pageSize} as default size of 4096 bytes.
- */
- public DirectByteBufferOutputStream() {
- this(DEFAULT_PAGE_SIZE);
- }
-
- /**
- * Constructor which sets {@code pageSize} as specified {@code size}.
- * Note that the {@code pageSize} has trade-off between memory fragmentation and
- * native memory (de)allocation overhead.
- *
- * @param size should be a power of 2 and greater than or equal to 4096.
- */
- public DirectByteBufferOutputStream(final int size) {
- if (size < DEFAULT_PAGE_SIZE || (size & (size - 1)) != 0) {
- throw new IllegalArgumentException("Invalid pageSize");
- }
- this.pageSize = size;
- newLastBuffer();
- currentBuf = dataList.getLast();
- }
-
- /**
- * Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
- */
- // TODO #388: Off-heap memory management (reuse ByteBuffer)
- private void newLastBuffer() {
- dataList.addLast(ByteBuffer.allocateDirect(pageSize));
- }
-
- /**
- * Writes the specified byte to this output stream.
- *
- * @param b the byte to be written.
- */
- @Override
- public void write(final int b) {
- if (currentBuf.remaining() <= 0) {
- newLastBuffer();
- currentBuf = dataList.getLast();
- }
- currentBuf.put((byte) b);
- }
-
- /**
- * Writes {@code b.length} bytes from the specified byte array to this output stream.
- *
- * @param b the byte to be written.
- */
- @Override
- public void write(final byte[] b) {
- write(b, 0, b.length);
- }
-
- /**
- * Writes {@code len} bytes from the specified byte array
- * starting at offset {@code off} to this output stream.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- */
- @Override
- public void write(final byte[] b, final int off, final int len) {
- int byteToWrite = len;
- int offset = off;
- while (byteToWrite > 0) {
- if (currentBuf.remaining() <= 0) {
- newLastBuffer();
- currentBuf = dataList.getLast();
- }
- final int bufRemaining = currentBuf.remaining();
- if (bufRemaining < byteToWrite) {
- currentBuf.put(b, offset, bufRemaining);
- offset += bufRemaining;
- byteToWrite -= bufRemaining;
- } else {
- currentBuf.put(b, offset, byteToWrite);
- offset += byteToWrite;
- byteToWrite = 0;
- }
- }
- }
-
-
- /**
- * Creates a byte array that contains the whole content currently written in this output stream.
- *
- * USED BY TESTS ONLY.
- * @return the current contents of this output stream, as byte array.
- */
- @VisibleForTesting
- byte[] toByteArray() {
- if (dataList.isEmpty()) {
- final byte[] byteArray = new byte[0];
- return byteArray;
- }
-
- ByteBuffer lastBuf = dataList.getLast();
- // pageSize equals the size of the data filled in the ByteBuffers
- // except for the last ByteBuffer. The size of the data in the
- // ByteBuffer can be obtained by calling ByteBuffer.position().
- final int arraySize = pageSize * (dataList.size() - 1) + lastBuf.position();
- final byte[] byteArray = new byte[arraySize];
- int start = 0;
-
- for (final ByteBuffer buffer : dataList) {
- // We use duplicated buffer to read the data so that there is no complicated
- // alteration of position and limit when switching between read and write mode.
- final ByteBuffer dupBuffer = buffer.duplicate();
- dupBuffer.flip();
- final int byteToWrite = dupBuffer.remaining();
- dupBuffer.get(byteArray, start, byteToWrite);
- start += byteToWrite;
- }
-
- return byteArray;
- }
-
- /**
- * Returns the list of {@code ByteBuffer}s that contains the written data.
- * List of flipped and duplicated {@link ByteBuffer}s are returned which has independent
- * position and limit, to reduce erroneous data read/write.
- * This function has to be called when intended to read from the start of the list of
- * {@link ByteBuffer}s, not for additional write.
- *
- * @return the {@code LinkedList} of {@code ByteBuffer}s.
- */
- public List<ByteBuffer> getDirectByteBufferList() {
- List<ByteBuffer> result = new ArrayList<>(dataList.size());
- for (final ByteBuffer buffer : dataList) {
- final ByteBuffer dupBuffer = buffer.duplicate();
- dupBuffer.flip();
- result.add(dupBuffer);
- }
- return result;
- }
-
- /**
- * Returns the size of the data written in this output stream.
- *
- * @return the size of the data
- */
- public int size() {
- return pageSize * (dataList.size() - 1) + dataList.getLast().position();
- }
-
- /**
- * Closing this output stream has no effect.
- */
- public void close() {
- }
-}
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 23cf597..485d35c 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -288,6 +288,23 @@
public final class PartitionTransportClientNumThreads implements Name<Integer> {
}
+ /**
+ * Maximum off-heap memory size in the executor.
+ */
+ // TODO #397: Separation of JVM heap region and off-heap memory region
+ @NamedParameter(doc = "The maximum off-heap memory that can be allocated",
+ short_name = "max_offheap_mb", default_value = "8192")
+ public final class MaxOffheapMb implements Name<Integer> {
+ }
+
+ /**
+ * MemoryChunk size in the memory pool.
+ */
+ @NamedParameter(doc = "The memory chunk size in the memory pool of the executor",
+ short_name = "chunk_size_kb", default_value = "32")
+ public final class ChunkSizeKb implements Name<Integer> {
+ }
+
//////////////////////////////// Intermediate Configurations
/**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 01ba02a..fade5b0 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -20,9 +20,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.DefaultFileRegion;
-import io.netty.channel.FileRegion;
+import io.netty.channel.*;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.FileArea;
@@ -38,6 +36,8 @@
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static io.netty.buffer.Unpooled.wrappedBuffer;
@@ -144,12 +144,19 @@
* Writes {@link SerializedPartition}.
*
* @param serializedPartition {@link SerializedPartition} to write.
+ * @param releaseOnComplete wheter to release the partition upon completion.
* @return {@code this}
* @throws IOException when an exception has been set or this stream was closed
*/
- public ByteOutputStream writeSerializedPartitionBuffer(final SerializedPartition serializedPartition)
+ public ByteOutputStream writeSerializedPartitionBuffer(final SerializedPartition serializedPartition,
+ final boolean releaseOnComplete)
throws IOException {
- writeBuffer(serializedPartition.getDirectBufferList());
+ if (releaseOnComplete) {
+ ChannelFutureListener listener = future -> serializedPartition.release();
+ writeBuffer(serializedPartition.getDirectBufferList(), Arrays.asList(listener));
+ } else {
+ writeBuffer(serializedPartition.getDirectBufferList(), Collections.emptyList());
+ }
return this;
}
@@ -158,23 +165,25 @@
* to write a data frame.
*
* @param bufList list of {@link ByteBuffer}s to wrap.
+ * @param listeners to add.
* @throws IOException when fails to write the data.
*/
- public void writeBuffer(final List<ByteBuffer> bufList) throws IOException {
+ void writeBuffer(final List<ByteBuffer> bufList,
+ final List<ChannelFutureListener> listeners) throws IOException {
final ByteBuf byteBuf = wrappedBuffer(bufList.toArray(new ByteBuffer[bufList.size()]));
- writeByteBuf(byteBuf);
+ writeByteBuf(byteBuf, listeners);
}
-
/**
* Writes a data frame, from {@link ByteBuf}.
*
* @param byteBuf {@link ByteBuf} to write.
+ * @param listeners to add.
* @throws IOException when fails to write data.
*/
- private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
+ private void writeByteBuf(final ByteBuf byteBuf, final List<ChannelFutureListener> listeners) throws IOException {
if (byteBuf.readableBytes() > 0) {
- writeDataFrame(byteBuf, byteBuf.readableBytes());
+ writeDataFrame(byteBuf, byteBuf.readableBytes(), listeners);
}
}
@@ -192,7 +201,7 @@
while (bytesToSend > 0) {
final long size = Math.min(bytesToSend, DataFrameEncoder.LENGTH_MAX);
final FileRegion fileRegion = new DefaultFileRegion(FileChannel.open(path), cursor, size);
- writeDataFrame(fileRegion, size);
+ writeDataFrame(fileRegion, size, Collections.emptyList());
cursor += size;
bytesToSend -= size;
}
@@ -206,7 +215,7 @@
}
if (newSubStream) {
// to emit a frame with new sub-stream flag
- writeDataFrame(null, 0);
+ writeDataFrame(null, 0, Collections.emptyList());
}
closed = true;
}
@@ -228,7 +237,7 @@
encoder.encode(element);
wrapped.close();
- writeByteBuf(byteBuf);
+ writeByteBuf(byteBuf, Collections.emptyList());
} catch (final IOException e) {
throw new RuntimeException(e);
}
@@ -239,15 +248,20 @@
*
* @param body the body or {@code null}
* @param length the length of the body, in bytes
+ * @param listeners to add.
* @throws IOException when an exception has been set or this stream was closed
*/
- private void writeDataFrame(final Object body, final long length) throws IOException {
+ private void writeDataFrame(final Object body, final long length,
+ final List<ChannelFutureListener> listeners) throws IOException {
ensureNoException();
if (closed) {
throw new IOException("Stream already closed.");
}
- channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(), body, length, newSubStream))
+ final ChannelFuture beforeAddingGivenListener = channel
+ .writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(), body, length, newSubStream))
.addListener(getChannelWriteListener());
+ listeners.forEach(beforeAddingGivenListener::addListener);
+
newSubStream = false;
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index 8806432..576831f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -38,6 +38,7 @@
import org.apache.nemo.runtime.executor.bytetransfer.ByteTransfer;
import org.apache.nemo.runtime.executor.data.block.Block;
import org.apache.nemo.runtime.executor.data.block.FileBlock;
+import org.apache.nemo.runtime.executor.data.block.SerializedMemoryBlock;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import org.apache.nemo.runtime.executor.data.stores.*;
@@ -356,7 +357,13 @@
final Iterable<SerializedPartition> partitions = optionalBlock.get().readSerializedPartitions(keyRange);
for (final SerializedPartition partition : partitions) {
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
- os.writeSerializedPartitionBuffer(partition);
+ if (optionalBlock.get().getClass() == SerializedMemoryBlock.class) {
+ os.writeSerializedPartitionBuffer(partition, false);
+ } else {
+ // For NonSerializedMemoryBlock, the serialized partition to be sent is transient and needs
+ // to be released right after the data transfer.
+ os.writeSerializedPartitionBuffer(partition, true);
+ }
}
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index f0362dc..16df0b6 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -20,7 +20,6 @@
import com.google.common.io.CountingInputStream;
import org.apache.nemo.common.ByteBufferInputStream;
-import org.apache.nemo.common.DirectByteBufferOutputStream;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
@@ -32,7 +31,6 @@
import org.slf4j.LoggerFactory;
import java.io.*;
-import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -105,16 +103,19 @@
* @param serializer the serializer for serialization.
* @param partitionsToConvert the partitions to convert.
* @param <K> the key type of the partitions.
+ * @param memoryPoolAssigner the memory pool assigner for DirectByteBufferOutputStream.
* @return the converted {@link SerializedPartition}s.
* @throws IOException if fail to convert.
+ * @throws MemoryAllocationException if fail to allocate memory.
*/
public static <K extends Serializable> Iterable<SerializedPartition<K>> convertToSerPartitions(
final Serializer serializer,
- final Iterable<NonSerializedPartition<K>> partitionsToConvert) throws IOException {
+ final Iterable<NonSerializedPartition<K>> partitionsToConvert,
+ final MemoryPoolAssigner memoryPoolAssigner) throws IOException, MemoryAllocationException {
final List<SerializedPartition<K>> serializedPartitions = new ArrayList<>();
for (final NonSerializedPartition<K> partitionToConvert : partitionsToConvert) {
try (
- DirectByteBufferOutputStream bytesOutputStream = new DirectByteBufferOutputStream();
+ DirectByteBufferOutputStream bytesOutputStream = new DirectByteBufferOutputStream(memoryPoolAssigner);
OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers())
) {
serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
@@ -123,10 +124,10 @@
wrappedStream.close();
// Note that serializedBytes include invalid bytes.
// So we have to use it with the actualLength by using size() whenever needed.
- final List<ByteBuffer> serializedBufList = bytesOutputStream.getDirectByteBufferList();
+ final List<MemoryChunk> serializedBufList = bytesOutputStream.getMemoryChunkList();
final int actualLength = bytesOutputStream.size();
serializedPartitions.add(
- new SerializedPartition<>(partitionToConvert.getKey(), serializedBufList, actualLength));
+ new SerializedPartition<>(partitionToConvert.getKey(), serializedBufList, actualLength, memoryPoolAssigner));
}
}
return serializedPartitions;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStream.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStream.java
new file mode 100644
index 0000000..dae3859
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStream.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class is a customized output stream implementation backed by
+ * {@link ByteBuffer}, which utilizes off heap memory when writing the data via MemoryPoolAssigner.
+ * Deletion of {@code dataList}, which is the memory this outputstream holds, occurs
+ * when the corresponding block is deleted.
+ */
+public final class DirectByteBufferOutputStream extends OutputStream {
+
+ private LinkedList<MemoryChunk> dataList = new LinkedList<>();
+ private final int chunkSize;
+ private ByteBuffer currentBuf;
+ private final MemoryPoolAssigner memoryPoolAssigner;
+
+ /**
+ * Default constructor.
+ *
+ * @param memoryPoolAssigner for memory allocation.
+ * @throws MemoryAllocationException if fails to allocate new memory.
+ */
+ public DirectByteBufferOutputStream(final MemoryPoolAssigner memoryPoolAssigner) throws MemoryAllocationException {
+ this.chunkSize = memoryPoolAssigner.getChunkSize();
+ this.memoryPoolAssigner = memoryPoolAssigner;
+ newLastBuffer();
+ currentBuf = dataList.getLast().getBuffer();
+ }
+
+ /**
+ * Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
+ *
+ * @throws MemoryAllocationException if fail to allocate memory chunk.
+ */
+ private void newLastBuffer() throws MemoryAllocationException {
+ dataList.addLast(memoryPoolAssigner.allocateChunk());
+ }
+
+ /**
+ * Writes the specified byte to this output stream.
+ *
+ * @param b the byte to be written.
+ */
+ @Override
+ public void write(final int b) throws IOException {
+ try {
+ if (currentBuf.remaining() <= 0) {
+ newLastBuffer();
+ currentBuf = dataList.getLast().getBuffer();
+ }
+ currentBuf.put((byte) b);
+ } catch (IllegalStateException e) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } catch (MemoryAllocationException e) {
+ throw new IOException("Failed to allocate memory");
+ }
+ }
+
+ /**
+ * Writes {@code b.length} bytes from the specified byte array to this output stream.
+ *
+ * @param b the byte to be written.
+ */
+ @Override
+ public void write(final byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * Writes {@code len} bytes from the specified byte array
+ * starting at offset {@code off} to this output stream.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ */
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ int byteToWrite = len;
+ int offset = off;
+ try {
+ while (byteToWrite > 0) {
+ if (currentBuf.remaining() <= 0) {
+ newLastBuffer();
+ currentBuf = dataList.getLast().getBuffer();
+ }
+ final int bufRemaining = currentBuf.remaining();
+ if (bufRemaining < byteToWrite) {
+ currentBuf.put(b, offset, bufRemaining);
+ offset += bufRemaining;
+ byteToWrite -= bufRemaining;
+ } else {
+ currentBuf.put(b, offset, byteToWrite);
+ offset += byteToWrite;
+ byteToWrite = 0;
+ }
+ }
+ } catch (MemoryAllocationException e) {
+ throw new IOException("Failed to allocate memory");
+ }
+ }
+
+
+ /**
+ * Creates a byte array that contains the whole content currently written in this output stream.
+ *
+ * USED BY TESTS ONLY.
+ * @return the current contents of this output stream, as byte array.
+ */
+ @VisibleForTesting
+ byte[] toByteArray() {
+ if (dataList.isEmpty()) {
+ final byte[] byteArray = new byte[0];
+ return byteArray;
+ }
+ MemoryChunk lastBuf = dataList.getLast();
+ // pageSize equals the size of the data filled in the ByteBuffers
+ // except for the last ByteBuffer. The size of the data in the
+ // ByteBuffer can be obtained by calling MemoryChunk.position().
+ final int arraySize = chunkSize * (dataList.size() - 1) + lastBuf.getBuffer().position();
+ final byte[] byteArray = new byte[arraySize];
+ int start = 0;
+
+ for (final MemoryChunk chunk : dataList) {
+ // We use duplicated buffer to read the data so that there is no complicated
+ // alteration of position and limit when switching between read and write mode.
+ final MemoryChunk dupChunk = chunk.duplicate();
+ final ByteBuffer dupBuffer = dupChunk.getBuffer();
+ dupBuffer.flip();
+ final int byteToWrite = dupBuffer.remaining();
+ dupBuffer.get(byteArray, start, byteToWrite);
+ start += byteToWrite;
+ }
+
+ return byteArray;
+ }
+
+ /**
+ * Returns the list of {@code MemoryChunk}s that contains the written data.
+ * List of flipped and duplicated {@link MemoryChunk}s are returned, which has independent
+ * position and limit, to reduce erroneous data read/write.
+ * This function has to be called when intended to read from the start of the list of
+ * {@link MemoryChunk}s, not for additional write.
+ *
+ * @return the {@code LinkedList} of {@code MemoryChunk}s.
+ */
+ public List<MemoryChunk> getMemoryChunkList() {
+ List<MemoryChunk> result = new LinkedList<>();
+ for (final MemoryChunk chunk: dataList) {
+ final MemoryChunk dupChunk = chunk.duplicate();
+ dupChunk.getBuffer().flip();
+ result.add(dupChunk);
+ }
+ return result;
+ }
+
+ /**
+ * Returns the size of the data written in this output stream.
+ *
+ * @return the size of the data
+ */
+ public int size() {
+ return chunkSize * (dataList.size() - 1) + dataList.getLast().getBuffer().position();
+ }
+
+ /**
+ * Closing this output stream has no effect.
+ */
+ public void close() {
+ }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryAllocationException.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryAllocationException.java
new file mode 100644
index 0000000..0603d74
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryAllocationException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.runtime.executor.data;
+
+/**
+ * An exception to be thrown when a memory allocation operation is not successful.
+ * Cases where allocation is not successful are when the allocated memory exceeds the
+ * amount specified by the job configuration or
+ */
+public class MemoryAllocationException extends Exception {
+
+ public MemoryAllocationException(final String message) {
+ super(message);
+ }
+
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryChunk.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryChunk.java
new file mode 100644
index 0000000..fe914a6
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryChunk.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+/**
+ * This class represents chunk of memory residing in off-heap region
+ * managed by {@link MemoryPoolAssigner}, which is backed by {@link ByteBuffer}.
+ */
+@NotThreadSafe
+public class MemoryChunk {
+
+ // UNSAFE is used for random access and manipulation of the ByteBuffer.
+ @SuppressWarnings("restriction") // to suppress warnings that are invoked whenever we use UNSAFE.
+ protected static final sun.misc.Unsafe UNSAFE = getUnsafe();
+ @SuppressWarnings("restriction")
+ protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+ private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+ private static final int SHORT_SIZE = 2;
+ private static final int CHAR_SIZE = 2;
+ private static final int INT_SIZE = 4;
+ private static final int LONG_SIZE = 8;
+ private final ByteBuffer buffer;
+ // Since using UNSAFE does not automatically track the address and limit, it should be accessed
+ // through address for data write and get, and addressLimit for sanity checks on the buffer use.
+ private long address;
+ private final long addressLimit;
+ private final int size;
+
+ /**
+ * Creates a new memory chunk that represents the off-heap memory at the absolute address.
+ * This class supports random access and manipulation of the data in the {@code ByteBuffer} using UNSAFE.
+ * For sequential access, ByteBuffer of this class can be accessed and manipulated.
+ *
+ * @param offHeapAddress the address of the off-heap memory, {@link ByteBuffer}, of this MemoryChunk
+ * @param buffer the off-heap memory of this MemoryChunk
+ */
+ MemoryChunk(final long offHeapAddress, final ByteBuffer buffer) {
+ if (offHeapAddress <= 0) {
+ throw new IllegalArgumentException("negative pointer or size");
+ }
+ if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("MemoryChunk initialized with too large address");
+ }
+ this.buffer = buffer;
+ this.size = buffer.capacity();
+ this.address = offHeapAddress;
+ this.addressLimit = this.address + this.size;
+ }
+
+ /**
+ * Creates a new memory chunk that represents the off-heap memory at the absolute address.
+ *
+ * @param buffer the off-heap memory of this MemoryChunk
+ */
+ MemoryChunk(final ByteBuffer buffer) {
+ this(getAddress(buffer), buffer);
+ }
+
+ /**
+ * Gets the {@link ByteBuffer} from this MemoryChunk.
+ *
+ * @return {@link ByteBuffer}
+ */
+ public ByteBuffer getBuffer() {
+ if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ }
+ return buffer;
+ }
+
+ /**
+ * Makes the duplicated instance of this MemoryChunk.
+ *
+ * @return the MemoryChunk with the same content of the caller instance
+ */
+ public MemoryChunk duplicate() {
+ return new MemoryChunk(buffer.duplicate());
+ }
+
+ /**
+ * Frees this MemoryChunk. No further operation possible after calling this method.
+ */
+ public void release() {
+ buffer.clear();
+ address = addressLimit + 1;
+ }
+
+ /**
+ * Reads the byte at the given index.
+ *
+ * @param index from which the byte will be read
+ * @return the byte at the given position
+ */
+ @SuppressWarnings("restriction")
+ public final byte get(final int index) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, 0)) {
+ return UNSAFE.getByte(pos);
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Writes the given byte into this buffer at the given index.
+ *
+ * @param index The position at which the byte will be written.
+ * @param b The byte value to be written.
+ */
+ @SuppressWarnings("restriction")
+ public final void put(final int index, final byte b) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, 0)) {
+ UNSAFE.putByte(pos, b);
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Copies the data of the MemoryChunk from the specified position to target byte array.
+ *
+ * @param index The position at which the first byte will be read.
+ * @param dst The memory into which the memory will be copied.
+ */
+ public final void get(final int index, final byte[] dst) {
+ get(index, dst, 0, dst.length);
+ }
+
+ /**
+ * Copies all the data from the source byte array into the MemoryChunk
+ * beginning at the specified position.
+ *
+ * @param index the position in MemoryChunk to start copying the data.
+ * @param src the source byte array that holds the data to copy.
+ */
+ public final void put(final int index, final byte[] src) {
+ put(index, src, 0, src.length);
+ }
+
+ /**
+ * Bulk get method using nk.the specified index in the MemoryChunk.
+ *
+ * @param index the index in the MemoryChunk to start copying the data.
+ * @param dst the target byte array to copy the data from MemoryChunk.
+ * @param offset the offset in the destination byte array.
+ * @param length the number of bytes to be copied.
+ */
+ @SuppressWarnings("restriction")
+ public final void get(final int index, final byte[] dst, final int offset, final int length) {
+ if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ final long pos = address + index;
+ if (checkIndex(index, pos, length)) {
+ final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+ UNSAFE.copyMemory(null, pos, dst, arrayAddress, length);
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Bulk put method using the specified index in the MemoryChunk.
+ *
+ * @param index the index in the MemoryChunk to start copying the data.
+ * @param src the source byte array that holds the data to be copied to MemoryChunk.
+ * @param offset the offset in the source byte array.
+ * @param length the number of bytes to be copied.
+ */
+ @SuppressWarnings("restriction")
+ public final void put(final int index, final byte[] src, final int offset, final int length) {
+ if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ final long pos = address + index;
+ if (checkIndex(index, pos, length)) {
+ final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
+ UNSAFE.copyMemory(src, arrayAddress, null, pos, length);
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Reads a char value from the given position.
+ *
+ * @param index The position from which the memory will be read.
+ * @return The char value at the given position.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus CHAR_SIZE.
+ */
+ @SuppressWarnings("restriction")
+ public final char getChar(final int index) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, CHAR_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ return UNSAFE.getChar(pos);
+ } else {
+ return Character.reverseBytes(UNSAFE.getChar(pos));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("This MemoryChunk has been freed.");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Writes a char value to the given position.
+ *
+ * @param index The position at which the memory will be written.
+ * @param value The char value to be written.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus CHAR_SIZE.
+ */
+ @SuppressWarnings("restriction")
+ public final void putChar(final int index, final char value) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, CHAR_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ UNSAFE.putChar(pos, value);
+ } else {
+ UNSAFE.putChar(pos, Character.reverseBytes(value));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Reads a short integer value from the given position, composing them into a short value
+ * according to the current byte order.
+ *
+ * @param index The position from which the memory will be read.
+ * @return The short value at the given position.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus SHORT_SIZE.
+ */
+ @SuppressWarnings("restriction")
+ public final short getShort(final int index) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, SHORT_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ return UNSAFE.getShort(pos);
+ } else {
+ return Short.reverseBytes(UNSAFE.getShort(pos));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Writes the given short value into this buffer at the given position, using
+ * the native byte order of the system.
+ *
+ * @param index The position at which the value will be written.
+ * @param value The short value to be written.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus SHORT_SIZE.
+ */
+ @SuppressWarnings("restriction")
+ public final void putShort(final int index, final short value) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, SHORT_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ UNSAFE.putShort(pos, value);
+ } else {
+ UNSAFE.putShort(pos, Short.reverseBytes(value));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Reads an int value from the given position, in the system's native byte order.
+ *
+ * @param index The position from which the value will be read.
+ * @return The int value at the given position.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus INT_SIZE.
+ */
+ public final int getInt(final int index) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, INT_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ return UNSAFE.getInt(pos);
+ } else {
+ return Integer.reverseBytes(UNSAFE.getInt(pos));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Writes the given int value to the given position in the system's native byte order.
+ *
+ * @param index The position at which the value will be written.
+ * @param value The int value to be written.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus INT_SIZE.
+ */
+ public final void putInt(final int index, final int value) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, INT_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ UNSAFE.putInt(pos, value);
+ } else {
+ UNSAFE.putInt(pos, Integer.reverseBytes(value));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Reads a long value from the given position.
+ *
+ * @param index The position from which the value will be read.
+ * @return The long value at the given position.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus LONG_SIZE.
+ */
+ public final long getLong(final int index) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, LONG_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ return UNSAFE.getLong(pos);
+ } else {
+ return Long.reverseBytes(UNSAFE.getLong(pos));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Writes the given long value to the given position in the system's native byte order.
+ *
+ * @param index The position at which the value will be written.
+ * @param value The long value to be written.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus LONG_SIZE.
+ */
+ public final void putLong(final int index, final long value) {
+ final long pos = address + index;
+ if (checkIndex(index, pos, LONG_SIZE)) {
+ if (LITTLE_ENDIAN) {
+ UNSAFE.putLong(pos, value);
+ } else {
+ UNSAFE.putLong(pos, Long.reverseBytes(value));
+ }
+ } else if (address > addressLimit) {
+ throw new IllegalStateException("MemoryChunk has been freed");
+ } else {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ /**
+ * Reads a float value from the given position, in the system's native byte order.
+ *
+ * @param index The position from which the value will be read.
+ * @return The float value at the given position.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of float.
+ */
+ public final float getFloat(final int index) {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ /**
+ * Writes the given float value to the given position in the system's native byte order.
+ *
+ * @param index The position at which the value will be written.
+ * @param value The float value to be written.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of float.
+ */
+ public final void putFloat(final int index, final float value) {
+ putInt(index, Float.floatToRawIntBits(value));
+ }
+
+ /**
+ * Reads a double value from the given position, in the system's native byte order.
+ *
+ * @param index The position from which the value will be read.
+ * @return The double value at the given position.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of double.
+ */
+ public final double getDouble(final int index) {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ /**
+ * Writes the given double value to the given position in the system's native byte order.
+ *
+ * @param index The position at which the memory will be written.
+ * @param value The double value to be written.
+ *
+ * @throws IndexOutOfBoundsException If the index is negative, or larger then the chunk size minus size of double.
+ */
+ public final void putDouble(final int index, final double value) {
+ putLong(index, Double.doubleToRawLongBits(value));
+ }
+
+ private boolean checkIndex(final int index, final long pos, final int typeSize) throws IndexOutOfBoundsException {
+ if (!(index >= 0 && pos <= addressLimit - typeSize)) {
+ throw new IndexOutOfBoundsException();
+ } else {
+ return true;
+ }
+ }
+
+ @SuppressWarnings("restriction")
+ private static sun.misc.Unsafe getUnsafe() {
+ try {
+ Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ unsafeField.setAccessible(true);
+ return (sun.misc.Unsafe) unsafeField.get(null);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while trying to access the sun.misc.Unsafe handle.");
+ }
+ }
+
+ private static final Field ADDRESS_FIELD;
+
+ static {
+ try {
+ ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
+ ADDRESS_FIELD.setAccessible(true);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot initialize MemoryChunk: off-heap memory is incompatible with this JVM.");
+ }
+ }
+
+ private static long getAddress(final ByteBuffer buffer) {
+ if (buffer == null) {
+ throw new NullPointerException("Buffer null");
+ }
+ if (!buffer.isDirect()) {
+ throw new IllegalArgumentException("Cannot initialize from non-direct ByteBuffer.");
+ }
+ try {
+ return (Long) ADDRESS_FIELD.get(buffer);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not access ByteBuffer address.");
+ }
+ }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssigner.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssigner.java
new file mode 100644
index 0000000..02af48a
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssigner.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.nemo.conf.JobConf;
+
+import javax.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * The MemoryPoolAssigner assigns the memory that Nemo uses for writing data blocks from the {@link MemoryPool}.
+ * Memory is represented in chunks of equal size. Consumers of off-heap memory acquire the memory by requesting
+ * a number of {@link MemoryChunk} they need.
+ *
+ * MemoryPoolAssigner currently supports allocation of off-heap memory only.
+ *
+ * MemoryChunks are allocated on-demand, but if the total allocated memory exceeds the maxOffheapMb,
+ * MemoryAllocationException is thrown and the job fails.
+ * TODO #397: Separation of JVM heap region and off-heap memory region
+ */
+@ThreadSafe
+public class MemoryPoolAssigner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryPoolAssigner.class.getName());
+
+ private final int chunkSize;
+
+ private static final int MIN_CHUNK_SIZE_KB = 4;
+
+ private final MemoryPool memoryPool;
+
+ @Inject
+ public MemoryPoolAssigner(@Parameter(JobConf.MaxOffheapMb.class) final int maxOffheapMb,
+ @Parameter(JobConf.ChunkSizeKb.class) final int chunkSizeKb) {
+ if (chunkSizeKb < MIN_CHUNK_SIZE_KB) {
+ throw new IllegalArgumentException("Chunk size too small. Minimum chunk size is 4KB");
+ }
+ final long maxNumChunks = (long) maxOffheapMb * 1024 / chunkSizeKb;
+ if (maxNumChunks > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("Too many pages to allocate (exceeds MAX_INT)");
+ }
+ if (maxNumChunks < 1) {
+ throw new IllegalArgumentException("The given amount of memory amounted to less than one chunk.");
+ }
+ this.chunkSize = chunkSizeKb * 1024;
+ this.memoryPool = new MemoryPool((int) maxNumChunks, this.chunkSize);
+ }
+
+ /**
+ * Returns a single {@link MemoryChunk} from {@link MemoryPool}.
+ *
+ * @return a MemoryChunk
+ * @throws MemoryAllocationException if fails to allocate MemoryChunk.
+ */
+ public MemoryChunk allocateChunk() throws MemoryAllocationException {
+ return memoryPool.requestChunkFromPool();
+ }
+
+ /**
+ * Returns all the MemoryChunks in the given List of MemoryChunks.
+ *
+ * @param target the list of MemoryChunks to be returned to the memory pool.
+ */
+ public void returnChunksToPool(final Iterable<MemoryChunk> target) {
+ for (final MemoryChunk chunk: target) {
+ memoryPool.returnChunkToPool(chunk);
+ }
+ }
+
+ /**
+ * Returns the chunk size of the memory pool.
+ *
+ * @return the chunk size in bytes.
+ */
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ /**
+ * Returns the number of chunks in the pool. This is unrecommended since it is very complex to
+ * check the size of {@link ConcurrentLinkedQueue}.
+ *
+ * @return the pool size.
+ */
+ int poolSize() {
+ return memoryPool.size();
+ }
+
+ /**
+ * Memory pool that utilizes off-heap memory.
+ * Supports pre-allocation of memory according to user specification.
+ */
+ @ThreadSafe
+ private class MemoryPool {
+
+ private final ConcurrentLinkedQueue<ByteBuffer> pool;
+ private final int chunkSize;
+ private long maxNumChunks;
+ private long numChunks;
+
+ MemoryPool(final long maxNumChunks, final int chunkSize) {
+ this.chunkSize = chunkSize;
+ this.pool = new ConcurrentLinkedQueue<>();
+ this.maxNumChunks = maxNumChunks;
+ }
+
+ synchronized MemoryChunk allocateNewChunk() throws MemoryAllocationException {
+ if (maxNumChunks <= numChunks) {
+ throw new MemoryAllocationException("Exceeded maximum off-heap memory");
+ }
+ ByteBuffer memory = ByteBuffer.allocateDirect(chunkSize);
+ numChunks += 1;
+ return new MemoryChunk(memory);
+ }
+
+ MemoryChunk requestChunkFromPool() throws MemoryAllocationException {
+ try {
+ // Try to reuse a byte buffer in the current pool
+ // Return the byte buffer if there's one that we can reuse
+ final ByteBuffer byteBufferThatWeCanReuse = pool.remove(); // this can throw NoSuchElementException
+ return new MemoryChunk(byteBufferThatWeCanReuse);
+ } catch (final NoSuchElementException e) {
+ // No more byte buffer to reuse in the current pool
+ // So we try to allocate a new chunk
+ // This method is synchronized :)
+ return allocateNewChunk();
+ } catch (final OutOfMemoryError e) {
+ throw new MemoryAllocationException("Memory allocation failed due to lack of memory");
+ }
+ }
+
+ /**
+ * Returns MemoryChunk back to memory pool.
+ *
+ * @param chunk the target MemoryChunk to be returned to the pool.
+ */
+ void returnChunkToPool(final MemoryChunk chunk) {
+ ByteBuffer buf = chunk.getBuffer();
+ chunk.release();
+ pool.add(buf);
+ }
+
+ int size() {
+ return pool.size();
+ }
+ }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
index 65b167b..a869e8d 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
@@ -19,6 +19,8 @@
package org.apache.nemo.runtime.executor.data.block;
import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
@@ -66,6 +68,7 @@
private final Serializer serializer;
private final String filePath;
private final FileMetadata<K> metadata;
+ private final MemoryPoolAssigner memoryPoolAssigner;
/**
* Constructor.
@@ -74,16 +77,19 @@
* @param serializer the {@link Serializer}.
* @param filePath the path of the file that this block will be stored.
* @param metadata the metadata for this block.
+ * @param memoryPoolAssigner the MemoryPoolAssigner for memory allocation.
*/
public FileBlock(final String blockId,
final Serializer serializer,
final String filePath,
- final FileMetadata<K> metadata) {
+ final FileMetadata<K> metadata,
+ final MemoryPoolAssigner memoryPoolAssigner) {
this.id = blockId;
this.nonCommittedPartitionsMap = new HashMap<>();
this.serializer = serializer;
this.filePath = filePath;
this.metadata = metadata;
+ this.memoryPoolAssigner = memoryPoolAssigner;
}
/**
@@ -103,6 +109,8 @@
for (final ByteBuffer buffer: serializedPartition.getDirectBufferList()) {
fileOutputChannel.write(buffer);
}
+ // after the writing to disk, data in memory is released.
+ serializedPartition.release();
}
}
}
@@ -125,11 +133,11 @@
try {
SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
if (partition == null) {
- partition = new SerializedPartition<>(key, serializer);
+ partition = new SerializedPartition<>(key, serializer, memoryPoolAssigner);
nonCommittedPartitionsMap.put(key, partition);
}
partition.write(element);
- } catch (final IOException e) {
+ } catch (final IOException | MemoryAllocationException e) {
throw new BlockWriteException(e);
}
}
@@ -150,9 +158,9 @@
} else {
try {
final Iterable<SerializedPartition<K>> convertedPartitions =
- DataUtil.convertToSerPartitions(serializer, partitions);
+ DataUtil.convertToSerPartitions(serializer, partitions, memoryPoolAssigner);
writeSerializedPartitions(convertedPartitions);
- } catch (final IOException e) {
+ } catch (final IOException | MemoryAllocationException e) {
throw new BlockWriteException(e);
}
}
@@ -251,7 +259,7 @@
throw new IOException("The read data size does not match with the partition size.");
}
partitionsInRange.add(new SerializedPartition<>(
- key, serializedData, serializedData.length));
+ key, serializedData, serializedData.length, memoryPoolAssigner));
} else {
// Have to skip this partition.
skipBytes(fileStream, partitionmetadata.getPartitionSize());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index eee0759..7529667 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -19,6 +19,8 @@
package org.apache.nemo.runtime.executor.data.block;
import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -45,20 +47,24 @@
private final Map<K, NonSerializedPartition<K>> nonCommittedPartitionsMap;
private final Serializer serializer;
private volatile boolean committed;
+ private final MemoryPoolAssigner memoryPoolAssigner;
/**
* Constructor.
*
* @param blockId the ID of this block.
* @param serializer the {@link Serializer}.
+ * @param memoryPoolAssigner the MemoryPoolAssigner for memory allocation.
*/
public NonSerializedMemoryBlock(final String blockId,
- final Serializer serializer) {
+ final Serializer serializer,
+ final MemoryPoolAssigner memoryPoolAssigner) {
this.id = blockId;
this.nonSerializedPartitions = new ArrayList<>();
this.nonCommittedPartitionsMap = new HashMap<>();
this.serializer = serializer;
this.committed = false;
+ this.memoryPoolAssigner = memoryPoolAssigner;
}
/**
@@ -166,8 +172,8 @@
@Override
public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws BlockFetchException {
try {
- return DataUtil.convertToSerPartitions(serializer, readPartitions(keyRange));
- } catch (final IOException e) {
+ return DataUtil.convertToSerPartitions(serializer, readPartitions(keyRange), memoryPoolAssigner);
+ } catch (final IOException | MemoryAllocationException e) {
throw new BlockFetchException(e);
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 6ecc2b9..d4e3185 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -19,6 +19,8 @@
package org.apache.nemo.runtime.executor.data.block;
import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -45,20 +47,24 @@
private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
private final Serializer serializer;
private volatile boolean committed;
+ private final MemoryPoolAssigner memoryPoolAssigner;
/**
* Constructor.
*
* @param blockId the ID of this block.
* @param serializer the {@link Serializer}.
+ * @param memoryPoolAssigner the MemoryPoolAssigner for memory allocation.
*/
public SerializedMemoryBlock(final String blockId,
- final Serializer serializer) {
+ final Serializer serializer,
+ final MemoryPoolAssigner memoryPoolAssigner) {
this.id = blockId;
this.serializedPartitions = new ArrayList<>();
this.nonCommittedPartitionsMap = new HashMap<>();
this.serializer = serializer;
this.committed = false;
+ this.memoryPoolAssigner = memoryPoolAssigner;
}
/**
@@ -79,11 +85,11 @@
try {
SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
if (partition == null) {
- partition = new SerializedPartition<>(key, serializer);
+ partition = new SerializedPartition<>(key, serializer, memoryPoolAssigner);
nonCommittedPartitionsMap.put(key, partition);
}
partition.write(element);
- } catch (final IOException e) {
+ } catch (final IOException | MemoryAllocationException e) {
throw new BlockWriteException(e);
}
}
@@ -102,9 +108,9 @@
if (!committed) {
try {
final Iterable<SerializedPartition<K>> convertedPartitions = DataUtil.convertToSerPartitions(
- serializer, partitions);
+ serializer, partitions, memoryPoolAssigner);
writeSerializedPartitions(convertedPartitions);
- } catch (final IOException e) {
+ } catch (final IOException | MemoryAllocationException e) {
throw new BlockWriteException(e);
}
} else {
@@ -234,4 +240,13 @@
public synchronized boolean isCommitted() {
return committed;
}
+
+ /**
+ * Releases the resource (i.e., off-heap memory) that the block holds.
+ */
+ public void release() {
+ for (SerializedPartition partition: serializedPartitions) {
+ partition.release();
+ }
+ }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
index 070b618..a2abbdb 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -18,7 +18,10 @@
*/
package org.apache.nemo.runtime.executor.data.partition;
-import org.apache.nemo.common.DirectByteBufferOutputStream;
+import org.apache.nemo.runtime.executor.data.DirectByteBufferOutputStream;
+import org.apache.nemo.runtime.executor.data.MemoryAllocationException;
+import org.apache.nemo.runtime.executor.data.MemoryChunk;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
@@ -28,7 +31,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import static org.apache.nemo.runtime.executor.data.DataUtil.buildOutputStream;
@@ -55,7 +58,8 @@
private final OutputStream wrappedStream;
@Nullable
private final EncoderFactory.Encoder encoder;
- private volatile List<ByteBuffer> dataList;
+ private final MemoryPoolAssigner memoryPoolAssigner;
+ private volatile List<MemoryChunk> dataList;
private final boolean offheap;
/**
@@ -64,17 +68,22 @@
*
* @param key the key of this partition.
* @param serializer the serializer to be used to serialize data.
+ * @param memoryPoolAssigner the memory pool assigner for memory allocation.
* @throws IOException if fail to chain the output stream.
+ * @throws MemoryAllocationException if fail to allocate memory.
*/
public SerializedPartition(final K key,
- final Serializer serializer) throws IOException {
+ final Serializer serializer,
+ final MemoryPoolAssigner memoryPoolAssigner) throws IOException,
+ MemoryAllocationException {
this.key = key;
this.serializedData = new byte[0];
this.length = 0;
this.committed = false;
- this.bytesOutputStream = new DirectByteBufferOutputStream();
+ this.bytesOutputStream = new DirectByteBufferOutputStream(memoryPoolAssigner);
this.wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
this.encoder = serializer.getEncoderFactory().create(wrappedStream);
+ this.memoryPoolAssigner = memoryPoolAssigner;
this.offheap = true;
}
@@ -85,10 +94,12 @@
* @param key the key.
* @param serializedData the serialized data.
* @param length the length of the actual serialized data. (It can be different with serializedData.length)
+ * @param memoryPoolAssigner the memory pool assigner.
*/
public SerializedPartition(final K key,
final byte[] serializedData,
- final int length) {
+ final int length,
+ final MemoryPoolAssigner memoryPoolAssigner) {
this.key = key;
this.serializedData = serializedData;
this.length = length;
@@ -96,6 +107,7 @@
this.bytesOutputStream = null;
this.wrappedStream = null;
this.encoder = null;
+ this.memoryPoolAssigner = memoryPoolAssigner;
this.offheap = false;
}
@@ -103,20 +115,23 @@
* Creates a serialized {@link Partition} with actual data residing in off-heap region.
* Data cannot be written to this partition after the construction.
*
- * @param key the key.
- * @param serializedBufList the serialized data in list list of {@link ByteBuffer}s.
- * @param length the length of the actual serialized data. (It can be different with serializedData.length)
+ * @param key the key.
+ * @param serializedChunkList the serialized data in list list of {@link MemoryChunk}s.
+ * @param length the length of the actual serialized data.(It can be different with serializedData.length)
+ * @param memoryPoolAssigner the memory pool assigner.
*/
public SerializedPartition(final K key,
- final List<ByteBuffer> serializedBufList,
- final int length) {
+ final List<MemoryChunk> serializedChunkList,
+ final int length,
+ final MemoryPoolAssigner memoryPoolAssigner) {
this.key = key;
- this.dataList = serializedBufList;
+ this.dataList = serializedChunkList;
this.length = length;
this.committed = true;
this.bytesOutputStream = null;
this.wrappedStream = null;
this.encoder = null;
+ this.memoryPoolAssigner = memoryPoolAssigner;
this.offheap = true;
}
@@ -150,7 +165,7 @@
// We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
// inner buffer directly, which can be an unfinished(not flushed) buffer.
wrappedStream.close();
- this.dataList = bytesOutputStream.getDirectByteBufferList();
+ this.dataList = bytesOutputStream.getMemoryChunkList();
this.length = bytesOutputStream.size();
this.committed = true;
}
@@ -199,9 +214,9 @@
if (!committed) {
throw new IOException("The partition is not committed yet!");
} else {
- List<ByteBuffer> result = new ArrayList<>(dataList.size());
- for (final ByteBuffer buffer : dataList) {
- final ByteBuffer dupBuffer = buffer.duplicate();
+ List<ByteBuffer> result = new LinkedList<>();
+ for (final MemoryChunk chunk : dataList) {
+ final ByteBuffer dupBuffer = chunk.duplicate().getBuffer();
result.add(dupBuffer);
}
return result;
@@ -226,4 +241,15 @@
public boolean isOffheap() {
return offheap;
}
+
+ /**
+ * Releases the off-heap memory that this SerializedPartition holds.
+ * TODO #403: Remove 'transient' uses of SerializedPartition.
+ */
+ public void release() {
+ if (!committed) {
+ throw new IllegalStateException("The partition is not committed yet!");
+ }
+ memoryPoolAssigner.returnChunksToPool(dataList);
+ }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java
index 170d98c..880237f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/AbstractBlockStore.java
@@ -19,6 +19,7 @@
package org.apache.nemo.runtime.executor.data.stores;
import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
@@ -28,14 +29,18 @@
*/
public abstract class AbstractBlockStore implements BlockStore {
private final SerializerManager serializerManager;
+ private final MemoryPoolAssigner memoryPoolAssigner;
/**
* Constructor.
*
* @param serializerManager the coder manager.
+ * @param memoryPoolAssigner the memory pool assigner.
*/
- protected AbstractBlockStore(final SerializerManager serializerManager) {
+ AbstractBlockStore(final SerializerManager serializerManager,
+ final MemoryPoolAssigner memoryPoolAssigner) {
this.serializerManager = serializerManager;
+ this.memoryPoolAssigner = memoryPoolAssigner;
}
/**
@@ -44,8 +49,17 @@
* @param blockId the ID of the block to get the coder.
* @return the coder.
*/
- protected final Serializer getSerializerFromWorker(final String blockId) {
+ final Serializer getSerializerFromWorker(final String blockId) {
final String runtimeEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(blockId);
return serializerManager.getSerializer(runtimeEdgeId);
}
+
+ /**
+ * Gets the memory pool assigner for this executor.
+ *
+ * @return the memory pool assigner.
+ */
+ final MemoryPoolAssigner getMemoryPoolAssigner() {
+ return memoryPoolAssigner;
+ }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java
index 2077a05..6ca78b1 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/GlusterFileStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.data.stores;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.conf.JobConf;
@@ -53,12 +54,14 @@
* @param volumeDirectory the remote volume directory which will contain the files.
* @param jobId the job id.
* @param serializerManager the serializer manager.
+ * @param memoryPoolAssigner the memory pool assigner.
*/
@Inject
private GlusterFileStore(@Parameter(JobConf.GlusterVolumeDirectory.class) final String volumeDirectory,
@Parameter(JobConf.JobId.class) final String jobId,
- final SerializerManager serializerManager) {
- super(serializerManager);
+ final SerializerManager serializerManager,
+ final MemoryPoolAssigner memoryPoolAssigner) {
+ super(serializerManager, memoryPoolAssigner);
this.fileDirectory = volumeDirectory + "/" + jobId;
new File(fileDirectory).mkdirs();
}
@@ -70,7 +73,7 @@
final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
final RemoteFileMetadata metadata =
RemoteFileMetadata.create(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory));
- return new FileBlock<>(blockId, serializer, filePath, metadata);
+ return new FileBlock<>(blockId, serializer, filePath, metadata, getMemoryPoolAssigner());
}
/**
@@ -151,6 +154,6 @@
final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory);
final RemoteFileMetadata<K> metadata =
RemoteFileMetadata.open(DataUtil.blockIdToMetaFilePath(blockId, fileDirectory));
- return new FileBlock<>(blockId, serializer, filePath, metadata);
+ return new FileBlock<>(blockId, serializer, filePath, metadata, getMemoryPoolAssigner());
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java
index 9a7b8f7..5af8faf 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalBlockStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.data.stores;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.executor.data.block.Block;
@@ -38,9 +39,11 @@
* Constructor.
*
* @param coderManager the coder manager.
+ * @param memoryPoolAssigner the memory pool assigner.
*/
- protected LocalBlockStore(final SerializerManager coderManager) {
- super(coderManager);
+ protected LocalBlockStore(final SerializerManager coderManager,
+ final MemoryPoolAssigner memoryPoolAssigner) {
+ super(coderManager, memoryPoolAssigner);
this.blockMap = new ConcurrentHashMap<>();
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java
index 0b22dae..efbad9b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/LocalFileStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.data.stores;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.exception.BlockFetchException;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.conf.JobConf;
@@ -46,11 +47,13 @@
*
* @param fileDirectory the directory which will contain the files.
* @param serializerManager the serializer manager.
+ * @param memoryPoolAssigner the memory pool assigner.
*/
@Inject
private LocalFileStore(@Parameter(JobConf.FileDirectory.class) final String fileDirectory,
- final SerializerManager serializerManager) {
- super(serializerManager);
+ final SerializerManager serializerManager,
+ final MemoryPoolAssigner memoryPoolAssigner) {
+ super(serializerManager, memoryPoolAssigner);
this.fileDirectory = fileDirectory;
new File(fileDirectory).mkdirs();
}
@@ -62,7 +65,8 @@
final Serializer serializer = getSerializerFromWorker(blockId);
final LocalFileMetadata metadata = new LocalFileMetadata();
- return new FileBlock(blockId, serializer, DataUtil.blockIdToFilePath(blockId, fileDirectory), metadata);
+ return new FileBlock(blockId, serializer, DataUtil.blockIdToFilePath(blockId, fileDirectory),
+ metadata, getMemoryPoolAssigner());
}
/**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
index 00346c4..bd87031 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.data.stores;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.executor.data.block.Block;
@@ -38,10 +39,12 @@
* Constructor.
*
* @param serializerManager the serializer manager.
+ * @param memoryPoolAssigner the memory pool assigner.
*/
@Inject
- private MemoryStore(final SerializerManager serializerManager) {
- super(serializerManager);
+ private MemoryStore(final SerializerManager serializerManager,
+ final MemoryPoolAssigner memoryPoolAssigner) {
+ super(serializerManager, memoryPoolAssigner);
}
/**
@@ -50,7 +53,7 @@
@Override
public NonSerializedMemoryBlock createBlock(final String blockId) {
final Serializer serializer = getSerializerFromWorker(blockId);
- return new NonSerializedMemoryBlock(blockId, serializer);
+ return new NonSerializedMemoryBlock(blockId, serializer, getMemoryPoolAssigner());
}
/**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
index f5298be..de46951 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.runtime.executor.data.stores;
+import org.apache.nemo.runtime.executor.data.MemoryPoolAssigner;
import org.apache.nemo.common.exception.BlockWriteException;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.executor.data.block.Block;
@@ -37,10 +38,12 @@
* Constructor.
*
* @param serializerManager the serializer manager.
+ * @param memoryPoolAssigner the memory pool assigner.
*/
@Inject
- private SerializedMemoryStore(final SerializerManager serializerManager) {
- super(serializerManager);
+ private SerializedMemoryStore(final SerializerManager serializerManager,
+ final MemoryPoolAssigner memoryPoolAssigner) {
+ super(serializerManager, memoryPoolAssigner);
}
/**
@@ -49,7 +52,7 @@
@Override
public Block createBlock(final String blockId) {
final Serializer serializer = getSerializerFromWorker(blockId);
- return new SerializedMemoryBlock(blockId, serializer);
+ return new SerializedMemoryBlock(blockId, serializer, getMemoryPoolAssigner());
}
/**
@@ -75,6 +78,8 @@
*/
@Override
public boolean deleteBlock(final String blockId) {
+ SerializedMemoryBlock block = (SerializedMemoryBlock) getBlockMap().get(blockId);
+ block.release();
return getBlockMap().remove(blockId) != null;
}
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java
index 9346803..67d1f5c 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/BlockTest.java
@@ -42,6 +42,7 @@
public final class BlockTest {
private Serializer serializer;
private Map<Integer, List<Integer>> testData;
+ private MemoryPoolAssigner memoryPoolAssigner;
/**
* Generates the test data and serializer.
@@ -52,6 +53,7 @@
public void setUp() throws Exception {
serializer = new Serializer<>(IntEncoderFactory.of(), IntDecoderFactory.of(), new ArrayList<>(), new ArrayList<>());
testData = new HashMap<>();
+ memoryPoolAssigner = new MemoryPoolAssigner(1, 32);
final List<Integer> list1 = Collections.singletonList(1);
final List<Integer> list2 = Arrays.asList(1, 2);
@@ -69,7 +71,7 @@
*/
@Test(timeout = 10000)
public void testNonSerializedMemoryBlock() throws Exception {
- final Block<Integer> block = new NonSerializedMemoryBlock<>("testBlock", serializer);
+ final Block<Integer> block = new NonSerializedMemoryBlock<>("testBlock", serializer, memoryPoolAssigner);
testBlock(block);
}
@@ -80,7 +82,7 @@
*/
@Test(timeout = 10000)
public void testSerializedMemoryBlock() throws Exception {
- final Block<Integer> block = new SerializedMemoryBlock<>("testBlock", serializer);
+ final Block<Integer> block = new SerializedMemoryBlock<>("testBlock", serializer, memoryPoolAssigner);
testBlock(block);
}
@@ -96,7 +98,7 @@
try {
new File(tmpDir).mkdirs();
final LocalFileMetadata<Integer> metadata = new LocalFileMetadata<>();
- final Block<Integer> block = new FileBlock<>("testBlock", serializer, filePath, metadata);
+ final Block<Integer> block = new FileBlock<>("testBlock", serializer, filePath, metadata, memoryPoolAssigner);
testBlock(block);
} finally {
FileUtils.deleteDirectory(new File(tmpDir));
diff --git a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStreamTest.java
similarity index 77%
rename from common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
rename to runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStreamTest.java
index 564ec5a..3bd5df3 100644
--- a/common/src/test/java/org/apache/nemo/common/DirectByteBufferOutputStreamTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/DirectByteBufferOutputStreamTest.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.common;
+package org.apache.nemo.runtime.executor.data;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
@@ -33,28 +34,29 @@
*/
public class DirectByteBufferOutputStreamTest {
private DirectByteBufferOutputStream outputStream;
+ private static final MemoryPoolAssigner memoryPoolAssigner = new MemoryPoolAssigner(1, 32);
@Before
- public void setup(){
- outputStream = new DirectByteBufferOutputStream();
+ public void setup() throws MemoryAllocationException {
+ outputStream = new DirectByteBufferOutputStream(memoryPoolAssigner);
}
@Test
- public void testSingleWrite() {
+ public void testSingleWrite() throws IOException {
int value = 1;
outputStream.write(value);
assertEquals(value, outputStream.toByteArray()[0]);
}
@Test
- public void testWrite(){
+ public void testWrite() throws IOException {
String value = "value";
outputStream.write(value.getBytes());
assertEquals(value, new String(outputStream.toByteArray()));
}
@Test
- public void testReWrite() {
+ public void testReWrite() throws IOException {
String value1 = "value1";
String value2 = "value2";
outputStream.write(value1.getBytes());
@@ -64,7 +66,7 @@
}
@Test
- public void testReRead() {
+ public void testReRead() throws IOException {
String value = "value";
outputStream.write(value.getBytes());
assertEquals(value, new String(outputStream.toByteArray()));
@@ -72,14 +74,14 @@
}
@Test
- public void testLongWrite() {
+ public void testLongWrite() throws IOException {
String value = RandomStringUtils.randomAlphanumeric(10000);
outputStream.write(value.getBytes());
assertEquals(value,new String(outputStream.toByteArray()));
}
@Test
- public void testLongReWrite() {
+ public void testLongReWrite() throws IOException {
String value1 = RandomStringUtils.randomAlphanumeric(10000);
String value2 = RandomStringUtils.randomAlphanumeric(5000);
outputStream.write(value1.getBytes());
@@ -89,7 +91,7 @@
}
@Test
- public void testLongReRead() {
+ public void testLongReRead() throws IOException {
String value = RandomStringUtils.randomAlphanumeric(10000);
outputStream.write(value.getBytes());
assertEquals(value, new String(outputStream.toByteArray()));
@@ -97,17 +99,17 @@
}
@Test
- public void testGetDirectBufferList() {
+ public void testGetMemoryChunkList() throws IOException {
String value = RandomStringUtils.randomAlphanumeric(10000);
outputStream.write(value.getBytes());
byte[] totalOutput = outputStream.toByteArray();
- List<ByteBuffer> bufList = outputStream.getDirectByteBufferList();
+ List<MemoryChunk> chunkList = outputStream.getMemoryChunkList();
int offset = 0;
int byteToRead;
- for (final ByteBuffer temp : bufList) {
- byteToRead = temp.remaining();
+ for (final MemoryChunk temp : chunkList) {
+ byteToRead = temp.getBuffer().remaining();
byte[] output = new byte[byteToRead];
- temp.get(output, 0, byteToRead);
+ temp.getBuffer().get(output, 0, byteToRead);
byte[] expected = Arrays.copyOfRange(totalOutput, offset, offset+byteToRead);
assertEquals(new String(expected), new String(output));
offset += byteToRead;
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryChunkTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryChunkTest.java
new file mode 100644
index 0000000..f99546a
--- /dev/null
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryChunkTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests basic operations on {@link MemoryChunk}.
+ */
+public class MemoryChunkTest {
+ private MemoryChunk chunk;
+ private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+
+ @Before
+ public void setup() {
+ chunk = new MemoryChunk(buffer);
+ }
+
+ @Test
+ public void testPutAndGet() {
+ byte input = 5;
+ chunk.put(0, input);
+ assertEquals(input, chunk.get(0));
+ }
+
+ @Test
+ public void testPutAndGetByArray() {
+ byte[] input = new byte[1000]; new Random().nextBytes(input);
+ chunk.put(0, input);
+ byte[] output = new byte[1000]; chunk.get(0, output);
+ assertArrayEquals(input, output);
+ }
+
+ @Test
+ public void testBulkPutAndGet() {
+ byte[] input1 = new byte[1000]; new Random().nextBytes(input1);
+ byte[] input2 = new byte[24]; new Random().nextBytes(input2);
+ chunk.put(0, input1);
+ chunk.put(input1.length, input2);
+ byte[] output = new byte[1024]; chunk.get(0, output);
+ byte[] concat = ArrayUtils.addAll(input1, input2);
+ assertArrayEquals(concat, output);
+ }
+
+ @Test
+ public void testPutCharAndGetChar() {
+ char input = 'a';
+ chunk.putChar(0,input);
+ assertEquals(input, chunk.getChar(0));
+ }
+
+ @Test
+ public void testPutShortAndGetShort() {
+ short input = 1;
+ chunk.putShort(0, input);
+ assertEquals(input, chunk.getShort(0));
+ }
+
+ @Test
+ public void testPutIntAndGetInt() {
+ int input = 14;
+ chunk.putInt(0, input);
+ assertEquals(input, chunk.getInt(0));
+ }
+
+ @Test
+ public void testPutLongAndGetLong() {
+ long input = 28;
+ chunk.putLong(0, input);
+ assertEquals(input, chunk.getLong(0));
+ }
+
+ @Test
+ public void testPutFloatAndGetFloat() {
+ float input = 3;
+ chunk.putFloat(0, input);
+ assertEquals(input, chunk.getFloat(0), 0.1);
+ }
+
+ @Test
+ public void testPutDoubleAndGetDouble() {
+ double input = 3.7;
+ chunk.putDouble(0, input);
+ assertEquals(input, chunk.getDouble(0), 0.01);
+ }
+}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssignerTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssignerTest.java
new file mode 100644
index 0000000..b351c44
--- /dev/null
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/data/MemoryPoolAssignerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class MemoryPoolAssignerTest {
+ private MemoryPoolAssigner memoryPoolAssigner;
+
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryPoolAssignerTest.class.getName());
+
+ private static final int NUM_CONCURRENT_THREADS = 5;
+
+ private static final int MAX_MEM_MB = 1;
+ private static final int CHUNK_SIZE_KB = 32;
+ private static final int MAX_NUM_CHUNKS = MAX_MEM_MB * 1024 / CHUNK_SIZE_KB;
+
+ @Before
+ public void setUp() {
+ this.memoryPoolAssigner = new MemoryPoolAssigner(1, 32);
+ }
+
+ @Test(expected = MemoryAllocationException.class)
+ public void testTooMuchRequest() throws MemoryAllocationException {
+ List<MemoryChunk> chunkList = new LinkedList<>();
+ for (int i = 0; i < MAX_NUM_CHUNKS; i++) {
+ chunkList.add(memoryPoolAssigner.allocateChunk());
+ }
+ memoryPoolAssigner.allocateChunk();
+ }
+
+ @Test
+ public void testConcurrentAllocReturnAlloc() throws InterruptedException {
+ final ExecutorService executor = Executors.newFixedThreadPool(NUM_CONCURRENT_THREADS);
+ for (int i = 0; i < MAX_NUM_CHUNKS; i++) {
+ executor.submit(() -> {
+ try {
+ // We return this one immediately
+ final MemoryChunk toReturnImmediately = memoryPoolAssigner.allocateChunk();
+ memoryPoolAssigner.returnChunksToPool(Arrays.asList(toReturnImmediately));
+
+ // We don't return this one
+ memoryPoolAssigner.allocateChunk();
+ } catch (MemoryAllocationException e) {
+ throw new RuntimeException();
+ }
+ });
+ }
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+ assertEquals(0, memoryPoolAssigner.poolSize());
+ }
+
+ @Test
+ public void testConcurrentUniqueAllocations() throws InterruptedException {
+ final ExecutorService executorService = Executors.newFixedThreadPool(NUM_CONCURRENT_THREADS);
+ final ConcurrentLinkedQueue<MemoryChunk> allocatedChunks = new ConcurrentLinkedQueue<>();
+
+ for (int i = 0; i < MAX_NUM_CHUNKS; i++) {
+ executorService.submit(()->{
+ try {
+ allocatedChunks.add(memoryPoolAssigner.allocateChunk());
+ } catch (MemoryAllocationException e) {
+ throw new RuntimeException();
+ }
+ });
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(30, TimeUnit.SECONDS);
+ assertEquals(0, memoryPoolAssigner.poolSize());
+
+ // All chunks should be unique
+ assertEquals(allocatedChunks.size(), new HashSet<>(allocatedChunks).size());
+ }
+}