GOSSIP-75 Vote based locking
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 32c00c9..4ea0ab6 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -17,6 +17,8 @@
*/
package org.apache.gossip;
+import org.apache.gossip.lock.LockManagerSettings;
+
import java.util.HashMap;
import java.util.Map;
@@ -61,7 +63,10 @@
private String pathToKeyStore = "./keys";
private boolean signMessages = false;
-
+
+ // Settings related to lock manager
+ private LockManagerSettings lockManagerSettings = LockManagerSettings
+ .getLockManagerDefaultSettings();
/**
* Construct GossipSettings with default settings.
@@ -242,4 +247,15 @@
this.protocolManagerClass = protocolManagerClass;
}
+ public LockManagerSettings getLockManagerSettings() {
+ return lockManagerSettings;
+ }
+
+ /**
+ * Set the lock settings use by the lock manager
+ * @param lockManagerSettings lock settings. This object cannot be null.
+ */
+ public void setLockManagerSettings(LockManagerSettings lockManagerSettings) {
+ this.lockManagerSettings = lockManagerSettings;
+ }
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index 396ec03..83d573d 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -24,6 +24,9 @@
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.gossip.LocalMember;
+import org.apache.gossip.lock.vote.MajorityVote;
+import org.apache.gossip.lock.vote.Vote;
+import org.apache.gossip.lock.vote.VoteCandidate;
import org.apache.gossip.replication.BlackListReplicable;
import org.apache.gossip.replication.Replicable;
import org.apache.gossip.replication.WhiteListReplicable;
@@ -107,6 +110,31 @@
@JsonProperty("blackListMembers") abstract List<LocalMember> getBlackListMembers();
}
+abstract class VoteCandidateMixin {
+ @JsonCreator
+ VoteCandidateMixin(
+ @JsonProperty("candidateNodeId") String candidateNodeId,
+ @JsonProperty("votingKey") String votingKey,
+ @JsonProperty("votes") Map<String, Vote> votes
+ ) { }
+}
+
+abstract class VoteMixin {
+ @JsonCreator
+ VoteMixin(
+ @JsonProperty("votingNode") String votingNode,
+ @JsonProperty("voteValue") Boolean voteValue,
+ @JsonProperty("voteExchange") Boolean voteExchange,
+ @JsonProperty("liveMembers") List<String> liveMembers,
+ @JsonProperty("deadMembers") List<String> deadMembers
+ ) { }
+}
+
+abstract class MajorityVoteMixin<E>{
+ @JsonCreator
+ MajorityVoteMixin(@JsonProperty("voteCandidates") Map<String, VoteCandidate> voteCandidateMap){ }
+}
+
//If anyone wants to take a stab at this. please have at it
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
public class CrdtModule extends SimpleModule {
@@ -130,6 +158,9 @@
context.setMixInAnnotations(Replicable.class, ReplicableMixin.class);
context.setMixInAnnotations(WhiteListReplicable.class, WhiteListReplicableMixin.class);
context.setMixInAnnotations(BlackListReplicable.class, BlackListReplicableMixin.class);
+ context.setMixInAnnotations(MajorityVote.class, MajorityVoteMixin.class);
+ context.setMixInAnnotations(VoteCandidate.class, VoteCandidateMixin.class);
+ context.setMixInAnnotations(Vote.class, VoteMixin.class);
}
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java b/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java
new file mode 100644
index 0000000..9f4636a
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.apache.gossip.Member;
+import org.apache.gossip.lock.exceptions.VoteFailedException;
+import org.apache.gossip.lock.vote.MajorityVote;
+import org.apache.gossip.lock.vote.Vote;
+import org.apache.gossip.lock.vote.VoteCandidate;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.log4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class LockManager {
+
+ public static final Logger LOGGER = Logger.getLogger(LockManager.class);
+
+ private final GossipManager gossipManager;
+ private final LockManagerSettings lockSettings;
+ private final ScheduledExecutorService voteService;
+ private final AtomicInteger numberOfNodes;
+ private final Set<String> lockKeys;
+ // For MetricRegistry
+ public static final String LOCK_KEY_SET_SIZE = "gossip.lock.key_set_size";
+ public static final String LOCK_TIME = "gossip.lock.time";
+ private final Timer lockTimeMetric;
+
+ public LockManager(GossipManager gossipManager, final LockManagerSettings lockManagerSettings,
+ MetricRegistry metrics) {
+ this.gossipManager = gossipManager;
+ this.lockSettings = lockManagerSettings;
+ this.numberOfNodes = new AtomicInteger(lockSettings.getNumberOfNodes());
+ this.lockKeys = new CopyOnWriteArraySet<>();
+ metrics.register(LOCK_KEY_SET_SIZE, (Gauge<Integer>) lockKeys::size);
+ lockTimeMetric = metrics.timer(LOCK_TIME);
+ // Register listener for lock keys
+ gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
+ if (key.contains("lock/")) {
+ lockKeys.add(key);
+ }
+ });
+ voteService = Executors.newScheduledThreadPool(2);
+ voteService.scheduleAtFixedRate(this::updateVotes, 0, lockSettings.getVoteUpdateInterval(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void acquireSharedDataLock(String key) throws VoteFailedException {
+ final Timer.Context context = lockTimeMetric.time();
+ gossipManager.merge(generateLockMessage(key));
+ int deadlockDetectCount = 0;
+ while (true) {
+ SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key));
+ if (message == null || !(message.getPayload() instanceof MajorityVote)) {
+ continue;
+ }
+ MajorityVote majorityVoteResult = (MajorityVote) message.getPayload();
+ final Map<String, VoteCandidate> voteCandidatesMap = majorityVoteResult.value();
+ final Map<String, Boolean> voteResultMap = new HashMap<>();
+ // Store the vote result for each vote candidate nodes
+ voteCandidatesMap.forEach((candidateId, voteCandidate) -> voteResultMap
+ .put(candidateId, isVoteSuccess(voteCandidate)));
+
+ long passedCandidates = voteResultMap.values().stream().filter(aBoolean -> aBoolean).count();
+ String myNodeId = gossipManager.getMyself().getId();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("NodeId=" + myNodeId + ", VoteMap=" + voteResultMap + ", WinnerCount="
+ + passedCandidates);
+ }
+ // Check for possible dead lock when no candidates were won
+ if (passedCandidates == 0) {
+ if (isDeadLock(voteCandidatesMap)) {
+ deadlockDetectCount++;
+ // Testing for deadlock is not always correct, therefore test for continues deadlocks
+ if (deadlockDetectCount >= lockSettings.getDeadlockDetectionThreshold()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Deadlock detected from node " + myNodeId + ". VoteCandidatesMap="
+ + voteCandidatesMap);
+ }
+ preventDeadLock(voteCandidatesMap);
+ }
+ } else {
+ deadlockDetectCount = 0;
+ }
+ } else if (passedCandidates == 1 && voteResultMap.containsKey(myNodeId)) {
+ context.stop();
+ if (voteResultMap.get(myNodeId)) {
+ // There is one winner and that is my node, therefore break the while loop and continue
+ break;
+ } else {
+ throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key);
+ }
+ } else if (passedCandidates > 1) {
+ // Multiple winners are not possible
+ context.stop();
+ throw new IllegalStateException("Multiple nodes get voted.");
+ }
+
+ try {
+ Thread.sleep(lockSettings.getResultCalculationDelay());
+ } catch (InterruptedException e) {
+ throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key, e);
+ }
+ }
+ }
+
+ // Generate Crdt lock message for voting
+ private SharedDataMessage generateLockMessage(String key) {
+ VoteCandidate voteCandidate = new VoteCandidate(gossipManager.getMyself().getId(), key,
+ new ConcurrentHashMap<>());
+ voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), true, false,
+ gossipManager.getLiveMembers().stream().map(Member::getId).collect(Collectors.toList()),
+ gossipManager.getDeadMembers().stream().map(Member::getId)
+ .collect(Collectors.toList())));
+ Map<String, VoteCandidate> voteCandidateMap = new ConcurrentHashMap<>();
+ voteCandidateMap.put(voteCandidate.getCandidateNodeId(), voteCandidate);
+ MajorityVote majorityVote = new MajorityVote(voteCandidateMap);
+ SharedDataMessage lockMessage = new SharedDataMessage();
+ lockMessage.setKey(generateLockKey(key));
+ lockMessage.setPayload(majorityVote);
+ lockMessage.setExpireAt(Long.MAX_VALUE);
+ lockMessage.setTimestamp(System.currentTimeMillis());
+ return lockMessage;
+ }
+
+ // This method will run periodically to vote the other nodes
+ private void updateVotes() {
+ for (String lockKey : lockKeys) {
+ SharedDataMessage message = gossipManager.findSharedGossipData(lockKey);
+ if (message == null || !(message.getPayload() instanceof MajorityVote)) {
+ continue;
+ }
+ MajorityVote majorityVote = (MajorityVote) message.getPayload();
+ Map<String, VoteCandidate> voteCandidateMap = majorityVote.value();
+ String myNodeId = gossipManager.getMyself().getId();
+ // No need to vote if my node is already voted to every node for the key
+ if (isVotedToAll(myNodeId, voteCandidateMap)) {
+ continue;
+ }
+ String myVoteCandidate = getVotedCandidateNodeId(myNodeId, voteCandidateMap);
+
+ if (myVoteCandidate == null) {
+ myVoteCandidate = lockSettings.getVoteSelector().getVoteCandidateId(voteCandidateMap.keySet());
+ }
+ for (VoteCandidate voteCandidate : voteCandidateMap.values()) {
+ if (voteCandidate.getCandidateNodeId().equals(myNodeId)) {
+ continue;
+ }
+ // Vote for selected candidate
+ boolean voteResult = voteCandidate.getCandidateNodeId().equals(myVoteCandidate);
+ voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), voteResult, false,
+ gossipManager.getLiveMembers().stream().map(Member::getId)
+ .collect(Collectors.toList()),
+ gossipManager.getDeadMembers().stream().map(Member::getId)
+ .collect(Collectors.toList())));
+ }
+ }
+ }
+
+ // Return true if every node has a vote from given node id.
+ private boolean isVotedToAll(String nodeId, final Map<String, VoteCandidate> voteCandidates) {
+ int voteCount = 0;
+ for (VoteCandidate voteCandidate : voteCandidates.values()) {
+ if (voteCandidate.getVotes().containsKey(nodeId)) {
+ voteCount++;
+ }
+ }
+ return voteCount == voteCandidates.size();
+ }
+
+ // Returns true if there is a deadlock for given vote candidates
+ private boolean isDeadLock(final Map<String, VoteCandidate> voteCandidates) {
+ boolean result = true;
+ int numberOfLiveNodes;
+ if (numberOfNodes.get() > 0) {
+ numberOfLiveNodes = numberOfNodes.get();
+ } else {
+ // numberOfNodes is not set by the user, therefore calculate it.
+ Set<String> liveNodes = voteCandidates.values().stream()
+ .map(voteCandidate -> voteCandidate.getVotes().values()).flatMap(Collection::stream)
+ .map(Vote::getLiveMembers).flatMap(List::stream).collect(Collectors.toSet());
+ numberOfLiveNodes = liveNodes.size();
+ }
+ for (VoteCandidate voteCandidate : voteCandidates.values()) {
+ result = result && voteCandidate.getVotes().size() == numberOfLiveNodes;
+ }
+ return result;
+ }
+
+ // Prevent the deadlock by giving up the votes
+ private void preventDeadLock(Map<String, VoteCandidate> voteCandidates) {
+ String myNodeId = gossipManager.getMyself().getId();
+ VoteCandidate myResults = voteCandidates.get(myNodeId);
+ if (myResults == null) {
+ return;
+ }
+ // Set of nodes that is going to receive this nodes votes
+ List<String> donateCandidateIds = voteCandidates.keySet().stream()
+ .filter(s -> s.compareTo(myNodeId) < 0).collect(Collectors.toList());
+ if (donateCandidateIds.size() == 0) {
+ return;
+ }
+ // Select a random node to donate
+ Random randomizer = new Random();
+ String selectedCandidateId = donateCandidateIds
+ .get(randomizer.nextInt(donateCandidateIds.size()));
+ VoteCandidate selectedCandidate = voteCandidates.get(selectedCandidateId);
+
+ Set<Vote> myVotes = new HashSet<>(myResults.getVotes().values());
+ Set<Vote> selectedCandidateVotes = new HashSet<>(selectedCandidate.getVotes().values());
+ // Exchange the votes
+ for (Vote myVote : myVotes) {
+ for (Vote candidateVote : selectedCandidateVotes) {
+ if (myVote.getVoteValue() && myVote.getVotingNode().equals(candidateVote.getVotingNode())) {
+ myVote.setVoteExchange(true);
+ candidateVote.setVoteExchange(true);
+ selectedCandidate.getVotes().put(myVote.getVotingNode(), myVote);
+ myResults.getVotes().put(candidateVote.getVotingNode(), candidateVote);
+ }
+ }
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Node " + myNodeId + " give up votes to node " + selectedCandidateId);
+ }
+ }
+
+ private String getVotedCandidateNodeId(String nodeId,
+ final Map<String, VoteCandidate> voteCandidates) {
+ for (VoteCandidate voteCandidate : voteCandidates.values()) {
+ Vote vote = voteCandidate.getVotes().get(nodeId);
+ if (vote != null && vote.getVoteValue()) {
+ return voteCandidate.getCandidateNodeId();
+ }
+ }
+ return null;
+ }
+
+ // Return true if the given candidate has passed the vote
+ private boolean isVoteSuccess(VoteCandidate voteCandidate) {
+ Set<String> liveNodes = new HashSet<>();
+ int voteCount = 0;
+ for (Vote vote : voteCandidate.getVotes().values()) {
+ liveNodes.addAll(vote.getLiveMembers());
+ if (vote.getVoteValue()) {
+ voteCount++;
+ }
+ }
+ int numberOfLiveNodes;
+ if (numberOfNodes.get() > 0) {
+ numberOfLiveNodes = numberOfNodes.get();
+ } else {
+ numberOfLiveNodes = liveNodes.size();
+ }
+ return numberOfLiveNodes > 0 && voteCount >= (numberOfLiveNodes / 2 + 1);
+ }
+
+ private String generateLockKey(String key){
+ return "lock/" + key;
+ }
+
+ public void shutdown(){
+ voteService.shutdown();
+ }
+ /**
+ * Get the voted node id from this node for a given key
+ * @param key key of the data object
+ * @return Voted node id
+ */
+ public String getVotedCandidateNodeId(String key) {
+ SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key));
+ if (message == null || !(message.getPayload() instanceof MajorityVote)) {
+ return null;
+ }
+ MajorityVote majorityVote = (MajorityVote) message.getPayload();
+ return getVotedCandidateNodeId(gossipManager.getMyself().getId(), majorityVote.value());
+ }
+
+ /**
+ * Set the number of live nodes. If this value is negative, live nodes will be calculated
+ * @param numberOfNodes live node count or negative to calculate.
+ */
+ public void setNumberOfNodes(int numberOfNodes) {
+ this.numberOfNodes.set(numberOfNodes);
+ }
+
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java b/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java
new file mode 100644
index 0000000..4af47a2
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock;
+
+import org.apache.gossip.lock.vote.RandomVoteSelector;
+import org.apache.gossip.lock.vote.VoteSelector;
+
+/**
+ * Stores the lock manager related settings.
+ */
+public class LockManagerSettings {
+ // Time between vote updates in ms. Default is 1 second.
+ private final int voteUpdateInterval;
+ // Vote selection algorithm. Default is random voting
+ private final VoteSelector voteSelector;
+ // Number of nodes available for voting. Default is -1 (Auto calculate)
+ private final int numberOfNodes;
+ // Number of times to test for deadlock before preventing. Default is 3
+ private final int deadlockDetectionThreshold;
+ // Wait time between vote result calculation. Default is 1000
+ private final int resultCalculationDelay;
+
+ /**
+ * Construct LockManagerSettings with default settings.
+ */
+ public static LockManagerSettings getLockManagerDefaultSettings() {
+ return new LockManagerSettings(1000, new RandomVoteSelector(), -1, 3, 1000);
+ }
+
+ /**
+ * Construct a custom LockManagerSettings
+ *
+ * @param voteUpdateInterval Time between vote updates in milliseconds.
+ * @param voteSelector Vote selection algorithm. Cannot be null
+ * @param numberOfNodes Number of nodes available for voting. Set to negative value for auto calculate
+ * @param deadlockDetectionThreshold Number of times to test for deadlock before preventing
+ * @param resultCalculationDelay Wait time between vote result calculation
+ */
+ public LockManagerSettings(int voteUpdateInterval, VoteSelector voteSelector, int numberOfNodes,
+ int deadlockDetectionThreshold, int resultCalculationDelay) {
+ this.voteUpdateInterval = voteUpdateInterval;
+ this.voteSelector = voteSelector;
+ this.numberOfNodes = numberOfNodes;
+ this.deadlockDetectionThreshold = deadlockDetectionThreshold;
+ this.resultCalculationDelay = resultCalculationDelay;
+
+ }
+
+ public int getVoteUpdateInterval() {
+ return voteUpdateInterval;
+ }
+
+ public VoteSelector getVoteSelector() {
+ return voteSelector;
+ }
+
+ public int getNumberOfNodes() {
+ return numberOfNodes;
+ }
+
+ public int getDeadlockDetectionThreshold() {
+ return deadlockDetectionThreshold;
+ }
+
+ public int getResultCalculationDelay() {
+ return resultCalculationDelay;
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java b/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java
new file mode 100644
index 0000000..bd0a606
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.exceptions;
+
+/**
+ * This exception is thrown when the lock based voting is failed.
+ */
+public class VoteFailedException extends Exception {
+ /**
+ * Constructs a new VoteFailedException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public VoteFailedException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new VoteFailedException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message
+ * @param cause the cause for this exception
+ */
+ public VoteFailedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java
new file mode 100644
index 0000000..a18f3c9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import org.apache.gossip.crdt.Crdt;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * CRDT which used for distribute a votes for a given key.
+ */
+public class MajorityVote implements Crdt<Map<String, VoteCandidate>, MajorityVote> {
+
+ private final Map<String, VoteCandidate> voteCandidates = new ConcurrentHashMap<>();
+
+ public MajorityVote(Map<String, VoteCandidate> voteCandidateMap) {
+ voteCandidates.putAll(voteCandidateMap);
+ }
+
+ @Override
+ public MajorityVote merge(MajorityVote other) {
+ Map<String, VoteCandidate> mergedCandidates = new ConcurrentHashMap<>();
+ Set<String> firstKeySet = this.voteCandidates.keySet();
+ Set<String> secondKeySet = other.voteCandidates.keySet();
+ Set<String> sameCandidatesSet = getIntersection(firstKeySet, secondKeySet);
+ Set<String> differentCandidatesSet = getIntersectionCompliment(firstKeySet, secondKeySet);
+ // Merge different vote candidates by combining votes
+ for (String differentCandidateId : differentCandidatesSet) {
+ if (this.voteCandidates.containsKey(differentCandidateId)) {
+ mergedCandidates.put(differentCandidateId, this.voteCandidates.get(differentCandidateId));
+ } else if (other.voteCandidates.containsKey(differentCandidateId)) {
+ mergedCandidates.put(differentCandidateId, other.voteCandidates.get(differentCandidateId));
+ }
+ }
+ // Merge votes for the same candidate
+ for (String sameCandidateId : sameCandidatesSet) {
+ if (this.voteCandidates.containsKey(sameCandidateId) && other.voteCandidates
+ .containsKey(sameCandidateId)) {
+ mergedCandidates.put(sameCandidateId,
+ mergeCandidate(this.voteCandidates.get(sameCandidateId),
+ other.voteCandidates.get(sameCandidateId)));
+ }
+ }
+
+ return new MajorityVote(mergedCandidates);
+ }
+
+ // Merge different votes for same candidate
+ private VoteCandidate mergeCandidate(VoteCandidate firstCandidate,
+ VoteCandidate secondCandidate) {
+ VoteCandidate mergeResult = new VoteCandidate(firstCandidate.getCandidateNodeId(),
+ firstCandidate.getVotingKey(), new ConcurrentHashMap<>());
+ Set<String> firstKeySet = firstCandidate.getVotes().keySet();
+ Set<String> secondKeySet = secondCandidate.getVotes().keySet();
+ Set<String> sameVoteNodeSet = getIntersection(firstKeySet, secondKeySet);
+ Set<String> differentVoteNodeSet = getIntersectionCompliment(firstKeySet, secondKeySet);
+ // Merge different voters by combining their votes
+ for (String differentCandidateId : differentVoteNodeSet) {
+ if (firstCandidate.getVotes().containsKey(differentCandidateId)) {
+ mergeResult.getVotes()
+ .put(differentCandidateId, firstCandidate.getVotes().get(differentCandidateId));
+ } else if (secondCandidate.getVotes().containsKey(differentCandidateId)) {
+ mergeResult.getVotes()
+ .put(differentCandidateId, secondCandidate.getVotes().get(differentCandidateId));
+ }
+ }
+ // Merge vote for same voter
+ for (String sameVoteNodeId : sameVoteNodeSet) {
+ if (firstCandidate.getVotes().containsKey(sameVoteNodeId) && secondCandidate.getVotes()
+ .containsKey(sameVoteNodeId)) {
+ mergeResult.getVotes().put(sameVoteNodeId,
+ mergeVote(firstCandidate.getVotes().get(sameVoteNodeId),
+ secondCandidate.getVotes().get(sameVoteNodeId)));
+ }
+ }
+
+ return mergeResult;
+ }
+
+ // Merge two votes from same voter
+ private Vote mergeVote(Vote firstVote, Vote secondVote) {
+ if (firstVote.getVoteValue().booleanValue() != secondVote.getVoteValue().booleanValue()) {
+ if (firstVote.getVoteExchange()) {
+ return firstVote;
+ } else if (secondVote.getVoteExchange()) {
+ return secondVote;
+ } else {
+ return secondVote;
+ }
+ } else {
+ return secondVote;
+ }
+ }
+
+ private Set<String> getIntersection(Set<String> first, Set<String> second) {
+ Set<String> intersection = new HashSet<>(first);
+ intersection.retainAll(second);
+ return intersection;
+ }
+
+ private Set<String> getIntersectionCompliment(Set<String> first, Set<String> second) {
+ Set<String> union = new HashSet<>();
+ union.addAll(first);
+ union.addAll(second);
+ Set<String> intersectionCompliment = new HashSet<>(union);
+ intersectionCompliment.removeAll(getIntersection(first, second));
+ return intersectionCompliment;
+ }
+
+ @Override
+ public Map<String, VoteCandidate> value() {
+ Map<String, VoteCandidate> copy = new ConcurrentHashMap<>();
+ copy.putAll(voteCandidates);
+ return Collections.unmodifiableMap(copy);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return voteCandidates.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+ if (obj == this)
+ return true;
+ if (!(obj instanceof MajorityVote))
+ return false;
+ MajorityVote other = (MajorityVote) obj;
+ return Objects.equals(voteCandidates, other.voteCandidates);
+ }
+
+ @Override
+ public String toString() {
+ return voteCandidates.toString();
+ }
+
+ @Override
+ public MajorityVote optimize() {
+ return new MajorityVote(voteCandidates);
+ }
+
+ public Map<String, VoteCandidate> getVoteCandidates() {
+ return new ConcurrentHashMap<>(voteCandidates);
+ }
+
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java
new file mode 100644
index 0000000..d07f190
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * VoteSelector implementation which randomly select a voting node.
+ */
+public class RandomVoteSelector implements VoteSelector {
+
+ @Override
+ public String getVoteCandidateId(Set<String> voteCandidateIds) {
+ List<String> voteCandidatesIds = new ArrayList<>(voteCandidateIds);
+ return voteCandidatesIds.get(new Random().nextInt(voteCandidatesIds.size()));
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java
new file mode 100644
index 0000000..e68401c
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.List;
+
+/**
+ * Store a voter details.
+ */
+public class Vote {
+ private final String votingNode;
+ private final Boolean voteValue; // TODO: 7/16/17 weight?
+ private Boolean voteExchange;
+ private final List<String> liveMembers;
+ private final List<String> deadMembers;
+
+ public Vote(String votingNode, Boolean voteValue, Boolean voteExchange, List<String> liveMembers,
+ List<String> deadMembers) {
+ this.votingNode = votingNode;
+ this.voteValue = voteValue;
+ this.voteExchange = voteExchange;
+ this.liveMembers = liveMembers;
+ this.deadMembers = deadMembers;
+ }
+
+ public String getVotingNode() {
+ return votingNode;
+ }
+
+ public Boolean getVoteValue() {
+ return voteValue;
+ }
+
+ public Boolean getVoteExchange() {
+ return voteExchange;
+ }
+
+ public void setVoteExchange(Boolean voteExchange) {
+ this.voteExchange = voteExchange;
+ }
+
+ public List<String> getLiveMembers() {
+ return liveMembers;
+ }
+
+ public List<String> getDeadMembers() {
+ return deadMembers;
+ }
+
+ @Override
+ public String toString() {
+ return "votingNode=" + votingNode + ", voteValue=" + voteValue + ", liveMembers=" + liveMembers
+ + ", deadMembers= " + deadMembers;
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java
new file mode 100644
index 0000000..b81b0b9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Stores the vote candidate details and its votes.
+ */
+public class VoteCandidate {
+
+ private final String candidateNodeId;
+ private final String votingKey;
+ private final Map<String, Vote> votes;
+
+ public VoteCandidate(String candidateNodeId, String votingKey, Map<String, Vote> votes) {
+
+ this.candidateNodeId = candidateNodeId;
+ this.votingKey = votingKey;
+ this.votes = votes;
+ }
+
+ public String getCandidateNodeId() {
+ return candidateNodeId;
+ }
+
+ public String getVotingKey() {
+ return votingKey;
+ }
+
+ public Map<String, Vote> getVotes() {
+ return votes;
+ }
+
+ public void addVote(Vote vote) {
+ votes.put(vote.getVotingNode(), vote);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(candidateNodeId, votingKey);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof VoteCandidate))
+ return false;
+ if (obj == this)
+ return true;
+ VoteCandidate other = (VoteCandidate) obj;
+ return this.candidateNodeId.equals(other.candidateNodeId) && this.votingKey
+ .equals(other.votingKey);
+ }
+
+ @Override
+ public String toString() {
+ return "candidateNodeId=" + candidateNodeId + ", votingKey=" + votingKey + ", votes= " + votes;
+ }
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java
new file mode 100644
index 0000000..91c22b3
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.Set;
+
+/**
+ * This interface defines vote selection algorithm for the vote based locking.
+ */
+public interface VoteSelector {
+ /**
+ * This method get call by the lock manager of a node to decide which candidate need to be choose for voting.
+ *
+ * @param voteCandidateIds node id set for the vote candidates
+ * @return selected node id to vote from the given vote candidate set.
+ */
+ String getVoteCandidateId(Set<String> voteCandidateIds);
+}
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index db442c6..2e45843 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -28,6 +28,8 @@
import org.apache.gossip.event.GossipState;
import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
+import org.apache.gossip.lock.LockManager;
+import org.apache.gossip.lock.exceptions.VoteFailedException;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
@@ -43,7 +45,11 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -77,7 +83,8 @@
private final GossipMemberStateRefresher memberStateRefresher;
private final MessageHandler messageHandler;
-
+ private final LockManager lockManager;
+
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
@@ -89,6 +96,7 @@
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
+ this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
for (Member startupMember : gossipMembers) {
@@ -221,6 +229,7 @@
*/
public void shutdown() {
gossipServiceRunning.set(false);
+ lockManager.shutdown();
gossipCore.shutdown();
transportManager.shutdown();
dataReaper.close();
@@ -371,4 +380,21 @@
public void registerGossipListener(GossipListener listener) {
memberStateRefresher.register(listener);
}
+
+ /**
+ * Get the lock manager specified with this GossipManager.
+ * @return lock manager object.
+ */
+ public LockManager getLockManager() {
+ return lockManager;
+ }
+
+ /**
+ * Try to acquire a lock on given shared data key.
+ * @param key key of tha share data object.
+ * @throws VoteFailedException if the locking is failed.
+ */
+ public void acquireSharedDataLock(String key) throws VoteFailedException{
+ lockManager.acquireSharedDataLock(key);
+ }
}
diff --git a/gossip-base/src/test/java/org/apache/gossip/lock/vote/MajorityVoteTest.java b/gossip-base/src/test/java/org/apache/gossip/lock/vote/MajorityVoteTest.java
new file mode 100644
index 0000000..c558444
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/lock/vote/MajorityVoteTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(JUnitPlatform.class)
+public class MajorityVoteTest {
+
+ @Test
+ public void differentCandidateMergeTest() {
+ Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
+ VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
+ voteCandidateMap1.put("1", candidateA);
+ MajorityVote first = new MajorityVote(voteCandidateMap1);
+
+ Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
+ VoteCandidate candidateB = new VoteCandidate("3", "key1", generateVotes(3, 4, true, false));
+ voteCandidateMap2.put("3", candidateB);
+ MajorityVote second = new MajorityVote(voteCandidateMap2);
+
+ MajorityVote result = first.merge(second);
+
+ Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
+ Assert.assertTrue(!result.value().get("3").getVotes().get("4").getVoteValue());
+
+ }
+
+ @Test
+ public void sameCandidateMergeTest() {
+ Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
+ VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
+ voteCandidateMap1.put("1", candidateA);
+ MajorityVote first = new MajorityVote(voteCandidateMap1);
+
+ Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
+ VoteCandidate candidateB = new VoteCandidate("1", "key1", generateVotes(3, 4, true, false));
+ voteCandidateMap2.put("1", candidateB);
+ MajorityVote second = new MajorityVote(voteCandidateMap2);
+
+ MajorityVote result = first.merge(second);
+
+ Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
+ Assert.assertTrue(!result.value().get("1").getVotes().get("4").getVoteValue());
+
+ }
+
+ @Test
+ public void sameVoteMergeTest() {
+ Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
+ VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
+ voteCandidateMap1.put("1", candidateA);
+ MajorityVote first = new MajorityVote(voteCandidateMap1);
+
+ Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
+ VoteCandidate candidateB = new VoteCandidate("1", "key1",
+ generateVotes(2, 4, true, false, true));
+ voteCandidateMap2.put("1", candidateB);
+ MajorityVote second = new MajorityVote(voteCandidateMap2);
+
+ MajorityVote result = first.merge(second);
+
+ Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
+ }
+
+ public Map<String, Vote> generateVotes(int startingNodeId, int endNodeId, boolean... votes) {
+ Map<String, Vote> voteMap = new HashMap<>();
+ if ((endNodeId - startingNodeId + 1) > votes.length) {
+ return voteMap;
+ }
+ for (int i = startingNodeId; i <= endNodeId; i++) {
+ String nodeId = i + "";
+ voteMap.put(nodeId, new Vote(nodeId, votes[i - startingNodeId], false, new ArrayList<>(),
+ new ArrayList<>()));
+ }
+ return voteMap;
+ }
+
+}
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java
new file mode 100644
index 0000000..01dbac0
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import org.apache.gossip.lock.exceptions.VoteFailedException;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(JUnitPlatform.class)
+public class SharedDataLockTest extends AbstractIntegrationBase {
+
+ @Test
+ public void sharedDataLockRandomVoteTest()
+ throws InterruptedException, UnknownHostException, URISyntaxException {
+ GossipSettings settings = new GossipSettings();
+ settings.setPersistRingState(false);
+ settings.setPersistDataState(false);
+ String cluster = UUID.randomUUID().toString();
+ int seedNodes = 1;
+ List<Member> startupMembers = new ArrayList<>();
+ for (int i = 1; i < seedNodes + 1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+ }
+ final List<GossipManager> clients = new ArrayList<>();
+ final int clusterMembers = 10;
+ for (int i = 1; i < clusterMembers + 1; ++i) {
+ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+ GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+ .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+ clients.add(gossipService);
+ gossipService.getLockManager().setNumberOfNodes(clusterMembers);
+ gossipService.init();
+ register(gossipService);
+ }
+
+ // Adding new data to Node 1
+ clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
+
+ final AtomicInteger lockSuccessCount = new AtomicInteger(0);
+ final AtomicInteger lockFailedCount = new AtomicInteger(0);
+
+ // Node 1 try to lock on key category
+ Thread Node1LockingThread = new Thread(() -> {
+ try {
+ clients.get(0).acquireSharedDataLock("category");
+ lockSuccessCount.incrementAndGet();
+ } catch (VoteFailedException ignore) {
+ lockFailedCount.incrementAndGet();
+ }
+ });
+
+ // Node 3 try to lock on key category
+ Thread Node3LockingThread = new Thread(() -> {
+ try {
+ clients.get(2).acquireSharedDataLock("category");
+ lockSuccessCount.incrementAndGet();
+ } catch (VoteFailedException ignore) {
+ lockFailedCount.incrementAndGet();
+ }
+ });
+
+ // Node 6 try to lock on key category
+ Thread Node5LockingThread = new Thread(() -> {
+ try {
+ clients.get(5).acquireSharedDataLock("category");
+ lockSuccessCount.incrementAndGet();
+ } catch (VoteFailedException ignore) {
+ lockFailedCount.incrementAndGet();
+ }
+ });
+
+ Node1LockingThread.start();
+ Node3LockingThread.start();
+ Node5LockingThread.start();
+
+ Node1LockingThread.join();
+ Node3LockingThread.join();
+ Node5LockingThread.join();
+
+ // Only one node should acquire the lock
+ Assert.assertEquals(1, lockSuccessCount.get());
+ // Other nodes should fail
+ Assert.assertEquals(2, lockFailedCount.get());
+
+ }
+
+ private SharedDataMessage sharedNodeData(String key, String value) {
+ SharedDataMessage g = new SharedDataMessage();
+ g.setExpireAt(Long.MAX_VALUE);
+ g.setKey(key);
+ g.setPayload(value);
+ g.setTimestamp(System.currentTimeMillis());
+ return g;
+ }
+
+}