/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.cache.store.jdbc;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Random;
import javax.cache.integration.CacheLoaderException;
import org.apache.commons.dbcp.DelegatingConnection;
import org.apache.commons.dbcp.DelegatingPreparedStatement;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
import org.apache.ignite.cache.store.jdbc.model.Gender;
import org.apache.ignite.cache.store.jdbc.model.Person;
import org.apache.ignite.cache.store.jdbc.model.PersonKey;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;

/**
 * Class for {@link CacheJdbcPojoStore} tests.
 */
public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstractTest {
    /** DB connection URL. */
    private static final String DFLT_CONN_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1";

    /** Organization count. */
    private static final int ORGANIZATION_CNT = 1000;

    /** Person count. */
    private static final int PERSON_CNT = 100000;

    /** Fetch size. */
    public static final int FETCH_SZ = 42;

    /** Test cache name. */
    private static final String CACHE_NAME = "test-cache";

    /** Flag indicating that tests should use transactional cache. */
    private static boolean transactional;

    /** Flag indicating that tests should use primitive classes like java.lang.Integer for keys. */
    protected static boolean builtinKeys;

    /** Flag indicating that key classes are NOT available on the class path. */
    private static boolean noKeyClasses;

    /** Flag indicating that value classes are NOT available on the class path. */
    private static boolean noValClasses;

    /** Flag indicating that fetch size should be checked. */
    private static boolean checkFetchSize;

    /** Batch size to load in parallel. */
    private static int parallelLoadThreshold;

    /**
     * Hook that decides whether the store under test is configured to escape all identifiers
     * in its internal SQL queries. The value is passed to the store factory and also selects
     * the column-name casing used for the Organization value fields.
     *
     * @return {@code false} in this base implementation, i.e. identifiers are not escaped.
     */
    protected boolean sqlEscapeAll() {
        return false;
    }

    /**
     * Opens a connection to the in-memory H2 test database and wraps it so that every statement
     * created through {@link Connection#prepareStatement(String)} verifies its fetch size right
     * before a query is executed, whenever {@code checkFetchSize} is switched on.
     *
     * @return Wrapped connection to the test in-memory H2 database.
     * @throws SQLException If the connection could not be established.
     */
    protected Connection getConnection() throws SQLException {
        Connection h2Conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");

        return new DelegatingConnection(h2Conn) {
            /** {@inheritDoc} */
            @Override public PreparedStatement prepareStatement(String sql) throws SQLException {
                PreparedStatement target = super.prepareStatement(sql);

                return new DelegatingPreparedStatement(this, target) {
                    /** {@inheritDoc} */
                    @Override public ResultSet executeQuery() throws SQLException {
                        // The check is only active while a test has raised the static flag.
                        if (checkFetchSize)
                            assertEquals(FETCH_SZ, getFetchSize());

                        return super.executeQuery();
                    }
                };
            }
        };
    }

    /**
     * {@inheritDoc}
     * <p>
     * Runs the MVCC feature check for {@code CACHE_STORE} before delegating to the parent setup.
     */
    @Override protected void beforeTestsStarted() throws Exception {
        // Must run before the parent setup.
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);

        super.beforeTestsStarted();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Recreates the {@code Organization} and {@code Person} tables in the in-memory H2 database
     * and fills them with sample data. The statement and the connection are released in
     * {@code finally} blocks so that a failing DDL statement or a failing data fill does not
     * leak JDBC resources into subsequent tests.
     */
    @Override protected void beforeTest() throws Exception {
        Connection conn = getConnection();

        try {
            Statement stmt = conn.createStatement();

            try {
                // Start every test from a clean schema.
                stmt.executeUpdate("DROP TABLE IF EXISTS Organization");
                stmt.executeUpdate("DROP TABLE IF EXISTS Person");

                stmt.executeUpdate("CREATE TABLE Organization (" +
                    " id INTEGER PRIMARY KEY," +
                    " name VARCHAR(50)," +
                    " city VARCHAR(50))");

                stmt.executeUpdate("CREATE TABLE Person (" +
                    " id INTEGER PRIMARY KEY," +
                    " org_id INTEGER," +
                    " birthday DATE," +
                    " name VARCHAR(50)," +
                    " gender VARCHAR(50))");

                conn.commit();
            }
            finally {
                U.closeQuiet(stmt);
            }

            fillSampleDatabase(conn);
        }
        finally {
            U.closeQuiet(conn);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Stops all grids after each test; every test starts its own grid via {@code startTestGrid}.
     */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Extends the parent configuration with the test cache, the marshaller chosen by the concrete
     * subclass and a default connector configuration.
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);

        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);

        igniteCfg.setCacheConfiguration(cacheConfiguration());
        igniteCfg.setMarshaller(marshaller());
        igniteCfg.setConnectorConfiguration(new ConnectorConfiguration());

        return igniteCfg;
    }

    /**
     * Supplies the marshaller that is installed into the Ignite configuration of the test grid.
     * {@code checkPutRemove()} treats a {@code null} result the same way as a
     * {@link BinaryMarshaller}.
     *
     * @return Marshaller to be used in test.
     */
    protected abstract Marshaller marshaller();

    /**
     * Builds the JDBC type mappings for the {@code ORGANIZATION} and {@code PERSON} tables.
     * <p>
     * With built-in keys the two tables use different key classes ({@code Long} for organizations,
     * {@code Integer} for persons). Both tables share one cache and their ids overlap, so distinct
     * key classes are needed for the load checks that expect
     * {@code ORGANIZATION_CNT + PERSON_CNT} entries. The Java type of each built-in key field is
     * kept equal to the declared key type, so that keys built while loading have the same class
     * as the keys used for put/remove.
     *
     * @return Types to be used in test.
     */
    protected JdbcType[] storeTypes() {
        JdbcType orgType = new JdbcType();

        orgType.setCacheName(CACHE_NAME);
        orgType.setDatabaseSchema("PUBLIC");
        orgType.setDatabaseTable("ORGANIZATION");

        if (builtinKeys) {
            orgType.setKeyType("java.lang.Long");
            // Field class must match the declared key type above.
            orgType.setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Long.class, "id"));
        }
        else {
            // The "1" suffix yields a type name for which no class exists on the class path.
            orgType.setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey" + (noKeyClasses ? "1" : ""));
            orgType.setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"));
        }

        orgType.setValueType("org.apache.ignite.cache.store.jdbc.model.Organization" + (noValClasses ? "1" : ""));

        // Column-name casing differs depending on whether identifiers are escaped.
        boolean escape = sqlEscapeAll();

        orgType.setValueFields(
            new JdbcTypeField(Types.INTEGER, escape ? "ID" : "Id", Integer.class, "id"),
            new JdbcTypeField(Types.VARCHAR, escape ? "NAME" : "Name", String.class, "name"),
            new JdbcTypeField(Types.VARCHAR, escape ? "CITY" : "City", String.class, "city"));

        JdbcType personType = new JdbcType();

        personType.setCacheName(CACHE_NAME);
        personType.setDatabaseSchema("PUBLIC");
        personType.setDatabaseTable("PERSON");

        if (builtinKeys) {
            personType.setKeyType("java.lang.Integer");
            // Field class must match the declared key type above.
            personType.setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"));
        }
        else {
            personType.setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey" + (noKeyClasses ? "1" : ""));
            personType.setKeyFields(new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"));
        }

        personType.setValueType("org.apache.ignite.cache.store.jdbc.model.Person" + (noValClasses ? "1" : ""));
        personType.setValueFields(
            new JdbcTypeField(Types.INTEGER, "ID", Integer.class, "id"),
            new JdbcTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"),
            new JdbcTypeField(Types.DATE, "BIRTHDAY", Date.class, "birthday"),
            new JdbcTypeField(Types.VARCHAR, "NAME", String.class, "name"),
            new JdbcTypeField(Types.VARCHAR, "GENDER", Gender.class, "gender"));

        return new JdbcType[] {orgType, personType};
    }

    /**
     * Creates the configuration of the partitioned test cache backed by a
     * {@link CacheJdbcPojoStore} with read-through and write-through enabled and write-behind
     * disabled. Atomicity mode, parallel load threshold and SQL escaping are taken from the
     * current test flags and hooks.
     *
     * @return Cache configuration for test.
     * @throws Exception In case when failed to create cache configuration.
     */
    protected CacheConfiguration cacheConfiguration() throws Exception {
        CacheConfiguration ccfg = defaultCacheConfiguration();

        ccfg.setName(CACHE_NAME);
        ccfg.setCacheMode(PARTITIONED);

        if (transactional)
            ccfg.setAtomicityMode(TRANSACTIONAL);
        else
            ccfg.setAtomicityMode(ATOMIC);

        ccfg.setWriteBehindEnabled(false);
        ccfg.setStoreKeepBinary(storeKeepBinary());

        // H2 dialect with the fetch size that the wrapped test connection asserts on.
        H2Dialect h2Dialect = new H2Dialect();

        h2Dialect.setFetchSize(FETCH_SZ);

        CacheJdbcPojoStoreFactory<Object, Object> factory = new CacheJdbcPojoStoreFactory<>();

        factory.setDialect(h2Dialect);
        factory.setTypes(storeTypes());
        factory.setDataSourceFactory(new H2DataSourceFactory());
        factory.setSqlEscapeAll(sqlEscapeAll());
        factory.setParallelLoadCacheMinimumThreshold(parallelLoadThreshold);

        ccfg.setCacheStoreFactory(factory);
        ccfg.setReadThrough(true);
        ccfg.setWriteThrough(true);
        ccfg.setLoadPreviousValue(true);

        return ccfg;
    }

    /**
     * Hook whose result is passed to {@code CacheConfiguration.setStoreKeepBinary(...)} of the
     * test cache.
     *
     * @return {@code false} in this base implementation.
     */
    protected boolean storeKeepBinary() {
        return false;
    }

    /**
     * Fill in-memory database with sample data: {@code ORGANIZATION_CNT} organizations and
     * {@code PERSON_CNT} persons, each table inserted as a single JDBC batch and committed.
     * <p>
     * Prepared statements are closed in {@code finally} blocks so they are released even when
     * a batch fails. The connection itself is owned by the caller and is not closed here.
     *
     * @param conn Connection to database.
     * @throws SQLException In case of filling database with sample data failed.
     */
    protected void fillSampleDatabase(Connection conn) throws SQLException {
        info("Start to fill sample database...");

        PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)");

        try {
            for (int i = 0; i < ORGANIZATION_CNT; i++) {
                orgStmt.setInt(1, i);
                orgStmt.setString(2, "name" + i);
                orgStmt.setString(3, "city" + i % 10);

                orgStmt.addBatch();
            }

            orgStmt.executeBatch();
        }
        finally {
            U.closeQuiet(orgStmt);
        }

        conn.commit();

        PreparedStatement prnStmt = conn.prepareStatement(
            "INSERT INTO Person(id, org_id, birthday, name, gender) VALUES (?, ?, ?, ?, ?)");

        try {
            Random rnd = new Random();

            for (int i = 0; i < PERSON_CNT; i++) {
                prnStmt.setInt(1, i);
                prnStmt.setInt(2, i % 100);

                // Month is in [1, 11] and day in [1, 27], so the generated date is always valid.
                prnStmt.setDate(3, Date.valueOf(String.format("%d-%d-%d",
                    1970 + rnd.nextInt(50), 1 + rnd.nextInt(11), 1 + rnd.nextInt(27))));

                prnStmt.setString(4, "name" + i);
                prnStmt.setString(5, Gender.random().toString());

                prnStmt.addBatch();
            }

            prnStmt.executeBatch();

            conn.commit();
        }
        finally {
            U.closeQuiet(prnStmt);
        }

        info("Sample database prepared.");
    }

    /**
     * Stores the given options in the static test flags, which are read while the grid
     * configuration is built, and then starts the test grid.
     *
     * @param builtin {@code True} if keys are built in java types.
     * @param noKeyCls {@code True} if keys classes are not on class path.
     * @param noValCls {@code True} if values classes are not on class path.
     * @param trn {@code True} if cache should be started in transactional mode.
     * @param threshold Load batch size.
     * @throws Exception If failed to start grid.
     */
    protected void startTestGrid(boolean builtin, boolean noKeyCls, boolean noValCls, boolean trn, int threshold) throws Exception {
        // Cache shape.
        transactional = trn;
        parallelLoadThreshold = threshold;

        // Key/value type selection.
        builtinKeys = builtin;
        noKeyClasses = noKeyCls;
        noValClasses = noValCls;

        startGrid();
    }

    /**
     * Loads the whole store into the test cache and checks that every organization and every
     * person ended up in the cache. Fetch size verification is enabled for the duration of the
     * load only.
     * <p>
     * The static {@code checkFetchSize} flag is reset in a {@code finally} block, so a failing
     * load does not leave the check enabled for subsequent tests.
     */
    protected void checkCacheLoad() {
        IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);

        checkFetchSize = true;

        try {
            c1.loadCache(null);
        }
        finally {
            checkFetchSize = false;
        }

        assertEquals(ORGANIZATION_CNT + PERSON_CNT, c1.size());
    }

    /**
     * Loads persons into the test cache through a custom SQL query bound to the
     * {@code PersonKey} key type and checks that exactly {@code PERSON_CNT} entries were loaded.
     * Fetch size verification is enabled for the duration of the load only.
     * <p>
     * The static {@code checkFetchSize} flag is reset in a {@code finally} block, so a failing
     * load does not leave the check enabled for subsequent tests.
     */
    protected void checkCacheLoadWithSql() {
        IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);

        checkFetchSize = true;

        try {
            c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", "select id, org_id, name, birthday, gender from Person");
        }
        finally {
            checkFetchSize = false;
        }

        assertEquals(PERSON_CNT, c1.size());
    }

    /**
     * Loads persons into the test cache through a caller-created {@link PreparedStatement}
     * bound to the {@code PersonKey} key type and checks that exactly {@code PERSON_CNT}
     * entries were loaded.
     *
     * @throws SQLException If the connection or the statement could not be created.
     */
    protected void checkCacheLoadWithStatement() throws SQLException {
        Connection dbConn = getConnection();

        try {
            dbConn.setAutoCommit(true);

            PreparedStatement selectPersons = dbConn.prepareStatement("select id, org_id, name, birthday, gender from Person");

            try {
                IgniteCache<Object, Object> cache = grid().cache(CACHE_NAME);

                cache.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", selectPersons);

                assertEquals(PERSON_CNT, cache.size());
            }
            finally {
                U.closeQuiet(selectPersons);
            }
        }
        finally {
            U.closeQuiet(dbConn);
        }
    }

    /**
     * Loads persons through a prepared statement into an atomic cache with POJO keys
     * (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheWithStatement() throws Exception {
        startTestGrid(false, false, false, false, 512);

        checkCacheLoadWithStatement();
    }

    /**
     * Loads persons through a prepared statement into a transactional cache with POJO keys
     * (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheWithStatementTx() throws Exception {
        startTestGrid(false, false, false, true, 512);

        checkCacheLoadWithStatement();
    }

    /**
     * Loads the whole store into an atomic cache with POJO keys (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCache() throws Exception {
        startTestGrid(false, false, false, false, 512);

        checkCacheLoad();
    }

    /**
     * Loads the whole store into an atomic cache with POJO keys, using a load threshold that
     * is larger than the total number of rows in both tables.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheAll() throws Exception {
        // NOTE(review): presumably chosen so that no table reaches the parallel-load threshold - confirm against the store.
        startTestGrid(false, false, false, false, ORGANIZATION_CNT + PERSON_CNT + 1);

        checkCacheLoad();
    }

    /**
     * Loads persons through a custom SQL query into an atomic cache with POJO keys
     * (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheWithSql() throws Exception {
        startTestGrid(false, false, false, false, 512);

        checkCacheLoadWithSql();
    }

    /**
     * Loads the whole store into a transactional cache with POJO keys (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheTx() throws Exception {
        startTestGrid(false, false, false, true, 512);

        checkCacheLoad();
    }

    /**
     * Loads persons through a custom SQL query into a transactional cache with POJO keys
     * (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheWithSqlTx() throws Exception {
        startTestGrid(false, false, false, true, 512);

        checkCacheLoadWithSql();
    }

    /**
     * Loads the whole store into an atomic cache that uses built-in Java types as keys
     * (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCachePrimitiveKeys() throws Exception {
        startTestGrid(true, false, false, false, 512);

        checkCacheLoad();
    }

    /**
     * Loads the whole store into a transactional cache that uses built-in Java types as keys
     * (load threshold 512).
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCachePrimitiveKeysTx() throws Exception {
        startTestGrid(true, false, false, true, 512);

        checkCacheLoad();
    }

    /**
     * Checks write-through of the test cache: a put of a new person must insert a row, a second
     * put with the same key must update it, and a remove must delete it. Each step is verified
     * by querying the {@code PERSON} table directly by id.
     * <p>
     * The verification statement, the last open result set and the connection are released in
     * the {@code finally} block, so nothing is leaked when an assertion fails half-way.
     *
     * @throws Exception If failed.
     */
    private void checkPutRemove() throws Exception {
        Marshaller marsh = marshaller();

        boolean binaryMarshaller = marsh instanceof BinaryMarshaller || marsh == null;

        IgniteCache<Object, Person> c1 = grid().cache(CACHE_NAME);

        Connection conn = getConnection();

        PreparedStatement stmt = null;

        ResultSet rs = null;

        try {
            stmt = conn.prepareStatement("SELECT ID, ORG_ID, BIRTHDAY, NAME, GENDER FROM PERSON WHERE ID = ?");

            stmt.setInt(1, -1);

            // The sample data uses ids starting from 0, so id -1 must not exist yet.
            rs = stmt.executeQuery();

            assertFalse("Unexpected non empty result set", rs.next());

            U.closeQuiet(rs);

            Date testDate = Date.valueOf("2001-05-05");
            Gender testGender = Gender.random();

            Person val = new Person(-1, -2, testDate, "Person-to-test-put-insert", 999, testGender);

            Object key = builtinKeys ? Integer.valueOf(-1) : new PersonKey(-1);

            // Test put-insert.
            c1.put(key, val);

            rs = stmt.executeQuery();

            assertTrue("Unexpected empty result set", rs.next());

            assertEquals(-1, rs.getInt(1));
            assertEquals(-2, rs.getInt(2));
            assertEquals(testDate, rs.getDate(3));
            assertEquals("Person-to-test-put-insert", rs.getString(4));

            // With the binary marshaller the gender column is read back as an ordinal, otherwise as a name.
            assertEquals(testGender.toString(),
                binaryMarshaller ? Gender.values()[rs.getInt(5)].toString() : rs.getString(5));

            assertFalse("Unexpected more data in result set", rs.next());

            U.closeQuiet(rs);

            // Test put-update.
            testDate = Date.valueOf("2016-04-04");

            c1.put(key, new Person(-1, -3, testDate, "Person-to-test-put-update", 999, testGender));

            rs = stmt.executeQuery();

            assertTrue("Unexpected empty result set", rs.next());

            assertEquals(-1, rs.getInt(1));
            assertEquals(-3, rs.getInt(2));
            assertEquals(testDate, rs.getDate(3));
            assertEquals("Person-to-test-put-update", rs.getString(4));

            assertEquals(testGender.toString(),
                binaryMarshaller ? Gender.values()[rs.getInt(5)].toString() : rs.getString(5));

            assertFalse("Unexpected more data in result set", rs.next());

            // Release the result set before the statement is executed again.
            U.closeQuiet(rs);

            // Test remove.
            c1.remove(key);

            rs = stmt.executeQuery();

            assertFalse("Unexpected non-empty result set", rs.next());
        }
        finally {
            U.closeQuiet(rs);
            U.closeQuiet(stmt);
            U.closeQuiet(conn);
        }
    }

    /**
     * Checks put/update/remove write-through on an atomic cache with built-in key types.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPutRemoveBuiltIn() throws Exception {
        startTestGrid(true, false, false, false, 512);

        checkPutRemove();
    }

    /**
     * Checks put/update/remove write-through on an atomic cache with POJO keys.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPutRemove() throws Exception {
        startTestGrid(false, false, false, false, 512);

        checkPutRemove();
    }

    /**
     * Checks put/update/remove write-through on a transactional cache with built-in key types.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPutRemoveTxBuiltIn() throws Exception {
        startTestGrid(true, false, false, true, 512);

        checkPutRemove();
    }

    /**
     * Checks put/update/remove write-through on a transactional cache with POJO keys.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPutRemoveTx() throws Exception {
        startTestGrid(false, false, false, true, 512);

        checkPutRemove();
    }

    /**
     * Checks that loading the cache with a key type that is not registered in the store
     * configuration is rejected with a {@link CacheLoaderException} carrying the expected message.
     * <p>
     * The test fails explicitly when no exception is thrown, so it cannot pass silently.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLoadNotRegisteredType() throws Exception {
        startTestGrid(false, false, false, false, 512);

        IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);

        checkFetchSize = true;

        try {
            c1.loadCache(null, "PersonKeyWrong", "SELECT * FROM Person");

            fail("Expected CacheLoaderException for a key type that is not registered: PersonKeyWrong");
        }
        catch (CacheLoaderException e) {
            String msg = e.getMessage();

            assertTrue("Unexpected exception: " + msg,
                ("Provided key type is not found in store or cache configuration " +
                    "[cache=" + CACHE_NAME + ", key=PersonKeyWrong]").equals(msg));
        }
        finally {
            // The flag is static, so always reset it for subsequent tests.
            checkFetchSize = false;
        }
    }
}
