blob: 00fa8e4c0d5037468d94e28313a5e41d82058db2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.db;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;
import org.apache.commons.dbcp2.ConnectionFactory;
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnection;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import com.cloud.utils.Pair;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.mgmt.JmxUtil;
/**
* Transaction abstracts away the Connection object in JDBC. It allows the
* following things that the Connection object does not.
*
* 1. Transaction can be started at an entry point and whether the DB
* actions should be auto-commit or not determined at that point.
* 2. DB Connection is allocated only when it is needed.
* 3. Code does not need to know if a transaction has been started or not.
* It just starts/ends a transaction and we resolve it correctly with
* the previous actions.
*
* Note that this class is not synchronized, but it doesn't need to be because
* it is stored in thread-local storage (TLS) and there is one per thread. Use appropriately.
*/
public class TransactionLegacy implements Closeable {
// Loggers are all named under Transaction's class name, with one suffix per concern
// (Transaction, Statement, Lock, Connection).
protected static Logger LOGGER = LogManager.getLogger(Transaction.class.getName() + "." + "Transaction");
protected Logger stmtLogger = LogManager.getLogger(Transaction.class.getName() + "." + "Statement");
protected Logger lockLogger = LogManager.getLogger(Transaction.class.getName() + "." + "Lock");
protected static Logger CONN_LOGGER = LogManager.getLogger(Transaction.class.getName() + "." + "Connection");
// Holds the TransactionLegacy bound to the current thread.
private static final ThreadLocal<TransactionLegacy> tls = new ThreadLocal<TransactionLegacy>();
// Type tags stored in StackElement.type. This class compares them by reference (==),
// so always pass these constants rather than equal-valued strings.
private static final String START_TXN = "start_txn";
private static final String CURRENT_TXN = "current_txn";
private static final String CREATE_TXN = "create_txn";
private static final String CREATE_CONN = "create_conn";
private static final String STATEMENT = "statement";
private static final String ATTACHMENT = "attachment";
// Database identifiers; getConnection() switches on these to pick a data source.
public static final short CLOUD_DB = 0;
public static final short USAGE_DB = 1;
public static final short SIMULATOR_DB = 3;
// Database id set by transitToUserManagedConnection(); closeConnection() does not close such connections.
public static final short CONNECTED_DB = -1;
// NOTE(review): not referenced in the visible part of this file; presumably appended to JDBC URIs elsewhere — confirm.
public static final String CONNECTION_PARAMS = "scrollTolerantForwardOnly=true";
// Counter from which each instance's _id is drawn in the constructor.
private static AtomicLong s_id = new AtomicLong();
// MBean that transactions are added to and removed from as they are opened and closed.
private static final TransactionMBeanImpl s_mbean = new TransactionMBeanImpl();
// Registers the transaction MBean once at class load. A registration failure is
// logged and otherwise ignored, so class initialization still proceeds.
static {
try {
JmxUtil.registerMBean("Transaction", "Transaction", s_mbean);
} catch (Exception e) {
LOGGER.error("Unable to register mbean for transaction", e);
}
}
// Stack of markers (transaction marks, started txns, connection creation, attachments);
// elements are pushed at the head, so the first element is the most recent one.
private final LinkedList<StackElement> _stack;
// Identifier taken from s_id at construction.
private long _id;
// (sql, timestamp) pairs recorded by registerLock() and reported by clearLockTimes().
private final LinkedList<Pair<String, Long>> _lockTimes = new LinkedList<Pair<String, Long>>();
// Name of the transaction; set to null by cleanup().
private String _name;
// Current DB connection; null until getConnection() obtains one or a user-managed one is supplied.
private Connection _conn;
// True while a DB transaction (non auto-commit) is in progress.
private boolean _txn;
// Which database this transaction targets (CLOUD_DB, USAGE_DB, SIMULATOR_DB or CONNECTED_DB).
private short _dbId;
// System.currentTimeMillis() captured when start() began the DB transaction.
private long _txnTime;
// Most recent auto-close statement; closed by closePreviousStatement().
private Statement _stmt;
// Name of the thread that constructed this instance.
private String _creator;
/**
 * Returns the transaction bound to the calling thread, asserting that one exists.
 *
 * @return the thread's transaction; null only if none is bound and assertions are disabled
 */
public static TransactionLegacy currentTxn() {
    final boolean mustExist = true;
    return currentTxn(mustExist);
}
/**
 * Returns the transaction bound to the calling thread.
 *
 * @param check when true, asserts that a transaction is bound to the thread
 * @return the thread's transaction, or null if there is none
 */
protected static TransactionLegacy currentTxn(boolean check) {
    final TransactionLegacy bound = tls.get();
    assert !check || bound != null : "No Transaction on stack. Did you mark the method with @DB?";
    return bound;
}
/**
 * Opens a transaction against the given database, named after the calling stack
 * (see {@link #buildName()}), forcing a database switch if the thread's current
 * transaction targets a different database.
 *
 * @param databaseId one of the *_DB constants
 * @return the transaction now bound to the calling thread
 */
public static TransactionLegacy open(final short databaseId) {
    final String built = buildName();
    final String name = (built != null) ? built : CURRENT_TXN;
    return open(name, databaseId, true);
}
//
// Usage of this transaction setup should be limited, it will always open a new transaction context regardless of whether or not there is other
// transaction context in the stack. It is used in special use cases that we want to control DB connection explicitly and in the mean time utilize
// the existing DAO features
//
/**
 * Switches this transaction to a caller-supplied connection and marks the
 * database id as {@link #CONNECTED_DB}.
 *
 * @param conn the connection the caller manages
 * @throws IllegalStateException if this transaction already holds a connection
 */
public void transitToUserManagedConnection(Connection conn) {
    final boolean alreadyConnected = (_conn != null);
    if (alreadyConnected) {
        throw new IllegalStateException("Can't change to a user managed connection unless the db connection is null");
    }
    _dbId = CONNECTED_DB;
    _conn = conn;
}
/**
 * Switches this transaction back to connections obtained from the data sources:
 * drops the reference to the current connection (without closing it) and records
 * the database to use from now on.
 *
 * @param dbId one of the *_DB constants
 */
public void transitToAutoManagedConnection(short dbId) {
    // assert(_stack.size() <= 1) : "Can't change to auto managed connection unless your stack is empty";
    _conn = null;
    _dbId = dbId;
}
/**
 * Opens (or joins) the thread's transaction under the given name, defaulting to
 * {@link #CLOUD_DB} without forcing a database switch.
 *
 * @param name name to mark the transaction with
 * @return the transaction bound to the calling thread
 */
public static TransactionLegacy open(final String name) {
    final short defaultDb = TransactionLegacy.CLOUD_DB;
    return open(name, defaultDb, false);
}
/**
 * Opens (or joins) the transaction bound to the calling thread.
 *
 * A new transaction is created when the thread has none, or when
 * {@code forceDbChange} is set and the existing one targets a different database
 * (the existing one is closed first). The resulting transaction has its connection
 * validated and is then marked with {@code name}.
 *
 * @param name          name to mark the transaction with
 * @param databaseId    one of the *_DB constants
 * @param forceDbChange whether to replace an existing transaction on another database
 * @return the transaction bound to the calling thread
 */
public static TransactionLegacy open(final String name, final short databaseId, final boolean forceDbChange) {
    TransactionLegacy active = tls.get();
    boolean needsFresh = false;
    if (active == null) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Creating the transaction: " + name);
        }
        needsFresh = true;
    } else if (forceDbChange && active.getDatabaseId() != databaseId) {
        // we need to end the current transaction and switch databases
        final boolean closed = active.close(active.getName());
        if (closed && active.getCurrentConnection() == null) {
            s_mbean.removeTransaction(active);
        }
        needsFresh = true;
    }
    if (needsFresh) {
        active = new TransactionLegacy(name, false, databaseId);
        tls.set(active);
        s_mbean.addTransaction(active);
    }
    active.checkConnection();
    active.takeOver(name, false);
    return active;
}
/**
 * Drops the reference to the current connection if it fails a 3-second validity
 * check or the check itself throws. The connection is not closed here.
 */
