blob: b8404c7e80151c52fbb66e61c2cc75c64051d13a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.api.JavaObjectSerializer;
import org.h2.engine.Database;
import org.h2.jdbc.JdbcConnection;
import org.h2.store.DataHandler;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.setter;
/**
* H2 connection manager.
*/
public class ConnectionManager {
    /** Default DB options appended to the in-memory H2 JDBC URL. */
    private static final String DEFAULT_DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
        ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" +
        ";MAX_OPERATION_MEMORY=0;BATCH_JOINS=1" +
        ";ROW_FACTORY=\"" + H2PlainRowFactory.class.getName() + "\"" +
        ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();

    /** Default maximum size of connection pool. */
    private static final int DFLT_CONNECTION_POOL_SIZE = 32;

    /** Setter for the {@code javaObjectSerializer} member of H2 {@link Database}, obtained via {@code H2Utils.setter}. */
    private static final H2Utils.Setter<Database, JavaObjectSerializer> DB_JOBJ_SERIALIZER
        = setter(Database.class, "javaObjectSerializer");

    /*
     * Initialize system properties for H2.
     */
    static {
        // Note: these system properties do not actually take effect, because H2's SysProperties is initialized
        // earlier, in IgniteH2Indexing.start(). The effective values of these properties are therefore:
        // - h2.objectCache = true
        // - h2.serializeJavaObject = false (explicitly set up in IgniteH2Indexing.start())
        // - h2.objectCacheMaxPerElementSize = 4096
        System.setProperty("h2.objectCache", "false");
        System.setProperty("h2.serializeJavaObject", "false");
        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.

        // H2 DbSettings.
        System.setProperty("h2.optimizeTwoEquals", "false"); // Makes splitter fail on subqueries in WHERE.
        System.setProperty("h2.dropRestrict", "false"); // Drop schema with cascade semantics.
    }

