blob: 1f3a82687a7cb58cbedd83234badea991284b1a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.test.time;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.LoggingUtils;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.test.time.util.AlarmProducer;
import org.apache.reef.wake.test.time.util.EventRecorder;
import org.apache.reef.wake.time.Time;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.runtime.LogicalTimer;
import org.apache.reef.wake.time.runtime.RealTimer;
import org.apache.reef.wake.time.runtime.RuntimeClock;
import org.apache.reef.wake.time.runtime.Timer;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
/**
* Tests for RuntimeClock event loop.
*/
public class RuntimeClockTest {
private static final Tang TANG = Tang.Factory.getTang();
private final Random rand = new Random();
/**
* Create new RuntimeClock object injected with the given timer.
*
* @param timerClass Timer to use inside the RuntimeClock. Must implement the Timer interface.
* @return A new instance of the RuntimeClock, instrumented with the given timer.
* @throws InjectionException On configuration error.
*/
private static RuntimeClock buildClock(
final Class<? extends Timer> timerClass) throws InjectionException {
final Configuration clockConfig = TANG.newConfigurationBuilder()
.bind(Timer.class, timerClass)
.build();
return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class);
}
/**
* Create 10 threads to produce 40 alarms at random intervals
* and check if all alarms get processed.
* @throws Exception ThreadPoolStage can throw anything.
*/
@Test
public void testClock() throws Exception {
LoggingUtils.setLoggingLevel(Level.FINEST);
try (RuntimeClock clock = buildClock(RealTimer.class)) {
new Thread(clock).start();
final CountDownLatch eventCountLatch = new CountDownLatch(40);
final AlarmProducer alarmProducer = new AlarmProducer(clock, eventCountLatch) {
@Override
public int getOffset() {
return randomOffsetUniform(rand, 1, 100);
}
};
try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) {
stage.onNext(null);
Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
}
}
}
@Test
public void testAlarmRegistrationRaceConditions() throws Exception {
LoggingUtils.setLoggingLevel(Level.FINEST);
try (RuntimeClock clock = buildClock(RealTimer.class)) {
new Thread(clock).start();
final EventRecorder earlierAlarmRecorder = new EventRecorder();
final EventRecorder laterAlarmRecorder = new EventRecorder();
// Schedule an Alarm that's far in the future
clock.scheduleAlarm(5000, laterAlarmRecorder);
Thread.sleep(1000);
// By now, RuntimeClockImpl should be in a timed wait() for 5000 ms.
// Scheduler an Alarm that should fire before the existing Alarm:
clock.scheduleAlarm(2000, earlierAlarmRecorder);
Thread.sleep(1000);
// The earlier Alarm shouldn't have fired yet (we've only slept 1/2 time):
Assert.assertEquals(0, earlierAlarmRecorder.getEventCount());
Thread.sleep(1500);
// The earlier Alarm should have fired, since 3500 > 2000 ms have passed:
Assert.assertEquals(1, earlierAlarmRecorder.getEventCount());
// And the later Alarm shouldn't have fired yet:
Assert.assertEquals(0, laterAlarmRecorder.getEventCount());
Thread.sleep(2500);
// The later Alarm should have fired, since 6000 > 5000 ms have passed:
Assert.assertEquals(1, laterAlarmRecorder.getEventCount());
}
}
@Test
public void testMultipleCloseCalls() throws Exception {
LoggingUtils.setLoggingLevel(Level.FINEST);
final int numThreads = 3;
final CountDownLatch eventCountLatch = new CountDownLatch(numThreads);
try (RuntimeClock clock = buildClock(RealTimer.class)) {
final EventHandler<Alarm> handler = new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm value) {
clock.close();
eventCountLatch.countDown();
}
};
new Thread(clock).start();
try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, numThreads)) {
for (int i = 0; i < numThreads; ++i) {
stage.onNext(null);
}
Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
}
}
}
@Test
public void testSimultaneousAlarms() throws Exception {
LoggingUtils.setLoggingLevel(Level.FINEST);
final int expectedEvent = 2;
final CountDownLatch eventCountLatch = new CountDownLatch(expectedEvent);
try (RuntimeClock clock = buildClock(LogicalTimer.class)) {
new Thread(clock).start();
final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
clock.scheduleAlarm(500, alarmRecorder);
clock.scheduleAlarm(500, alarmRecorder);
eventCountLatch.await(10, TimeUnit.SECONDS);
Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount());
}
}
@Test
public void testAlarmOrder() throws Exception {
LoggingUtils.setLoggingLevel(Level.FINEST);
final int numAlarms = 10;
final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
final long[] expected = new long[numAlarms];
try (RuntimeClock clock = buildClock(RealTimer.class)) {
new Thread(clock).start();
for (int i = 0; i < numAlarms; ++i) {
final Time event = clock.scheduleAlarm(i * 100, alarmRecorder);
expected[i] = event.getTimestamp();
}
}
eventCountLatch.await(10, TimeUnit.SECONDS);
int i = 0;
final long[] actual = new long[numAlarms];
for (final Time event : alarmRecorder.getEvents()) {
actual[i++] = event.getTimestamp();
}
Assert.assertEquals(
"Number of alarms does not match the expected count",
numAlarms, alarmRecorder.getEventCount());
Assert.assertArrayEquals("Alarms processed in the wrong order", expected, actual);
}
/**
* Test graceful shutdown of the event loop.
* Schedule two events and close the clock. Make sure that no events occur soon after
* closing the alarm and both of them occur at the scheduled time. Check that the clock
* is closed after that.
* @throws InjectionException Error building a runtime clock object.
* @throws InterruptedException Sleep interrupted.
*/
@Test
public void testGracefulClose() throws InjectionException, InterruptedException {
LoggingUtils.setLoggingLevel(Level.FINEST);
final int numAlarms = 2;
final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
final RuntimeClock clock = buildClock(RealTimer.class);
new Thread(clock).start();
clock.scheduleAlarm(100, alarmRecorder);
clock.scheduleAlarm(101, alarmRecorder);
clock.close();
Assert.assertFalse("Clock cannot be idle yet", clock.isIdle());
Assert.assertTrue("Clock must be in closed state", clock.isClosed());
Thread.sleep(10);
Assert.assertTrue(
"No events should occur immediately after the graceful shutdown",
alarmRecorder.getEvents().isEmpty());
Thread.sleep(200);
final List<Time> events = alarmRecorder.getEvents();
Assert.assertEquals("Expected events on graceful shutdown", events.size(), numAlarms);
Assert.assertTrue("No client alarms should be scheduled at this time", clock.isIdle());
Assert.assertTrue("Clock must be in closed state", clock.isClosed());
}
/**
* Test forceful shutdown of the event loop. Schedule two events and close the clock.
* Make sure that no events occur after that and the clock is in closed and idle state.
* @throws InjectionException Error building a runtime clock object.
* @throws InterruptedException Sleep interrupted.
*/
@Test
public void testForcefulStop() throws InjectionException, InterruptedException {
LoggingUtils.setLoggingLevel(Level.FINEST);
final int numAlarms = 2;
final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
final RuntimeClock clock = buildClock(RealTimer.class);
new Thread(clock).start();
clock.scheduleAlarm(100, alarmRecorder);
clock.scheduleAlarm(101, alarmRecorder);
clock.stop();
Assert.assertTrue("Clock must be idle already", clock.isIdle());
Assert.assertTrue("Clock must be in closed state", clock.isClosed());
Thread.sleep(200);
Assert.assertTrue("No events should be in the schedule", alarmRecorder.getEvents().isEmpty());
}
}