blob: 98ba8c985fd7ef81e7f69f3b54aee54c3972e5db [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.net.ProtocolException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidClientIDException;
import javax.security.auth.login.CredentialException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests various use cases that require authentication or authorization over MQTT.
* Every test runs once per connector scheme returned by {@link #data()}.
*/
@RunWith(Parameterized.class)
public class MQTTAuthTest extends MQTTAuthTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTAuthTest.class);
/**
 * Supplies the parameter sets for the {@code Parameterized} runner.
 *
 * @return one {@code Object[]} per run: a connector scheme string followed by a
 *         boolean, matching the {@code (String, boolean)} constructor of this class
 */
@Parameters(name="{0}")
public static Collection<Object[]> data() {
    final Object[][] connectors = {
        {"mqtt", Boolean.FALSE},
        {"mqtt+ssl", Boolean.TRUE},
        {"mqtt+nio", Boolean.FALSE},
        {"mqtt+nio+ssl", Boolean.TRUE}
    };
    return Arrays.asList(connectors);
}
/**
 * Creates a test instance for one parameter set produced by {@link #data()}.
 *
 * @param connectorScheme connector scheme, forwarded unchanged to the superclass
 * @param useSSL          SSL flag, forwarded unchanged to the superclass
 */
public MQTTAuthTest(final String connectorScheme, final boolean useSSL) {
    super(connectorScheme, useSSL);
}
/**
 * Connects with the user name and password explicitly set to {@code null} and then
 * disconnects. The test passes when both calls complete without throwing.
 */
@Test(timeout = 60 * 1000)
public void testAnonymousUserConnect() throws Exception {
    final MQTT anonymousClient = createMQTTConnection();
    anonymousClient.setCleanSession(true);

    // NOTE(review): the (String) casts presumably select between overloaded setters;
    // they are kept exactly as in the original call sites.
    anonymousClient.setUserName((String) null);
    anonymousClient.setPassword((String) null);

    final BlockingConnection anonymousConnection = anonymousClient.blockingConnection();
    anonymousConnection.connect();
    LOG.info("Connected as anonymous client");
    anonymousConnection.disconnect();
}
/**
 * Verifies that connecting with the credentials "foo"/"bar" is refused and that the
 * CONNACK frame seen by the client carries
 * {@code CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED}.
 * <p>
 * The tracer callback only records what it observes; all assertions run in the test
 * method itself. NOTE(review): the callback is invoked by the MQTT client library,
 * presumably on one of its own threads, where a failed assertion would not be
 * reported to JUnit as a failure of this test.
 */
@Test(timeout = 60 * 1000)
public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception {
    MQTT mqttPub = createMQTTConnection("pub", true);
    mqttPub.setUserName("foo");
    mqttPub.setPassword("bar");

    final AtomicBoolean failed = new AtomicBoolean();
    // -1 means "no CONNACK decoded"; it cannot collide with a real enum ordinal.
    final AtomicInteger errorCode = new AtomicInteger(-1);

    mqttPub.setTracer(new Tracer() {
        @Override
        public void onReceive(MQTTFrame frame) {
            LOG.info("Client received: {}", frame);
            if (frame.messageType() == CONNACK.TYPE) {
                CONNACK connAck = new CONNACK();
                try {
                    connAck.decode(frame);
                    LOG.info("{}", connAck);
                    errorCode.set(connAck.code().ordinal());
                } catch (ProtocolException e) {
                    // Remember the problem; it is asserted on after connect() returns.
                    failed.set(true);
                    LOG.error("Error decoding CONNACK", e);
                }
            }
        }

        @Override
        public void onSend(MQTTFrame frame) {
            LOG.info("Client sent: {}", frame);
        }
    });

    BlockingConnection connectionPub = mqttPub.blockingConnection();
    try {
        connectionPub.connect();
        fail("Should not be able to connect.");
    } catch (Exception ignored) {
        // Expected: the connect attempt must be rejected. fail() throws an
        // AssertionError, which is not an Exception and therefore not swallowed here.
    }

    assertFalse("CONNACK frame could not be decoded.", failed.get());
    assertEquals(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED.ordinal(), errorCode.get());
}
/**
 * Exercises subscriptions made by a client that sets no credentials, first with
 * protocol version 3.1.1 and then with 3.1.
 * <ul>
 * <li>3.1.1: subscribing to "named" must yield the return code 0x80, while the
 * subscription to "anonymous" is granted and delivers retained messages.</li>
 * <li>3.1: subscribing to "named" reports QoS 0, but a message published there by
 * the "admin" user must not be delivered to this client.</li>
 * </ul>
 * Every connection opened here is disconnected again, even when an assertion fails.
 */
@Test(timeout = 60 * 1000)
public void testFailedSubscription() throws Exception {
    final String ANONYMOUS = "anonymous";
    final String NAMED = "named";

    MQTT mqtt = createMQTTConnection();
    mqtt.setClientId("foo");
    mqtt.setKeepAlive((short) 2);
    mqtt.setVersion("3.1.1");

    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    try {
        byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) });
        // 0x80 is the MQTT 3.1.1 SUBACK return code for a failed subscription.
        assertEquals((byte) 0x80, qos[0]);
        assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]);

        // validate the subscription by sending a retained message
        connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
        Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
        assertNotNull(msg);
        assertEquals(ANONYMOUS, new String(msg.getPayload()));
        msg.ack();

        // re-subscribing must deliver the retained message once more
        connection.unsubscribe(new String[] { ANONYMOUS });
        qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) });
        assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]);

        msg = connection.receive(1000, TimeUnit.MILLISECONDS);
        assertNotNull(msg);
        assertEquals(ANONYMOUS, new String(msg.getPayload()));
        msg.ack();

        // delete retained message
        connection.publish(ANONYMOUS, "".getBytes(), QoS.AT_MOST_ONCE, true);

        // The empty "delete retained" message is itself dispatched to this subscriber.
        // It is consumed here; otherwise it occasionally shows up at the final
        // assertNull of this test.
        msg = connection.receive(1000, TimeUnit.MILLISECONDS);
        assertNotNull(msg);
        assertEquals(ANONYMOUS, new String(msg.getTopic()));
        msg.ack();
    } finally {
        connection.disconnect();
    }

    // Test 3.1 functionality
    mqtt.setVersion("3.1");
    connection = mqtt.blockingConnection();
    connection.connect();
    try {
        byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE) });
        assertEquals(QoS.AT_MOST_ONCE.ordinal(), qos[0]);

        MQTT mqttPub = createMQTTConnection("pub", true);
        mqttPub.setUserName("admin");
        mqttPub.setPassword("admin");

        BlockingConnection connectionPub = mqttPub.blockingConnection();
        connectionPub.connect();
        try {
            connectionPub.publish(NAMED, NAMED.getBytes(), QoS.AT_MOST_ONCE, true);

            // Nothing published on NAMED may reach the subscriber.
            Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
            LOG.info("got msg: {}, {}", msg, msg != null ? new String(msg.getTopic()) : "");
            assertNull(msg);
        } finally {
            connectionPub.disconnect();
        }
    } finally {
        connection.disconnect();
    }
}
/**
 * Publishes retained messages to "USERS.foo" as user "guest" at every QoS level, once
 * with protocol version 3.1 and once with 3.1.1. Each round must complete without
 * blocking (the test has a 30 second timeout) and must leave the topic's enqueue
 * count at zero.
 */