public void checkConnection() {
    if (_conn == null) {
        return;
    }
    boolean usable;
    try {
        usable = _conn.isValid(3);
    } catch (SQLException e) {
        usable = false;
    }
    if (!usable) {
        _conn = null;
    }
}
/**
 * Finds the most recently pushed stack element whose type is the given tag.
 * The tag is compared by reference, so pass one of this class's type constants.
 *
 * @param obj the type tag to look for
 * @return the first matching element from the top of the stack, or null if none
 */
protected StackElement peekInStack(Object obj) {
    for (final StackElement candidate : _stack) {
        if (candidate.type == obj) {
            return candidate;
        }
    }
    return null;
}
/**
 * Records the time at which a locking statement was issued. Nothing is recorded
 * unless a DB transaction is in progress and the lock logger has debug enabled.
 *
 * @param sql the SQL text to associate with the timestamp
 */
public void registerLock(String sql) {
    if (!_txn || !lockLogger.isDebugEnabled()) {
        return;
    }
    final long issuedAt = System.currentTimeMillis();
    _lockTimes.add(new Pair<String, Long>(sql, issuedAt));
}
/**
 * @return true while a DB transaction is in progress on this object
 */
public boolean dbTxnStarted() {
return _txn;
}
/**
 * Obtains a connection directly from the cloud data source, outside of any
 * TransactionLegacy bookkeeping.
 *
 * @return a connection from the cloud data source
 * @throws SQLException if the data source cannot supply a connection
 */
public static Connection getStandaloneConnectionWithException() throws SQLException {
    final Connection standalone = s_ds.getConnection();
    if (CONN_LOGGER.isTraceEnabled()) {
        final int connId = System.identityHashCode(standalone);
        CONN_LOGGER.trace("Retrieving a standalone connection: dbconn" + connId);
    }
    return standalone;
}
/**
 * Same as {@link #getStandaloneConnectionWithException()} but logs an
 * SQLException instead of propagating it.
 *
 * @return a connection from the cloud data source, or null if one could not be obtained
 */
public static Connection getStandaloneConnection() {
    Connection standalone = null;
    try {
        standalone = getStandaloneConnectionWithException();
    } catch (SQLException e) {
        LOGGER.error("Unexpected exception: ", e);
    }
    return standalone;
}
/**
 * Obtains a connection directly from the usage data source.
 *
 * @return a usage-database connection, or null if an SQLException occurred (it is logged)
 */
public static Connection getStandaloneUsageConnection() {
    try {
        final Connection usageConn = s_usageDS.getConnection();
        if (CONN_LOGGER.isTraceEnabled()) {
            final int connId = System.identityHashCode(usageConn);
            CONN_LOGGER.trace("Retrieving a standalone connection for usage: dbconn" + connId);
        }
        return usageConn;
    } catch (SQLException e) {
        LOGGER.warn("Unexpected exception: ", e);
    }
    return null;
}
/**
 * Obtains a connection directly from the simulator data source.
 *
 * @return a simulator-database connection, or null if an SQLException occurred (it is logged)
 */
public static Connection getStandaloneSimulatorConnection() {
    try {
        final Connection simulatorConn = s_simulatorDS.getConnection();
        if (CONN_LOGGER.isTraceEnabled()) {
            final int connId = System.identityHashCode(simulatorConn);
            CONN_LOGGER.trace("Retrieving a standalone connection for simulator: dbconn" + connId);
        }
        return simulatorConn;
    } catch (SQLException e) {
        LOGGER.warn("Unexpected exception: ", e);
    }
    return null;
}
/**
 * Pushes an attachment onto this transaction's stack.
 *
 * @param value the attachment to push
 */
protected void attach(TransactionAttachment value) {
    final StackElement attachment = new StackElement(ATTACHMENT, value);
    _stack.push(attachment);
}
/**
 * Removes and returns the attachment with the given name, searching from the
 * oldest stack element towards the newest.
 *
 * @param name name of the attachment to remove
 * @return the removed attachment, or null (after a failed assertion) if it was never attached
 */
protected TransactionAttachment detach(String name) {
    for (final Iterator<StackElement> fromBottom = _stack.descendingIterator(); fromBottom.hasNext();) {
        final StackElement candidate = fromBottom.next();
        if (candidate.type != ATTACHMENT) {
            continue;
        }
        final TransactionAttachment attachment = (TransactionAttachment)candidate.ref;
        if (!name.equals(attachment.getName())) {
            continue;
        }
        fromBottom.remove();
        return attachment;
    }
    assert false : "Are you sure you attached this: " + name;
    return null;
}
/**
 * Attaches a value to the transaction bound to the calling thread. Asserts that
 * such a transaction exists and has been marked as current.
 *
 * @param value the attachment to push onto the thread's transaction
 */
public static void attachToTxn(TransactionAttachment value) {
    final TransactionLegacy bound = tls.get();
    assert bound != null && bound.peekInStack(CURRENT_TXN) != null : "Come on....how can we attach something to the transaction if you haven't started it?";
    bound.attach(value);
}
/**
 * Detaches the named attachment from the transaction bound to the calling thread.
 *
 * @param name name of the attachment to remove
 * @return the removed attachment, as returned by {@link #detach(String)}
 */
public static TransactionAttachment detachFromTxn(String name) {
    final TransactionLegacy bound = tls.get();
    assert bound != null : "No Transaction in TLS";
    return bound.detach(name);
}
/**
 * Checks whether the method name recorded on the transaction's most recent
 * CURRENT_TXN marker appears in the calling thread's stack trace.
 *
 * The check is relaxed: if the marker exists but no frame matches, a warning is
 * logged and true is still returned. False is returned only when the transaction
 * has no CURRENT_TXN marker at all.
 *
 * @param stack index of the first stack-trace frame to inspect
 * @param txn   the transaction whose marker is compared
 * @return false if there is no CURRENT_TXN marker, true otherwise
 */
protected static boolean checkAnnotation(int stack, TransactionLegacy txn) {
    final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
    final StackElement marker = txn.peekInStack(CURRENT_TXN);
    if (marker == null) {
        return false;
    }
    final StringBuffer visited = new StringBuffer();
    int frameIdx = stack;
    while (frameIdx < frames.length) {
        final String methodName = frames[frameIdx].getMethodName();
        visited.append(" ").append(methodName);
        if (methodName.equals(marker.ref)) {
            return true;
        }
        frameIdx++;
    }
    // relax stack structure for several places that @DB required injection is not in place
    LOGGER.warn("Non-standard stack context that Transaction context is manaully placed into the calling chain. Stack chain: " + visited);
    return true;
}
/**
 * Builds a name from the calling stack, as a sequence of
 * {@code -SimpleClassName.method:line} entries.
 *
 * Only produced when debug logging is enabled; otherwise the empty string is
 * returned. Frames are read starting at index 3 of the stack trace, at most 12
 * frames are appended, and frames whose file name is {@code <generated>} or whose
 * method is {@code invokeSuper} are skipped without counting.
 *
 * @return the stack-derived name, or "" when debug logging is disabled
 */
protected static String buildName() {
    if (!LOGGER.isDebugEnabled()) {
        return "";
    }
    final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
    final StringBuilder text = new StringBuilder();
    int frameIdx = 3;
    int counted = 3;
    while (counted < 15 && frameIdx < frames.length) {
        final StackTraceElement frame = frames[frameIdx];
        frameIdx++;
        if ("<generated>".equals(frame.getFileName()) || "invokeSuper".equals(frame.getMethodName())) {
            continue;
        }
        final String className = frame.getClassName();
        text.append("-")
            .append(className.substring(className.lastIndexOf(".") + 1))
            .append(".")
            .append(frame.getMethodName())
            .append(":")
            .append(frame.getLineNumber());
        counted++;
    }
    return text.toString();
}
/**
 * Creates a transaction with no connection, an empty stack and no DB transaction
 * in progress, assigning it the next id and recording the creating thread's name.
 *
 * @param name       initial transaction name
 * @param forLocking not used by this constructor
 * @param databaseId one of the *_DB constants
 */
