blob: 99e308c9b8bea65bb6e11b8c7a391847014f4ef3 [file] [log] [blame]
package com.google.code.gossip.manager;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.Notification;
import javax.management.NotificationListener;
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService;
import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.LocalGossipMember;
/**
 * Coordinates one node of the gossip group. The manager owns the list of live members, the list
 * of members considered dead, and the two worker tasks (one passive, one active gossip thread)
 * which are instantiated reflectively from the classes handed to the constructor.
 *
 * <p>
 * The manager is itself a {@link Thread}: {@link #run()} starts the member timeout timers and the
 * worker tasks and then idles until {@link #shutdown()} is called. It is also registered as the
 * {@link NotificationListener} of every {@link LocalGossipMember} it creates, so that it can move
 * a member from the member list to the dead list when that member's notification arrives.
 */
public abstract class GossipManager extends Thread implements NotificationListener {

  /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
  public static final int MAX_PACKET_SIZE = 102400;

  /** The list of members which are in the gossip group (not including myself). */
  private final ArrayList<LocalGossipMember> _memberList;

  /** The list of members which are known to be dead. */
  private final ArrayList<LocalGossipMember> _deadList;

  /** The member I am representing. */
  private final LocalGossipMember _me;

  /** The settings for gossiping. */
  private final GossipSettings _settings;

  /** A boolean whether the gossip service should keep running. */
  private final AtomicBoolean _gossipServiceRunning;

  /**
   * A ExecutorService used for executing the active and passive gossip threads. Created in
   * {@link #run()}; volatile because {@link #shutdown()} is usually called from another thread.
   */
  private volatile ExecutorService _gossipThreadExecutor;

  /** The class from which the passive gossip thread is instantiated in {@link #run()}. */
  private final Class<? extends PassiveGossipThread> _passiveGossipThreadClass;

  /** The running passive gossip thread, or null while {@link #run()} has not created it. */
  private volatile PassiveGossipThread passiveGossipThread;

  /** The class from which the active gossip thread is instantiated in {@link #run()}. */
  private final Class<? extends ActiveGossipThread> _activeGossipThreadClass;

  /** The running active gossip thread, or null while {@link #run()} has not created it. */
  private volatile ActiveGossipThread activeGossipThread;

