blob: a2b2c6d13bfd1a1ca6a7fa98ff5c5f8810a49f8b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.Future;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.utils.CrailImmediateOperation;
import org.apache.crail.utils.CrailUtils;
import org.apache.crail.utils.RingBuffer;
import org.slf4j.Logger;
/**
 * Buffered output stream layered on top of a file's direct {@link CrailOutputStream}.
 *
 * <p>Buffers obtained from the {@link CrailStore} are carved into slices of
 * {@code min(BUFFER_SIZE, SLICE_SIZE)} bytes. Writes fill the slice at the head of
 * {@code readySlices}; as soon as a slice is full it is submitted to the direct stream and
 * parked, together with the future of that write, in {@code pendingSlices} /
 * {@code pendingFutures}. Both rings are always advanced in lock-step, so the n-th pending
 * future belongs to the n-th pending slice. When no ready slice is left, the oldest pending
 * write is awaited and its slice is reused.
 *
 * <p>This class performs no synchronization of its own.
 */
public class CrailBufferedOutputStream extends OutputStream {
    public static final Logger LOG = CrailUtils.getLogger();

    private final CrailStore crailFS;
    private final CrailFile file;
    private final long writeHint;
    /** Direct stream of the file; opened on first use by {@link #outputStream()}. */
    private CrailOutputStream outputStream;
    /** Buffers allocated from the store; handed back to it on close. */
    private final LinkedList<CrailBuffer> originalBuffers;
    /** Slices that can accept data; only the head slice is ever partially filled. */
    private final RingBuffer<CrailBuffer> readySlices;
    /** Slices submitted to the direct stream, oldest first. */
    private final RingBuffer<CrailBuffer> pendingSlices;
    /** Futures of the submitted writes, in the same order as {@code pendingSlices}. */
    private final RingBuffer<Future<CrailResult>> pendingFutures;
    /** Number of bytes accepted by this stream since it was created. */
    private long position;
    private boolean open;
    private final CrailBufferedStatistics statistics;
    private final int actualSliceSize;
    private final CrailImmediateOperation noOp;
    /** Scratch buffer for primitives that straddle a slice boundary. */
    private final ByteBuffer tmpBoundaryBuffer;
    /** Scratch array for {@link #write(int)}. */
    private final byte[] tmpByteBuf;

    /**
     * Creates a buffered stream for {@code file}.
     *
     * <p>Allocates {@code max(BUFFER_SIZE, SLICE_SIZE)} bytes worth of buffers, slices them and
     * shortens the first slice so that the following slices start at stream positions that are
     * multiples of the slice size. If anything fails after buffers were allocated, those
     * buffers are returned to the store before the exception is propagated.
     *
     * @param file the file to write to
     * @param writeHint hint forwarded to {@code file.getDirectOutputStream}
     * @throws Exception if buffer allocation or opening the direct stream fails
     */
    CrailBufferedOutputStream(CrailFile file, long writeHint) throws Exception {
        this.crailFS = file.getFileSystem();
        this.file = file;
        this.writeHint = writeHint;
        this.outputStream = null;
        this.statistics = new CrailBufferedStatistics("buffered/out");

        int allocationSize = Math.max(CrailConstants.BUFFER_SIZE, CrailConstants.SLICE_SIZE);
        this.actualSliceSize = Math.min(CrailConstants.BUFFER_SIZE, CrailConstants.SLICE_SIZE);
        int sliceCount = allocationSize / actualSliceSize;

        this.originalBuffers = new LinkedList<>();
        this.readySlices = new RingBuffer<>(sliceCount);
        this.pendingSlices = new RingBuffer<>(sliceCount);
        this.pendingFutures = new RingBuffer<>(sliceCount);
        this.tmpByteBuf = new byte[1];
        this.tmpBoundaryBuffer = ByteBuffer.allocate(8);
        this.noOp = new CrailImmediateOperation(0);
        this.position = 0;
        this.open = false;

        try {
            for (int currentSize = 0; currentSize < allocationSize; currentSize += CrailConstants.BUFFER_SIZE) {
                CrailBuffer buffer = crailFS.allocateBuffer();
                originalBuffers.add(buffer);
            }

            for (CrailBuffer buffer : originalBuffers) {
                while (buffer.hasRemaining()) {
                    buffer.limit(buffer.position() + actualSliceSize);
                    CrailBuffer slice = buffer.slice();
                    slice.clear();
                    readySlices.add(slice);

                    int newpos = buffer.position() + actualSliceSize;
                    buffer.clear();
                    buffer.position(newpos);
                }
            }

            /* adjust first slice remaining to align position to slice size */
            CrailBuffer firstSlice = readySlices.peek();
            long streamPosition = outputStream().position();
            firstSlice.limit(firstSlice.remaining() - (int) (streamPosition % firstSlice.remaining()));
        } catch (Exception e) {
            // Nothing has been submitted yet, so the buffers can be handed back right away
            // instead of being leaked together with the half-constructed stream.
            try {
                releaseBuffers();
            } catch (Exception releaseFailure) {
                e.addSuppressed(releaseFailure);
            }
            throw e;
        }

        this.open = true;
    }