@Test(timeout = 30 * 1000)
public void testPublishWhenNotAuthorizedDoesNotStall() throws Exception {
    final String topicName = "USERS.foo";
    getProxyToBroker().addTopic(topicName);

    // Same scenario for both protocol versions, 3.1 first and then 3.1.1.
    for (String protocolVersion : new String[] { "3.1", "3.1.1" }) {
        final MQTT guestClient = createMQTTConnection("pub", true);
        guestClient.setUserName("guest");
        guestClient.setPassword("password");
        guestClient.setVersion(protocolVersion);

        final BlockingConnection guestConnection = guestClient.blockingConnection();
        guestConnection.connect();
        guestConnection.publish(topicName, "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true);
        guestConnection.publish(topicName, "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true);
        guestConnection.publish(topicName, "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true);
        guestConnection.disconnect();

        assertEquals(0, getProxyToTopic(topicName).getEnqueueCount());
    }
}
/**
 * Publishes a retained message on topic "one" as user "admin", then subscribes to the
 * wildcard "#" as user "user" and asserts that no message arrives within one second.
 * Both connections are disconnected before the test returns, even when the assertion
 * fails.
 */
@Test(timeout = 60 * 1000)
public void testWildcardRetainedSubscription() throws Exception {
    MQTT mqttPub = createMQTTConnection("pub", true);
    mqttPub.setUserName("admin");
    mqttPub.setPassword("admin");

    BlockingConnection connectionPub = mqttPub.blockingConnection();
    connectionPub.connect();
    try {
        connectionPub.publish("one", "test".getBytes(), QoS.AT_LEAST_ONCE, true);

        MQTT mqttSub = createMQTTConnection("sub", true);
        mqttSub.setUserName("user");
        mqttSub.setPassword("password");

        BlockingConnection connectionSub = mqttSub.blockingConnection();
        connectionSub.connect();
        try {
            connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
            Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
            assertNull("Shouldn't receive the message", msg);
        } finally {
            connectionSub.disconnect();
        }
    } finally {
        connectionPub.disconnect();
    }
}
/**
 * Publishes a retained message on "one/foo" as user "admin", then subscribes to the
 * wildcard "+/#" as user "user" and asserts that no message arrives within one second.
 * Finally checks that the broker topic "one.foo" counted exactly one enqueue.
 * Both connections are disconnected before the test returns, even when an assertion
 * fails.
 */
@Test(timeout = 60 * 1000)
public void testWildcardRetainedSubscriptionLocked() throws Exception {
    MQTT mqttPub = createMQTTConnection("pub", true);
    mqttPub.setUserName("admin");
    mqttPub.setPassword("admin");

    getProxyToBroker().addTopic("one.foo");

    BlockingConnection connectionPub = mqttPub.blockingConnection();
    connectionPub.connect();
    try {
        connectionPub.publish("one/foo", "test".getBytes(), QoS.AT_LEAST_ONCE, true);

        MQTT mqttSub = createMQTTConnection("sub", true);
        mqttSub.setUserName("user");
        mqttSub.setPassword("password");

        BlockingConnection connectionSub = mqttSub.blockingConnection();
        connectionSub.connect();
        try {
            connectionSub.subscribe(new Topic[]{new Topic("+/#", QoS.AT_LEAST_ONCE)});
            Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
            assertNull("Shouldn't receive the message", msg);

            assertEquals(1, getProxyToTopic("one.foo").getEnqueueCount());
        } finally {
            connectionSub.disconnect();
        }
    } finally {
        connectionPub.disconnect();
    }
}
/**
 * Verifies that a connection created via {@code createMQTTConnection("invalid", true)}
 * is refused with {@code CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED}. The
 * rejection is triggered by the plugin installed in {@code createPlugins}, which throws
 * an {@code InvalidClientIDException} for the client ID "invalid".
 * <p>
 * The tracer callback only records the decoded return code; the assertion runs in the
 * test method itself. NOTE(review): the callback is invoked by the MQTT client
 * library, presumably on one of its own threads, where a failed assertion would not be
 * reported to JUnit as a failure of this test.
 */
@Test(timeout = 60 * 1000)
public void testInvalidClientIdGetCorrectErrorCode() throws Exception {
    MQTT mqttPub = createMQTTConnection("invalid", true);

    // -1 means "no CONNACK decoded"; it cannot collide with a real enum ordinal.
    final AtomicInteger errorCode = new AtomicInteger(-1);

    mqttPub.setTracer(new Tracer() {
        @Override
        public void onReceive(MQTTFrame frame) {
            LOG.info("Client received: {}", frame);
            if (frame.messageType() == CONNACK.TYPE) {
                CONNACK connAck = new CONNACK();
                try {
                    connAck.decode(frame);
                    LOG.info("{}", connAck);
                    errorCode.set(connAck.code().ordinal());
                } catch (ProtocolException e) {
                    // errorCode stays at -1, so the assertion below fails the test.
                    LOG.error("Error decoding CONNACK", e);
                }
            }
        }

        @Override
        public void onSend(MQTTFrame frame) {
            LOG.info("Client sent: {}", frame);
        }
    });

    BlockingConnection connectionPub = mqttPub.blockingConnection();
    try {
        connectionPub.connect();
        fail("Should not be able to connect.");
    } catch (Exception ignored) {
        // Expected: the connect attempt must be rejected. fail() throws an
        // AssertionError, which is not an Exception and therefore not swallowed here.
    }

    assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED.ordinal(), errorCode.get());
}
/**
 * Verifies that a connection created via
 * {@code createMQTTConnection("bad-credential", true)} is refused with
 * {@code CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD}. The rejection is
 * triggered by the plugin installed in {@code createPlugins}, which throws a
 * {@code CredentialException} for the client ID "bad-credential".
 * <p>
 * The tracer callback only records the decoded return code; the assertion runs in the
 * test method itself. NOTE(review): the callback is invoked by the MQTT client
 * library, presumably on one of its own threads, where a failed assertion would not be
 * reported to JUnit as a failure of this test.
 */
@Test(timeout = 60 * 1000)
public void testBadCredentialExceptionGetsCorrectErrorCode() throws Exception {
    MQTT mqttPub = createMQTTConnection("bad-credential", true);
    mqttPub.setUserName("admin");
    mqttPub.setPassword("admin");

    // -1 means "no CONNACK decoded"; it cannot collide with a real enum ordinal.
    final AtomicInteger errorCode = new AtomicInteger(-1);

    mqttPub.setTracer(new Tracer() {
        @Override
        public void onReceive(MQTTFrame frame) {
            LOG.info("Client received: {}", frame);
            if (frame.messageType() == CONNACK.TYPE) {
                CONNACK connAck = new CONNACK();
                try {
                    connAck.decode(frame);
                    LOG.info("{}", connAck);
                    errorCode.set(connAck.code().ordinal());
                } catch (ProtocolException e) {
                    // errorCode stays at -1, so the assertion below fails the test.
                    LOG.error("Error decoding CONNACK", e);
                }
            }
        }

        @Override
        public void onSend(MQTTFrame frame) {
            LOG.info("Client sent: {}", frame);
        }
    });

    BlockingConnection connectionPub = mqttPub.blockingConnection();
    try {
        connectionPub.connect();
        fail("Should not be able to connect.");
    } catch (Exception ignored) {
        // Expected: the connect attempt must be rejected. fail() throws an
        // AssertionError, which is not an Exception and therefore not swallowed here.
    }

    assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.ordinal(), errorCode.get());
}
/**
 * Adds a broker plugin that rejects connections based on their client ID:
 * <ul>
 * <li>"invalid" (any letter case) raises an {@link InvalidClientIDException};</li>
 * <li>"bad-credential" (any letter case) raises a {@link CredentialException}.</li>
 * </ul>
 * Connections with any other, empty or missing client ID pass through this filter
 * without an exception.
 *
 * @param plugins list the plugin is appended to
 * @throws Exception declared by the overridden method; not thrown by this implementation
 */
@Override
protected void createPlugins(List<BrokerPlugin> plugins) throws Exception {
    BrokerPlugin failOnSpecificConditionsPlugin = new BrokerPlugin() {
        @Override
        public Broker installPlugin(Broker broker) throws Exception {
            return new BrokerFilter(broker) {
                @Override
                public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
                    // NOTE(review): this override never calls super.addConnection(context, info),
                    // so this filter does not pass accepted connections on to the broker it
                    // wraps. Confirm that this is intended before relying on it.
                    String clientId = info.getClientId();

                    // Constant-first comparison is false for null and for the empty string,
                    // so no separate null/empty check is required.
                    if ("invalid".equalsIgnoreCase(clientId)) {
                        LOG.info("Client ID was invalid");
                        throw new InvalidClientIDException("Bad client Id");
                    }
                    if ("bad-credential".equalsIgnoreCase(clientId)) {
                        LOG.info("User Name was invalid");
                        throw new CredentialException("Unknown User Name.");
                    }
                }
            };
        }
    };
    plugins.add(failOnSpecificConditionsPlugin);
}
}