blob: ce996de181c2401f7d200ca31da9ba056faeb5ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionExpireMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_END;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
public class ServerSessionPacketHandler implements ChannelHandler {
private static final int MAX_CACHED_NULL_RESPONSES = 32;
private static final Logger logger = Logger.getLogger(ServerSessionPacketHandler.class);
private final ServerSession session;
private final StorageManager storageManager;
private final Channel channel;
private volatile CoreRemotingConnection remotingConnection;
private final Actor<Packet> packetActor;
private final ArtemisExecutor callExecutor;
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
private final boolean direct;
private final Object largeMessageLock = new Object();
private final Queue<NullResponseMessage> cachedNullRes;
private final Queue<NullResponseMessage_V2> cachedNullRes_V2;
public ServerSessionPacketHandler(final ActiveMQServer server,
final ServerSession session,
final Channel channel) {
this.session = session;
session.addCloseable((boolean failed) -> clearLargeMessage());
this.storageManager = server.getStorageManager();
this.channel = channel;
this.remotingConnection = channel.getConnection();
Connection conn = remotingConnection.getTransportConnection();
this.callExecutor = server.getExecutorFactory().getExecutor();
// In an optimized way packetActor should use the threadPool as the parent executor
// directly from server.getThreadPool();
// However due to how transferConnection is handled we need to
// use the same executor
this.packetActor = new Actor<>(callExecutor, this::onMessagePacket);
this.direct = conn.isDirectDeliver();
// no confirmation window size means no resend cache hence NullResponsePackets
// won't get cached on it because need confirmation
if (this.channel.getConfirmationWindowSize() == -1) {
cachedNullRes = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
cachedNullRes_V2 = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
} else {
cachedNullRes = null;
cachedNullRes_V2 = null;
}
}
private void clearLargeMessage() {
synchronized (largeMessageLock) {
if (currentLargeMessage != null) {
try {
currentLargeMessage.deleteFile();
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentLargeMessage = null;
}
}
}
}
public ServerSession getSession() {
return session;
}
public long getID() {
return channel.getID();
}
public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
closeExecutors();
try {
session.close(true);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingSession(e);
}
ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
}
public void closeExecutors() {
packetActor.shutdown();
callExecutor.shutdown();
}
public void close() {
closeExecutors();
channel.flushConfirmations();
try {
session.close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingSession(e);
}
}
public Channel getChannel() {
return channel;
}
@Override
public void handlePacket(final Packet packet) {
// This method will call onMessagePacket through an actor
packetActor.act(packet);
}
private void onMessagePacket(final Packet packet) {
if (logger.isTraceEnabled()) {
logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
}
if (AuditLogger.isAnyLoggingEnabled()) {
AuditLogger.setRemoteAddress(remotingConnection.getRemoteAddress());
AuditLogger.setCurrentCaller(remotingConnection.getAuditSubject());
}
final byte type = packet.getType();
switch (type) {
case SESS_SEND: {
onSessionSend(packet);
break;
}
case SESS_ACKNOWLEDGE: {
onSessionAcknowledge(packet);
break;
}
case SESS_PRODUCER_REQUEST_CREDITS: {
onSessionRequestProducerCredits(packet);
break;
}
case SESS_FLOWTOKEN: {
onSessionConsumerFlowCredit(packet);
break;
}
default:
// separating a method for everything else as JIT was faster this way
slowPacketHandler(packet);
break;
}
}
// This is being separated from onMessagePacket as JIT was more efficient with a small method for the
// hot executions.
private void slowPacketHandler(final Packet packet) {
final byte type = packet.getType();
storageManager.setContext(session.getSessionContext());
Packet response = null;
boolean flush = false;
boolean closeChannel = false;
boolean requiresResponse = false;
try {
try {
switch (type) {
case SESS_SEND_LARGE: {
SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
sendLarge(message.getLargeMessage());
break;
}
case SESS_SEND_CONTINUATION: {
SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
requiresResponse = message.isRequiresResponse();
sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
break;
}
case SESS_CREATECONSUMER: {
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
requiresResponse = request.isRequiresResponse();
session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.getPriority(), request.isBrowseOnly(), true, null);
if (requiresResponse) {
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover
QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
} else {
response = new SessionQueueQueryResponseMessage(queueQueryResult);
}
}
break;
}
case CREATE_ADDRESS: {
CreateAddressMessage request = (CreateAddressMessage) packet;
requiresResponse = request.isRequiresResponse();
session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
break;
}
case CREATE_QUEUE: {
CreateQueueMessage request = (CreateQueueMessage) packet;
requiresResponse = request.isRequiresResponse();
session.createQueue(new QueueConfiguration(request.getQueueName())
.setAddress(request.getAddress())
.setRoutingType(getRoutingTypeFromAddress(request.getAddress()))
.setFilterString(request.getFilterString())
.setTemporary(request.isTemporary())
.setDurable(request.isDurable()));
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
break;
}
case CREATE_QUEUE_V2: {
CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
requiresResponse = request.isRequiresResponse();
session.createQueue(request.toQueueConfiguration());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
break;
}
case CREATE_SHARED_QUEUE: {
CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet;
requiresResponse = request.isRequiresResponse();
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
session.createSharedQueue(new QueueConfiguration(request.getQueueName())
.setAddress(request.getAddress())
.setFilterString(request.getFilterString())
.setDurable(request.isDurable()));
}
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
break;
}
case CREATE_SHARED_QUEUE_V2: {
CreateSharedQueueMessage_V2 request = (CreateSharedQueueMessage_V2) packet;
requiresResponse = request.isRequiresResponse();
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
session.createSharedQueue(request.toQueueConfiguration());
}
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
break;
}
case DELETE_QUEUE: {
requiresResponse = true;
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
session.deleteQueue(request.getQueueName());
response = createNullResponseMessage(packet);
break;
}
case SESS_QUEUEQUERY: {
requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
if (result.isExists() && remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
}
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(result);
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
response = new SessionQueueQueryResponseMessage_V2(result);
} else {
response = new SessionQueueQueryResponseMessage(result);
}
break;
}
case SESS_BINDINGQUERY: {
requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
final int clientVersion = remotingConnection.getChannelVersion();
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
* names otherwise the older client won't realize the queue exists and will try to create it and receive
* an error
*/
if (result.isExists() && clientVersion < PacketImpl.ADDRESSING_CHANGE_VERSION && session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) {
final List<SimpleString> queueNames = result.getQueueNames();
if (!queueNames.isEmpty()) {
final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
if (convertedQueueNames != queueNames) {
result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
}
}
}
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues());
} else {
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
}
break;
}
case SESS_EXPIRED: {
SessionExpireMessage message = (SessionExpireMessage) packet;
session.expire(message.getConsumerID(), message.getMessageID());
break;
}
case SESS_COMMIT: {
requiresResponse = true;
session.commit();
response = createNullResponseMessage(packet);
break;
}
case SESS_ROLLBACK: {
requiresResponse = true;
session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
response = createNullResponseMessage(packet);
break;
}
case SESS_XA_COMMIT: {
requiresResponse = true;
SessionXACommitMessage message = (SessionXACommitMessage) packet;
session.xaCommit(message.getXid(), message.isOnePhase());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_END: {
requiresResponse = true;
SessionXAEndMessage message = (SessionXAEndMessage) packet;
session.xaEnd(message.getXid());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_FORGET: {
requiresResponse = true;
SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
session.xaForget(message.getXid());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_JOIN: {
requiresResponse = true;
SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
session.xaJoin(message.getXid());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_RESUME: {
requiresResponse = true;
SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
session.xaResume(message.getXid());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_ROLLBACK: {
requiresResponse = true;
SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
session.xaRollback(message.getXid());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_START: {
requiresResponse = true;
SessionXAStartMessage message = (SessionXAStartMessage) packet;
session.xaStart(message.getXid());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_FAILED: {
requiresResponse = true;
SessionXAAfterFailedMessage message = (SessionXAAfterFailedMessage) packet;
session.xaFailed(message.getXid());
// no response on this case
break;
}
case SESS_XA_SUSPEND: {
requiresResponse = true;
session.xaSuspend();
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_PREPARE: {
requiresResponse = true;
SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
session.xaPrepare(message.getXid());
response = createSessionXAResponseMessage(packet);
break;
}
case SESS_XA_INDOUBT_XIDS: {
requiresResponse = true;
List<Xid> xids = session.xaGetInDoubtXids();
response = new SessionXAGetInDoubtXidsResponseMessage(xids);
break;
}
case SESS_XA_GET_TIMEOUT: {
requiresResponse = true;
int timeout = session.xaGetTimeout();
response = new SessionXAGetTimeoutResponseMessage(timeout);
break;
}
case SESS_XA_SET_TIMEOUT: {
requiresResponse = true;
SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
session.xaSetTimeout(message.getTimeoutSeconds());
response = new SessionXASetTimeoutResponseMessage(true);
break;
}
case SESS_START: {
session.start();
break;
}
case SESS_STOP: {
requiresResponse = true;
session.stop();
response = createNullResponseMessage(packet);
break;
}
case SESS_CLOSE: {
requiresResponse = true;
session.close(false);
// removeConnectionListeners();
response = createNullResponseMessage(packet);
flush = true;
closeChannel = true;
break;
}
case SESS_INDIVIDUAL_ACKNOWLEDGE: {
SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage) packet;
requiresResponse = message.isRequiresResponse();
session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
break;
}
case SESS_CONSUMER_CLOSE: {
requiresResponse = true;
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
session.closeConsumer(message.getConsumerID());
response = createNullResponseMessage(packet);
break;
}
case SESS_FORCE_CONSUMER_DELIVERY: {
SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet;
session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
break;
}
case PacketImpl.SESS_ADD_METADATA: {
response = createNullResponseMessage(packet);
SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
session.addMetaData(message.getKey(), message.getData());
break;
}
case PacketImpl.SESS_ADD_METADATA2: {
requiresResponse = true;
SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
if (message.isRequiresConfirmations()) {
response = createNullResponseMessage(packet);
}
session.addMetaData(message.getKey(), message.getData());
break;
}
case PacketImpl.SESS_UNIQUE_ADD_METADATA: {
requiresResponse = true;
SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
if (session.addUniqueMetaData(message.getKey(), message.getData())) {
response = createNullResponseMessage(packet);
} else {
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
}
break;
}
}
} catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, flush, closeChannel);
} finally {
storageManager.clearContext();
}
}
private RoutingType getRoutingTypeFromAddress(SimpleString address) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX) || address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
return RoutingType.ANYCAST;
}
return RoutingType.MULTICAST;
}
private boolean requireNullResponseMessage_V1(Packet packet) {
return !packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange();
}
private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
assert requireNullResponseMessage_V1(packet);
NullResponseMessage response;
if (cachedNullRes != null) {
response = cachedNullRes.poll();
if (response == null) {
response = new NullResponseMessage();
} else {
response.reset();
}
} else {
response = new NullResponseMessage();
}
return response;
}
private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) {
assert !requireNullResponseMessage_V1(packet);
NullResponseMessage_V2 response;
if (cachedNullRes_V2 != null) {
response = cachedNullRes_V2.poll();
if (response == null) {
response = new NullResponseMessage_V2(packet.getCorrelationID());
} else {
response.reset();
// this should be already set by the channel too, but let's do it just in case
response.setCorrelationID(packet.getCorrelationID());
}
} else {
response = new NullResponseMessage_V2(packet.getCorrelationID());
}
return response;
}
private Packet createNullResponseMessage(Packet packet) {
if (requireNullResponseMessage_V1(packet)) {
return createNullResponseMessage_V1(packet);
}
return createNullResponseMessage_V2(packet);
}
private Packet createSessionXAResponseMessage(Packet packet) {
Packet response;
if (packet.isResponseAsync()) {
response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null);
} else {
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
return response;
}
private void releaseResponse(Packet packet) {
if (cachedNullRes == null || cachedNullRes_V2 == null) {
return;
}
if (packet instanceof NullResponseMessage) {
cachedNullRes.offer((NullResponseMessage) packet);
return;
}
if (packet instanceof NullResponseMessage_V2) {
cachedNullRes_V2.offer((NullResponseMessage_V2) packet);
}
}
private void onSessionAcknowledge(Packet packet) {
this.storageManager.setContext(session.getSessionContext());
try {
Packet response = null;
boolean requiresResponse = false;
try {
final SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
requiresResponse = message.isRequiresResponse();
this.session.acknowledge(message.getConsumerID(), message.getMessageID());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
} catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
this.storageManager.clearContext();
}
}
private void onSessionSend(Packet packet) {
this.storageManager.setContext(session.getSessionContext());
try {
Packet response = null;
boolean requiresResponse = false;
try {
final SessionSendMessage message = (SessionSendMessage) packet;
requiresResponse = message.isRequiresResponse();
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage(), storageManager), this.direct);
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
} catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
this.storageManager.clearContext();
}
}
private void onSessionRequestProducerCredits(Packet packet) {
this.storageManager.setContext(session.getSessionContext());
try {
Packet response = null;
boolean requiresResponse = false;
try {
SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
session.requestProducerCredits(message.getAddress(), message.getCredits());
} catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
this.storageManager.clearContext();
}
}
private void onSessionConsumerFlowCredit(Packet packet) {
this.storageManager.setContext(session.getSessionContext());
try {
Packet response = null;
boolean requiresResponse = false;
try {
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
} catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
} catch (ActiveMQException e) {
response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
} catch (Throwable t) {
response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
}
sendResponse(packet, response, false, false);
} finally {
this.storageManager.clearContext();
}
}
private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet,
ActiveMQIOErrorException e,
boolean requiresResponse,
Packet response,
ServerSession session) {
session.markTXFailed(e);
if (requiresResponse) {
logger.debug("Sending exception to client", e);
response = convertToExceptionPacket(packet, e);
} else {
ActiveMQServerLogger.LOGGER.caughtException(e);
}
return response;
}
private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet,
ActiveMQXAException e,
boolean requiresResponse,
Packet response) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
if (packet.isResponseAsync()) {
response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage());
} else {
response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
}
} else {
ActiveMQServerLogger.LOGGER.caughtXaException(e);
}
return response;
}
private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet,
ActiveMQQueueMaxConsumerLimitReached e,
boolean requiresResponse,
Packet response) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
response = convertToExceptionPacket(packet, e);
} else {
ActiveMQServerLogger.LOGGER.caughtException(e);
}
return response;
}
private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) {
Packet response;
if (packet.isResponseAsync()) {
response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e);
} else {
response = new ActiveMQExceptionMessage(e);
}
return response;
}
private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet,
ActiveMQException e,
boolean requiresResponse,
Packet response) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
response = convertToExceptionPacket(packet, e);
} else {
if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
logger.debug("Caught exception", e);
} else {
ActiveMQServerLogger.LOGGER.caughtException(e);
}
}
return response;
}
private static Packet onCatchThrowableWhileHandlePacket(Packet packet,
Throwable t,
boolean requiresResponse,
Packet response,
ServerSession session) {
session.markTXFailed(t);
if (requiresResponse) {
ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t);
ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
activeMQInternalErrorException.initCause(t);
response = convertToExceptionPacket(packet, activeMQInternalErrorException);
} else {
ActiveMQServerLogger.LOGGER.caughtException(t);
}
return response;
}
private void sendResponse(final Packet confirmPacket,
final Packet response,
final boolean flush,
final boolean closeChannel) {
if (logger.isTraceEnabled()) {
logger.trace("ServerSessionPacketHandler::scheduling response::" + response);
}
storageManager.afterCompleteOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage));
doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel);
if (logger.isTraceEnabled()) {
logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket);
}
}
@Override
public void done() {
if (logger.isTraceEnabled()) {
logger.trace("ServerSessionPacketHandler::regular response sent::" + response);
}
doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
}
});
}
private void doConfirmAndResponse(final Packet confirmPacket,
final Packet response,
final boolean flush,
final boolean closeChannel) {
// don't confirm if the response is an exception
if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
channel.confirm(confirmPacket);
if (flush) {
channel.flushConfirmations();
}
}
if (response != null) {
try {
channel.send(response);
} finally {
releaseResponse(response);
}
}
if (closeChannel) {
channel.close();
}
}
public void closeListeners() {
List<CloseListener> listeners = remotingConnection.removeCloseListeners();
for (CloseListener closeListener : listeners) {
closeListener.connectionClosed();
if (closeListener instanceof FailureListener) {
remotingConnection.removeFailureListener((FailureListener) closeListener);
}
}
}
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
SimpleFuture<Integer> future = new SimpleFutureImpl<>();
callExecutor.execute(() -> {
int value = internaltransferConnection(newConnection, lastReceivedCommandID);
future.set(value);
});
try {
return future.get().intValue();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private int internaltransferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
// delivered
// after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
// sequence of packets.
// It is not sufficient to just stop the session, since right after stopping the session, another session start
// might be executed
// before we have transferred the connection, leaving it in a started state
session.setTransferring(true);
List<CloseListener> closeListeners = remotingConnection.removeCloseListeners();
List<FailureListener> failureListeners = remotingConnection.removeFailureListeners();
// Note. We do not destroy the replicating connection here. In the case the live server has really crashed
// then the connection will get cleaned up anyway when the server ping timeout kicks in.
// In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
// the replicating connection will cause the outstanding responses to be be replayed on the live server,
// if these reach the client who then subsequently fails over, on reconnection to backup, it will have
// received responses that the backup did not know about.
channel.transferConnection(newConnection);
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
Connection oldTransportConnection = remotingConnection.getTransportConnection();
remotingConnection = newConnection;
remotingConnection.setCloseListeners(closeListeners);
remotingConnection.setFailureListeners(failureListeners);
int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
channel.replayCommands(lastReceivedCommandID);
channel.setTransferring(false);
session.setTransferring(false);
// We do this because the old connection could be out of credits on netty
// this will force anything to resume after the reattach through the ReadyListener callbacks
oldTransportConnection.fireReady(true);
return serverLastReceivedCommandID;
}
// Large Message is part of the core protocol, we have these functions here as part of Packet handler
private void sendLarge(final Message message) throws Exception {
// need to create the LargeMessage before continue
long id = storageManager.generateID();
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
if (logger.isTraceEnabled()) {
logger.trace("sendLarge::" + largeMsg);
}
if (currentLargeMessage != null) {
ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID());
}
currentLargeMessage = largeMsg;
}
private void sendContinuations(final int packetSize,
final long messageBodySize,
final byte[] body,
final boolean continues) throws Exception {
synchronized (largeMessageLock) {
if (currentLargeMessage == null) {
throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
}
// Immediately release the credits for the continuations- these don't contribute to the in-memory size
// of the message
currentLargeMessage.addBytes(body);
if (!continues) {
currentLargeMessage.releaseResources(true, true);
if (messageBodySize >= 0) {
currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
}
LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false);
}
}
}
}