blob: 8f98b8fc584306bbbecb4a15f6774a2a70009164 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.activemq.transport;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.MaxFrameSizeExceededException;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.*;
//Test for AMQ-8142
@RunWith(value = Parameterized.class)
public class MaxFrameSizeEnabledTest {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
public static final String TRUST_KEYSTORE = "src/test/resources/org/apache/activemq/security/broker1.ks";
private BrokerService broker;
private final String transportType;
private final boolean clientSideEnabled;
private final boolean clientSideFailoverEnabled;
private final boolean serverSideEnabled;
public MaxFrameSizeEnabledTest(String transportType, boolean clientSideEnabled, boolean clientSideFailoverEnabled, boolean serverSideEnabled) {
this.transportType = transportType;
this.clientSideEnabled = clientSideEnabled;
this.clientSideFailoverEnabled = clientSideFailoverEnabled;
this.serverSideEnabled = serverSideEnabled;
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
//Both client and server side max frame check enabled
{"tcp", true, false, true},
{"tcp", true, true, true},
{"ssl", true, false, true},
{"ssl", true, true, true},
{"nio", true, false, true},
{"nio", true, true, true},
{"nio+ssl", true, false, true},
{"nio+ssl", true, true, true},
{"auto", true, false, true},
{"auto", true, true, true},
{"auto+ssl", true, false, true},
{"auto+ssl", true, true, true},
{"auto+nio", true, false, true},
{"auto+nio", true, true, true},
{"auto+nio+ssl", true, false, true},
{"auto+nio+ssl", true, true, true},
//Client side enabled but server side disabled
{"tcp", true, false, false},
{"tcp", true, true, false},
{"ssl", true, false, false},
{"ssl", true, true, false},
{"nio", true, false, false},
{"nio", true, true, false},
{"nio+ssl", true, false, false},
{"nio+ssl", true, true, false},
{"auto", true, false, false},
{"auto", true, true, false},
{"auto+ssl", true, false, false},
{"auto+ssl", true, true, false},
{"auto+nio", true, false, false},
{"auto+nio", true, true, false},
{"auto+nio+ssl", true, false, false},
{"auto+nio+ssl", true, true, false},
//Client side disabled but server side enabled
// AMQ-8515 client=false, failover=true, server=true
// results in infinite retries since broker closes
// socket, so we don't test that combo
{"tcp", false, false, true},
{"ssl", false, false, true},
{"nio", false, false, true},
{"nio+ssl", false, false, true},
{"auto", false, false, true},
{"auto+ssl", false, false, true},
{"auto+nio", false, false, true},
{"auto+nio+ssl", false, false, true},
//Client side and server side disabled
{"tcp", false, false, false},
{"tcp", false, true, false},
{"ssl", false, false, false},
{"ssl", false, true, false},
{"nio", false, false, false},
{"nio", false, true, false},
{"nio+ssl", false, false, false},
{"nio+ssl", false, true, false},
{"auto", false, false, false},
{"auto", false, true, false},
{"auto+ssl", false, false, false},
{"auto+ssl", false, true, false},
{"auto+nio", false, false, false},
{"auto+nio", false, true, false},
{"auto+nio+ssl", false, false, false},
{"auto+nio+ssl", false, true, false},
public static void beforeClass() throws Exception {
System.setProperty("", TRUST_KEYSTORE);
System.setProperty("", PASSWORD);
System.setProperty("", KEYSTORE_TYPE);
System.setProperty("", SERVER_KEYSTORE);
System.setProperty("", KEYSTORE_TYPE);
System.setProperty("", PASSWORD);
public void after() throws Exception {
public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
BrokerService broker = new BrokerService();
TransportConnector connector = broker.addConnector(connectorString);
return broker;
public void stopBroker(BrokerService broker) throws Exception {
if (broker != null) {
public void testMaxFrameSize() throws Exception {
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false);
public void testMaxFrameSizeCompression() throws Exception {
// Test message body length is 99841 bytes. Compresses to ~ 48000
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), true);
protected void testMaxFrameSize(String transportType, String clientUri, boolean useCompression) throws Exception {
final List<Connection> connections = new ArrayList<>();
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUri);
for (int i = 0; i < 10; i++) {
Connection connection = factory.createConnection();
//Generate a body that is too large
StringBuffer body = new StringBuffer();
Random r = new Random();
for (int i = 0; i < 10000; i++) {
//Try sending 10 large messages rapidly in a loop to make sure all
//nio threads are allowed to send again and do not close server-side
for (int i = 0; i < 10; i++) {
boolean maxFrameSizeException = false;
boolean otherException = false;
Connection connection = null;
Session session = null;
Queue destination = null;
MessageConsumer messageConsumer = null;
MessageProducer producer = null;
try {
connection = connections.get(i);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TEST");
producer = session.createProducer(destination);
} catch (JMSException e) {
if (clientSideEnabled) {
assertEquals("41300", e.getErrorCode());
assertTrue(e.getCause() instanceof MaxFrameSizeExceededException);
} else {
assertTrue(e.getCause() instanceof IOException);
maxFrameSizeException = true;
} catch (Exception e) {
otherException = true;
if (maxFrameSizeEnabled() && !useCompression) {
assertTrue("Should have gotten a jms maxframesize exception", maxFrameSizeException);
assertFalse("Should not have gotten a transport exception", otherException);
} else {
assertFalse("Should not have gotten a jms maxframesize exception", maxFrameSizeException);
if (!maxFrameSizeEnabled() && otherException) {
fail("Should not have gotten exception");
if (connectionsShouldBeOpen(useCompression)) {
// Validate we can send a valid sized message after sending a too-large of message
boolean nextException = false;
try {
messageConsumer = session.createConsumer(destination);
int maxLoops = 50;
boolean found = false;
do {
Message message = messageConsumer.receive(200l);
if (message != null) {
found = true;
} while (!found && maxLoops <= 50);
} catch (Exception e) {
nextException = true;
assertFalse("Should not have gotten an exception for the next message", nextException);
if (connectionsShouldBeOpen(useCompression)) {
//Verify that all connections are active
assertTrue(Wait.waitFor(() -> broker.getConnectorByName(transportType).getConnections().size() == 10,
3000, 500));
} else {
//Verify that all connections are closed
assertTrue(Wait.waitFor(() -> broker.getConnectorByName(transportType).getConnections().size() == 0,
3000, 500));
if (isNio() && connectionsShouldBeOpen(useCompression)) {
//Verify one active transport connections in the selector thread pool.
final ThreadPoolExecutor e = (ThreadPoolExecutor) SelectorManager.getInstance().getSelectorExecutor();
assertTrue(Wait.waitFor(() -> e.getActiveCount() == 1, 3000, 500));
private boolean maxFrameSizeEnabled() {
return clientSideEnabled || serverSideEnabled;
private boolean connectionsShouldBeOpen(boolean useCompression) {
return !maxFrameSizeEnabled() || clientSideEnabled || useCompression;
private boolean isFailover() {
return clientSideFailoverEnabled;
private boolean isSsl() {
return transportType.contains("ssl");
private boolean isNio() {
return transportType.contains("nio");
private String getServerParams() {
if (serverSideEnabled) {
return isSsl() ? "&transport.needClientAuth=true" : "";
} else {
return isSsl() ? "&transport.needClientAuth=true&wireFormat.maxFrameSizeEnabled=false" : "&wireFormat.maxFrameSizeEnabled=false";
private String getClientParams() {
if (clientSideEnabled) {
if(clientSideFailoverEnabled) {
return isSsl() ? "?nested.socket.verifyHostName=false" : "";
} else {
return isSsl() ? "?socket.verifyHostName=false" : "";
} else {
if(clientSideFailoverEnabled) {
return isSsl() ? "?nested.socket.verifyHostName=false&nested.wireFormat.maxFrameSizeEnabled=false" : "?nested.wireFormat.maxFrameSizeEnabled=false";
} else {
return isSsl() ? "?socket.verifyHostName=false&wireFormat.maxFrameSizeEnabled=false" : "?wireFormat.maxFrameSizeEnabled=false";
private String getClientUri(int port) {
if(isFailover()) {
return "failover:(" + (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + ")" + getClientParams() + "&maxReconnectAttempts=1&startupMaxReconnectAttempts=1";
} else {
return (isSsl() ? "ssl" : "tcp") + "://localhost:" + port + getClientParams();