blob: d568372943ba69d6c7c95902c5287d4833b344ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.systests.jms_1_1.extensions.acl;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Group;
import org.apache.qpid.server.model.GroupMember;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.security.access.config.AclFileParser;
import org.apache.qpid.server.security.access.config.LegacyOperation;
import org.apache.qpid.server.security.access.config.ObjectProperties;
import org.apache.qpid.server.security.access.config.ObjectType;
import org.apache.qpid.server.security.access.config.Rule;
import org.apache.qpid.server.security.access.config.RuleSet;
import org.apache.qpid.server.security.access.plugins.AclRule;
import org.apache.qpid.server.security.access.plugins.RuleBasedVirtualHostAccessControlProvider;
import org.apache.qpid.server.security.access.plugins.RuleOutcome;
import org.apache.qpid.server.security.group.GroupProviderImpl;
import org.apache.qpid.systests.JmsTestBase;
public class MessagingACLTest extends JmsTestBase
{
private static final Logger LOGGER = LoggerFactory.getLogger(MessagingACLTest.class);
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
private static final String USER1 = "guest";
private static final String USER1_PASSWORD = "guest";
private static final String USER2 = "admin";
private static final String USER2_PASSWORD = "admin";
private static final String RULE_BASED_VIRTUAL_HOST_ACCESS_CONTROL_PROVIDER_TYPE =
"org.apache.qpid.RuleBaseVirtualHostAccessControlProvider";
private static final String EXCHANGE_TYPE = "org.apache.qpid.Exchange";
@Test
public void testAccessAuthorizedSuccess() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
assertConnection(connection);
}
finally
{
connection.close();
}
}
@Test
public void testAccessNoRightsFailure() throws Exception
{
configureACL(String.format("ACL DENY-LOG %s ACCESS VIRTUALHOST", USER1));
try
{
getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
fail("Connection was created.");
}
catch (JMSException e)
{
assertAccessDeniedException(e);
}
}
@Test
public void testAccessVirtualHostWithName() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST name='%s'",
USER1,
getVirtualHostName()),
String.format("ACL DENY-LOG %s ACCESS VIRTUALHOST name='%s'", USER2, getVirtualHostName()));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
assertConnection(connection);
}
finally
{
connection.close();
}
try
{
getConnectionBuilder().setUsername(USER2).setPassword(USER2_PASSWORD).build();
fail("Access should be denied");
}
catch (JMSException e)
{
assertAccessDeniedException(e);
}
}
@Test
public void testAccessVirtualHostWildCard() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST name='*'", USER1),
String.format("ACL DENY-LOG %s ACCESS VIRTUALHOST name='*'", USER2));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
assertConnection(connection);
}
finally
{
connection.close();
}
try
{
getConnectionBuilder().setUsername(USER2).setPassword(USER2_PASSWORD).build();
fail("Access should be denied");
}
catch (JMSException e)
{
assertAccessDeniedException(e);
}
}
@Test
public void testAuthorizationWithConnectionLimit() throws Exception
{
final int connectionLimit = 2;
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST connection_limit='%d'",
USER2,
connectionLimit));
final List<Connection> establishedConnections = new ArrayList<>();
try
{
establishConnections(connectionLimit, establishedConnections);
verifyConnectionEstablishmentFails(connectionLimit);
establishedConnections.remove(0).close();
getConnectionBuilder().setUsername(USER2).setPassword(USER2_PASSWORD).build().close();
}
finally
{
closeConnections(establishedConnections);
}
}
@Test
public void testAuthorizationWithConnectionFrequencyLimit() throws Exception
{
final int connectionFrequencyLimit = 1;
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST connection_frequency_limit='%d'",
USER2,
connectionFrequencyLimit));
final List<Connection> establishedConnections = new ArrayList<>();
try
{
establishConnections(connectionFrequencyLimit, establishedConnections);
verifyConnectionEstablishmentFails(connectionFrequencyLimit);
establishedConnections.remove(0).close();
verifyConnectionEstablishmentFails(connectionFrequencyLimit);
}
finally
{
closeConnections(establishedConnections);
}
}
@Test
public void testAuthorizationWithConnectionLimitAndFrequencyLimit() throws Exception
{
final int connectionFrequencyLimit = 2;
final int connectionLimit = 3;
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST connection_limit='%d' connection_frequency_limit='%d'",
USER2,
connectionLimit,
connectionFrequencyLimit));
final List<Connection> establishedConnections = new ArrayList<>();
try
{
establishConnections(connectionFrequencyLimit, establishedConnections);
verifyConnectionEstablishmentFails(connectionFrequencyLimit);
establishedConnections.remove(0).close();
verifyConnectionEstablishmentFails(connectionFrequencyLimit);
}
finally
{
closeConnections(establishedConnections);
}
}
private void establishConnections(final int connectionNumber, final List<Connection> establishedConnections)
throws NamingException, JMSException
{
for (int i = 0; i < connectionNumber; i++)
{
establishedConnections.add(getConnectionBuilder().setUsername(USER2)
.setPassword(USER2_PASSWORD)
.setClientId(getTestName() + i)
.build());
}
}
private void closeConnections(final List<Connection> establishedConnections) throws JMSException
{
for (Connection c : establishedConnections)
{
c.close();
}
}
private void verifyConnectionEstablishmentFails(final int frequencyLimit) throws NamingException
{
try
{
final Connection connection = getConnectionBuilder().setUsername(USER2)
.setPassword(USER2_PASSWORD)
.setClientId(getTestName() + frequencyLimit)
.build();
try
{
fail("Connection creation should fail due to exceeding limit");
}
finally
{
connection.close();
}
}
catch (JMSException e)
{
//pass
}
}
@Test
public void testConsumeFromTempQueueSuccess() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=\"true\"", USER1),
String.format("ACL ALLOW-LOG %s CONSUME QUEUE temporary=\"true\"", USER1),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"*\"", USER1) : "");
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
session.createConsumer(session.createTemporaryQueue()).close();
}
finally
{
connection.close();
}
}
@Test
public void testConsumeFromTempQueueFailure() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=\"true\"", USER1),
String.format("ACL DENY-LOG %s CONSUME QUEUE temporary=\"true\"", USER1),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"*\"", USER1) : "");
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
try
{
session.createConsumer(temporaryQueue);
fail("Exception is not thrown");
}
catch (JMSException e)
{
// pass
}
}
finally
{
try
{
connection.close();
}
catch (Exception e)
{
LOGGER.error("Unexpected exception on connection close", e);
}
}
}
@Test
public void testConsumeOwnQueueSuccess() throws Exception
{
final String queueName = "user1Queue";
assumeThat(getBrokerAdmin().getValidUsername(), is(equalTo(USER1)));
createQueue(queueName);
Map<String, Object> queueAttributes = readEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", true);
assertThat("Test prerequiste not met, queue belongs to unexpected user", queueAttributes.get(ConfiguredObject.CREATED_BY), is(equalTo(USER1)));
configureACL("ACL ALLOW-LOG ALL ACCESS VIRTUALHOST",
"ACL ALLOW-LOG OWNER CONSUME QUEUE",
"ACL DENY-LOG ALL CONSUME QUEUE");
final String queueAddress = String.format(isLegacyClient() ? "ADDR:%s; {create:never}" : "%s", queueName);
Connection queueOwnerCon = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session queueOwnerSession = queueOwnerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = queueOwnerSession.createQueue(queueAddress);
queueOwnerSession.createConsumer(queue).close();
}
finally
{
queueOwnerCon.close();
}
Connection otherUserCon = getConnectionBuilder().setUsername(USER2).setPassword(USER2_PASSWORD).build();
try
{
Session otherUserSession = otherUserCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
otherUserSession.createConsumer(otherUserSession.createQueue(queueAddress)).close();
fail("Exception not thrown");
}
catch (JMSException e)
{
final String expectedMessage =
Sets.newHashSet(Protocol.AMQP_1_0, Protocol.AMQP_0_10).contains(getProtocol())
? "Permission CREATE is denied for : Consumer"
: "403(access refused)";
assertJMSExceptionMessageContains(e, expectedMessage);
}
}
finally
{
otherUserCon.close();
}
}
@Test
public void testConsumeFromTempTopicSuccess() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=\"true\"", USER1),
String.format("ACL ALLOW-LOG %s CONSUME QUEUE temporary=\"true\"", USER1),
String.format(isLegacyClient()
? "ACL ALLOW-LOG %s BIND EXCHANGE name=\"amq.topic\""
: "ACL ALLOW-LOG %s BIND EXCHANGE temporary=\"true\"", USER1));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
TemporaryTopic temporaryTopic = session.createTemporaryTopic();
session.createConsumer(temporaryTopic);
}
finally
{
connection.close();
}
}
@Test
public void testConsumeFromNamedQueueValid() throws Exception
{
final String queueName = getTestName();
Queue queue = createQueue(queueName);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s CONSUME QUEUE name=\"%s\"", USER1, queueName),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s CREATE QUEUE name=\"%s\"", USER1, queueName) : "",
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"*\" routingKey=\"%s\"", USER1, queueName) : "");
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
session.createConsumer(queue).close();
}
finally
{
connection.close();
}
}
@Test
public void testConsumeFromNamedQueueFailure() throws Exception
{
String queueName = getTestName();
Queue queue = createQueue(queueName);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL DENY-LOG %s CONSUME QUEUE name=\"%s\"", USER1, queueName),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s CREATE QUEUE name=\"%s\"", USER1, queueName) : "",
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"*\" routingKey=\"%s\"", USER1, queueName) : "");
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
try
{
session.createConsumer(queue);
fail("Test failed as consumer was created.");
}
catch (JMSException e)
{
// pass
}
}
finally
{
connection.close();
}
}
@Test
public void testCreateTemporaryQueueSuccess() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=\"true\"", USER1),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"*\" temporary=true", USER1) : "");
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
assertNotNull(queue);
}
finally
{
connection.close();
}
}
// For AMQP 1.0 the server causes a temporary instance of the fanout exchange to come into being.
// For early AMQP version, there are no server side objects created as amq.topic is used.
@Test
public void testCreateTempTopicSuccess() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic temporaryTopic = session.createTemporaryTopic();
assertNotNull(temporaryTopic);
}
finally
{
connection.close();
}
}
@Test
public void testCreateTemporaryQueueFailed() throws Exception
{
assumeThat("QPID-7919",
getProtocol(),
is(not(equalTo(Protocol.AMQP_1_0))));
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL DENY-LOG %s CREATE QUEUE temporary=\"true\"", USER1));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
try
{
session.createTemporaryQueue();
fail("Test failed as creation succeeded.");
}
catch (JMSException e)
{
assertJMSExceptionMessageContains(e, "Permission CREATE is denied for : Queue");
}
}
finally
{
connection.close();
}
}
@Test
public void testPublishUsingTransactionSuccess() throws Exception
{
String queueName = getTestName();
Queue queue = createQueue(queueName);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format(isLegacyClient()
? "ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"%s\""
: "ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"%s\"",
USER1,
queueName));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer sender = session.createProducer(queue);
sender.send(session.createTextMessage("test"));
session.commit();
}
finally
{
connection.close();
}
}
@Test
public void testPublishToExchangeUsingTransactionSuccess() throws Exception
{
String queueName = getTestName();
createQueue(queueName);
final Map<String, Object> bindingArguments = new HashMap<>();
bindingArguments.put("destination", queueName);
bindingArguments.put("bindingKey", queueName);
performOperationUsingAmqpManagement("amq.direct",
"bind",
EXCHANGE_TYPE,
bindingArguments);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"%s\"", USER1, queueName));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
String address = String.format(isLegacyClient() ? "ADDR:amq.direct/%s" : "amq.direct/%s", queueName);
Queue queue = session.createQueue(address);
MessageProducer sender = session.createProducer(queue);
sender.send(session.createTextMessage("test"));
session.commit();
}
finally
{
connection.close();
}
}
@Test
public void testRequestResponseSuccess() throws Exception
{
String queueName = getTestName();
Queue queue = createQueue(queueName);
String groupName = "messaging-users";
createGroupProvider(groupName, USER1, USER2);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", groupName),
String.format("ACL ALLOW-LOG %s CONSUME QUEUE name=\"%s\"", USER1, queueName),
String.format("ACL ALLOW-LOG %s CONSUME QUEUE temporary=true", USER2),
String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=true", USER2),
isLegacyClient() ?
String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"amq.direct\" temporary=true", USER2) :
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"TempQueue*\"", USER1),
isLegacyClient() ?
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"%s\"", USER2, queueName) :
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"%s\"", USER2, queueName),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s CREATE QUEUE name=\"%s\"", USER1, queueName) : "",
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE", USER1) : "",
isLegacyClient() ? String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"TempQueue*\"", USER1) : ""
);
Connection responderConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session responderSession = responderConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer requestConsumer = responderSession.createConsumer(queue);
responderConnection.start();
Connection requesterConnection = getConnectionBuilder().setUsername(USER2).setPassword(USER2_PASSWORD).build();
try
{
Session requesterSession = requesterConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue responseQueue = requesterSession.createTemporaryQueue();
MessageConsumer responseConsumer = requesterSession.createConsumer(responseQueue);
requesterConnection.start();
Message request = requesterSession.createTextMessage("Request");
request.setJMSReplyTo(responseQueue);
requesterSession.createProducer(queue).send(request);
requesterSession.commit();
Message receivedRequest = requestConsumer.receive(getReceiveTimeout());
assertNotNull("Request is not received", receivedRequest);
assertNotNull("Request should have Reply-To", receivedRequest.getJMSReplyTo());
MessageProducer responder = responderSession.createProducer(receivedRequest.getJMSReplyTo());
responder.send(responderSession.createTextMessage("Response"));
responderSession.commit();
Message receivedResponse = responseConsumer.receive(getReceiveTimeout());
requesterSession.commit();
assertNotNull("Response is not received", receivedResponse);
assertEquals("Unexpected response is received", "Response", ((TextMessage) receivedResponse).getText());
}
finally
{
requesterConnection.close();
}
}
finally
{
responderConnection.close();
}
}
@Test
public void testPublishToTempTopicSuccess() throws Exception
{
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.topic\"", USER1) :
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE temporary=\"true\"", USER1));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
TemporaryTopic temporaryTopic = session.createTemporaryTopic();
MessageProducer producer = session.createProducer(temporaryTopic);
producer.send(session.createMessage());
session.commit();
}
finally
{
connection.close();
}
}
@Test
public void testFirewallAllow() throws Exception
{
configureACL(String.format("ACL ALLOW %s ACCESS VIRTUALHOST from_network=\"127.0.0.1\"", USER1));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
assertConnection(connection);
}
finally
{
connection.close();
}
}
@Test
public void testAllAllowed() throws Exception
{
configureACL("ACL ALLOW ALL ALL");
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
assertConnection(connection);
}
finally
{
connection.close();
}
}
@Test
public void testFirewallDeny() throws Exception
{
configureACL(String.format("ACL DENY %s ACCESS VIRTUALHOST from_network=\"127.0.0.1\"", USER1));
try
{
getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
fail("We expected the connection to fail");
}
catch (JMSException e)
{
// pass
}
}
@Test
public void testPublishToDefaultExchangeSuccess() throws Exception
{
assumeThat("Test not applicable for AMQP 1.0",
getProtocol(),
is(not(equalTo(Protocol.AMQP_1_0))));
String queueName = getTestName();
createQueue(queueName);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"%s\"", USER1, queueName));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer sender = session.createProducer(session.createQueue(String.format("ADDR: %s", queueName)));
sender.send(session.createTextMessage("test"));
session.commit();
}
finally
{
connection.close();
}
}
@Test
public void testPublishToDefaultExchangeFailure() throws Exception
{
assumeThat("Test not applicable for AMQP 1.0",
getProtocol(),
is(not(equalTo(Protocol.AMQP_1_0))));
String queueName = getTestName();
createQueue(queueName);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL DENY-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"%s\"", USER1, queueName));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer sender = session.createProducer(session.createQueue(String.format("ADDR: %s", queueName)));
sender.send(session.createTextMessage("test"));
session.commit();
fail("Sending to the anonymousExchange without permission should fail");
}
catch (JMSException e)
{
assertJMSExceptionMessageContains(e, "Access denied to publish to default exchange");
}
finally
{
connection.close();
}
}
@Test
public void testAnonymousProducerFailsToSendMessageIntoDeniedDestination() throws Exception
{
final String allowedDestinationName = "example.RequestQueue";
final String deniedDestinationName = "deniedQueue";
createQueue(allowedDestinationName);
createQueue(deniedDestinationName);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format(isLegacyClient()
? "ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"%s\""
: "ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"%s\"", USER1, allowedDestinationName),
String.format("ACL DENY-LOG %s PUBLISH EXCHANGE name=\"*\" routingKey=\"%s\"", USER1, deniedDestinationName));
Connection connection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(null);
producer.send(session.createQueue(allowedDestinationName), session.createTextMessage("test1"));
session.commit();
}
finally
{
connection.close();
}
Connection connection2 = getConnectionBuilder().setSyncPublish(true).setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection2.createSession(true, Session.SESSION_TRANSACTED);
try
{
MessageProducer producer = session.createProducer(null);
producer.send(session.createQueue(deniedDestinationName), session.createTextMessage("test2"));
fail("Sending should fail");
}
catch (JMSException e)
{
assertJMSExceptionMessageContains(e,
String.format(
"Permission PERFORM_ACTION(publish) is denied for : %s",
(!isLegacyClient() ? "Queue" : "Exchange")));
}
try
{
session.commit();
fail("Commit should fail");
}
catch (JMSException e)
{
// pass
}
}
finally
{
connection2.close();
}
}
@Test
public void testPublishIntoDeniedDestinationFails() throws Exception
{
final String deniedDestinationName = "deniedQueue";
createQueue(deniedDestinationName);
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL DENY-LOG %s PUBLISH EXCHANGE name=\"*\" routingKey=\"%s\"", USER1, deniedDestinationName));
Connection connection = getConnectionBuilder().setSyncPublish(true).setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(deniedDestinationName));
producer.send(session.createTextMessage("test"));
fail("Sending should fail");
}
catch (JMSException e)
{
assertJMSExceptionMessageContains(e,
String.format(
"Permission PERFORM_ACTION(publish) is denied for : %s",
(!isLegacyClient() ? "Queue" : "Exchange")));
}
}
@Test
public void testCreateNamedQueueFailure() throws Exception
{
assumeThat("Test not applicable for AMQP 1.0",
getProtocol(),
is(not(equalTo(Protocol.AMQP_1_0))));
String queueName = getTestName();
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1),
String.format("ACL ALLOW-LOG %s CREATE QUEUE name=\"%s\"", USER1, queueName),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"*\" routingKey=\"%s\"", USER1, queueName) : "");
Connection connection = getConnectionBuilder().setSyncPublish(true).setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
session.createConsumer(session.createQueue("IllegalQueue"));
fail("Test failed as Queue creation succeeded.");
}
catch (JMSException e)
{
assertJMSExceptionMessageContains(e, "Permission CREATE is denied for : Queue");
}
}
finally
{
connection.close();
}
}
private void assertJMSExceptionMessageContains(final JMSException e, final String expectedMessage)
{
Set<Throwable> examined = new HashSet<>();
Throwable current = e;
do
{
if (current.getMessage().contains(expectedMessage))
{
return;
}
examined.add(current);
current = current.getCause();
}
while (current != null && !examined.contains(current));
e.printStackTrace();
fail(String.format("Unexpected message. Root exception : %s. Expected root or underlying(s) to contain : %s", e.getMessage(), expectedMessage));
}
private void configureACL(String... rules) throws Exception
{
EventLoggerProvider eventLoggerProvider = mock(EventLoggerProvider.class);
EventLogger eventLogger = mock(EventLogger.class);
when(eventLoggerProvider.getEventLogger()).thenReturn(eventLogger);
List<AclRule> aclRules = new ArrayList<>();
try(StringReader stringReader = new StringReader(Arrays.stream(rules).collect(Collectors.joining(LINE_SEPARATOR))))
{
RuleSet ruleSet = AclFileParser.parse(stringReader, eventLoggerProvider);
final List<Rule> parsedRules = ruleSet.getAllRules();
for(final Rule rule: parsedRules)
{
aclRules.add(new AclRule(){
@Override
public String getIdentity()
{
return rule.getIdentity();
}
@Override
public ObjectType getObjectType()
{
return rule.getAction().getObjectType();
}
@Override
public LegacyOperation getOperation()
{
return rule.getAction().getOperation();
}
@Override
public Map<ObjectProperties.Property, String> getAttributes()
{
return rule.getAttributes();
}
@Override
public RuleOutcome getOutcome()
{
return rule.getRuleOutcome();
}
});
}
}
configureACL(aclRules.toArray(new AclRule[aclRules.size()]));
}
private void configureACL(AclRule... rules) throws Exception
{
final String serializedRules = new ObjectMapper().writeValueAsString(rules);
final Map<String, Object> attributes = new HashMap<>();
attributes.put(RuleBasedVirtualHostAccessControlProvider.RULES, serializedRules);
attributes.put(RuleBasedVirtualHostAccessControlProvider.DEFAULT_RESULT, "DENIED");
createEntityUsingAmqpManagement("acl", RULE_BASED_VIRTUAL_HOST_ACCESS_CONTROL_PROVIDER_TYPE, attributes);
}
private void createGroupProvider(final String groupName, final String... groupMembers) throws Exception
{
String groupProviderName = "groups";
Connection connection = getConnectionBuilder().setVirtualHost("$management").build();
try
{
connection.start();
createEntity(groupProviderName,
GroupProviderImpl.class.getName(),
Collections.emptyMap(),
connection);
createEntity(groupName,
Group.class.getName(),
Collections.singletonMap("object-path", groupProviderName),
connection);
for (String groupMember: groupMembers)
{
createEntity(groupMember,
GroupMember.class.getName(),
Collections.singletonMap("object-path", groupProviderName + "/" + groupName),
connection);
}
}
finally
{
connection.close();
}
}
private void assertConnection(final Connection connection) throws JMSException
{
assertNotNull("create session should be successful",
connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
}
private void assertAccessDeniedException(JMSException e) throws Exception
{
assertTrue("Unexpected exception message:" + e.getMessage(),
e.getMessage().contains("Permission PERFORM_ACTION(connect) is denied"));
if (getProtocol() == Protocol.AMQP_1_0)
{
assertTrue("Unexpected error condition reported:" + e.getMessage(),
e.getMessage().contains("amqp:not-allowed"));
}
}
private boolean isLegacyClient()
{
return getProtocol() != Protocol.AMQP_1_0;
}
}