// blob: 56c4ae697a5f449d10b84fa7f98476bdeeb514b6
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
/**
 * Unit tests for {@link AtlasClient} in which the Jersey {@link Client}, the {@link WebResource}
 * and the client {@link Configuration} are Mockito mocks, so no HTTP traffic is generated.
 *
 * <p>Most tests follow the same recipe: stub the retry settings, wire a mocked
 * {@link WebResource.Builder} for one {@link AtlasClient.API}, script the sequence of
 * {@link ClientResponse}s (or exceptions) the builder hands back, and then assert on what the
 * client returns.
 */
public class AtlasClientTest {

    @Mock
    private WebResource service;

    @Mock
    private Configuration configuration;

    @Mock
    private Client client;

    /** Creates fresh mocks for the annotated fields before every test method. */
    @BeforeMethod
    public void setup() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void shouldVerifyServerIsReady() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\","
                + "\"Description\":\"Metadata Management and Data Governance Platform over Hadoop\"}");
        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);

        assertTrue(atlasClient.isServerReady());
    }

    @Test
    public void testCreateEntity() throws Exception {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.CREATE_ENTITY, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.CREATED.getStatusCode());
        JSONObject jsonResponse =
                new JSONObject(new AtlasClient.EntityResult(Arrays.asList("id"), null, null).toString());
        String responseBody = jsonResponse.toString();
        when(response.getEntity(String.class)).thenReturn(responseBody);
        // JSONObject.length() is the number of keys, not the payload size; report the size of the
        // serialized body, as every other test in this class does.
        when(response.getLength()).thenReturn(responseBody.length());

        String entityJson = InstanceSerialization.toJson(new Referenceable("type"), true);
        when(builder.method(anyString(), Matchers.<Class>any(), anyString())).thenReturn(response);

        List<String> ids = atlasClient.createEntity(entityJson);

        assertEquals(ids.size(), 1);
        assertEquals(ids.get(0), "id");
    }

    @Test
    public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null))
                .thenThrow(new ClientHandlerException());

        assertFalse(atlasClient.isServerReady());
    }

    @Test
    public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);
        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);

        assertFalse(atlasClient.isServerReady());
    }

    @Test(expectedExceptions = AtlasServiceException.class)
    public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);

        atlasClient.isServerReady();
        fail("Should throw exception");
    }

    @Test
    public void shouldGetAdminStatus() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"Active\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);

        String status = atlasClient.getAdminStatus();

        assertEquals(status, "Active");
    }

    @Test(expectedExceptions = AtlasServiceException.class)
    public void shouldReturnStatusAsUnknownOnException() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);

        atlasClient.getAdminStatus();
        fail("Should fail with AtlasServiceException");
    }

    @Test
    public void shouldReturnStatusAsUnknownIfJSONIsInvalid() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        // Lower-case "status" key, unlike the "Status" key used by the other status payloads here.
        when(response.getEntity(String.class)).thenReturn("{\"status\":\"Active\"}");
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);

        String status = atlasClient.getAdminStatus();

        assertEquals(status, AtlasClient.UNKNOWN_STATUS);
    }

    @Test
    public void shouldReturnBaseURLAsPassedInURL() {
        AtlasClient atlasClient = new AtlasClient(service, configuration);

        String serviceURL = atlasClient.determineActiveServiceURL(new String[] {"http://localhost:21000"}, client);

        assertEquals(serviceURL, "http://localhost:21000");
    }

    @Test
    public void shouldSelectActiveAmongMultipleServersIfHAIsEnabled() {
        setupRetryParams();
        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        when(client.resource(UriBuilder.fromUri("http://localhost:41000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);

        ClientResponse firstResponse = mock(ClientResponse.class);
        when(firstResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String passiveStatus = "{\"Status\":\"PASSIVE\"}";
        when(firstResponse.getEntity(String.class)).thenReturn(passiveStatus);
        when(firstResponse.getLength()).thenReturn(passiveStatus.length());

        ClientResponse secondResponse = mock(ClientResponse.class);
        when(secondResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(secondResponse.getEntity(String.class)).thenReturn(activeStatus);
        when(secondResponse.getLength()).thenReturn(activeStatus.length());

        // Three PASSIVE answers (one per stubbed retry) followed by an ACTIVE answer.
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null))
                .thenReturn(firstResponse)
                .thenReturn(firstResponse)
                .thenReturn(firstResponse)
                .thenReturn(secondResponse);

        AtlasClient atlasClient = new AtlasClient(service, configuration);
        String serviceURL = atlasClient.determineActiveServiceURL(
                new String[] {"http://localhost:31000", "http://localhost:41000"}, client);

        assertEquals(serviceURL, "http://localhost:41000");
    }

    @Test
    public void shouldRetryUntilServiceBecomesActive() {
        setupRetryParams();
        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");

        // The ACTIVE payload belongs on nextResponse; stubbing it on `response` would overwrite the
        // BECOMING_ACTIVE stub above and the retry path would never be exercised.
        ClientResponse nextResponse = mock(ClientResponse.class);
        when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(nextResponse.getEntity(String.class)).thenReturn(activeStatus);
        when(nextResponse.getLength()).thenReturn(activeStatus.length());

        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null))
                .thenReturn(response)
                .thenReturn(response)
                .thenReturn(nextResponse);

        AtlasClient atlasClient = new AtlasClient(service, configuration);
        String serviceURL = atlasClient.determineActiveServiceURL(
                new String[] {"http://localhost:31000", "http://localhost:41000"}, client);

        assertEquals(serviceURL, "http://localhost:31000");
    }

    @Test
    public void shouldRetryIfCannotConnectToServiceInitially() {
        setupRetryParams();
        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");

        // As above: ACTIVE is reported by nextResponse only, so the sequence really is
        // "connection failure, BECOMING_ACTIVE, ACTIVE".
        ClientResponse nextResponse = mock(ClientResponse.class);
        when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(nextResponse.getEntity(String.class)).thenReturn(activeStatus);
        when(nextResponse.getLength()).thenReturn(activeStatus.length());

        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null))
                .thenThrow(new ClientHandlerException("Simulating connection exception"))
                .thenReturn(response)
                .thenReturn(nextResponse);

        AtlasClient atlasClient = new AtlasClient(service, configuration);
        atlasClient.setService(service);
        atlasClient.setConfiguration(configuration);
        String serviceURL = atlasClient.determineActiveServiceURL(
                new String[] {"http://localhost:31000", "http://localhost:41000"}, client);

        assertEquals(serviceURL, "http://localhost:31000");
    }

    @Test(expectedExceptions = IllegalArgumentException.class)
    public void shouldThrowExceptionIfActiveServerIsNotFound() {
        setupRetryParams();
        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);

        // None of the scripted answers ever reports ACTIVE.
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null))
                .thenThrow(new ClientHandlerException("Simulating connection exception"))
                .thenReturn(response)
                .thenReturn(response);

        AtlasClient atlasClient = new AtlasClient(service, configuration);
        atlasClient.determineActiveServiceURL(
                new String[] {"http://localhost:31000", "http://localhost:41000"}, client);

        fail("Should throw IllegalArgumentException when no active server is found");
    }

    @Test
    public void shouldRetryAPICallsOnClientHandlerException() throws AtlasServiceException, URISyntaxException {
        setupRetryParams();
        ResourceCreator resourceCreator = mock(ResourceCreator.class);
        WebResource resourceObject = mock(WebResource.class);
        when(resourceObject.getURI())
                .thenReturn(new URI("http://localhost:31000/api/atlas/types"))
                .thenReturn(new URI("http://localhost:41000/api/atlas/types"))
                .thenReturn(new URI("http://localhost:41000/api/atlas/types"));
        WebResource.Builder builder = getBuilder(resourceObject);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());

        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null))
                .thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException()))
                .thenReturn(response);
        when(resourceCreator.createResource()).thenReturn(resourceObject);

        AtlasClient atlasClient = getClientForTest("http://localhost:31000", "http://localhost:41000");
        atlasClient.setService(service);
        atlasClient.setConfiguration(configuration);

        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);

        verify(client).destroy();
        verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
        verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
    }

    @Test
    public void shouldRetryWithSameClientIfSingleAddressIsUsed() throws URISyntaxException, AtlasServiceException {
        setupRetryParams();
        ResourceCreator resourceCreator = mock(ResourceCreator.class);
        WebResource resourceObject = mock(WebResource.class);
        when(resourceObject.getURI()).thenReturn(new URI("http://localhost:31000/api/atlas/types"));
        WebResource.Builder builder = getBuilder(resourceObject);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());

        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null))
                .thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException()))
                .thenReturn(response);
        when(resourceCreator.createResource()).thenReturn(resourceObject);
        when(configuration.getString("atlas.http.authentication.type", "simple")).thenReturn("simple");

        AtlasClient atlasClient = getClientForTest("http://localhost:31000");
        atlasClient.setService(resourceObject);
        atlasClient.setConfiguration(configuration);

        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);

        verify(client).destroy();
        verify(client, times(2)).resource(UriBuilder.fromUri("http://localhost:31000").build());
    }

    @Test
    public void shouldRetryAPICallsOnServiceUnavailable() throws AtlasServiceException, URISyntaxException {
        setupRetryParams();
        ResourceCreator resourceCreator = mock(ResourceCreator.class);
        WebResource resourceObject = mock(WebResource.class);
        when(resourceObject.getURI())
                .thenReturn(new URI("http://localhost:31000/api/atlas/types"))
                .thenReturn(new URI("http://localhost:41000/api/atlas/types"))
                .thenReturn(new URI("http://localhost:41000/api/atlas/types"));
        WebResource.Builder builder = getBuilder(resourceObject);

        ClientResponse firstResponse = mock(ClientResponse.class);
        when(firstResponse.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
        when(firstResponse.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());

        // Connection failure first, then a 503, then a successful answer.
        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null))
                .thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException()))
                .thenReturn(firstResponse)
                .thenReturn(response);
        when(resourceCreator.createResource()).thenReturn(resourceObject);

        AtlasClient atlasClient = getClientForTest("http://localhost:31000", "http://localhost:41000");
        atlasClient.setService(resourceObject);
        atlasClient.setConfiguration(configuration);

        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);

        verify(client).destroy();
        verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
        verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
    }

    /**
     * Wires {@code webResource} so that resolving the path of {@code api} yields the same resource,
     * and returns the mocked request builder attached to it.
     *
     * @param api         the API whose path is stubbed on the resource
     * @param webResource the mocked resource to wire up
     * @return the mocked builder on which tests script {@code method(...)} results
     */
    private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) {
        // Use the resource that was passed in rather than the `service` field, so the helper
        // honours its own parameter.
        when(webResource.path(api.getPath())).thenReturn(webResource);
        return getBuilder(webResource);
    }

    /**
     * Creates a mocked {@link WebResource.Builder} and stubs {@code resourceObject} so that any
     * {@code path(...)} call returns the resource itself and {@code accept(JSON)} returns the builder;
     * {@code type(JSON)} on the builder returns the builder again.
     *
     * @param resourceObject the mocked resource to attach the builder to
     * @return the mocked builder
     */
    private WebResource.Builder getBuilder(WebResource resourceObject) {
        WebResource.Builder builder = mock(WebResource.Builder.class);
        when(resourceObject.path(anyString())).thenReturn(resourceObject);
        when(resourceObject.accept(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder);
        when(builder.type(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder);
        return builder;
    }

    /** Stubs the mocked configuration with 3 for the HA retries key and 1 for the sleep interval key. */
    private void setupRetryParams() {
        when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasClient.DEFAULT_NUM_RETRIES))
                .thenReturn(3);
        when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY,
                AtlasClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS))
                .thenReturn(1);
    }

    /**
     * Builds an {@link AtlasClient} subclass that uses the mocked {@code client} and
     * {@code configuration} of this test and short-circuits active-server discovery: the first
     * call to {@code determineActiveServiceURL} answers with the first base URL, every later call
     * answers with the second base URL when more than one was supplied.
     *
     * @param baseUrls the base URLs handed to the client
     * @return the client under test
     */
    private AtlasClient getClientForTest(final String... baseUrls) {
        return new AtlasClient((UserGroupInformation) null, (String) null, baseUrls) {
            boolean firstCall = true;

            @Override
            protected String determineActiveServiceURL(String[] baseUrls, Client client) {
                String returnUrl = baseUrls[0];
                if (baseUrls.length > 1 && !firstCall) {
                    returnUrl = baseUrls[1];
                }
                firstCall = false;
                return returnUrl;
            }

            @Override
            protected Configuration getClientProperties() {
                return configuration;
            }

            @Override
            protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
                return client;
            }
        };
    }
}