GOSSIP-47 sign data
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index 60a443f..bcea75c 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -56,6 +56,11 @@
   
   private boolean persistDataState = true;
   
+  private String pathToKeyStore = "./keys";
+  
+  private boolean signMessages = false;
+  
+  
   /**
    * Construct GossipSettings with default settings.
    */
@@ -202,5 +207,21 @@
   public void setPersistDataState(boolean persistDataState) {
     this.persistDataState = persistDataState;
   }
+
+  public String getPathToKeyStore() {
+    return pathToKeyStore;
+  }
+
+  public void setPathToKeyStore(String pathToKeyStore) {
+    this.pathToKeyStore = pathToKeyStore;
+  }
+
+  public boolean isSignMessages() {
+    return signMessages;
+  }
+
+  public void setSignMessages(boolean signMessages) {
+    this.signMessages = signMessages;
+  }
   
 }
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index e23ee54..6f97a74 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -17,12 +17,23 @@
  */
 package org.apache.gossip.manager;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.security.KeyFactory;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.SignatureException;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.PKCS8EncodedKeySpec;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -45,6 +56,7 @@
 import org.apache.gossip.model.Response;
 import org.apache.gossip.model.SharedGossipDataMessage;
 import org.apache.gossip.model.ShutdownMessage;
+import org.apache.gossip.model.SignedPayload;
 import org.apache.gossip.udp.Trackable;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
@@ -66,6 +78,8 @@
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
   private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
   private final BlockingQueue<Runnable> workQueue;
+  private final PKCS8EncodedKeySpec privKeySpec;
+  private final PrivateKey privKey;
   private final Meter messageSerdeException;
   private final Meter tranmissionException;
   private final Meter tranmissionSuccess;
@@ -86,6 +100,41 @@
     messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
     tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
     tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
+    if (manager.getSettings().isSignMessages()){
+      File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId());
+      File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub");
+      if (!privateKey.exists()){
+        throw new IllegalArgumentException("private key not found " + privateKey);
+      }
+      if (!publicKey.exists()){
+        throw new IllegalArgumentException("public key not found " + publicKey);
+      }
+      try (FileInputStream keyfis = new FileInputStream(privateKey)) {
+        byte[] encKey = new byte[keyfis.available()];
+        keyfis.read(encKey);
+        keyfis.close();
+        privKeySpec = new PKCS8EncodedKeySpec(encKey);
+        KeyFactory keyFactory = KeyFactory.getInstance("DSA");
+        privKey = keyFactory.generatePrivate(privKeySpec);
+      } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
+        throw new RuntimeException("failed hard", e);
+      }
+    } else {
+      privKeySpec = null;
+      privKey = null;
+    }
+  }
+  
+  private byte [] sign(byte [] bytes){
+    Signature dsa;
+    try {
+      dsa = Signature.getInstance("SHA1withDSA", "SUN");
+      dsa.initSign(privKey);
+      dsa.update(bytes);
+      return dsa.sign();
+    } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
+      throw new RuntimeException(e);
+    } 
   }
   
   public void addSharedData(SharedGossipDataMessage message){
@@ -207,7 +256,14 @@
   private void sendInternal(Base message, URI uri){
     byte[] json_bytes;
     try {
-      json_bytes = gossipManager.getObjectMapper().writeValueAsString(message).getBytes();
+      if (privKey == null){
+        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
+      } else {
+        SignedPayload p = new SignedPayload();
+        p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
+        p.setSignature(sign(p.getData()));
+        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
+      }
     } catch (IOException e) {
       messageSerdeException.mark();
       throw new RuntimeException(e);
@@ -285,7 +341,14 @@
   public void sendOneWay(Base message, URI u){
     byte[] json_bytes;
     try {
-      json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
+      if (privKey == null){
+        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
+      } else {
+        SignedPayload p = new SignedPayload();
+        p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
+        p.setSignature(sign(p.getData()));
+        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
+      }
     } catch (IOException e) {
       messageSerdeException.mark();
       throw new RuntimeException(e);
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 67cb06b..9221aa6 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -76,11 +76,11 @@
           URI uri, String id, Map<String,String> properties, GossipSettings settings,
           List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
     this.settings = settings;
-    gossipCore = new GossipCore(this, registry);
-    clock = new SystemClock();
-    dataReaper = new DataReaper(gossipCore, clock);
+    clock = new SystemClock();    
     me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
             settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
+    gossipCore = new GossipCore(this, registry);
+    dataReaper = new DataReaper(gossipCore, clock);
     members = new ConcurrentSkipListMap<>();
     for (GossipMember startupMember : gossipMembers) {
       if (!startupMember.equals(me)) {
@@ -337,5 +337,9 @@
   public ObjectMapper getObjectMapper() {
     return objectMapper;
   }
+
+  public MetricRegistry getRegistry() {
+    return registry;
+  }
   
 }
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java b/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
new file mode 100644
index 0000000..3bcc344
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public interface PassiveGossipConstants {
+  String SIGNED_MESSAGE = "gossip.passive.signed_message";
+  String UNSIGNED_MESSAGE = "gossip.passive.unsigned_message";
+}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 51cf264..bfce2dd 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -26,8 +26,11 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.gossip.model.Base;
+import org.apache.gossip.model.SignedPayload;
 import org.apache.log4j.Logger;
 
+import com.codahale.metrics.Meter;
+
 /**
  * This class handles the passive cycle,
  * where this client has received an incoming message. 
@@ -41,6 +44,8 @@
   private final AtomicBoolean keepRunning;
   private final GossipCore gossipCore;
   private final GossipManager gossipManager;
+  private final Meter signed;
+  private final Meter unsigned;
 
   public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
     this.gossipManager = gossipManager;
@@ -52,14 +57,13 @@
       SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
               gossipManager.getMyself().getUri().getPort());
       server = new DatagramSocket(socketAddress);
-      LOGGER.debug("Gossip service successfully initialized on port "
-              + gossipManager.getMyself().getUri().getPort());
-      LOGGER.debug("I am " + gossipManager.getMyself());
     } catch (SocketException ex) {
       LOGGER.warn(ex);
       throw new RuntimeException(ex);
     }
     keepRunning = new AtomicBoolean(true);
+    signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE);
+    unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
   }
 
   @Override
@@ -72,7 +76,15 @@
         debug(p.getData());
         try {
           Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
-          gossipCore.receive(activeGossipMessage);
+          if (activeGossipMessage instanceof SignedPayload){
+            SignedPayload s = (SignedPayload) activeGossipMessage;
+            Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class);
+            gossipCore.receive(nested);
+            signed.mark();
+          } else {
+            gossipCore.receive(activeGossipMessage);
+            unsigned.mark();
+          }
         } catch (RuntimeException ex) {//TODO trap json exception
           LOGGER.error("Unable to process message", ex);
         }
diff --git a/src/main/java/org/apache/gossip/model/SignedPayload.java b/src/main/java/org/apache/gossip/model/SignedPayload.java
new file mode 100644
index 0000000..9ffbcf1
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/SignedPayload.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+
+public class SignedPayload extends Base{
+  private byte [] data;
+  private byte [] signature;
+  public byte[] getData() {
+    return data;
+  }
+  public void setData(byte[] data) {
+    this.data = data;
+  }
+  public byte[] getSignature() {
+    return signature;
+  }
+  public void setSignature(byte[] signature) {
+    this.signature = signature;
+  }
+  
+}
diff --git a/src/main/java/org/apache/gossip/secure/KeyTool.java b/src/main/java/org/apache/gossip/secure/KeyTool.java
new file mode 100644
index 0000000..69f4e72
--- /dev/null
+++ b/src/main/java/org/apache/gossip/secure/KeyTool.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.secure;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.SecureRandom;
+
+public class KeyTool {
+
+  public static void generatePubandPrivateKeyFiles(String path, String id) 
+          throws NoSuchAlgorithmException, NoSuchProviderException, IOException{
+    SecureRandom r = new SecureRandom();
+    KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA", "SUN");
+    keyGen.initialize(1024, r);
+    KeyPair pair = keyGen.generateKeyPair();
+    PrivateKey priv = pair.getPrivate();
+    PublicKey pub = pair.getPublic();
+    {
+      FileOutputStream sigfos = new FileOutputStream(new File(path, id));
+      sigfos.write(priv.getEncoded());
+      sigfos.close();
+    }
+    {
+      FileOutputStream sigfos = new FileOutputStream(new File(path, id + ".pub"));
+      sigfos.write(pub.getEncoded());
+      sigfos.close();
+    }
+  }
+  
+  public static void main (String [] args) throws 
+    NoSuchAlgorithmException, NoSuchProviderException, IOException{
+    generatePubandPrivateKeyFiles(args[0], args[1]);
+  }
+}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 98c7ee0..b5fa705 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -79,7 +79,6 @@
       }
     }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
     
-    
     TUnit.assertThat(new Callable<Object>() {
       public Object call() throws Exception {
         SharedGossipDataMessage x = clients.get(1).findSharedData("a");
diff --git a/src/test/java/org/apache/gossip/SignedMessageTest.java b/src/test/java/org/apache/gossip/SignedMessageTest.java
new file mode 100644
index 0000000..6bea974
--- /dev/null
+++ b/src/test/java/org/apache/gossip/SignedMessageTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.manager.PassiveGossipConstants;
+import org.apache.gossip.secure.KeyTool;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+import io.teknek.tunit.TUnit;
+
+public class SignedMessageTest {
+
+  @Test(expected=IllegalArgumentException.class)
+  public void ifSignMustHaveKeys()
+          throws URISyntaxException, UnknownHostException, InterruptedException {
+    String cluster = UUID.randomUUID().toString();
+    GossipSettings settings = gossiperThatSigns();
+    List<GossipMember> startupMembers = new ArrayList<>();
+    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
+    GossipService gossipService = new GossipService(cluster, uri, 1 + "",
+            new HashMap<String, String>(), startupMembers, settings, (a, b) -> { }, 
+            new MetricRegistry());
+    gossipService.start();
+  }
+  
+  private GossipSettings gossiperThatSigns(){
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    settings.setSignMessages(true);
+    return settings;
+  }
+  
+  @Test
+  public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException{
+    String keys = "./keys";
+    GossipSettings settings = gossiperThatSigns();
+    setup(keys);
+    String cluster = UUID.randomUUID().toString();
+    List<GossipMember> startupMembers = new ArrayList<>();
+    for (int i = 1; i < 2; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
+      startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+    }
+    final List<GossipService> clients = new ArrayList<>();
+    for (int i = 1; i < 3; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
+      GossipService gossipService = new GossipService(cluster, uri, i + "",
+              new HashMap<String,String>(), startupMembers, settings,
+              (a,b) -> {}, new MetricRegistry());
+      clients.add(gossipService);
+      gossipService.start();
+    }
+    assertTwoAlive(clients);
+    assertOnlySignedMessages(clients);
+    cleanup(keys, clients);
+  }
+  
+  private void assertTwoAlive(List<GossipService> clients){
+    TUnit.assertThat(() -> {
+      int total = 0;
+      for (int i = 0; i < clients.size(); ++i) {
+        total += clients.get(i).getGossipManager().getLiveMembers().size();
+      }
+      return total;
+    }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+  }
+  
+  private void assertOnlySignedMessages(List<GossipService> clients){
+    Assert.assertEquals(0, clients.get(0).getGossipManager().getRegistry()
+            .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount());
+    Assert.assertTrue(clients.get(0).getGossipManager().getRegistry()
+            .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0);
+  }
+  
+  private void cleanup(String keys, List<GossipService> clients){
+    new File(keys, "1").delete();
+    new File(keys, "2").delete();
+    new File(keys).delete();
+    for (int i = 0; i < clients.size(); ++i) {
+      clients.get(i).shutdown();
+    }
+  }
+  
+  private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException {
+    new File(keys).mkdir();
+    KeyTool.generatePubandPrivateKeyFiles(keys, "1");
+    KeyTool.generatePubandPrivateKeyFiles(keys, "2");
+  }
+}