blob: e6432067dec0ee68bc04c1391c651b0dcf3fdc92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.feed.test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ConcurrentFramePool;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.test.FrameWriterTestUtils;
import org.apache.hyracks.api.test.TestControlledFrameWriter;
import org.apache.hyracks.api.test.TestFrameWriter;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.test.support.TestUtils;
import org.junit.Assert;
import org.mockito.Mockito;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
public class InputHandlerTest extends TestCase {
// Frame size handed to TestUtils.create and ConcurrentFramePool, and used to allocate the test buffers.
private static final int DEFAULT_FRAME_SIZE = 32768;
// Number of default-size frames used to size the memory and spill budgets.
private static final int NUM_FRAMES = 128;
// Memory budget given to the frame pool in the non-zero-memory tests: NUM_FRAMES frames of DEFAULT_FRAME_SIZE.
private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
// Names used by createInputHandler to build the EntityId and the FeedConnectionId.
private static final String DATAVERSE = "dataverse";
private static final String DATASET = "dataset";
private static final String FEED = "feed";
// Node id passed to every ConcurrentFramePool created by the tests.
private static final String NODE_ID = "NodeId";
// Value stubbed for FeedPolicyAccessor.getMaxFractionDiscard(), i.e. 0.15.
private static final float DISCARD_ALLOWANCE = 0.15f;
// Single-thread pool that runs Pusher tasks so that a nextFrame call can be issued off the test thread.
// It is never shut down in this class.
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
// Set by Pusher.run() when nextFrame throws a HyracksDataException; every test asserts that it is still null.
// The field is static and never reset in this class, so a value set by one test stays visible to later tests.
private volatile static HyracksDataException cause = null;
/**
 * Creates a test case with the given name.
 *
 * @param testName name forwarded to the {@link TestCase} constructor
 */
public InputHandlerTest(String testName) {
super(testName);
}
/**
 * Builds the JUnit 3 suite for this class.
 *
 * @return a {@link TestSuite} constructed from {@code InputHandlerTest.class}
 */
public static Test suite() {
    final TestSuite allTests = new TestSuite(InputHandlerTest.class);
    return allTests;
}
/**
 * Builds the {@link FeedRuntimeInputHandler} under test.
 * The handler is wired to a feed connection built from the DATAVERSE, FEED and DATASET constants,
 * a COLLECT runtime id with partition 0, and a Mockito mock of {@link FrameTupleAccessor}.
 *
 * @param ctx task context passed to the handler
 * @param writer downstream writer the handler forwards frames to
 * @param fpa feed policy accessor (the tests pass a Mockito mock)
 * @param framePool frame pool passed to the handler
 * @return the newly constructed handler (not yet opened)
 * @throws HyracksDataException declared by this factory method
 */
private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
        FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
    final EntityId entityId = new EntityId(FeedConnectionId.FEED_EXTENSION_NAME, DATAVERSE, FEED);
    final FeedConnectionId feedConnection = new FeedConnectionId(entityId, DATASET);
    final ActiveRuntimeId collectRuntime =
            new ActiveRuntimeId(entityId, FeedRuntimeType.COLLECT.toString(), 0);
    final FrameTupleAccessor mockedAccessor = Mockito.mock(FrameTupleAccessor.class);
    return new FeedRuntimeInputHandler(ctx, feedConnection, collectRuntime, writer, fpa, mockedAccessor,
            framePool);
}
/*
* Testing the following scenarios
* 01. Positive Frames memory budget with fixed size frames, no spill, no discard.
* 02. Positive Frames memory budget with variable size frames, no spill, no discard.
* 03. Positive Frames memory budget with fixed size frames, with spill, no discard.
* 04. Positive Frames memory budget with variable size frames, with spill, no discard.
* 05. Positive Frames memory budget with fixed size frames, no spill, with discard.
* 06. Positive Frames memory budget with variable size frames, no spill, with discard.
* 07. Positive Frames memory budget with fixed size frames, with spill, with discard.
* 08. Positive Frames memory budget with variable size frames, with spill, with discard.
* 09. 0 Frames memory budget with fixed size frames, with spill, no discard.
* 10. 0 Frames memory budget with variable size frames, with spill, no discard.
* 11. TODO 0 Frames memory budget with fixed size frames, with spill, with discard.
* 12. TODO 0 Frames memory budget with variable size frames, with spill, with discard.
* 13. TODO Test exception handling with Open, NextFrame,Flush,Close,Fail exception throwing FrameWriter
* 14. TODO Test exception while waiting for subscription
*/
/**
 * Creates a Mockito mock of {@link FeedPolicyAccessor} with buffering always enabled.
 *
 * @param spill value returned by {@code spillToDiskOnCongestion()}
 * @param discard value returned by {@code discardOnCongestion()}
 * @param spillBudget value returned by {@code getMaxSpillOnDisk()}
 * @param discardFraction value returned by {@code getMaxFractionDiscard()}
 * @return the stubbed policy accessor
 */
