blob: 8a74fb2a69f18fed89f3bcaa84a8720a60c1e901 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common;
import com.google.common.annotations.VisibleForTesting;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
* This class is a customized output stream implementation backed by
* {@link ByteBuffer}s, which uses off-heap memory when writing the data.
* Memory is allocated on demand, one {@code pageSize}-byte buffer at a time.
* Deletion of {@code dataList}, which is the memory this output stream holds, occurs
* when the corresponding block is deleted.
* TODO #388: Off-heap memory management (reuse ByteBuffer) - implement reuse.
*/
public final class DirectByteBufferOutputStream extends OutputStream {
  private static final int DEFAULT_PAGE_SIZE = 32768; // 32KB

  // Pages in write order. A new page is only appended once the current one is full,
  // so every page except the last holds exactly pageSize bytes.
  private final LinkedList<ByteBuffer> dataList = new LinkedList<>();
  private final int pageSize;
  // The page currently being written to; always the last element of dataList.
  private ByteBuffer currentBuf;

  /**
   * Default constructor.
   * Sets the {@code pageSize} to the default size of 32768 bytes (32KB).
   */
  public DirectByteBufferOutputStream() {
    this(DEFAULT_PAGE_SIZE);
  }

  /**
   * Constructor which sets {@code pageSize} as specified {@code size}.
   * Note that the {@code pageSize} has trade-off between memory fragmentation and
   * native memory (de)allocation overhead.
   *
   * @param size should be a power of 2 and greater than or equal to 32768.
   * @throws IllegalArgumentException if {@code size} is smaller than 32768 or is not a power of 2.
   */
  public DirectByteBufferOutputStream(final int size) {
    if (size < DEFAULT_PAGE_SIZE || (size & (size - 1)) != 0) {
      throw new IllegalArgumentException("Invalid pageSize");
    }
    this.pageSize = size;
    newLastBuffer();
  }

  /**
   * Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize},
   * appends it to {@code dataList} and makes it the current write target.
   */
  // TODO #388: Off-heap memory management (reuse ByteBuffer)
  private void newLastBuffer() {
    currentBuf = ByteBuffer.allocateDirect(pageSize);
    dataList.addLast(currentBuf);
  }

  /**
   * Writes the specified byte to this output stream.
   *
   * @param b the byte to be written; only the eight low-order bits are used.
   */
  @Override
  public void write(final int b) {
    if (!currentBuf.hasRemaining()) {
      newLastBuffer();
    }
    currentBuf.put((byte) b);
  }

  /**
   * Writes {@code b.length} bytes from the specified byte array to this output stream.
   *
   * @param b the bytes to be written.
   * @throws NullPointerException if {@code b} is {@code null}.
   */
  @Override
  public void write(final byte[] b) {
    write(b, 0, b.length);
  }

  /**
   * Writes {@code len} bytes from the specified byte array
   * starting at offset {@code off} to this output stream.
   * The requested range is validated before any byte is written, so an invalid
   * call leaves the stream unchanged.
   *
   * @param b   the data.
   * @param off the start offset in the data.
   * @param len the number of bytes to write.
   * @throws NullPointerException      if {@code b} is {@code null}.
   * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative,
   *                                   or {@code off + len} exceeds {@code b.length}.
   */
  @Override
  public void write(final byte[] b, final int off, final int len) {
    // Written as "len > b.length - off" so that off + len cannot overflow int.
    if (off < 0 || len < 0 || len > b.length - off) {
      throw new IndexOutOfBoundsException(
        "off=" + off + ", len=" + len + ", array length=" + b.length);
    }
    int offset = off;
    int byteToWrite = len;
    while (byteToWrite > 0) {
      if (!currentBuf.hasRemaining()) {
        newLastBuffer();
      }
      // Fill the current page as far as possible; the rest spills over to the next page.
      final int chunk = Math.min(currentBuf.remaining(), byteToWrite);
      currentBuf.put(b, offset, chunk);
      offset += chunk;
      byteToWrite -= chunk;
    }
  }

  /**
   * Creates a byte array that contains the whole content currently written in this output stream.
   *
   * USED BY TESTS ONLY.
   *
   * @return the current contents of this output stream, as byte array.
   * @throws ArithmeticException if the written size does not fit in an {@code int}.
   */
  @VisibleForTesting
  byte[] toByteArray() {
    final byte[] byteArray = new byte[size()];
    int start = 0;
    for (final ByteBuffer buffer : dataList) {
      // We use duplicated buffer to read the data so that there is no complicated
      // alteration of position and limit when switching between read and write mode.
      final ByteBuffer dupBuffer = buffer.duplicate();
      dupBuffer.flip();
      final int byteToWrite = dupBuffer.remaining();
      dupBuffer.get(byteArray, start, byteToWrite);
      start += byteToWrite;
    }
    return byteArray;
  }

  /**
   * Returns the list of {@code ByteBuffer}s that contains the written data.
   * List of flipped and duplicated {@link ByteBuffer}s are returned which has independent
   * position and limit, to reduce erroneous data read/write.
   * The duplicates still share their content with the buffers held by this stream.
   * This function has to be called when intended to read from the start of the list of
   * {@link ByteBuffer}s, not for additional write.
   *
   * @return a new {@code List} of duplicated {@code ByteBuffer}s, in write order.
   */
  public List<ByteBuffer> getDirectByteBufferList() {
    final List<ByteBuffer> result = new ArrayList<>(dataList.size());
    for (final ByteBuffer buffer : dataList) {
      final ByteBuffer dupBuffer = buffer.duplicate();
      dupBuffer.flip();
      result.add(dupBuffer);
    }
    return result;
  }

  /**
   * Returns the size of the data written in this output stream.
   *
   * @return the size of the data
   * @throws ArithmeticException if the size does not fit in an {@code int}.
   */
  public int size() {
    // Every page but the last is full, so only the last page's position has to be read.
    // Exact arithmetic fails loudly instead of returning a wrapped-around value.
    final int fullPagesBytes = Math.multiplyExact(pageSize, dataList.size() - 1);
    return Math.addExact(fullPagesBytes, currentBuf.position());
  }

  /**
   * Closing this output stream has no effect.
   */
  @Override
  public void close() {
  }
}