blob: fc84009f1927e836b14cc869997c0950b0b60436 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.InvalidAttributeValueException;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.NotCompliantMBeanException;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.QueryExp;
import javax.management.ReflectionException;
import javax.management.loading.ClassLoaderRepository;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerStoppedException;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.ServiceSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class StartAndConcurrentStopBrokerTest {
private static final Logger LOG = LoggerFactory.getLogger(StartAndConcurrentStopBrokerTest.class);
@Test(timeout = 30000)
public void testConcurrentStop() throws Exception {
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
final CountDownLatch gotBrokerMbean = new CountDownLatch(1);
final CountDownLatch gotPaMBean = new CountDownLatch(1);
final AtomicBoolean checkPaMBean = new AtomicBoolean(false);
final HashMap mbeans = new HashMap();
final MBeanServer mBeanServer = new MBeanServer() {
@Override
public ObjectInstance createMBean(String className, ObjectName name) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
return null;
}
@Override
public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException {
return null;
}
@Override
public ObjectInstance createMBean(String className, ObjectName name, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
return null;
}
@Override
public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException {
return null;
}
@Override
public ObjectInstance registerMBean(Object object, ObjectName name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
if (mbeans.containsKey(name)) {
throw new InstanceAlreadyExistsException("Got one already: " + name);
}
LOG.info("register:" + name);
try {
if (name.compareTo(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) {
gotBrokerMbean.countDown();
}
if (checkPaMBean.get()) {
if (new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,service=PersistenceAdapter,instanceName=*").apply(name)) {
gotPaMBean.countDown();
}
}
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
mbeans.put(name, object);
return new ObjectInstance(name, object.getClass().getName());
}
@Override
public void unregisterMBean(ObjectName name) throws InstanceNotFoundException, MBeanRegistrationException {
mbeans.remove(name);
}
@Override
public ObjectInstance getObjectInstance(ObjectName name) throws InstanceNotFoundException {
return null;
}
@Override
public Set<ObjectInstance> queryMBeans(ObjectName name, QueryExp query) {
return null;
}
@Override
public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {
return null;
}
@Override
public boolean isRegistered(ObjectName name) {
return mbeans.containsKey(name);
}
@Override
public Integer getMBeanCount() {
return null;
}
@Override
public Object getAttribute(ObjectName name, String attribute) throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException {
return null;
}
@Override
public AttributeList getAttributes(ObjectName name, String[] attributes) throws InstanceNotFoundException, ReflectionException {
return null;
}
@Override
public void setAttribute(ObjectName name, Attribute attribute) throws InstanceNotFoundException, AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
}
@Override
public AttributeList setAttributes(ObjectName name, AttributeList attributes) throws InstanceNotFoundException, ReflectionException {
return null;
}
@Override
public Object invoke(ObjectName name, String operationName, Object[] params, String[] signature) throws InstanceNotFoundException, MBeanException, ReflectionException {
return null;
}
@Override
public String getDefaultDomain() {
return null;
}
@Override
public String[] getDomains() {
return new String[0];
}
@Override
public void addNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException {
}
@Override
public void addNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, ObjectName listener) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, NotificationListener listener) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public void removeNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException {
}
@Override
public MBeanInfo getMBeanInfo(ObjectName name) throws InstanceNotFoundException, IntrospectionException, ReflectionException {
return null;
}
@Override
public boolean isInstanceOf(ObjectName name, String className) throws InstanceNotFoundException {
return false;
}
@Override
public Object instantiate(String className) throws ReflectionException, MBeanException {
return null;
}
@Override
public Object instantiate(String className, ObjectName loaderName) throws ReflectionException, MBeanException, InstanceNotFoundException {
return null;
}
@Override
public Object instantiate(String className, Object[] params, String[] signature) throws ReflectionException, MBeanException {
return null;
}
@Override
public Object instantiate(String className, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, MBeanException, InstanceNotFoundException {
return null;
}
@Override
public ObjectInputStream deserialize(ObjectName name, byte[] data) throws InstanceNotFoundException, OperationsException {
return null;
}
@Override
public ObjectInputStream deserialize(String className, byte[] data) throws OperationsException, ReflectionException {
return null;
}
@Override
public ObjectInputStream deserialize(String className, ObjectName loaderName, byte[] data) throws InstanceNotFoundException, OperationsException, ReflectionException {
return null;
}
@Override
public ClassLoader getClassLoaderFor(ObjectName mbeanName) throws InstanceNotFoundException {
return null;
}
@Override
public ClassLoader getClassLoader(ObjectName loaderName) throws InstanceNotFoundException {
return null;
}
@Override
public ClassLoaderRepository getClassLoaderRepository() {
return null;
}
};
final BrokerService broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute(new Runnable() {
@Override
public void run() {
try {
broker.getManagementContext().setMBeanServer(mBeanServer);
broker.start();
} catch (BrokerStoppedException expected) {
} catch (ConfigurationException expected) {
} catch (Exception e) {
e.printStackTrace();
// lots of possible errors depending on progress
}
}
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
assertTrue("broker has registered mbean", gotBrokerMbean.await(10, TimeUnit.SECONDS));
broker.stop();
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
}
});
executor.shutdown();
assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS));
BrokerService sanityBroker = new BrokerService();
sanityBroker.getManagementContext().setMBeanServer(mBeanServer);
sanityBroker.start();
sanityBroker.stop();
assertNull("No error", error.get());
// again, after Persistence adapter mbean
final BrokerService brokerTwo = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
checkPaMBean.set(true);
executor = Executors.newFixedThreadPool(4);
executor.execute(new Runnable() {
@Override
public void run() {
try {
brokerTwo.getManagementContext().setMBeanServer(mBeanServer);
brokerTwo.start();
} catch (BrokerStoppedException expected) {
} catch (ConfigurationException expected) {
} catch (Exception e) {
e.printStackTrace();
// lots of possible errors depending on progress
}
}
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
assertTrue("broker has registered persistence adapter mbean", gotPaMBean.await(10, TimeUnit.SECONDS));
brokerTwo.stop();
} catch (Exception e) {
e.printStackTrace();
error.set(e);
}
}
});
executor.shutdown();
assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS));
assertTrue("broker has registered persistence adapter mbean", gotPaMBean.await(0, TimeUnit.SECONDS));
sanityBroker = new BrokerService();
sanityBroker.getManagementContext().setMBeanServer(mBeanServer);
sanityBroker.start();
sanityBroker.stop();
assertNull("No error", error.get());
}
@Test(timeout = 30000)
public void testStopWithScheduler() throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setSchedulerSupport(true);
ExecutorService executor = Executors.newFixedThreadPool(1);
for (int sleepT : new int[] {30, 40, 50}) {
final int sleepMillis = sleepT;
final CountDownLatch stopperStarted = new CountDownLatch(1);
final CountDownLatch stopperDone = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
try {
stopperStarted.countDown();
int timeOutMillis = 4000;
PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
if (persistenceAdapter != null && persistenceAdapter instanceof LockableServiceSupport) {
LockableServiceSupport lockableServiceSupport = (LockableServiceSupport) persistenceAdapter;
if (lockableServiceSupport.isUseLock() && lockableServiceSupport.getLocker() instanceof ServiceSupport) {
ServiceSupport lockService = ((ServiceSupport) lockableServiceSupport.getLocker());
while (!lockService.isStarted() && timeOutMillis > 0) {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
timeOutMillis = timeOutMillis - sleepMillis;
}
LOG.info("broker stop on active async start, blocked on store lock:" + lockService.isStarted());
brokerService.stop();
}
}
stopperDone.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
assertTrue("Checker started", stopperStarted.await(20, TimeUnit.SECONDS));
try {
brokerService.start();
// Expect start to fail with brokerStopped Exception but not a guarantee
} catch (BrokerStoppedException expected) {
expected.printStackTrace();
} finally {
brokerService.stop();
}
assertTrue("Stopper done", stopperDone.await(20, TimeUnit.SECONDS));
// ok to start a new
BrokerService sanity = new BrokerService();
sanity.setSchedulerSupport(true);
sanity.start();
sanity.stop();
}
}
}