  /**
   * Creates the manager and seeds the member list.
   *
   * @param passiveGossipThreadClass
   *          class of the passive gossip thread; must offer a public constructor taking a
   *          {@code GossipManager}
   * @param activeGossipThreadClass
   *          class of the active gossip thread; must offer a public constructor taking a
   *          {@code GossipManager}
   * @param address
   *          host of the local member
   * @param port
   *          port of the local member
   * @param id
   *          id of the local member
   * @param settings
   *          the gossip settings; its cleanup interval is passed on to every member created here
   * @param gossipMembers
   *          the members known at startup; entries equal to the local member are skipped, and
   *          {@code null} is treated as an empty list
   */
  public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
          Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
          String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
    _passiveGossipThreadClass = passiveGossipThreadClass;
    _activeGossipThreadClass = activeGossipThreadClass;
    _settings = settings;
    _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
    _memberList = new ArrayList<LocalGossipMember>();
    _deadList = new ArrayList<LocalGossipMember>();
    if (gossipMembers != null) {
      for (GossipMember startupMember : gossipMembers) {
        if (!startupMember.equals(_me)) {
          LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
                  startupMember.getPort(), startupMember.getId(), 0, this,
                  settings.getCleanupInterval());
          _memberList.add(member);
          GossipService.LOGGER.debug(member);
        }
      }
    }
    _gossipServiceRunning = new AtomicBoolean(true);
    // Only logs at JVM exit; it does not stop the gossip threads.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      public void run() {
        GossipService.LOGGER.info("Service has been shutdown...");
      }
    }));
  }

  /**
   * All timers associated with a member will trigger this method when it goes off. The member
   * carried as the notification's user data is removed from the member list and appended to the
   * dead list, each under that list's own monitor.
   *
   * @param notification
   *          the notification; its user data must be the {@link LocalGossipMember} concerned
   * @param handback
   *          not used
   */
  @Override
  public void handleNotification(Notification notification, Object handback) {
    LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
    GossipService.LOGGER.info("Dead member detected: " + deadMember);
    synchronized (this._memberList) {
      this._memberList.remove(deadMember);
    }
    synchronized (this._deadList) {
      this._deadList.add(deadMember);
    }
  }

  /**
   * @return the settings this manager was created with
   */
  public GossipSettings getSettings() {
    return _settings;
  }

  /**
   * Get the member list. This is the live internal list, not a copy:
   * {@link #handleNotification(Notification, Object)} removes entries from it while holding the
   * list's monitor, so callers that iterate or modify it should synchronize on the returned list.
   *
   * @return the live list of members in the gossip group (not including myself)
   */
  public ArrayList<LocalGossipMember> getMemberList() {
    return _memberList;
  }

  /**
   * @return the member this manager represents
   */
  public LocalGossipMember getMyself() {
    return _me;
  }

  /**
   * Get the dead list. This is the live internal list, not a copy:
   * {@link #handleNotification(Notification, Object)} appends to it while holding the list's
   * monitor, so callers that iterate or modify it should synchronize on the returned list.
   *
   * @return the live list of members known to be dead
   */
  public ArrayList<LocalGossipMember> getDeadList() {
    return _deadList;
  }

  /**
   * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
   * thread and start the receiver thread, then idle until {@link #shutdown()} is called.
   *
   * @throws RuntimeException
   *           if one of the gossip thread classes cannot be instantiated reflectively; anything
   *           already started is stopped again before the exception is thrown
   */
  @Override
  public void run() {
    // Iterate over a snapshot taken under the list's monitor: handleNotification may remove
    // members concurrently once the first timers are running.
    final ArrayList<LocalGossipMember> membersAtStartup;
    synchronized (_memberList) {
      membersAtStartup = new ArrayList<LocalGossipMember>(_memberList);
    }
    for (LocalGossipMember member : membersAtStartup) {
      if (member != _me) {
        member.startTimeoutTimer();
      }
    }
    _gossipThreadExecutor = Executors.newCachedThreadPool();
    try {
      passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class)
              .newInstance(this);
      _gossipThreadExecutor.execute(passiveGossipThread);
      activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class)
              .newInstance(this);
      _gossipThreadExecutor.execute(activeGossipThread);
    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
            | InvocationTargetException | NoSuchMethodException | SecurityException e1) {
      // Do not leave a half-started service behind (e.g. a running passive thread).
      stopGossipThreads();
      throw new RuntimeException(e1);
    }
    GossipService.LOGGER.info("The GossipService is started.");
    // An interrupt does not end the service (only shutdown() does), but the interrupt status is
    // remembered and restored on exit so that it is not lost to the code that runs this thread.
    boolean interrupted = false;
    while (_gossipServiceRunning.get()) {
      try {
        // TODO
        TimeUnit.MILLISECONDS.sleep(1);
      } catch (InterruptedException e) {
        interrupted = true;
        GossipService.LOGGER.info("The GossipClient was interrupted.");
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Shutdown the gossip service. Stops the executor and both gossip threads, waits up to one
   * second for the executor to terminate, and finally lets {@link #run()} return. Safe to call
   * before {@link #run()} has started anything.
   */
  public void shutdown() {
    try {
      stopGossipThreads();
      ExecutorService executor = _gossipThreadExecutor;
      if (executor != null) {
        boolean result = executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
        GossipService.LOGGER.info("Terminate returned " + result);
      }
    } catch (InterruptedException e) {
      GossipService.LOGGER.info("Interrupted while waiting for the gossip threads to terminate.");
      Thread.currentThread().interrupt();
    } finally {
      // Always release the loop in run(), even if stopping the threads failed.
      _gossipServiceRunning.set(false);
    }
  }

  /** Asks the executor and both gossip threads to stop, skipping whatever was never created. */
  private void stopGossipThreads() {
    ExecutorService executor = _gossipThreadExecutor;
    if (executor != null) {
      executor.shutdown();
    }
    PassiveGossipThread passive = passiveGossipThread;
    if (passive != null) {
      passive.shutdown();
    }
    ActiveGossipThread active = activeGossipThread;
    if (active != null) {
      active.shutdown();
    }
  }
}