blob: d676fe6e012e8732fedf0edb8f1118a3ce79c4d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.axis2.clustering.tribes;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.RpcMessage;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.util.Logs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Serializable;
/**
* Represents a Tribes GroupChannel. The difference between
* org.apache.catalina.tribes.group.GroupChannel & this class is that the proper classloaders
* are set before message deserialization
*/
public class Axis2GroupChannel extends GroupChannel{
private static final Log log = LogFactory.getLog(Axis2GroupChannel.class);
@Override
public void messageReceived(ChannelMessage msg) {
if ( msg == null ) return;
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) +
" at " +new java.sql.Timestamp(System.currentTimeMillis())+
" from "+msg.getAddress().getName());
}
Serializable fwd;
if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
fwd = new ByteMessage(msg.getMessage().getBytes());
} else {
try {
fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
msg.getMessage().getLength(),
ClassLoaderUtil.getClassLoaders());
}catch (Exception sx) {
log.error("Unable to deserialize message:"+msg,sx);
return;
}
}
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
}
//get the actual member with the correct alive time
Member source = msg.getAddress();
boolean rx = false;
boolean delivered = false;
for (Object channelListener1 : channelListeners) {
ChannelListener channelListener = (ChannelListener) channelListener1;
if (channelListener != null && channelListener.accept(fwd, source)) {
channelListener.messageReceived(fwd, source);
delivered = true;
//if the message was accepted by an RPC channel, that channel
//is responsible for returning the reply, otherwise we send an absence reply
if (channelListener instanceof RpcChannel) rx = true;
}
}//for
if ((!rx) && (fwd instanceof RpcMessage)) {
//if we have a message that requires a response,
//but none was given, send back an immediate one
sendNoRpcChannelReply((RpcMessage)fwd,source);
}
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
}
} catch ( Exception x ) {
//this could be the channel listener throwing an exception, we should log it
//as a warning.
if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
throw new RemoteProcessException("Exception:"+x.getMessage(),x);
}
}
}