// blob: 7622ffc001c31cac08672e5e6d1da87273c31afd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.common;
import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
/**
 * Use a list of {@link ByteBuffer} to implement a single {@link ChunkBuffer}
 * so that the buffer can be allocated incrementally.
 * <p>
 * Backing buffers are only allocated when {@link #put(ByteBuffer)} reaches
 * them. Every backing buffer has capacity {@code increment}, except the one
 * at {@code limitIndex} (if any), whose capacity is {@code limit % increment}.
 * The list is expected to be ordered as: full buffers, the buffer containing
 * the current position, then empty buffers.
 */
final class IncrementalChunkBuffer implements ChunkBuffer {
  /**
   * The limit of the entire {@link ChunkBuffer},
   * but not individual {@link ByteBuffer}(s) in the list.
   */
  private final int limit;
  /**
   * Increment is the capacity of each {@link ByteBuffer} in the list,
   * except for the buffer at {@link #limitIndex}.
   */
  private final int increment;
  /** The index at limit, i.e. {@code limit / increment}. */
  private final int limitIndex;
  /** Buffer list to be allocated incrementally. */
  private final List<ByteBuffer> buffers;
  /** Is this a duplicated buffer? (for debug only) */
  private final boolean isDuplicated;
  /** The index of the first non-full buffer (a cached lower bound). */
  private int firstNonFullIndex = 0;

  /**
   * @param limit the limit of the entire buffer; must be non-negative.
   * @param increment the capacity of each backing buffer; must be positive.
   * @param isDuplicated true if this object is created by
   *                     {@link #duplicate(int, int)}.
   */
  IncrementalChunkBuffer(int limit, int increment, boolean isDuplicated) {
    Preconditions.checkArgument(limit >= 0);
    Preconditions.checkArgument(increment > 0);
    this.limit = limit;
    this.increment = increment;
    this.limitIndex = limit / increment;
    // One extra slot for the trailing partial buffer, if there is one.
    this.buffers = new ArrayList<>(
        limitIndex + (limit % increment == 0 ? 0 : 1));
    this.isDuplicated = isDuplicated;
  }

  /** @return the capacity for the buffer at the given index. */
  private int getBufferCapacityAtIndex(int i) {
    Preconditions.checkArgument(i >= 0);
    Preconditions.checkArgument(i <= limitIndex);
    return i < limitIndex ? increment : limit % increment;
  }

  /**
   * Delegate to {@link ChunkBuffer#assertInt} with a message naming this
   * buffer, the checked property and the buffer index.
   */
  private void assertInt(int expected, int computed, String name, int i) {
    ChunkBuffer.assertInt(expected, computed,
        () -> this + ": Unexpected " + name + " at index " + i);
  }

  /**
   * Look up a backing buffer without allocating, and check that its
   * capacity (or, for a duplicated buffer at {@code limitIndex}, its limit)
   * has the expected value.
   *
   * @return the i-th buffer if it exists; otherwise, return null.
   */
  private ByteBuffer getAtIndex(int i) {
    Preconditions.checkArgument(i >= 0);
    Preconditions.checkArgument(i <= limitIndex);
    final ByteBuffer ith = i < buffers.size() ? buffers.get(i) : null;
    if (ith == null) {
      return null;
    }

    if (!isDuplicated) {
      assertInt(getBufferCapacityAtIndex(i), ith.capacity(), "capacity", i);
    } else if (i < limitIndex) {
      assertInt(increment, ith.capacity(), "capacity", i);
    } else {
      // Here i == limitIndex (larger values were rejected above).
      // A duplicated buffer shares the original's storage, so its capacity
      // may be larger; the limit is what must match.
      assertInt(getBufferCapacityAtIndex(i), ith.limit(), "limit", i);
    }
    return ith;
  }

  /** @return the i-th buffer. It may allocate buffers. */
  private ByteBuffer getAndAllocateAtIndex(int index) {
    Preconditions.checkArgument(index >= 0);
    // never allocate over limit
    if (limit % increment == 0) {
      // There is no partial buffer at limitIndex in this case.
      Preconditions.checkArgument(index < limitIndex);
    } else {
      Preconditions.checkArgument(index <= limitIndex);
    }

    int i = buffers.size();
    if (index < i) {
      return getAtIndex(index);
    }

    // allocate up to the given index
    ByteBuffer b = null;
    for (; i <= index; i++) {
      b = ByteBuffer.allocate(getBufferCapacityAtIndex(i));
      buffers.add(b);
    }
    return b;
  }