    /** The period of the statement cache cleanup; used both as the initial delay and as the repeat period. */
    @SuppressWarnings("FieldCanBeLocal")
    private final long stmtCleanupPeriod = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);

    /**
     * The timeout (in nanoseconds) after which the statement cache of a connection is cleared if it has not been
     * used. The system property is given in milliseconds; the conversion saturates instead of overflowing, so
     * very large configured values stay large rather than wrapping around to a negative timeout.
     */
    private final long stmtTimeout =
        TimeUnit.MILLISECONDS.toNanos(Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000));

    /** Database URL. */
    private final String dbUrl;

    /** Statement cleanup task. */
    private final GridTimeoutProcessor.CancelableTask stmtCleanupTask;

    /** Logger. */
    private final IgniteLogger log;

    /** Connections currently handed out to callers (i.e. borrowed and not yet recycled). */
    private final Set<H2Connection> usedConns = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Connection pool. */
    private final ConcurrentStripedPool<H2Connection> connPool;

    /** H2 connection for INFORMATION_SCHEMA. Holds H2 open until node is stopped. */
    private volatile Connection sysConn;

    /** H2 data handler. Primarily used for serialization. */
    private final DataHandler dataNhd;

    /**
     * Constructor. Opens the system connection, captures the H2 data handler and schedules the periodic
     * statement cache cleanup.
     *
     * @param ctx Context.
     * @throws IgniteSQLException If the system connection cannot be opened or configured.
     */
    public ConnectionManager(GridKernalContext ctx) {
        connPool = new ConcurrentStripedPool<>(ctx.config().getQueryThreadPoolSize(), DFLT_CONNECTION_POOL_SIZE);

        dbUrl = "jdbc:h2:mem:" + ctx.localNodeId() + DEFAULT_DB_OPTIONS;

        log = ctx.log(ConnectionManager.class);

        org.h2.Driver.load();

        try {
            sysConn = DriverManager.getConnection(dbUrl);

            sysConn.setSchema(QueryUtils.SCHEMA_INFORMATION);

            assert sysConn instanceof JdbcConnection : sysConn;

            JdbcConnection conn = (JdbcConnection)sysConn;

            dataNhd = conn.getSession().getDataHandler();
        }
        catch (SQLException e) {
            // The connection may already be open (e.g. setSchema failed): release it so it is not leaked.
            U.close(sysConn, log);

            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
        }

        stmtCleanupTask = ctx.timeout().schedule(this::cleanupStatements, stmtCleanupPeriod, stmtCleanupPeriod);
    }

    /**
     * Execute SQL statement on specific schema using a pooled connection.
     *
     * @param schema Schema
     * @param sql SQL statement.
     * @throws IgniteCheckedException If failed.
     */
    public void executeStatement(String schema, String sql) throws IgniteCheckedException {
        try (H2PooledConnection conn = connection(schema)) {
            Connection c = conn.connection();

            try (Statement stmt = c.createStatement()) {
                stmt.executeUpdate(sql);
            }
        }
        catch (SQLException e) {
            throw new IgniteCheckedException("Failed to execute statement: " + sql, e);
        }
    }

    /**
     * Execute statement on H2 INFORMATION_SCHEMA using the system connection.
     *
     * @param sql SQL statement.
     * @throws IgniteCheckedException On error.
     */
    public void executeSystemStatement(String sql) throws IgniteCheckedException {
        Statement stmt = null;

        try {
            stmt = sysConn.createStatement();

            stmt.executeUpdate(sql);
        }
        catch (SQLException e) {
            // NOTE(review): the system connection is closed on any failure, so later system statements
            // will fail as well - confirm this is intended.
            U.close(sysConn, log);

            throw new IgniteCheckedException("Failed to execute system statement: " + sql, e);
        }
        finally {
            // Close errors are passed to the logger-aware helper rather than thrown; stmt may be null here.
            U.close(stmt, log);
        }
    }

    /**
     * Clear statement cache of every pooled connection when cache is unregistered.
     */
    public void onCacheDestroyed() {
        connPool.forEach(H2Connection::clearStatementCache);
    }

    /**
     * Close all connections: both the pooled ones and the ones currently in use.
     */
    private void closeConnections() {
        connPool.forEach(c -> U.close(c.connection(), log));
        connPool.clear();

        usedConns.forEach(c -> U.close(c.connection(), log));
        usedConns.clear();
    }

    /**
     * Kernal stop callback: closes all pooled and in-use connections.
     */
    public void onKernalStop() {
        closeConnections();
    }

    /**
     * Stop the manager: cancel the statement cleanup task, close all connections, shut the H2 database down
     * and close the system connection.
     */
    public void stop() {
        if (stmtCleanupTask != null)
            stmtCleanupTask.close();

        // Needs to be released before SHUTDOWN.
        closeConnections();

        try (Statement s = sysConn.createStatement()) {
            s.execute("SHUTDOWN");
        }
        catch (SQLException e) {
            U.error(log, "Failed to shutdown database.", e);
        }

        U.close(sysConn, log);
    }

    /**
     * Called periodically to clean up the statement cache of pooled connections that were inactive
     * for longer than the configured timeout.
     */
    private void cleanupStatements() {
        connPool.forEach(c -> {
            if (c.statementCache().inactiveFor(stmtTimeout))
                c.clearStatementCache();
        });
    }

    /**
     * @param schema Schema name.
     * @return Connection with setup schema.
     * @throws IgniteSQLException If the schema cannot be set; the connection is closed in that case.
     */
    public H2PooledConnection connection(String schema) {
        H2PooledConnection conn = connection();

        try {
            conn.schema(schema);

            return conn;
        }
        catch (IgniteSQLException e) {
            U.closeQuiet(conn);

            throw e;
        }
    }

    /**
     * Resize the connection pool.
     *
     * @param size New size the connection pool.
     * @throws IllegalArgumentException If {@code size} is not positive.
     */
    void poolSize(int size) {
        if (size <= 0)
            throw new IllegalArgumentException("Invalid connection pool size: " + size);

        connPool.resize(size);
    }

    /**
     * Borrows a connection from the pool (creating a new one when the pool returns none), registers it
     * as used and wraps it.
     *
     * @return H2 connection wrapper.
     * @throws IgniteSQLException If the connection cannot be created or checked.
     */
    public H2PooledConnection connection() {
        try {
            H2Connection conn = connPool.borrow();

            if (conn == null)
                conn = newConnection();

            H2PooledConnection connWrp = new H2PooledConnection(conn, this);

            usedConns.add(conn);

            assert !conn.connection().isClosed() : "Connection is closed [conn=" + conn + ']';

            return connWrp;
        }
        catch (SQLException e) {
            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
        }
    }

    /**
     * Create new connection wrapper.
     *
     * @return Connection wrapper.
     * @throws IgniteSQLException If the underlying JDBC connection cannot be opened.
     */
    private H2Connection newConnection() {
        try {
            return new H2Connection(DriverManager.getConnection(dbUrl), log);
        }
        catch (SQLException e) {
            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
        }
    }

    /**
     * Return connection to pool, or close it if the pool does not accept it.
     *
     * @param conn Connection.
     */
    void recycle(H2Connection conn) {
        boolean rmv = usedConns.remove(conn);

        assert rmv : "Connection isn't tracked [conn=" + conn + ']';

        if (!connPool.recycle(conn))
            conn.close();
    }

    /**
     * @return Data handler.
     */
    public DataHandler dataHandler() {
        return dataNhd;
    }

    /**
     * Sets internal H2 serializer. Does nothing when the data handler is not an H2 {@link Database}.
     *
     * @param serializer Serializer.
     */
    void setH2Serializer(JavaObjectSerializer serializer) {
        // instanceof is false for null, so no separate null check is required.
        if (dataNhd instanceof Database)
            DB_JOBJ_SERIALIZER.set((Database)dataNhd, serializer);
    }

    /**
     * @return The system H2 connection.
     */
    public JdbcConnection jdbcConnection() {
        return (JdbcConnection)sysConn;
    }
}