blob: c15ab89e7e86bcc6832e5381443bbf3513ad9037 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.endpoints.amf;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import javax.servlet.http.HttpServletRequest;
import flex.messaging.FlexContext;
import flex.messaging.MessageException;
import flex.messaging.io.MessageDeserializer;
import flex.messaging.io.MessageIOConstants;
import flex.messaging.io.MessageSerializer;
import flex.messaging.io.SerializationContext;
import flex.messaging.io.SerializationException;
import flex.messaging.io.amf.ASObject;
import flex.messaging.io.amf.ActionContext;
import flex.messaging.io.amf.ActionMessage;
import flex.messaging.io.amf.AmfTrace;
import flex.messaging.io.amf.MessageBody;
import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;
import flex.messaging.log.Logger;
import flex.messaging.messages.ErrorMessage;
import flex.messaging.messages.Message;
import flex.messaging.messages.MessagePerformanceInfo;
import flex.messaging.util.ExceptionUtil;
import flex.messaging.util.StringUtils;
/**
* Filter for serializing and deserializing action messages.
*/
public class SerializationFilter extends AMFFilter
{
//--------------------------------------------------------------------------
//
// Private Static Constants
//
//--------------------------------------------------------------------------
private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
// Error codes.
private static final int UNHANDLED_ERROR = 10306;
private static final int REQUEST_ERROR = 10307;
private static final int RESPONSE_ERROR = 10308;
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs a <tt>SerializationFilter</tt>.
*
* @param logCategory Log category to use in logging. If <code>null</code>, the default values is <code>Endpoint.General</code>.
*/
public SerializationFilter(String logCategory)
{
if (logCategory == null)
logCategory = LogCategories.ENDPOINT_GENERAL;
logger = Log.getLogger(logCategory);
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* Used to log serialization/deserialization messages.
*/
private Logger logger;
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
@Override
public void invoke(final ActionContext context) throws IOException
{
boolean success = false;
// Additional AMF packet tracing is enabled only at the debug logging level
// and only if there's a target listening for it.
AmfTrace debugTrace = Log.isDebug() && logger.hasTarget()? new AmfTrace() : null;
// Create an empty ActionMessage object to hold our response
context.setResponseMessage(new ActionMessage());
SerializationContext sc = SerializationContext.getSerializationContext();
try
{
// Deserialize the input stream into an "ActionMessage" object.
MessageDeserializer deserializer = sc.newMessageDeserializer();
// Set up the deserialization context
HttpServletRequest req = FlexContext.getHttpRequest();
InputStream in = req.getInputStream();
// Determine whether the request is coming from a Javascript client.
// If so, convert stream from UTF-8 to raw hex before AMF deserialization.
String contentType = req.getContentType();
boolean jsClient = (contentType != null && contentType.startsWith(MessageIOConstants.CONTENT_TYPE_PLAIN));
if (jsClient)
{
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in, UTF8_CHARSET));
int currentByte = -1;
while ((currentByte = reader.read()) != -1)
{
if (currentByte == 256)
currentByte = 0;
outputStream.write(currentByte);
}
if (outputStream.size() > 0)
in = new ByteArrayInputStream(outputStream.toByteArray());
}
deserializer.initialize(sc, in, debugTrace);
// record the length of the input stream for performance metrics
int reqLen = FlexContext.getHttpRequest().getContentLength();
context.setDeserializedBytes(reqLen);
// set up the incoming MPI info if it is enabled
if(context.isMPIenabled())
{
MessagePerformanceInfo mpi = new MessagePerformanceInfo();
mpi.recordMessageSizes = context.isRecordMessageSizes();
mpi.recordMessageTimes = context.isRecordMessageTimes();
if(context.isRecordMessageTimes())
mpi.receiveTime = System.currentTimeMillis();
if(context.isRecordMessageSizes())
mpi.messageSize =reqLen;
context.setMPII(mpi);
}
ActionMessage m = new ActionMessage();
context.setRequestMessage(m);
deserializer.readMessage(m, context);
success = true;
}
catch (Throwable t)
{
handleDeserializationException(context, t, logger);
}
finally
{
// Use the same ActionMessage version for the response
ActionMessage respMsg = context.getResponseMessage();
respMsg.setVersion(context.getVersion());
if (debugTrace != null)
logger.debug(debugTrace.toString());
}
try
{
if (success)
{
next.invoke(context);
}
}
catch (Throwable t)
{
unhandledError(context, t);
}
finally
{
// serialize output
if (context.getStatus() != MessageIOConstants.STATUS_NOTAMF)
{
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
ActionMessage respMesg = context.getResponseMessage();
// Additional AMF packet tracing is enabled only at the debug logging level
// and only if there's a target listening for it.
debugTrace = Log.isDebug() && logger.hasTarget()? new AmfTrace() : null;
try
{
// overhead calculation is only necessary when MPI is enabled
long serializationOverhead=0;
if(context.isRecordMessageTimes())
{
// set server send time
context.getMPIO().sendTime = System.currentTimeMillis();
if(context.isRecordMessageSizes())
serializationOverhead = System.currentTimeMillis();
}
MessageSerializer serializer = sc.newMessageSerializer();
serializer.initialize(sc, outBuffer, debugTrace);
serializer.writeMessage(respMesg);
// keep track of serializes bytes for performance metrics
context.setSerializedBytes(outBuffer.size());
// serialized message again after adding info if mpio with sizing is enabled
if(context.isRecordMessageSizes())
{
try
{
context.getMPIO().messageSize = outBuffer.size();
// reset server send time
if(context.isRecordMessageTimes())
{
serializationOverhead = System.currentTimeMillis() - serializationOverhead;
context.getMPIO().addToOverhead(serializationOverhead);
context.getMPIO().sendTime = System.currentTimeMillis();
}
// reserialize the message now that info has been added
outBuffer = new ByteArrayOutputStream();
respMesg = context.getResponseMessage();
serializer.initialize(sc, outBuffer, debugTrace);
serializer.writeMessage(respMesg);
}
catch(Exception e)
{
if (Log.isDebug())
logger.debug("MPI set up error: " + e.toString());
}
}
context.setResponseOutput(outBuffer);
}
catch (Exception e)
{
handleSerializationException(sc, context, e, logger);
}
finally
{
if (debugTrace != null)
logger.debug(debugTrace.toString());
}
}
}
}
/**
* This static method provides a common way for deserialization errors to be
* handled. It attempts to provide the client with useful information about
* deserialization failure.
*
* @param actionContext The action context.
* @param t The throwable that needs to be handled.
* @param logger The logger to which to log messages.
* @throws IOException
*/
public static void handleDeserializationException(ActionContext actionContext, Throwable t, Logger logger) throws IOException
{
if (t instanceof EOFException)
{
actionContext.setStatus(MessageIOConstants.STATUS_NOTAMF);
}
else if (t instanceof IOException)
{
if (Log.isDebug())
logger.debug("IOException reading message - client closed socket before sending the message?");
throw (IOException)t;
}
else
{
actionContext.setStatus(MessageIOConstants.STATUS_ERR);
// Create a single message body to hold the error
MessageBody responseBody = new MessageBody();
if (actionContext.getMessageNumber() < actionContext.getRequestMessage().getBodyCount())
responseBody.setTargetURI(actionContext.getRequestMessageBody().getResponseURI());
// If the message couldn't be deserialized enough to know the version, set the current version here
if (actionContext.getVersion() == 0)
actionContext.setVersion(ActionMessage.CURRENT_VERSION);
// append the response body to the output message
actionContext.getResponseMessage().addBody(responseBody);
String message;
MessageException methodResult;
if (t instanceof MessageException)
{
methodResult = (MessageException)t;
message = methodResult.getMessage();
}
else
{
//Error deserializing client message.
methodResult = new SerializationException();
methodResult.setMessage(REQUEST_ERROR);
methodResult.setRootCause(t);
message = methodResult.getMessage();
}
responseBody.setData(methodResult.createErrorMessage());
responseBody.setReplyMethod(MessageIOConstants.STATUS_METHOD);
if (Log.isError())
logger.error(message + StringUtils.NEWLINE + ExceptionUtil.toString(t));
}
}
/**
* This static method provides a common way for serialization errors to be
* handled. It attempts to provide the client with useful information about
* the serialization failure. When there is a serialization failure, there is
* no way to tell which response failed serialization, so it adds a new response
* with the serialization failure for each of the corresponding requests.
*
* @param serializer The serializer that generated the error.
* @param serializationContext The serialization context.
* @param actionContext The action context.
* @param t The throwable that needs to be handled.
* @param logger The logger to which to log error messages.
*/
public static void handleSerializationException(SerializationContext serializationContext,
ActionContext actionContext, Throwable t, Logger logger)
{
ActionMessage responseMessage = new ActionMessage();
actionContext.setResponseMessage(responseMessage);
int bodyCount = actionContext.getRequestMessage().getBodyCount();
for (actionContext.setMessageNumber(0); actionContext.getMessageNumber() < bodyCount; actionContext.incrementMessageNumber())
{
MessageBody responseBody = new MessageBody();
responseBody.setTargetURI(actionContext.getRequestMessageBody().getResponseURI());
actionContext.getResponseMessage().addBody(responseBody);
Object methodResult;
if (t instanceof MessageException)
{
methodResult = ((MessageException)t).createErrorMessage();
}
else
{
String message = "An error occurred while serializing server response(s).";
if (t.getMessage() != null)
{
message = t.getMessage();
if (message == null)
message = t.toString();
}
methodResult = new MessageException(message, t).createErrorMessage();
}
if (actionContext.isLegacy())
{
if (methodResult instanceof ErrorMessage)
{
ErrorMessage error = (ErrorMessage)methodResult;
ASObject aso = new ASObject();
aso.put("message", error.faultString);
aso.put("code", error.faultCode);
aso.put("details", error.faultDetail);
aso.put("rootCause", error.rootCause);
methodResult = aso;
}
else if (methodResult instanceof Message)
{
methodResult = ((Message)methodResult).getBody();
}
}
else
{
try
{
Message inMessage = actionContext.getRequestMessageBody().getDataAsMessage();
if (inMessage.getClientId() != null)
((ErrorMessage)methodResult).setClientId(inMessage.getClientId().toString());
if (inMessage.getMessageId() != null)
{
((ErrorMessage)methodResult).setCorrelationId(inMessage.getMessageId());
((ErrorMessage)methodResult).setDestination(inMessage.getDestination());
}
}
catch (MessageException ignore){}
}
responseBody.setData(methodResult);
responseBody.setReplyMethod(MessageIOConstants.STATUS_METHOD);
}
if (Log.isError() && logger != null)
logger.error("Exception occurred during serialization: " + ExceptionUtil.toString(t));
// Serialize the error messages
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
AmfTrace debugTrace = Log.isDebug() && logger.hasTarget()? new AmfTrace() : null;
MessageSerializer serializer = serializationContext.newMessageSerializer();
serializer.initialize(serializationContext, outBuffer, debugTrace);
try
{
serializer.writeMessage(actionContext.getResponseMessage());
actionContext.setResponseOutput(outBuffer);
}
catch (IOException e)
{
//Error serializing response
MessageException ex = new MessageException();
ex.setMessage(RESPONSE_ERROR);
ex.setRootCause(e);
throw ex;
}
}
//--------------------------------------------------------------------------
//
// Private Methods
//
//--------------------------------------------------------------------------
/**
* An unhandled error happened somewhere between the SerializationFilter and the
* ErrorFilter... we ignore all request bodies and attempt to send back a single response
* body to the client. It will not make it back to a custom responder, but the NetConnection
* Debugger will show the event.
*/
private void unhandledError(ActionContext context, Throwable t)
{
ActionMessage responseMessage = new ActionMessage();
context.setResponseMessage(responseMessage);
MessageBody responseBody = new MessageBody();
responseBody.setTargetURI(context.getRequestMessageBody().getResponseURI());
context.getResponseMessage().addBody(responseBody);
MessageException methodResult;
if (t instanceof MessageException)
{
methodResult = (MessageException)t;
}
else
{
// An unhandled error occurred while processing client request(s).
methodResult = new SerializationException();
methodResult.setMessage(UNHANDLED_ERROR);
methodResult.setRootCause(t);
}
responseBody.setData(methodResult);
responseBody.setReplyMethod(MessageIOConstants.STATUS_METHOD);
logger.info(t.getMessage());
}
}