  /**
   * The returned buffer is checked to be positioned exactly at
   * {@code position % increment}.
   *
   * @return the buffer containing the position. It may allocate buffers.
   */
  private ByteBuffer getAndAllocateAtPosition(int position) {
    Preconditions.checkArgument(position >= 0);
    Preconditions.checkArgument(position < limit);
    final int i = position / increment;
    final ByteBuffer ith = getAndAllocateAtIndex(i);
    assertInt(position % increment, ith.position(), "position", i);
    return ith;
  }

  /**
   * Scan forward from the cached index and update the cache.
   * The comparison is against {@code increment} rather than the buffer's
   * own capacity, so a shorter buffer at {@code limitIndex} is always
   * reported as non-full.
   *
   * @return the index of the first non-full buffer,
   *         or {@code buffers.size()} if there is none.
   */
  private int firstNonFullIndex() {
    for (int i = firstNonFullIndex; i < buffers.size(); i++) {
      if (getAtIndex(i).position() != increment) {
        firstNonFullIndex = i;
        return firstNonFullIndex;
      }
    }
    firstNonFullIndex = buffers.size();
    return firstNonFullIndex;
  }

  /**
   * @return the position of the entire buffer, i.e. the bytes held by the
   *         full buffers plus the position of the first non-full buffer.
   */
  @Override
  public int position() {
    // The buffers list must be in the following orders:
    // full buffers, buffer containing the position, empty buffers, null buffers
    final int i = firstNonFullIndex();
    final ByteBuffer ith = getAtIndex(i);
    final int position = i * increment + Optional.ofNullable(ith)
        .map(ByteBuffer::position).orElse(0);
    // remaining buffers must be empty (only checked when -ea is on)
    assert assertRemainingList(ith, i);
    return position;
  }

  /**
   * Check that every buffer after index {@code i} is at position 0.
   *
   * @return always true, so that it can be invoked from an {@code assert}.
   */
  private boolean assertRemainingList(ByteBuffer ith, int i) {
    if (ith != null) {
      // buffers must be empty
      for (i++; i < buffers.size(); i++) {
        ith = getAtIndex(i);
        if (ith == null) {
          break; // found the first null
        }
        assertInt(0, ith.position(), "position", i);
      }
    }
    final int j = i;
    ChunkBuffer.assertInt(buffers.size(), i,
        () -> "i = " + j + " != buffers.size() = " + buffers.size());
    return true;
  }

  @Override
  public int remaining() {
    return limit - position();
  }

  @Override
  public int limit() {
    return limit;
  }

  /** Rewind every allocated backing buffer and reset the cached index. */
  @Override
  public ChunkBuffer rewind() {
    buffers.forEach(ByteBuffer::rewind);
    firstNonFullIndex = 0;
    return this;
  }

  /** Clear every allocated backing buffer and reset the cached index. */
  @Override
  public ChunkBuffer clear() {
    buffers.forEach(ByteBuffer::clear);
    firstNonFullIndex = 0;
    return this;
  }

  /**
   * Copy all remaining bytes of the given buffer into this buffer,
   * allocating backing buffers as needed.
   * The limit of the given buffer is temporarily lowered while copying and
   * is always restored before this method returns or throws.
   *
   * @param that the source buffer.
   * @return this object.
   * @throws BufferOverflowException if the given buffer has more remaining
   *         bytes than this buffer.
   */
  @Override
  public ChunkBuffer put(ByteBuffer that) {
    // Compute once: remaining() rescans the buffer list for the position.
    final int thisRemaining = this.remaining();
    if (that.remaining() > thisRemaining) {
      final BufferOverflowException boe = new BufferOverflowException();
      boe.initCause(new IllegalArgumentException(
          "Failed to put since that.remaining() = " + that.remaining()
              + " > this.remaining() = " + thisRemaining));
      throw boe;
    }

    final int thatLimit = that.limit();
    try {
      for (int p = position(); that.position() < thatLimit;) {
        final ByteBuffer b = getAndAllocateAtPosition(p);
        // Copy at most what fits in the current backing buffer.
        final int min = Math.min(b.remaining(), thatLimit - that.position());
        that.limit(that.position() + min);
        b.put(that);
        p += min;
      }
    } finally {
      // Do not leave the caller's buffer with a truncated limit on failure.
      that.limit(thatLimit);
    }
    return this;
  }

