blob: d8ebe5c50d4022d8e3b65ea9df23ad0bad1256e6 [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis2.clustering.tribes;
import org.apache.axis2.clustering.LoadBalanceEventHandler;
import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Represents a member running in load balance mode
*/
public class LoadBalancerMode implements OperationMode {
private static final Log log = LogFactory.getLog(LoadBalancerMode.class);
private byte[] loadBalancerDomain;
private Map<String, LoadBalanceEventHandler> lbEventHandlers;
private List<MembershipManager> membershipManagers = new ArrayList<MembershipManager>();
private MembershipManager primaryMembershipManager;
public LoadBalancerMode(byte[] loadBalancerDomain,
Map<String, LoadBalanceEventHandler> lbEventHandlers,
MembershipManager primaryMembershipManager) {
this.loadBalancerDomain = loadBalancerDomain;
this.lbEventHandlers = lbEventHandlers;
this.primaryMembershipManager = primaryMembershipManager;
}
public void addInterceptors(Channel channel) {
LoadBalancerInterceptor lbInterceptor =
new LoadBalancerInterceptor(loadBalancerDomain);
lbInterceptor.setOptionFlag(TribesConstants.MEMBERSHIP_MSG_OPTION);
channel.addInterceptor(lbInterceptor);
if (log.isDebugEnabled()) {
log.debug("Added Load Balancer Interceptor");
}
}
public void init(Channel channel) {
// Have multiple RPC channels with multiple RPC request handlers for each domain
// This is needed only when this member is running as a load balancer
for (Object o : lbEventHandlers.keySet()) {
String domain = (String) o;
final MembershipManager membershipManager = new MembershipManager();
membershipManager.setDomain(domain.getBytes());
membershipManager.setLoadBalanceEventHandler(lbEventHandlers.get(domain));
MembershipListener membershipListener = new MembershipListener() {
public void memberAdded(org.apache.catalina.tribes.Member member) {
membershipManager.memberAdded(member);
}
public void memberDisappeared(org.apache.catalina.tribes.Member member) {
membershipManager.memberDisappeared(member);
}
};
channel.addMembershipListener(membershipListener);
membershipManagers.add(membershipManager);
}
}
public List<MembershipManager> getMembershipManagers() {
return membershipManagers;
}
public void notifyMemberJoin(final Member member) {
if (TribesUtil.isInDomain(member, loadBalancerDomain)) { // A peer load balancer has joined
// Notify all members in the LB group
primaryMembershipManager.sendMemberJoinedToAll(member);
// Send the MEMBER_LISTS of all the groups to the the new LB member
for (MembershipManager manager : membershipManagers) {
manager.sendMemberList(member);
}
} else { // An application member has joined.
// Need to notify all members in the group of the new app member
Thread th = new Thread() {
public void run() {
for (MembershipManager manager : membershipManagers) {
if (TribesUtil.isInDomain(member, manager.getDomain())) {
// Send MEMBER_JOINED to the group of the new member
manager.sendMemberJoinedToAll(member);
// Send MEMBER_JOINED to the load balancer group
sendMemberJoinedToLoadBalancerGroup(manager.getRpcMembershipChannel(),
member);
break;
}
}
}
/**
* Send MEMBER_JOINED to the load balancer group
* @param rpcChannel The RpcChannel corresponding to the member's group
* @param member The member who joined
*/
private void sendMemberJoinedToLoadBalancerGroup(RpcChannel rpcChannel,
Member member) {
MemberJoinedCommand cmd = new MemberJoinedCommand();
cmd.setMember(member);
try {
rpcChannel.send(primaryMembershipManager.getMembers(),
cmd,
RpcChannel.ALL_REPLY,
Channel.SEND_OPTIONS_ASYNCHRONOUS,
10000);
} catch (ChannelException e) {
String errMsg = "Could not send MEMBER_JOINED[" +
TribesUtil.getName(member) +
"] to all load balancer members ";
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
}
};
th.start();
}
}
}