blob: f3f31739c6e060c380aa8cc69e4c79f57c4aa2c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.common;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
/**
 * {@link ChunkBuffer} implementation using a list of {@link ByteBuffer}s.
 * The buffers are treated as one logical buffer: the overall position is the
 * sum of the limits of all buffers preceding the current one plus the
 * position of the current buffer, and the overall limit is the sum of the
 * limits of all buffers at construction time.
 * Not thread-safe.
 */
public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {

  /** Placeholder used when this object is constructed with an empty list. */
  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

  /** Buffer list backing the ChunkBuffer. */
  private final List<ByteBuffer> buffers;
  /** Sum of the limits of the given buffers, computed once at construction. */
  private final int limit;

  /** Sum of the limits of the buffers preceding the current buffer. */
  private int limitPrecedingCurrent;
  /** Index (into {@link #buffers}) of the current buffer. */
  private int currentIndex;

  /**
   * Creates a chunk buffer backed by the given buffers.
   * The list is copied, the buffers themselves are shared with the caller.
   *
   * @param buffers the backing buffers; must not be null. An empty list is
   *                replaced by a single zero-capacity buffer.
   * @throws IllegalArgumentException if {@code buffers} is null, or if a
   *         buffer following the first non-full buffer has a non-zero position
   */
  ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
    Preconditions.checkArgument(buffers != null, "buffer == null");

    this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
        ImmutableList.of(EMPTY_BUFFER);
    this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();

    findCurrent();
  }

  /**
   * Recomputes {@link #currentIndex} and {@link #limitPrecedingCurrent} from
   * the positions of the underlying buffers. The current buffer is the first
   * one whose position is below its limit; if there is none, the last buffer
   * becomes the current one.
   *
   * @throws IllegalArgumentException if a buffer after the current one has
   *         a non-zero position
   */
  private void findCurrent() {
    boolean found = false;
    // Start from scratch: this method is also invoked after construction
    // (from iterate() and writeTo()), so accumulating onto the previous value
    // would inflate position() and make hasRemaining() return false too early.
    limitPrecedingCurrent = 0;
    for (int i = 0; i < buffers.size(); i++) {
      final ByteBuffer buf = buffers.get(i);
      final int pos = buf.position();
      if (found) {
        Preconditions.checkArgument(pos == 0,
            "all buffers after current one should have position=0");
      } else if (pos < buf.limit()) {
        found = true;
        currentIndex = i;
      } else {
        // Buffer is fully consumed, so it precedes the current one.
        limitPrecedingCurrent += buf.limit();
      }
    }
    if (!found) {
      // Every buffer is full: the last buffer is the current one, so its
      // limit must not be counted among the preceding limits.
      currentIndex = buffers.size() - 1;
      limitPrecedingCurrent -= current().limit();
    }
  }

  /** @return the buffer at {@link #currentIndex}. */
  private ByteBuffer current() {
    return buffers.get(currentIndex);
  }

  /**
   * Moves to the next buffer if the current one has no remaining bytes.
   * Never moves past the last buffer.
   */
  private void advanceCurrent() {
    if (currentIndex < buffers.size() - 1) {
      final ByteBuffer current = buffers.get(currentIndex);
      if (!current.hasRemaining()) {
        currentIndex++;
        limitPrecedingCurrent += current.limit();
      }
    }
  }

  /** Makes the first buffer the current one. */
  private void rewindCurrent() {
    currentIndex = 0;
    limitPrecedingCurrent = 0;
  }

  /** @return the overall position across all buffers. */
  @Override
  public int position() {
    return limitPrecedingCurrent + current().position();
  }

  /** @return the number of bytes between the overall position and limit. */
  @Override
  public int remaining() {
    return limit - position();
  }

  /** @return the overall limit computed at construction. */
  @Override
  public int limit() {
    return limit;
  }

  /** @return true if the overall position is below the overall limit. */
  @Override
  public boolean hasRemaining() {
    return position() < limit;
  }

  /**
   * Rewinds every underlying buffer and makes the first buffer current.
   *
   * @return this object
   */
  @Override
  public ChunkBuffer rewind() {
    buffers.forEach(ByteBuffer::rewind);
    rewindCurrent();
    return this;
  }

  /**
   * Clears every underlying buffer and makes the first buffer current.
   * NOTE(review): {@link ByteBuffer#clear()} resets each buffer's limit to
   * its capacity, while the overall {@link #limit()} of this object stays at
   * the value computed at construction.
   *
   * @return this object
   */
  @Override
  public ChunkBuffer clear() {
    buffers.forEach(ByteBuffer::clear);
    rewindCurrent();
    return this;
  }

  /**
   * Copies all remaining bytes of the given buffer into this buffer,
   * spilling over into subsequent underlying buffers as each one fills up.
   *
   * @param that the source buffer; its limit is adjusted while copying
   * @return this object
   * @throws BufferOverflowException if {@code that} has more remaining bytes
   *         than this buffer; the cause carries both sizes
   */
  @Override
  public ChunkBuffer put(ByteBuffer that) {
    final int thisRemaining = remaining();
    int thatRemaining = that.remaining();
    if (thatRemaining > thisRemaining) {
      final BufferOverflowException boe = new BufferOverflowException();
      boe.initCause(new IllegalArgumentException(
          "Failed to put since that.remaining() = " + thatRemaining
              + " > this.remaining() = " + thisRemaining));
      throw boe;
    }

    while (thatRemaining > 0) {
      final ByteBuffer b = current();
      final int bytes = Math.min(b.remaining(), thatRemaining);
      // Restrict the source so that b.put(that) copies exactly `bytes` bytes.
      that.limit(that.position() + bytes);
      b.put(that);
      thatRemaining -= bytes;
      advanceCurrent();
    }

    return this;
  }

  /**
   * Creates a new chunk buffer over duplicates of the underlying buffers,
   * with the given overall position and limit. The duplicates share content
   * with the underlying buffers but have independent positions and limits.
   *
   * @param newPosition the overall position of the result; must be
   *                    non-negative and not greater than {@code newLimit}
   * @param newLimit the overall limit of the result; must not exceed
   *                 {@link #limit()}
   * @return the new chunk buffer
   * @throws IllegalArgumentException if the arguments are out of range
   */
  @Override
  public ChunkBuffer duplicate(int newPosition, int newLimit) {
    Preconditions.checkArgument(newPosition >= 0);
    Preconditions.checkArgument(newPosition <= newLimit);
    Preconditions.checkArgument(newLimit <= limit());

    final List<ByteBuffer> duplicates = new ArrayList<>(buffers.size());
    // [min, max) is the overall range covered by the buffer being visited.
    int min = 0;
    for (final ByteBuffer buf : buffers) {
      final int max = min + buf.limit();
      final int pos = relativeToRange(newPosition, min, max);
      final int lim = relativeToRange(newLimit, min, max);

      final ByteBuffer duplicate = buf.duplicate();
      duplicate.position(pos).limit(lim);
      duplicates.add(duplicate);

      min = max;
    }

    return new ChunkBufferImplWithByteBufferList(duplicates);
  }

  /**
   * Returns the next buffer in the list irrespective of the bufferSize.
   * Each returned buffer is a duplicate of the current underlying buffer,
   * and the underlying buffer is marked as fully consumed; iterating
   * therefore advances the position of this object.
   *
   * @param bufferSize ignored by this implementation
   * @return an iterable over the remaining underlying buffers
   */
  @Override
  public Iterable<ByteBuffer> iterate(int bufferSize) {
    return () -> new Iterator<ByteBuffer>() {
      @Override
      public boolean hasNext() {
        return hasRemaining();
      }

      @Override
      public ByteBuffer next() {
        if (!hasRemaining()) {
          throw new NoSuchElementException();
        }
        // Locate the first buffer that still has unread bytes.
        findCurrent();
        final ByteBuffer current = buffers.get(currentIndex);
        // The duplicate carries the same position and limit as the original.
        final ByteBuffer duplicated = current.duplicate();
        // Mark the underlying buffer as consumed.
        current.position(current.limit());
        return duplicated;
      }
    };
  }

  /** @return the immutable list of underlying buffers (not copies). */
  @Override
  public List<ByteBuffer> asByteBufferList() {
    return buffers;
  }

  /**
   * Writes the underlying buffers to the given channel with a single
   * gathering write, then re-synchronizes the current buffer with the
   * positions left by the channel.
   *
   * @param channel the channel to write to
   * @return the number of bytes reported as written by the channel
   * @throws IOException if the channel write fails
   */
  @Override
  public long writeTo(GatheringByteChannel channel) throws IOException {
    final long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
    findCurrent();
    return bytes;
  }

  /**
   * Converts each underlying buffer with the given function and
   * concatenates the results in list order.
   *
   * @param f conversion applied to each underlying buffer
   * @return the concatenation of all converted buffers
   */
  @Override
  public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
    ByteString result = ByteString.EMPTY;
    for (ByteBuffer buffer : buffers) {
      result = result.concat(f.apply(buffer));
    }
    return result;
  }

  /**
   * Converts each underlying buffer with the given function.
   *
   * @param f conversion applied to each underlying buffer
   * @return a new list holding one converted element per underlying buffer
   */
  @Override
  public List<ByteString> toByteStringListImpl(
      Function<ByteBuffer, ByteString> f) {
    final List<ByteString> byteStringList = new ArrayList<>(buffers.size());
    for (ByteBuffer buffer : buffers) {
      byteStringList.add(f.apply(buffer));
    }
    return byteStringList;
  }

  @Override
  public String toString() {
    return getClass().getSimpleName()
        + ":n=" + buffers.size()
        + ":p=" + position()
        + ":l=" + limit();
  }

  /**
   * Translates an overall offset into an offset relative to the range
   * {@code [min, max]}, clamping to the range boundaries.
   *
   * @param value the overall offset
   * @param min the overall offset at which the range starts
   * @param max the overall offset at which the range ends
   * @return 0 if {@code value <= min}, {@code max - min} if
   *         {@code value > max}, otherwise {@code value - min}
   */
  private static int relativeToRange(int value, int min, int max) {
    final int pos;
    if (value <= min) {
      pos = 0;
    } else if (value <= max) {
      pos = value - min;
    } else {
      pos = max - min;
    }
    return pos;
  }
}