blob: f558ef1e7fa5f32e612e62b238973aaf1054801e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.openwire.codec;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.jms.JMSException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.codec.OpenWireFormatFactory;
import org.apache.activemq.openwire.commands.BrokerInfo;
import org.apache.activemq.openwire.commands.Command;
import org.apache.activemq.openwire.commands.KeepAliveInfo;
import org.apache.activemq.openwire.commands.Message;
import org.apache.activemq.openwire.commands.MessageDispatch;
import org.apache.activemq.openwire.commands.Response;
import org.apache.activemq.openwire.commands.ShutdownInfo;
import org.apache.activemq.openwire.commands.WireFormatInfo;
import org.apache.activemq.openwire.util.TcpTransport;
import org.apache.activemq.openwire.util.TransportListener;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class used in testing the interoperability between the OpenWire
* commands and Marshalers in this library and those in ActiveMQ.
*/
public abstract class OpenWireInteropTestSupport implements TransportListener {
private static final Logger LOG = LoggerFactory.getLogger(OpenWireInteropTestSupport.class);
@Rule public TestName name = new TestName();
protected BrokerService brokerService;
private TcpTransport transport;
protected URI connectionURI;
private OpenWireFormatFactory factory;
protected OpenWireFormat wireFormat;
private CountDownLatch connected;
private WireFormatInfo remoteWireformatInfo;
private BrokerInfo remoteInfo;
private Exception failureCause;
private final AtomicInteger requestIdGenerator = new AtomicInteger(1);
private final Map<Integer, CountDownLatch> requestMap =
new ConcurrentHashMap<Integer, CountDownLatch>();
protected Command latest;
protected final Queue<Message> messages = new LinkedList<Message>();
@Before
public void setUp() throws Exception {
brokerService = createBroker();
brokerService.start();
brokerService.waitUntilStarted();
factory = new OpenWireFormatFactory();
factory.setVersion(getOpenWireVersion());
factory.setCacheEnabled(false);
factory.setTightEncodingEnabled(isTightEncodingEnabled());
wireFormat = factory.createWireFormat();
}
@After
public void tearDown() throws Exception {
disconnect();
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
}
}
protected abstract int getOpenWireVersion();
protected abstract boolean isTightEncodingEnabled();
protected void connect() throws Exception {
connected = new CountDownLatch(1);
transport = new TcpTransport(wireFormat, connectionURI);
transport.setTransportListener(this);
transport.start();
transport.oneway(wireFormat.getPreferedWireFormatInfo());
}
protected void disconnect() throws Exception {
if (transport != null && transport.isStarted()) {
ShutdownInfo done = new ShutdownInfo();
transport.oneway(done);
Thread.sleep(50);
transport.stop();
}
}
protected boolean request(Command command, long timeout, TimeUnit units) throws Exception {
command.setCommandId(requestIdGenerator.getAndIncrement());
command.setResponseRequired(true);
CountDownLatch complete = new CountDownLatch(1);
requestMap.put(command.getCommandId(), complete);
transport.oneway(command);
return complete.await(timeout, units);
}
protected void requestNoResponse(Command command) throws Exception {
command.setCommandId(requestIdGenerator.getAndIncrement());
command.setResponseRequired(false);
transport.oneway(command);
}
protected boolean awaitConnected(long time, TimeUnit unit) throws InterruptedException {
return connected.await(time, unit);
}
protected BrokerService createBroker() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setUseJmx(true);
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0?transport.trace=true&trace=true");
connectionURI = connector.getPublishableConnectURI();
LOG.debug("Using openwire port: {}", connectionURI);
return brokerService;
}
@Override
public void onCommand(Object command) {
try {
if (command instanceof WireFormatInfo) {
handleWireFormatInfo((WireFormatInfo) command);
} else if (command instanceof KeepAliveInfo) {
handleKeepAliveInfo((KeepAliveInfo) command);
} else if (command instanceof BrokerInfo) {
handleBrokerInfo((BrokerInfo) command);
} else if (command instanceof Response) {
Response response = (Response) command;
this.latest = response;
LOG.info("Received response for request: {}, response = {}", response.getCorrelationId(), latest);
CountDownLatch done = requestMap.get(response.getCorrelationId());
if (done != null) {
done.countDown();
}
} else if (command instanceof MessageDispatch) {
LOG.info("Received new MessageDispatch: {}", command);
MessageDispatch dispatch = (MessageDispatch) command;
messages.add(dispatch.getMessage());
} else {
LOG.info("Received unknown command: {}", command);
}
} catch (Exception e) {
failureCause = e;
}
}
@Override
public void onException(IOException error) {
failureCause = error;
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
public WireFormatInfo getRemoteWireFormatInfo() {
return this.remoteWireformatInfo;
}
public BrokerInfo getRemoteBrokerInfo() {
return this.remoteInfo;
}
public Command getLastCommandReceived() {
return this.latest;
}
public boolean isFailed() {
return this.failureCause != null;
}
protected void handleWireFormatInfo(WireFormatInfo info) throws Exception {
LOG.info("Received remote WireFormatInfo: {}", info);
this.remoteWireformatInfo = info;
if (LOG.isDebugEnabled()) {
LOG.debug(this + " before negotiation: " + wireFormat);
}
if (!info.isValid()) {
onException(new IOException("Remote wire format magic is invalid"));
} else if (info.getVersion() < getOpenWireVersion()) {
onException(new IOException("Remote wire format (" + info.getVersion() +
") is lower the minimum version required (" + getOpenWireVersion() + ")"));
}
wireFormat.renegotiateWireFormat(info);
if (LOG.isDebugEnabled()) {
LOG.debug(this + " after negotiation: " + wireFormat);
}
connected.countDown();
}
protected void handleKeepAliveInfo(KeepAliveInfo info) throws Exception {
LOG.info("Received remote KeepAliveInfo: {}", info);
if (info.isResponseRequired()) {
KeepAliveInfo response = new KeepAliveInfo();
transport.oneway(response);
}
}
protected void handleBrokerInfo(BrokerInfo info) throws Exception {
LOG.info("Received remote BrokerInfo: {}", info);
this.remoteInfo = info;
}
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
}