/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.vxquery.app.util;

import static org.apache.vxquery.rest.Constants.Properties.AVAILABLE_PROCESSORS;
import static org.apache.vxquery.rest.Constants.Properties.HDFS_CONFIG;
import static org.apache.vxquery.rest.Constants.Properties.JOIN_HASH_SIZE;
import static org.apache.vxquery.rest.Constants.Properties.MAXIMUM_DATA_SIZE;

import java.io.IOException;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.util.Arrays;

import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.vxquery.app.VXQueryApplication;
import org.apache.vxquery.rest.service.VXQueryConfig;
import org.apache.vxquery.rest.service.VXQueryService;

/**
 * A utility class to start a local Hyracks cluster (one cluster controller and
 * one node controller) together with the VXQuery REST service.
 *
 * @author Preston Carman
 */
public class LocalClusterUtil {
    /*
     * Default ports of the local virtual cluster. The cluster controller uses
     * 39000 (client), 39001 (cluster) and 39002 (HTTP console); 39003 is passed
     * to the application as its REST port. All addresses are the loopback
     * address returned by getIpAddress().
     */
    public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 39000;
    public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 39001;
    public static final int DEFAULT_HYRACKS_CC_HTTP_PORT = 39002;
    public static final int DEFAULT_VXQUERY_REST_PORT = 39003;

    private ClusterControllerService clusterControllerService;
    private NodeControllerService nodeControllerService;
    private VXQueryService vxQueryService;

    /**
     * Starts the cluster controller, the node controller and the VXQuery
     * service, in that order.
     * <p>
     * If this method throws, the services created so far are left as they are;
     * call {@link #deinit()} to stop them.
     *
     * @param config
     *            configuration for the VXQuery service; its Hyracks client
     *            address and port are overwritten with the values of the local
     *            cluster controller
     * @throws Exception
     *             if any of the services fails to be configured or started
     */
    public void init(VXQueryConfig config) throws Exception {
        // Following properties are needed by the app to setup
        System.setProperty(AVAILABLE_PROCESSORS, String.valueOf(config.getAvailableProcessors()));
        System.setProperty(JOIN_HASH_SIZE, String.valueOf(config.getJoinHashSize()));
        System.setProperty(MAXIMUM_DATA_SIZE, String.valueOf(config.getMaximumDataSize()));
        if (config.getHdfsConf() != null) {
            System.setProperty(HDFS_CONFIG, config.getHdfsConf());
        }

        // Cluster controller
        CCConfig ccConfig = createCCConfig();
        clusterControllerService = new ClusterControllerService(ccConfig);
        clusterControllerService.start();

        // Node controller
        NCConfig ncConfig = createNCConfig();
        nodeControllerService = new NodeControllerService(ncConfig);
        nodeControllerService.start();

        // REST controller: point it at the cluster controller started above
        config.setHyracksClientIp(ccConfig.getClientListenAddress());
        config.setHyracksClientPort(ccConfig.getClientListenPort());
        vxQueryService = new VXQueryService(config);
        vxQueryService.start();
    }