private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill, boolean discard, long spillBudget,
        float discardFraction) {
    final FeedPolicyAccessor policy = Mockito.mock(FeedPolicyAccessor.class);
    // Buffering is on for every scenario in this class.
    Mockito.when(policy.bufferingEnabled()).thenReturn(true);
    // Discard settings.
    Mockito.when(policy.discardOnCongestion()).thenReturn(discard);
    Mockito.when(policy.getMaxFractionDiscard()).thenReturn(discardFraction);
    // Spill settings.
    Mockito.when(policy.spillToDiskOnCongestion()).thenReturn(spill);
    Mockito.when(policy.getMaxSpillOnDisk()).thenReturn(spillBudget);
    return policy;
}
/**
 * Zero-frame memory budget, variable-size frames, spill enabled, discard disabled.
 * The frame pool is created with a budget of 0, so every pushed frame is expected to be counted as
 * spilled, none discarded, and all of them delivered to the writer once the handler is closed.
 */
@org.junit.Test
public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() {
    try {
        int numRounds = 5;
        Random random = new Random();
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // Spill enabled, discard disabled
        FeedPolicyAccessor fpa =
                createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
        // Non-Active Writer
        TestFrameWriter writer =
                FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
        // FramePool with a zero memory budget
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
        handler.nextFrame(buffer);
        Assert.assertEquals(0, handler.getNumProcessedInMemory());
        Assert.assertEquals(1, handler.getNumSpilled());
        // push NUM_FRAMES * numRounds more frames, each 1 to 10 times the default frame size
        for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
            int multiplier = random.nextInt(10) + 1;
            buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
            handler.nextFrame(buffer);
        }
        // Check that no frames were discarded
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Check that every frame pushed so far was spilled
        Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
        writer.validate(false);
        handler.close();
        // Check that nextFrame was called on the writer once per pushed frame
        Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
        writer.validate(true);
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    } finally {
        Assert.assertNull(cause);
    }
}
/**
 * Zero-frame memory budget, fixed-size frames, spill enabled, discard disabled.
 * The frame pool is created with a budget of 0, so every pushed frame is expected to be counted as
 * spilled, none discarded, and all of them delivered to the writer once the handler is closed.
 */
@org.junit.Test
public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() {
    try {
        int numRounds = 10;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // Spill enabled, discard disabled
        FeedPolicyAccessor fpa =
                createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
        // Non-Active Writer
        TestFrameWriter writer =
                FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
        // FramePool with a zero memory budget
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        handler.nextFrame(frame.getBuffer());
        Assert.assertEquals(0, handler.getNumProcessedInMemory());
        Assert.assertEquals(1, handler.getNumSpilled());
        // push the same frame NUM_FRAMES * numRounds more times
        for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        // Check that no frames were discarded
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Check that every frame pushed so far was spilled
        Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
        writer.validate(false);
        handler.close();
        // Check that nextFrame was called on the writer once per pushed frame
        Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
        writer.validate(true);
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    } finally {
        Assert.assertNull(cause);
    }
}
/*
* Spill = true;
* Discard = true; discard at most DISCARD_ALLOWANCE (15%)
* Variable size frames
*/
/**
 * Variable-size frames with both spill and discard enabled, against a frozen writer.
 * The test runs in four phases: fill the memory budget, fill the spill budget, discard while the
 * discard fraction allows it, then push one more frame from another thread and release the writer.
 */
