blob: 69cb12e423ca82295aee40f6b76c3b428b622d87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.rpc;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.OngoingStubbing;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ServiceClientImplTest
{
// Service name passed to the client under test; it shows up in the asserted messages as "Service [test-service]".
private static final String SERVICE_NAME = "test-service";
// Locations on the same host that differ only by port. Judging by test_serviceLocationNoPathFromUri, the constructor
// arguments are (host, plaintext port, TLS port, base path) and -1 means "no such port", so SERVER1..SERVER5 are
// TLS-only; the asserted URLs (e.g. "https://example.com:8888/q/foo") agree with that.
private static final ServiceLocation SERVER1 = new ServiceLocation("example.com", -1, 8888, "/q");
private static final ServiceLocation SERVER2 = new ServiceLocation("example.com", -1, 9999, "/q");
private static final ServiceLocation SERVER3 = new ServiceLocation("example.com", -1, 1111, "/q");
private static final ServiceLocation SERVER4 = new ServiceLocation("example.com", -1, 2222, "/q");
private static final ServiceLocation SERVER5 = new ServiceLocation("example.com", -1, 3333, "/q");
// Locations that set both port arguments; used by test_request_followRedirect_mixedPorts.
private static final ServiceLocation SERVER6 = new ServiceLocation("mixed.com", 201, 111, "/q");
private static final ServiceLocation SERVER7 = new ServiceLocation("mixed.com", 203, 222, "/q");
// Executor handed to the client under test; created in setUp() and shut down in tearDown().
private ScheduledExecutorService exec;
// Initializes the @Mock fields below. STRICT_STUBS makes Mockito report stubs that a test never uses.
@Rule
public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
// Mocked transport; stubbed per test through expectHttpCall().
@Mock
private HttpClient httpClient;
// Mocked locator; stubbed per test through stubLocatorCall() or directly with Mockito.when().
@Mock
private ServiceLocator serviceLocator;
// Client under test; each test builds it with the retry policy it needs via makeServiceClient().
private ServiceClient serviceClient;
/**
 * Creates the executor given to the client under test: a {@link NoDelayScheduledExecutorService} wrapping
 * {@link Execs#directExecutor()}.
 */
@Before
public void setUp()
{
// NOTE(review): presumably this makes scheduled retries run immediately on the calling thread, so the tests
// never wait on real backoff delays -- confirm against NoDelayScheduledExecutorService.
exec = new NoDelayScheduledExecutorService(Execs.directExecutor());
}
/**
 * Stops the executor created in {@link #setUp()} and fails if it has not terminated within 30 seconds.
 *
 * @throws Exception if interrupted while waiting, or if the executor does not terminate in time
 */
@After
public void tearDown() throws Exception
{
  exec.shutdownNow();

  final boolean terminated = exec.awaitTermination(30, TimeUnit.SECONDS);
  if (terminated) {
    return;
  }

  throw new ISE("Unable to shutdown executor in time");
}
/**
 * Happy path: the first located server answers with a value, which is returned to the caller unchanged.
 */
@Test
public void test_request_ok() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // SERVER1 is listed first by the locator and replies with the payload.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1).thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.noRetries());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * A 500 response from SERVER1 under a two-attempt policy surfaces as an {@link HttpResponseException} that
 * carries the response status and body.
 */
@Test
public void test_request_serverError()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // SERVER1 is stubbed to reply 500 with body "oh no".
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1)
      .thenReturn(errorResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, null, "oh no"));

  serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(2).build());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(HttpResponseException.class));

  // Safe to cast: the assertion above has already established the type.
  final HttpResponseException responseException = (HttpResponseException) cause;
  Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, responseException.getResponse().getStatus());
  Assert.assertEquals("oh no", responseException.getResponse().getContent());
}
/**
 * A 500 response followed by a successful one: with an unlimited retry policy the caller gets the value.
 */
