blob: 353bc73d556d9e0046c8889a2d0e0842f4db3c11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.ra.inflow;
import javax.jms.MessageListener;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
import org.apache.activemq.artemis.ra.ActiveMQRALogger;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.VersionLoader;
/**
* The message handler
*/
public class ActiveMQMessageHandler implements MessageHandler, FailoverEventListener {
/**
* Trace enabled
*/
private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
/**
* The session
*/
private final ClientSessionInternal session;
private ClientConsumerInternal consumer;
/**
* The endpoint
*/
private MessageEndpoint endpoint;
private final ConnectionFactoryOptions options;
private final ActiveMQActivation activation;
private boolean useLocalTx;
private boolean transacted;
private boolean useXA = false;
private final int sessionNr;
private final TransactionManager tm;
private ClientSessionFactory cf;
private volatile boolean connected;
public ActiveMQMessageHandler(final ConnectionFactoryOptions options,
final ActiveMQActivation activation,
final TransactionManager tm,
final ClientSessionInternal session,
final ClientSessionFactory cf,
final int sessionNr) {
this.options = options;
this.activation = activation;
this.session = session;
this.cf = cf;
this.sessionNr = sessionNr;
this.tm = tm;
}
public void setup() throws Exception {
if (ActiveMQMessageHandler.trace) {
ActiveMQRALogger.LOGGER.trace("setup()");
}
ActiveMQActivationSpec spec = activation.getActivationSpec();
String selector = spec.getMessageSelector();
// Create the message consumer
SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
if (activation.isTopic() && spec.isSubscriptionDurable()) {
SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
QueueQuery subResponse = session.queueQuery(queueName);
if (!subResponse.isExists()) {
session.createQueue(activation.getAddress(), queueName, selectorString, true);
}
else {
// The check for already exists should be done only at the first session
// As a deployed MDB could set up multiple instances in order to process messages in parallel.
if (sessionNr == 0 && subResponse.getConsumerCount() > 0) {
if (!spec.isShareSubscriptions()) {
throw new javax.jms.IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
}
else if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
ActiveMQRALogger.LOGGER.debug("the mdb on destination " + queueName + " already had " +
subResponse.getConsumerCount() +
" consumers but the MDB is configured to share subscriptions, so no exceptions are thrown");
}
}
SimpleString oldFilterString = subResponse.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null ||
oldFilterString == null && selector != null ||
(oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
SimpleString oldTopicName = subResponse.getAddress();
boolean topicChanged = !oldTopicName.equals(activation.getAddress());
if (selectorChanged || topicChanged) {
// Delete the old durable sub
session.deleteQueue(queueName);
// Create the new one
session.createQueue(activation.getAddress(), queueName, selectorString, true);
}
}
consumer = (ClientConsumerInternal) session.createConsumer(queueName, null, false);
}
else {
SimpleString tempQueueName;
if (activation.isTopic()) {
if (activation.getTopicTemporaryQueue() == null) {
tempQueueName = new SimpleString(UUID.randomUUID().toString());
session.createTemporaryQueue(activation.getAddress(), tempQueueName, selectorString);
activation.setTopicTemporaryQueue(tempQueueName);
}
else {
tempQueueName = activation.getTopicTemporaryQueue();
QueueQuery queueQuery = session.queueQuery(tempQueueName);
if (!queueQuery.isExists()) {
// this is because we could be using remote servers (in cluster maybe)
// and the queue wasn't created on that node yet.
session.createTemporaryQueue(activation.getAddress(), tempQueueName, selectorString);
}
}
}
else {
tempQueueName = activation.getAddress();
}
consumer = (ClientConsumerInternal) session.createConsumer(tempQueueName, selectorString);
}
// Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
transacted = activation.isDeliveryTransacted();
if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx()) {
Map<String, Object> xaResourceProperties = new HashMap<>();
xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_JNDI_NAME, ((ActiveMQResourceAdapter) spec.getResourceAdapter()).getJndiName());
xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_NODE_ID, ((ClientSessionFactoryInternal) cf).getLiveNodeId());
xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_PRODUCT_NAME, ActiveMQResourceAdapter.PRODUCT_NAME);
xaResourceProperties.put(ActiveMQXAResourceWrapper.ACTIVEMQ_PRODUCT_VERSION, VersionLoader.getVersion().getFullVersion());
XAResource xaResource = ServiceUtils.wrapXAResource(session, xaResourceProperties);
endpoint = endpointFactory.createEndpoint(xaResource);
useXA = true;
}
else {
endpoint = endpointFactory.createEndpoint(null);
useXA = false;
}
connected = true;
session.addFailoverListener(this);
consumer.setMessageHandler(this);
}
XAResource getXAResource() {
return useXA ? session : null;
}
public Thread interruptConsumer(FutureLatch future) {
try {
if (consumer != null) {
return consumer.prepareForClose(future);
}
}
catch (Throwable e) {
ActiveMQRALogger.LOGGER.errorInterruptingHandler(endpoint.toString(), consumer.toString(), e);
}
return null;
}
/**
* Stop the handler
*/
public void teardown() {
if (ActiveMQMessageHandler.trace) {
ActiveMQRALogger.LOGGER.trace("teardown()");
}
try {
if (endpoint != null) {
endpoint.release();
endpoint = null;
}
}
catch (Throwable t) {
ActiveMQRALogger.LOGGER.debug("Error releasing endpoint " + endpoint, t);
}
//only do this if we haven't been disconnected at some point whilst failing over
if (connected) {
try {
consumer.close();
if (activation.getTopicTemporaryQueue() != null) {
// We need to delete temporary topics when the activation is stopped or messages will build up on the server
SimpleString tmpQueue = activation.getTopicTemporaryQueue();
QueueQuery subResponse = session.queueQuery(tmpQueue);
if (subResponse.getConsumerCount() == 0) {
// This is optional really, since we now use temporaryQueues, we could simply ignore this
// and the server temporary queue would remove this as soon as the queue was removed
session.deleteQueue(tmpQueue);
}
}
}
catch (Throwable t) {
ActiveMQRALogger.LOGGER.debug("Error closing core-queue consumer", t);
}
try {
if (session != null) {
session.close();
}
}
catch (Throwable t) {
ActiveMQRALogger.LOGGER.debug("Error releasing session " + session, t);
}
try {
if (cf != null) {
cf.close();
}
}
catch (Throwable t) {
ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t);
}
}
else {
//otherwise we just clean up
try {
if (cf != null) {
cf.cleanup();
}
}
catch (Throwable t) {
ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t);
}
}
}
@Override
public void onMessage(final ClientMessage message) {
if (ActiveMQMessageHandler.trace) {
ActiveMQRALogger.LOGGER.trace("onMessage(" + message + ")");
}
ActiveMQMessage msg = ActiveMQMessage.createMessage(message, session, options);
boolean beforeDelivery = false;
try {
if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null) {
tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
}
if (trace) {
ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling beforeDelivery on message " + message);
}
endpoint.beforeDelivery(ActiveMQActivation.ONMESSAGE);
beforeDelivery = true;
msg.doBeforeReceive();
//In the transacted case the message must be acked *before* onMessage is called
if (transacted) {
message.individualAcknowledge();
}
((MessageListener) endpoint).onMessage(msg);
if (!transacted) {
message.individualAcknowledge();
}
if (trace) {
ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling afterDelivery on message " + message);
}
try {
endpoint.afterDelivery();
}
catch (ResourceException e) {
ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e);
// If we get here, The TX was already rolled back
// However we must do some stuff now to make sure the client message buffer is cleared
// so we mark this as rollbackonly
session.markRollbackOnly();
return;
}
if (useLocalTx) {
session.commit();
}
if (trace) {
ActiveMQRALogger.LOGGER.trace("finished onMessage on " + message);
}
}
catch (Throwable e) {
ActiveMQRALogger.LOGGER.errorDeliveringMessage(e);
// we need to call before/afterDelivery as a pair
if (beforeDelivery) {
if (useXA && tm != null) {
// This is the job for the container,
// however if the container throws an exception because of some other errors,
// there are situations where the container is not setting the rollback only
// this is to avoid a scenario where afterDelivery would kick in
try {
Transaction tx = tm.getTransaction();
if (tx != null) {
tx.setRollbackOnly();
}
}
catch (Exception e1) {
ActiveMQRALogger.LOGGER.warn("unnable to clear the transaction", e1);
}
}
MessageEndpoint endToUse = endpoint;
try {
// to avoid a NPE that would happen while the RA is in tearDown
if (endToUse != null) {
endToUse.afterDelivery();
}
}
catch (ResourceException e1) {
ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e1);
}
}
if (useLocalTx || !activation.isDeliveryTransacted()) {
try {
session.rollback(true);
}
catch (ActiveMQException e1) {
ActiveMQRALogger.LOGGER.unableToRollbackTX();
}
}
// This is to make sure we will issue a rollback after failures
// so that would cleanup consumer buffers among other things
session.markRollbackOnly();
}
finally {
try {
session.resetIfNeeded();
}
catch (ActiveMQException e) {
ActiveMQRALogger.LOGGER.unableToResetSession(activation.toString(), e);
activation.startReconnectThread("Reset MessageHandler after Failure Thread");
}
}
}
public void start() throws ActiveMQException {
session.start();
}
@Override
public void failoverEvent(FailoverEventType eventType) {
connected = eventType == FailoverEventType.FAILOVER_COMPLETED;
}
}