| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal; |
| |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.BufferOverflowException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SocketChannel; |
| |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.internal.cache.BytesAndBitsForCompactor; |
| import org.apache.geode.internal.serialization.KnownVersion; |
| import org.apache.geode.internal.serialization.StaticSerialization; |
| import org.apache.geode.internal.tcp.ByteBufferInputStream; |
| |
| /** |
| * HeapDataOutputStream is an OutputStream that also implements DataOutput and stores all data |
| * written to it in heap memory. It is always better to use this class instead |
| * ByteArrayOutputStream. |
| * <p> |
| * This class is not thread safe |
| * <p> |
| * Added boolean flag that when turned on will throw an exception instead of allocating a new |
| * buffer. The exception is a BufferOverflowException thrown from expand, and will restore the |
| * position to the point at which the flag was set with the disallowExpansion method. |
| * |
| * Usage Model: boolean succeeded = true; stream.disallowExpansion(); try { |
| * DataSerializer.writeObject(obj, stream); } catch (BufferOverflowException e) { succeeded = false; |
| * } |
| * |
| * @since GemFire 5.0.2 |
| */ |
| public class HeapDataOutputStream extends |
| org.apache.geode.internal.serialization.BufferDataOutputStream |
| implements ObjToByteArraySerializer, ByteBufferWriter { |
| |
| public HeapDataOutputStream(KnownVersion version) { |
| this(INITIAL_CAPACITY, version); |
| } |
| |
| /** |
| * Create a HeapDataOutputStream optimized to contain just the specified string. The string will |
| * be written to this stream encoded as utf. |
| */ |
| public HeapDataOutputStream(String s) { |
| super(s); |
| } |
| |
| public HeapDataOutputStream(int allocSize, KnownVersion version) { |
| this(allocSize, version, false); |
| } |
| |
| public HeapDataOutputStream(int allocSize) { |
| this(allocSize, null, false); |
| } |
| |
| /** |
| * @param doNotCopy if true then byte arrays/buffers/sources will not be copied to this hdos but |
| * instead referenced. |
| */ |
| public HeapDataOutputStream(int allocSize, KnownVersion version, boolean doNotCopy) { |
| super(allocSize, version, doNotCopy); |
| } |
| |
| /** |
| * @param doNotCopy if true then byte arrays/buffers/sources will not be copied to this hdos but |
| * instead referenced. |
| */ |
| public HeapDataOutputStream(ByteBuffer initialBuffer, KnownVersion version, boolean doNotCopy) { |
| super(initialBuffer, version, doNotCopy); |
| } |
| |
| /** |
| * Construct a HeapDataOutputStream which uses the byte array provided as its underlying |
| * ByteBuffer |
| * |
| */ |
| public HeapDataOutputStream(byte[] bytes) { |
| super(bytes); |
| } |
| |
| /** |
| * Free up any unused memory |
| */ |
| public void trim() { |
| finishWriting(); |
| if (this.buffer.limit() < this.buffer.capacity()) { |
| // buffer is less than half full so allocate a new one and copy it in |
| ByteBuffer bb = ByteBuffer.allocate(this.buffer.limit()); |
| bb.put(this.buffer); |
| bb.flip(); // now ready for reading |
| this.buffer = bb; |
| } |
| } |
| |
| |
| /** |
| * Writes this stream to the wrapper object of BytesAndBitsForCompactor type. The byte array |
| * retrieved from the HeapDataOutputStream is set in the wrapper object. The byte array may be |
| * partially filled. The valid length of data in the byte array is set in the wrapper. It is |
| * assumed that the HeapDataOutputStream is appropriately seeded with a byte array from the |
| * wrapper. However the filled byte array may or may not be the same as that used for seeding , |
| * depending upon whether the data got accommodated in the original byte buffer or not. |
| * |
| */ |
| // Asif |
| public void sendTo(BytesAndBitsForCompactor wrapper, byte userBits) { |
| ByteBuffer bb = toByteBuffer(); |
| if (bb.hasArray() && bb.arrayOffset() == 0) { |
| wrapper.setData(bb.array(), userBits, bb.limit(), true /* is Reusable */); |
| } else { |
| // create a new buffer of just the right size and copy the old buffer into |
| // it |
| ByteBuffer tmp = ByteBuffer.allocate(bb.remaining()); |
| tmp.put(bb); |
| tmp.flip(); |
| this.buffer = tmp; |
| byte[] bytes = this.buffer.array(); |
| wrapper.setData(bytes, userBits, bytes.length, true /* is Reusable */); |
| } |
| } |
| |
| /** |
| * Write this stream to the specified channel. Call multiple times until size returns zero to make |
| * sure all bytes in the stream have been written. |
| * |
| * @return the number of bytes written, possibly zero. |
| * @throws IOException if channel is closed, not yet connected, or some other I/O error occurs. |
| */ |
| public int sendTo(SocketChannel chan) throws IOException { |
| finishWriting(); |
| if (size() == 0) { |
| return 0; |
| } |
| int result; |
| if (this.chunks != null) { |
| ByteBuffer[] bufs = new ByteBuffer[this.chunks.size() + 1]; |
| bufs = this.chunks.toArray(bufs); |
| bufs[this.chunks.size()] = this.buffer; |
| result = (int) chan.write(bufs); |
| } else { |
| result = chan.write(this.buffer); |
| } |
| this.size -= result; |
| return result; |
| } |
| |
| public void sendTo(SocketChannel chan, ByteBuffer out) throws IOException { |
| finishWriting(); |
| if (size() == 0) { |
| return; |
| } |
| out.clear(); |
| if (this.chunks != null) { |
| for (ByteBuffer bb : this.chunks) { |
| sendChunkTo(bb, chan, out); |
| } |
| } |
| sendChunkTo(this.buffer, chan, out); |
| flushBuffer(chan, out); |
| } |
| |
| /** |
| * sends the data from "in" by writing it to "sc" through "out" (out is used to chunk to data and |
| * is probably a direct memory buffer). |
| */ |
| private void sendChunkTo(ByteBuffer in, SocketChannel sc, ByteBuffer out) throws IOException { |
| int bytesSent = in.remaining(); |
| if (in.isDirect()) { |
| flushBuffer(sc, out); |
| while (in.remaining() > 0) { |
| sc.write(in); |
| } |
| } else { |
| // copy in to out. If out fills flush it |
| int OUT_MAX = out.remaining(); |
| if (bytesSent <= OUT_MAX) { |
| out.put(in); |
| } else { |
| final byte[] bytes = in.array(); |
| int off = in.arrayOffset() + in.position(); |
| int len = bytesSent; |
| while (len > 0) { |
| int bytesThisTime = len; |
| if (bytesThisTime > OUT_MAX) { |
| bytesThisTime = OUT_MAX; |
| } |
| out.put(bytes, off, bytesThisTime); |
| off += bytesThisTime; |
| len -= bytesThisTime; |
| flushBuffer(sc, out); |
| OUT_MAX = out.remaining(); |
| } |
| in.position(in.limit()); |
| } |
| } |
| this.size -= bytesSent; |
| } |
| |
| /** |
| * Write the contents of this stream to the byte buffer. |
| * |
| * @throws BufferOverflowException if out is not large enough to contain all of our data. |
| */ |
| public void sendTo(ByteBuffer out) { |
| finishWriting(); |
| if (out.remaining() < size()) { |
| throw new BufferOverflowException(); |
| } |
| if (this.chunks != null) { |
| for (ByteBuffer bb : this.chunks) { |
| int bytesToWrite = bb.remaining(); |
| if (bytesToWrite > 0) { |
| out.put(bb); |
| this.size -= bytesToWrite; |
| } |
| } |
| } |
| { |
| ByteBuffer bb = this.buffer; |
| int bytesToWrite = bb.remaining(); |
| if (bytesToWrite > 0) { |
| out.put(bb); |
| this.size -= bytesToWrite; |
| } |
| } |
| } |
| |
| /** |
| * Write the contents of this stream to the specified stream using outBuf if a buffer is needed. |
| */ |
| public void sendTo(OutputStream out, ByteBuffer outBuf) throws IOException { |
| finishWriting(); |
| if (this.chunks != null) { |
| for (ByteBuffer bb : this.chunks) { |
| sendTo(out, outBuf, bb); |
| } |
| } |
| sendTo(out, outBuf, this.buffer); |
| flushStream(out, outBuf); |
| } |
| |
| private void sendTo(OutputStream out, ByteBuffer outBuf, ByteBuffer inBuf) throws IOException { |
| this.size -= writeByteBufferToStream(out, outBuf, inBuf); |
| } |
| |
| /** |
| * Returns the number of bytes written |
| */ |
| public static int writeByteBufferToStream(OutputStream out, ByteBuffer outBuf, ByteBuffer inBuf) |
| throws IOException { |
| int bytesToWrite = inBuf.remaining(); |
| if (bytesToWrite > 0) { |
| if (inBuf.hasArray()) { |
| flushStream(out, outBuf); |
| out.write(inBuf.array(), inBuf.arrayOffset() + inBuf.position(), bytesToWrite); |
| inBuf.position(inBuf.limit()); |
| } else { // fix for bug 43007 |
| // copy direct inBuf to heap outBuf. If out fills flush it |
| int bytesToWriteThisTime = bytesToWrite; |
| int OUT_MAX = outBuf.remaining(); |
| while (bytesToWriteThisTime > OUT_MAX) { |
| // copy only OUT_MAX bytes and flush outBuf |
| int oldLimit = inBuf.limit(); |
| inBuf.limit(inBuf.position() + OUT_MAX); |
| outBuf.put(inBuf); |
| inBuf.limit(oldLimit); |
| flushStream(out, outBuf); |
| bytesToWriteThisTime -= OUT_MAX; |
| OUT_MAX = outBuf.remaining(); |
| } |
| outBuf.put(inBuf); |
| } |
| } |
| return bytesToWrite; |
| } |
| |
| /** |
| * Write the contents of this stream to the specified stream. |
| */ |
| public void sendTo(ByteBufferWriter out) { |
| finishWriting(); |
| if (this.chunks != null) { |
| for (ByteBuffer bb : this.chunks) { |
| basicSendTo(out, bb); |
| } |
| } |
| basicSendTo(out, this.buffer); |
| } |
| |
| private void basicSendTo(ByteBufferWriter out, ByteBuffer bb) { |
| int bytesToWrite = bb.remaining(); |
| if (bytesToWrite > 0) { |
| out.write(bb.duplicate()); |
| this.size -= bytesToWrite; |
| } |
| } |
| |
| /** |
| * Write the contents of this stream to the specified stream. |
| * <p> |
| * Note this implementation is exactly the same as writeTo(OutputStream) but they do not both |
| * implement a common interface. |
| */ |
| public void sendTo(DataOutput out) throws IOException { |
| finishWriting(); |
| if (this.chunks != null) { |
| for (ByteBuffer bb : this.chunks) { |
| int bytesToWrite = bb.remaining(); |
| if (bytesToWrite > 0) { |
| if (bb.hasArray()) { |
| out.write(bb.array(), bb.arrayOffset() + bb.position(), bytesToWrite); |
| bb.position(bb.limit()); |
| } else { |
| byte[] bytes = new byte[bytesToWrite]; |
| bb.get(bytes); |
| out.write(bytes); |
| } |
| this.size -= bytesToWrite; |
| } |
| } |
| } |
| { |
| ByteBuffer bb = this.buffer; |
| int bytesToWrite = bb.remaining(); |
| if (bytesToWrite > 0) { |
| if (bb.hasArray()) { |
| out.write(bb.array(), bb.arrayOffset() + bb.position(), bytesToWrite); |
| bb.position(bb.limit()); |
| } else { |
| byte[] bytes = new byte[bytesToWrite]; |
| bb.get(bytes); |
| out.write(bytes); |
| } |
| this.size -= bytesToWrite; |
| } |
| } |
| } |
| |
| // DataOutput methods |
| |
| /** |
| * Writes the given object to this stream as a byte array. The byte array is produced by |
| * serializing v. The serialization is done by calling DataSerializer.writeObject. |
| */ |
| @Override |
| public void writeAsSerializedByteArray(Object v) throws IOException { |
| if (this.ignoreWrites) |
| return; |
| checkIfWritable(); |
| ensureCapacity(5); |
| if (v instanceof HeapDataOutputStream) { |
| HeapDataOutputStream other = (HeapDataOutputStream) v; |
| other.finishWriting(); |
| InternalDataSerializer.writeArrayLength(other.size(), this); |
| if (this.doNotCopy) { |
| if (other.chunks != null) { |
| for (ByteBuffer bb : other.chunks) { |
| write(bb); |
| } |
| } |
| write(other.buffer); |
| } else { |
| other.sendTo((ByteBufferWriter) this); |
| other.rewind(); |
| } |
| } else { |
| ByteBuffer sizeBuf = this.buffer; |
| int sizePos = sizeBuf.position(); |
| sizeBuf.position(sizePos + 5); |
| final int preArraySize = size(); |
| DataSerializer.writeObject(v, this); |
| int arraySize = size() - preArraySize; |
| sizeBuf.put(sizePos, StaticSerialization.INT_ARRAY_LEN); |
| sizeBuf.putInt(sizePos + 1, arraySize); |
| } |
| } |
| |
| /** |
| * Write a byte source to this HeapDataOutputStream, |
| * |
| * the contents of the buffer between the position and the limit are copied to the output stream. |
| */ |
| public void write(ByteBufferInputStream.ByteSource source) { |
| ByteBuffer bb = source.getBackingByteBuffer(); |
| if (bb != null) { |
| write(bb); |
| return; |
| } |
| if (this.ignoreWrites) |
| return; |
| checkIfWritable(); |
| int remainingSpace = this.buffer.limit() - this.buffer.position(); |
| if (remainingSpace < source.remaining()) { |
| int oldLimit = source.limit(); |
| source.limit(source.position() + remainingSpace); |
| source.sendTo(this.buffer); |
| source.limit(oldLimit); |
| ensureCapacity(source.remaining()); |
| } |
| source.sendTo(this.buffer); |
| } |
| |
| |
| } |