/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.sqoop.metastore;

import static org.junit.Assert.assertEquals;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
import com.cloudera.sqoop.testutil.CommonArgs;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.manager.DefaultManagerFactory;
import org.apache.sqoop.tool.JobTool;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;


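/**
 * Base class for tests that run an incremental import through a saved job
 * whose state is stored in a metastore. Sub-classes supply the metastore
 * connection string and credentials for a specific database service.
 */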

public abstract class MetaConnectIncrementalImportTestBase extends BaseSqoopTestCase {

    public static final Log LOG = LogFactory
            .getLog(MetaConnectIncrementalImportTestBase.class.getName());

    /** Name under which the saved job is created and later executed. */
    private static final String JOB_NAME = "testJob";

    /** Table the saved job imports from. */
    private static final String IMPORT_TABLE = "CARLOCATIONS";

    /** Column the incremental import uses as its check column. */
    private static final String CHECK_COLUMN = "CARID";

    private final String metaConnectString;
    private final String metaUser;
    private final String metaPass;

    // Both are opened by initMetastoreConnection() and released at the end of the test.
    private Connection connMeta;
    private ConnManager cm;

    /**
     * @param metaConnectString JDBC connect string of the metastore database
     * @param metaUser          user name used to connect to the metastore
     * @param metaPass          password used to connect to the metastore
     */
    public MetaConnectIncrementalImportTestBase(String metaConnectString, String metaUser, String metaPass) {
        this.metaConnectString = metaConnectString;
        this.metaUser = metaUser;
        this.metaPass = metaPass;
    }

    @Before
    public void setUp() {
        super.setUp();
    }

    @After
    public void tearDown() {
        super.tearDown();
    }

    /**
     * Builds the command line that creates the saved incremental-append import job.
     *
     * @param metaConnectString metastore connect string passed as {@code --meta-connect}
     * @param metaUser          value passed as {@code --meta-username}
     * @param metaPass          value passed as {@code --meta-password}
     * @return the argument array for the job tool
     */
    protected String[] getIncrementalJob(String metaConnectString, String metaUser, String metaPass) {
        List<String> args = new ArrayList<>();
        CommonArgs.addHadoopFlags(args);
        args.add("--create");
        args.add(JOB_NAME);
        addMetastoreArgs(args, metaConnectString, metaUser, metaPass);
        // Everything after "--" describes the import that the saved job runs.
        args.add("--");
        args.add("import");
        args.add("-m");
        args.add("1");
        args.add("--connect");
        args.add(getConnectString());
        args.add("--table");
        args.add(IMPORT_TABLE);
        args.add("--incremental");
        args.add("append");
        args.add("--check-column");
        args.add(CHECK_COLUMN);
        args.add("--last-value");
        args.add("0");
        args.add("--as-textfile");

        return args.toArray(new String[0]);
    }

    /**
     * Builds the command line that executes the previously created saved job.
     *
     * @param metaConnectString metastore connect string passed as {@code --meta-connect}
     * @param metaUser          value passed as {@code --meta-username}
     * @param metaPass          value passed as {@code --meta-password}
     * @return the argument array for the job tool
     */
    protected String[] getExecJob(String metaConnectString, String metaUser, String metaPass) {
        List<String> args = new ArrayList<>();
        CommonArgs.addHadoopFlags(args);
        args.add("--exec");
        args.add(JOB_NAME);
        addMetastoreArgs(args, metaConnectString, metaUser, metaPass);

        return args.toArray(new String[0]);
    }

    /** Appends the metastore connection options shared by the create and exec command lines. */
    private static void addMetastoreArgs(List<String> args, String connect, String user, String pass) {
        args.add("--meta-connect");
        args.add(connect);
        args.add("--meta-username");
        args.add(user);
        args.add("--meta-password");
        args.add(pass);
    }

    /**
     * Creates the saved job, runs it twice with a row inserted in between, and
     * checks after each run that the incremental last value stored in the
     * metastore matches the highest check-column value imported so far.
     *
     * @throws SQLException if a JDBC operation against the source table or the metastore fails
     */
    @Test
    public void testIncrementalJob() throws SQLException {
        resetTable();

        try {
            initMetastoreConnection();

            resetMetastoreSchema();

            createJob();

            // First import: only the seed row exists.
            execJob();
            checkIncrementalState(1);

            // Add a second row to the source table.
            try (Statement insertStmt = getConnection().createStatement()) {
                insertStmt.executeUpdate("INSERT INTO " + IMPORT_TABLE + " VALUES (2, 'lexus')");
            }
            getConnection().commit();

            // Second import must advance the stored last value.
            execJob();
            checkIncrementalState(2);
        } finally {
            // Release the metastore manager even when a step above fails.
            if (cm != null) {
                cm.close();
            }
        }
    }

    /**
     * Asserts that the {@code incremental.last.value} property stored in the
     * metastore equals {@code expected}.
     *
     * @param expected the expected last value
     * @throws SQLException if the metastore query fails
     */
    private void checkIncrementalState(int expected) throws SQLException {
        String query = "SELECT propVal FROM " + cm.escapeTableName("SQOOP_SESSIONS")
                + " WHERE propname = 'incremental.last.value'";
        try (Statement stateStmt = connMeta.createStatement();
             ResultSet lastCol = stateStmt.executeQuery(query)) {
            // Fail with a readable message instead of a driver-specific cursor error.
            if (!lastCol.next()) {
                throw new AssertionError("No incremental.last.value row found in the metastore");
            }
            assertEquals("Last row value differs from expected",
                    expected, lastCol.getInt("propVal"));
        }
    }

    /** Runs the saved job and asserts that Sqoop exits with status 0. */
    private void execJob() {
        JobTool jobToolExec = new JobTool();
        org.apache.sqoop.Sqoop sqoopExec = new org.apache.sqoop.Sqoop(jobToolExec);
        String[] argsExec = getExecJob(metaConnectString, metaUser, metaPass);
        assertEquals("Sqoop Job did not execute properly",
                0, org.apache.sqoop.Sqoop.runSqoop(sqoopExec, argsExec));
    }

    /** Creates the saved job in the metastore and asserts that Sqoop exits with status 0. */
    private void createJob() {
        Configuration conf = new Configuration();
        // NOTE(review): presumably enables storing the password with the saved job - confirm.
        conf.set(org.apache.sqoop.SqoopOptions.METASTORE_PASSWORD_KEY, "true");
        JobTool jobToolCreate = new JobTool();
        org.apache.sqoop.Sqoop sqoopCreate = new org.apache.sqoop.Sqoop(jobToolCreate, conf);
        String[] argsCreate = getIncrementalJob(metaConnectString, metaUser, metaPass);
        // Check the status here so a failed create is not misreported later as a failed exec.
        assertEquals("Sqoop Job was not created properly",
                0, org.apache.sqoop.Sqoop.runSqoop(sqoopCreate, argsCreate));
    }

    /**
     * Drops and recreates the source table, passing the values (1, 'Lexus') to
     * the table-creation helper as initial data.
     *
     * @throws SQLException if dropping the existing table fails
     */
    private void resetTable() throws SQLException {
        dropTableIfExists(IMPORT_TABLE);
        setCurTableName(IMPORT_TABLE);
        createTableWithColTypesAndNames(
                new String [] {CHECK_COLUMN, "LOCATIONS"},
                new String [] {"INTEGER", "VARCHAR"},
                new String [] {"1", "'Lexus'"});
    }

    /**
     * Drops the metastore tables on a best-effort basis so the test starts
     * without state from an earlier run. Each table is dropped independently,
     * so a missing table does not prevent the other one from being removed.
     */
    private void resetMetastoreSchema() {
        dropMetastoreTable("SQOOP_ROOT");
        dropMetastoreTable("SQOOP_SESSIONS");
    }

    /**
     * Drops a single metastore table and commits. Failures are logged and not
     * rethrown, because the table may simply not exist yet.
     */
    private void dropMetastoreTable(String tableName) {
        try (Statement dropStmt = connMeta.createStatement()) {
            dropStmt.execute("DROP TABLE " + cm.escapeTableName(tableName));
            connMeta.commit();
        } catch (SQLException e) {
            LOG.error("Could not drop metastore table " + tableName + ": "
                    + e.getLocalizedMessage(), e);
            // Some databases reject further statements on a connection until a failed
            // transaction is rolled back, so clear it before the connection is reused.
            try {
                connMeta.rollback();
            } catch (SQLException rollbackFailure) {
                LOG.error("Rollback after failed drop of " + tableName + " also failed",
                        rollbackFailure);
            }
        }
    }

    /**
     * Opens the metastore connection used for schema resets and state checks,
     * storing the manager in {@code cm} and its connection in {@code connMeta}.
     *
     * @throws SQLException if the connection cannot be obtained
     */
    private void initMetastoreConnection() throws SQLException {
        SqoopOptions options = new SqoopOptions();
        options.setConnectString(metaConnectString);
        options.setUsername(metaUser);
        options.setPassword(metaPass);
        com.cloudera.sqoop.metastore.JobData jd =
                new com.cloudera.sqoop.metastore.JobData(options, new JobTool());
        DefaultManagerFactory dmf = new DefaultManagerFactory();
        cm = dmf.accept(jd);
        connMeta = cm.getConnection();
    }
}