| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.util; |
| |
| import java.io.OutputStream; |
| import javax.annotation.concurrent.NotThreadSafe; |
| import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; |
| import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations; |
| |
| /** |
| * An {@link OutputStream} that produces {@link ByteString}s. |
| * |
| * <p>Closing this output stream does nothing. |
| * |
| * <p>This class is not thread safe and expects appropriate locking to be used in a thread-safe |
| * manner. This differs from {@link ByteString.Output} which synchronizes its writes. |
| */ |
| @NotThreadSafe |
| public final class ByteStringOutputStream extends OutputStream { |
| |
| // This constant was chosen based upon Protobufs ByteString#CONCATENATE_BY_COPY which |
| // isn't public to prevent copying the bytes again when concatenating ByteStrings instead |
| // of appending. |
| private static final int DEFAULT_CAPACITY = 128; |
| |
| // ByteStringOutputStreamBenchmark.NewVsCopy shows that we actually are faster |
| // creating a 4 new arrays that are 256k vs one that is 1024k by almost a factor |
| // of 2. |
| // |
| // This number should be tuned periodically as hardware changes. |
| private static final int MAX_CHUNK_SIZE = 256 * 1024; |
| |
| // ByteString to be concatenated to create the result |
| private ByteString result; |
| |
| // Current buffer to which we are writing |
| private byte[] buffer; |
| |
| // Location in buffer[] to which we write the next byte. |
| private int bufferPos; |
| |
| /** Creates a new output stream with a default capacity. */ |
| public ByteStringOutputStream() { |
| this(DEFAULT_CAPACITY); |
| } |
| |
| /** |
| * Creates a new output stream with the specified initial capacity. |
| * |
| * @param initialCapacity the initial capacity of the output stream. |
| */ |
| public ByteStringOutputStream(int initialCapacity) { |
| if (initialCapacity < 0) { |
| throw new IllegalArgumentException("Initial capacity < 0"); |
| } |
| this.buffer = new byte[initialCapacity]; |
| this.result = ByteString.EMPTY; |
| } |
| |
| @Override |
| public void write(int b) { |
| if (bufferPos == buffer.length) { |
| // We want to increase our total capacity by 50% but not larger than the max chunk size. |
| result = result.concat(UnsafeByteOperations.unsafeWrap(buffer)); |
| buffer = new byte[Math.min(Math.max(1, result.size()), MAX_CHUNK_SIZE)]; |
| bufferPos = 0; |
| } |
| buffer[bufferPos++] = (byte) b; |
| } |
| |
| @Override |
| public void write(byte[] b, int offset, int length) { |
| int remainingSpaceInBuffer = buffer.length - bufferPos; |
| while (length > remainingSpaceInBuffer) { |
| // Use up the current buffer |
| System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer); |
| offset += remainingSpaceInBuffer; |
| length -= remainingSpaceInBuffer; |
| |
| result = result.concat(UnsafeByteOperations.unsafeWrap(buffer)); |
| // We want to increase our total capacity but not larger than the max chunk size. |
| remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE); |
| buffer = new byte[remainingSpaceInBuffer]; |
| bufferPos = 0; |
| } |
| |
| System.arraycopy(b, offset, buffer, bufferPos, length); |
| bufferPos += length; |
| } |
| |
| /** |
| * Creates a byte string with the size and contents of this output stream. |
| * |
| * <p>Note that the caller must not invoke {#link {@link #toByteStringAndReset} as the internal |
| * buffer maybe mutated by a future {@link #write} mutating {@link ByteString}s returned in the |
| * past. |
| */ |
| public ByteString toByteString() { |
| // We specifically choose to concatenate here since the user won't be re-using the buffer. |
| return result.concat(UnsafeByteOperations.unsafeWrap(buffer, 0, bufferPos)); |
| } |
| |
| /** |
| * Creates a byte string with the size and contents of this output stream and resets the output |
| * stream to be re-used possibly re-using any existing buffers. |
| */ |
| public ByteString toByteStringAndReset() { |
| ByteString rval; |
| if (bufferPos > 0) { |
| final boolean copy; |
| // These thresholds are from the results of ByteStringOutputStreamBenchmark.CopyVewNew |
| // which show that at these thresholds we should copy the bytes instead to re-use |
| // the existing buffer since creating a new one is more expensive. |
| if (buffer.length <= 128) { |
| // Always copy small byte arrays to prevent large chunks of wasted space |
| // when dealing with very small amounts of data. |
| copy = true; |
| } else if (buffer.length <= 1024) { |
| copy = bufferPos <= buffer.length * 0.875; |
| } else if (buffer.length <= 8192) { |
| copy = bufferPos <= buffer.length * 0.75; |
| } else { |
| copy = bufferPos <= buffer.length * 0.4375; |
| } |
| if (copy) { |
| byte[] bufferCopy = new byte[bufferPos]; |
| System.arraycopy(buffer, 0, bufferCopy, 0, bufferPos); |
| rval = result.concat(UnsafeByteOperations.unsafeWrap(bufferCopy)); |
| } else { |
| rval = result.concat(UnsafeByteOperations.unsafeWrap(buffer, 0, bufferPos)); |
| buffer = new byte[Math.min(rval.size(), MAX_CHUNK_SIZE)]; |
| } |
| bufferPos = 0; |
| } else { |
| rval = result; |
| } |
| result = ByteString.EMPTY; |
| return rval; |
| } |
| |
| /** |
| * Returns the current size of the output stream. |
| * |
| * @return the current size of the output stream |
| */ |
| public int size() { |
| return result.size() + bufferPos; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format( |
| "<ByteStringOutputStream@%s size=%d>", |
| Integer.toHexString(System.identityHashCode(this)), size()); |
| } |
| } |