private TransactionLegacy(final String name, final boolean forLocking, final short databaseId) {
    _id = s_id.incrementAndGet();
    _creator = Thread.currentThread().getName();
    _name = name;
    _dbId = databaseId;
    _stack = new LinkedList<StackElement>();
    _conn = null;
    _txn = false;
}
/**
 * @return the name of the thread that constructed this transaction
 */
public String getCreator() {
return _creator;
}
/**
 * @return the id assigned to this transaction at construction
 */
public long getId() {
return _id;
}
/**
 * @return the current transaction name; null once cleanup() has run
 */
public String getName() {
return _name;
}
/**
 * @return the database id this transaction targets (boxed from the short field)
 */
public Short getDatabaseId() {
return _dbId;
}
/**
 * Renders the transaction name (empty if null) followed by " : " and the refs of
 * all CURRENT_TXN markers on the stack, most recent first, each followed by ", ".
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder();
    if (_name != null) {
        text.append(_name);
    }
    text.append(" : ");
    for (final StackElement element : _stack) {
        if (element.type != CURRENT_TXN) {
            continue;
        }
        text.append(element.ref).append(", ");
    }
    return text.toString();
}
/**
 * Pushes a CURRENT_TXN marker carrying the given name onto the stack.
 *
 * @param name name to record on the marker
 */
protected void mark(final String name) {
    final StackElement marker = new StackElement(CURRENT_TXN, name);
    _stack.push(marker);
}
/**
 * Acquires the named lock through the Merovingian2 lock controller.
 *
 * @param name           lock name
 * @param timeoutSeconds timeout passed to the controller's acquire call
 * @return the result of the controller's acquire call
 * @throws CloudRuntimeException if no lock controller is available
 */
public boolean lock(final String name, final int timeoutSeconds) {
    final Merovingian2 controller = Merovingian2.getLockController();
    if (controller != null) {
        return controller.acquire(name, timeoutSeconds);
    }
    throw new CloudRuntimeException("There's no support for locking yet");
}
/**
 * Releases the named lock through the Merovingian2 lock controller.
 *
 * @param name lock name
 * @return the result of the controller's release call
 * @throws CloudRuntimeException if no lock controller is available
 */
public boolean release(final String name) {
    final Merovingian2 controller = Merovingian2.getLockController();
    if (controller != null) {
        return controller.release(name);
    }
    throw new CloudRuntimeException("There's no support for locking yet");
}
/**
 * Starts a DB transaction, or nests into the one already in progress.
 *
 * A START_TXN marker is always pushed. If no DB transaction was in progress, the
 * start time is recorded and, when a connection already exists, auto-commit is
 * switched off on it.
 *
 * @throws CloudRuntimeException if auto-commit cannot be changed on the connection
 * @deprecated Use {@link Transaction} for new code
 */
@Deprecated
public void start() {
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("txn: start requested by: " + buildName());
    }
    _stack.push(new StackElement(START_TXN, null));
    if (_txn) {
        LOGGER.trace("txn: has already been started.");
        return;
    }
    _txnTime = System.currentTimeMillis();
    _txn = true;
    if (_conn == null) {
        // No connection yet: getConnection() applies the auto-commit setting when it creates one.
        return;
    }
    try {
        LOGGER.trace("txn: set auto commit to false");
        _conn.setAutoCommit(false);
    } catch (final SQLException e) {
        LOGGER.warn("Unable to set auto commit: ", e);
        throw new CloudRuntimeException("Unable to set auto commit: ", e);
    }
}
/**
 * Closes the remembered auto-close statement, if any, and forgets it.
 *
 * Its current result set is closed first unless the statement's holdability is
 * HOLD_CURSORS_OVER_COMMIT. SQL failures are only trace-logged; the statement
 * reference is cleared in every case.
 */
protected void closePreviousStatement() {
    if (_stmt == null) {
        return;
    }
    try {
        if (stmtLogger.isTraceEnabled()) {
            stmtLogger.trace("Closing: " + _stmt.toString());
        }
        try {
            final ResultSet pending = _stmt.getResultSet();
            if (pending != null) {
                if (_stmt.getResultSetHoldability() != ResultSet.HOLD_CURSORS_OVER_COMMIT) {
                    pending.close();
                }
            }
        } catch (SQLException e) {
            stmtLogger.trace("Unable to close resultset");
        }
        _stmt.close();
    } catch (final SQLException e) {
        stmtLogger.trace("Unable to close statement: " + _stmt.toString());
    } finally {
        _stmt = null;
    }
}
/**
 * Prepares an auto-close statement. The previously remembered auto-close
 * statement is closed, and the new one is remembered so that it is closed
 * automatically later.
 *
 * @param sql sql String
 * @return PreparedStatement
 * @throws SQLException if problem with JDBC layer.
 *
 * @see java.sql.Connection
 */
public PreparedStatement prepareAutoCloseStatement(final String sql) throws SQLException {
    // Prepare first: if preparing fails, the previous statement is left untouched.
    final PreparedStatement prepared = prepareStatement(sql);
    closePreviousStatement();
    _stmt = prepared;
    return prepared;
}
/**
 * Prepares a statement on this transaction's connection. The statement is not
 * remembered here, so the caller is responsible for closing it.
 *
 * @param sql sql String
 * @return PreparedStatement
 * @throws SQLException if problem with JDBC layer.
 */
public PreparedStatement prepareStatement(final String sql) throws SQLException {
    final PreparedStatement prepared = getConnection().prepareStatement(sql);
    if (stmtLogger.isTraceEnabled()) {
        stmtLogger.trace("Preparing: " + sql);
    }
    return prepared;
}
/**
 * Prepares an auto-close statement that can return auto-generated keys. The
 * previously remembered auto-close statement is closed and the new one is
 * remembered in its place.
 *
 * @param sql sql String
 * @param autoGeneratedKeys keys that are generated
 * @return PreparedStatement
 * @throws SQLException if problem with JDBC layer.
 *
 * @see java.sql.Connection
 */
public PreparedStatement prepareAutoCloseStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
    final PreparedStatement prepared = getConnection().prepareStatement(sql, autoGeneratedKeys);
    if (stmtLogger.isTraceEnabled()) {
        stmtLogger.trace("Preparing: " + sql);
    }
    closePreviousStatement();
    _stmt = prepared;
    return prepared;
}
/**
 * Prepares an auto-close statement that returns the named generated columns. The
 * previously remembered auto-close statement is closed and the new one is
 * remembered in its place.
 *
 * @param sql sql String
 * @param columnNames names of the columns
 * @return PreparedStatement
 * @throws SQLException if problem with JDBC layer.
 *
 * @see java.sql.Connection
 */
public PreparedStatement prepareAutoCloseStatement(final String sql, final String[] columnNames) throws SQLException {
    final PreparedStatement prepared = getConnection().prepareStatement(sql, columnNames);
    if (stmtLogger.isTraceEnabled()) {
        stmtLogger.trace("Preparing: " + sql);
    }
    closePreviousStatement();
    _stmt = prepared;
    return prepared;
}
/**
 * Prepares an auto-close statement with explicit result-set characteristics. The
 * previously remembered auto-close statement is closed and the new one is
 * remembered in its place.
 *
 * @param sql sql String
 * @param resultSetType result set type, passed through to the connection
 * @param resultSetConcurrency result set concurrency, passed through to the connection
 * @param resultSetHoldability result set holdability, passed through to the connection
 * @return PreparedStatement
 * @throws SQLException if problem with JDBC layer.
 *
 * @see java.sql.Connection
 */
public PreparedStatement prepareAutoCloseStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
    final PreparedStatement prepared =
        getConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
    if (stmtLogger.isTraceEnabled()) {
        stmtLogger.trace("Preparing: " + sql);
    }
    closePreviousStatement();
    _stmt = prepared;
    return prepared;
}
/**
 * Returns the db connection, obtaining one from the data source that matches
 * this transaction's database id if none is held yet.
 *
 * A newly obtained connection has auto-commit set to the inverse of the
 * "transaction in progress" flag, and a CREATE_CONN marker is pushed on the stack.
 *
 * Note: you can call getConnection(), but be aware that statements prepared
 * directly on the Connection are not tracked by this class.
 *
 * @return DB Connection but make sure you understand that
 * you are responsible for closing the PreparedStatement.
 * @throws SQLException if the data source or connection reports a failure
 * @throws CloudRuntimeException if the selected data source is null or the
 * database id does not select a data source
 */
