GOSSIP-44 Remove 4 byte header
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index c12f946..d24c0fa 100644
--- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -31,7 +31,7 @@
GossipSettings s = new GossipSettings();
s.setWindowSize(10);
s.setConvictThreshold(1.0);
- s.setGossipInterval(10);
+ s.setGossipInterval(1000);
GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
gossipService.start();
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 731b019..f81565b 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -17,7 +17,6 @@
*/
package org.apache.gossip.manager;
-import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
@@ -44,8 +43,6 @@
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import static com.codahale.metrics.MetricRegistry.name;
/**
@@ -61,7 +58,6 @@
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
- private ObjectMapper MAPPER = new ObjectMapper();
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
@@ -114,28 +110,17 @@
LOGGER.debug("Send sendMembershipList() is called without action");
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
return;
- }
- try {
- for (Entry<String, SharedGossipDataMessage> innerEntry : this.gossipCore.getSharedData().entrySet()){
- UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
- message.setUuid(UUID.randomUUID().toString());
- message.setUriFrom(me.getId());
- message.setExpireAt(innerEntry.getValue().getExpireAt());
- message.setKey(innerEntry.getValue().getKey());
- message.setNodeId(innerEntry.getValue().getNodeId());
- message.setTimestamp(innerEntry.getValue().getTimestamp());
- message.setPayload(innerEntry.getValue().getPayload());
- byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- gossipCore.sendOneWay(message, member.getUri());
- } else {
- LOGGER.error("The length of the to be send message is too large ("
- + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
- }
- }
- } catch (IOException e1) {
- LOGGER.warn(e1);
+ }
+ for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
+ UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
+ message.setUuid(UUID.randomUUID().toString());
+ message.setUriFrom(me.getId());
+ message.setExpireAt(innerEntry.getValue().getExpireAt());
+ message.setKey(innerEntry.getValue().getKey());
+ message.setNodeId(innerEntry.getValue().getNodeId());
+ message.setTimestamp(innerEntry.getValue().getTimestamp());
+ message.setPayload(innerEntry.getValue().getPayload());
+ gossipCore.sendOneWay(message, member.getUri());
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
@@ -148,36 +133,26 @@
LOGGER.debug("Send sendMembershipList() is called without action");
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
return;
- }
- try {
- for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
- for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
- UdpGossipDataMessage message = new UdpGossipDataMessage();
- message.setUuid(UUID.randomUUID().toString());
- message.setUriFrom(me.getId());
- message.setExpireAt(innerEntry.getValue().getExpireAt());
- message.setKey(innerEntry.getValue().getKey());
- message.setNodeId(innerEntry.getValue().getNodeId());
- message.setTimestamp(innerEntry.getValue().getTimestamp());
- message.setPayload(innerEntry.getValue().getPayload());
- byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- gossipCore.sendOneWay(message, member.getUri());
- } else {
- LOGGER.error("The length of the to be send message is too large ("
- + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
- }
- }
+ }
+ for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
+ for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
+ UdpGossipDataMessage message = new UdpGossipDataMessage();
+ message.setUuid(UUID.randomUUID().toString());
+ message.setUriFrom(me.getId());
+ message.setExpireAt(innerEntry.getValue().getExpireAt());
+ message.setKey(innerEntry.getValue().getKey());
+ message.setNodeId(innerEntry.getValue().getNodeId());
+ message.setTimestamp(innerEntry.getValue().getTimestamp());
+ message.setPayload(innerEntry.getValue().getPayload());
+ gossipCore.sendOneWay(message, member.getUri());
}
- } catch (IOException e1) {
- LOGGER.warn(e1);
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
protected void sendToALiveMember(){
LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
+ System.out.println("send" );
sendMembershipList(gossipManager.getMyself(), member);
}
@@ -199,29 +174,18 @@
} else {
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
- try {
- UdpActiveGossipMessage message = new UdpActiveGossipMessage();
- message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
- message.setUuid(UUID.randomUUID().toString());
- message.getMembers().add(convert(me));
- for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
- message.getMembers().add(convert(other));
- }
- byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- Response r = gossipCore.send(message, member.getUri());
- if (r instanceof ActiveGossipOk){
- //maybe count metrics here
- } else {
- LOGGER.debug("Message " + message + " generated response " + r);
- }
- } else {
- LOGGER.error("The length of the to be send message is too large ("
- + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
- }
- } catch (IOException e1) {
- LOGGER.warn(e1);
+ UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+ message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+ message.setUuid(UUID.randomUUID().toString());
+ message.getMembers().add(convert(me));
+ for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
+ message.getMembers().add(convert(other));
+ }
+ Response r = gossipCore.send(message, member.getUri());
+ if (r instanceof ActiveGossipOk){
+ //maybe count metrics here
+ } else {
+ LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index d315361..5d561c3 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -31,7 +31,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -53,27 +52,42 @@
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
-public class GossipCore {
+public class GossipCore implements GossipCoreConstants {
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final GossipManager gossipManager;
private ConcurrentHashMap<String, Base> requests;
- private ExecutorService service;
+ private ThreadPoolExecutor service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
+ private final Meter messageSerdeException;
+ private final Meter tranmissionException;
+ private final Meter tranmissionSuccess;
- public GossipCore(GossipManager manager){
+ public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
+ metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
+ metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
+ metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
+ metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size());
+ metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() -> service.getActiveCount());
+ metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() -> service.getPoolSize());
+ messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
+ tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
+ tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
}
public void addSharedData(SharedGossipDataMessage message){
@@ -175,29 +189,29 @@
}
/**
- * Sends a blocking message. Throws exception when tranmission fails
+ * Sends a blocking message.
* @param message
* @param uri
+ * @throws RuntimeException if data can not be serialized or in transmission error
*/
private void sendInternal(Base message, URI uri){
byte[] json_bytes;
try {
json_bytes = MAPPER.writeValueAsString(message).getBytes();
} catch (IOException e) {
+ messageSerdeException.mark();
throw new RuntimeException(e);
}
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
- InetAddress dest = InetAddress.getByName(uri.getHost());
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort());
- socket.send(datagramPacket);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
+ try (DatagramSocket socket = new DatagramSocket()) {
+ socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
+ InetAddress dest = InetAddress.getByName(uri.getHost());
+ DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort());
+ socket.send(datagramPacket);
+ tranmissionSuccess.mark();
+ } catch (IOException e) {
+ tranmissionException.mark();
+ throw new RuntimeException(e);
+ }
}
public Response send(Base message, URI uri){
@@ -225,7 +239,7 @@
return (Response) b;
}
try {
- Thread.sleep(0, 1000);
+ Thread.sleep(0, 555555);
} catch (InterruptedException e) {
}
@@ -261,19 +275,20 @@
public void sendOneWay(Base message, URI u){
byte[] json_bytes;
try {
- json_bytes = MAPPER.writeValueAsString(message).getBytes();
+ json_bytes = MAPPER.writeValueAsBytes(message);
} catch (IOException e) {
+ messageSerdeException.mark();
throw new RuntimeException(e);
}
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
- InetAddress dest = InetAddress.getByName(u.getHost());
- DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort());
- socket.send(datagramPacket);
- } catch (IOException ex) { }
+ try (DatagramSocket socket = new DatagramSocket()) {
+ socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
+ InetAddress dest = InetAddress.getByName(u.getHost());
+ DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort());
+ socket.send(datagramPacket);
+ tranmissionSuccess.mark();
+ } catch (IOException ex) {
+ tranmissionException.mark();
+ LOGGER.debug("Send one way failed", ex);
}
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
new file mode 100644
index 0000000..6d3765a
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public interface GossipCoreConstants {
+ String WORKQUEUE_SIZE = "gossip.core.workqueue.size";
+ String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size";
+ String SHARED_DATA_SIZE = "gossip.core.shareddata.size";
+ String REQUEST_SIZE = "gossip.core.requests.size";
+ String THREADPOOL_ACTIVE = "gossip.core.threadpool.active";
+ String THREADPOOL_SIZE = "gossip.core.threadpool.size";
+ String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception";
+ String MESSAGE_TRANSMISSION_EXCEPTION = "gossip.core.message_transmission_exception";
+ String MESSAGE_TRANSMISSION_SUCCESS = "gossip.core.message_transmission_success";
+}
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index fb7ec93..cf67c9c 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -49,8 +49,6 @@
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
- public static final int MAX_PACKET_SIZE = 102400;
-
private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
private final LocalGossipMember me;
@@ -82,7 +80,7 @@
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
this.settings = settings;
- gossipCore = new GossipCore(this);
+ gossipCore = new GossipCore(this, registry);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
@@ -256,4 +254,5 @@
return dataReaper;
}
+
}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index b54a963..ebda513 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -78,23 +78,13 @@
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
- int packet_length = UdpUtil.readPacketLengthFromBuffer(buf);
- if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
- byte[] json_bytes = new byte[packet_length];
- for (int i = 0; i < packet_length; i++) {
- json_bytes[i] = buf[i + 4];
- }
- debug(packet_length, json_bytes);
- try {
- Base activeGossipMessage = MAPPER.readValue(json_bytes, Base.class);
- gossipCore.receive(activeGossipMessage);
- } catch (RuntimeException ex) {//TODO trap json exception
- LOGGER.error("Unable to process message", ex);
- }
- } else {
- LOGGER.error("The received message is not of the expected size, it has been dropped.");
+ debug(p.getData());
+ try {
+ Base activeGossipMessage = MAPPER.readValue(p.getData(), Base.class);
+ gossipCore.receive(activeGossipMessage);
+ } catch (RuntimeException ex) {//TODO trap json exception
+ LOGGER.error("Unable to process message", ex);
}
-
} catch (IOException e) {
LOGGER.error(e);
keepRunning.set(false);
@@ -103,11 +93,10 @@
shutdown();
}
- private void debug(int packetLength, byte[] jsonBytes) {
+ private void debug(byte[] jsonBytes) {
if (LOGGER.isDebugEnabled()){
String receivedMessage = new String(jsonBytes);
- LOGGER.debug("Received message (" + packetLength + " bytes): "
- + receivedMessage);
+ LOGGER.debug("Received message ( bytes): " + receivedMessage);
}
}
diff --git a/src/main/java/org/apache/gossip/manager/UdpUtil.java b/src/main/java/org/apache/gossip/manager/UdpUtil.java
deleted file mode 100644
index c61769f..0000000
--- a/src/main/java/org/apache/gossip/manager/UdpUtil.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.nio.ByteBuffer;
-
-public class UdpUtil {
-
- public static int readPacketLengthFromBuffer(byte [] buffer){
- int packetLength = 0;
- for (int i = 0; i < 4; i++) {
- int shift = (4 - 1 - i) * 8;
- packetLength += (buffer[i] & 0x000000FF) << shift;
- }
- return packetLength;
- }
-
- public static byte[] createBuffer(int packetLength, byte[] jsonBytes) {
- byte[] lengthBytes = new byte[4];
- lengthBytes[0] = (byte) (packetLength >> 24);
- lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
- lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
- lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
- ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
- byteBuffer.put(lengthBytes);
- byteBuffer.put(jsonBytes);
- byte[] buf = byteBuffer.array();
- return buf;
- }
-}
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index 5388bb3..b4ac45d 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -32,27 +32,37 @@
public class DataReaperTest {
private final MetricRegistry registry = new MetricRegistry();
-
+ String myId = "4";
+ String key = "key";
+ String value = "a";
+
@Test
public void testReaperOneShot() {
- String myId = "4";
- String key = "key";
- String value = "a";
GossipSettings settings = new GossipSettings();
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
.withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
gm.init();
gm.gossipPerNodeData(perNodeDatum(key, value));
gm.gossipSharedData(sharedDatum(key, value));
- Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
- Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload());
+ assertDataIsAtCorrectValue(gm);
gm.getDataReaper().runPerNodeOnce();
gm.getDataReaper().runSharedOnce();
- TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
- TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
+ assertDataIsRemoved(gm);
gm.shutdown();
}
+ private void assertDataIsAtCorrectValue(GossipManager gm){
+ Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
+ Assert.assertEquals(1, registry.getGauges().get(GossipCoreConstants.PER_NODE_DATA_SIZE).getValue());
+ Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload());
+ Assert.assertEquals(1, registry.getGauges().get(GossipCoreConstants.SHARED_DATA_SIZE).getValue());
+ }
+
+ private void assertDataIsRemoved(GossipManager gm){
+ TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
+ TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
+ }
+
private GossipDataMessage perNodeDatum(String key, String value) {
GossipDataMessage m = new GossipDataMessage();
m.setExpireAt(System.currentTimeMillis() + 5L);