@Test
public void test_request_serverErrorRetry() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // First call to SERVER1 fails with 500, the second one succeeds.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1)
      .thenReturn(errorResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, null, "oh no"))
      .thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * An {@link IOException} from the HTTP client under a two-attempt policy is reported as an {@link RpcException}
 * whose message names attempt #2 and whose cause is the original exception.
 */
@Test
public void test_request_ioError()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // Calls to SERVER1 are stubbed to fail with an IOException.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1)
      .thenReturn(Futures.immediateFailedFuture(new IOException("oh no")));

  serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(2).build());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable rpcFailure = failure.getCause();
  final String expectedMessage =
      "Service [test-service] request [GET https://example.com:8888/q/foo] encountered exception on attempt #2";
  MatcherAssert.assertThat(
      rpcFailure,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(expectedMessage))
  );
  MatcherAssert.assertThat(rpcFailure, CoreMatchers.instanceOf(RpcException.class));

  // The IOException thrown by the HTTP client must be preserved as the cause.
  final Throwable ioFailure = rpcFailure.getCause();
  MatcherAssert.assertThat(ioFailure, CoreMatchers.instanceOf(IOException.class));
  MatcherAssert.assertThat(
      ioFailure,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("oh no"))
  );
}
/**
 * An {@link IOException} followed by a successful response: with an unlimited retry policy the caller gets the
 * value.
 */
@Test
public void test_request_ioErrorRetry() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // First call to SERVER1 throws, the second one succeeds.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1)
      .thenReturn(Futures.immediateFailedFuture(new IOException("oh no")))
      .thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * A future that resolves to {@code null} is treated as a failed attempt; under a two-attempt policy the request
 * ends in an {@link RpcException} naming attempt #2.
 */
@Test
public void test_request_nullResponseFromClient()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // HttpClient yields null when an exception is encountered during processing; simulate that for SERVER1.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1).thenReturn(Futures.immediateFuture(null));

  serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(2).build());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(RpcException.class));

  final String expectedMessage =
      "Service [test-service] request [GET https://example.com:8888/q/foo] encountered exception on attempt #2";
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(expectedMessage))
  );
}
/**
 * A {@code null} response followed by a successful one: with an unlimited retry policy the caller gets the value.
 */
@Test
public void test_request_nullResponseFromClientRetry() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // HttpClient yields null when an exception is encountered during processing; simulate that for the first call
  // to SERVER1, then let the second call succeed.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1)
      .thenReturn(Futures.immediateFuture(null))
      .thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * A redirect from SERVER1 to SERVER2 (both known to the locator) is followed even when retries are disabled.
 */
@Test
public void test_request_followRedirect() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final String server2Url = getFoo.build(SERVER2).getUrl().toString();

  // SERVER1 redirects to SERVER2, which serves the payload.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1).thenReturn(redirectResponse(server2Url));
  expectHttpCall(getFoo, SERVER2).thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.noRetries());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * Same as {@link #test_request_followRedirect()}, but between locations that declare both port arguments.
 */
@Test
public void test_request_followRedirect_mixedPorts() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final String server7Url = getFoo.build(SERVER7).getUrl().toString();

  // SERVER6 redirects to SERVER7, which serves the payload.
  stubLocatorCall(locations(SERVER6, SERVER7));
  expectHttpCall(getFoo, SERVER6).thenReturn(redirectResponse(server7Url));
  expectHttpCall(getFoo, SERVER7).thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.noRetries());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * A redirect chain longer than the maximum length, without retries, fails with a
 * {@link ServiceNotAvailableException} mentioning too many redirects.
 */
@Test
public void test_request_tooLongRedirectChain()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final ServiceLocation[] chain = {SERVER1, SERVER2, SERVER3, SERVER4, SERVER5};

  stubLocatorCall(locations(chain));

  // Every server except the last redirects to its successor: SERVER1 -> SERVER2 -> ... -> SERVER5.
  for (int i = 0; i + 1 < chain.length; i++) {
    expectHttpCall(getFoo, chain[i])
        .thenReturn(redirectResponse(getFoo.build(chain[i + 1]).getUrl().toString()));
  }

  serviceClient = makeServiceClient(StandardRetryPolicy.noRetries());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(ServiceNotAvailableException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("issued too many redirects"))
  );
}
/**
 * A redirect chain longer than the maximum length can still be followed to the end when a retry is allowed.
 */
