/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Basic subpartition behaviour tests.
 *
 * <p>Concrete subclasses supply the implementation under test through {@link
 * #createSubpartition()} and {@link #createFailingWritesSubpartition()}; every test declared here
 * obtains its subpartition from one of those two factory methods.
 */
public abstract class SubpartitionTestBase extends TestLogger {

    /** Return the subpartition to be tested. */
    abstract ResultSubpartition createSubpartition() throws Exception;

    /** Return the subpartition to be used for tests where write calls should fail. */
    abstract ResultSubpartition createFailingWritesSubpartition() throws Exception;

    // ------------------------------------------------------------------------

    /**
     * Creating a read view on a subpartition that has already been released must fail with an
     * {@link IllegalStateException}.
     */
    @Test
    public void createReaderAfterDispose() throws Exception {
        final ResultSubpartition subpartition = createSubpartition();
        subpartition.release();

        try {
            // same no-op listener as the other tests in this class
            subpartition.createReadView(new NoOpBufferAvailablityListener());
            fail("expected an exception");
        } catch (IllegalStateException e) {
            // expected
        }
    }

    /**
     * Adding a buffer consumer after {@code finish()} must be rejected: {@code add} returns
     * {@code -1}, the consumer is recycled and the buffer statistics stay unchanged.
     */
    @Test
    public void testAddAfterFinish() throws Exception {
        final ResultSubpartition subpartition = createSubpartition();

        try {
            subpartition.finish();
            // finish() itself is expected to account for exactly one buffer
            // (presumably the end-of-partition marker -- confirm against the implementations)
            assertEquals(1, subpartition.getTotalNumberOfBuffersUnsafe());
            assertEquals(0, subpartition.getBuffersInBacklogUnsafe());

            final BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(4096);

            assertEquals(-1, subpartition.add(bufferConsumer));
            assertTrue(bufferConsumer.isRecycled());

            // the rejected add must not have changed the counters
            assertEquals(1, subpartition.getTotalNumberOfBuffersUnsafe());
            assertEquals(0, subpartition.getBuffersInBacklogUnsafe());
        } finally {
            // no null check needed: the local is final and was already dereferenced above
            subpartition.release();
        }
    }

    /**
     * Adding a buffer consumer after {@code release()} must be rejected: {@code add} returns
     * {@code -1} and the consumer is recycled.
     */
    @Test
    public void testAddAfterRelease() throws Exception {
        final ResultSubpartition subpartition = createSubpartition();

        try {
            subpartition.release();

            final BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(4096);

            assertEquals(-1, subpartition.add(bufferConsumer));
            assertTrue(bufferConsumer.isRecycled());
        } finally {
            // second release on purpose, so cleanup also happens if the first call threw
            subpartition.release();
        }
    }

    /** Releasing a read view must mark only the view as released, not its subpartition. */
    @Test
    public void testReleasingReaderDoesNotReleasePartition() throws Exception {
        final ResultSubpartition partition = createSubpartition();

        try {
            partition.add(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
            partition.finish();

            final ResultSubpartitionView reader =
                    partition.createReadView(new NoOpBufferAvailablityListener());

            assertFalse(partition.isReleased());
            assertFalse(reader.isReleased());

            reader.releaseAllResources();

            assertTrue(reader.isReleased());
            assertFalse(partition.isReleased());
        } finally {
            // release in finally so a failing assertion does not leak the partition
            partition.release();
        }
    }

    /** Calling {@code release()} repeatedly on the same subpartition must not throw. */
    @Test
    public void testReleaseIsIdempotent() throws Exception {
        final ResultSubpartition partition = createSubpartition();
        partition.add(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
        partition.finish();

        partition.release();
        partition.release();
        partition.release();
    }

    /** Polling a read view after its resources were released must not throw. */
    @Test
    public void testReadAfterDispose() throws Exception {
        final ResultSubpartition partition = createSubpartition();

        try {
            partition.add(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
            partition.finish();

            final ResultSubpartitionView reader =
                    partition.createReadView(new NoOpBufferAvailablityListener());
            reader.releaseAllResources();

            // the reader must not throw an exception
            // NOTE(review): a non-null result is dropped here without being recycled --
            // confirm whether that matters for the implementations under test
            reader.getNextBuffer();

            // ideally, we want this to be null, but the pipelined partition still serves data
            // after dispose (which is unintuitive, but does not affect correctness)
            // assertNull(reader.getNextBuffer());
        } finally {
            // previously the partition was never released by this test
            partition.release();
        }
    }

    /**
     * When writing to the subpartition fails, the buffer consumer handed to {@code add} must
     * still end up recycled.
     */
    @Test
    public void testRecycleBufferAndConsumerOnFailure() throws Exception {
        final ResultSubpartition subpartition = createFailingWritesSubpartition();

        try {
            final BufferConsumer consumer = createFilledFinishedBufferConsumer(100);

            try {
                subpartition.add(consumer);
                subpartition.flush();
                fail("should fail with an exception");
            } catch (Exception ignored) {
                // expected; fail() raises an AssertionError (an Error), so it is not swallowed
            }

            assertTrue(consumer.isRecycled());
        } finally {
            subpartition.release();
        }
    }
}