blob: 0514f8c9384520b0a4db4a94b58ad19ecbf35f83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
import org.junit.Assert._
import org.junit.Test
class TestExponentialSleepStrategy {
@Test def testGetNextDelayReturnsIncrementalDelay {
val strategy = new ExponentialSleepStrategy
assertEquals(100, strategy.getNextDelay(0))
assertEquals(200, strategy.getNextDelay(100))
assertEquals(400, strategy.getNextDelay(200))
assertEquals(800, strategy.getNextDelay(400))
}
@Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached {
val strategy = new ExponentialSleepStrategy
assertEquals(10000, strategy.getNextDelay(6400))
assertEquals(10000, strategy.getNextDelay(10000))
}
@Test def testSleepStrategyIsConfigurable {
val strategy = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10)
assertEquals(10, strategy.getNextDelay(0))
assertEquals(30, strategy.getNextDelay(10))
assertEquals(90, strategy.getNextDelay(30))
assertEquals(270, strategy.getNextDelay(90))
}
@Test def testResetToInitialDelay {
val strategy = new ExponentialSleepStrategy
val loop = strategy.startLoop.asInstanceOf[ExponentialSleepStrategy#RetryLoopState]
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
assertEquals(100, loop.previousDelay)
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
assertEquals(400, loop.previousDelay)
loop.reset
loop.previousDelay = strategy.getNextDelay(loop.previousDelay)
assertEquals(100, loop.previousDelay)
}
@Test def testRetryWithoutException {
val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
var iterations = 0
var loopObject: RetryLoop = null
val result = strategy.run(
loop => {
loopObject = loop
iterations += 1
if (iterations == 3) loop.done
iterations
},
(exception, loop) => throw exception
)
assertEquals(Some(3), result)
assertEquals(3, iterations)
assertEquals(2, loopObject.sleepCount)
}
@Test def testRetryWithException {
val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
var iterations = 0
var loopObject: RetryLoop = null
strategy.run(
loop => { throw new IllegalArgumentException("boom") },
(exception, loop) => {
assertEquals("boom", exception.getMessage)
loopObject = loop
iterations += 1
if (iterations == 3) loop.done
}
)
assertEquals(3, iterations)
assertEquals(2, loopObject.sleepCount)
}
@Test def testReThrowingException {
val strategy = new ExponentialSleepStrategy(initialDelayMs = 1)
var iterations = 0
var loopObject: RetryLoop = null
try {
strategy.run(
loop => {
loopObject = loop
iterations += 1
throw new IllegalArgumentException("boom")
},
(exception, loop) => throw exception
)
fail("expected exception to be thrown")
} catch {
case e: IllegalArgumentException => assertEquals("boom", e.getMessage)
}
assertEquals(1, iterations)
assertEquals(0, loopObject.sleepCount)
}
def interruptedThread(operationStartLatch: CountDownLatch, operation: => Unit): Option[Throwable] = {
var exception: Option[Throwable] = None
val interruptee = new Thread(new Runnable {
def run {
try { operation } catch { case e: Exception => exception = Some(e) }
}
})
interruptee.start()
assertTrue("Operation start latch timed out.", operationStartLatch.await(1, TimeUnit.MINUTES))
interruptee.interrupt()
interruptee.join()
exception
}
@Test def testThreadInterruptInRetryLoop {
val strategy = new ExponentialSleepStrategy
var iterations = 0
var loopObject: RetryLoop = null
val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once
val exception = interruptedThread(
loopStartLatch,
strategy.run(
loop => { loopObject = loop; loopStartLatch.countDown(); iterations += 1; },
(exception, loop) => throw exception
)
)
// The interrupt can cause either,
// 1. the retry loop to exit with None result, no exception and isDone == false, or
// 2. the sleeping thread (during the back-off) to throw an InterruptedException.
assertTrue((!loopObject.isDone && exception.isEmpty) ||
exception.get.getClass.equals(classOf[InterruptedException]))
}
@Test def testThreadInterruptInOperationSleep {
val strategy = new ExponentialSleepStrategy
var iterations = 0
var loopObject: RetryLoop = null
val loopStartLatch = new CountDownLatch(1) // ensures that we've executed the operation at least once
val exception = interruptedThread(
loopStartLatch,
strategy.run(
loop => { loopObject = loop; iterations += 1; loopStartLatch.countDown(); Thread.sleep(1000) },
(exception, loop) => throw exception
)
)
// The interrupt can cause either,
// 1. the retry loop to exit with None result, no exception and isDone == false, or
// 2. the sleeping thread (in the operation or during the back-off) to throw an InterruptedException.
assertTrue((!loopObject.isDone && exception.isEmpty) ||
exception.get.getClass.equals(classOf[InterruptedException]))
}
}