/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.piggybank.test.storage;

import java.io.File;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Date;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
import org.hsqldb.Server;
import org.junit.After;
import org.junit.Before;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;

public class TestDBStorage {

    private PigServer pigServer;
    private MiniGenericCluster cluster;
    private Server dbServer;
    private String driver = "org.hsqldb.jdbcDriver";
    // private String url = "jdbc:hsqldb:mem:.";
    private String TMP_DIR;
    private String dbUrl = "jdbc:hsqldb:hsql://localhost/batchtest";
    private String user = "sa";
    private String password = "";

    private static final String INPUT_FILE = "datafile.txt";

    public TestDBStorage() throws IOException {
        // Initialise Pig server
        cluster = MiniGenericCluster.buildCluster();
        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
        pigServer.getPigContext().getProperties()
                .setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
        pigServer.getPigContext().getProperties()
                .setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS, "1");
        System.out.println("Pig server initialized successfully");
        TMP_DIR = System.getProperty("user.dir") + "/build/test/";
        // Initialise DBServer
        dbServer = new Server();
        dbServer.setDatabaseName(0, "batchtest");
        // dbServer.setDatabasePath(0, "mem:test;sql.enforce_strict_size=true");
        dbServer.setDatabasePath(0,
                            "file:" + TMP_DIR + "batchtest;"+
                            "hsqldb.default_table_type=cached;hsqldb.cache_rows=100;sql.enforce_strict_size=true");
        dbServer.setLogWriter(null);
        dbServer.setErrWriter(null);
        dbServer.start();
        System.out.println("Database URL: " + dbUrl);
        try {
            Class.forName(driver);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(this + ".setUp() error: " + e.getMessage());
        }
        System.out.println("Database server started on port: " + dbServer.getPort());
    }

    private void createFile() throws IOException {
        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
        w.println("100\tapple\t1.0\t2008-01-01");
        w.println("100\torange\t2.0\t2008-02-01");
        w.println("100\tbanana\t1.1\t2008-03-01");
        w.println("\t\t\t");
        w.close();
        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
    }

    private void createTable() throws IOException {
        Connection con = null;
        String sql = "create table ttt (id integer, name varchar(32), ratio double, dt date)";
        try {
            con = DriverManager.getConnection(dbUrl, user, password);
        } catch (SQLException sqe) {
            throw new IOException("Unable to obtain a connection to the database",
                    sqe);
        }
        try {
            Statement st = con.createStatement();
            st.executeUpdate(sql);
            st.close();
            con.commit();
            con.close();
        } catch (SQLException sqe) {
            throw new IOException("Cannot create table", sqe);
        }
    }

    @Before
    public void setUp() throws IOException {
        createFile();
        createTable();
    }

    @After
    public void tearDown() throws IOException {
        new File(INPUT_FILE).delete();
        Util.deleteFile(cluster, INPUT_FILE);
        pigServer.shutdown();
        dbServer.stop();
        cluster.shutDown();

        File[] dbFiles = new File(TMP_DIR).listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                if (name.startsWith("batchtest")) {
                    return true;
                } else {
                    return false;
                }
            }
        });
        if (dbFiles != null) {
            for (File file : dbFiles) {
                file.delete();
            }
        }
    }

    @Test
    public void testWriteToDB() throws IOException, InterruptedException, ParseException {
        String insertQuery = "insert into ttt (id, name, ratio, dt) values (?,?,?,?)";
        pigServer.setBatchOn();
        String dbStore = "org.apache.pig.piggybank.storage.DBStorage('" + driver
                + "', '" + Util.encodeEscape(dbUrl) + "', '" + insertQuery + "');";
        pigServer.registerQuery("A = LOAD '" + INPUT_FILE
                + "' as (id:int, fruit:chararray, ratio:double, dt : datetime);");
        pigServer.registerQuery("STORE A INTO 'dummy' USING " + dbStore);
      ExecJob job = pigServer.executeBatch().get(0);
        try {
            while(!job.hasCompleted()) Thread.sleep(1000);
        } catch(InterruptedException ie) {// ignore
        }

        assertNotSame("Failed: " + job.getException(), job.getStatus(),
                        ExecJob.JOB_STATUS.FAILED);

        Connection con = null;
        String selectQuery = "select id, name, ratio, dt from ttt order by name";
        try {
            con = DriverManager.getConnection(dbUrl, user, password);
        } catch (SQLException sqe) {
            throw new IOException(
                    "Unable to obtain database connection for data verification", sqe);
        }
        try {
            PreparedStatement ps = con.prepareStatement(selectQuery);
            ResultSet rs = ps.executeQuery();

            int expId = 100;
            String[] expNames = {"apple", "banana", "orange"};
            double[] expRatios = {1.0, 1.1, 2.0};
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            Date[] expDates = {new Date(df.parse("2008-01-01").getTime()),
                new Date(df.parse("2008-03-01").getTime()),
                new Date(df.parse("2008-02-01").getTime())};
            for (int i = 0; i < 4; i++) {
                rs.next();
                //Need to check for nulls explicitly.
                if ( i == 0) {
                    //Id
                    rs.getInt(1);
                    assertTrue(rs.wasNull());
                    //Name
                    rs.getString(2);
                    assertTrue(rs.wasNull());
                    //Ratio
                    rs.getDouble(3);
                    assertTrue(rs.wasNull());
                    //Date
                    rs.getDate(4);
                    assertTrue(rs.wasNull());
                } else {
                    assertEquals("Id mismatch", expId, rs.getInt(1));
                    assertEquals("Name mismatch", expNames[i-1], rs.getString(2));
                    assertEquals("Ratio mismatch", expRatios[i-1], rs.getDouble(3), 0.0001);
                    assertEquals("Date mismatch", expDates[i-1], rs.getDate(4));
                }
            }

        } catch (SQLException sqe) {
            throw new IOException(
                    "Unable to read data from database for verification", sqe);
        }
    }
}