@org.junit.Test
public void testMemoryVarSizeFrameWithSpillWithDiscard() {
    try {
        int numberOfMemoryFrames = 50;
        int numberOfSpillFrames = 50;
        int notDiscarded = 0;
        int totalMinFrames = 0;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // Spill budget = Memory budget, discard enabled
        FeedPolicyAccessor fpa =
                createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
        // Non-Active Writer, frozen here and released with unfreeze() further down
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        // FramePool
        ConcurrentFramePool framePool =
                new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
        ByteBuffer buffer2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
        ByteBuffer buffer3 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 3);
        ByteBuffer buffer4 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 4);
        ByteBuffer buffer5 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 5);
        // Phase 1: cycle through 1-, 2- and 3-frame buffers until the next one would not stay strictly
        // below the memory budget
        while (true) {
            if (totalMinFrames + 1 < numberOfMemoryFrames) {
                handler.nextFrame(buffer1);
                notDiscarded++;
                totalMinFrames++;
            } else {
                break;
            }
            if (totalMinFrames + 2 < numberOfMemoryFrames) {
                notDiscarded++;
                totalMinFrames += 2;
                handler.nextFrame(buffer2);
            } else {
                break;
            }
            if (totalMinFrames + 3 < numberOfMemoryFrames) {
                notDiscarded++;
                totalMinFrames += 3;
                handler.nextFrame(buffer3);
            } else {
                break;
            }
        }
        // Now we need to verify that the frame pool memory has been consumed!
        Assert.assertTrue(framePool.remaining() < 3);
        Assert.assertEquals(0, handler.getNumSpilled());
        Assert.assertEquals(0, handler.getNumStalled());
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Phase 2: push 3-, 4- and 5-frame buffers until numberOfSpillFrames frames have been spilled
        while (true) {
            if (handler.getNumSpilled() < numberOfSpillFrames) {
                notDiscarded++;
                handler.nextFrame(buffer3);
            } else {
                break;
            }
            if (handler.getNumSpilled() < numberOfSpillFrames) {
                notDiscarded++;
                handler.nextFrame(buffer4);
            } else {
                break;
            }
            if (handler.getNumSpilled() < numberOfSpillFrames) {
                notDiscarded++;
                handler.nextFrame(buffer5);
            } else {
                break;
            }
        }
        Assert.assertTrue(framePool.remaining() < 3);
        Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
        Assert.assertEquals(numberOfSpillFrames, handler.framesOnDisk());
        Assert.assertEquals(0, handler.getNumStalled());
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Phase 3: keep pushing while one more discard stays within the allowed discard fraction
        double numDiscarded = 0;
        boolean nextShouldDiscard =
                ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
        while (nextShouldDiscard) {
            handler.nextFrame(buffer5);
            numDiscarded++;
            nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
        }
        Assert.assertTrue(framePool.remaining() < 3);
        Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
        Assert.assertEquals(0, handler.getNumStalled());
        Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
        // Phase 4: next call should block since we're exceeding the discard allowance
        Future<?> result = EXECUTOR.submit(new Pusher(buffer5, handler));
        // NOTE(review): isDone() is sampled right after submit(), so this only catches a producer that has
        // already returned; it does not prove that the call is blocked.
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        }
        // consume memory frames
        writer.unfreeze();
        result.get();
        handler.close();
        // Every frame that was not discarded, plus the one pushed by the Pusher, reaches the writer
        Assert.assertEquals(notDiscarded + 1, writer.nextFrameCount());
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = true;
* Discard = true
* Fixed size frames
*/
/**
 * Fixed-size frames with both spill and discard enabled, against a frozen writer.
 * The test fills the memory budget, then the spill budget, then discards while the discard fraction
 * allows it, and finally pushes one more frame from another thread before releasing the writer.
 */