    /**
     * Writes the low-order byte of {@code dataBuf}.
     *
     * @throws IOException if the stream is closed or the write fails
     */
    @Override
    public final void write(int dataBuf) throws IOException {
        tmpByteBuf[0] = (byte) dataBuf;
        this.write(tmpByteBuf);
    }

    /**
     * Writes the whole array.
     *
     * @throws IOException if the stream is closed or the write fails
     */
    @Override
    public final void write(byte[] dataBuf) throws IOException {
        this.write(dataBuf, 0, dataBuf.length);
    }

    /**
     * Writes {@code len} bytes of {@code dataBuf} starting at {@code off}, submitting every
     * slice that becomes full on the way.
     *
     * @throws IOException if the stream is closed or the write fails; a null array or an
     *         invalid range is reported as an {@code IOException} whose cause is the
     *         corresponding {@code NullPointerException} / {@code IndexOutOfBoundsException}
     */
    @Override
    public final void write(byte[] dataBuf, int off, int len) throws IOException {
        try {
            if (dataBuf == null) {
                throw new NullPointerException();
            } else if ((off < 0) || (off > dataBuf.length) || (len < 0) || ((off + len) > dataBuf.length) || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            } else if (!open) {
                throw new IOException("stream closed");
            } else if (len == 0) {
                return;
            }

            while (len > 0) {
                CrailBuffer slice = getSlice();
                int bufferRemaining = Math.min(len, slice.remaining());
                slice.put(dataBuf, off, bufferRemaining);
                off += bufferRemaining;
                len -= bufferRemaining;
                position += bufferRemaining;
                syncSlice();
            }
        } catch (Exception e) {
            // IOExceptions are passed through untouched; everything else is wrapped.
            throw asIOException(e);
        }
    }

    /**
     * Writes the remaining bytes of {@code dataBuf}. On return the buffer's limit is the same
     * as on entry, also when the write fails part-way.
     *
     * @throws IOException if the stream is closed or the write fails; a null buffer is
     *         reported as an {@code IOException} caused by a {@code NullPointerException}
     */
    public final void write(ByteBuffer dataBuf) throws IOException {
        try {
            if (dataBuf == null) {
                throw new NullPointerException();
            } else if (!open) {
                throw new IOException("stream closed");
            } else if (dataBuf.remaining() == 0) {
                return;
            }

            int len = dataBuf.remaining();
            while (len > 0) {
                CrailBuffer slice = getSlice();
                int bufferRemaining = Math.min(len, slice.remaining());
                int oldLimit = dataBuf.limit();
                // Temporarily narrow the source so that put() copies exactly one chunk.
                dataBuf.limit(dataBuf.position() + bufferRemaining);
                try {
                    slice.put(dataBuf);
                } finally {
                    dataBuf.limit(oldLimit);
                }
                len -= bufferRemaining;
                position += bufferRemaining;
                syncSlice();
            }
        } catch (Exception e) {
            throw asIOException(e);
        }
    }

    /**
     * Writes a double. If the value does not fit into the current slice it is staged in a
     * scratch buffer and written byte-wise across the slice boundary.
     *
     * @throws Exception if the stream is closed or the write fails
     */
    public final void writeDouble(double value) throws Exception {
        ensureOpen();
        CrailBuffer slice = getSlice();
        if (slice.remaining() >= Double.BYTES) {
            slice.putDouble(value);
            position += Double.BYTES;
            syncSlice();
        } else {
            tmpBoundaryBuffer.clear();
            tmpBoundaryBuffer.putDouble(value);
            tmpBoundaryBuffer.flip();
            write(tmpBoundaryBuffer);
        }
    }

