blob: b5d9da0f1aa3e28fe347bca3852fb2861522ede6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.util.ArrayList;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for {@link BufferBuilder} and {@link BufferConsumer}.
*/
public class BufferBuilderAndConsumerTest {
/** Capacity of a test buffer, counted in {@code int} values. */
private static final int BUFFER_INT_SIZE = 10;
/** Capacity of a test buffer in bytes; used as the size of the segment allocated by {@code createBufferBuilder()}. */
private static final int BUFFER_SIZE = BUFFER_INT_SIZE * Integer.BYTES;
/**
 * Expects a built {@link Buffer} to be reported as recycled only after both the buffer itself
 * has been recycled and the {@link BufferConsumer} that produced it has been closed.
 */
@Test
public void referenceCounting() {
    BufferBuilder builder = createBufferBuilder();
    BufferConsumer consumer = builder.createBufferConsumer();

    int appendedBytes = builder.appendAndCommit(toByteBuffer(1, 2, 3));
    assertEquals(3 * Integer.BYTES, appendedBytes);

    Buffer built = consumer.build();
    assertFalse(built.isRecycled());

    // Recycling the built buffer alone must not be enough while the consumer is still open.
    built.recycleBuffer();
    assertFalse(built.isRecycled());

    // Only after the consumer is closed is the buffer expected to report itself as recycled.
    consumer.close();
    assertTrue(built.isRecycled());
}
/**
 * Appends a payload that fits into the buffer and expects that it is written completely, that
 * the source buffer is fully drained, and that the builder is not reported as full.
 */
@Test
public void append() {
    BufferBuilder builder = createBufferBuilder();
    int[] values = {0, 1, 2, 3, 42};
    ByteBuffer source = toByteBuffer(values);

    // Capture the limit before appending so the expectation is taken from the untouched source.
    int sourceBytes = source.limit();
    int appendedBytes = builder.appendAndCommit(source);
    assertEquals(sourceBytes, appendedBytes);

    // The whole source must have been consumed.
    assertEquals(source.limit(), source.position());
    assertFalse(builder.isFull());

    assertContent(builder.createBufferConsumer(), values);
}
/**
 * Commits several small appends one after another and expects the consumer to see them as one
 * contiguous sequence.
 */
@Test
public void multipleAppends() {
    BufferBuilder builder = createBufferBuilder();
    BufferConsumer consumer = builder.createBufferConsumer();

    int[][] chunks = {{0, 1}, {2}, {3, 42}};
    for (int[] chunk : chunks) {
        builder.appendAndCommit(toByteBuffer(chunk));
    }

    assertContent(consumer, 0, 1, 2, 3, 42);
}
/**
 * Appends several chunks without committing and expects the consumer to see nothing until
 * {@code commit()} is called, after which all chunks become visible at once.
 */
@Test
public void multipleNotCommittedAppends() {
    BufferBuilder builder = createBufferBuilder();
    BufferConsumer consumer = builder.createBufferConsumer();

    int[][] chunks = {{0, 1}, {2}, {3, 42}};
    for (int[] chunk : chunks) {
        builder.append(toByteBuffer(chunk));
    }

    // Nothing has been committed yet, so an empty buffer is expected.
    assertContent(consumer);

    builder.commit();
    assertContent(consumer, 0, 1, 2, 3, 42);
}
/**
 * Appends a payload that is one {@code int} larger than the buffer capacity. The first builder
 * is expected to take exactly {@code BUFFER_SIZE} bytes and become full; the left-over
 * {@code int} is expected to go into a second builder when the same source is appended again.
 */
@Test
public void appendOverSize() {
    ByteBuffer source = null;

    BufferBuilder firstBuilder = createBufferBuilder();
    BufferConsumer firstConsumer = firstBuilder.createBufferConsumer();
    source = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42);