@org.junit.Test
public void testMemoryFixedSizeFrameWithSpillWithDiscard() {
    try {
        int numberOfMemoryFrames = 50;
        int numberOfSpillFrames = 50;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // Spill budget = Memory budget, discard enabled
        FeedPolicyAccessor fpa =
                createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
        // Non-Active Writer, frozen here and released with unfreeze() further down
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        // FramePool
        ConcurrentFramePool framePool =
                new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        // Fill the memory budget
        for (int i = 0; i < numberOfMemoryFrames; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        // Now we need to verify that the frame pool memory has been consumed!
        Assert.assertEquals(0, framePool.remaining());
        Assert.assertEquals(numberOfMemoryFrames, handler.getTotal());
        Assert.assertEquals(0, handler.getNumSpilled());
        Assert.assertEquals(0, handler.getNumStalled());
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Fill the spill budget
        for (int i = 0; i < numberOfSpillFrames; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        Assert.assertEquals(0, framePool.remaining());
        Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames, handler.getTotal());
        Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
        Assert.assertEquals(0, handler.getNumStalled());
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Keep pushing while one more discard stays within the allowed discard fraction
        double numDiscarded = 0;
        boolean nextShouldDiscard =
                ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
        while (nextShouldDiscard) {
            handler.nextFrame(frame.getBuffer());
            numDiscarded++;
            nextShouldDiscard = (numDiscarded + 1.0) / (handler.getTotal() + 1.0) <= fpa.getMaxFractionDiscard();
        }
        Assert.assertEquals(0, framePool.remaining());
        Assert.assertEquals((int) (numberOfMemoryFrames + numberOfSpillFrames + numDiscarded), handler.getTotal());
        Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
        Assert.assertEquals(0, handler.getNumStalled());
        Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
        // Next Call should block since we're exceeding the discard allowance
        Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
        // NOTE(review): isDone() is sampled right after submit(), so this only catches a producer that has
        // already returned; it does not prove that the call is blocked.
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        } else {
            Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
        }
        // consume memory frames
        writer.unfreeze();
        result.get();
        handler.close();
        Assert.assertTrue(result.isDone());
        // Memory frames, spilled frames and the frame pushed by the Pusher reach the writer
        Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames + 1, writer.nextFrameCount());
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = false;
* Discard = true; discard at most DISCARD_ALLOWANCE (15%)
* Variable size frames
*/
/**
 * Variable-size frames with spill disabled and discard enabled, against a frozen writer.
 * The test fills the frame pool, discards while the discard fraction allows it, then pushes one more
 * frame from another thread and releases the writer.
 */
@org.junit.Test
public void testMemoryVariableSizeFrameNoSpillWithDiscard() {
    try {
        int discardTestFrames = 100;
        Random random = new Random();
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // Spill disabled, discard enabled
        FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
        // Non-Active Writer, frozen here and released with unfreeze() further down
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        // FramePool
        ConcurrentFramePool framePool =
                new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
        int multiplier = 1;
        int numFrames = 0;
        // Push randomly sized buffers while the next one still fits in what the pool reports as remaining
        while ((multiplier <= framePool.remaining())) {
            numFrames++;
            handler.nextFrame(buffer);
            multiplier = random.nextInt(10) + 1;
            buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
        }
        // Next calls should NOT block but should discard, while the discard fraction allows it.
        double numDiscarded = 0.0;
        boolean nextShouldDiscard =
                ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
        while (nextShouldDiscard) {
            handler.nextFrame(buffer);
            numDiscarded++;
            nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
        }
        Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
        // NOTE(review): isDone() is sampled right after submit(), so this only catches a producer that has
        // already returned; it does not prove that the call is blocked.
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        } else {
            // Check that the submitted frame did not add to the discard count
            Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
            // Check that nothing was spilled
            Assert.assertEquals(0, handler.getNumSpilled());
        }
        // consume memory frames
        writer.unfreeze();
        result.get();
        handler.close();
        // The in-memory frames plus the frame pushed by the Pusher reach the writer
        Assert.assertEquals(numFrames + 1, writer.nextFrameCount());
        // exit
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = false;
* Discard = true; discard at most DISCARD_ALLOWANCE (15%)
* Fixed size frames
*/
/**
 * Fixed-size frames with spill disabled and discard enabled, against a frozen writer.
 * The test fills the frame pool with discardTestFrames frames, discards while the discard fraction
 * allows it, then pushes one more frame from another thread and releases the writer.
 */
@org.junit.Test
public void testMemoryFixedSizeFrameNoSpillWithDiscard() {
    try {
        int discardTestFrames = 100;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // Spill disabled, discard enabled
        FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
        // Non-Active Writer, frozen here and released with unfreeze() further down
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        // FramePool
        ConcurrentFramePool framePool =
                new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        // push discardTestFrames frames
        for (int i = 0; i < discardTestFrames; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        // Next calls should NOT block but should discard, while the discard fraction allows it.
        double numDiscarded = 0.0;
        boolean nextShouldDiscard =
                ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
        while (nextShouldDiscard) {
            handler.nextFrame(frame.getBuffer());
            numDiscarded++;
            nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
        }
        // Next Call should block since we're exceeding the discard allowance
        Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
        // NOTE(review): isDone() is sampled right after submit(), so this only catches a producer that has
        // already returned; it does not prove that the call is blocked.
        if (result.isDone()) {
            Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
        } else {
            // Check that the submitted frame did not add to the discard count
            Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
            // Check that nothing was spilled
            Assert.assertEquals(0, handler.getNumSpilled());
        }
        // consume memory frames
        writer.unfreeze();
        result.get();
        handler.close();
        // The in-memory frames plus the frame pushed by the Pusher reach the writer
        Assert.assertEquals(discardTestFrames + 1, writer.nextFrameCount());
        // exit
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = true;
* Discard = false;
* Fixed size frames
*/
/**
 * Fixed-size frames with spill enabled and discard disabled, against a frozen writer.
 * After NUM_FRAMES frames have filled the memory budget, one more frame is pushed from another thread
 * and is expected to return without blocking, counted as a single spilled frame.
 */
@org.junit.Test
public void testMemoryFixedSizeFrameWithSpillNoDiscard() {
    try {
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // Spill budget = Memory budget, No discard
        FeedPolicyAccessor fpa =
                createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
        // Non-Active Writer, frozen here and released with unfreeze() further down
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        // FramePool
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        // add NUM_FRAMES times
        for (int i = 0; i < NUM_FRAMES; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        // Next call should NOT block. we will do it in a different thread
        Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
        result.get();
        // Check that no records were discarded
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Check that one frame is spilled
        Assert.assertEquals(1, handler.getNumSpilled());
        // consume memory frames
        writer.unfreeze();
        handler.close();
        // Nothing is left on disk once the handler has been closed
        Assert.assertEquals(0, handler.framesOnDisk());
        // exit
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = false;
* Discard = false;
* Fixed size frames
* Very fast next operator
*/
/**
 * Fixed-size frames with spill and discard both disabled, and a writer that is never frozen.
 * Pushes NUM_FRAMES * numRounds frames and expects none discarded, none spilled, and all of them
 * delivered to the writer once the handler is closed.
 */
@org.junit.Test
public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() {
    try {
        int numRounds = 10;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // No spill, No discard
        FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        // Non-Active Writer
        TestFrameWriter writer =
                FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
        // FramePool
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        // add NUM_FRAMES * numRounds times
        for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        // Check that no records were discarded
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Check that no records were spilled
        Assert.assertEquals(0, handler.getNumSpilled());
        writer.validate(false);
        handler.close();
        // Check that nextFrame was called
        Assert.assertEquals(NUM_FRAMES * numRounds, writer.nextFrameCount());
        writer.validate(true);
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = false;
* Discard = false;
* Fixed size frames
* Slow next operator
*/
/**
 * Fixed-size frames with spill and discard both disabled, and a writer configured with
 * {@code setNextDuration(1)}.
 * Pushes NUM_FRAMES * numRounds frames and expects none discarded, none spilled, and all of them
 * delivered to the writer once the handler is closed.
 */
@org.junit.Test
public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() {
    try {
        int numRounds = 10;
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // No spill, No discard
        FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        // Non-Active Writer
        TestFrameWriter writer =
                FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
        // FramePool
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        // Slow consumer: set the writer's nextFrame duration (units are defined by TestFrameWriter)
        writer.setNextDuration(1);
        // add NUM_FRAMES * numRounds times
        for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        // Check that no records were discarded
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Check that no records were spilled
        Assert.assertEquals(0, handler.getNumSpilled());
        writer.validate(false);
        handler.close();
        // Check that nextFrame was called
        Assert.assertEquals(NUM_FRAMES * numRounds, writer.nextFrameCount());
        writer.validate(true);
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = false
* Discard = false
* VarSizeFrame
*/
/**
 * Variable-size frames with spill and discard both disabled, against a frozen writer.
 * The test fills the frame pool, then pushes one more frame from another thread, which is expected to
 * block until the writer is released. The handler is closed at the end, as in the other tests.
 */
@org.junit.Test
public void testMemoryVarSizeFrameNoDiskNoDiscard() {
    try {
        Random random = new Random();
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // No spill, No discard
        FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        // Non-Active Writer, frozen here and released with unfreeze() further down
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        // FramePool
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
        int multiplier = 1;
        // Push randomly sized buffers while the next one still fits in what the pool reports as remaining
        while ((multiplier <= framePool.remaining())) {
            handler.nextFrame(buffer);
            multiplier = random.nextInt(10) + 1;
            buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
        }
        // we can't satisfy the next request
        // Next call should block we will do it in a different thread
        Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
        // Check that the nextFrame didn't return
        // NOTE(review): isDone() is sampled right after submit(), so this only catches a producer that has
        // already returned; it does not prove that the call is blocked.
        if (result.isDone()) {
            Assert.fail("nextFrame returned although it was expected to block");
        }
        // Check that no records were discarded
        Assert.assertEquals(0, handler.getNumDiscarded());
        // Check that no records were spilled
        Assert.assertEquals(0, handler.getNumSpilled());
        // Check that number of stalled is not greater than 1
        Assert.assertTrue(handler.getNumStalled() <= 1);
        writer.unfreeze();
        result.get();
        // Close the handler like every other test in this class does
        handler.close();
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
/*
* Spill = true;
* Discard = false;
* Variable size frames
*/
/**
 * Variable-size frames with spill enabled and discard disabled, against a frozen writer.
 * The scenario is repeated 1000 times with fresh random frame sizes: fill the frame pool, push one
 * more frame from another thread (expected to spill without blocking), kick the writer once per
 * in-memory buffer, and check that exactly one frame stays on disk until the handler is closed.
 */
@org.junit.Test
public void testMemoryVarSizeFrameWithSpillNoDiscard() {
    for (int k = 0; k < 1000; k++) {
        try {
            Random random = new Random();
            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
            // Spill budget = Memory budget, No discard
            FeedPolicyAccessor fpa =
                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
            // Non-Active Writer, frozen here and released with unfreeze() further down
            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
            writer.freeze();
            // FramePool
            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
            handler.open();
            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
            int multiplier = 1;
            // Push randomly sized buffers while the next one still fits in what the pool reports as remaining
            while ((multiplier <= framePool.remaining())) {
                handler.nextFrame(buffer);
                multiplier = random.nextInt(10) + 1;
                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
            }
            // Next call should Not block. we will do it in a different thread
            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
            result.get();
            // Check that no records were discarded
            Assert.assertEquals(0, handler.getNumDiscarded());
            // Check that one frame is spilled
            Assert.assertEquals(1, handler.getNumSpilled());
            int numOfBuffersInMemory = handler.getInternalBuffer().size();
            // consume memory frames: one kick per buffer held in memory
            while (numOfBuffersInMemory > 0) {
                writer.kick();
                numOfBuffersInMemory--;
            }
            // There should be 1 frame on disk
            Assert.assertEquals(1, handler.framesOnDisk());
            writer.unfreeze();
            handler.close();
            Assert.assertEquals(0, handler.framesOnDisk());
        } catch (Throwable th) {
            th.printStackTrace();
            // Carry the cause into the failure message so the report is usable without the console output
            Assert.fail("Unexpected failure: " + th);
        }
    }
    Assert.assertNull(cause);
}
/*
* Spill = false;
* Discard = false;
* Fixed size frames
*/
/**
 * Fixed-size frames with spill and discard both disabled, against a frozen writer.
 * After NUM_FRAMES frames have filled the memory budget, one more frame is pushed from another thread
 * and is expected to block; a single kick of the writer then lets it complete.
 */
@org.junit.Test
public void testMemoryFixedSizeFrameNoDiskNoDiscard() {
    try {
        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
        // No spill, No discard
        FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
        // Non-Active Writer, frozen here and released with unfreeze() further down
        TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
        writer.freeze();
        // FramePool
        ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
        FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
        handler.open();
        VSizeFrame frame = new VSizeFrame(ctx);
        // add NUM_FRAMES times
        for (int i = 0; i < NUM_FRAMES; i++) {
            handler.nextFrame(frame.getBuffer());
        }
        // Next call should block we will do it in a different thread
        Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
        // Check that the nextFrame didn't return
        // NOTE(review): isDone() is sampled right after submit(), so this only catches a producer that has
        // already returned; it does not prove that the call is blocked.
        if (result.isDone()) {
            Assert.fail("nextFrame returned although it was expected to block");
        } else {
            // Check that no records were discarded
            Assert.assertEquals(0, handler.getNumDiscarded());
            // Check that no records were spilled
            Assert.assertEquals(0, handler.getNumSpilled());
            // Check that number of stalled is not greater than 1
            Assert.assertTrue(handler.getNumStalled() <= 1);
            // Kick the frozen writer once before waiting for the pushed frame to complete
            writer.kick();
        }
        result.get();
        writer.unfreeze();
        handler.close();
    } catch (Throwable th) {
        th.printStackTrace();
        // Carry the cause into the failure message so the report is usable without the console output
        Assert.fail("Unexpected failure: " + th);
    }
    Assert.assertNull(cause);
}
private class Pusher implements Runnable {
private final ByteBuffer buffer;
private final IFrameWriter writer;
public Pusher(ByteBuffer buffer, IFrameWriter writer) {
this.buffer = buffer;
this.writer = writer;
}
@Override
public void run() {
try {
writer.nextFrame(buffer);
} catch (HyracksDataException e) {
e.printStackTrace();
cause = e;
}
}
}
}