public Connection getConnection() throws SQLException {
    if (_conn != null) {
        LOGGER.trace("conn: Using existing DB connection");
        return _conn;
    }
    final DataSource source;
    switch (_dbId) {
        case CLOUD_DB:
            source = s_ds;
            break;
        case USAGE_DB:
            source = s_usageDS;
            break;
        case SIMULATOR_DB:
            source = s_simulatorDS;
            break;
        default:
            throw new CloudRuntimeException("No database selected for the transaction");
    }
    if (source == null) {
        LOGGER.warn("A static-initialized variable becomes null, process is dying?");
        throw new CloudRuntimeException("Database is not initialized, process is dying?");
    }
    _conn = source.getConnection();
    _conn.setAutoCommit(!_txn);
    //
    // MySQL default transaction isolation level is REPEATABLE READ,
    // to reduce chances of DB deadlock, we will use READ COMMITED isolation level instead
    // see http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
    //
    _stack.push(new StackElement(CREATE_CONN, null));
    if (CONN_LOGGER.isTraceEnabled()) {
        CONN_LOGGER.trace("Creating a DB connection with " + (_txn ? " txn: " : " no txn: ") + " for " + _dbId + ": dbconn" + System.identityHashCode(_conn) +
            ". Stack: " + buildName());
    }
    return _conn;
}
/**
 * Marks this transaction with {@code name}, either joining the work already on
 * the stack or taking the transaction over.
 *
 * With a non-empty stack, the call joins (pushes a CURRENT_TXN marker, returns
 * false) when {@code create} is false or the top element is a CREATE_TXN. With
 * {@code create} true and any other top element, the transaction is treated as
 * leaked and cleaned up first. When taking over, a CREATE_TXN or CURRENT_TXN
 * element is pushed and the transaction is renamed.
 *
 * @param name   name to mark the transaction with
 * @param create whether this is a creating call
 * @return true if this call took the transaction over, false if it joined
 */
protected boolean takeOver(final String name, final boolean create) {
    if (!_stack.isEmpty()) {
        // If it is not a create transaction, then let's just use the current one.
        // A create called inside of another create lets the outer create be responsible for cleaning up.
        final boolean join = !create || _stack.getFirst().type == CREATE_TXN;
        if (join) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace((create ? "Create using current transaction: " : "Using current transaction: ") + toString());
            }
            mark(name);
            return false;
        }
        LOGGER.warn("Encountered a transaction that has leaked. Cleaning up. " + toString());
        cleanup();
    }
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("Took over the transaction: " + name);
    }
    final String elementType = create ? CREATE_TXN : CURRENT_TXN;
    _stack.push(new StackElement(elementType, name));
    _name = name;
    return true;
}
/**
 * Tears this transaction down: closes the remembered statement, rolls back any DB
 * transaction still in progress, closes the connection, empties the stack, clears
 * the name, and asks the lock controller (if any) to clean up the current thread.
 *
 * The statement order matters: the rollback must happen while _txn is still set,
 * and closeConnection() only closes once _txn has been cleared.
 */
public void cleanup() {
closePreviousStatement();
// With both criteria null every element matches, so this pops only the top element
// (see removeUpTo); the remaining elements are dropped by _stack.clear() below.
removeUpTo(null, null);
if (_txn) {
rollbackTransaction();
}
_txn = false;
_name = null;
closeConnection();
_stack.clear();
Merovingian2 lockController = Merovingian2.getLockController();
if (lockController != null) {
lockController.cleanupThread();
}
}
/**
 * Unwinds the stack up to and including the most recent CURRENT_TXN marker and,
 * if that leaves the stack empty, performs a full {@link #cleanup()}.
 */
@Override
public void close() {
    removeUpTo(CURRENT_TXN, null);
    if (!_stack.isEmpty()) {
        return;
    }
    LOGGER.trace("Transaction is done");
    cleanup();
}
/**
 * close() is used by endTxn to close the connection. A full cleanup is performed
 * only if the name is the same as what's stored; otherwise only the innermost
 * marker is unwound via {@link #close()}.
 *
 * @param name name the caller opened the transaction with
 * @return true if the transaction was already cleaned up or this call cleaned it
 * up; false if the name did not match.
 */
public boolean close(final String name) {
    if (_name == null) { // Already cleaned up.
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Already cleaned up." + buildName());
        }
        return true;
    }
    final boolean isOwner = _name.equals(name);
    if (!isOwner) {
        close();
        return false;
    }
    if (LOGGER.isDebugEnabled() && _stack.size() > 2) {
        LOGGER.debug("Transaction is not closed properly: " + toString() + ". Called by " + buildName());
    }
    cleanup();
    LOGGER.trace("All done");
    return true;
}
/**
 * @return true if at least one START_TXN marker is still on the stack
 */
protected boolean hasTxnInStack() {
    final StackElement started = peekInStack(START_TXN);
    return started != null;
}
/**
 * Reports how long ago each registered lock statement was issued and empties the
 * list. Does nothing unless the lock logger has debug enabled; note that the
 * individual entries are emitted at trace level.
 */
protected void clearLockTimes() {
    if (!lockLogger.isDebugEnabled()) {
        return;
    }
    for (final Pair<String, Long> entry : _lockTimes) {
        lockLogger.trace("SQL " + entry.first() + " took " + (System.currentTimeMillis() - entry.second()));
    }
    _lockTimes.clear();
}
/**
 * Commits the DB transaction if this call closes the outermost start().
 *
 * The most recent START_TXN marker is removed; if another one remains, the commit
 * is deferred to the outer caller and false is returned. Otherwise the connection
 * (if any) is committed and closed.
 *
 * @return true if the transaction was committed (or there was no connection to
 *         commit); false if no transaction is in progress or an outer start() remains
 * @throws CloudRuntimeException if the commit fails with an SQLException
 */
public boolean commit() {
if (!_txn) {
LOGGER.warn("txn: Commit called when it is not a transaction: " + buildName());
return false;
}
// Pop only the most recent START_TXN marker.
Iterator<StackElement> it = _stack.iterator();
while (it.hasNext()) {
StackElement st = it.next();
if (st.type == START_TXN) {
it.remove();
break;
}
}
if (hasTxnInStack()) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("txn: Not committing because transaction started elsewhere: " + buildName() + " / " + toString());
}
return false;
}
_txn = false;
try {
if (_conn != null) {
_conn.commit();
LOGGER.trace("txn: DB Changes committed. Time = " + (System.currentTimeMillis() - _txnTime));
clearLockTimes();
closeConnection();
}
return true;
} catch (final SQLException e) {
// NOTE(review): _txn was already cleared above, so rollbackTransaction() takes its
// "no transaction" early-return path here; the failed transaction does not appear to
// be rolled back, nor the connection closed, on this path — confirm whether intended.
rollbackTransaction();
throw new CloudRuntimeException("Unable to commit or close the connection. ", e);
}
}
/**
 * Closes the remembered statement and then, when appropriate, the connection.
 *
 * The connection is left alone while a DB transaction is in progress or when it
 * is user managed (CONNECTED_DB). After a successful close the reference is
 * cleared and this transaction is removed from the MBean. A failure to close is
 * logged and the reference is kept.
 */