    /**
     * Builds the cluster controller configuration: listens on
     * {@link #getIpAddress()} with the default ports and registers
     * {@link VXQueryApplication} as the application class.
     *
     * @return the cluster controller configuration
     * @throws IOException
     *             if the local address cannot be determined
     */
    protected CCConfig createCCConfig() throws IOException {
        String localAddress = getIpAddress();
        CCConfig ccConfig = new CCConfig();
        ccConfig.setClientListenAddress(localAddress);
        ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT);
        ccConfig.setClusterListenAddress(localAddress);
        ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
        ccConfig.setConsoleListenPort(DEFAULT_HYRACKS_CC_HTTP_PORT);
        ccConfig.setProfileDumpPeriod(10000);
        ccConfig.setAppClass(VXQueryApplication.class.getName());
        ccConfig.getAppArgs().addAll(Arrays.asList("-restPort", String.valueOf(DEFAULT_VXQUERY_REST_PORT)));
        return ccConfig;
    }

    /**
     * Builds the node controller configuration for the single node
     * {@code test_node}, using a freshly created temporary directory as its
     * only IO device.
     *
     * @return the node controller configuration
     * @throws IOException
     *             if the local address cannot be determined or the temporary
     *             directory cannot be created
     */
    protected NCConfig createNCConfig() throws IOException {
        String localAddress = getIpAddress();
        String nodeId = "test_node";
        NCConfig ncConfig = new NCConfig(nodeId);
        // NOTE(review): the cluster controller listens on getIpAddress() while
        // this connects to "localhost"; presumably both resolve to the same
        // loopback interface - confirm before overriding getIpAddress().
        ncConfig.setClusterAddress("localhost");
        ncConfig.setClusterPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
        ncConfig.setClusterListenAddress(localAddress);
        ncConfig.setDataListenAddress(localAddress);
        ncConfig.setResultListenAddress(localAddress);
        ncConfig.setIODevices(new String[] { Files.createTempDirectory(nodeId).toString() });
        ncConfig.setVirtualNC();
        return ncConfig;
    }

    /**
     * @return the VXQuery service created by {@link #init(VXQueryConfig)}, or
     *         {@code null} if it has not been created yet
     */
    public VXQueryService getVxQueryService() {
        return vxQueryService;
    }

    /**
     * Stops the services in the reverse order of their start-up.
     * <p>
     * Services that were never created (because {@link #init(VXQueryConfig)}
     * was not called or failed partway) are skipped, and every remaining
     * service is stopped even if stopping an earlier one fails.
     *
     * @throws Exception
     *             the first failure raised while stopping a service; failures
     *             of the later services are attached to it as suppressed
     *             exceptions
     */
    public void deinit() throws Exception {
        Exception failure = null;

        try {
            if (vxQueryService != null) {
                vxQueryService.stop();
            }
        } catch (Exception e) {
            failure = e;
        }

        try {
            if (nodeControllerService != null) {
                nodeControllerService.stop();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        try {
            if (clusterControllerService != null) {
                clusterControllerService.stop();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Starts a local cluster with a default {@link VXQueryConfig} and keeps it
     * running until the JVM is shut down.
     *
     * @param args
     *            ignored
     */
    public static void main(String[] args) {
        LocalClusterUtil localClusterUtil = new LocalClusterUtil();
        VXQueryConfig config = new VXQueryConfig();
        run(localClusterUtil, config);
    }

    /**
     * Starts the given cluster and blocks the calling thread for as long as the
     * cluster is running. A shutdown hook stops the cluster when the JVM exits;
     * if start-up fails (or the thread is interrupted) the stack trace is
     * printed and the JVM exits with status 1.
     *
     * @param localClusterUtil
     *            the cluster to start
     * @param config
     *            configuration handed to {@link #init(VXQueryConfig)}
     */
    protected static void run(final LocalClusterUtil localClusterUtil, VXQueryConfig config) {
        // Registered before init() so that a partially started cluster is also
        // torn down when start-up fails and System.exit() is called below.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    localClusterUtil.deinit();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            localClusterUtil.init(config);
            // Keep the calling thread parked; the JVM is ended externally.
            while (true) {
                Thread.sleep(10000);
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * @return the textual IPv4/IPv6 loopback address as reported by
     *         {@link Inet4Address#getLoopbackAddress()}
     * @throws UnknownHostException
     *             declared for overriding implementations; not thrown by this
     *             implementation
     */
    public String getIpAddress() throws UnknownHostException {
        return Inet4Address.getLoopbackAddress().getHostAddress();
    }

    /**
     * @return the REST port passed to the application, always
     *         {@link #DEFAULT_VXQUERY_REST_PORT}
     */
    public int getRestPort() {
        return DEFAULT_VXQUERY_REST_PORT;
    }
}