/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;

import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;

import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;

public class AtlasClientTest {

    @Mock
    private WebResource service;

    @Mock
    private Configuration configuration;

    @Mock
    private Client client;

    @BeforeMethod
    public void setup() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void shouldVerifyServerIsReady() throws AtlasServiceException {
        setupRetryParams();

        AtlasClient atlasClient = new AtlasClient(service, configuration);

        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," +
                "\"Description\":\"Metadata Management and Data Governance Platform over Hadoop\"}");
        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);

        assertTrue(atlasClient.isServerReady());
    }

    @Test
    public void testCreateEntity() throws Exception {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);

        WebResource.Builder builder = setupBuilder(AtlasClient.API.CREATE_ENTITY, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.CREATED.getStatusCode());

        JSONObject jsonResponse = new JSONObject(new AtlasClient.EntityResult(Arrays.asList("id"), null, null).toString());
        when(response.getEntity(String.class)).thenReturn(jsonResponse.toString());
        when(response.getLength()).thenReturn(jsonResponse.length());
        String entityJson = InstanceSerialization.toJson(new Referenceable("type"), true);
        when(builder.method(anyString(), Matchers.<Class>any(), anyString())).thenReturn(response);

        List<String> ids = atlasClient.createEntity(entityJson);
        assertEquals(ids.size(), 1);
        assertEquals(ids.get(0), "id");
    }

    private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) {
        when(webResource.path(api.getPath())).thenReturn(service);
        return getBuilder(service);
    }

    @Test
    public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow(
                new ClientHandlerException());
        assertFalse(atlasClient.isServerReady());
    }

    @Test
    public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);

        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);

        assertFalse(atlasClient.isServerReady());
    }

    @Test(expectedExceptions = AtlasServiceException.class)
    public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() throws AtlasServiceException {
        setupRetryParams();

        AtlasClient atlasClient = new AtlasClient(service, configuration);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.VERSION, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);

        when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response);

        atlasClient.isServerReady();
        fail("Should throw exception");
    }
    
    @Test
    public void shouldGetAdminStatus() throws AtlasServiceException {
        setupRetryParams();

        AtlasClient atlasClient = new AtlasClient(service, configuration);

        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"Active\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);

//         Fix after AtlasBaseClient
//        atlasClient.setService();


        String status = atlasClient.getAdminStatus();
        assertEquals(status, "Active");
    }

    @Test(expectedExceptions = AtlasServiceException.class)
    public void shouldReturnStatusAsUnknownOnException() throws AtlasServiceException {
        setupRetryParams();

        AtlasClient atlasClient = new AtlasClient(service, configuration);

        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
        when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR);
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);

        String status = atlasClient.getAdminStatus();
        fail("Should fail with AtlasServiceException");
    }

    @Test
    public void shouldReturnStatusAsUnknownIfJSONIsInvalid() throws AtlasServiceException {
        setupRetryParams();
        AtlasClient atlasClient = new AtlasClient(service, configuration);

        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"status\":\"Active\"}");
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).thenReturn(response);

        String status = atlasClient.getAdminStatus();
        assertEquals(status, AtlasClient.UNKNOWN_STATUS);
    }
    
    @Test
    public void shouldReturnBaseURLAsPassedInURL() {
        AtlasClient atlasClient = new AtlasClient(service, configuration);

        String serviceURL = atlasClient.determineActiveServiceURL(new String[]{"http://localhost:21000"}, client);
        assertEquals(serviceURL, "http://localhost:21000");
    }

    @Test
    public void shouldSelectActiveAmongMultipleServersIfHAIsEnabled() {
        setupRetryParams();

        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        when(client.resource(UriBuilder.fromUri("http://localhost:41000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
        ClientResponse firstResponse = mock(ClientResponse.class);
        when(firstResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String passiveStatus = "{\"Status\":\"PASSIVE\"}";
        when(firstResponse.getEntity(String.class)).thenReturn(passiveStatus);
        when(firstResponse.getLength()).thenReturn(passiveStatus.length());
        ClientResponse secondResponse = mock(ClientResponse.class);
        when(secondResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(secondResponse.getEntity(String.class)).thenReturn(activeStatus);
        when(secondResponse.getLength()).thenReturn(activeStatus.length());
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
                thenReturn(firstResponse).thenReturn(firstResponse).thenReturn(firstResponse).
                thenReturn(secondResponse);

        AtlasClient atlasClient = new AtlasClient(service, configuration);

        String serviceURL = atlasClient.determineActiveServiceURL(
                new String[]{"http://localhost:31000", "http://localhost:41000"},
                client);
        assertEquals(serviceURL, "http://localhost:41000");
    }

    @Test
    public void shouldRetryUntilServiceBecomesActive() {
        setupRetryParams();

        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
        ClientResponse nextResponse = mock(ClientResponse.class);
        when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
                thenReturn(response).thenReturn(response).thenReturn(nextResponse);

        AtlasClient atlasClient = new AtlasClient(service, configuration);

        String serviceURL = atlasClient.determineActiveServiceURL(
                new String[] {"http://localhost:31000","http://localhost:41000"},
                client);
        assertEquals(serviceURL, "http://localhost:31000");
    }

    @Test
    public void shouldRetryIfCannotConnectToServiceInitially() {
        setupRetryParams();

        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
        ClientResponse nextResponse = mock(ClientResponse.class);
        when(nextResponse.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
                thenThrow(new ClientHandlerException("Simulating connection exception")).
                thenReturn(response).
                thenReturn(nextResponse);

        AtlasClient atlasClient = new AtlasClient(service, configuration);
        atlasClient.setService(service);
        atlasClient.setConfiguration(configuration);

        String serviceURL = atlasClient.determineActiveServiceURL(
                new String[] {"http://localhost:31000","http://localhost:41000"},
                client);
        assertEquals(serviceURL, "http://localhost:31000");
    }

    @Test(expectedExceptions = IllegalArgumentException.class)
    public void shouldThrowExceptionIfActiveServerIsNotFound() {
        setupRetryParams();

        when(client.resource(UriBuilder.fromUri("http://localhost:31000").build())).thenReturn(service);
        WebResource.Builder builder = setupBuilder(AtlasClient.API.STATUS, service);
        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        when(response.getEntity(String.class)).thenReturn("{\"Status\":\"BECOMING_ACTIVE\"}");
        when(builder.method(AtlasClient.API.STATUS.getMethod(), ClientResponse.class, null)).
                thenThrow(new ClientHandlerException("Simulating connection exception")).
                thenReturn(response).
                thenReturn(response);

        AtlasClient atlasClient = new AtlasClient(service, configuration);

        String serviceURL = atlasClient.determineActiveServiceURL(
                new String[] {"http://localhost:31000","http://localhost:41000"},
                client);
        assertNull(serviceURL);
    }

    @Test
    public void shouldRetryAPICallsOnClientHandlerException() throws AtlasServiceException, URISyntaxException {
        setupRetryParams();

        ResourceCreator resourceCreator = mock(ResourceCreator.class);
        WebResource resourceObject = mock(WebResource.class);
        when(resourceObject.getURI()).
                thenReturn(new URI("http://localhost:31000/api/atlas/types")).
                thenReturn(new URI("http://localhost:41000/api/atlas/types")).
                thenReturn(new URI("http://localhost:41000/api/atlas/types"));

        WebResource.Builder builder = getBuilder(resourceObject);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());

        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
                thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
                thenReturn(response);

        when(resourceCreator.createResource()).thenReturn(resourceObject);

        AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");

        atlasClient.setService(service);
        atlasClient.setConfiguration(configuration);

        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);

        verify(client).destroy();
        verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
        verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
    }

    @Test
    public void shouldRetryWithSameClientIfSingleAddressIsUsed() throws URISyntaxException, AtlasServiceException {
        setupRetryParams();

        ResourceCreator resourceCreator = mock(ResourceCreator.class);
        WebResource resourceObject = mock(WebResource.class);
        when(resourceObject.getURI()).
                thenReturn(new URI("http://localhost:31000/api/atlas/types"));

        WebResource.Builder builder = getBuilder(resourceObject);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());

        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
                thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
                thenReturn(response);

        when(resourceCreator.createResource()).thenReturn(resourceObject);
        when(configuration.getString("atlas.http.authentication.type", "simple")).thenReturn("simple");

        AtlasClient atlasClient = getClientForTest("http://localhost:31000");

        atlasClient.setService(resourceObject);
        atlasClient.setConfiguration(configuration);

        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);

        verify(client).destroy();
        verify(client, times(2)).resource(UriBuilder.fromUri("http://localhost:31000").build());
    }

    @Test
    public void shouldRetryAPICallsOnServiceUnavailable() throws AtlasServiceException, URISyntaxException {
        setupRetryParams();

        ResourceCreator resourceCreator = mock(ResourceCreator.class);
        WebResource resourceObject = mock(WebResource.class);
        when(resourceObject.getURI()).
                thenReturn(new URI("http://localhost:31000/api/atlas/types")).
                thenReturn(new URI("http://localhost:41000/api/atlas/types")).
                thenReturn(new URI("http://localhost:41000/api/atlas/types"));

        WebResource.Builder builder = getBuilder(resourceObject);

        ClientResponse firstResponse = mock(ClientResponse.class);
        when(firstResponse.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode());
        when(firstResponse.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE);

        ClientResponse response = mock(ClientResponse.class);
        when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode());
        String activeStatus = "{\"Status\":\"ACTIVE\"}";
        when(response.getEntity(String.class)).thenReturn(activeStatus);
        when(response.getLength()).thenReturn(activeStatus.length());

        when(builder.method(AtlasClient.API.LIST_TYPES.getMethod(), ClientResponse.class, null)).
                thenThrow(new ClientHandlerException("simulating exception in calling API", new ConnectException())).
                thenReturn(firstResponse).
                thenReturn(response);

        when(resourceCreator.createResource()).thenReturn(resourceObject);

        AtlasClient atlasClient = getClientForTest("http://localhost:31000","http://localhost:41000");
        atlasClient.setService(resourceObject);
        atlasClient.setConfiguration(configuration);

        atlasClient.callAPIWithRetries(AtlasClient.API.LIST_TYPES, null, resourceCreator);


        verify(client).destroy();
        verify(client).resource(UriBuilder.fromUri("http://localhost:31000").build());
        verify(client).resource(UriBuilder.fromUri("http://localhost:41000").build());
    }

    private WebResource.Builder getBuilder(WebResource resourceObject) {
        WebResource.Builder builder = mock(WebResource.Builder.class);
        when(resourceObject.path(anyString())).thenReturn(resourceObject);
        when(resourceObject.accept(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder);
        when(builder.type(AtlasBaseClient.JSON_MEDIA_TYPE)).thenReturn(builder);
        return builder;
    }

    private void setupRetryParams() {
        when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasClient.DEFAULT_NUM_RETRIES)).
                thenReturn(3);
        when(configuration.getInt(AtlasClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY,
                AtlasClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS)).
                thenReturn(1);
    }

    private AtlasClient getClientForTest(final String... baseUrls) {
        return new AtlasClient((UserGroupInformation)null, (String)null, baseUrls) {
            boolean firstCall = true;
            @Override
            protected String determineActiveServiceURL(String[] baseUrls, Client client) {
                String returnUrl = baseUrls[0];
                if (baseUrls.length > 1 && !firstCall) {
                    returnUrl = baseUrls[1];
                }
                firstCall = false;
                return returnUrl;
            }

            @Override
            protected Configuration getClientProperties() {
                return configuration;
            }

            @Override
            protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
                return client;
            }
        };
    }
}
