/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.query.h2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.api.JavaObjectSerializer;
import org.h2.engine.Database;
import org.h2.jdbc.JdbcConnection;
import org.h2.store.DataHandler;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.setter;

/**
 * H2 connection manager.
 * <p>
 * Owns the system connection to the node-local in-memory H2 database, a pool of reusable
 * {@link H2Connection}s, the set of connections currently handed out to callers and the periodic task
 * that clears statement caches of idle pooled connections.
 */
public class ConnectionManager {
    /** Default DB options. */
    private static final String DEFAULT_DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
        ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" +
        ";MAX_OPERATION_MEMORY=0;BATCH_JOINS=1" +
        ";ROW_FACTORY=\"" + H2PlainRowFactory.class.getName() + "\"" +
        ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();

    /** Default maximum size of connection pool. */
    private static final int DFLT_CONNECTION_POOL_SIZE = 32;

    /** Setter for the {@code javaObjectSerializer} member of H2 {@link Database}. */
    private static final H2Utils.Setter<Database, JavaObjectSerializer> DB_JOBJ_SERIALIZER
        = setter(Database.class, "javaObjectSerializer");

    /*
     * Initialize system properties for H2.
     */
    static {
        // Note! System properties actually are not set correctly due to h2 SysProperties is initialized earlier
        // in IgniteH2Indexing.start(). Then actual values of this properties are:
        // - h2.objectCache = true
        // - h2.serializeJavaObject = false (explicitly setup in IgniteH2Indexing.start())
        // - h2.objectCacheMaxPerElementSize = 4096
        System.setProperty("h2.objectCache", "false");
        System.setProperty("h2.serializeJavaObject", "false");
        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
        // H2 DbSettings.
        System.setProperty("h2.optimizeTwoEquals", "false"); // Makes splitter fail on subqueries in WHERE.
        System.setProperty("h2.dropRestrict", "false"); // Drop schema with cascade semantics.
    }

