blob: 6f5aa2f8c26ea714110d2fdb6054197918eb1b44 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.TezInputContext;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class TestInputReadyTracker {
private static final long SLEEP_TIME = 500l;
@Test(timeout = 5000)
public void testWithoutGrouping1() throws InterruptedException {
InputReadyTracker inputReadyTracker = new InputReadyTracker();
ImmediatelyReadyInputForTest input1 = new ImmediatelyReadyInputForTest(inputReadyTracker);
ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
// Test for simple inputs
List<Input> requestList;
long startTime = 0l;
long readyTime = 0l;
requestList = new ArrayList<Input>();
requestList.add(input1);
requestList.add(input2);
Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
assertTrue(input1.isReady);
assertFalse(input2.isReady);
assertEquals(input1, readyInput);
startTime = System.currentTimeMillis();
setDelayedInputReady(input2);
inputReadyTracker.waitForAllInputsReady(requestList);
readyTime = System.currentTimeMillis();
// Should have moved into ready state - only happens when the setReady function is invoked.
// Ensure the method returned only after the specific Input was told it is ready
assertTrue(input2.isReady);
assertTrue(readyTime >= startTime + SLEEP_TIME);
assertTrue(input1.isReady);
}
@Test(timeout = 5000)
public void testWithoutGrouping2() throws InterruptedException {
InputReadyTracker inputReadyTracker = new InputReadyTracker();
ControlledReadyInputForTest input1 = new ControlledReadyInputForTest(inputReadyTracker);
ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
ControlledReadyInputForTest input3 = new ControlledReadyInputForTest(inputReadyTracker);
// Test for simple inputs
List<Input> requestList;
long startTime = 0l;
long readyTime = 0l;
requestList = new ArrayList<Input>();
requestList.add(input1);
requestList.add(input2);
requestList.add(input3);
startTime = System.currentTimeMillis();
setDelayedInputReady(input2);
Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
assertEquals(input2, readyInput);
readyTime = System.currentTimeMillis();
// Should have moved into ready state - only happens when the setReady function is invoked.
// Ensure the method returned only after the specific Input was told it is ready
assertTrue(input2.isReady);
assertTrue(readyTime >= startTime + SLEEP_TIME);
assertFalse(input1.isReady);
assertFalse(input3.isReady);
requestList = new ArrayList<Input>();
requestList.add(input1);
requestList.add(input3);
startTime = System.currentTimeMillis();
setDelayedInputReady(input1);
readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
assertEquals(input1, readyInput);
readyTime = System.currentTimeMillis();
// Should have moved into ready state - only happens when the setReady function is invoked.
// Ensure the method returned only after the specific Input was told it is ready
assertTrue(input1.isReady);
assertTrue(readyTime >= startTime + SLEEP_TIME);
assertTrue(input2.isReady);
assertFalse(input3.isReady);
requestList = new ArrayList<Input>();
requestList.add(input3);
startTime = System.currentTimeMillis();
setDelayedInputReady(input3);
readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
assertEquals(input3, readyInput);
readyTime = System.currentTimeMillis();
// Should have moved into ready state - only happens when the setReady function is invoked.
// Ensure the method returned only after the specific Input was told it is ready
assertTrue(input3.isReady);
assertTrue(readyTime >= startTime + SLEEP_TIME);
assertTrue(input1.isReady);
assertTrue(input2.isReady);
}
@Test(timeout = 5000)
public void testGrouped() throws InterruptedException {
InputReadyTracker inputReadyTracker = new InputReadyTracker();
ImmediatelyReadyInputForTest input1 = new ImmediatelyReadyInputForTest(inputReadyTracker);
ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
ImmediatelyReadyInputForTest input3 = new ImmediatelyReadyInputForTest(inputReadyTracker);
ControlledReadyInputForTest input4 = new ControlledReadyInputForTest(inputReadyTracker);
AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest();
AllMergedInputForTest group2 = new AllMergedInputForTest();
List<Input> group1Inputs = new ArrayList<Input>();
group1Inputs.add(input1);
group1Inputs.add(input2);
List<Input> group2Inputs = new ArrayList<Input>();
group2Inputs.add(input3);
group2Inputs.add(input4);
group1.initialize(group1Inputs);
group2.initialize(group2Inputs);
// Register groups with tracker
List<MergedLogicalInput> groups = Lists.newArrayList(group1, group2);
inputReadyTracker.setGroupedInputs(groups);
// Test for simple inputs
List<Input> requestList;
long startTime = 0l;
long readyTime = 0l;
requestList = new ArrayList<Input>();
requestList.add(group1);
Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
assertTrue(group1.isReady);
assertTrue(input1.isReady);
assertFalse(input2.isReady);
assertEquals(group1, readyInput);
requestList = new ArrayList<Input>();
requestList.add(group2);
startTime = System.currentTimeMillis();
setDelayedInputReady(input4);
inputReadyTracker.waitForAllInputsReady(requestList);
readyTime = System.currentTimeMillis();
// Should have moved into ready state - only happens when the setReady function is invoked.
// Ensure the method returned only after the specific Input was told it is ready
assertTrue(group2.isReady);
assertTrue(input3.isReady);
assertTrue(input4.isReady);
assertTrue(readyTime >= startTime + SLEEP_TIME);
}
private long setDelayedInputReady(final ControlledReadyInputForTest input) {
long startTime = System.currentTimeMillis();
new Thread() {
public void run() {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
input.setInputIsReady();
}
}.start();
return startTime;
}
private static class ImmediatelyReadyInputForTest implements LogicalInput {
private volatile boolean isReady = false;
ImmediatelyReadyInputForTest(InputReadyTracker inputReadyTracker) {
isReady = true;
inputReadyTracker.setInputIsReady(this);
}
@Override
public List<Event> initialize(TezInputContext inputContext) throws Exception {
return null;
}
@Override
public void start() throws Exception {
}
@Override
public Reader getReader() throws Exception {
return null;
}
@Override
public void handleEvents(List<Event> inputEvents) throws Exception {
}
@Override
public List<Event> close() throws Exception {
return null;
}
@Override
public void setNumPhysicalInputs(int numInputs) {
}
}
private static class ControlledReadyInputForTest implements LogicalInput {
private volatile boolean isReady = false;
private InputReadyTracker inputReadyTracker;
ControlledReadyInputForTest(InputReadyTracker inputReadyTracker) {
this.inputReadyTracker = inputReadyTracker;
}
@Override
public List<Event> initialize(TezInputContext inputContext) throws Exception {
return null;
}
@Override
public void start() throws Exception {
}
@Override
public Reader getReader() throws Exception {
return null;
}
@Override
public void handleEvents(List<Event> inputEvents) throws Exception {
}
@Override
public List<Event> close() throws Exception {
return null;
}
@Override
public void setNumPhysicalInputs(int numInputs) {
}
// Used by the test to control when this input will be ready
public void setInputIsReady() {
isReady = true;
inputReadyTracker.setInputIsReady(this);
}
}
private static class AnyOneMergedInputForTest extends MergedLogicalInput {
private volatile boolean isReady = false;
@Override
public Reader getReader() throws Exception {
return null;
}
@Override
public void setConstituentInputIsReady(Input input) {
isReady = true;
informInputReady();
}
}
private static class AllMergedInputForTest extends MergedLogicalInput {
private volatile boolean isReady = false;
private Set<Input> readyInputs = Sets.newHashSet();
@Override
public Reader getReader() throws Exception {
return null;
}
@Override
public void setConstituentInputIsReady(Input input) {
synchronized (this) {
readyInputs.add(input);
}
if (readyInputs.size() == getInputs().size()) {
isReady = true;
informInputReady();
}
}
}
}