@Test
public void test_request_tooLongRedirectChainRetry() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final ServiceLocation[] chain = {SERVER1, SERVER2, SERVER3, SERVER4, SERVER5};

  stubLocatorCall(locations(chain));

  // Every server except the last redirects to its successor; the last one serves the payload.
  final int last = chain.length - 1;
  for (int i = 0; i < last; i++) {
    expectHttpCall(getFoo, chain[i])
        .thenReturn(redirectResponse(getFoo.build(chain[i + 1]).getUrl().toString()));
  }
  expectHttpCall(getFoo, chain[last])
      .thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(2).build());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * A server that keeps redirecting to itself ends in a {@link ServiceNotAvailableException} mentioning too many
 * redirects, even with ten attempts allowed.
 */
@Test
public void test_request_selfRedirectLoop()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final String server1Url = getFoo.build(SERVER1).getUrl().toString();

  // SERVER1 is the only location and always redirects back to itself.
  stubLocatorCall(locations(SERVER1));
  expectHttpCall(getFoo, SERVER1).thenReturn(redirectResponse(server1Url));

  serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(10).build());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(ServiceNotAvailableException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("issued too many redirects"))
  );
}
/**
 * Two servers that keep redirecting to each other end in a {@link ServiceNotAvailableException} mentioning too
 * many redirects, even with ten attempts allowed.
 */
@Test
public void test_request_twoServerRedirectLoop()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final String server1Url = getFoo.build(SERVER1).getUrl().toString();
  final String server2Url = getFoo.build(SERVER2).getUrl().toString();

  // SERVER1 -> SERVER2 -> SERVER1 -> ... without end.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1).thenReturn(redirectResponse(server2Url));
  expectHttpCall(getFoo, SERVER2).thenReturn(redirectResponse(server1Url));

  serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(10).build());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(ServiceNotAvailableException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("issued too many redirects"))
  );
}
/**
 * A redirect whose location header is not a usable URL fails with an {@link RpcException} that quotes the bad
 * location, even under an unlimited retry policy.
 */
@Test
public void test_request_redirectInvalid()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // SERVER1 redirects to a location that is not a valid URL.
  stubLocatorCall(locations(SERVER1));
  expectHttpCall(getFoo, SERVER1).thenReturn(redirectResponse("invalid-url"));

  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(RpcException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("redirected to invalid URL [invalid-url]"))
  );
}
/**
 * A redirect status with no location header at all fails with an {@link RpcException} reporting a {@code null}
 * URL, even under an unlimited retry policy.
 */
@Test
public void test_request_redirectNil()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // SERVER1 answers with a redirect status but neither headers nor body.
  stubLocatorCall(locations(SERVER1));
  expectHttpCall(getFoo, SERVER1)
      .thenReturn(errorResponse(HttpResponseStatus.TEMPORARY_REDIRECT, null, null));

  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(RpcException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("redirected to invalid URL [null]"))
  );
}
/**
 * A redirect to a server the locator did not return is not followed; the request fails with a
 * {@link ServiceNotAvailableException} that quotes the unknown URL.
 */
@Test
public void test_request_dontFollowRedirectToUnknownServer()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final String server2Url = getFoo.build(SERVER2).getUrl().toString();

  // Only SERVER1 is known to the locator, yet it redirects to SERVER2.
  stubLocatorCall(locations(SERVER1));
  expectHttpCall(getFoo, SERVER1).thenReturn(redirectResponse(server2Url));

  serviceClient = makeServiceClient(StandardRetryPolicy.noRetries());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(ServiceNotAvailableException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(
          CoreMatchers.containsString("issued redirect to unknown URL [https://example.com:9999/q/foo]")
      )
  );
}
/**
 * An empty location set with retries disabled fails with a {@link ServiceNotAvailableException}.
 */
