blob: 43e3bba2d19c15c8c095635172cef81fb0c3001b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* A ByteBuffer-backed OutputStream that expands the internal ByteBuffer as required. Given this, the caller should
* always access the underlying ByteBuffer via the {@link #buffer()} method until all writes are completed.
*
* This class is typically used for 2 purposes:
*
* 1. Write to a ByteBuffer when there is a chance that we may need to expand it in order to fit all the desired data
* 2. Write to a ByteBuffer via methods that expect an OutputStream interface
*
* Hard to track bugs can happen when this class is used for the second reason and unexpected buffer expansion happens.
* So, it's best to assume that buffer expansion can always happen. An improvement would be to create a separate class
* that throws an error if buffer expansion is required to avoid the issue altogether.
*/
public class ByteBufferOutputStream extends OutputStream {
private static final float REALLOCATION_FACTOR = 1.1f;
private final int initialCapacity;
private final int initialPosition;
private ByteBuffer buffer;
/**
* Creates an instance of this class that will write to the received `buffer` up to its `limit`. If necessary to
* satisfy `write` or `position` calls, larger buffers will be allocated so the {@link #buffer()} method may return
* a different buffer than the received `buffer` parameter.
*
* Prefer one of the constructors that allocate the internal buffer for clearer semantics.
*/
public ByteBufferOutputStream(ByteBuffer buffer) {
this.buffer = buffer;
this.initialPosition = buffer.position();
this.initialCapacity = buffer.capacity();
}
public ByteBufferOutputStream(int initialCapacity) {
this(initialCapacity, false);
}
public ByteBufferOutputStream(int initialCapacity, boolean directBuffer) {
this(directBuffer ? ByteBuffer.allocateDirect(initialCapacity) : ByteBuffer.allocate(initialCapacity));
}
public void write(int b) {
ensureRemaining(1);
buffer.put((byte) b);
}
public void write(byte[] bytes, int off, int len) {
ensureRemaining(len);
buffer.put(bytes, off, len);
}
public void write(ByteBuffer sourceBuffer) {
ensureRemaining(sourceBuffer.remaining());
buffer.put(sourceBuffer);
}
public ByteBuffer buffer() {
return buffer;
}
public int position() {
return buffer.position();
}
public int remaining() {
return buffer.remaining();
}
public int limit() {
return buffer.limit();
}
public void position(int position) {
ensureRemaining(position - buffer.position());
buffer.position(position);
}
/**
* The capacity of the first internal ByteBuffer used by this class. This is useful in cases where a pooled
* ByteBuffer was passed via the constructor and it needs to be returned to the pool.
*/
public int initialCapacity() {
return initialCapacity;
}
/**
* Ensure there is enough space to write some number of bytes, expanding the underlying buffer if necessary.
* This can be used to avoid incremental expansions through calls to {@link #write(int)} when you know how
* many total bytes are needed.
*
* @param remainingBytesRequired The number of bytes required
*/
public void ensureRemaining(int remainingBytesRequired) {
if (remainingBytesRequired > buffer.remaining())
expandBuffer(remainingBytesRequired);
}
private void expandBuffer(int remainingRequired) {
int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
ByteBuffer temp = ByteBuffer.allocate(expandSize);
int limit = limit();
buffer.flip();
temp.put(buffer);
buffer.limit(limit);
// reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
// we should ideally only do this for the original buffer, but the additional complexity doesn't seem worth it
buffer.position(initialPosition);
buffer = temp;
}
}