protected void closeConnection() {
    closePreviousStatement();
    if (_conn == null) {
        return;
    }
    if (_txn) {
        CONN_LOGGER.trace("txn: Not closing DB connection because we're still in a transaction.");
        return;
    }
    // we should only close db connection when it is not user managed
    if (_dbId == CONNECTED_DB) {
        return;
    }
    try {
        if (CONN_LOGGER.isTraceEnabled()) {
            CONN_LOGGER.trace("Closing DB connection: dbconn" + System.identityHashCode(_conn));
        }
        _conn.close();
        _conn = null;
        s_mbean.removeTransaction(this);
    } catch (final SQLException e) {
        LOGGER.warn("Unable to close connection", e);
    }
}
/**
 * Pops elements off the stack, releasing what each one represents, until an
 * element matching both criteria has been popped.
 *
 * A null {@code type} or {@code ref} acts as a wildcard, so with both null only
 * the top element is popped. The matching element itself is removed without being
 * released. Elements popped before the match are handled by type: a connection
 * marker closes the connection, a started transaction is rolled back (to its
 * savepoint if it carries one, otherwise fully once the loop ends), a statement
 * is closed, and an attachment is cleaned up. Failures while releasing one
 * element are logged and do not stop the unwinding.
 *
 * @param type element type to stop at, or null for any type
 * @param ref  element reference to stop at, or null for any reference
 */
protected void removeUpTo(String type, Object ref) {
    boolean rollback = false;
    Iterator<StackElement> it = _stack.iterator();
    while (it.hasNext()) {
        StackElement item = it.next();
        it.remove();
        try {
            if ((type == null || type.equals(item.type)) && (ref == null || ref.equals(item.ref))) {
                break;
            }
            if (item.type == CURRENT_TXN) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Releasing the current txn: " + (item.ref != null ? item.ref : ""));
                }
            } else if (item.type == CREATE_CONN) {
                closeConnection();
            } else if (item.type == START_TXN) {
                if (item.ref == null) {
                    rollback = true;
                } else if (_conn != null) {
                    try {
                        // Roll back to the savepoint stored on the popped element, not the
                        // search criterion passed in by the caller (which is usually null).
                        _conn.rollback((Savepoint)item.ref);
                        rollback = false;
                    } catch (final SQLException e) {
                        LOGGER.warn("Unable to rollback Txn.", e);
                    }
                }
            } else if (item.type == STATEMENT) {
                // The statement to close is the one carried by the popped element.
                Statement stmt = (Statement)item.ref;
                if (stmt != null) {
                    try {
                        if (stmtLogger.isTraceEnabled()) {
                            stmtLogger.trace("Closing: " + stmt.toString());
                        }
                        try {
                            ResultSet rs = stmt.getResultSet();
                            if (rs != null) {
                                rs.close();
                            }
                        } catch (SQLException e) {
                            stmtLogger.trace("Unable to close resultset");
                        }
                        stmt.close();
                    } catch (final SQLException e) {
                        stmtLogger.trace("Unable to close statement: " + item);
                    }
                }
            } else if (item.type == ATTACHMENT) {
                TransactionAttachment att = (TransactionAttachment)item.ref;
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Cleaning up " + att.getName());
                }
                att.cleanup();
            }
        } catch (Exception e) {
            LOGGER.error("Unable to clean up " + item, e);
        }
    }
    if (rollback) {
        rollback();
    }
}
/**
 * Rolls back the whole DB transaction and closes the connection.
 *
 * The remembered statement is always closed first. If no transaction is in
 * progress the method returns after that. Otherwise the transaction flag is
 * cleared, the connection (if any) is rolled back, lock timings are reported and
 * the connection is closed. An SQLException is logged, not propagated.
 */
protected void rollbackTransaction() {
closePreviousStatement();
if (!_txn) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Rollback called for " + _name + " when there's no transaction: " + buildName());
}
return;
}
assert (!hasTxnInStack()) : "Who's rolling back transaction when there's still txn in stack?";
// Clear the flag before closeConnection(), which refuses to close while _txn is set.
_txn = false;
try {
if (_conn != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Rolling back the transaction: Time = " + (System.currentTimeMillis() - _txnTime) + " Name = " + _name + "; called by " + buildName());
}
_conn.rollback();
}
clearLockTimes();
closeConnection();
} catch (final SQLException e) {
// If rollback() throws, clearLockTimes() and closeConnection() above are skipped.
LOGGER.warn("Unable to rollback", e);
}
}
/**
 * Rolls the connection back to the given savepoint and, if no started
 * transaction remains on the stack, ends the DB transaction and closes the
 * connection.
 *
 * A failure to roll back is logged together with its cause and does not prevent
 * the subsequent bookkeeping.
 *
 * @param sp the savepoint to roll back to
 */
protected void rollbackSavepoint(Savepoint sp) {
    try {
        if (_conn != null) {
            _conn.rollback(sp);
        }
    } catch (SQLException e) {
        // Keep the cause in the log; it was previously dropped.
        LOGGER.warn("Unable to rollback to savepoint " + sp, e);
    }
    if (!hasTxnInStack()) {
        _txn = false;
        closeConnection();
    }
}
/**
 * Rolls back the current unit of work.
 *
 * Walking the stack from the most recent element, plain START_TXN markers are
 * removed. If a START_TXN carrying a savepoint is reached, the work is rolled
 * back to that savepoint and the method returns. If none is found, the whole DB
 * transaction is rolled back.
 */
public void rollback() {
    for (final Iterator<StackElement> it = _stack.iterator(); it.hasNext();) {
        final StackElement element = it.next();
        if (element.type != START_TXN) {
            continue;
        }
        if (element.ref != null) {
            rollback((Savepoint)element.ref);
            return;
        }
        it.remove();
    }
    rollbackTransaction();
}
/**
 * Marks a DB transaction as in progress, pushes a START_TXN marker and creates an
 * unnamed savepoint on the connection, storing it on that marker.
 *
 * @return the new savepoint
 * @throws SQLException if the connection cannot be obtained or the savepoint cannot be set
 */
public Savepoint setSavepoint() throws SQLException {
    _txn = true;
    final StackElement marker = new StackElement(START_TXN, null);
    _stack.push(marker);
    final Savepoint savepoint = getConnection().setSavepoint();
    marker.ref = savepoint;
    return savepoint;
}
/**
 * Marks a DB transaction as in progress, pushes a START_TXN marker and creates a
 * named savepoint on the connection, storing it on that marker.
 *
 * @param name name of the savepoint
 * @return the new savepoint
 * @throws SQLException if the connection cannot be obtained or the savepoint cannot be set
 */
public Savepoint setSavepoint(final String name) throws SQLException {
    _txn = true;
    final StackElement marker = new StackElement(START_TXN, null);
    _stack.push(marker);
    final Savepoint savepoint = getConnection().setSavepoint(name);
    marker.ref = savepoint;
    return savepoint;
}
/**
 * Releases a savepoint: removes its START_TXN marker (and any more recent ones)
 * from the stack, releases it on the connection if there is one and, when no
 * started transaction remains, ends the DB transaction and closes the connection.
 *
 * @param sp the savepoint to release
 * @throws SQLException if the connection fails to release the savepoint
 */
public void releaseSavepoint(final Savepoint sp) throws SQLException {
    removeTxn(sp);
    if (_conn != null) {
        _conn.releaseSavepoint(sp);
    }
    if (hasTxnInStack()) {
        return;
    }
    _txn = false;
    closeConnection();
}
/**
 * @param sp the savepoint to look for (compared by reference)
 * @return true if a START_TXN marker on the stack carries exactly this savepoint
 */
protected boolean hasSavepointInStack(Savepoint sp) {
    for (final StackElement element : _stack) {
        if (element.type == START_TXN && element.ref == sp) {
            return true;
        }
    }
    return false;
}
/**
 * Removes START_TXN markers from the top of the stack down to and including the
 * one carrying the given savepoint. Does nothing (after a failed assertion) if
 * the savepoint is not on the stack.
 *
 * @param sp the savepoint whose marker ends the removal
 */
protected void removeTxn(Savepoint sp) {
    assert hasSavepointInStack(sp) : "Removing a save point that's not in the stack";
    if (!hasSavepointInStack(sp)) {
        return;
    }
    for (final Iterator<StackElement> it = _stack.iterator(); it.hasNext();) {
        final StackElement element = it.next();
        if (element.type != START_TXN) {
            continue;
        }
        it.remove();
        if (element.ref == sp) {
            return;
        }
    }
}
/**
 * Rolls back to the given savepoint: first drops the START_TXN markers down to
 * and including the savepoint's own, then rolls the connection back to it.
 *
 * @param sp the savepoint to roll back to
 */
