blob: b8e09a125ca8416f8c3df9ee9f2cedbc5ea95f67 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.axis2.clustering.tribes;
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.MembershipScheme;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.util.Utils;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.SocketException;
import java.util.Map;
import java.util.Properties;
/**
* Implementation of the multicast based membership scheme. In this scheme, membership is discovered
* using multicasts
*/
public class MulticastBasedMembershipScheme implements MembershipScheme {
private static final Log log = LogFactory.getLog(MulticastBasedMembershipScheme.class);
/**
* The Tribes channel
*/
private final ManagedChannel channel;
private final Map<String, Parameter> parameters;
/**
* The domain to which this node belongs to
*/
private final byte[] domain;
/**
* The mode in which this member operates such as "loadBalance" or "application"
*/
private final OperationMode mode;
// private MembershipListener membershipListener;
private final boolean atmostOnceMessageSemantics;
private final boolean preserverMsgOrder;
public MulticastBasedMembershipScheme(ManagedChannel channel,
OperationMode mode,
Map<String, Parameter> parameters,
byte[] domain,
boolean atmostOnceMessageSemantics,
boolean preserverMsgOrder) {
this.channel = channel;
this.mode = mode;
this.parameters = parameters;
this.domain = domain;
this.atmostOnceMessageSemantics = atmostOnceMessageSemantics;
this.preserverMsgOrder = preserverMsgOrder;
}
public void init() throws ClusteringFault {
addInterceptors();
configureMulticastParameters();
}
public void joinGroup() throws ClusteringFault {
// Nothing to do
}
private void configureMulticastParameters() throws ClusteringFault {
Properties mcastProps = channel.getMembershipService().getProperties();
Parameter mcastAddress = getParameter(TribesConstants.MCAST_ADDRESS);
if (mcastAddress != null) {
mcastProps.setProperty(TribesConstants.MCAST_ADDRESS,
((String) mcastAddress.getValue()).trim());
}
Parameter mcastBindAddress = getParameter(TribesConstants.MCAST_BIND_ADDRESS);
if (mcastBindAddress != null) {
mcastProps.setProperty(TribesConstants.MCAST_BIND_ADDRESS,
((String) mcastBindAddress.getValue()).trim());
}
Parameter mcastPort = getParameter(TribesConstants.MCAST_PORT);
if (mcastPort != null) {
mcastProps.setProperty(TribesConstants.MCAST_PORT,
((String) mcastPort.getValue()).trim());
}
Parameter mcastFrequency = getParameter(TribesConstants.MCAST_FREQUENCY);
if (mcastFrequency != null) {
mcastProps.setProperty(TribesConstants.MCAST_FREQUENCY,
((String) mcastFrequency.getValue()).trim());
}
Parameter mcastMemberDropTime = getParameter(TribesConstants.MEMBER_DROP_TIME);
if (mcastMemberDropTime != null) {
mcastProps.setProperty(TribesConstants.MEMBER_DROP_TIME,
((String) mcastMemberDropTime.getValue()).trim());
}
// Set the IP address that will be advertised by this node
ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
Parameter tcpListenHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
if (tcpListenHost != null) {
String host = ((String) tcpListenHost.getValue()).trim();
mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
receiver.setAddress(host);
} else {
String host;
try {
host = Utils.getIpAddress();
} catch (SocketException e) {
String msg = "Could not get local IP address";
log.error(msg, e);
throw new ClusteringFault(msg, e);
}
mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
receiver.setAddress(host);
}
String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
if (localIP != null) {
receiver.setAddress(localIP);
}
Parameter tcpListenPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
if (tcpListenPort != null) {
String port = ((String) tcpListenPort.getValue()).trim();
mcastProps.setProperty(TribesConstants.TCP_LISTEN_PORT, port);
receiver.setPort(Integer.parseInt(port));
}
mcastProps.setProperty(TribesConstants.MCAST_CLUSTER_DOMAIN, new String(domain));
}
/**
* Add ChannelInterceptors. The order of the interceptors that are added will depend on the
* membership management scheme
*/
private void addInterceptors() {
if (log.isDebugEnabled()) {
log.debug("Adding Interceptors...");
}
// Add a reliable failure detector
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
tcpFailureDetector.setConnectTimeout(30000);
channel.addInterceptor(tcpFailureDetector);
if (log.isDebugEnabled()) {
log.debug("Added TCP Failure Detector");
}
// Add the NonBlockingCoordinator.
// channel.addInterceptor(new Axis2Coordinator(membershipListener));
channel.getMembershipService().setDomain(domain);
mode.addInterceptors(channel);
if (atmostOnceMessageSemantics) {
// Add a AtMostOnceInterceptor to support at-most-once message processing semantics
AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
channel.addInterceptor(atMostOnceInterceptor);
if (log.isDebugEnabled()) {
log.debug("Added At-most-once Interceptor");
}
}
if (preserverMsgOrder) {
// Add the OrderInterceptor to preserve sender ordering
OrderInterceptor orderInterceptor = new OrderInterceptor();
orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
channel.addInterceptor(orderInterceptor);
if (log.isDebugEnabled()) {
log.debug("Added Message Order Interceptor");
}
}
}
public Parameter getParameter(String name) {
return parameters.get(name);
}
}