package org.apache.s4.tools.yarn;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import junit.framework.Assert;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.AppConstants;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.s4.tools.Tools;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.io.Files;
import com.google.inject.Injector;

// NOTE: you must have an updated s4-yarn installed app (this is automatic when tests are run from the command line, otherwise do: gradlew s4-yarn:installApp)
public class TestYarnDeployment extends ZkBasedTest {

    private static Logger logger = LoggerFactory.getLogger(TestYarnDeployment.class);

    protected static MiniYARNCluster yarnCluster = null;
    protected static MiniDFSCluster dfsCluster = null;
    protected static Configuration conf = new Configuration();

    @BeforeClass
    public static void setup() throws InterruptedException, IOException {
        logger.info("Starting up YARN cluster");
        conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);

        // NOTE : we use default ports
        dfsCluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).build();
        dfsCluster.waitActive();

        String fsDefaultName = "hdfs://localhost:" + dfsCluster.getNameNodePort();
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsDefaultName);

        conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);

        yarnCluster = new MiniYARNCluster(TestYarnDeployment.class.getName(), 1, 1, 1);
        yarnCluster.init(conf);
        yarnCluster.start();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            logger.info("setup thread sleep interrupted. message=" + e.getMessage());
        }

    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (yarnCluster != null) {
            yarnCluster.stop();
            yarnCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
        }
    }

    private ZkClient zkClient;

    private File tmpAppsDir;

    @Test
    public void testDeployment() throws Exception {

        tmpAppsDir = Files.createTempDir();

        File gradlewFile = CoreTestUtils.findGradlewInRootDir();

        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
                + "/test-apps/simple-deployable-app-1/build.gradle"), "installS4R", new String[] { "appsDir="
                + tmpAppsDir.getAbsolutePath() });

        zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
        zkClient.setZkSerializer(new ZNRecordSerializer());
        List<String> processes = zkClient.getChildren("/s4/clusters/cluster1/process");
        Assert.assertTrue(processes.size() == 0);
        final CountDownLatch signalProcessesReady = new CountDownLatch(1);

        zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {

            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                if (currentChilds.size() == 2) {
                    signalProcessesReady.countDown();
                }

            }
        });

        CountDownLatch signalAppInitialized = new CountDownLatch(1);
        CountDownLatch signalAppStarted = new CountDownLatch(1);
        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppInitialized,
                CommTestUtils.createZkClient());
        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
                CommTestUtils.createZkClient());

        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));

        FileSystem fs = FileSystem.get(conf);
        Path destS4rPath = new Path(fs.getHomeDirectory() + "/simpleDeployableApp.s4r");
        fs.copyFromLocalFile(new Path(new File(tmpAppsDir.getAbsolutePath()
                + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r").toURI()), destS4rPath);

        fs.close();

        // deploy with Yarn Client
        final String[] params = ("-cluster=cluster1 -nbTasks=2 -flp=14000 -s4r=" + destS4rPath.toUri().toString()
                + " -zk=localhost:2181 -s4Dir=" + gradlewFile.getParentFile().getAbsolutePath()).split("[ ]");

        S4CLIYarnArgs yarnArgs = new S4CLIYarnArgs();

        Tools.parseArgs(yarnArgs, params);
        final S4YarnClient client = new S4YarnClient(yarnArgs, conf);

        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {

                try {

                    boolean result = client.run(true);
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail();
                }

            }
        }, "Yarn Client");
        t.start();

        Assert.assertTrue(signalAppInitialized.await(200, TimeUnit.SECONDS));
        Assert.assertTrue(signalAppStarted.await(200, TimeUnit.SECONDS));

        String time1 = String.valueOf(System.currentTimeMillis());

        CountDownLatch signalEvent1Processed = new CountDownLatch(1);
        CommTestUtils
                .watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());

        Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();

        TCPEmitter emitter = injector.getInstance(TCPEmitter.class);

        Event event = new Event();
        event.put("line", String.class, time1);
        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
                .serialize(event)));

        // check event processed
        Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));

        // cleanup
        client.killApplication(client.getApplicationList().get(0).getApplicationId());

    }

}
