GOSSIP-79 Isolate UDP and JSON code
With these changes, it should now be possible to create alternate serialization (e.g. Gson or native) and transports (like HTTP).
To make this PR reviewable I decided against creating new modules right now. That can be done subsequently in another PR that doesn't modify any code.
* Creates two new interfaces: `TransportManager` and `ProtocolManager`
* Implementation classes must honor a common constructor interface
* Includes UDP and Jackson implementations of those.
* `AbstractTransportManager` has a lot of boilerplate that includes:
* starting the active gossiper, and
* starting the passive gossiper.
I spent some time trying to polish the implementations to become less dependent on references to `GossipManager`. I still feel there is a lot of room for improvement.
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 6b2bf8b..e4a95d3 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -45,6 +45,9 @@
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
+ private String transportManagerClass = "org.apache.gossip.transport.UdpTransportManager";
+ private String protocolManagerClass = "org.apache.gossip.protocol.JacksonProtocolManager";
+
private Map<String,String> activeGossipProperties = new HashMap<>();
private String pathToRingState = "./";
@@ -222,5 +225,12 @@
public void setSignMessages(boolean signMessages) {
this.signMessages = signMessages;
}
-
+
+ public String getTransportManagerClass() {
+ return transportManagerClass;
+ }
+
+ public String getProtocolManagerClass() {
+ return protocolManagerClass;
+ }
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
index d01a84c..e034432 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -29,16 +29,8 @@
import org.apache.gossip.udp.Trackable;
import org.apache.log4j.Logger;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
import java.net.URI;
-import java.security.*;
-import java.security.spec.InvalidKeySpecException;
-import java.security.spec.PKCS8EncodedKeySpec;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.*;
@@ -60,8 +52,6 @@
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
- private final PKCS8EncodedKeySpec privKeySpec;
- private final PrivateKey privKey;
private final Meter messageSerdeException;
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
@@ -79,42 +69,6 @@
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
-
- if (manager.getSettings().isSignMessages()){
- File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId());
- File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub");
- if (!privateKey.exists()){
- throw new IllegalArgumentException("private key not found " + privateKey);
- }
- if (!publicKey.exists()){
- throw new IllegalArgumentException("public key not found " + publicKey);
- }
- try (FileInputStream keyfis = new FileInputStream(privateKey)) {
- byte[] encKey = new byte[keyfis.available()];
- keyfis.read(encKey);
- keyfis.close();
- privKeySpec = new PKCS8EncodedKeySpec(encKey);
- KeyFactory keyFactory = KeyFactory.getInstance("DSA");
- privKey = keyFactory.generatePrivate(privKeySpec);
- } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
- throw new RuntimeException("failed hard", e);
- }
- } else {
- privKeySpec = null;
- privKey = null;
- }
- }
-
- private byte [] sign(byte [] bytes){
- Signature dsa;
- try {
- dsa = Signature.getInstance("SHA1withDSA", "SUN");
- dsa.initSign(privKey);
- dsa.update(bytes);
- return dsa.sign();
- } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
- throw new RuntimeException(e);
- }
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -184,30 +138,21 @@
/**
* Sends a blocking message.
+ * todo: move functionality to TransportManager layer.
* @param message
* @param uri
* @throws RuntimeException if data can not be serialized or in transmission error
*/
- private void sendInternal(Base message, URI uri){
+ private void sendInternal(Base message, URI uri) {
byte[] json_bytes;
try {
- if (privKey == null){
- json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
- } else {
- SignedPayload p = new SignedPayload();
- p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
- p.setSignature(sign(p.getData()));
- json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
- }
+ json_bytes = gossipManager.getProtocolManager().write(message);
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
}
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
- InetAddress dest = InetAddress.getByName(uri.getHost());
- DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort());
- socket.send(datagramPacket);
+ try {
+ gossipManager.getTransportManager().send(uri, json_bytes);
tranmissionSuccess.mark();
} catch (IOException e) {
tranmissionException.mark();
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index ff70ccc..b1752cd 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -18,6 +18,7 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
@@ -26,13 +27,14 @@
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageHandler;
-import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.protocol.ProtocolManager;
+import org.apache.gossip.transport.TransportManager;
+import org.apache.gossip.utils.ReflectionUtils;
import org.apache.log4j.Logger;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;
@@ -46,14 +48,21 @@
public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
+
+ // this mapper is used for ring and user-data persistence only. NOT messages.
+ public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{
+ enableDefaultTyping();
+ configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
+ }};
private final ConcurrentSkipListMap<LocalMember, GossipState> members;
private final LocalMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
- private AbstractActiveGossiper activeGossipThread;
- private PassiveGossipThread passiveGossipThread;
- private ExecutorService gossipThreadExecutor;
+
+ private TransportManager transportManager;
+ private ProtocolManager protocolManager;
+
private final GossipCore gossipCore;
private final DataReaper dataReaper;
private final Clock clock;
@@ -62,14 +71,13 @@
private final RingStatePersister ringState;
private final UserDataPersister userDataState;
private final GossipMemberStateRefresher memberStateRefresher;
- private final ObjectMapper objectMapper;
-
+
private final MessageHandler messageHandler;
-
+
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
- ObjectMapper objectMapper, MessageHandler messageHandler) {
+ MessageHandler messageHandler) {
this.settings = settings;
this.messageHandler = messageHandler;
@@ -89,14 +97,15 @@
members.put(member, GossipState.DOWN);
}
}
- gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true);
this.scheduledServiced = Executors.newScheduledThreadPool(1);
this.registry = registry;
- this.ringState = new RingStatePersister(this);
- this.userDataState = new UserDataPersister(this, this.gossipCore);
+ this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
+ this.userDataState = new UserDataPersister(
+ gossipCore,
+ GossipManager.buildPerNodeDataPath(this),
+ GossipManager.buildSharedDataPath(this));
this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
- this.objectMapper = objectMapper;
readSavedRingState();
readSavedDataState();
}
@@ -140,49 +149,66 @@
return me;
}
- private AbstractActiveGossiper constructActiveGossiper(){
- try {
- Constructor<?> c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class);
- return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry);
- } catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*/
public void init() {
- passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
- gossipThreadExecutor.execute(passiveGossipThread);
- activeGossipThread = constructActiveGossiper();
- activeGossipThread.init();
+
+ // protocol manager and transport managers are specified in settings.
+ // construct them here via reflection.
+
+ protocolManager = ReflectionUtils.constructWithReflection(
+ settings.getProtocolManagerClass(),
+ new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class },
+ new Object[] { settings, me.getId(), this.getRegistry() }
+ );
+
+ transportManager = ReflectionUtils.constructWithReflection(
+ settings.getTransportManagerClass(),
+ new Class<?>[] { GossipManager.class, GossipCore.class},
+ new Object[] { this, gossipCore }
+ );
+
+ // start processing gossip messages.
+ transportManager.startEndpoint();
+ transportManager.startActiveGossiper();
+
dataReaper.init();
- scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
- scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
+ if (settings.isPersistRingState()) {
+ scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
+ }
+ if (settings.isPersistDataState()) {
+ scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
+ }
scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
LOGGER.debug("The GossipManager is started.");
}
-
+
private void readSavedRingState() {
- for (LocalMember l : ringState.readFromDisk()){
- LocalMember member = new LocalMember(l.getClusterName(),
- l.getUri(), l.getId(),
- clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
- settings.getMinimumSamples(), settings.getDistribution());
- members.putIfAbsent(member, GossipState.DOWN);
+ if (settings.isPersistRingState()) {
+ for (LocalMember l : ringState.readFromDisk()) {
+ LocalMember member = new LocalMember(l.getClusterName(),
+ l.getUri(), l.getId(),
+ clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
+ settings.getMinimumSamples(), settings.getDistribution());
+ members.putIfAbsent(member, GossipState.DOWN);
+ }
}
}
private void readSavedDataState() {
- for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
- for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
- gossipCore.addPerNodeData(j.getValue());
+ if (settings.isPersistDataState()) {
+ for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()) {
+ for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()) {
+ gossipCore.addPerNodeData(j.getValue());
+ }
}
}
- for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
- gossipCore.addSharedData(l.getValue());
+ if (settings.isPersistRingState()) {
+ for (Entry<String, SharedDataMessage> l : userDataState.readSharedDataFromDisk().entrySet()) {
+ gossipCore.addSharedData(l.getValue());
+ }
}
}
@@ -191,24 +217,9 @@
*/
public void shutdown() {
gossipServiceRunning.set(false);
- gossipThreadExecutor.shutdown();
gossipCore.shutdown();
+ transportManager.shutdown();
dataReaper.close();
- if (passiveGossipThread != null) {
- passiveGossipThread.shutdown();
- }
- if (activeGossipThread != null) {
- activeGossipThread.shutdown();
- }
- try {
- boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
- if (!result) {
- LOGGER.error("executor shutdown timed out");
- }
- } catch (InterruptedException e) {
- LOGGER.error(e);
- }
- gossipThreadExecutor.shutdownNow();
scheduledServiced.shutdown();
try {
scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
@@ -234,7 +245,6 @@
gossipCore.addSharedData(message);
}
-
@SuppressWarnings("rawtypes")
public Crdt findCrdt(String key){
SharedDataMessage l = gossipCore.getSharedData().get(key);
@@ -308,12 +318,32 @@
return clock;
}
- public ObjectMapper getObjectMapper() {
- return objectMapper;
- }
-
public MetricRegistry getRegistry() {
return registry;
}
+ public ProtocolManager getProtocolManager() {
+ return protocolManager;
+ }
+
+ public TransportManager getTransportManager() {
+ return transportManager;
+ }
+
+ // todo: consider making these path methods part of GossipSettings
+
+ public static File buildRingStatePath(GossipManager manager) {
+ return new File(manager.getSettings().getPathToRingState(), "ringstate." + manager.getMyself().getClusterName() + "."
+ + manager.getMyself().getId() + ".json");
+ }
+
+ public static File buildSharedDataPath(GossipManager manager){
+ return new File(manager.getSettings().getPathToDataState(), "shareddata."
+ + manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
+ }
+
+ public static File buildPerNodeDataPath(GossipManager manager) {
+ return new File(manager.getSettings().getPathToDataState(), "pernodedata."
+ + manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
+ }
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index bb73177..86dca57 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -18,12 +18,9 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
-import com.fasterxml.jackson.core.JsonGenerator.Feature;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.StartupSettings;
-import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.handlers.MessageHandlerFactory;
@@ -49,7 +46,6 @@
private GossipListener listener;
private MetricRegistry registry;
private Map<String,String> properties;
- private ObjectMapper objectMapper;
private MessageHandler messageHandler;
private ManagerBuilder() {}
@@ -108,11 +104,6 @@
this.uri = uri;
return this;
}
-
- public ManagerBuilder mapper(ObjectMapper objectMapper){
- this.objectMapper = objectMapper;
- return this;
- }
public ManagerBuilder messageHandler(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
@@ -136,16 +127,11 @@
if (gossipMembers == null) {
gossipMembers = new ArrayList<>();
}
- if (objectMapper == null) {
- objectMapper = new ObjectMapper();
- objectMapper.enableDefaultTyping();
- objectMapper.registerModule(new CrdtModule());
- objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
- }
+
if (messageHandler == null) {
messageHandler = MessageHandlerFactory.defaultHandler();
- }
- return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ;
+ }
+ return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, messageHandler) {} ;
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index ae28bf7..30e39d5 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -18,34 +18,25 @@
package org.apache.gossip.manager;
import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
+
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.model.Base;
-import org.apache.gossip.model.SignedPayload;
import org.apache.log4j.Logger;
-import com.codahale.metrics.Meter;
/**
* This class handles the passive cycle,
* where this client has received an incoming message.
*/
-abstract public class PassiveGossipThread implements Runnable {
+public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
- /** The socket used for the passive thread of the gossip service. */
- private final DatagramSocket server;
+
private final AtomicBoolean keepRunning;
private final GossipCore gossipCore;
private final GossipManager gossipManager;
- private final Meter signed;
- private final Meter unsigned;
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
@@ -53,38 +44,18 @@
if (gossipManager.getMyself().getClusterName() == null){
throw new IllegalArgumentException("Cluster was null");
}
- try {
- SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
- gossipManager.getMyself().getUri().getPort());
- server = new DatagramSocket(socketAddress);
- } catch (SocketException ex) {
- LOGGER.warn(ex);
- throw new RuntimeException(ex);
- }
+
keepRunning = new AtomicBoolean(true);
- signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE);
- unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
@Override
public void run() {
while (keepRunning.get()) {
try {
- byte[] buf = new byte[server.getReceiveBufferSize()];
- DatagramPacket p = new DatagramPacket(buf, buf.length);
- server.receive(p);
- debug(p.getData());
+ byte[] buf = gossipManager.getTransportManager().read();
try {
- Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
- if (activeGossipMessage instanceof SignedPayload){
- SignedPayload s = (SignedPayload) activeGossipMessage;
- Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class);
- gossipCore.receive(nested);
- signed.mark();
- } else {
- gossipCore.receive(activeGossipMessage);
- unsigned.mark();
- }
+ Base message = gossipManager.getProtocolManager().read(buf);
+ gossipCore.receive(message);
gossipManager.getMemberStateRefresher().run();
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
@@ -94,21 +65,9 @@
keepRunning.set(false);
}
}
- shutdown();
}
-
- private void debug(byte[] jsonBytes) {
- if (LOGGER.isDebugEnabled()){
- String receivedMessage = new String(jsonBytes);
- LOGGER.debug("Received message ( bytes): " + receivedMessage);
- }
+
+ public void requestStop() {
+ keepRunning.set(false);
}
-
- public void shutdown() {
- try {
- server.close();
- } catch (RuntimeException ex) {
- }
- }
-
}
\ No newline at end of file
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
index 7e42562..0af9f12 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -26,16 +26,24 @@
import java.util.List;
import java.util.NavigableSet;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.LocalMember;
+import org.apache.gossip.crdt.CrdtModule;
import org.apache.log4j.Logger;
public class RingStatePersister implements Runnable {
private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class);
- private GossipManager parent;
+ private final File path;
+ // NOTE: this is a different instance than what gets used for message marshalling.
+ private final ObjectMapper objectMapper;
+ private final GossipManager manager;
- public RingStatePersister(GossipManager parent){
- this.parent = parent;
+ public RingStatePersister(File path, GossipManager manager){
+ this.path = path;
+ this.objectMapper = GossipManager.metdataObjectMapper;
+ this.manager = manager;
}
@Override
@@ -43,34 +51,25 @@
writeToDisk();
}
- File computeTarget(){
- return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "."
- + parent.getMyself().getId() + ".json");
- }
-
- void writeToDisk(){
- if (!parent.getSettings().isPersistRingState()){
- return;
- }
- NavigableSet<LocalMember> i = parent.getMembers().keySet();
- try (FileOutputStream fos = new FileOutputStream(computeTarget())){
- parent.getObjectMapper().writeValue(fos, i);
+ void writeToDisk() {
+ NavigableSet<LocalMember> i = manager.getMembers().keySet();
+ try (FileOutputStream fos = new FileOutputStream(path)){
+ objectMapper.writeValue(fos, i);
} catch (IOException e) {
LOGGER.debug(e);
}
}
@SuppressWarnings("unchecked")
- List<LocalMember> readFromDisk(){
- if (!parent.getSettings().isPersistRingState()){
- return Collections.emptyList();
+ List<LocalMember> readFromDisk() {
+ if (!path.exists()) {
+ return new ArrayList<>();
}
- try (FileInputStream fos = new FileInputStream(computeTarget())){
- return parent.getObjectMapper().readValue(fos, ArrayList.class);
+ try (FileInputStream fos = new FileInputStream(path)){
+ return objectMapper.readValue(fos, ArrayList.class);
} catch (IOException e) {
LOGGER.debug(e);
}
- return Collections.emptyList();
+ return new ArrayList<>();
}
-
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
index 3b9eafa..28c3151 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/UserDataPersister.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.log4j.Logger;
@@ -30,31 +31,26 @@
public class UserDataPersister implements Runnable {
private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
- private final GossipManager parent;
private final GossipCore gossipCore;
- UserDataPersister(GossipManager parent, GossipCore gossipCore){
- this.parent = parent;
+ private final File perNodePath;
+ private final File sharedPath;
+ private final ObjectMapper objectMapper;
+
+ UserDataPersister(GossipCore gossipCore, File perNodePath, File sharedPath) {
this.gossipCore = gossipCore;
- }
-
- File computeSharedTarget(){
- return new File(parent.getSettings().getPathToDataState(), "shareddata."
- + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
- }
-
- File computePerNodeTarget() {
- return new File(parent.getSettings().getPathToDataState(), "pernodedata."
- + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
+ this.objectMapper = GossipManager.metdataObjectMapper;
+ this.perNodePath = perNodePath;
+ this.sharedPath = sharedPath;
}
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){
- if (!parent.getSettings().isPersistDataState()){
+ if (!perNodePath.exists()) {
return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
- try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
- return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
+ try (FileInputStream fos = new FileInputStream(perNodePath)){
+ return objectMapper.readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
@@ -62,22 +58,16 @@
}
void writePerNodeToDisk(){
- if (!parent.getSettings().isPersistDataState()){
- return;
- }
- try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
- parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData());
+ try (FileOutputStream fos = new FileOutputStream(perNodePath)){
+ objectMapper.writeValue(fos, gossipCore.getPerNodeData());
} catch (IOException e) {
LOGGER.warn(e);
}
}
void writeSharedToDisk(){
- if (!parent.getSettings().isPersistDataState()){
- return;
- }
- try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
- parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData());
+ try (FileOutputStream fos = new FileOutputStream(sharedPath)){
+ objectMapper.writeValue(fos, gossipCore.getSharedData());
} catch (IOException e) {
LOGGER.warn(e);
}
@@ -85,11 +75,11 @@
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){
- if (!parent.getSettings().isPersistRingState()){
- return new ConcurrentHashMap<String, SharedDataMessage>();
+ if (!sharedPath.exists()) {
+ return new ConcurrentHashMap<>();
}
- try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
- return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
+ try (FileInputStream fos = new FileInputStream(sharedPath)){
+ return objectMapper.readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
deleted file mode 100644
index dff5056..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.impl;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.PassiveGossipThread;
-import org.apache.log4j.Logger;
-
-public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
-
- public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
-
- public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
- super(gossipManager, gossipCore);
- }
-
-}
diff --git a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java b/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
new file mode 100644
index 0000000..91ed7f9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.protocol;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.crdt.CrdtModule;
+import org.apache.gossip.manager.PassiveGossipConstants;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.SignedPayload;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.InvalidKeyException;
+import java.security.KeyFactory;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.SignatureException;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.PKCS8EncodedKeySpec;
+
+// this class is constructed by reflection in GossipManager.
+public class JacksonProtocolManager implements ProtocolManager {
+
+ private final ObjectMapper objectMapper;
+ private final PrivateKey privKey;
+ private final Meter signed;
+ private final Meter unsigned;
+
+ /** required for reflection to work! */
+ public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
+ // set up object mapper.
+ objectMapper = buildObjectMapper(settings);
+
+ // set up message signing.
+ if (settings.isSignMessages()){
+ File privateKey = new File(settings.getPathToKeyStore(), id);
+ File publicKey = new File(settings.getPathToKeyStore(), id + ".pub");
+ if (!privateKey.exists()){
+ throw new IllegalArgumentException("private key not found " + privateKey);
+ }
+ if (!publicKey.exists()){
+ throw new IllegalArgumentException("public key not found " + publicKey);
+ }
+ try (FileInputStream keyfis = new FileInputStream(privateKey)) {
+ byte[] encKey = new byte[keyfis.available()];
+ keyfis.read(encKey);
+ keyfis.close();
+ PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey);
+ KeyFactory keyFactory = KeyFactory.getInstance("DSA");
+ privKey = keyFactory.generatePrivate(privKeySpec);
+ } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
+ throw new RuntimeException("failed hard", e);
+ }
+ } else {
+ privKey = null;
+ }
+
+ signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE);
+ unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
+ }
+
+ @Override
+ public byte[] write(Base message) throws IOException {
+ byte[] json_bytes;
+ if (privKey == null){
+ json_bytes = objectMapper.writeValueAsBytes(message);
+ } else {
+ SignedPayload p = new SignedPayload();
+ p.setData(objectMapper.writeValueAsString(message).getBytes());
+ p.setSignature(sign(p.getData(), privKey));
+ json_bytes = objectMapper.writeValueAsBytes(p);
+ }
+ return json_bytes;
+ }
+
+ @Override
+ public Base read(byte[] buf) throws IOException {
+ Base activeGossipMessage = objectMapper.readValue(buf, Base.class);
+ if (activeGossipMessage instanceof SignedPayload){
+ SignedPayload s = (SignedPayload) activeGossipMessage;
+ signed.mark();
+ return objectMapper.readValue(s.getData(), Base.class);
+ } else {
+ unsigned.mark();
+ return activeGossipMessage;
+ }
+ }
+
+ public static ObjectMapper buildObjectMapper(GossipSettings settings) {
+ ObjectMapper om = new ObjectMapper();
+ om.enableDefaultTyping();
+ // todo: should be specified in the configuration.
+ om.registerModule(new CrdtModule());
+ om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
+ return om;
+ }
+
+ private static byte[] sign(byte [] bytes, PrivateKey pk){
+ Signature dsa;
+ try {
+ dsa = Signature.getInstance("SHA1withDSA", "SUN");
+ dsa.initSign(pk);
+ dsa.update(bytes);
+ return dsa.sign();
+ } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java b/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java
new file mode 100644
index 0000000..0f553c7
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/protocol/ProtocolManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.protocol;
+
+import org.apache.gossip.model.Base;
+
+import java.io.IOException;
+
+/** interface for managing message marshaling. */
+public interface ProtocolManager {
+
+ /** serialize a message
+ * @param message
+ * @return serialized message.
+ * @throws IOException
+ */
+ byte[] write(Base message) throws IOException;
+
+ /**
+ * Reads the next message from a byte source.
+ * @param buf
+ * @return a gossip message.
+ * @throws IOException
+ */
+ Base read(byte[] buf) throws IOException;
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
new file mode 100644
index 0000000..497e605
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.transport;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.gossip.manager.AbstractActiveGossiper;
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.PassiveGossipThread;
+import org.apache.gossip.utils.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manage the protcol threads (active and passive gossipers).
+ */
+public abstract class AbstractTransportManager implements TransportManager {
+
+ public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
+
+ private final PassiveGossipThread passiveGossipThread;
+ private final ExecutorService gossipThreadExecutor;
+
+ private final AbstractActiveGossiper activeGossipThread;
+
+ public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
+
+ passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore);
+ gossipThreadExecutor = Executors.newCachedThreadPool();
+ activeGossipThread = ReflectionUtils.constructWithReflection(
+ gossipManager.getSettings().getActiveGossipClass(),
+ new Class<?>[]{
+ GossipManager.class, GossipCore.class, MetricRegistry.class
+ },
+ new Object[]{
+ gossipManager, gossipCore, gossipManager.getRegistry()
+ });
+ }
+
+ // shut down threads etc.
+ @Override
+ public void shutdown() {
+ passiveGossipThread.requestStop();
+ gossipThreadExecutor.shutdown();
+ if (activeGossipThread != null) {
+ activeGossipThread.shutdown();
+ }
+ try {
+ boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
+ if (!result) {
+ LOGGER.error("executor shutdown timed out");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error(e);
+ }
+ gossipThreadExecutor.shutdownNow();
+ }
+
+ @Override
+ public void startActiveGossiper() {
+ activeGossipThread.init();
+ }
+
+ @Override
+ public void startEndpoint() {
+ gossipThreadExecutor.execute(passiveGossipThread);
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
new file mode 100644
index 0000000..031d90e
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.transport;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** interface for manage that sends and receives messages that have already been serialized. */
+public interface TransportManager {
+
+ /** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */
+ void startActiveGossiper();
+
+ /** starts the passive gossip thread that receives messages from remote nodes. Not related to `startActiveGossiper()` */
+ void startEndpoint();
+
+ /** attempts to shutdown all threads. */
+ void shutdown();
+
+ /** sends a payload to an endpoint. */
+ void send(URI endpoint, byte[] buf) throws IOException;
+
+ /** gets the next payload being sent to this node */
+ byte[] read() throws IOException;
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
new file mode 100644
index 0000000..d80deec
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.transport;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.URI;
+
+/**
+ * This class is constructed by reflection in GossipManager.
+ * It manages transport (byte read/write) operations over UDP.
+ */
+public class UdpTransportManager extends AbstractTransportManager {
+
+ public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
+
+ /** The socket used for the passive thread of the gossip service. */
+ private final DatagramSocket server;
+
+ private final int soTimeout;
+
+ /** required for reflection to work! */
+ public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
+ super(gossipManager, gossipCore);
+
+ soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
+
+ try {
+ SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
+ gossipManager.getMyself().getUri().getPort());
+ server = new DatagramSocket(socketAddress);
+ } catch (SocketException ex) {
+ LOGGER.warn(ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ server.close();
+ super.shutdown();
+ }
+
+ /**
+ * blocking read a message.
+ * @return buffer of message contents.
+ * @throws IOException
+ */
+ public byte[] read() throws IOException {
+ byte[] buf = new byte[server.getReceiveBufferSize()];
+ DatagramPacket p = new DatagramPacket(buf, buf.length);
+ server.receive(p);
+ debug(p.getData());
+ return p.getData();
+ }
+
+ @Override
+ public void send(URI endpoint, byte[] buf) throws IOException {
+ DatagramSocket socket = new DatagramSocket();
+ socket.setSoTimeout(soTimeout);
+ InetAddress dest = InetAddress.getByName(endpoint.getHost());
+ DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
+ socket.send(payload);
+ // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
+ socket.close();
+ }
+
+ private void debug(byte[] jsonBytes) {
+ if (LOGGER.isDebugEnabled()){
+ String receivedMessage = new String(jsonBytes);
+ LOGGER.debug("Received message ( bytes): " + receivedMessage);
+ }
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java b/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java
new file mode 100644
index 0000000..2ae4eb1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/utils/ReflectionUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.utils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+public class ReflectionUtils {
+
+ /**
+ * Create an instance of a thing. This method essentially makes code more readable by handing the various exception
+ * trapping.
+ * @param className
+ * @param constructorTypes
+ * @param constructorArgs
+ * @param <T>
+ * @return constructed instance of a thing.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T constructWithReflection(String className, Class<?>[] constructorTypes, Object[] constructorArgs) {
+ try {
+ Constructor<?> c = Class.forName(className).getConstructor(constructorTypes);
+ c.setAccessible(true);
+ return (T) c.newInstance(constructorArgs);
+ } catch (InvocationTargetException e) {
+ // catch ITE and throw the target if it is a RTE.
+ if (e.getTargetException() != null && RuntimeException.class.isAssignableFrom(e.getTargetException().getClass())) {
+ throw (RuntimeException) e.getTargetException();
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (ReflectiveOperationException others) {
+ // Note: No class in the above list should be a descendent of RuntimeException. Otherwise, we're just wrapping
+ // and making stack traces confusing.
+ throw new RuntimeException(others);
+ }
+ }
+}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
index b19f221..4c6014a 100644
--- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -18,15 +18,14 @@
package org.apache.gossip.crdt;
import java.io.IOException;
-import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipSettings;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.protocol.JacksonProtocolManager;
import org.junit.Assert;
import org.junit.Test;
@@ -88,16 +87,11 @@
@Test
public void serialTest() throws InterruptedException, URISyntaxException, IOException {
- GossipManager gossipService2 = GossipManagerBuilder.newBuilder()
- .cluster("a")
- .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
- .id("1")
- .gossipSettings(new GossipSettings())
- .build();
+ ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new GossipSettings());
OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
- String s = gossipService2.getObjectMapper().writeValueAsString(i);
+ String s = objectMapper.writeValueAsString(i);
@SuppressWarnings("unchecked")
- OrSet<Integer> back = gossipService2.getObjectMapper().readValue(s, OrSet.class);
+ OrSet<Integer> back = objectMapper.readValue(s, OrSet.class);
Assert.assertEquals(back, i);
}
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
index 959f818..bc0b46a 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
@@ -25,6 +25,7 @@
import org.apache.gossip.manager.handlers.ResponseHandler;
import org.apache.gossip.manager.handlers.TypedMessageHandler;
import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
@@ -43,6 +44,17 @@
@RunWith(JUnitPlatform.class)
public class GossipManagerBuilderTest {
+ private GossipManagerBuilder.ManagerBuilder builder;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ builder = GossipManagerBuilder.newBuilder()
+ .id("id")
+ .cluster("aCluster")
+ .uri(new URI("udp://localhost:2000"))
+ .gossipSettings(new GossipSettings());
+ }
+
@Test
public void idShouldNotBeNull() {
expectThrows(IllegalArgumentException.class,() -> {
@@ -66,35 +78,20 @@
@Test
public void createMembersListIfNull() throws URISyntaxException {
- GossipManager gossipManager = GossipManagerBuilder.newBuilder()
- .id("id")
- .cluster("aCluster")
- .uri(new URI("udp://localhost:2000"))
- .gossipSettings(new GossipSettings())
- .gossipMembers(null).registry(new MetricRegistry()).build();
+ GossipManager gossipManager = builder.gossipMembers(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getLiveMembers());
}
@Test
public void createDefaultMessageHandlerIfNull() throws URISyntaxException {
- GossipManager gossipManager = GossipManagerBuilder.newBuilder()
- .id("id")
- .cluster("aCluster")
- .uri(new URI("udp://localhost:2000"))
- .gossipSettings(new GossipSettings())
- .messageHandler(null).registry(new MetricRegistry()).build();
+ GossipManager gossipManager = builder.messageHandler(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageHandler());
}
@Test
public void testMessageHandlerKeeping() throws URISyntaxException {
MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler());
- GossipManager gossipManager = GossipManagerBuilder.newBuilder()
- .id("id")
- .cluster("aCluster")
- .uri(new URI("udp://localhost:2000"))
- .gossipSettings(new GossipSettings())
- .messageHandler(mi).registry(new MetricRegistry()).build();
+ GossipManager gossipManager = builder.messageHandler(mi).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageHandler());
Assert.assertEquals(gossipManager.getMessageHandler(), mi);
}
@@ -106,10 +103,7 @@
System.nanoTime(), new HashMap<String, String>(), 1000, 1, "exponential");
List<Member> memberList = new ArrayList<>();
memberList.add(member);
- GossipManager gossipManager = GossipManagerBuilder.newBuilder()
- .id("id")
- .cluster("aCluster")
- .gossipSettings(new GossipSettings())
+ GossipManager gossipManager = builder
.uri(new URI("udp://localhost:8000"))
.gossipMembers(memberList).registry(new MetricRegistry()).build();
assertEquals(1, gossipManager.getDeadMembers().size());
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
index d448b98..ebe0e2c 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
@@ -49,7 +49,7 @@
new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"),
new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2"))).build();
gossipService.getRingState().writeToDisk();
- return gossipService.getRingState().computeTarget();
+ return GossipManager.buildRingStatePath(gossipService);
}
private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException {
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
index 7b17e41..dde4b74 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
@@ -68,8 +68,8 @@
gossipService.init();
Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeGossipData(nodeId, "a").getPayload()).getColor());
Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedGossipData("a").getPayload()).getColor());
- File f = gossipService.getUserDataState().computeSharedTarget();
- File g = gossipService.getUserDataState().computePerNodeTarget();
+ File f = GossipManager.buildSharedDataPath(gossipService);
+ File g = GossipManager.buildPerNodeDataPath(gossipService);
gossipService.shutdown();
f.delete();
g.delete();
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
index c035d21..ec91d67 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
@@ -64,7 +64,7 @@
return true;
}
}
-
+
@Test
public void testSimpleHandler() {
MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler());