GOSSIP-47 sign data
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index 60a443f..bcea75c 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -56,6 +56,11 @@
private boolean persistDataState = true;
+ private String pathToKeyStore = "./keys";
+
+ private boolean signMessages = false;
+
+
/**
* Construct GossipSettings with default settings.
*/
@@ -202,5 +207,21 @@
public void setPersistDataState(boolean persistDataState) {
this.persistDataState = persistDataState;
}
+
+ public String getPathToKeyStore() {
+ return pathToKeyStore;
+ }
+
+ public void setPathToKeyStore(String pathToKeyStore) {
+ this.pathToKeyStore = pathToKeyStore;
+ }
+
+ public boolean isSignMessages() {
+ return signMessages;
+ }
+
+ public void setSignMessages(boolean signMessages) {
+ this.signMessages = signMessages;
+ }
}
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index e23ee54..6f97a74 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -17,12 +17,23 @@
*/
package org.apache.gossip.manager;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.security.KeyFactory;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.SignatureException;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.PKCS8EncodedKeySpec;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
@@ -45,6 +56,7 @@
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
+import org.apache.gossip.model.SignedPayload;
import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
@@ -66,6 +78,8 @@
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
+ private final PKCS8EncodedKeySpec privKeySpec;
+ private final PrivateKey privKey;
private final Meter messageSerdeException;
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
@@ -86,6 +100,41 @@
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
+ if (manager.getSettings().isSignMessages()){
+ File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId());
+ File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub");
+ if (!privateKey.exists()){
+ throw new IllegalArgumentException("private key not found " + privateKey);
+ }
+ if (!publicKey.exists()){
+ throw new IllegalArgumentException("public key not found " + publicKey);
+ }
+ try (FileInputStream keyfis = new FileInputStream(privateKey)) {
+ byte[] encKey = new byte[keyfis.available()];
+ keyfis.read(encKey);
+ keyfis.close();
+ privKeySpec = new PKCS8EncodedKeySpec(encKey);
+ KeyFactory keyFactory = KeyFactory.getInstance("DSA");
+ privKey = keyFactory.generatePrivate(privKeySpec);
+ } catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
+ throw new RuntimeException("failed hard", e);
+ }
+ } else {
+ privKeySpec = null;
+ privKey = null;
+ }
+ }
+
+ private byte [] sign(byte [] bytes){
+ Signature dsa;
+ try {
+ dsa = Signature.getInstance("SHA1withDSA", "SUN");
+ dsa.initSign(privKey);
+ dsa.update(bytes);
+ return dsa.sign();
+ } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
+ throw new RuntimeException(e);
+ }
}
public void addSharedData(SharedGossipDataMessage message){
@@ -207,7 +256,14 @@
private void sendInternal(Base message, URI uri){
byte[] json_bytes;
try {
- json_bytes = gossipManager.getObjectMapper().writeValueAsString(message).getBytes();
+ if (privKey == null){
+ json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
+ } else {
+ SignedPayload p = new SignedPayload();
+ p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
+ p.setSignature(sign(p.getData()));
+ json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
+ }
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
@@ -285,7 +341,14 @@
public void sendOneWay(Base message, URI u){
byte[] json_bytes;
try {
- json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
+ if (privKey == null){
+ json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
+ } else {
+ SignedPayload p = new SignedPayload();
+ p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
+ p.setSignature(sign(p.getData()));
+ json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
+ }
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 67cb06b..9221aa6 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -76,11 +76,11 @@
URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
this.settings = settings;
- gossipCore = new GossipCore(this, registry);
- clock = new SystemClock();
- dataReaper = new DataReaper(gossipCore, clock);
+ clock = new SystemClock();
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
+ gossipCore = new GossipCore(this, registry);
+ dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
@@ -337,5 +337,9 @@
public ObjectMapper getObjectMapper() {
return objectMapper;
}
+
+ public MetricRegistry getRegistry() {
+ return registry;
+ }
}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java b/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
new file mode 100644
index 0000000..3bcc344
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+public interface PassiveGossipConstants {
+ String SIGNED_MESSAGE = "gossip.passive.signed_message";
+ String UNSIGNED_MESSAGE = "gossip.passive.unsigned_message";
+}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index 51cf264..bfce2dd 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -26,8 +26,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.model.Base;
+import org.apache.gossip.model.SignedPayload;
import org.apache.log4j.Logger;
+import com.codahale.metrics.Meter;
+
/**
* This class handles the passive cycle,
* where this client has received an incoming message.
@@ -41,6 +44,8 @@
private final AtomicBoolean keepRunning;
private final GossipCore gossipCore;
private final GossipManager gossipManager;
+ private final Meter signed;
+ private final Meter unsigned;
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
@@ -52,14 +57,13 @@
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
- LOGGER.debug("Gossip service successfully initialized on port "
- + gossipManager.getMyself().getUri().getPort());
- LOGGER.debug("I am " + gossipManager.getMyself());
} catch (SocketException ex) {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
keepRunning = new AtomicBoolean(true);
+ signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE);
+ unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
@Override
@@ -72,7 +76,15 @@
debug(p.getData());
try {
Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
- gossipCore.receive(activeGossipMessage);
+ if (activeGossipMessage instanceof SignedPayload){
+ SignedPayload s = (SignedPayload) activeGossipMessage;
+ Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class);
+ gossipCore.receive(nested);
+ signed.mark();
+ } else {
+ gossipCore.receive(activeGossipMessage);
+ unsigned.mark();
+ }
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
diff --git a/src/main/java/org/apache/gossip/model/SignedPayload.java b/src/main/java/org/apache/gossip/model/SignedPayload.java
new file mode 100644
index 0000000..9ffbcf1
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/SignedPayload.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.model;
+
+public class SignedPayload extends Base{
+ private byte [] data;
+ private byte [] signature;
+ public byte[] getData() {
+ return data;
+ }
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+ public byte[] getSignature() {
+ return signature;
+ }
+ public void setSignature(byte[] signature) {
+ this.signature = signature;
+ }
+
+}
diff --git a/src/main/java/org/apache/gossip/secure/KeyTool.java b/src/main/java/org/apache/gossip/secure/KeyTool.java
new file mode 100644
index 0000000..69f4e72
--- /dev/null
+++ b/src/main/java/org/apache/gossip/secure/KeyTool.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.secure;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.SecureRandom;
+
+public class KeyTool {
+
+ public static void generatePubandPrivateKeyFiles(String path, String id)
+ throws NoSuchAlgorithmException, NoSuchProviderException, IOException{
+ SecureRandom r = new SecureRandom();
+ KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA", "SUN");
+ keyGen.initialize(1024, r);
+ KeyPair pair = keyGen.generateKeyPair();
+ PrivateKey priv = pair.getPrivate();
+ PublicKey pub = pair.getPublic();
+ {
+ FileOutputStream sigfos = new FileOutputStream(new File(path, id));
+ sigfos.write(priv.getEncoded());
+ sigfos.close();
+ }
+ {
+ FileOutputStream sigfos = new FileOutputStream(new File(path, id + ".pub"));
+ sigfos.write(pub.getEncoded());
+ sigfos.close();
+ }
+ }
+
+ public static void main (String [] args) throws
+ NoSuchAlgorithmException, NoSuchProviderException, IOException{
+ generatePubandPrivateKeyFiles(args[0], args[1]);
+ }
+}
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 98c7ee0..b5fa705 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -79,7 +79,6 @@
}
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
-
TUnit.assertThat(new Callable<Object>() {
public Object call() throws Exception {
SharedGossipDataMessage x = clients.get(1).findSharedData("a");
diff --git a/src/test/java/org/apache/gossip/SignedMessageTest.java b/src/test/java/org/apache/gossip/SignedMessageTest.java
new file mode 100644
index 0000000..6bea974
--- /dev/null
+++ b/src/test/java/org/apache/gossip/SignedMessageTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.manager.PassiveGossipConstants;
+import org.apache.gossip.secure.KeyTool;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+import io.teknek.tunit.TUnit;
+
+public class SignedMessageTest {
+
+ @Test(expected=IllegalArgumentException.class)
+ public void ifSignMustHaveKeys()
+ throws URISyntaxException, UnknownHostException, InterruptedException {
+ String cluster = UUID.randomUUID().toString();
+ GossipSettings settings = gossiperThatSigns();
+ List<GossipMember> startupMembers = new ArrayList<>();
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
+ GossipService gossipService = new GossipService(cluster, uri, 1 + "",
+ new HashMap<String, String>(), startupMembers, settings, (a, b) -> { },
+ new MetricRegistry());
+ gossipService.start();
+ }
+
+ private GossipSettings gossiperThatSigns(){
+ GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
+ settings.setSignMessages(true);
+ return settings;
+ }
+
+ @Test
+ public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException{
+ String keys = "./keys";
+ GossipSettings settings = gossiperThatSigns();
+ setup(keys);
+ String cluster = UUID.randomUUID().toString();
+ List<GossipMember> startupMembers = new ArrayList<>();
+ for (int i = 1; i < 2; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
+ startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
+ }
+ final List<GossipService> clients = new ArrayList<>();
+ for (int i = 1; i < 3; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
+ GossipService gossipService = new GossipService(cluster, uri, i + "",
+ new HashMap<String,String>(), startupMembers, settings,
+ (a,b) -> {}, new MetricRegistry());
+ clients.add(gossipService);
+ gossipService.start();
+ }
+ assertTwoAlive(clients);
+ assertOnlySignedMessages(clients);
+ cleanup(keys, clients);
+ }
+
+ private void assertTwoAlive(List<GossipService> clients){
+ TUnit.assertThat(() -> {
+ int total = 0;
+ for (int i = 0; i < clients.size(); ++i) {
+ total += clients.get(i).getGossipManager().getLiveMembers().size();
+ }
+ return total;
+ }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
+ }
+
+ private void assertOnlySignedMessages(List<GossipService> clients){
+ Assert.assertEquals(0, clients.get(0).getGossipManager().getRegistry()
+ .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount());
+ Assert.assertTrue(clients.get(0).getGossipManager().getRegistry()
+ .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0);
+ }
+
+ private void cleanup(String keys, List<GossipService> clients){
+ new File(keys, "1").delete();
+ new File(keys, "2").delete();
+ new File(keys).delete();
+ for (int i = 0; i < clients.size(); ++i) {
+ clients.get(i).shutdown();
+ }
+ }
+
+ private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException {
+ new File(keys).mkdir();
+ KeyTool.generatePubandPrivateKeyFiles(keys, "1");
+ KeyTool.generatePubandPrivateKeyFiles(keys, "2");
+ }
+}