blob: f2a2982bab8dd858afb5b30a00421a185d123ad0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.sql.DataSource;
import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.util.Wait;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class LeaseDatabaseLockerTest {
private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLockerTest.class);
JDBCPersistenceAdapter jdbc;
BrokerService brokerService;
DataSource dataSource;
@Before
public void setUpStore() throws Exception {
jdbc = new JDBCPersistenceAdapter();
dataSource = jdbc.getDataSource();
brokerService = new BrokerService();
jdbc.setBrokerService(brokerService);
jdbc.getAdapter().doCreateTables(jdbc.getTransactionContext());
}
@After
public void stopDerby() {
DataSourceServiceSupport.shutdownDefaultDataSource(dataSource);
}
@Test
public void testLockInterleave() throws Exception {
LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
lockerA.setLeaseHolderId("First");
jdbc.setLocker(lockerA);
final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
lockerB.setLeaseHolderId("Second");
jdbc.setLocker(lockerB);
final AtomicBoolean blocked = new AtomicBoolean(true);
final Connection connection = dataSource.getConnection();
printLockTable(connection);
lockerA.start();
printLockTable(connection);
assertTrue("First has lock", lockerA.keepAlive());
final CountDownLatch lockerBStarting = new CountDownLatch(1);
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
try {
lockerBStarting.countDown();
lockerB.start();
blocked.set(false);
printLockTable(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return lockerBStarting.await(1, TimeUnit.SECONDS);
}
});
TimeUnit.MILLISECONDS.sleep(lockerB.getLockAcquireSleepInterval() / 2);
assertTrue("B is blocked", blocked.get());
assertTrue("A is good", lockerA.keepAlive());
printLockTable(connection);
lockerA.stop();
printLockTable(connection);
TimeUnit.MILLISECONDS.sleep(2 * lockerB.getLockAcquireSleepInterval());
assertFalse("lockerB has the lock", blocked.get());
lockerB.stop();
printLockTable(connection);
}
@Test
public void testLockAcquireRace() throws Exception {
// build a fake lock
final String fakeId = "Anon";
final Connection connection = dataSource.getConnection();
printLockTable(connection);
PreparedStatement statement = connection.prepareStatement(jdbc.getStatements().getLeaseObtainStatement());
final long now = System.currentTimeMillis();
statement.setString(1,fakeId);
statement.setLong(2, now + 30000);
statement.setLong(3, now);
assertEquals("we got the lease", 1, statement.executeUpdate());
printLockTable(connection);
final LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
lockerA.setLeaseHolderId("A");
jdbc.setLocker(lockerA);
final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
lockerB.setLeaseHolderId("B");
jdbc.setLocker(lockerB);
final Set<LeaseDatabaseLocker> lockedSet = new HashSet<LeaseDatabaseLocker>();
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
try {
lockerA.start();
lockedSet.add(lockerA);
printLockTable(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
lockerB.start();
lockedSet.add(lockerB);
printLockTable(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// sleep for a bit till both are alive
TimeUnit.SECONDS.sleep(2);
assertTrue("no start", lockedSet.isEmpty());
assertFalse("A is blocked", lockerA.keepAlive());
assertFalse("B is blocked", lockerB.keepAlive());
LOG.info("releasing phony lock " + fakeId);
statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement());
statement.setString(1, null);
statement.setLong(2, 0l);
statement.setString(3, fakeId);
assertEquals("we released " + fakeId, 1, statement.executeUpdate());
LOG.info("released " + fakeId);
printLockTable(connection);
TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
assertEquals("one locker started", 1, lockedSet.size());
assertTrue("one isAlive", lockerA.keepAlive() || lockerB.keepAlive());
LeaseDatabaseLocker winner = lockedSet.iterator().next();
winner.stop();
lockedSet.remove(winner);
TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
assertEquals("one locker started", 1, lockedSet.size());
lockedSet.iterator().next().stop();
printLockTable(connection);
}
@Test
public void testDiffOffsetAhead() throws Exception {
LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
assertTrue("when ahead of db adjustment is negative", callDiffOffset(underTest, System.currentTimeMillis() - 60000) < 0);
}
@Test
public void testDiffOffsetBehind() throws Exception {
LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
assertTrue("when behind db adjustment is positive", callDiffOffset(underTest, System.currentTimeMillis() + 60000) > 0);
}
@Test
public void testDiffIngoredIfLessthanMaxAllowableDiffFromDBTime() throws Exception {
LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
underTest.setMaxAllowableDiffFromDBTime(60000);
assertEquals("no adjust when under limit", 0, callDiffOffset(underTest,System.currentTimeMillis() - 40000 ));
}
public long callDiffOffset(LeaseDatabaseLocker underTest, final long dbTime) throws Exception {
Mockery context = new Mockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
}};
final Statements statements = context.mock(Statements.class);
final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class);
final Connection connection = context.mock(Connection.class);
final PreparedStatement preparedStatement = context.mock(PreparedStatement.class);
final ResultSet resultSet = context.mock(ResultSet.class);
final Timestamp timestamp = context.mock(Timestamp.class);
context.checking(new Expectations() {{
allowing(jdbcPersistenceAdapter).getStatements();
will(returnValue(statements));
allowing(jdbcPersistenceAdapter);
allowing(statements);
allowing(connection).prepareStatement(with(any(String.class)));
will(returnValue(preparedStatement));
allowing(connection);
allowing(preparedStatement).executeQuery();
will(returnValue(resultSet));
allowing(resultSet).next();
will(returnValue(true));
allowing(resultSet).getTimestamp(1);
will(returnValue(timestamp));
allowing(timestamp).getTime();
will(returnValue(dbTime));
allowing(resultSet).close();
allowing(preparedStatement).close();
}});
underTest.configure(jdbcPersistenceAdapter);
underTest.setLockable(jdbcPersistenceAdapter);
return underTest.determineTimeDifference(connection);
}
private void printLockTable(Connection connection) throws Exception {
DefaultJDBCAdapter.printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
}
}