GOSSIP-80 Sundry cleanups
* remove redundant parameter from method call.
* remove uncessary threadpool.
* Simplify `GossipCore.sendOneWay()`
* Cleanup useage of `MessageInvoker`
* `DefaultMessageInvoker` replaced by a factory
* `MessageInvokerCombiner` replaced by same factory
* Alter `MessageInvokerTest` to not rely on specific types
* Remove type assertion from `GossipManagerBuilderTest`
* Merge `MessageInvoker` with `MessageHandler`
* This required changing method signature return type from `void` to `boolean`.
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
index f53419d..d01a84c 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -57,7 +57,6 @@
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private final GossipManager gossipManager;
private ConcurrentHashMap<String, LatchAndBase> requests;
- private ThreadPoolExecutor service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
@@ -71,15 +70,12 @@
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
- service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size());
- metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() -> service.getActiveCount());
- metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() -> service.getPoolSize());
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
@@ -178,17 +174,10 @@
}
public void shutdown(){
- service.shutdown();
- try {
- service.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.warn(e);
- }
- service.shutdownNow();
}
public void receive(Base base) {
- if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) {
+ if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
LOGGER.warn("received message can not be handled");
}
}
@@ -268,29 +257,10 @@
* @param message the message to send
* @param u the uri to send it to
*/
- public void sendOneWay(Base message, URI u){
- byte[] json_bytes;
+ public void sendOneWay(Base message, URI u) {
try {
- if (privKey == null){
- json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
- } else {
- SignedPayload p = new SignedPayload();
- p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
- p.setSignature(sign(p.getData()));
- json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
- }
- } catch (IOException e) {
- messageSerdeException.mark();
- throw new RuntimeException(e);
- }
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
- InetAddress dest = InetAddress.getByName(u.getHost());
- DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort());
- socket.send(datagramPacket);
- tranmissionSuccess.mark();
- } catch (IOException ex) {
- tranmissionException.mark();
+ sendInternal(message, u);
+ } catch (RuntimeException ex) {
LOGGER.debug("Send one way failed", ex);
}
}
@@ -304,13 +274,11 @@
/**
* Merge lists from remote members and update heartbeats
*
- * @param gossipManager
* @param senderMember
* @param remoteList
*
*/
- public void mergeLists(GossipManager gossipManager, RemoteMember senderMember,
- List<Member> remoteList) {
+ public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {
if (LOGGER.isDebugEnabled()){
debugState(senderMember, remoteList);
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index c2b50ae..ff70ccc 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -25,7 +25,7 @@
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
-import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
@@ -64,14 +64,14 @@
private final GossipMemberStateRefresher memberStateRefresher;
private final ObjectMapper objectMapper;
- private final MessageInvoker messageInvoker;
+ private final MessageHandler messageHandler;
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
- ObjectMapper objectMapper, MessageInvoker messageInvoker) {
+ ObjectMapper objectMapper, MessageHandler messageHandler) {
this.settings = settings;
- this.messageInvoker = messageInvoker;
+ this.messageHandler = messageHandler;
clock = new SystemClock();
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
@@ -101,8 +101,8 @@
readSavedDataState();
}
- public MessageInvoker getMessageInvoker() {
- return messageInvoker;
+ public MessageHandler getMessageHandler() {
+ return messageHandler;
}
public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index b87045b..bb73177 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -25,8 +25,8 @@
import org.apache.gossip.StartupSettings;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
-import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.handlers.MessageHandler;
+import org.apache.gossip.manager.handlers.MessageHandlerFactory;
import java.net.URI;
import java.util.ArrayList;
@@ -50,7 +50,7 @@
private MetricRegistry registry;
private Map<String,String> properties;
private ObjectMapper objectMapper;
- private MessageInvoker messageInvoker;
+ private MessageHandler messageHandler;
private ManagerBuilder() {}
@@ -114,8 +114,8 @@
return this;
}
- public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) {
- this.messageInvoker = messageInvoker;
+ public ManagerBuilder messageHandler(MessageHandler messageHandler) {
+ this.messageHandler = messageHandler;
return this;
}
@@ -142,10 +142,10 @@
objectMapper.registerModule(new CrdtModule());
objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
}
- if (messageInvoker == null) {
- messageInvoker = new DefaultMessageInvoker();
+ if (messageHandler == null) {
+ messageHandler = MessageHandlerFactory.defaultHandler();
}
- return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ;
+ return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ;
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
index f5e568e..e89179b 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
@@ -32,8 +32,15 @@
import java.util.List;
public class ActiveGossipMessageHandler implements MessageHandler {
+
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
@Override
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
List<Member> remoteGossipMembers = new ArrayList<>();
RemoteMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
@@ -69,6 +76,7 @@
o.setUriFrom(activeGossipMessage.getUriFrom());
o.setUuid(activeGossipMessage.getUuid());
gossipCore.sendOneWay(o, senderMember.getUri());
- gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers);
+ gossipCore.mergeLists(senderMember, remoteGossipMembers);
+ return true;
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
deleted file mode 100644
index 5b78ce3..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.*;
-
-public class DefaultMessageInvoker implements MessageInvoker {
- private final MessageInvokerCombiner mic;
-
- public DefaultMessageInvoker() {
- mic = new MessageInvokerCombiner();
- mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
- mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler()));
- mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler()));
- mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler()));
- mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler()));
- }
-
- public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- return mic.invoke(gossipCore, gossipManager, base);
- }
-}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
index 4b5d49d..5af9b14 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
@@ -22,5 +22,11 @@
import org.apache.gossip.model.Base;
public interface MessageHandler {
- void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
+ boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
}
\ No newline at end of file
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
new file mode 100644
index 0000000..fff9430
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+
+import java.util.Arrays;
+
+public class MessageHandlerFactory {
+
+ public static MessageHandler defaultHandler() {
+ return concurrentHandler(
+ new TypedMessageHandler(Response.class, new ResponseHandler()),
+ new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()),
+ new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()),
+ new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()),
+ new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler())
+ );
+ }
+
+ public static MessageHandler concurrentHandler(MessageHandler... handlers) {
+ if (handlers == null) throw new NullPointerException("handlers cannot be null");
+ if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) {
+ throw new NullPointerException("found at least one null handler");
+ }
+ return new MessageHandler() {
+ @Override
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ // return true if at least one of the component handlers return true.
+ return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
+ }
+ };
+ }
+}
+
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java
deleted file mode 100644
index 70be408..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.Base;
-
-public interface MessageInvoker {
- /**
- *
- * @param gossipCore
- * @param gossipManager
- * @param base
- * @return true if the invoker processed the message type
- */
- boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
-}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
deleted file mode 100644
index 5faf6a5..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.Base;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class MessageInvokerCombiner implements MessageInvoker {
- private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();
-
- public MessageInvokerCombiner() {
- }
-
- public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
- }
-
- public void clear() {
- invokers.clear();
- }
-
- public void add(MessageInvoker mi) {
- if (mi == null) {
- throw new NullPointerException();
- }
- invokers.add(mi);
- }
-}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
index b3a785e..0ad0d91 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
@@ -23,9 +23,17 @@
import org.apache.gossip.udp.UdpPerNodeDataMessage;
public class PerNodeDataMessageHandler implements MessageHandler {
+
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
@Override
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base;
gossipCore.addPerNodeData(message);
+ return true;
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
index 2f33b01..1070ff7 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
@@ -23,11 +23,20 @@
import org.apache.gossip.udp.Trackable;
public class ResponseHandler implements MessageHandler {
+
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
@Override
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (base instanceof Trackable) {
Trackable t = (Trackable) base;
gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
+ return true;
}
+ return false;
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
index 89ca4a0..3fe3033 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
@@ -23,9 +23,17 @@
import org.apache.gossip.udp.UdpSharedDataMessage;
public class SharedDataMessageHandler implements MessageHandler{
+
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
@Override
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedDataMessage message = (UdpSharedDataMessage) base;
gossipCore.addSharedData(message);
+ return true;
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
index a40c7a1..40e4c07 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
@@ -24,8 +24,15 @@
import org.apache.gossip.model.ShutdownMessage;
public class ShutdownMessageHandler implements MessageHandler {
+
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return boolean indicating success.
+ */
@Override
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
ShutdownMessage s = (ShutdownMessage) base;
PerNodeDataMessage m = new PerNodeDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY);
@@ -34,5 +41,6 @@
m.setTimestamp(System.currentTimeMillis());
m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
gossipCore.addPerNodeData(m);
+ return true;
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java
similarity index 82%
rename from gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java
rename to gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java
index 0f410d2..b40461d 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java
@@ -21,11 +21,11 @@
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
-public class SimpleMessageInvoker implements MessageInvoker {
+public class TypedMessageHandler implements MessageHandler {
final private Class<?> messageClass;
final private MessageHandler messageHandler;
- public SimpleMessageInvoker(Class<?> messageClass, MessageHandler messageHandler) {
+ public TypedMessageHandler(Class<?> messageClass, MessageHandler messageHandler) {
if (messageClass == null || messageHandler == null) {
throw new NullPointerException();
}
@@ -33,6 +33,12 @@
this.messageHandler = messageHandler;
}
+ /**
+ * @param gossipCore context.
+ * @param gossipManager context.
+ * @param base message reference.
+ * @return true if types match, false otherwise.
+ */
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (messageClass.isAssignableFrom(base.getClass())) {
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
index 8842643..959f818 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
@@ -21,10 +21,9 @@
import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
-import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
-import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.handlers.ResponseHandler;
-import org.apache.gossip.manager.handlers.SimpleMessageInvoker;
+import org.apache.gossip.manager.handlers.TypedMessageHandler;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
@@ -77,28 +76,27 @@
}
@Test
- public void createDefaultMessageInvokerIfNull() throws URISyntaxException {
+ public void createDefaultMessageHandlerIfNull() throws URISyntaxException {
GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings())
- .messageInvoker(null).registry(new MetricRegistry()).build();
- assertNotNull(gossipManager.getMessageInvoker());
- Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass());
+ .messageHandler(null).registry(new MetricRegistry()).build();
+ assertNotNull(gossipManager.getMessageHandler());
}
@Test
- public void testMessageInvokerKeeping() throws URISyntaxException {
- MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler());
+ public void testMessageHandlerKeeping() throws URISyntaxException {
+ MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler());
GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings())
- .messageInvoker(mi).registry(new MetricRegistry()).build();
- assertNotNull(gossipManager.getMessageInvoker());
- Assert.assertEquals(gossipManager.getMessageInvoker(), mi);
+ .messageHandler(mi).registry(new MetricRegistry()).build();
+ assertNotNull(gossipManager.getMessageHandler());
+ Assert.assertEquals(gossipManager.getMessageHandler(), mi);
}
@Test
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
new file mode 100644
index 0000000..c035d21
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.UdpSharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageHandlerTest {
+ private class FakeMessage extends Base {
+ public FakeMessage() {
+ }
+ }
+
+ private class FakeMessageData extends Base {
+ public int data;
+
+ public FakeMessageData(int data) {
+ this.data = data;
+ }
+ }
+
+ private class FakeMessageDataHandler implements MessageHandler {
+ public int data;
+
+ public FakeMessageDataHandler() {
+ data = 0;
+ }
+
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ data = ((FakeMessageData) base).data;
+ return true;
+ }
+ }
+
+ private class FakeMessageHandler implements MessageHandler {
+ public int counter;
+
+ public FakeMessageHandler() {
+ counter = 0;
+ }
+
+ public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+ counter++;
+ return true;
+ }
+ }
+
+ @Test
+ public void testSimpleHandler() {
+ MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler());
+ Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+ Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testSimpleHandlerNullClassConstructor() {
+ new TypedMessageHandler(null, new FakeMessageHandler());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testSimpleHandlerNullHandlerConstructor() {
+ new TypedMessageHandler(FakeMessage.class, null);
+ }
+
+ @Test
+ public void testCallCountSimpleHandler() {
+ FakeMessageHandler h = new FakeMessageHandler();
+ MessageHandler mi = new TypedMessageHandler(FakeMessage.class, h);
+ mi.invoke(null, null, new FakeMessage());
+ Assert.assertEquals(1, h.counter);
+ mi.invoke(null, null, new ActiveGossipMessage());
+ Assert.assertEquals(1, h.counter);
+ mi.invoke(null, null, new FakeMessage());
+ Assert.assertEquals(2, h.counter);
+ }
+
+ @Test(expected = NullPointerException.class)
+ @SuppressWarnings("all")
+ public void cantAddNullHandler() {
+ MessageHandler handler = MessageHandlerFactory.concurrentHandler(null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void cantAddNullHandler2() {
+ MessageHandler handler = MessageHandlerFactory.concurrentHandler(
+ new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()),
+ null,
+ new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())
+ );
+ }
+
+ @Test
+ public void testMessageHandlerCombiner() {
+ //Empty combiner - false result
+ MessageHandler mi = MessageHandlerFactory.concurrentHandler();
+ Assert.assertFalse(mi.invoke(null, null, new Base()));
+
+ FakeMessageHandler h = new FakeMessageHandler();
+ mi = MessageHandlerFactory.concurrentHandler(
+ new TypedMessageHandler(FakeMessage.class, h),
+ new TypedMessageHandler(FakeMessage.class, h)
+ );
+
+ Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+ Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
+ Assert.assertEquals(2, h.counter);
+
+ //Increase size in runtime. Should be 3 calls: 2+3 = 5
+ mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessage.class, h));
+ Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+ Assert.assertEquals(5, h.counter);
+ }
+
+ @Test
+ public void testMessageHandlerCombiner2levels() {
+ FakeMessageHandler h = new FakeMessageHandler();
+
+ MessageHandler mi1 = MessageHandlerFactory.concurrentHandler(
+ new TypedMessageHandler(FakeMessage.class, h),
+ new TypedMessageHandler(FakeMessage.class, h)
+ );
+
+ MessageHandler mi2 = MessageHandlerFactory.concurrentHandler(
+ new TypedMessageHandler(FakeMessage.class, h),
+ new TypedMessageHandler(FakeMessage.class, h)
+ );
+
+ MessageHandler mi = MessageHandlerFactory.concurrentHandler(mi1, mi2);
+
+ Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+ Assert.assertEquals(4, h.counter);
+ }
+
+ @Test
+ public void testMessageHandlerCombinerDataShipping() {
+ MessageHandler mi = MessageHandlerFactory.concurrentHandler();
+ FakeMessageDataHandler h = new FakeMessageDataHandler();
+ mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessageData.class, h));
+
+ Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101)));
+ Assert.assertEquals(101, h.data);
+ }
+
+ @Test
+ public void testCombiningDefaultHandler() {
+ MessageHandler mi = MessageHandlerFactory.concurrentHandler(
+ MessageHandlerFactory.defaultHandler(),
+ new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())
+ );
+ //UdpSharedGossipDataMessage with null gossipCore -> exception
+ boolean thrown = false;
+ try {
+ mi.invoke(null, null, new UdpSharedDataMessage());
+ } catch (NullPointerException e) {
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+ //skips FakeMessage and FakeHandler works ok
+ Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+ }
+
+}
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
deleted file mode 100644
index 571d7ba..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipMessage;
-import org.apache.gossip.model.Base;
-import org.apache.gossip.udp.UdpSharedDataMessage;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MessageInvokerTest {
- private class FakeMessage extends Base {
- public FakeMessage() {
- }
- }
-
- private class FakeMessageData extends Base {
- public int data;
-
- public FakeMessageData(int data) {
- this.data = data;
- }
- }
-
- private class FakeMessageDataHandler implements MessageHandler {
- public int data;
-
- public FakeMessageDataHandler() {
- data = 0;
- }
-
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- data = ((FakeMessageData) base).data;
- }
- }
-
- private class FakeMessageHandler implements MessageHandler {
- public int counter;
-
- public FakeMessageHandler() {
- counter = 0;
- }
-
- public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
- counter++;
- }
- }
-
- @Test
- public void testSimpleInvoker() {
- MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler());
- Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
- Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
- }
-
- @Test(expected = NullPointerException.class)
- public void testSimpleInvokerNullClassConstructor() {
- new SimpleMessageInvoker(null, new FakeMessageHandler());
- }
-
- @Test(expected = NullPointerException.class)
- public void testSimpleInvokerNullHandlerConstructor() {
- new SimpleMessageInvoker(FakeMessage.class, null);
- }
-
- @Test
- public void testCallCountSimpleInvoker() {
- FakeMessageHandler h = new FakeMessageHandler();
- MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h);
- mi.invoke(null, null, new FakeMessage());
- Assert.assertEquals(1, h.counter);
- mi.invoke(null, null, new ActiveGossipMessage());
- Assert.assertEquals(1, h.counter);
- mi.invoke(null, null, new FakeMessage());
- Assert.assertEquals(2, h.counter);
- }
-
- @Test(expected = NullPointerException.class)
- public void cantAddNullInvoker() {
- MessageInvokerCombiner mi = new MessageInvokerCombiner();
- mi.add(null);
- }
-
- @Test
- public void testCombinerClear() {
- MessageInvokerCombiner mi = new MessageInvokerCombiner();
- mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
- Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
-
- mi.clear();
- Assert.assertFalse(mi.invoke(null, null, new FakeMessage()));
- }
-
- @Test
- public void testMessageInvokerCombiner() {
- //Empty combiner - false result
- MessageInvokerCombiner mi = new MessageInvokerCombiner();
- Assert.assertFalse(mi.invoke(null, null, new Base()));
-
- FakeMessageHandler h = new FakeMessageHandler();
- mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
- mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
-
- Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
- Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
- Assert.assertEquals(2, h.counter);
-
- //Increase size in runtime. Should be 3 calls: 2+3 = 5
- mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
- Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
- Assert.assertEquals(5, h.counter);
- }
-
- @Test
- public void testMessageInvokerCombiner2levels() {
- MessageInvokerCombiner mi = new MessageInvokerCombiner();
- FakeMessageHandler h = new FakeMessageHandler();
-
- MessageInvokerCombiner mi1 = new MessageInvokerCombiner();
- mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
- mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
-
- MessageInvokerCombiner mi2 = new MessageInvokerCombiner();
- mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
- mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
-
- mi.add(mi1);
- mi.add(mi2);
-
- Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
- Assert.assertEquals(4, h.counter);
- }
-
- @Test
- public void testMessageInvokerCombinerDataShipping() {
- MessageInvokerCombiner mi = new MessageInvokerCombiner();
- FakeMessageDataHandler h = new FakeMessageDataHandler();
- mi.add(new SimpleMessageInvoker(FakeMessageData.class, h));
-
- Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101)));
- Assert.assertEquals(101, h.data);
- }
-
- @Test
- public void testCombiningDefaultInvoker() {
- MessageInvokerCombiner mi = new MessageInvokerCombiner();
- mi.add(new DefaultMessageInvoker());
- mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
- //UdpSharedGossipDataMessage with null gossipCore -> exception
- boolean thrown = false;
- try {
- mi.invoke(null, null, new UdpSharedDataMessage());
- } catch (NullPointerException e) {
- thrown = true;
- }
- Assert.assertTrue(thrown);
- //DefaultInvoker skips FakeMessage and FakeHandler works ok
- Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
- }
-
-}