blob: 7d55fe1c13ca8420967586ae6236a714475b6b55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.retry;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.Test;
public class TestFailoverProxy {
public static class FlipFlopProxyProvider<T> implements FailoverProxyProvider<T> {
private Class<T> iface;
private T currentlyActive;
private T impl1;
private T impl2;
private int failoversOccurred = 0;
public FlipFlopProxyProvider(Class<T> iface, T activeImpl,
T standbyImpl) {
this.iface = iface;
this.impl1 = activeImpl;
this.impl2 = standbyImpl;
currentlyActive = impl1;
}
@Override
public ProxyInfo<T> getProxy() {
return new ProxyInfo<T>(currentlyActive, currentlyActive.toString());
}
@Override
public synchronized void performFailover(Object currentProxy) {
currentlyActive = impl1 == currentProxy ? impl2 : impl1;
failoversOccurred++;
}
@Override
public Class<T> getInterface() {
return iface;
}
@Override
public void close() throws IOException {
// Nothing to do.
}
public int getFailoversOccurred() {
return failoversOccurred;
}
}
public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) {
return failovers < 1 ? RetryAction.FAILOVER_AND_RETRY : RetryAction.FAIL;
}
}
private static FlipFlopProxyProvider<UnreliableInterface>
newFlipFlopProxyProvider() {
return new FlipFlopProxyProvider<UnreliableInterface>(
UnreliableInterface.class,
new UnreliableImplementation("impl1"),
new UnreliableImplementation("impl2"));
}
private static FlipFlopProxyProvider<UnreliableInterface>
newFlipFlopProxyProvider(TypeOfExceptionToFailWith t1,
TypeOfExceptionToFailWith t2) {
return new FlipFlopProxyProvider<UnreliableInterface>(
UnreliableInterface.class,
new UnreliableImplementation("impl1", t1),
new UnreliableImplementation("impl2", t2));
}
@Test
public void testSuccedsOnceThenFailOver() throws UnreliableException,
IOException, StandbyException {
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
UnreliableInterface.class, newFlipFlopProxyProvider(),
new FailOverOnceOnAnyExceptionPolicy());
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
try {
unreliable.succeedsOnceThenFailsReturningString();
fail("should not have succeeded more than twice");
} catch (UnreliableException e) {
// expected
}
}
@Test
public void testSucceedsTenTimesThenFailOver() throws UnreliableException,
IOException, StandbyException {
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
UnreliableInterface.class,
newFlipFlopProxyProvider(),
new FailOverOnceOnAnyExceptionPolicy());
for (int i = 0; i < 10; i++) {
assertEquals("impl1", unreliable.succeedsTenTimesThenFailsReturningString());
}
assertEquals("impl2", unreliable.succeedsTenTimesThenFailsReturningString());
}
@Test
public void testNeverFailOver() throws UnreliableException,
IOException, StandbyException {
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
UnreliableInterface.class,
newFlipFlopProxyProvider(),
RetryPolicies.TRY_ONCE_THEN_FAIL);
unreliable.succeedsOnceThenFailsReturningString();
try {
unreliable.succeedsOnceThenFailsReturningString();
fail("should not have succeeded twice");
} catch (UnreliableException e) {
assertEquals("impl1", e.getMessage());
}
}
@Test
public void testFailoverOnStandbyException()
throws UnreliableException, IOException, StandbyException {
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
UnreliableInterface.class,
newFlipFlopProxyProvider(),
RetryPolicies.failoverOnNetworkException(1));
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
try {
unreliable.succeedsOnceThenFailsReturningString();
fail("should not have succeeded twice");
} catch (UnreliableException e) {
// Make sure there was no failover on normal exception.
assertEquals("impl1", e.getMessage());
}
unreliable = (UnreliableInterface)RetryProxy
.create(UnreliableInterface.class,
newFlipFlopProxyProvider(
TypeOfExceptionToFailWith.STANDBY_EXCEPTION,
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
RetryPolicies.failoverOnNetworkException(1));
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
// Make sure we fail over since the first implementation threw a StandbyException
assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
}
@Test
public void testFailoverOnNetworkExceptionIdempotentOperation()
throws UnreliableException, IOException, StandbyException {
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
UnreliableInterface.class,
newFlipFlopProxyProvider(
TypeOfExceptionToFailWith.IO_EXCEPTION,
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
RetryPolicies.failoverOnNetworkException(1));
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
try {
unreliable.succeedsOnceThenFailsReturningString();
fail("should not have succeeded twice");
} catch (IOException e) {
// Make sure we *don't* fail over since the first implementation threw an
// IOException and this method is not idempotent
assertEquals("impl1", e.getMessage());
}
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
// Make sure we fail over since the first implementation threw an
// IOException and this method is idempotent.
assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
}
/**
* Test that if a non-idempotent void function is called, and there is an exception,
* the exception is properly propagated
*/
@Test
public void testExceptionPropagatedForNonIdempotentVoid() throws Exception {
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
.create(UnreliableInterface.class,
newFlipFlopProxyProvider(
TypeOfExceptionToFailWith.IO_EXCEPTION,
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
RetryPolicies.failoverOnNetworkException(1));
try {
unreliable.nonIdempotentVoidFailsIfIdentifierDoesntMatch("impl2");
fail("did not throw an exception");
} catch (Exception e) {
}
}
private static class SynchronizedUnreliableImplementation extends UnreliableImplementation {
private CountDownLatch methodLatch;
public SynchronizedUnreliableImplementation(String identifier,
TypeOfExceptionToFailWith exceptionToFailWith, int threadCount) {
super(identifier, exceptionToFailWith);
methodLatch = new CountDownLatch(threadCount);
}
@Override
public String failsIfIdentifierDoesntMatch(String identifier)
throws UnreliableException, StandbyException, IOException {
// Wait until all threads are trying to invoke this method
methodLatch.countDown();
try {
methodLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return super.failsIfIdentifierDoesntMatch(identifier);
}
}
private static class ConcurrentMethodThread extends Thread {
private UnreliableInterface unreliable;
public String result;
public ConcurrentMethodThread(UnreliableInterface unreliable) {
this.unreliable = unreliable;
}
@Override
public void run() {
try {
result = unreliable.failsIfIdentifierDoesntMatch("impl2");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
/**
* Test that concurrent failed method invocations only result in a single
* failover.
*/
@Test
public void testConcurrentMethodFailures() throws InterruptedException {
FlipFlopProxyProvider<UnreliableInterface> proxyProvider
= new FlipFlopProxyProvider<UnreliableInterface>(
UnreliableInterface.class,
new SynchronizedUnreliableImplementation("impl1",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION,
2),
new UnreliableImplementation("impl2",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION));
final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
.create(UnreliableInterface.class, proxyProvider,
RetryPolicies.failoverOnNetworkException(10));
ConcurrentMethodThread t1 = new ConcurrentMethodThread(unreliable);
ConcurrentMethodThread t2 = new ConcurrentMethodThread(unreliable);
t1.start();
t2.start();
t1.join();
t2.join();
assertEquals("impl2", t1.result);
assertEquals("impl2", t2.result);
assertEquals(1, proxyProvider.getFailoversOccurred());
}
/**
* Ensure that when all configured services are throwing StandbyException
* that we fail over back and forth between them until one is no longer
* throwing StandbyException.
*/
@Test
public void testFailoverBetweenMultipleStandbys()
throws UnreliableException, StandbyException, IOException {
final long millisToSleep = 10000;
final UnreliableImplementation impl1 = new UnreliableImplementation("impl1",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION);
FlipFlopProxyProvider<UnreliableInterface> proxyProvider
= new FlipFlopProxyProvider<UnreliableInterface>(
UnreliableInterface.class,
impl1,
new UnreliableImplementation("impl2",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION));
final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
.create(UnreliableInterface.class, proxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
new Thread() {
@Override
public void run() {
ThreadUtil.sleepAtLeastIgnoreInterrupts(millisToSleep);
impl1.setIdentifier("renamed-impl1");
}
}.start();
String result = unreliable.failsIfIdentifierDoesntMatch("renamed-impl1");
assertEquals("renamed-impl1", result);
}
/**
* Ensure that normal IO exceptions don't result in a failover.
*/
@Test
public void testExpectedIOException() {
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
UnreliableInterface.class,
newFlipFlopProxyProvider(
TypeOfExceptionToFailWith.REMOTE_EXCEPTION,
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
try {
unreliable.failsIfIdentifierDoesntMatch("no-such-identifier");
fail("Should have thrown *some* exception");
} catch (Exception e) {
assertTrue("Expected IOE but got " + e.getClass(),
e instanceof IOException);
}
}
}