/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

public class TestLocalOozieClientCoord extends XDataTestCase {

    private Services services;

    @Override
    protected void setUp() throws Exception {
        super.setUp();

        /*
         * DB cleanup below is needed to clean up the job records possibly left by previously executed tests.
         * For example, such records are left by test org.apache.oozie.executor.jpa.TestCoordActionUpdateJPAExecutor.
         * Note that by default Oozie tests are executed in "filesystem" (in fact, arbitrary) order. This way problems caused
         * by left records can be flaky (not reproduced constantly).
         * Services re-init is needed for the DB cleanup.
         */
        services = new Services();
        services.init();

        LocalOozie.start();
    }

    @Override
    protected void tearDown() throws Exception {
        LocalOozie.stop();
        services.destroy();
        super.tearDown();
    }

    public void testGetOozieUrl() {
        OozieClient client = LocalOozie.getCoordClient();
        assertEquals("localoozie", client.getOozieUrl());
    }

    public void testGetProtocolUrl() throws IOException, OozieClientException {
        OozieClient client = LocalOozie.getCoordClient();
        assertEquals("localoozie", client.getProtocolUrl());
    }

    public void testValidateWSVersion() throws IOException, OozieClientException {
        OozieClient client = LocalOozie.getCoordClient();
        client.validateWSVersion();
    }

    public void testHeaderMethods() {
        OozieClient client = LocalOozie.getCoordClient();
        client.setHeader("h", "v");
        assertTrue("no-op, should be null/empty", StringUtils.isBlank(client.getHeader("h")));
        Iterator<String> hit = client.getHeaderNames();
        assertFalse(hit.hasNext());
        try {
            hit.next();
            fail("NoSuchElementException expected.");
        }
        catch (NoSuchElementException nsee) {
            // expected
        }
        client.removeHeader("h");
        assertTrue("no-op, should be null/empty", StringUtils.isBlank(client.getHeader("h")));
    }

    public void testGetJobsInfo() {
        OozieClient client = LocalOozie.getCoordClient();
        try {
            client.getJobsInfo("foo");
            fail("OozieClientException expected.");
        }
        catch (OozieClientException oce) {
            assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
        }
        try {
            client.getJobsInfo("foo", 0, 5);
            fail("OozieClientException expected.");
        }
        catch (OozieClientException oce) {
            assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
        }
        try {
            client.getJobInfo("foo-id");
            fail("OozieClientException expected.");
        }
        catch (OozieClientException oce) {
            assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
        }
    }

    public void testReRun2() {
        OozieClient client = LocalOozie.getCoordClient();
        try {
            client.reRun("foo-id", client.createConfiguration());
            fail("OozieClientException expected.");
        }
        catch (OozieClientException oce) {
            assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
        }
    }

    private void writeToFile(String appXml, String appPath) throws Exception {
        File wf = new File(new URI(appPath).getPath());
        PrintWriter out = null;
        try {
            out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(wf), StandardCharsets.UTF_8));
            out.println(appXml);
        }
        catch (IOException iex) {
            throw iex;
        }
        finally {
            if (out != null) {
                out.close();
            }
        }
    }

    public void testJobMethods() throws Exception {
        final OozieClient client = LocalOozie.getCoordClient();

        // Just in case, check that there are no Coord job records left by previous tests:
        List<CoordinatorJob> list0 = client.getCoordJobsInfo("", 1, 100);
        assertEquals(0, list0.size());

        Properties conf = client.createConfiguration();

        String appPath = getTestCaseFileUri("coordinator.xml");
        String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(20)}\" "
                + "start=\"2009-02-01T01:00Z\" end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
                + "xmlns=\"uri:oozie:coordinator:0.1\"> <controls> <timeout>10</timeout> <concurrency>1</concurrency> "
                + "<execution>LIFO</execution> </controls> <datasets> "
                + "<dataset name=\"a\" frequency=\"${coord:minutes(20)}\" initial-instance=\"2009-02-01T01:00Z\" "
                + "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
                + "</uri-template> </dataset> "
                + "<dataset name=\"local_a\" frequency=\"${coord:minutes(20)}\" initial-instance=\"2009-02-01T01:00Z\" "
                + "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
                + "</uri-template> </dataset> "
                + "</datasets> <input-events> "
                + "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in>  " + "</input-events> "
                + "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
                + "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> "
                + "<app-path>hdfs:///tmp/workflows/</app-path> "
                + "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
                + "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
                + "</property></configuration> </workflow> </action> </coordinator-app>";
        writeToFile(appXml, appPath);

        conf.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath);
        String jobId0 = client.submit(conf);
        client.kill(jobId0);

        String jobId = client.run(conf);
        client.suspend(jobId);
        client.resume(jobId);
        client.kill(jobId);

        CoordinatorJob job = client.getCoordJobInfo(jobId);
        String appName = job.getAppName();
        assertEquals("NAME", appName);

        List<CoordinatorJob> list = client.getCoordJobsInfo("", 1, 5);
        assertEquals(2, list.size());
    }

    public void testJobsOperations() throws Exception {
        final OozieClient client = LocalOozie.getCoordClient();

        // Just in case, check that there are no Coord job records left by previous tests:
        List<CoordinatorJob> list0 = client.getCoordJobsInfo("", 1, 100);
        assertEquals(0, list0.size());
        Properties conf = client.createConfiguration();
        String appPath = storedCoordAppPath();

        conf.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath);
        final String jobId0 = client.run(conf);
        final String jobId1 = client.run(conf);
        final String jobId2 = client.run(conf);
        waitFor(client, jobId0);
        waitFor(client, jobId1);
        waitFor(client, jobId2);
        list0 = client.getCoordJobsInfo("name=NAME", 1, 10);
        assertEquals(3, list0.size());

        JSONObject jsonObject = client.suspendJobs("name=NAME", "coord", 1, 3);
        assertEquals(3, jsonObject.get("total"));
        assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size());
        assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId0).getStatus());
        assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId1).getStatus());
        assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId2).getStatus());

        jsonObject = client.resumeJobs("name=NAME", "coord", 1, 3);
        assertEquals(3, jsonObject.get("total"));
        assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size());
        assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId0).getStatus());
        assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId1).getStatus());
        assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId2).getStatus());

        jsonObject = client.killJobs("name=NAME", "coord", 1, 3);
        assertEquals(3, jsonObject.get("total"));
        assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size());
        assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId0).getStatus());
        assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId1).getStatus());
        assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId2).getStatus());
    }

    private void waitFor(final OozieClient client, final String jobId) {
        waitFor(10 * 1000, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                Job.Status status = client.getCoordJobInfo(jobId).getStatus();
                return status != Job.Status.PREP;
            }
        });
    }

    private String storedCoordAppPath() throws Exception {
        String appPath = getTestCaseFileUri("coordinator.xml");
        String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(20)}\" "
                + "start=\"2009-02-01T01:00Z\" end=\"2009-02-01T03:00Z\" timezone=\"UTC\" "
                + "xmlns=\"uri:oozie:coordinator:0.1\"> <controls> <timeout>10</timeout> <concurrency>1</concurrency> "
                + "<execution>LIFO</execution> </controls> "
                + " <action> <workflow> "
                + "<app-path>hdfs:///tmp/workflows/</app-path> "
                +" </workflow> </action> </coordinator-app>";
        writeToFile(appXml, appPath);
        return appPath;
    }
}