    /**
     * Writes a float; see {@link #writeDouble(double)} for the slice-boundary handling.
     *
     * @throws Exception if the stream is closed or the write fails
     */
    public final void writeFloat(float value) throws Exception {
        ensureOpen();
        CrailBuffer slice = getSlice();
        if (slice.remaining() >= Float.BYTES) {
            slice.putFloat(value);
            position += Float.BYTES;
            syncSlice();
        } else {
            tmpBoundaryBuffer.clear();
            tmpBoundaryBuffer.putFloat(value);
            tmpBoundaryBuffer.flip();
            write(tmpBoundaryBuffer);
        }
    }

    /**
     * Writes an int; see {@link #writeDouble(double)} for the slice-boundary handling.
     *
     * @throws Exception if the stream is closed or the write fails
     */
    public final void writeInt(int value) throws Exception {
        ensureOpen();
        CrailBuffer slice = getSlice();
        if (slice.remaining() >= Integer.BYTES) {
            slice.putInt(value);
            position += Integer.BYTES;
            syncSlice();
        } else {
            tmpBoundaryBuffer.clear();
            tmpBoundaryBuffer.putInt(value);
            tmpBoundaryBuffer.flip();
            write(tmpBoundaryBuffer);
        }
    }

    /**
     * Writes a long; see {@link #writeDouble(double)} for the slice-boundary handling.
     *
     * @throws Exception if the stream is closed or the write fails
     */
    public final void writeLong(long value) throws Exception {
        ensureOpen();
        CrailBuffer slice = getSlice();
        if (slice.remaining() >= Long.BYTES) {
            slice.putLong(value);
            position += Long.BYTES;
            syncSlice();
        } else {
            tmpBoundaryBuffer.clear();
            tmpBoundaryBuffer.putLong(value);
            tmpBoundaryBuffer.flip();
            write(tmpBoundaryBuffer);
        }
    }

    /**
     * Writes a short; see {@link #writeDouble(double)} for the slice-boundary handling.
     *
     * @throws Exception if the stream is closed or the write fails
     */
    public final void writeShort(short value) throws Exception {
        ensureOpen();
        CrailBuffer slice = getSlice();
        if (slice.remaining() >= Short.BYTES) {
            slice.putShort(value);
            position += Short.BYTES;
            syncSlice();
        } else {
            tmpBoundaryBuffer.clear();
            tmpBoundaryBuffer.putShort(value);
            tmpBoundaryBuffer.flip();
            write(tmpBoundaryBuffer);
        }
    }

    /**
     * Submits any buffered data and returns a future covering every write that is still
     * tracked by this stream.
     *
     * <p>The stream stays fully usable afterwards: empty slices remain ready, and the futures
     * handed to the returned operation are also kept in {@code pendingFutures} so that their
     * slices can be reclaimed by later writes and are awaited by {@link #close()} before the
     * buffers are freed. This means a write future may be waited on both through the returned
     * operation and by this stream.
     *
     * @return the no-op operation if nothing is in flight, otherwise a purge operation
     *         aggregating the in-flight writes
     * @throws IOException if the stream is closed or submitting the data fails
     */
    public Future<CrailResult> purge() throws IOException {
        ensureOpen();

        try {
            // Data is only ever put into the head of readySlices, so it is the only slice
            // that can be partially filled; the slices behind it are empty and stay ready.
            CrailBuffer head = readySlices.peek();
            if (head != null && head.position() > 0) {
                submit(readySlices.poll());
            }

            if (pendingFutures.isEmpty()) {
                return noOp;
            }

            // The ring is only visible through poll/add here, so snapshot it via a list and
            // put every future back in its original order to preserve the slice pairing.
            LinkedList<Future<CrailResult>> inFlight = new LinkedList<>();
            while (!pendingFutures.isEmpty()) {
                inFlight.add(pendingFutures.poll());
            }
            CrailPurgeOperation purgeOp = new CrailPurgeOperation();
            for (Future<CrailResult> future : inFlight) {
                purgeOp.add(future);
                pendingFutures.add(future);
            }
            return purgeOp;
        } catch (Exception e) {
            throw asIOException(e);
        }
    }