@Test
public void test_request_serviceUnavailable()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // The locator knows of no servers at all.
  stubLocatorCall(locations());

  serviceClient = makeServiceClient(StandardRetryPolicy.noRetries());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(ServiceNotAvailableException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Service [test-service] is not available"))
  );
}
/**
 * The locator first returns no locations and then SERVER1; with two attempts allowed the request succeeds.
 */
@Test
public void test_request_serviceUnavailableRetry() throws Exception
{
  final Map<String, String> payload = ImmutableMap.of("foo", "bar");
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // First lookup: nothing available. Second lookup: SERVER1 has appeared.
  final ListenableFuture<ServiceLocations> nothingYet = Futures.immediateFuture(locations());
  final ListenableFuture<ServiceLocations> server1Only = Futures.immediateFuture(locations(SERVER1));
  Mockito.when(serviceLocator.locate())
         .thenReturn(nothingYet)
         .thenReturn(server1Only);
  expectHttpCall(getFoo, SERVER1).thenReturn(valueResponse(payload));

  serviceClient = makeServiceClient(StandardRetryPolicy.builder().maxAttempts(2).build());

  Assert.assertEquals(payload, doRequest(serviceClient, getFoo));
}
/**
 * With {@code retryNotAvailable(false)}, an empty location set fails with a
 * {@link ServiceNotAvailableException} even though the number of attempts is unlimited.
 */
@Test
public void test_request_serviceUnavailableNoRetry()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");
  final ServiceRetryPolicy unlimitedButNotForUnavailable =
      StandardRetryPolicy.builder()
                         .retryNotAvailable(false)
                         .maxAttempts(ServiceRetryPolicy.UNLIMITED)
                         .build();

  // The locator knows of no servers at all.
  stubLocatorCall(locations());

  serviceClient = makeServiceClient(unlimitedButNotForUnavailable);

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(ServiceNotAvailableException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Service [test-service] is not available"))
  );
}
/**
 * A closed service fails with a {@link ServiceClosedException} and is not retried.
 */
@Test
public void test_request_serviceClosed()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // The locator reports the service as closed.
  stubLocatorCall(ServiceLocations.closed());

  // Closed services are not retryable. The unlimited policy is deliberate: if they were retried, the future
  // would never resolve.
  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(ServiceClosedException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Service [test-service] is closed"))
  );
}
/**
 * A failed locator future is wrapped in an {@link RpcException} that keeps the original exception as its cause,
 * and is not retried.
 */
@Test
public void test_request_serviceLocatorException()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // The locator hands back a future that has already failed.
  final ListenableFuture<ServiceLocations> failedLookup = Futures.immediateFailedFuture(new ISE("oh no"));
  stubLocatorCall(failedLookup);

  // Locator exceptions are not retryable. The unlimited policy is deliberate: if they were retried, the future
  // would never resolve.
  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  final ExecutionException failure = Assert.assertThrows(
      ExecutionException.class,
      () -> doRequest(serviceClient, getFoo)
  );

  final Throwable cause = failure.getCause();
  MatcherAssert.assertThat(cause, CoreMatchers.instanceOf(RpcException.class));

  final Throwable rootCause = cause.getCause();
  MatcherAssert.assertThat(rootCause, CoreMatchers.instanceOf(IllegalStateException.class));
  MatcherAssert.assertThat(
      cause,
      ThrowableMessageMatcher.hasMessage(
          CoreMatchers.containsString("Service [test-service] locator encountered exception")
      )
  );
  MatcherAssert.assertThat(
      rootCause,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("oh no"))
  );
}
/**
 * The returned future can be canceled while the locator future is still unresolved.
 */