public void rollback(final Savepoint sp) {
removeTxn(sp);
rollbackSavepoint(sp);
}
/**
 * @return the connection currently held, or null; unlike getConnection() this
 *         never obtains a new one
 */
public Connection getCurrentConnection() {
return _conn;
}
/**
 * @return the live internal stack (not a copy), so changes made by the caller
 *         affect this transaction
 */
public List<StackElement> getStack() {
return _stack;
}
/**
 * Creates an empty transaction object with no name, no connection, a null stack
 * and the CONNECTED_DB (-1) database id.
 * NOTE(review): not called anywhere in the visible part of this file — confirm whether it is still needed.
 */
private TransactionLegacy() {
_name = null;
_conn = null;
_stack = null;
_txn = false;
_dbId = -1;
}
/**
 * Safety net for orphaned transactions: if this object still holds a connection
 * or a non-empty stack when finalized, that is asserted against, logged as an
 * error, and cleaned up.
 */
@Override
protected void finalize() throws Throwable {
    final boolean connectionHeld = _conn != null;
    final boolean stackPopulated = _stack != null && _stack.size() != 0;
    if (connectionHeld || stackPopulated) {
        assert (false) : "Oh Alex oh alex...something is wrong with how we're doing this";
        LOGGER.error("Something went wrong that a transaction is orphaned before db connection is closed");
        cleanup();
    }
}
/**
 * One entry on a transaction's stack: a type tag plus an optional reference
 * whose meaning depends on the type (a name for transaction markers, a Savepoint
 * for started transactions, a TransactionAttachment for attachments).
 */
protected class StackElement {
// One of the enclosing class's type-tag constants; compared by reference there.
public String type;
// Payload for the element; may be null and may be assigned after construction.
public Object ref;
public StackElement(String type, Object ref) {
this.type = type;
this.ref = ref;
}
// Renders as "type-ref".
@Override
public String toString() {
return type + "-" + ref;
}
}
// Data sources for the cloud, usage and simulator databases; assigned by initDataSource(Properties).
private static DataSource s_ds;
private static DataSource s_usageDS;
private static DataSource s_simulatorDS;
// Value of the "db.ha.enabled" property, read in initDataSource(Properties).
protected static boolean s_dbHAEnabled;
static {
// Initialize with assumed db.properties file
initDataSource(DbProperties.getDbProperties());
}
/**
 * Initializes the data sources from a properties file located through
 * PropertiesUtil. Nothing happens if the file cannot be found or does not exist.
 *
 * @param propsFileName name of the properties file to locate
 * @throws IOException declared for failures while loading the file
 */
public static void initDataSource(String propsFileName) throws IOException {
    final File configFile = PropertiesUtil.findConfigFile(propsFileName);
    if (configFile == null || !configFile.exists()) {
        return;
    }
    final Properties loaded = new Properties();
    PropertiesUtil.loadFromFile(loaded, configFile);
    initDataSource(loaded);
}
@SuppressWarnings({"rawtypes", "unchecked"})
public static void initDataSource(Properties dbProps) {
try {
if (dbProps.size() == 0)
return;
s_dbHAEnabled = Boolean.valueOf(dbProps.getProperty("db.ha.enabled"));
LOGGER.info("Is Data Base High Availiability enabled? Ans : " + s_dbHAEnabled);
String loadBalanceStrategy = dbProps.getProperty("db.ha.loadBalanceStrategy");
// FIXME: If params are missing...default them????
final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
final int cloudMaxIdle = Integer.parseInt(dbProps.getProperty("db.cloud.maxIdle"));
final long cloudMaxWait = Long.parseLong(dbProps.getProperty("db.cloud.maxWait"));
final String cloudUsername = dbProps.getProperty("db.cloud.username");
final String cloudPassword = dbProps.getProperty("db.cloud.password");
final String cloudValidationQuery = dbProps.getProperty("db.cloud.validationQuery");
final String cloudIsolationLevel = dbProps.getProperty("db.cloud.isolation.level");
int isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
if (cloudIsolationLevel == null) {
isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
} else if (cloudIsolationLevel.equalsIgnoreCase("readcommitted")) {
isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
} else if (cloudIsolationLevel.equalsIgnoreCase("repeatableread")) {
isolationLevel = Connection.TRANSACTION_REPEATABLE_READ;
} else if (cloudIsolationLevel.equalsIgnoreCase("serializable")) {
isolationLevel = Connection.TRANSACTION_SERIALIZABLE;
} else if (cloudIsolationLevel.equalsIgnoreCase("readuncommitted")) {
isolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED;
} else {
LOGGER.warn("Unknown isolation level " + cloudIsolationLevel + ". Using read uncommitted");
}
final boolean cloudTestOnBorrow = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testOnBorrow"));
final boolean cloudTestWhileIdle = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testWhileIdle"));
final long cloudTimeBtwEvictionRunsMillis = Long.parseLong(dbProps.getProperty("db.cloud.timeBetweenEvictionRunsMillis"));
final long cloudMinEvcitableIdleTimeMillis = Long.parseLong(dbProps.getProperty("db.cloud.minEvictableIdleTimeMillis"));
final boolean useSSL = Boolean.parseBoolean(dbProps.getProperty("db.cloud.useSSL"));
if (useSSL) {
System.setProperty("javax.net.ssl.keyStore", dbProps.getProperty("db.cloud.keyStore"));
System.setProperty("javax.net.ssl.keyStorePassword", dbProps.getProperty("db.cloud.keyStorePassword"));
System.setProperty("javax.net.ssl.trustStore", dbProps.getProperty("db.cloud.trustStore"));
System.setProperty("javax.net.ssl.trustStorePassword", dbProps.getProperty("db.cloud.trustStorePassword"));
}
Pair<String, String> cloudUriAndDriver = getConnectionUriAndDriver(dbProps, loadBalanceStrategy, useSSL, "cloud");
DriverLoader.loadDriver(cloudUriAndDriver.second());
// Default Data Source for CloudStack
s_ds = createDataSource(cloudUriAndDriver.first(), cloudUsername, cloudPassword, cloudMaxActive, cloudMaxIdle, cloudMaxWait,
cloudTimeBtwEvictionRunsMillis, cloudMinEvcitableIdleTimeMillis, cloudTestWhileIdle, cloudTestOnBorrow,
cloudValidationQuery, isolationLevel);
// Configure the usage db
final int usageMaxActive = Integer.parseInt(dbProps.getProperty("db.usage.maxActive"));
final int usageMaxIdle = Integer.parseInt(dbProps.getProperty("db.usage.maxIdle"));
final long usageMaxWait = Long.parseLong(dbProps.getProperty("db.usage.maxWait"));
final String usageUsername = dbProps.getProperty("db.usage.username");
final String usagePassword = dbProps.getProperty("db.usage.password");
Pair<String, String> usageUriAndDriver = getConnectionUriAndDriver(dbProps, loadBalanceStrategy, useSSL, "usage");
DriverLoader.loadDriver(usageUriAndDriver.second());
// Data Source for usage server
s_usageDS = createDataSource(usageUriAndDriver.first(), usageUsername, usagePassword,
usageMaxActive, usageMaxIdle, usageMaxWait, null, null, null, null,
null, isolationLevel);
try {
// Configure the simulator db
final int simulatorMaxActive = Integer.parseInt(dbProps.getProperty("db.simulator.maxActive"));
final int simulatorMaxIdle = Integer.parseInt(dbProps.getProperty("db.simulator.maxIdle"));
final long simulatorMaxWait = Long.parseLong(dbProps.getProperty("db.simulator.maxWait"));
final String simulatorUsername = dbProps.getProperty("db.simulator.username");
final String simulatorPassword = dbProps.getProperty("db.simulator.password");
String simulatorDriver;
String simulatorConnectionUri;
String simulatorUri = dbProps.getProperty("db.simulator.uri");
if (StringUtils.isEmpty(simulatorUri)) {
simulatorDriver = dbProps.getProperty("db.simulator.driver");
final int simulatorPort = Integer.parseInt(dbProps.getProperty("db.simulator.port"));
final String simulatorDbName = dbProps.getProperty("db.simulator.name");
final boolean simulatorAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.simulator.autoReconnect"));
final String simulatorHost = dbProps.getProperty("db.simulator.host");
simulatorConnectionUri = simulatorDriver + "://" + simulatorHost + ":" + simulatorPort + "/" + simulatorDbName + "?autoReconnect=" +
simulatorAutoReconnect;
} else {
LOGGER.warn("db.simulator.uri was set, ignoring the following properties on db.properties: [db.simulator.driver, db.simulator.host, db.simulator.port, "
+ "db.simulator.name, db.simulator.autoReconnect].");
String[] splitUri = simulatorUri.split(":");
simulatorDriver = String.format("%s:%s", splitUri[0], splitUri[1]);
simulatorConnectionUri = simulatorUri;
}
DriverLoader.loadDriver(simulatorDriver);
s_simulatorDS = createDataSource(simulatorConnectionUri, simulatorUsername, simulatorPassword,
simulatorMaxActive, simulatorMaxIdle, simulatorMaxWait, null, null, null, null, cloudValidationQuery, isolationLevel);
} catch (Exception e) {
LOGGER.debug("Simulator DB properties are not available. Not initializing simulator DS");
}
} catch (final Exception e) {
s_ds = getDefaultDataSource("cloud");
s_usageDS = getDefaultDataSource("cloud_usage");
s_simulatorDS = getDefaultDataSource("cloud_simulator");
LOGGER.warn(
"Unable to load db configuration, using defaults with 5 connections. Falling back on assumed datasource on localhost:3306 using username:password=cloud:cloud. Please check your configuration",
e);
}
}
/**
 * Resolves the connection URI and the driver identifier for the given schema.
 * <p>
 * When {@code db.<schema>.uri} is set, it is used verbatim as the connection URI and the driver is derived from its
 * first two colon-separated segments (for instance {@code jdbc:mysql}); the other connection properties of the
 * schema are ignored and a warning listing them is logged. Otherwise the driver is read from
 * {@code db.<schema>.driver} and the URI is assembled by
 * {@link #getPropertiesAndBuildConnectionUri(Properties, String, String, boolean, String)}.
 *
 * @param dbProps the properties to read the schema configuration from
 * @param loadBalanceStrategy forwarded to the URI builder when the URI is assembled from individual properties
 * @param useSSL forwarded to the URI builder when the URI is assembled from individual properties
 * @param schema the schema name used to compose the property keys ({@code db.<schema>.*})
 * @return a pair holding the connection URI as first element and the driver as second element
 * @throws IllegalArgumentException if {@code db.<schema>.uri} is set but does not contain at least two
 *         colon-separated segments, so no driver can be derived from it
 */
