blob: 3d139ea5d12be443bee277f9daed1c1887f9a1f7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import static java.util.Arrays.asList;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.server.virtualhostnode.TestVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.security.auth.Subject;
public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
{
private ConfiguredObjectRecord _hostRecord;
private CurrentThreadTaskExecutor _taskExecutor;
private UUID _hostId;
private VirtualHostNode _virtualHostNode;
private DurableConfigurationStore _durableConfigurationStore;
private final Principal _systemPrincipal =
new Principal()
{
@Override
public String getName()
{
return "TEST";
}
};
private final Subject _systemSubject = new Subject(true,
Collections.singleton(_systemPrincipal),
Collections.emptySet(),
Collections.emptySet());
@Override
public void setUp() throws Exception
{
super.setUp();
UUID hostParentId = UUID.randomUUID();
_hostId = UUID.randomUUID();
Map<String, Object> hostAttributes = new HashMap<>();
hostAttributes.put("modelVersion", "0.0");
hostAttributes.put("name", "test");
hostAttributes.put("type", TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
_hostRecord = mock(ConfiguredObjectRecord.class);
when(_hostRecord.getId()).thenReturn(_hostId);
when(_hostRecord.getAttributes()).thenReturn(hostAttributes);
when(_hostRecord.getType()).thenReturn("VirtualHost");
when(_hostRecord.toString()).thenReturn("VirtualHost[name='test',id='" + _hostId + "']");
_taskExecutor = new CurrentThreadTaskExecutor();
_taskExecutor.start();
SystemConfig<?> systemConfig = mock(SystemConfig.class);
when(systemConfig.getEventLogger()).thenReturn(new EventLogger());
Broker broker = BrokerTestHelper.mockWithSystemPrincipal(Broker.class, _systemPrincipal);
when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig);
when(broker.getTaskExecutor()).thenReturn(_taskExecutor);
when(broker.getChildExecutor()).thenReturn(_taskExecutor);
when(broker.getModel()).thenReturn(BrokerModel.getInstance());
_durableConfigurationStore = mock(DurableConfigurationStore.class);
Map<String,Object> attributes = new HashMap<>();
attributes.put(VirtualHostNode.ID, hostParentId);
attributes.put(VirtualHostNode.NAME, "test");
_virtualHostNode = new TestVirtualHostNode(broker, attributes, _durableConfigurationStore);
}
@Override
public void tearDown()throws Exception
{
_taskExecutor.stopImmediately();
_virtualHostNode.close();
super.tearDown();
}
public void testRecoverQueueWithDLQEnabled() throws Exception
{
ConfiguredObjectRecord queue = mockQueue("test", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "true"));
ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class);
when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test"));
ConfiguredObjectRecord queueBinding = mockBinding("test", queue, directExchange);
setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
upgraderAndRecoverer.upgradeAndRecover(_durableConfigurationStore);
final VirtualHost<?> host = _virtualHostNode.getVirtualHost();
Subject.doAs(_systemSubject, new PrivilegedAction<Void>()
{
@Override
public Void run()
{
host.open();
return null;
}
}
);
assertNotNull("Virtual host is not recovered", host);
Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test");
assertNotNull("Queue is not recovered", recoveredQueue);
Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ");
assertNotNull("DLQ queue is not recovered", recoveredDLQ);
Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE");
assertNotNull("DLE exchange is not recovered", recoveredDLE);
assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange());
host.close();
}
public void testRecordUpdatedInOneUpgraderAndRemovedInAnotherUpgraderIsNotRecovered()
{
ConfiguredObjectRecord queue = mockQueue("test-queue", null);
ConfiguredObjectRecord exchange = mockExchange("test-direct", "direct");
ConfiguredObjectRecord queueBinding1 = mockBinding("test-binding", queue, exchange);
ConfiguredObjectRecord nonExistingExchange = mock(ConfiguredObjectRecord.class);
// selector on non-topic exchange should be removed from binding arguments in upgrader 0.0->0.1
// binding to non-existing exchange is removed in upgrader 0.1->0.2
when(nonExistingExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("non-existing", "test"));
Map<String, Object> arguments = Collections.<String, Object>singletonMap("x-filter-jms-selector", "id=1");
ConfiguredObjectRecord queueBinding2 = mockBinding("test-non-existing", queue, nonExistingExchange, arguments);
setUpVisit(_hostRecord, queue, exchange, queueBinding1, queueBinding2);
VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
upgraderAndRecoverer.upgradeAndRecover(_durableConfigurationStore);
final VirtualHost<?> host = _virtualHostNode.getVirtualHost();
Subject.doAs(_systemSubject, new PrivilegedAction<Void>()
{
@Override
public Void run()
{
host.open();
return null;
}
}
);
assertNotNull("Virtual host is not recovered", host);
Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test-queue");
assertNotNull("Queue is not recovered", recoveredQueue);
Exchange<?> recoveredExchange= host.findConfiguredObject(Exchange.class, "test-direct");
assertNotNull("Exchange is not recovered", recoveredExchange);
Binding<?> recoveredBinding1 = recoveredQueue.findConfiguredObject(Binding.class, "test-binding");
assertNotNull("Correct binding is not recovered", recoveredBinding1);
Binding<?> recoveredBinding2 = recoveredQueue.findConfiguredObject(Binding.class, "test-non-existing");
assertNull("Incorrect binding is recovered", recoveredBinding2);
host.close();
}
private ConfiguredObjectRecord mockBinding(String bindingName, ConfiguredObjectRecord queue, ConfiguredObjectRecord exchange)
{
return mockBinding(bindingName, queue, exchange, null);
}
private ConfiguredObjectRecord mockBinding(String bindingName, ConfiguredObjectRecord queue, ConfiguredObjectRecord exchange, Map<String, Object> arguments)
{
ConfiguredObjectRecord binding = mock(ConfiguredObjectRecord.class);
when(binding.getId()).thenReturn(UUID.randomUUID());
when(binding.getType()).thenReturn("org.apache.qpid.server.model.Binding");
Map<String,UUID> parents = new HashMap<>();
parents.put("Queue", queue.getId());
parents.put("Exchange", exchange.getId());
when(binding.getParents()).thenReturn(parents);
when(binding.toString()).thenReturn("Binding[" + bindingName + "]");
Map<String, Object> attributes = new HashMap<>();
attributes.put("durable", true);
attributes.put("name", bindingName);
if (arguments != null)
{
attributes.put("arguments", arguments);
}
when(binding.getAttributes()).thenReturn(attributes);
return binding;
}
private ConfiguredObjectRecord mockExchange(String exchangeName, String exchangeType)
{
ConfiguredObjectRecord exchange = mock(ConfiguredObjectRecord.class);
when(exchange.getId()).thenReturn(UUID.randomUUID());
when(exchange.getType()).thenReturn("org.apache.qpid.server.model.Exchange");
when(exchange.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId));
when(exchange.toString()).thenReturn("Exchange[" + exchangeName + "]");
Map<String, Object> attributes = new HashMap<>();
attributes.put("type", exchangeType);
attributes.put("durable", true);
attributes.put("name", exchangeName);
when(exchange.getAttributes()).thenReturn(attributes);
return exchange;
}
private ConfiguredObjectRecord mockQueue(String queueName, Map<String, Object> arguments)
{
ConfiguredObjectRecord queue = mock(ConfiguredObjectRecord.class);
when(queue.getId()).thenReturn(UUID.randomUUID());
when(queue.getType()).thenReturn("org.apache.qpid.server.model.Queue");
when(queue.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId));
when(queue.toString()).thenReturn("Queue[" + queueName + "]");
Map<String, Object> attributes = new HashMap<>();
attributes.put("durable", true);
attributes.put("name", queueName);
if (arguments != null)
{
attributes.put("arguments", arguments);
}
when(queue.getAttributes()).thenReturn(attributes);
return queue;
}
private void setUpVisit(final ConfiguredObjectRecord... records)
{
doAnswer(new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
Iterator<ConfiguredObjectRecord> iterator = asList(records).iterator();
ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
boolean handlerContinue = true;
while(iterator.hasNext())
{
handler.handle(iterator.next());
}
return null;
}
}).when(_durableConfigurationStore).openConfigurationStore(any(ConfiguredObjectRecordHandler.class));
}
}