@Test
public void test_request_cancelBeforeServiceLocated()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // Nothing ever completes this future, so the service is never located.
  final SettableFuture<ServiceLocations> neverResolved = SettableFuture.create();
  stubLocatorCall(neverResolved);

  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  final ListenableFuture<Map<String, String>> pending = doAsyncRequest(serviceClient, getFoo);
  Assert.assertTrue(pending.cancel(true));
  Assert.assertTrue(pending.isCancelled());
}
/**
 * The returned future can be canceled while a retried HTTP call is still pending.
 */
@Test
public void test_request_cancelDuringRetry()
{
  final RequestBuilder getFoo = new RequestBuilder(HttpMethod.GET, "/foo");

  // First call to SERVER1 fails with 500; the follow-up call returns a future that nothing ever completes.
  stubLocatorCall(locations(SERVER1, SERVER2));
  expectHttpCall(getFoo, SERVER1)
      .thenReturn(errorResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, null, "oh no"))
      .thenReturn(SettableFuture.create());

  serviceClient = makeServiceClient(StandardRetryPolicy.unlimited());

  final ListenableFuture<Map<String, String>> pending = doAsyncRequest(serviceClient, getFoo);
  Assert.assertTrue(pending.cancel(true));
  Assert.assertTrue(pending.isCancelled());
}
/**
 * Pins {@code ServiceClientImpl.computeBackoffMs} for the unlimited policy at a few sample attempt numbers.
 */
@Test
public void test_computeBackoffMs()
{
  final StandardRetryPolicy policy = StandardRetryPolicy.unlimited();

  // Each row is {attempt number, expected backoff in milliseconds}.
  final int[][] expectations = {
      {0, 100},
      {1, 200},
      {5, 3200},
      {20, 30000}
  };

  for (final int[] row : expectations) {
    Assert.assertEquals(row[1], ServiceClientImpl.computeBackoffMs(policy, row[0]));
  }
}
/**
 * Pins {@code ServiceClientImpl.serviceLocationNoPathFromUri} for http/https URIs with and without an explicit
 * port, for IPv4 and bracketed IPv6 hosts, plus a URI that yields {@code null}.
 */
@Test
public void test_serviceLocationNoPathFromUri()
{
  // A bare path yields no location.
  Assert.assertNull(ServiceClientImpl.serviceLocationNoPathFromUri("/"));

  // uris[i] is expected to parse into expected[i].
  final String[] uris = {
      "http://1.2.3.4:9999/foo",
      "http://1.2.3.4/foo",
      "https://1.2.3.4:9999/foo",
      "https://1.2.3.4/foo",
      "http://[1:2:3:4:5:6:7:8]:9999/foo",
      "http://[1:2:3:4:5:6:7:8]/foo",
      "https://[1:2:3:4:5:6:7:8]:9999/foo",
      "https://[1:2:3:4:5:6:7:8]/foo"
  };
  final ServiceLocation[] expected = {
      new ServiceLocation("1.2.3.4", 9999, -1, ""),
      new ServiceLocation("1.2.3.4", 80, -1, ""),
      new ServiceLocation("1.2.3.4", -1, 9999, ""),
      new ServiceLocation("1.2.3.4", -1, 443, ""),
      new ServiceLocation("1:2:3:4:5:6:7:8", 9999, -1, ""),
      new ServiceLocation("1:2:3:4:5:6:7:8", 80, -1, ""),
      new ServiceLocation("1:2:3:4:5:6:7:8", -1, 9999, ""),
      new ServiceLocation("1:2:3:4:5:6:7:8", -1, 443, "")
  };

  for (int i = 0; i < uris.length; i++) {
    Assert.assertEquals(expected[i], ServiceClientImpl.serviceLocationNoPathFromUri(uris[i]));
  }
}
/**
 * Pins {@code ServiceClientImpl.sanitizeHost}: brackets around an IPv6 literal are dropped, other hosts are
 * returned as given.
 */
