blob: c22ec80e865c57ccd8514ef360339221912071d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
public class SharedFileLockerTest {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(SharedFileLockerTest.class);
@Rule
public TemporaryFolder testFolder;
public SharedFileLockerTest() {
File file = new File(IOHelper.getDefaultDataDirectory());
file.mkdir();
// TemporaryFolder will make sure the files are removed after the test is done
testFolder = new TemporaryFolder(file);
}
@Test
public void testStopNoStart() throws Exception {
SharedFileLocker locker1 = new SharedFileLocker();
locker1.setDirectory(testFolder.getRoot());
locker1.stop();
}
@Test
public void testLoop() throws Exception {
// Increase the number of iterations if you are debugging races
for (int i = 0; i < 100; i++) {
internalLoop(5);
}
}
@Test
public void testLogging() throws Exception {
// using a bigger wait here
// to make sure we won't log any extra info
internalLoop(100);
}
private void internalLoop(long timewait) throws Exception {
final AtomicInteger logCounts = new AtomicInteger(0);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel() == Level.INFO) {
logCounts.incrementAndGet();
}
}
};
Logger.getRootLogger().addAppender(appender);
final AtomicInteger errors = new AtomicInteger(0);
Thread thread = null;
SharedFileLocker locker1 = new SharedFileLocker();
locker1.setDirectory(testFolder.getRoot());
final SharedFileLocker locker2 = new SharedFileLocker();
locker2.setLockAcquireSleepInterval(1);
locker2.setDirectory(testFolder.getRoot());
try {
locker1.doStart();
assertTrue(locker1.keepAlive());
thread = new Thread("Locker Thread") {
public void run() {
try {
locker2.doStart();
} catch (Throwable e) {
errors.incrementAndGet();
}
}
};
thread.start();
// I need to make sure the info was already logged
// but I don't want to have an unecessary wait here,
// as I want the test to run as fast as possible
{
long timeout = System.currentTimeMillis() + 5000;
while (logCounts.get() < 1 && System.currentTimeMillis() < timeout) {
Thread.sleep(1);
}
}
if (timewait > 0) {
Thread.sleep(timewait);
}
assertTrue(thread.isAlive());
locker1.stop();
// 10 seconds here is an eternity, but it should only take milliseconds
thread.join(5000);
Assert.assertEquals("Extra logs in place", 1, logCounts.get());
long timeout = System.currentTimeMillis() + 5000;
while (timeout > System.currentTimeMillis() && !locker2.keepAlive()) {
Thread.sleep(1);
}
assertTrue(locker2.keepAlive());
locker2.stop();
Assert.assertEquals(0, errors.get());
} finally {
Logger.getRootLogger().removeAppender(appender);
// to make sure we won't leak threads if the test ever failed for any reason
thread.join(1000);
if (thread.isAlive()) {
thread.interrupt();
}
File lockFile = new File(testFolder.getRoot(), "lock");
lockFile.delete();
}
}
@Test
public void verifyLockAcquireWaitsForLockDrop() throws Exception {
final AtomicInteger logCounts = new AtomicInteger(0);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
logCounts.incrementAndGet();
}
};
Logger sharedFileLogger = Logger.getLogger(SharedFileLocker.class);
sharedFileLogger.addAppender(appender);
LockableServiceSupport config = new LockableServiceSupport() {
@Override
public long getLockKeepAlivePeriod() {
return 500;
}
@Override
public Locker createDefaultLocker() throws IOException {
return null;
}
public void init() throws Exception {
}
protected void doStop(ServiceStopper stopper) throws Exception {
}
protected void doStart() throws Exception {
}
};
final SharedFileLocker underTest = new SharedFileLocker();
underTest.setDirectory(testFolder.getRoot());
underTest.setLockAcquireSleepInterval(5);
underTest.setLockable(config);
// get the in jvm lock
File lockFile = new File(testFolder.getRoot(), "lock");
String jvmProp = LockFile.class.getName() + ".lock." + lockFile.getCanonicalPath();
System.getProperties().put(jvmProp, jvmProp);
final CountDownLatch locked = new CountDownLatch(1);
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
final AtomicLong acquireTime = new AtomicLong(0l);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
underTest.start();
acquireTime.set(System.currentTimeMillis());
locked.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
assertTrue("locker failed to obtain lock", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return logCounts.get() > 0;
}
}, 5000, 10));
// release vm lock
long releaseTime = System.currentTimeMillis();
System.getProperties().remove(jvmProp);
assertTrue("locker got lock", locked.await(5, TimeUnit.SECONDS));
// verify delay in start
LOG.info("ReleaseTime: " + releaseTime + ", AcquireTime:" + acquireTime.get());
assertTrue("acquire delayed for keepAlive: " + config.getLockKeepAlivePeriod(), acquireTime.get() >= releaseTime + config.getLockKeepAlivePeriod());
} finally {
executorService.shutdownNow();
underTest.stop();
lockFile.delete();
sharedFileLogger.removeAppender(appender);
}
}
}