blob: 362e30406a0918c557ecec196314539bfd3645a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
@RunWith(value = Parameterized.class)
public class RecoveryStatsBrokerTest extends BrokerRestartTestSupport {
private RestartType restartType;
enum RestartType {
NORMAL,
FULL_RECOVERY,
UNCLEAN_SHUTDOWN
}
@Override
protected void configureBroker(BrokerService broker) throws Exception {
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setJournalMaxFileLength(1024*20);
//persistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
persistenceAdapter.setDirectory(broker.getBrokerDataDirectory());
broker.setPersistenceAdapter(persistenceAdapter);
broker.setDestinationPolicy(policyMap);
}
protected void restartBroker(RestartType restartType) throws Exception {
if (restartType == RestartType.FULL_RECOVERY) {
stopBroker();
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
File dir = kahaDBPersistenceAdapter.getDirectory();
if (dir != null) {
IOHelper.deleteFile(new File(dir, "db.data"));
}
broker.start();
} else if (restartType == RestartType.UNCLEAN_SHUTDOWN){
//Simulate an unclean shutdown
File dir = broker.getBrokerDataDirectory();
File backUpDir = new File(dir, "bk");
IOHelper.mkdirs(new File(dir, "bk"));
for (File f: dir.listFiles()) {
if (!f.isDirectory()) {
IOHelper.copyFile(f, new File(backUpDir, f.getName()));
}
}
stopBroker();
for (File f: backUpDir.listFiles()) {
if (!f.isDirectory()) {
IOHelper.copyFile(f, new File(dir, f.getName()));
}
}
broker.start();
} else {
restartBroker();
}
}
@Parameterized.Parameters(name="{0}")
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][] {
{RestartType.NORMAL},
{RestartType.FULL_RECOVERY},
{RestartType.UNCLEAN_SHUTDOWN},
});
}
public RecoveryStatsBrokerTest(RestartType restartType) {
this.restartType = restartType;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
}
@Test(timeout = 60 * 1000)
public void testStaticsRecovery() throws Exception {
List<ActiveMQDestination> destinations = new ArrayList<>();
destinations.add(new ActiveMQQueue("TEST.A"));
destinations.add(new ActiveMQQueue("TEST.B"));
Random random = new Random();
Map<ActiveMQDestination, Integer> consumedMessages = new HashMap<>();
destinations.forEach(destination -> consumedMessages.put(destination, 0));
int numberOfMessages = 400;
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
for (int i = 0; i < numberOfMessages; i++) {
for (ActiveMQDestination destination : destinations) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setProducerId(message.getMessageId().getProducerId());
connection.request(message);
}
}
Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics = getCurrentStatistics(destinations);
checkStatistics(destinations, originalStatistics);
restartBroker(restartType);
checkStatistics(destinations, originalStatistics);
for (ActiveMQDestination destination : destinations) {
consume(destination, 100, false);
}
checkStatistics(destinations, originalStatistics);
restartBroker(restartType);
checkStatistics(destinations, originalStatistics);
for (ActiveMQDestination destination : destinations) {
int messagesToConsume = random.nextInt(numberOfMessages);
consume(destination, messagesToConsume, true);
consumedMessages.compute(destination, (key, value) -> value = value + messagesToConsume);
}
originalStatistics = getCurrentStatistics(destinations);
for (ActiveMQDestination destination : destinations) {
int consumedCount = consumedMessages.get(destination);
assertEquals("", numberOfMessages - consumedCount, originalStatistics.get(destination).getMessageCount().getCount());
}
checkStatistics(destinations, originalStatistics);
restartBroker(restartType);
checkStatistics(destinations, originalStatistics);
}
private Map<ActiveMQDestination, MessageStoreStatistics> getCurrentStatistics(final List<ActiveMQDestination> destinations) {
return destinations.stream()
.map(destination -> getDestination(broker, destination))
.collect(Collectors.toMap(destination -> new ActiveMQQueue(destination.getName()), destination2 -> destination2.getMessageStore().getMessageStoreStatistics()));
}
private void consume(ActiveMQDestination destination, int numberOfMessages, boolean shouldAck) throws Exception {
// Setup the consumer and receive the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// The we should get the messages.
for (int i = 0; i < numberOfMessages; i++) {
Message m2 = receiveMessage(connection);
assertNotNull(m2);
if (shouldAck) {
MessageAck ack = createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE);
connection.request(ack);
}
}
connection.request(closeConnectionInfo(connectionInfo));
}
private void checkStatistics(final List<ActiveMQDestination> destinations, final Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics) {
for (ActiveMQDestination destination : destinations) {
MessageStoreStatistics original = originalStatistics.get(destination);
MessageStoreStatistics actual = getDestination(broker, destination).getMessageStore().getMessageStoreStatistics();
assertEquals("Have Same Count", original.getMessageCount().getCount(), actual.getMessageCount().getCount());
assertEquals("Have Same TotalSize", original.getMessageSize().getTotalSize(), getDestination(broker, destination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
}
}
protected Destination getDestination(BrokerService target, ActiveMQDestination destination) {
RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
if (destination.isTemporary()) {
return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap().get(destination) :
regionBroker.getTempTopicRegion().getDestinationMap().get(destination);
}
return destination.isQueue() ?
regionBroker.getQueueRegion().getDestinationMap().get(destination) :
regionBroker.getTopicRegion().getDestinationMap().get(destination);
}
}