  /**
   * Create a view over the same backing storage with the given position
   * and limit. All backing buffers up to the new limit must already exist.
   *
   * @param newPosition the position of the duplicate;
   *                    must be in [0, newLimit].
   * @param newLimit the limit of the duplicate;
   *                 must not exceed the limit of this buffer.
   * @return the duplicated buffer.
   */
  @Override
  public ChunkBuffer duplicate(int newPosition, int newLimit) {
    Preconditions.checkArgument(newPosition >= 0);
    Preconditions.checkArgument(newPosition <= newLimit);
    Preconditions.checkArgument(newLimit <= limit);
    final IncrementalChunkBuffer duplicated = new IncrementalChunkBuffer(
        newLimit, increment, true);

    // index and offset of the new position and of the new limit
    final int pi = newPosition / increment;
    final int pr = newPosition % increment;
    final int li = newLimit / increment;
    final int lr = newLimit % increment;
    final int newSize = lr == 0 ? li : li + 1;

    for (int i = 0; i < newSize; i++) {
      // Buffers before the position are full, the ones after are empty.
      final int pos = i < pi ? increment : i == pi ? pr : 0;
      final int lim = i < li ? increment : i == li ? lr : 0;
      duplicated.buffers.add(duplicate(i, pos, lim));
    }
    return duplicated;
  }

  /**
   * @return a duplicate of the i-th backing buffer
   *         with the given position and limit.
   * @throws NullPointerException if the i-th buffer is not yet allocated.
   */
  private ByteBuffer duplicate(int i, int pos, int lim) {
    final ByteBuffer ith = getAtIndex(i);
    Objects.requireNonNull(ith, () -> "buffers[" + i + "] == null");
    final ByteBuffer b = ith.duplicate();
    b.position(pos).limit(lim);
    return b;
  }

  /** Support only when bufferSize == increment. */
  @Override
  public Iterable<ByteBuffer> iterate(int bufferSize) {
    if (bufferSize != increment) {
      throw new UnsupportedOperationException(
          "Buffer size and increment mismatched: bufferSize = " + bufferSize
              + " but increment = " + increment);
    }
    return asByteBufferList();
  }

  /** @return an unmodifiable view of the allocated backing buffers. */
  @Override
  public List<ByteBuffer> asByteBufferList() {
    return Collections.unmodifiableList(buffers);
  }

  /**
   * Issue a single gathering write of the allocated backing buffers.
   * NOTE(review): {@link GatheringByteChannel#write(ByteBuffer[])} may write
   * fewer bytes than remaining; callers needing a complete write should
   * check the returned count.
   *
   * @return the number of bytes written by the channel.
   */
  @Override
  public long writeTo(GatheringByteChannel channel) throws IOException {
    return channel.write(buffers.toArray(new ByteBuffer[0]));
  }

  /**
   * @return the concatenation of the results of applying the given function
   *         to each allocated backing buffer, in order.
   */
  @Override
  public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
    ByteString result = ByteString.EMPTY;
    for (ByteBuffer buffer : buffers) {
      result = result.concat(f.apply(buffer));
    }
    return result;
  }

  /**
   * @return a new list containing the result of applying the given function
   *         to each allocated backing buffer, in order.
   */
  @Override
  public List<ByteString> toByteStringListImpl(
      Function<ByteBuffer, ByteString> f) {
    final List<ByteString> byteStringList = new ArrayList<>(buffers.size());
    for (ByteBuffer buffer : buffers) {
      byteStringList.add(f.apply(buffer));
    }
    return byteStringList;
  }

  /** Two buffers are equal if their limits and backing lists are equal. */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    } else if (!(obj instanceof IncrementalChunkBuffer)) {
      return false;
    }
    final IncrementalChunkBuffer that = (IncrementalChunkBuffer)obj;
    return this.limit == that.limit && this.buffers.equals(that.buffers);
  }

  @Override
  public int hashCode() {
    return buffers.hashCode();
  }

  @Override
  public String toString() {
    return getClass().getSimpleName()
        + ":limit=" + limit + ",increment=" + increment;
  }
}