protected static Pair<String, String> getConnectionUriAndDriver(Properties dbProps, String loadBalanceStrategy, boolean useSSL, String schema) {
    String connectionUri;
    String driver;
    String propertyUri = dbProps.getProperty(String.format("db.%s.uri", schema));

    if (StringUtils.isEmpty(propertyUri)) {
        driver = dbProps.getProperty(String.format("db.%s.driver", schema));
        connectionUri = getPropertiesAndBuildConnectionUri(dbProps, loadBalanceStrategy, driver, useSSL, schema);
    } else {
        LOGGER.warn(String.format("db.%s.uri was set, ignoring the following properties for schema %s of db.properties: [host, port, name, driver, autoReconnect, url.params,"
                + " replicas, ha.loadBalanceStrategy, ha.enable, failOverReadOnly, reconnectAtTxEnd, autoReconnectForPools, secondsBeforeRetrySource, queriesBeforeRetrySource, "
                + "initialTimeout].", schema, schema));

        String[] splitUri = propertyUri.split(":");
        // Fail with an explicit message instead of an ArrayIndexOutOfBoundsException when the URI carries no
        // "<protocol>:<subprotocol>:" prefix to derive the driver from.
        if (splitUri.length < 2) {
            throw new IllegalArgumentException(String.format("Property db.%s.uri is malformed: expected it to start with <protocol>:<subprotocol>:, for instance jdbc:mysql://host:3306/name.",
                    schema));
        }

        driver = String.format("%s:%s", splitUri[0], splitUri[1]);
        connectionUri = propertyUri;
    }

    LOGGER.info(String.format("Using the following URI to connect to %s database [%s].", schema, connectionUri));
    return new Pair<>(connectionUri, driver);
}
/**
 * Reads the connection settings of the given schema from the properties and delegates to
 * {@link #buildConnectionUri(String, String, boolean, String, String, int, String, boolean, String, String)}.
 * <p>
 * The keys read are {@code db.<schema>.host}, {@code .port}, {@code .name}, {@code .autoReconnect} and
 * {@code .url.params}. The HA parameters and {@code db.<schema>.replicas} are only read when
 * {@code s_dbHAEnabled} is set; otherwise both are passed on as {@code null}.
 *
 * @param dbProps the properties to read from
 * @param loadBalanceStrategy passed through to the URI builder
 * @param driver the driver prefix of the URI, passed through to the URI builder
 * @param useSSL passed through to the URI builder
 * @param schema the schema name used to compose the property keys
 * @return the connection URI produced by the URI builder
 * @throws NumberFormatException if {@code db.<schema>.port} is missing or not an integer
 */
protected static String getPropertiesAndBuildConnectionUri(Properties dbProps, String loadBalanceStrategy, String driver, boolean useSSL, String schema) {
    final String keyPrefix = "db." + schema + ".";

    final String dbHost = dbProps.getProperty(keyPrefix + "host");
    final int dbPort = Integer.parseInt(dbProps.getProperty(keyPrefix + "port"));
    final String databaseName = dbProps.getProperty(keyPrefix + "name");
    final boolean reconnect = Boolean.parseBoolean(dbProps.getProperty(keyPrefix + "autoReconnect"));
    final String extraUrlParams = dbProps.getProperty(keyPrefix + "url.params");

    String haParams = null;
    String replicaHosts = null;
    if (s_dbHAEnabled) {
        haParams = getDBHAParams(schema, dbProps);
        replicaHosts = dbProps.getProperty(keyPrefix + "replicas");
        LOGGER.info(String.format("The replicas configured for %s data base are %s.", schema, replicaHosts));
    }

    return buildConnectionUri(loadBalanceStrategy, driver, useSSL, dbHost, replicaHosts, dbPort, databaseName, reconnect, extraUrlParams, haParams);
}
/**
 * Assembles a connection URI of the form
 * {@code <driver>://<host>[,<replicas>]:<port>/<dbName>?autoReconnect=<autoReconnect>[&<urlParams>][&useSSL=true][&<dbHaParams>&loadBalanceStrategy=<loadBalanceStrategy>]&<CONNECTION_PARAMS>}.
 * <p>
 * The replicas, the HA parameters and the load balance strategy are only added when {@code s_dbHAEnabled} is set;
 * {@code urlParams} is only added when it is not {@code null}.
 *
 * @param loadBalanceStrategy value of the {@code loadBalanceStrategy} parameter (HA only)
 * @param driver the prefix placed before {@code ://}
 * @param useSSL whether {@code useSSL=true} is added
 * @param host the database host
 * @param replicas the replica hosts appended after the host (HA only)
 * @param port the database port
 * @param dbName the database name
 * @param autoReconnect value of the {@code autoReconnect} parameter
 * @param urlParams extra parameters appended as is, or {@code null} for none
 * @param dbHaParams pre-built HA parameters appended as is (HA only)
 * @return the assembled connection URI
 */
