/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dubbo.registry.kubernetes;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.EndpointsBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import static org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst.NAMESPACE;
import static org.awaitility.Awaitility.await;

@ExtendWith({MockitoExtension.class})
class KubernetesServiceDiscoveryTest {

    private static final String SERVICE_NAME = "TestService";

    private static final String POD_NAME = "TestServer";

    public KubernetesServer mockServer = new KubernetesServer(false, true);

    private NamespacedKubernetesClient mockClient;

    private ServiceInstancesChangedListener mockListener = Mockito.mock(ServiceInstancesChangedListener.class);

    private URL serverUrl;

    private Map<String, String> selector;

    private KubernetesServiceDiscovery serviceDiscovery;

    @BeforeEach
    public void setUp() {
        mockServer.before();
        mockClient = mockServer.getClient().inNamespace("dubbo-demo");

        ApplicationModel applicationModel = ApplicationModel.defaultModel();
        applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig());

        serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
                .setProtocol("kubernetes")
                .addParameter(NAMESPACE, "dubbo-demo")
                .addParameter(KubernetesClientConst.USE_HTTPS, "false")
                .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
        serverUrl.setScopeModel(applicationModel);

        this.serviceDiscovery = new KubernetesServiceDiscovery(applicationModel, serverUrl);

        System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
        System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false");

        selector = new HashMap<>(4);
        selector.put("l", "v");
        Pod pod = new PodBuilder()
                .withNewMetadata()
                .withName(POD_NAME)
                .withLabels(selector)
                .endMetadata()
                .build();

        Service service = new ServiceBuilder()
                .withNewMetadata()
                .withName(SERVICE_NAME)
                .endMetadata()
                .withNewSpec()
                .withSelector(selector)
                .endSpec()
                .build();

        Endpoints endPoints = new EndpointsBuilder()
                .withNewMetadata()
                .withName(SERVICE_NAME)
                .endMetadata()
                .addNewSubset()
                .addNewAddress()
                .withIp("ip1")
                .withNewTargetRef()
                .withUid("uid1")
                .withName(POD_NAME)
                .endTargetRef()
                .endAddress()
                .addNewPort("Test", "Test", 12345, "TCP")
                .endSubset()
                .build();

        mockClient.pods().resource(pod).create();
        mockClient.services().resource(service).create();
        mockClient.endpoints().resource(endPoints).create();
    }

    @AfterEach
    public void destroy() throws Exception {
        serviceDiscovery.destroy();
        mockClient.close();
        mockServer.after();
    }

    @Test
    void testEndpointsUpdate() {
        serviceDiscovery.setCurrentHostname(POD_NAME);
        serviceDiscovery.setKubernetesClient(mockClient);

        ServiceInstance serviceInstance = new DefaultServiceInstance(
                SERVICE_NAME,
                "Test",
                12345,
                ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));

        serviceDiscovery.doRegister(serviceInstance);

        HashSet<String> serviceList = new HashSet<>(4);
        serviceList.add(SERVICE_NAME);
        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());

        serviceDiscovery.addServiceInstancesChangedListener(mockListener);
        mockClient.endpoints().withName(SERVICE_NAME).edit(endpoints -> new EndpointsBuilder(endpoints)
                .editFirstSubset()
                .addNewAddress()
                .withIp("ip2")
                .withNewTargetRef()
                .withUid("uid2")
                .withName(POD_NAME)
                .endTargetRef()
                .endAddress()
                .endSubset()
                .build());

        await().until(() -> {
            ArgumentCaptor<ServiceInstancesChangedEvent> captor =
                    ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
            Mockito.verify(mockListener, Mockito.atLeast(0)).onEvent(captor.capture());
            return captor.getValue().getServiceInstances().size() == 2;
        });
        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
                ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
        Mockito.verify(mockListener, Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
        Assertions.assertEquals(
                2, eventArgumentCaptor.getValue().getServiceInstances().size());

        serviceDiscovery.doUnregister(serviceInstance);
    }

    @Test
    void testPodsUpdate() throws Exception {
        serviceDiscovery.setCurrentHostname(POD_NAME);
        serviceDiscovery.setKubernetesClient(mockClient);

        ServiceInstance serviceInstance = new DefaultServiceInstance(
                SERVICE_NAME,
                "Test",
                12345,
                ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));

        serviceDiscovery.doRegister(serviceInstance);

        HashSet<String> serviceList = new HashSet<>(4);
        serviceList.add(SERVICE_NAME);
        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());

        serviceDiscovery.addServiceInstancesChangedListener(mockListener);

        serviceInstance = new DefaultServiceInstance(
                SERVICE_NAME,
                "Test12345",
                12345,
                ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
        serviceDiscovery.doUpdate(serviceInstance, serviceInstance);

        await().until(() -> {
            ArgumentCaptor<ServiceInstancesChangedEvent> captor =
                    ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
            Mockito.verify(mockListener, Mockito.atLeast(0)).onEvent(captor.capture());
            return captor.getValue().getServiceInstances().size() == 1;
        });
        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
                ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
        Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
        Assertions.assertEquals(
                1, eventArgumentCaptor.getValue().getServiceInstances().size());

        serviceDiscovery.doUnregister(serviceInstance);
    }

    @Test
    void testServiceUpdate() {
        serviceDiscovery.setCurrentHostname(POD_NAME);
        serviceDiscovery.setKubernetesClient(mockClient);

        ServiceInstance serviceInstance = new DefaultServiceInstance(
                SERVICE_NAME,
                "Test",
                12345,
                ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));

        serviceDiscovery.doRegister(serviceInstance);

        HashSet<String> serviceList = new HashSet<>(4);
        serviceList.add(SERVICE_NAME);
        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());

        serviceDiscovery.addServiceInstancesChangedListener(mockListener);

        selector.put("app", "test");
        mockClient.services().withName(SERVICE_NAME).edit(service -> new ServiceBuilder(service)
                .editSpec()
                .addToSelector(selector)
                .endSpec()
                .build());

        await().until(() -> {
            ArgumentCaptor<ServiceInstancesChangedEvent> captor =
                    ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
            Mockito.verify(mockListener, Mockito.atLeast(0)).onEvent(captor.capture());
            return captor.getValue().getServiceInstances().size() == 1;
        });
        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
                ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
        Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
        Assertions.assertEquals(
                1, eventArgumentCaptor.getValue().getServiceInstances().size());

        serviceDiscovery.doUnregister(serviceInstance);
    }

    @Test
    void testGetInstance() {
        serviceDiscovery.setCurrentHostname(POD_NAME);
        serviceDiscovery.setKubernetesClient(mockClient);

        ServiceInstance serviceInstance = new DefaultServiceInstance(
                SERVICE_NAME,
                "Test",
                12345,
                ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));

        serviceDiscovery.doRegister(serviceInstance);

        serviceDiscovery.doUpdate(serviceInstance, serviceInstance);

        Assertions.assertEquals(1, serviceDiscovery.getServices().size());
        Assertions.assertEquals(1, serviceDiscovery.getInstances(SERVICE_NAME).size());

        serviceDiscovery.doUnregister(serviceInstance);
    }
}