@Test
public void test_normalizeHost()
{
  // Each row is {input host, expected result}.
  final String[][] hostCases = {
      {"[1:2:3:4:5:6:7:8]", "1:2:3:4:5:6:7:8"},
      {"1:2:3:4:5:6:7:8", "1:2:3:4:5:6:7:8"},
      {"1.2.3.4", "1.2.3.4"}
  };

  for (final String[] hostCase : hostCases) {
    Assert.assertEquals(hostCase[1], ServiceClientImpl.sanitizeHost(hostCase[0]));
  }
}
/**
 * Pins {@code ServiceClientImpl.isRedirect}: FOUND, MOVED_PERMANENTLY and TEMPORARY_REDIRECT are redirects, OK is
 * not.
 */
@Test
public void test_isRedirect()
{
  final HttpResponseStatus[] redirectStatuses = {
      HttpResponseStatus.FOUND,
      HttpResponseStatus.MOVED_PERMANENTLY,
      HttpResponseStatus.TEMPORARY_REDIRECT
  };

  for (final HttpResponseStatus status : redirectStatuses) {
    Assert.assertTrue(ServiceClientImpl.isRedirect(status));
  }

  Assert.assertFalse(ServiceClientImpl.isRedirect(HttpResponseStatus.OK));
}
/**
 * Starts stubbing {@code httpClient.go} for the request that {@code requestBuilder} produces for
 * {@code location}. A call matches when its method and URL equal the built request's, with any
 * {@link ObjectOrErrorResponseHandler} and a timeout equal to {@link RequestBuilder#DEFAULT_TIMEOUT}.
 *
 * @param requestBuilder builder for the expected request
 * @param location       server the request is expected to be sent to
 * @return the ongoing stubbing, so that callers can chain {@code thenReturn(...)}
 */
private <T> OngoingStubbing<ListenableFuture<Either<StringFullResponseHolder, T>>> expectHttpCall(
    final RequestBuilder requestBuilder,
    final ServiceLocation location
)
{
  final Request wanted = requestBuilder.build(location);

  return Mockito.when(
      httpClient.go(
          ArgumentMatchers.argThat(
              actual -> {
                // NOTE(review): presumably needed because Mockito can hand a null placeholder to existing
                // matchers while a later stub is being registered -- confirm.
                if (actual == null) {
                  return false;
                }
                return wanted.getMethod().equals(actual.getMethod())
                       && wanted.getUrl().equals(actual.getUrl());
              }
          ),
          ArgumentMatchers.any(ObjectOrErrorResponseHandler.class),
          ArgumentMatchers.eq(RequestBuilder.DEFAULT_TIMEOUT)
      )
  );
}
/**
 * Makes the mocked locator return {@code locations} through an already-completed future.
 *
 * @param locations the locations the locator should report
 */
private void stubLocatorCall(final ServiceLocations locations)
{
  final ListenableFuture<ServiceLocations> completed = Futures.immediateFuture(locations);
  stubLocatorCall(completed);
}
/**
 * Makes the mocked locator return the given future from {@code locate()}. Tests pass a completed, failed, or
 * never-resolving future here to model the different lookup outcomes.
 *
 * @param locations the future that {@code serviceLocator.locate()} should return
 */
private void stubLocatorCall(final ListenableFuture<ServiceLocations> locations)
{
Mockito.doReturn(locations).when(serviceLocator).locate();
}
/**
 * Builds the client under test from the shared fixtures (service name, mocked HTTP client, mocked locator and
 * executor) and the given retry policy.
 *
 * @param retryPolicy retry policy the client should apply
 * @return a new {@link ServiceClientImpl}
 */
private ServiceClient makeServiceClient(final ServiceRetryPolicy retryPolicy)
{
  final ServiceClient client = new ServiceClientImpl(
      SERVICE_NAME,
      httpClient,
      serviceLocator,
      retryPolicy,
      exec
  );
  return client;
}
/**
 * Issues {@code requestBuilder} through {@code serviceClient.request} and returns its result.
 *
 * @param serviceClient  client to send the request with
 * @param requestBuilder request to send
 * @return the value returned by {@code serviceClient.request}
 * @throws InterruptedException declared to match {@code serviceClient.request}
 * @throws ExecutionException   when the request fails; the tests inspect its cause
 */