protected static String buildConnectionUri(String loadBalanceStrategy, String driver, boolean useSSL, String host, String replicas, int port, String dbName, boolean autoReconnect,
        String urlParams, String dbHaParams) {
    final StringBuilder uri = new StringBuilder();

    // Authority part: host, optionally followed by the replicas, then the port
    uri.append(driver).append("://").append(host);
    if (s_dbHAEnabled) {
        uri.append(",").append(replicas);
    }
    uri.append(":").append(port);

    // Database and query string
    uri.append("/").append(dbName);
    uri.append("?autoReconnect=").append(autoReconnect);
    if (urlParams != null) {
        uri.append("&").append(urlParams);
    }
    if (useSSL) {
        uri.append("&useSSL=true");
    }
    if (s_dbHAEnabled) {
        uri.append("&").append(dbHaParams);
        uri.append("&loadBalanceStrategy=").append(loadBalanceStrategy);
    }
    uri.append("&").append(CONNECTION_PARAMS);

    return uri.toString();
}
/**
 * Builds a pooling {@link DataSource} for the given URI and credentials.
 * <p>
 * Connections are created through a {@link DriverManagerConnectionFactory} and kept in a
 * {@link GenericObjectPool} configured via
 * {@link #createPoolConfig(Integer, Integer, Long, Long, Long, Boolean, Boolean)}.
 *
 * @param uri the connection URI
 * @param username the database user
 * @param password the database password
 * @param maxActive forwarded to the pool configuration
 * @param maxIdle forwarded to the pool configuration
 * @param maxWait forwarded to the pool configuration
 * @param timeBtwnEvictionRuns forwarded to the pool configuration
 * @param minEvictableIdleTime forwarded to the pool configuration
 * @param testWhileIdle forwarded to the pool configuration
 * @param testOnBorrow forwarded to the pool configuration
 * @param validationQuery validation query set on the connection factory; skipped when {@code null}
 * @param isolationLevel default transaction isolation set on the connection factory; skipped when {@code null}
 * @return a {@link PoolingDataSource} backed by the created pool
 */
private static DataSource createDataSource(String uri, String username, String password,
        Integer maxActive, Integer maxIdle, Long maxWait,
        Long timeBtwnEvictionRuns, Long minEvictableIdleTime,
        Boolean testWhileIdle, Boolean testOnBorrow,
        String validationQuery, Integer isolationLevel) {
    final ConnectionFactory driverConnections = new DriverManagerConnectionFactory(uri, username, password);
    final PoolableConnectionFactory pooledConnections = new PoolableConnectionFactory(driverConnections, null);

    final GenericObjectPoolConfig poolSettings =
            createPoolConfig(maxActive, maxIdle, maxWait, timeBtwnEvictionRuns, minEvictableIdleTime, testWhileIdle, testOnBorrow);
    final ObjectPool<PoolableConnection> pool = new GenericObjectPool<>(pooledConnections, poolSettings);

    // The factory and the pool reference each other
    pooledConnections.setPool(pool);

    // Optional settings are only applied when provided
    if (validationQuery != null) {
        pooledConnections.setValidationQuery(validationQuery);
    }
    if (isolationLevel != null) {
        pooledConnections.setDefaultTransactionIsolation(isolationLevel);
    }

    return new PoolingDataSource<>(pool);
}
/**
 * Returns a {@link GenericObjectPoolConfig} usable on connection pool creation.
 * <p>
 * Every argument is optional: a {@code null} value is skipped, leaving the corresponding setting of a freshly
 * created {@link GenericObjectPoolConfig} untouched.
 *
 * @param maxActive value for {@code setMaxTotal}, or {@code null} to skip
 * @param maxIdle value for {@code setMaxIdle}, or {@code null} to skip
 * @param maxWait value for {@code setMaxWaitMillis}, or {@code null} to skip
 * @param timeBtwnEvictionRuns value for {@code setTimeBetweenEvictionRunsMillis}, or {@code null} to skip
 * @param minEvictableIdleTime value for {@code setMinEvictableIdleTimeMillis}, or {@code null} to skip
 * @param testWhileIdle value for {@code setTestWhileIdle}, or {@code null} to skip
 * @param testOnBorrow value for {@code setTestOnBorrow}, or {@code null} to skip
 * @return the populated pool configuration
 */
private static GenericObjectPoolConfig createPoolConfig(Integer maxActive, Integer maxIdle, Long maxWait,
        Long timeBtwnEvictionRuns, Long minEvictableIdleTime,
        Boolean testWhileIdle, Boolean testOnBorrow) {
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();

    // The boxed sizing arguments are guarded like the other optional ones, so that a null is skipped
    // instead of failing with a NullPointerException on auto-unboxing.
    if (maxActive != null) {
        config.setMaxTotal(maxActive);
    }
    if (maxIdle != null) {
        config.setMaxIdle(maxIdle);
    }
    if (maxWait != null) {
        config.setMaxWaitMillis(maxWait);
    }
    if (timeBtwnEvictionRuns != null) {
        config.setTimeBetweenEvictionRunsMillis(timeBtwnEvictionRuns);
    }
    if (minEvictableIdleTime != null) {
        config.setMinEvictableIdleTimeMillis(minEvictableIdleTime);
    }
    if (testWhileIdle != null) {
        config.setTestWhileIdle(testWhileIdle);
    }
    if (testOnBorrow != null) {
        config.setTestOnBorrow(testOnBorrow);
    }
    return config;
}
/**
 * Builds the HA part of a connection URI query string for the given database.
 * <p>
 * For each of {@code failOverReadOnly}, {@code reconnectAtTxEnd}, {@code autoReconnectForPools},
 * {@code secondsBeforeRetrySource}, {@code queriesBeforeRetrySource} and {@code initialTimeout}, in that order,
 * a {@code <option>=<value>} pair is emitted with the value read from {@code db.<dbName>.<option>}; pairs are
 * joined with {@code &}. A property that is not set is rendered as the text {@code null}.
 *
 * @param dbName the database name used to compose the property keys
 * @param dbProps the properties to read the values from
 * @return the joined {@code <option>=<value>} pairs
 */
private static String getDBHAParams(String dbName, Properties dbProps) {
    final String[] haOptions = {
        "failOverReadOnly",
        "reconnectAtTxEnd",
        "autoReconnectForPools",
        "secondsBeforeRetrySource",
        "queriesBeforeRetrySource",
        "initialTimeout"
    };
    final String keyPrefix = "db." + dbName + ".";

    final StringBuilder params = new StringBuilder();
    for (String option : haOptions) {
        if (params.length() > 0) {
            params.append("&");
        }
        params.append(option).append("=").append(dbProps.getProperty(keyPrefix + option));
    }
    return params.toString();
}
/**
 * Builds a fallback pooling {@link DataSource} pointing at {@code jdbc:mysql://localhost:3306/<database>} with
 * {@code CONNECTION_PARAMS} as query string and {@code cloud}/{@code cloud} as credentials. The pool is created
 * without an explicit configuration.
 *
 * @param database the database name placed in the connection URI
 * @return a {@link PoolingDataSource} backed by the created pool
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private static DataSource getDefaultDataSource(final String database) {
    final ConnectionFactory connectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://localhost:3306/" + database + "?" + CONNECTION_PARAMS, "cloud", "cloud");
    final PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(connectionFactory, null);
    final GenericObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);
    // Link the factory back to its pool, as createDataSource does, instead of leaving the factory without a pool reference
    poolableConnectionFactory.setPool(connectionPool);
    return new PoolingDataSource(connectionPool);
}
/**
 * Stores the given connection in {@code _conn}. Primarily meant for unit tests.
 *
 * @param conn the connection to store
 */
protected void setConnection(Connection conn) {
    this._conn = conn;
}
/**
 * Receives a list of {@link PreparedStatement} and quietly closes all of them, which
 * triggers also closing their dependent objects, like a {@link ResultSet}.
 * <p>
 * A {@code null} list, {@code null} entries and statements that report themselves as already closed are skipped.
 * A {@link SQLException} raised while checking or closing one statement is printed and does not prevent the
 * remaining statements from being closed.
 *
 * @param pstmt2Close the statements to close; may be {@code null} or contain {@code null} entries
 */
public static void closePstmts(List<PreparedStatement> pstmt2Close) {
    if (pstmt2Close == null) {
        // Nothing to close; honour the "quiet" contract instead of failing with a NullPointerException
        return;
    }
    for (PreparedStatement pstmt : pstmt2Close) {
        try {
            if (pstmt != null && !pstmt.isClosed()) {
                pstmt.close();
            }
        } catch (SQLException e) {
            // It's not possible to recover from this and we need to continue closing
            e.printStackTrace();
        }
    }
}
}