| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.io.util; |
| |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.channels.WritableByteChannel; |
| |
| import com.google.common.base.Preconditions; |
| |
| import net.nicoulaj.compilecommand.annotations.DontInline; |
| import org.apache.cassandra.config.Config; |
| import org.apache.cassandra.utils.FastByteOperations; |
| import org.apache.cassandra.utils.memory.MemoryUtil; |
| |
| /** |
| * An implementation of the DataOutputStreamPlus interface using a ByteBuffer to stage writes |
| * before flushing them to an underlying channel. |
| * |
| * This class is completely thread unsafe. |
| */ |
| public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus |
| { |
| private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32); |
| |
| protected ByteBuffer buffer; |
| |
| public BufferedDataOutputStreamPlus(RandomAccessFile ras) |
| { |
| this(ras.getChannel()); |
| } |
| |
| public BufferedDataOutputStreamPlus(RandomAccessFile ras, int bufferSize) |
| { |
| this(ras.getChannel(), bufferSize); |
| } |
| |
| public BufferedDataOutputStreamPlus(FileOutputStream fos) |
| { |
| this(fos.getChannel()); |
| } |
| |
| public BufferedDataOutputStreamPlus(FileOutputStream fos, int bufferSize) |
| { |
| this(fos.getChannel(), bufferSize); |
| } |
| |
| public BufferedDataOutputStreamPlus(WritableByteChannel wbc) |
| { |
| this(wbc, DEFAULT_BUFFER_SIZE); |
| } |
| |
| public BufferedDataOutputStreamPlus(WritableByteChannel wbc, int bufferSize) |
| { |
| this(wbc, ByteBuffer.allocateDirect(bufferSize)); |
| Preconditions.checkNotNull(wbc); |
| Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double"); |
| } |
| |
| protected BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer) |
| { |
| super(channel); |
| this.buffer = buffer; |
| } |
| |
| protected BufferedDataOutputStreamPlus(ByteBuffer buffer) |
| { |
| super(); |
| this.buffer = buffer; |
| } |
| |
| @Override |
| public void write(byte[] b) throws IOException |
| { |
| write(b, 0, b.length); |
| } |
| |
| @Override |
| public void write(byte[] b, int off, int len) throws IOException |
| { |
| if (b == null) |
| throw new NullPointerException(); |
| |
| // avoid int overflow |
| if (off < 0 || off > b.length || len < 0 |
| || len > b.length - off) |
| throw new IndexOutOfBoundsException(); |
| |
| if (len == 0) |
| return; |
| |
| int copied = 0; |
| while (copied < len) |
| { |
| if (buffer.hasRemaining()) |
| { |
| int toCopy = Math.min(len - copied, buffer.remaining()); |
| buffer.put(b, off + copied, toCopy); |
| copied += toCopy; |
| } |
| else |
| { |
| doFlush(len - copied); |
| } |
| } |
| } |
| |
| /* |
| * Makes a defensive copy of the incoming ByteBuffer and don't modify the position or limit |
| * even temporarily so it is thread-safe WRT to the incoming buffer |
| * (non-Javadoc) |
| * @see org.apache.cassandra.io.util.DataOutputPlus#write(java.nio.ByteBuffer) |
| */ |
| @Override |
| public void write(ByteBuffer src) throws IOException |
| { |
| int srcPos = src.position(); |
| int srcCount; |
| int trgAvailable; |
| while ((srcCount = src.limit() - srcPos) > (trgAvailable = buffer.remaining())) |
| { |
| FastByteOperations.copy(src, srcPos, buffer, buffer.position(), trgAvailable); |
| buffer.position(buffer.position() + trgAvailable); |
| srcPos += trgAvailable; |
| doFlush(src.limit() - srcPos); |
| } |
| FastByteOperations.copy(src, srcPos, buffer, buffer.position(), srcCount); |
| buffer.position(buffer.position() + srcCount); |
| } |
| |
| @Override |
| public void write(int b) throws IOException |
| { |
| if (!buffer.hasRemaining()) |
| doFlush(1); |
| buffer.put((byte) (b & 0xFF)); |
| } |
| |
| @Override |
| public void writeBoolean(boolean v) throws IOException |
| { |
| if (!buffer.hasRemaining()) |
| doFlush(1); |
| buffer.put(v ? (byte)1 : (byte)0); |
| } |
| |
| @Override |
| public void writeByte(int v) throws IOException |
| { |
| write(v); |
| } |
| |
| @Override |
| public void writeShort(int v) throws IOException |
| { |
| writeChar(v); |
| } |
| |
| @Override |
| public void writeChar(int v) throws IOException |
| { |
| if (buffer.remaining() < 2) |
| writeSlow(v, 2); |
| else |
| buffer.putChar((char) v); |
| } |
| |
| @Override |
| public void writeInt(int v) throws IOException |
| { |
| if (buffer.remaining() < 4) |
| writeSlow(v, 4); |
| else |
| buffer.putInt(v); |
| } |
| |
| @Override |
| public void writeLong(long v) throws IOException |
| { |
| if (buffer.remaining() < 8) |
| writeSlow(v, 8); |
| else |
| buffer.putLong(v); |
| } |
| |
| @Override |
| public void writeFloat(float v) throws IOException |
| { |
| writeInt(Float.floatToRawIntBits(v)); |
| } |
| |
| @Override |
| public void writeDouble(double v) throws IOException |
| { |
| writeLong(Double.doubleToRawLongBits(v)); |
| } |
| |
| @DontInline |
| private void writeSlow(long bytes, int count) throws IOException |
| { |
| int origCount = count; |
| if (ByteOrder.BIG_ENDIAN == buffer.order()) |
| while (count > 0) writeByte((int) (bytes >>> (8 * --count))); |
| else |
| while (count > 0) writeByte((int) (bytes >>> (8 * (origCount - count--)))); |
| } |
| |
| @Override |
| public void writeBytes(String s) throws IOException |
| { |
| for (int index = 0; index < s.length(); index++) |
| writeByte(s.charAt(index)); |
| } |
| |
| @Override |
| public void writeChars(String s) throws IOException |
| { |
| for (int index = 0; index < s.length(); index++) |
| writeChar(s.charAt(index)); |
| } |
| |
| @Override |
| public void writeUTF(String s) throws IOException |
| { |
| UnbufferedDataOutputStreamPlus.writeUTF(s, this); |
| } |
| |
| /* |
| * Count is the number of bytes remaining to write ignoring already remaining capacity |
| */ |
| @DontInline |
| protected void doFlush(int count) throws IOException |
| { |
| buffer.flip(); |
| |
| while (buffer.hasRemaining()) |
| channel.write(buffer); |
| |
| buffer.clear(); |
| } |
| |
| @Override |
| public void flush() throws IOException |
| { |
| doFlush(0); |
| } |
| |
| @Override |
| public void close() throws IOException |
| { |
| doFlush(0); |
| channel.close(); |
| FileUtils.clean(buffer); |
| buffer = null; |
| } |
| |
| public BufferedDataOutputStreamPlus order(ByteOrder order) |
| { |
| this.buffer.order(order); |
| return this; |
| } |
| } |