    /** The period of clean up the statement cache. Used both as initial delay and as repeat period. */
    @SuppressWarnings("FieldCanBeLocal")
    private final long stmtCleanupPeriod = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);

    /** The timeout to remove entry from the statement cache if the thread doesn't perform any queries. */
    private final long stmtTimeout = Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000)
        * 1_000_000; // convert millis to nanos

    /** Database URL. */
    private final String dbUrl;

    /** Statement cleanup task. */
    private final GridTimeoutProcessor.CancelableTask stmtCleanupTask;

    /** Logger. */
    private final IgniteLogger log;

    /** Used connections set. */
    private final Set<H2Connection> usedConns = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Connection pool. */
    private final ConcurrentStripedPool<H2Connection> connPool;

    /** H2 connection for INFORMATION_SCHEMA. Holds H2 open until node is stopped. */
    private volatile Connection sysConn;

    /** H2 data handler. Primarily used for serialization. */
    private final DataHandler dataNhd;

    /**
     * Constructor.
     * <p>
     * Loads the H2 driver, opens the system connection, captures the data handler of its session and
     * schedules the periodic statement cache cleanup.
     *
     * @param ctx Context.
     * @throws IgniteSQLException If the system connection cannot be opened or configured.
     */
    public ConnectionManager(GridKernalContext ctx) {
        connPool = new ConcurrentStripedPool<>(ctx.config().getQueryThreadPoolSize(), DFLT_CONNECTION_POOL_SIZE);

        dbUrl = "jdbc:h2:mem:" + ctx.localNodeId() + DEFAULT_DB_OPTIONS;

        log = ctx.log(ConnectionManager.class);

        org.h2.Driver.load();

        try {
            sysConn = DriverManager.getConnection(dbUrl);

            sysConn.setSchema(QueryUtils.SCHEMA_INFORMATION);

            assert sysConn instanceof JdbcConnection : sysConn;

            JdbcConnection conn = (JdbcConnection)sysConn;

            dataNhd = conn.getSession().getDataHandler();
        }
        catch (SQLException e) {
            // The manager is not going to be constructed, so nobody else would ever close
            // a system connection that was opened before the failure.
            U.close(sysConn, log);

            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
        }

        stmtCleanupTask = ctx.timeout().schedule(this::cleanupStatements, stmtCleanupPeriod, stmtCleanupPeriod);
    }

    /**
     * Execute SQL statement on specific schema.
     * <p>
     * A pooled connection is obtained for the call and released when the call completes.
     *
     * @param schema Schema
     * @param sql SQL statement.
     * @throws IgniteCheckedException If failed.
     */
    public void executeStatement(String schema, String sql) throws IgniteCheckedException {
        try (H2PooledConnection conn = connection(schema)) {
            Connection c = conn.connection();

            try (Statement stmt = c.createStatement()) {
                stmt.executeUpdate(sql);
            }
        }
        catch (SQLException e) {
            throw new IgniteCheckedException("Failed to execute statement: " + sql, e);
        }
    }

    /**
     * Execute statement on H2 INFORMATION_SCHEMA using the system connection.
     *
     * @param sql SQL statement.
     * @throws IgniteCheckedException On error.
     */
    public void executeSystemStatement(String sql) throws IgniteCheckedException {
        Statement stmt = null;

        try {
            stmt = sysConn.createStatement();

            stmt.executeUpdate(sql);
        }
        catch (SQLException e) {
            // The system connection is deliberately left open here: it is shared by every system statement,
            // by stop() and by jdbcConnection(), and it is never re-created. Closing it because a single
            // statement failed would make all of them fail afterwards.
            throw new IgniteCheckedException("Failed to execute system statement: " + sql, e);
        }
        finally {
            U.close(stmt, log);
        }
    }

    /**
     * Clear statement caches of pooled connections when cache is unregistered.
     */
    public void onCacheDestroyed() {
        connPool.forEach(H2Connection::clearStatementCache);
    }

    /**
     * Close all pooled and all currently used connections and forget them.
     * The system connection is not affected.
     */
    private void closeConnections() {
        connPool.forEach(c -> U.close(c.connection(), log));
        connPool.clear();

        usedConns.forEach(c -> U.close(c.connection(), log));
        usedConns.clear();
    }

    /**
     * Kernal stop callback: closes all pooled and used connections.
     */
    public void onKernalStop() {
        closeConnections();
    }

    /**
     * Stop the manager: cancels the statement cleanup task, closes all pooled and used connections,
     * issues {@code SHUTDOWN} through the system connection and finally closes the system connection.
     */
    public void stop() {
        if (stmtCleanupTask != null)
            stmtCleanupTask.close();

        // Needs to be released before SHUTDOWN.
        closeConnections();

        try (Statement s = sysConn.createStatement()) {
            s.execute("SHUTDOWN");
        }
        catch (SQLException e) {
            U.error(log, "Failed to shutdown database.", e);
        }

        U.close(sysConn, log);
    }

    /**
     * Called periodically to clean up the statement cache of pooled connections
     * whose cache has been inactive for at least {@link #stmtTimeout}.
     */
    private void cleanupStatements() {
        connPool.forEach(c -> {
            if (c.statementCache().inactiveFor(stmtTimeout))
                c.clearStatementCache();
        });
    }

    /**
     * Obtain a connection and switch it to the given schema.
     * If setting the schema fails with {@link IgniteSQLException}, the connection is closed quietly
     * and the exception is rethrown.
     *
     * @param schema Schema name.
     * @return Connection with setup schema.
     */
    public H2PooledConnection connection(String schema) {
        H2PooledConnection conn = connection();

        try {
            conn.schema(schema);

            return conn;
        }
        catch (IgniteSQLException e) {
            U.closeQuiet(conn);

            throw e;
        }
    }

    /**
     * Resize the connection pool.
     *
     * @param size New size the connection pool. Must be positive.
     * @throws IllegalArgumentException If {@code size} is zero or negative.
     */
    void poolSize(int size) {
        if (size <= 0)
            throw new IllegalArgumentException("Invalid connection pool size: " + size);

        connPool.resize(size);
    }

    /**
     * Borrow a connection from the pool, creating a new one when the pool has nothing to offer,
     * and register it as used.
     *
     * @return H2 connection wrapper.
     */
    public H2PooledConnection connection() {
        try {
            H2Connection conn = connPool.borrow();

            if (conn == null)
                conn = newConnection();

            H2PooledConnection connWrp = new H2PooledConnection(conn, this);

            usedConns.add(conn);

            assert !conn.connection().isClosed() : "Connection is closed [conn=" + conn + ']';

            return connWrp;
        }
        catch (SQLException e) {
            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
        }
    }

    /**
     * Create new connection wrapper.
     *
     * @return Connection wrapper.
     * @throws IgniteSQLException If the underlying JDBC connection cannot be opened.
     */
    private H2Connection newConnection() {
        try {
            return new H2Connection(DriverManager.getConnection(dbUrl), log);
        }
        catch (SQLException e) {
            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
        }
    }

    /**
     * Stop tracking the connection as used and return it to the pool,
     * or close it if the pool does not accept it.
     *
     * @param conn Connection.
     */
    void recycle(H2Connection conn) {
        boolean rmv = usedConns.remove(conn);

        assert rmv : "Connection isn't tracked [conn=" + conn + ']';

        if (!connPool.recycle(conn))
            conn.close();
    }

    /**
     * @return Data handler.
     */
    public DataHandler dataHandler() {
        return dataNhd;
    }

    /**
     * Sets internal H2 serializer. Does nothing unless the data handler is an H2 {@link Database}.
     *
     * @param serializer Serializer.
     */
    void setH2Serializer(JavaObjectSerializer serializer) {
        // instanceof is false for null, so no separate null check is required.
        if (dataNhd instanceof Database)
            DB_JOBJ_SERIALIZER.set((Database)dataNhd, serializer);
    }

    /**
     * @return System H2 connection.
     */
    public JdbcConnection jdbcConnection() {
        return (JdbcConnection) sysConn;
    }
}