    /**
     * Delegates to {@code sync()} of the direct output stream.
     *
     * @throws IOException if the direct stream cannot be opened or the sync cannot be issued
     */
    public Future<Void> sync() throws IOException {
        Future<Void> future = outputStream().sync();
        return future;
    }

    /**
     * Submits buffered data, waits for all tracked writes, returns the buffers to the store
     * and closes the direct stream. Calling it on an already closed stream is a no-op.
     *
     * <p>Every step is attempted even if an earlier one failed, so that buffers are never
     * released while a write may still be using them and are never leaked. The stream is
     * marked closed in all cases; the first failure is thrown with later ones suppressed.
     *
     * @throws IOException if flushing, waiting, freeing buffers or closing the direct stream
     *         failed
     */
    @Override
    public void close() throws IOException {
        if (!open) {
            return;
        }

        Exception failure = null;

        try {
            while (!readySlices.isEmpty()) {
                CrailBuffer slice = readySlices.poll();
                if (slice.position() > 0) {
                    submit(slice);
                }
            }
        } catch (Exception e) {
            failure = e;
        }

        while (!pendingFutures.isEmpty()) {
            try {
                pendingFutures.poll().get();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }

        try {
            releaseBuffers();
        } catch (Exception e) {
            failure = chain(failure, e);
        }

        try {
            outputStream().close();
            this.crailFS.getStatistics().addProvider(statistics);
        } catch (Exception e) {
            failure = chain(failure, e);
        }

        open = false;

        if (failure != null) {
            throw asIOException(failure);
        }
    }

    /**
     * Returns the number of bytes written through this stream. The counter starts at zero
     * when the stream is created, independent of the position of the direct stream.
     */
    public long position() {
        return position;
    }

    /** Intentionally does nothing; use {@link #purge()} to push buffered data out. */
    @Override
    public void flush() throws IOException {
        //flush should not be called, use purge instead!
    }

    /**
     * Returns the node reported by the direct output stream.
     *
     * @throws IOException if the direct stream cannot be opened
     */
    public CrailNode getFile() throws IOException {
        return outputStream().getFile();
    }

    /**
     * Returns the slice that accepts the next bytes. If no slice is ready, waits for the
     * oldest in-flight write, records whether that wait was blocking in the statistics and
     * recycles the slice belonging to it.
     */
    private CrailBuffer getSlice() throws Exception {
        CrailBuffer slice = readySlices.peek();
        if (slice == null) {
            Future<CrailResult> future = pendingFutures.poll();
            if (future == null) {
                throw new IOException("no slice available for writing");
            }
            statistics.incTotalOps();
            if (future.isDone()) {
                statistics.incNonBlockingOps();
            } else {
                statistics.incBlockingOps();
            }
            future.get();
            slice = pendingSlices.poll();
            slice.clear();
            readySlices.add(slice);
        }
        return slice;
    }

    /** Submits the head slice to the direct stream once it has no room left. */
    private void syncSlice() throws Exception {
        CrailBuffer slice = readySlices.peek();
        if (slice != null && slice.remaining() == 0) {
            submit(readySlices.poll());
        }
    }

    /** Flips {@code slice}, writes it to the direct stream and tracks it as pending. */
    private void submit(CrailBuffer slice) throws Exception {
        slice.flip();
        Future<CrailResult> future = outputStream().write(slice);
        pendingSlices.add(slice);
        pendingFutures.add(future);
    }

    /** Hands every buffer still owned by this stream back to the store. */
    private void releaseBuffers() throws Exception {
        while (!originalBuffers.isEmpty()) {
            CrailBuffer buffer = originalBuffers.remove();
            crailFS.freeBuffer(buffer);
        }
    }

    /** Fails with the same message the byte-array writers use when the stream is closed. */
    private void ensureOpen() throws IOException {
        if (!open) {
            throw new IOException("stream closed");
        }
    }

    /** Returns {@code e} itself if it already is an IOException, otherwise wraps it. */
    private static IOException asIOException(Exception e) {
        return (e instanceof IOException) ? (IOException) e : new IOException(e);
    }

    /** Keeps the first failure as primary and records any later one as suppressed. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Returns the direct output stream of the file, opening it with the write hint on first
     * use.
     *
     * @throws IOException if the direct stream cannot be obtained
     */
    final CrailOutputStream outputStream() throws IOException {
        if (outputStream == null) {
            try {
                outputStream = file.getDirectOutputStream(writeHint);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        return outputStream;
    }
}