private static Map<String, String> doRequest(
final ServiceClient serviceClient,
final RequestBuilder requestBuilder
) throws InterruptedException, ExecutionException
{
// The response handler is passed as null: the httpClient stub accepts any handler and returns canned futures.
return serviceClient.request(requestBuilder, null /* Not verified by mocks */);
}
/**
 * Issues {@code requestBuilder} through {@code serviceClient.asyncRequest} and returns the future as-is, so that
 * tests can cancel or inspect it.
 *
 * @param serviceClient  client to send the request with
 * @param requestBuilder request to send
 * @return the future returned by {@code serviceClient.asyncRequest}
 */
private static ListenableFuture<Map<String, String>> doAsyncRequest(
final ServiceClient serviceClient,
final RequestBuilder requestBuilder
)
{
// The response handler is passed as null: the httpClient stub accepts any handler and returns canned futures.
return serviceClient.asyncRequest(requestBuilder, null /* Not verified by mocks */);
}
/**
 * Canned successful HTTP outcome: an already-completed future holding {@code o} on the value side of an
 * {@link Either}.
 *
 * @param o the response object
 * @return a completed future wrapping {@code Either.value(o)}
 */
private static <T> ListenableFuture<Either<StringFullResponseHolder, T>> valueResponse(final T o)
{
  final Either<StringFullResponseHolder, T> success = Either.value(o);
  return Futures.immediateFuture(success);
}
/**
 * Canned failed HTTP outcome: an already-completed future holding, on the error side of an {@link Either}, an
 * HTTP/1.1 response with the given status and optional headers and body.
 *
 * @param responseStatus status of the response
 * @param headers        headers to add to the response, or null for none
 * @param content        body, encoded as UTF-8, or null to leave the content unset
 * @return a completed future wrapping {@code Either.error(...)} of the response holder
 */
private static <T> ListenableFuture<Either<StringFullResponseHolder, T>> errorResponse(
    final HttpResponseStatus responseStatus,
    @Nullable final Map<String, String> headers,
    @Nullable final String content
)
{
  final DefaultHttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, responseStatus);

  if (headers != null) {
    headers.forEach((name, value) -> httpResponse.headers().add(name, value));
  }

  if (content != null) {
    final byte[] utf8Body = StringUtils.toUtf8(content);
    httpResponse.setContent(ChannelBuffers.wrappedBuffer(ByteBuffer.wrap(utf8Body)));
  }

  return Futures.immediateFuture(
      Either.error(new StringFullResponseHolder(httpResponse, StandardCharsets.UTF_8))
  );
}
/**
 * Canned redirect: a TEMPORARY_REDIRECT error response with a {@code location} header and no body.
 *
 * @param newLocation value of the {@code location} header
 * @return a completed future as produced by {@code errorResponse}
 */
private static <T> ListenableFuture<Either<StringFullResponseHolder, T>> redirectResponse(final String newLocation)
{
  final Map<String, String> redirectHeaders = ImmutableMap.of("location", newLocation);
  return errorResponse(HttpResponseStatus.TEMPORARY_REDIRECT, redirectHeaders, null);
}
/**
 * Wraps the given servers in a {@link ServiceLocations}, keeping them in argument order.
 *
 * @param locations servers to include, in the order tests expect them to be tried
 * @return the result of {@code ServiceLocations.forLocations} for those servers
 */
private static ServiceLocations locations(final ServiceLocation... locations)
{
  // Order matters to the tests, and ImmutableSet iterates in insertion order.
  final ImmutableSet<ServiceLocation> inOrder = ImmutableSet.copyOf(locations);
  return ServiceLocations.forLocations(inOrder);
}
}