blob: d2537e369bd6027944ef4d8a4497f74affef0004 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import javax.management.StandardMBean;
import org.apache.log4j.Logger;
import com.cloud.utils.DateUtil;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.time.InaccurateClock;
public class Merovingian2 extends StandardMBean implements MerovingianMBean {
private static final Logger s_logger = Logger.getLogger(Merovingian2.class);
private static final String ACQUIRE_SQL =
"INSERT INTO op_lock (op_lock.key, op_lock.mac, op_lock.ip, op_lock.thread, op_lock.acquired_on, waiters) VALUES (?, ?, ?, ?, ?, 1)";
private static final String INCREMENT_SQL = "UPDATE op_lock SET waiters=waiters+1 where op_lock.key=? AND op_lock.mac=? AND op_lock.ip=? AND op_lock.thread=?";
private static final String SELECT_SQL = "SELECT op_lock.key, mac, ip, thread, acquired_on, waiters FROM op_lock";
private static final String INQUIRE_SQL = SELECT_SQL + " WHERE op_lock.key=?";
private static final String DECREMENT_SQL = "UPDATE op_lock SET waiters=waiters-1 where op_lock.key=? AND op_lock.mac=? AND op_lock.ip=? AND op_lock.thread=?";
private static final String RELEASE_LOCK_SQL = "DELETE FROM op_lock WHERE op_lock.key = ?";
private static final String RELEASE_SQL = RELEASE_LOCK_SQL + " AND op_lock.mac=? AND waiters=0";
private static final String CLEANUP_MGMT_LOCKS_SQL = "DELETE FROM op_lock WHERE op_lock.mac = ?";
private static final String SELECT_MGMT_LOCKS_SQL = SELECT_SQL + " WHERE mac=?";
private static final String SELECT_THREAD_LOCKS_SQL = SELECT_SQL + " WHERE mac=? AND ip=?";
private static final String CLEANUP_THREAD_LOCKS_SQL = "DELETE FROM op_lock WHERE mac=? AND ip=? AND thread=?";
TimeZone _gmtTimeZone = TimeZone.getTimeZone("GMT");
private final long _msId;
private static Merovingian2 s_instance = null;
private ConnectionConcierge _concierge = null;
private static ThreadLocal<Count> s_tls = new ThreadLocal<Count>();
private Merovingian2(long msId) {
super(MerovingianMBean.class, false);
_msId = msId;
Connection conn = null;
try {
conn = TransactionLegacy.getStandaloneConnectionWithException();
conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
conn.setAutoCommit(true);
_concierge = new ConnectionConcierge("LockMaster", conn, true);
} catch (SQLException e) {
s_logger.error("Unable to get a new db connection", e);
throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes", e);
} finally {
if (_concierge == null && conn != null) {
try {
conn.close();
} catch (SQLException e) {
s_logger.debug("closing connection failed after everything else.", e);
}
}
}
}
public static synchronized Merovingian2 createLockMaster(long msId) {
assert s_instance == null : "No lock can serve two masters. Either he will hate the one and love the other, or he will be devoted to the one and despise the other.";
s_instance = new Merovingian2(msId);
s_instance.cleanupThisServer();
try {
JmxUtil.registerMBean("Locks", "Locks", s_instance);
} catch (Exception e) {
s_logger.error("Unable to register for JMX", e);
}
return s_instance;
}
public static Merovingian2 getLockMaster() {
return s_instance;
}
protected void incrCount() {
Count count = s_tls.get();
if (count == null) {
count = new Count();
s_tls.set(count);
}
count.count++;
}
protected void decrCount() {
Count count = s_tls.get();
if (count == null) {
return;
}
count.count--;
}
public boolean acquire(String key, int timeInSeconds) {
Thread th = Thread.currentThread();
String threadName = th.getName();
int threadId = System.identityHashCode(th);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Acquiring lck-" + key + " with wait time of " + timeInSeconds);
}
long startTime = InaccurateClock.getTime();
while ((InaccurateClock.getTime() - startTime) < (timeInSeconds * 1000l)) {
int count = owns(key);
if (count >= 1) {
return increment(key, threadName, threadId);
} else if (count == 0) {
if (doAcquire(key, threadName, threadId)) {
return true;
}
}
try {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Sleeping more time while waiting for lck-" + key);
}
Thread.sleep(5000);
} catch (InterruptedException e) {
s_logger.debug("[ignored] interupted while aquiring " + key);
}
}
String msg = "Timed out on acquiring lock " + key + " . Waited for " + ((InaccurateClock.getTime() - startTime)/1000) + "seconds";
Exception e = new CloudRuntimeException(msg);
s_logger.warn(msg, e);
return false;
}
protected boolean increment(String key, String threadName, int threadId) {
try (PreparedStatement pstmt = _concierge.conn().prepareStatement(INCREMENT_SQL);){
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
pstmt.setInt(4, threadId);
int rows = pstmt.executeUpdate();
assert (rows <= 1) : "hmm...non unique key? " + pstmt;
if (s_logger.isTraceEnabled()) {
s_logger.trace("lck-" + key + (rows == 1 ? " acquired again" : " failed to acquire again"));
}
if (rows == 1) {
incrCount();
return true;
}
return false;
} catch (Exception e) {
s_logger.error("increment:Exception:"+e.getMessage());
throw new CloudRuntimeException("increment:Exception:"+e.getMessage(), e);
}
}
protected boolean doAcquire(String key, String threadName, int threadId) {
long startTime = InaccurateClock.getTime();
try(PreparedStatement pstmt = _concierge.conn().prepareStatement(ACQUIRE_SQL);) {
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
pstmt.setInt(4, threadId);
pstmt.setString(5, DateUtil.getDateDisplayString(_gmtTimeZone, new Date()));
try {
int rows = pstmt.executeUpdate();
if (rows == 1) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Acquired for lck-" + key);
}
incrCount();
return true;
}
} catch (SQLException e) {
if (!(e.getSQLState().equals("23000") && e.getErrorCode() == 1062)) {
throw new CloudRuntimeException("Unable to lock " + key + ". Waited " + (InaccurateClock.getTime() - startTime), e);
}
}
} catch (SQLException e) {
s_logger.error("doAcquire:Exception:"+e.getMessage());
throw new CloudRuntimeException("Unable to lock " + key + ". Waited " + (InaccurateClock.getTime() - startTime), e);
}
s_logger.trace("Unable to acquire lck-" + key);
return false;
}
protected Map<String, String> isLocked(String key) {
try (PreparedStatement pstmt = _concierge.conn().prepareStatement(INQUIRE_SQL);){
pstmt.setString(1, key);
try(ResultSet rs = pstmt.executeQuery();)
{
if (!rs.next()) {
return null;
}
return toLock(rs);
}catch (SQLException e) {
s_logger.error("isLocked:Exception:"+e.getMessage());
throw new CloudRuntimeException("isLocked:Exception:"+e.getMessage(), e);
}
} catch (SQLException e) {
s_logger.error("isLocked:Exception:"+e.getMessage());
throw new CloudRuntimeException("isLocked:Exception:"+e.getMessage(), e);
}
}
public void cleanupThisServer() {
cleanupForServer(_msId);
}
@Override
public void cleanupForServer(long msId) {
s_logger.info("Cleaning up locks for " + msId);
try {
synchronized (_concierge.conn()) {
try(PreparedStatement pstmt = _concierge.conn().prepareStatement(CLEANUP_MGMT_LOCKS_SQL);) {
pstmt.setLong(1, msId);
int rows = pstmt.executeUpdate();
s_logger.info("Released " + rows + " locks for " + msId);
}catch (Exception e) {
s_logger.error("cleanupForServer:Exception:"+e.getMessage());
throw new CloudRuntimeException("cleanupForServer:Exception:"+e.getMessage(), e);
}
}
} catch (Exception e) {
s_logger.error("cleanupForServer:Exception:"+e.getMessage());
throw new CloudRuntimeException("cleanupForServer:Exception:"+e.getMessage(), e);
}
}
public boolean release(String key) {
Thread th = Thread.currentThread();
String threadName = th.getName();
int threadId = System.identityHashCode(th);
try (PreparedStatement pstmt = _concierge.conn().prepareStatement(DECREMENT_SQL);)
{
pstmt.setString(1, key);
pstmt.setLong(2, _msId);
pstmt.setString(3, threadName);
pstmt.setLong(4, threadId);
int rows = pstmt.executeUpdate();
assert (rows <= 1) : "hmmm....keys not unique? " + pstmt;
if (s_logger.isTraceEnabled()) {
s_logger.trace("lck-" + key + " released");
}
if (rows == 1) {
try (PreparedStatement rel_sql_pstmt = _concierge.conn().prepareStatement(RELEASE_SQL);) {
rel_sql_pstmt.setString(1, key);
rel_sql_pstmt.setLong(2, _msId);
int result = rel_sql_pstmt.executeUpdate();
if (result == 1 && s_logger.isTraceEnabled()) {
s_logger.trace("lck-" + key + " removed");
}
decrCount();
}catch (Exception e) {
s_logger.error("release:Exception:"+ e.getMessage());
throw new CloudRuntimeException("release:Exception:"+ e.getMessage(), e);
}
} else if (rows < 1) {
String msg = ("Was unable to find lock for the key " + key + " and thread id " + threadId);
Exception e = new CloudRuntimeException(msg);
s_logger.warn(msg, e);
}
return rows == 1;
} catch (Exception e) {
s_logger.error("release:Exception:"+ e.getMessage());
throw new CloudRuntimeException("release:Exception:"+ e.getMessage(), e);
}
}
protected Map<String, String> toLock(ResultSet rs) throws SQLException {
Map<String, String> map = new HashMap<String, String>();
map.put("key", rs.getString(1));
map.put("mgmt", rs.getString(2));
map.put("name", rs.getString(3));
map.put("tid", Integer.toString(rs.getInt(4)));
map.put("date", rs.getString(5));
map.put("count", Integer.toString(rs.getInt(6)));
return map;
}
protected List<Map<String, String>> toLocks(ResultSet rs) throws SQLException {
LinkedList<Map<String, String>> results = new LinkedList<Map<String, String>>();
while (rs.next()) {
results.add(toLock(rs));
}
return results;
}
protected List<Map<String, String>> getLocks(String sql, Long msId) {
try (PreparedStatement pstmt = _concierge.conn().prepareStatement(sql);)
{
if (msId != null) {
pstmt.setLong(1, msId);
}
try(ResultSet rs = pstmt.executeQuery();)
{
return toLocks(rs);
}catch (Exception e) {
s_logger.error("getLocks:Exception:"+e.getMessage());
throw new CloudRuntimeException("getLocks:Exception:"+e.getMessage(), e);
}
} catch (Exception e) {
s_logger.error("getLocks:Exception:"+e.getMessage());
throw new CloudRuntimeException("getLocks:Exception:"+e.getMessage(), e);
}
}
@Override
public List<Map<String, String>> getAllLocks() {
return getLocks(SELECT_SQL, null);
}
@Override
public List<Map<String, String>> getLocksAcquiredByThisServer() {
return getLocks(SELECT_MGMT_LOCKS_SQL, _msId);
}
public int owns(String key) {
Thread th = Thread.currentThread();
int threadId = System.identityHashCode(th);
Map<String, String> owner = isLocked(key);
if (owner == null) {
return 0;
}
if (owner.get("mgmt").equals(Long.toString(_msId)) && owner.get("tid").equals(Integer.toString(threadId))) {
return Integer.parseInt(owner.get("count"));
}
return -1;
}
public List<Map<String, String>> getLocksAcquiredBy(long msId, String threadName) {
try (PreparedStatement pstmt = _concierge.conn().prepareStatement(SELECT_THREAD_LOCKS_SQL);){
pstmt.setLong(1, msId);
pstmt.setString(2, threadName);
try (ResultSet rs =pstmt.executeQuery();) {
return toLocks(rs);
}
catch (Exception e) {
s_logger.error("getLocksAcquiredBy:Exception:"+e.getMessage());
throw new CloudRuntimeException("Can't get locks " + pstmt, e);
}
} catch (Exception e) {
s_logger.error("getLocksAcquiredBy:Exception:"+e.getMessage());
throw new CloudRuntimeException("getLocksAcquiredBy:Exception:"+e.getMessage(), e);
}
}
public void cleanupThread() {
Count count = s_tls.get();
if (count == null || count.count == 0) {
return;
}
int c = count.count;
count.count = 0;
Thread th = Thread.currentThread();
String threadName = th.getName();
int threadId = System.identityHashCode(th);
try (PreparedStatement pstmt = _concierge.conn().prepareStatement(CLEANUP_THREAD_LOCKS_SQL);)
{
pstmt.setLong(1, _msId);
pstmt.setString(2, threadName);
pstmt.setInt(3, threadId);
int rows = pstmt.executeUpdate();
assert (false) : "Abandon hope, all ye who enter here....There were still " + rows + ":" + c +
" locks not released when the transaction ended, check for lock not released or @DB is not added to the code that using the locks!";
} catch (Exception e) {
s_logger.error("cleanupThread:Exception:" + e.getMessage());
throw new CloudRuntimeException("cleanupThread:Exception:" + e.getMessage(), e);
}
}
@Override
public boolean releaseLockAsLastResortAndIReallyKnowWhatIAmDoing(String key) {
s_logger.info("Releasing a lock from JMX lck-" + key);
try (PreparedStatement pstmt = _concierge.conn().prepareStatement(RELEASE_LOCK_SQL);)
{
pstmt.setString(1, key);
int rows = pstmt.executeUpdate();
return rows > 0;
} catch (Exception e) {
s_logger.error("releaseLockAsLastResortAndIReallyKnowWhatIAmDoing : Exception: " + e.getMessage());
return false;
}
}
protected static class Count {
public int count = 0;
}
}