blob: f68068a159036e2fc1a957785959d403960b6bd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.clustering.hazelcast;
import com.hazelcast.core.*;
import com.hazelcast.config.FileSystemXmlConfig;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.clapi.*;
/**
* This class implements necessary methods to build the cluster using hazelcast
*/
public class HazelcastClusterImpl implements ClusterManager {
private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class);
private HazelcastInstance _hazelcastInstance;
private boolean isMaster = false;
private String nodeID;
private String uuid;
private Member leader;
private IMap<String, String> deployment_lock_map;
private IMap<Long, Long> instance_lock_map;
private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
private ClusterProcessStore _clusterProcessStore;
private ClusterMemberListener _listener;
private ClusterLock<String> _hazelcastDeploymentLock;
private ClusterLock<Long> _hazelcastInstanceLock;
public void init(File configRoot) {
/*First,looks for the hazelcast.config system property. If it is set, its value is used as the path.
Else it will load the hazelcast.xml file using FileSystemXmlConfig()*/
String hzConfig = System.getProperty("hazelcast.config");
if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance();
else {
File hzXml = new File(configRoot, "hazelcast.xml");
if (!hzXml.isFile())
__log.error("hazelcast.xml does not exist or is not a file");
else
try {
_hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(hzXml));
} catch (FileNotFoundException fnf) {
__log.error(fnf);
}
}
if (_hazelcastInstance != null) {
// Registering this node in the cluster.
_hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener());
Member localMember = _hazelcastInstance.getCluster().getLocalMember();
nodeID = localMember.getInetSocketAddress().getHostName() +":" +localMember.getInetSocketAddress().getPort();
uuid = localMember.getUuid();
__log.info("Registering HZ localMember ID " + nodeID);
markAsMaster();
deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG);
_hazelcastDeploymentLock = (ClusterLock) new HazelcastDeploymentLock(deployment_lock_map);
_hazelcastInstanceLock = (ClusterLock) new HazelcastInstanceLock(instance_lock_map);
}
}
class ClusterMemberShipListener implements MembershipListener {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
String nodeId = membershipEvent.getMember().getUuid();
__log.info("Member Added " +nodeId);
if(isMaster && _listener != null) _listener.memberAdded(nodeId);
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
String nodeId = membershipEvent.getMember().getUuid();
__log.info("Member Removed " + nodeId);
markAsMaster();
if(isMaster && _listener != null) _listener.memberRemoved(nodeId);
}
@Override
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
// Noting to do here.
}
}
public void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent) {
clusterEvent.setUuid(uuid);
__log.info("Send " +clusterEvent.getInfo() +" Cluster Message " +"for " +clusterEvent.getDuName() +" [" +nodeID +"]");
clusterMessageTopic.publish(clusterEvent);
}
class ClusterMessageListener implements MessageListener<ProcessStoreClusterEvent> {
@Override
public void onMessage(Message<ProcessStoreClusterEvent> msg) {
handleEvent(msg.getMessageObject());
}
}
private void handleEvent(ProcessStoreClusterEvent message) {
if (message instanceof ProcessStoreDeployedEvent) {
ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
String eventUuid = event.getUuid();
if (!uuid.equals(eventUuid)) {
String duName = event.getDuName();
__log.info("Receive " +event.getInfo() +" Cluster Message " +"for " +event.getDuName() +" [" +nodeID +"]");
_clusterProcessStore.deployProcesses(duName);
}
}
else if (message instanceof ProcessStoreUndeployedEvent) {
ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message;
String eventUuid = event.getUuid();
if (!uuid.equals(eventUuid)) {
String duName = event.getDuName();
__log.info("Receive " +event.getInfo() +" Cluster Message " +"for " +event.getDuName() +" [" +nodeID +"]");
_clusterProcessStore.undeployProcesses(duName);
}
}
}
private void markAsMaster() {
leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
if (leader.localMember() && isMaster == false) {
isMaster = true;
if(_listener != null) _listener.memberElectedAsMaster(uuid);
}
__log.info(isMaster);
}
public boolean getIsMaster() {
return isMaster;
}
public String getUuid() {
return uuid;
}
public void setClusterProcessStore(ClusterProcessStore store) {
_clusterProcessStore = store;
}
public void registerClusterProcessStoreMessageListener() {
clusterMessageTopic.addMessageListener(new ClusterMessageListener());
}
public void registerClusterMemberListener(ClusterMemberListener listener) {
_listener = listener;
}
public void shutdown() {
if(_hazelcastInstance != null) _hazelcastInstance.getLifecycleService().shutdown();
}
public ClusterLock<String> getDeploymentLock(){
return _hazelcastDeploymentLock;
}
public ClusterLock<Long> getInstanceLock(){
return _hazelcastInstanceLock;
}
public List<String> getActiveNodes() {
List<String> nodeList = new ArrayList<String>();
for(Member m : _hazelcastInstance.getCluster().getMembers())
nodeList.add(m.getUuid()) ;
return nodeList;
}
}