    int firstAppend = firstBuilder.appendAndCommit(source);
    assertEquals(BUFFER_SIZE, firstAppend);
    assertTrue(firstBuilder.isFull());
    assertContent(firstConsumer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

    // Re-use the partially drained source with a fresh builder for the remainder.
    BufferBuilder secondBuilder = createBufferBuilder();
    BufferConsumer secondConsumer = secondBuilder.createBufferConsumer();

    int secondAppend = secondBuilder.appendAndCommit(source);
    assertEquals(Integer.BYTES, secondAppend);
    assertFalse(secondBuilder.isFull());
    assertContent(secondConsumer, 42);
}
/**
 * Requests a consumer from the same builder twice; the test passes only if an
 * {@link IllegalStateException} is thrown.
 */
@Test(expected = IllegalStateException.class)
public void creatingBufferConsumerTwice() {
    BufferBuilder builder = createBufferBuilder();

    builder.createBufferConsumer();
    // The second request on the same builder is the one expected to be rejected.
    builder.createBufferConsumer();
}
/**
 * Creates copies of a consumer at different points in time and checks what each of them reads.
 *
 * <p>The expectations encode that a copy reads independently of the consumer it was copied
 * from, and that it only sees data that the source consumer had not yet read when the copy was
 * taken.
 */
@Test
public void copy() {
    BufferBuilder builder = createBufferBuilder();
    BufferConsumer original = builder.createBufferConsumer();

    builder.appendAndCommit(toByteBuffer(0, 1));
    BufferConsumer copyBeforeAnyRead = original.copy();
    builder.appendAndCommit(toByteBuffer(2));

    assertContent(original, 0, 1, 2);
    assertContent(copyBeforeAnyRead, 0, 1, 2);

    // Taken after the original already consumed 0, 1, 2.
    BufferConsumer copyAfterFirstRead = original.copy();
    builder.appendAndCommit(toByteBuffer(3, 42));
    // Taken after new data was committed but before the original consumed it.
    BufferConsumer copyAfterSecondCommit = original.copy();

    BufferConsumer[] allConsumers = {
        original, copyBeforeAnyRead, copyAfterFirstRead, copyAfterSecondCommit
    };
    for (BufferConsumer consumer : allConsumers) {
        assertContent(consumer, 3, 42);
    }
}
/**
 * Builds a buffer from a builder that never received data and expects it to have size zero and
 * no content.
 */
@Test
public void buildEmptyBuffer() {
    BufferBuilder untouchedBuilder = createBufferBuilder();
    Buffer emptyBuffer = buildSingleBuffer(untouchedBuilder);

    assertEquals(0, emptyBuffer.getSize());
    assertContent(emptyBuffer);
}
/**
 * Builds buffers from the same consumer several times while the builder keeps receiving data.
 * Each build is expected to contain only what was committed since the previous build.
 */
@Test
public void buildingBufferMultipleTimes() {
    BufferBuilder builder = createBufferBuilder();

    try (BufferConsumer consumer = builder.createBufferConsumer()) {
        builder.appendAndCommit(toByteBuffer(0, 1));
        builder.appendAndCommit(toByteBuffer(2));
        assertContent(consumer, 0, 1, 2);

        builder.appendAndCommit(toByteBuffer(3, 42));
        builder.appendAndCommit(toByteBuffer(44));
        assertContent(consumer, 3, 42, 44);

        // Fill whatever capacity is left, remembering each value that was written.
        final int filler = 1337;
        ArrayList<Integer> writtenFillers = new ArrayList<>();
        while (!builder.isFull()) {
            builder.appendAndCommit(toByteBuffer(filler));
            writtenFillers.add(filler);
        }

        int[] expectedTail = writtenFillers.stream().mapToInt(Integer::intValue).toArray();
        assertContent(consumer, expectedTail);
    }
}
/** Runs the {@code isFinished} scenario on a builder that receives no writes at all. */
@Test
public void emptyIsFinished() {
    final int noWrites = 0;
    testIsFinished(noWrites);
}
/** Runs the {@code isFinished} scenario on a builder filled to half of its int capacity. */
@Test
public void partiallyFullIsFinished() {
    final int halfCapacityWrites = BUFFER_INT_SIZE / 2;
    testIsFinished(halfCapacityWrites);
}
/** Runs the {@code isFinished} scenario on a builder filled up to its full int capacity. */
@Test
public void fullIsFinished() {
    final int fullCapacityWrites = BUFFER_INT_SIZE;
    testIsFinished(fullCapacityWrites);
}
/**
 * Shared scenario for the {@code *IsFinished} tests.
 *
 * <p>Writes {@code writes} single-int records and then checks, step by step, the values
 * reported by {@code isFinished()} and {@code getWrittenBytes()}: before the first build, after
 * the first build, after each of two consecutive {@code finish()} calls on the builder, and
 * after a final build on the consumer.
 *
 * @param writes number of single-int records to append and commit before the checks start
 */
private static void testIsFinished(int writes) {
    BufferBuilder builder = createBufferBuilder();
    BufferConsumer consumer = builder.createBufferConsumer();

    for (int remaining = writes; remaining > 0; remaining--) {
        assertEquals(Integer.BYTES, builder.appendAndCommit(toByteBuffer(42)));
    }
    final int expectedWrittenBytes = writes * Integer.BYTES;

    // Before the consumer built anything it is expected to report zero written bytes.
    assertFalse(builder.isFinished());
    assertFalse(consumer.isFinished());
    assertEquals(0, consumer.getWrittenBytes());

    // After a build the consumer is expected to report everything committed so far.
    consumer.build();
    assertFalse(builder.isFinished());
    assertFalse(consumer.isFinished());
    assertEquals(expectedWrittenBytes, consumer.getWrittenBytes());

    // finish() is invoked twice; both calls are expected to yield the same observations.
    for (int call = 0; call < 2; call++) {
        int actualWrittenBytes = builder.finish();
        assertEquals(expectedWrittenBytes, actualWrittenBytes);
        assertTrue(builder.isFinished());
        assertFalse(consumer.isFinished());
        assertEquals(expectedWrittenBytes, consumer.getWrittenBytes());
    }

    // The consumer is only expected to be finished after one more (empty) build.
    Buffer trailing = consumer.build();
    assertEquals(0, trailing.getSize());
    assertTrue(consumer.isFinished());
}
/**
 * Packs the given ints into a newly allocated {@link ByteBuffer}.
 *
 * <p>The values are stored with absolute writes, so the returned buffer keeps position 0 and a
 * limit equal to its capacity ({@code data.length * Integer.BYTES}).
 *
 * @param data values to store, in order
 * @return a buffer holding {@code data}, positioned at its start
 */
private static ByteBuffer toByteBuffer(int... data) {
    ByteBuffer packed = ByteBuffer.allocate(data.length * Integer.BYTES);
    for (int index = 0; index < data.length; index++) {
        // Absolute put: writes at the given byte offset without moving the buffer position.
        packed.putInt(index * Integer.BYTES, data[index]);
    }
    return packed;
}
/**
 * Builds the next buffer from {@code actualConsumer}, verifies that it holds exactly
 * {@code expected}, and recycles it afterwards.
 *
 * @param actualConsumer consumer to build from; must not be finished yet
 * @param expected ints the built buffer is expected to contain, in order
 */
private static void assertContent(BufferConsumer actualConsumer, int... expected) {
    assertFalse(actualConsumer.isFinished());

    Buffer built = actualConsumer.build();
    assertFalse(built.isRecycled());
    assertContent(built, expected);

    int expectedBytes = expected.length * Integer.BYTES;
    assertEquals(expectedBytes, built.getSize());

    built.recycleBuffer();
}
/**
 * Verifies that the readable part of {@code actualBuffer}, interpreted as ints, equals
 * {@code expected}, and that the buffer uses {@link FreeingBufferRecycler#INSTANCE}.
 *
 * @param actualBuffer buffer whose readable content is checked
 * @param expected ints the buffer is expected to contain, in order
 */
private static void assertContent(Buffer actualBuffer, int... expected) {
    IntBuffer intView = actualBuffer.getNioBufferReadable().asIntBuffer();

    // A fresh int view starts at position 0, so remaining() covers every int in it.
    int[] actualInts = new int[intView.remaining()];
    intView.get(actualInts);

    assertArrayEquals(expected, actualInts);
    assertEquals(FreeingBufferRecycler.INSTANCE, actualBuffer.getRecycler());
}
/**
 * Creates a {@link BufferBuilder} backed by a newly allocated unpooled memory segment of
 * {@code BUFFER_SIZE} bytes, with {@link FreeingBufferRecycler#INSTANCE} as its recycler.
 *
 * @return a fresh builder for a single test buffer
 */
private static BufferBuilder createBufferBuilder() {
return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), FreeingBufferRecycler.INSTANCE);
}
}