package com.google.code.gossip.manager; | |
import java.lang.reflect.InvocationTargetException; | |
import java.util.ArrayList; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import javax.management.Notification; | |
import javax.management.NotificationListener; | |
import com.google.code.gossip.GossipMember; | |
import com.google.code.gossip.GossipService; | |
import com.google.code.gossip.GossipSettings; | |
import com.google.code.gossip.LocalGossipMember; | |
public abstract class GossipManager extends Thread implements NotificationListener { | |
/** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ | |
public static final int MAX_PACKET_SIZE = 102400; | |
/** The list of members which are in the gossip group (not including myself). */ | |
private ArrayList<LocalGossipMember> _memberList; | |
/** The list of members which are known to be dead. */ | |
private ArrayList<LocalGossipMember> _deadList; | |
/** The member I am representing. */ | |
private LocalGossipMember _me; | |
/** The settings for gossiping. */ | |
private GossipSettings _settings; | |
/** A boolean whether the gossip service should keep running. */ | |
private AtomicBoolean _gossipServiceRunning; | |
/** A ExecutorService used for executing the active and passive gossip threads. */ | |
private ExecutorService _gossipThreadExecutor; | |
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass; | |
private Class<? extends ActiveGossipThread> _activeGossipThreadClass; | |
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, | |
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port, | |
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) { | |
_passiveGossipThreadClass = passiveGossipThreadClass; | |
_activeGossipThreadClass = activeGossipThreadClass; | |
_settings = settings; | |
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); | |
_memberList = new ArrayList<LocalGossipMember>(); | |
_deadList = new ArrayList<LocalGossipMember>(); | |
for (GossipMember startupMember : gossipMembers) { | |
if (!startupMember.equals(_me)) { | |
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), | |
startupMember.getPort(), startupMember.getId(), 0, this, | |
settings.getCleanupInterval()); | |
_memberList.add(member); | |
GossipService.LOGGER.debug(member); | |
} | |
} | |
_gossipServiceRunning = new AtomicBoolean(true); | |
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { | |
public void run() { | |
GossipService.LOGGER.info("Service has been shutdown..."); | |
} | |
})); | |
} | |
/** | |
* All timers associated with a member will trigger this method when it goes | |
* off. The timer will go off if we have not heard from this member in | |
* <code> _settings.T_CLEANUP </code> time. | |
*/ | |
@Override | |
public void handleNotification(Notification notification, Object handback) { | |
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); | |
GossipService.LOGGER.info("Dead member detected: " + deadMember); | |
synchronized (this._memberList) { | |
this._memberList.remove(deadMember); | |
} | |
synchronized (this._deadList) { | |
this._deadList.add(deadMember); | |
} | |
} | |
public GossipSettings getSettings() { | |
return _settings; | |
} | |
/** | |
* Get a clone of the memberlist. | |
* @return | |
*/ | |
public ArrayList<LocalGossipMember> getMemberList() { | |
return _memberList; | |
} | |
public LocalGossipMember getMyself() { | |
return _me; | |
} | |
public ArrayList<LocalGossipMember> getDeadList() { | |
return _deadList; | |
} | |
/** | |
* Starts the client. Specifically, start the various cycles for this protocol. | |
* Start the gossip thread and start the receiver thread. | |
* @throws InterruptedException | |
*/ | |
public void run() { | |
for (LocalGossipMember member : _memberList) { | |
if (member != _me) { | |
member.startTimeoutTimer(); | |
} | |
} | |
_gossipThreadExecutor = Executors.newCachedThreadPool(); | |
try { | |
_gossipThreadExecutor.execute(_passiveGossipThreadClass.getConstructor(GossipManager.class) | |
.newInstance(this)); | |
_gossipThreadExecutor.execute(_activeGossipThreadClass.getConstructor(GossipManager.class) | |
.newInstance(this)); | |
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | |
| InvocationTargetException | NoSuchMethodException | SecurityException e1) { | |
throw new RuntimeException(e1); | |
} | |
GossipService.LOGGER.info("The GossipService is started."); | |
while(_gossipServiceRunning.get()) { | |
try { | |
TimeUnit.SECONDS.sleep(10); | |
} catch (InterruptedException e) { | |
GossipService.LOGGER.info("The GossipClient was interrupted."); | |
} | |
} | |
} | |
/** | |
* Shutdown the gossip service. | |
*/ | |
public void shutdown() { | |
_gossipThreadExecutor.shutdown(); | |
_gossipServiceRunning.set(false); | |
} | |
} |