blob: 473a9fb34d83b5e195c779f6e47ac4816306b0b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.endpoints.amf;
import flex.messaging.FlexContext;
import flex.messaging.FlexSession;
import flex.messaging.MessageException;
import flex.messaging.endpoints.AbstractEndpoint;
import flex.messaging.io.MessageIOConstants;
import flex.messaging.io.amf.ActionContext;
import flex.messaging.io.amf.MessageBody;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.ErrorMessage;
import flex.messaging.messages.Message;
import flex.messaging.messages.MessagePerformanceUtils;
import flex.messaging.services.MessageService;
import flex.messaging.util.StringUtils;
import flex.messaging.log.LogCategories;
import flex.messaging.log.Log;
import flex.messaging.util.ExceptionUtil;
import flex.messaging.util.UUIDUtils;
import java.util.List;
import java.lang.reflect.Array;
/**
* A simple bridge between the encoding/decoding functionality of
* the AMF endpoint and the MessageBroker: this last filter in the
* chain returns the message to the MessageBroker, which will then
* locate the correct service to handle the message.
*/
public class MessageBrokerFilter extends AMFFilter {
private static final int UNHANDLED_ERROR = 10000;
static final String LOG_CATEGORY = LogCategories.MESSAGE_GENERAL;
protected AbstractEndpoint endpoint;
public MessageBrokerFilter(AbstractEndpoint endpoint) {
this.endpoint = endpoint;
}
public void invoke(final ActionContext context) {
MessageBody request = context.getRequestMessageBody();
MessageBody response = context.getResponseMessageBody();
Message inMessage = request.getDataAsMessage();
Object outMessage = null;
String replyMethodName = MessageIOConstants.STATUS_METHOD;
try {
// Lookup or create the correct FlexClient.
endpoint.setupFlexClient(inMessage);
// Assign a clientId if necessary.
// We don't need to assign clientIds to general poll requests.
if (inMessage.getClientId() == null &&
(!(inMessage instanceof CommandMessage) || ((CommandMessage) inMessage).getOperation() != CommandMessage.POLL_OPERATION)) {
Object clientId = UUIDUtils.createUUID();
inMessage.setClientId(clientId);
}
// Messages received via the AMF channel can be batched (by NetConnection on the client) and
// we must not put the handler thread into a poll-wait state if a poll command message is followed by
// or preceeded by other messages in the batch; the request-response loop must complete without waiting.
// If the poll command is the only message in the batch it's ok to wait.
// If it isn't ok to wait, tag the poll message with a header that short-circuits any potential poll-wait.
if (inMessage instanceof CommandMessage) {
CommandMessage command = (CommandMessage) inMessage;
if ((command.getOperation() == CommandMessage.POLL_OPERATION) && (context.getRequestMessage().getBodyCount() != 1))
command.setHeader(CommandMessage.SUPPRESS_POLL_WAIT_HEADER, Boolean.TRUE);
}
// If MPI is enabled update the MPI metrics on the object referred to by the context
// and the messages
if (context.isMPIenabled())
MessagePerformanceUtils.setupMPII(context, inMessage);
// Service the message.
outMessage = endpoint.serviceMessage(inMessage);
// if processing of the message resulted in an error, set up context and reply method accordingly
if (outMessage instanceof ErrorMessage) {
context.setStatus(MessageIOConstants.STATUS_ERR);
replyMethodName = MessageIOConstants.STATUS_METHOD;
} else {
replyMethodName = MessageIOConstants.RESULT_METHOD;
}
} catch (MessageException e) {
context.setStatus(MessageIOConstants.STATUS_ERR);
replyMethodName = MessageIOConstants.STATUS_METHOD;
outMessage = e.createErrorMessage();
((ErrorMessage) outMessage).setCorrelationId(inMessage.getMessageId());
((ErrorMessage) outMessage).setDestination(inMessage.getDestination());
((ErrorMessage) outMessage).setClientId(inMessage.getClientId());
e.logAtHingePoint(inMessage, (ErrorMessage) outMessage, null /* Use default message intros */);
} catch (Throwable t) {
// Handle any uncaught failures. The normal exception path on the server
// is to throw MessageExceptions which are handled in the catch block above,
// so if that was skipped we have an overlooked or serious problem.
context.setStatus(MessageIOConstants.STATUS_ERR);
replyMethodName = MessageIOConstants.STATUS_METHOD;
String lmeMessage = t.getMessage();
if (lmeMessage == null)
lmeMessage = t.getClass().getName();
MessageException lme = new MessageException();
lme.setMessage(UNHANDLED_ERROR, new Object[]{lmeMessage});
outMessage = lme.createErrorMessage();
((ErrorMessage) outMessage).setCorrelationId(inMessage.getMessageId());
((ErrorMessage) outMessage).setDestination(inMessage.getDestination());
((ErrorMessage) outMessage).setClientId(inMessage.getClientId());
if (Log.isError()) {
Log.getLogger(LOG_CATEGORY).error("Unhandled error when processing a message: " +
t.toString() + StringUtils.NEWLINE +
" incomingMessage: " + inMessage + StringUtils.NEWLINE +
" errorReply: " + outMessage + StringUtils.NEWLINE +
ExceptionUtil.exceptionFollowedByRootCausesToString(t) + StringUtils.NEWLINE);
}
} finally {
// If MPI is enabled update the MPI metrics on the object referred to by the context
// and the messages
if (context.isRecordMessageSizes() || context.isRecordMessageTimes()) {
MessagePerformanceUtils.updateOutgoingMPI(context, inMessage, outMessage);
}
// If our channel-endpoint combination supports small messages, and
// if we know the current protocol version supports small messages,
// try to replace the message...
FlexSession session = FlexContext.getFlexSession();
if (session != null && session.useSmallMessages()
&& !context.isLegacy()
&& context.getVersion() >= MessageIOConstants.AMF3
&& outMessage instanceof Message) {
outMessage = endpoint.convertToSmallMessage((Message) outMessage);
}
response.setReplyMethod(replyMethodName);
response.setData(outMessage);
}
}
}