blob: 9f4636a7cc6dd8229f887d19b8af4bd39438157a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.gossip.Member;
import org.apache.gossip.lock.exceptions.VoteFailedException;
import org.apache.gossip.lock.vote.MajorityVote;
import org.apache.gossip.lock.vote.Vote;
import org.apache.gossip.lock.vote.VoteCandidate;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.log4j.Logger;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class LockManager {
public static final Logger LOGGER = Logger.getLogger(LockManager.class);
private final GossipManager gossipManager;
private final LockManagerSettings lockSettings;
private final ScheduledExecutorService voteService;
private final AtomicInteger numberOfNodes;
private final Set<String> lockKeys;
// For MetricRegistry
public static final String LOCK_KEY_SET_SIZE = "gossip.lock.key_set_size";
public static final String LOCK_TIME = "gossip.lock.time";
private final Timer lockTimeMetric;
public LockManager(GossipManager gossipManager, final LockManagerSettings lockManagerSettings,
MetricRegistry metrics) {
this.gossipManager = gossipManager;
this.lockSettings = lockManagerSettings;
this.numberOfNodes = new AtomicInteger(lockSettings.getNumberOfNodes());
this.lockKeys = new CopyOnWriteArraySet<>();
metrics.register(LOCK_KEY_SET_SIZE, (Gauge<Integer>) lockKeys::size);
lockTimeMetric = metrics.timer(LOCK_TIME);
// Register listener for lock keys
gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
if (key.contains("lock/")) {
lockKeys.add(key);
}
});
voteService = Executors.newScheduledThreadPool(2);
voteService.scheduleAtFixedRate(this::updateVotes, 0, lockSettings.getVoteUpdateInterval(),
TimeUnit.MILLISECONDS);
}
public void acquireSharedDataLock(String key) throws VoteFailedException {
final Timer.Context context = lockTimeMetric.time();
gossipManager.merge(generateLockMessage(key));
int deadlockDetectCount = 0;
while (true) {
SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key));
if (message == null || !(message.getPayload() instanceof MajorityVote)) {
continue;
}
MajorityVote majorityVoteResult = (MajorityVote) message.getPayload();
final Map<String, VoteCandidate> voteCandidatesMap = majorityVoteResult.value();
final Map<String, Boolean> voteResultMap = new HashMap<>();
// Store the vote result for each vote candidate nodes
voteCandidatesMap.forEach((candidateId, voteCandidate) -> voteResultMap
.put(candidateId, isVoteSuccess(voteCandidate)));
long passedCandidates = voteResultMap.values().stream().filter(aBoolean -> aBoolean).count();
String myNodeId = gossipManager.getMyself().getId();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("NodeId=" + myNodeId + ", VoteMap=" + voteResultMap + ", WinnerCount="
+ passedCandidates);
}
// Check for possible dead lock when no candidates were won
if (passedCandidates == 0) {
if (isDeadLock(voteCandidatesMap)) {
deadlockDetectCount++;
// Testing for deadlock is not always correct, therefore test for continues deadlocks
if (deadlockDetectCount >= lockSettings.getDeadlockDetectionThreshold()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deadlock detected from node " + myNodeId + ". VoteCandidatesMap="
+ voteCandidatesMap);
}
preventDeadLock(voteCandidatesMap);
}
} else {
deadlockDetectCount = 0;
}
} else if (passedCandidates == 1 && voteResultMap.containsKey(myNodeId)) {
context.stop();
if (voteResultMap.get(myNodeId)) {
// There is one winner and that is my node, therefore break the while loop and continue
break;
} else {
throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key);
}
} else if (passedCandidates > 1) {
// Multiple winners are not possible
context.stop();
throw new IllegalStateException("Multiple nodes get voted.");
}
try {
Thread.sleep(lockSettings.getResultCalculationDelay());
} catch (InterruptedException e) {
throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key, e);
}
}
}
// Generate Crdt lock message for voting
private SharedDataMessage generateLockMessage(String key) {
VoteCandidate voteCandidate = new VoteCandidate(gossipManager.getMyself().getId(), key,
new ConcurrentHashMap<>());
voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), true, false,
gossipManager.getLiveMembers().stream().map(Member::getId).collect(Collectors.toList()),
gossipManager.getDeadMembers().stream().map(Member::getId)
.collect(Collectors.toList())));
Map<String, VoteCandidate> voteCandidateMap = new ConcurrentHashMap<>();
voteCandidateMap.put(voteCandidate.getCandidateNodeId(), voteCandidate);
MajorityVote majorityVote = new MajorityVote(voteCandidateMap);
SharedDataMessage lockMessage = new SharedDataMessage();
lockMessage.setKey(generateLockKey(key));
lockMessage.setPayload(majorityVote);
lockMessage.setExpireAt(Long.MAX_VALUE);
lockMessage.setTimestamp(System.currentTimeMillis());
return lockMessage;
}
// This method will run periodically to vote the other nodes
private void updateVotes() {
for (String lockKey : lockKeys) {
SharedDataMessage message = gossipManager.findSharedGossipData(lockKey);
if (message == null || !(message.getPayload() instanceof MajorityVote)) {
continue;
}
MajorityVote majorityVote = (MajorityVote) message.getPayload();
Map<String, VoteCandidate> voteCandidateMap = majorityVote.value();
String myNodeId = gossipManager.getMyself().getId();
// No need to vote if my node is already voted to every node for the key
if (isVotedToAll(myNodeId, voteCandidateMap)) {
continue;
}
String myVoteCandidate = getVotedCandidateNodeId(myNodeId, voteCandidateMap);
if (myVoteCandidate == null) {
myVoteCandidate = lockSettings.getVoteSelector().getVoteCandidateId(voteCandidateMap.keySet());
}
for (VoteCandidate voteCandidate : voteCandidateMap.values()) {
if (voteCandidate.getCandidateNodeId().equals(myNodeId)) {
continue;
}
// Vote for selected candidate
boolean voteResult = voteCandidate.getCandidateNodeId().equals(myVoteCandidate);
voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), voteResult, false,
gossipManager.getLiveMembers().stream().map(Member::getId)
.collect(Collectors.toList()),
gossipManager.getDeadMembers().stream().map(Member::getId)
.collect(Collectors.toList())));
}
}
}
// Return true if every node has a vote from given node id.
private boolean isVotedToAll(String nodeId, final Map<String, VoteCandidate> voteCandidates) {
int voteCount = 0;
for (VoteCandidate voteCandidate : voteCandidates.values()) {
if (voteCandidate.getVotes().containsKey(nodeId)) {
voteCount++;
}
}
return voteCount == voteCandidates.size();
}
// Returns true if there is a deadlock for given vote candidates
private boolean isDeadLock(final Map<String, VoteCandidate> voteCandidates) {
boolean result = true;
int numberOfLiveNodes;
if (numberOfNodes.get() > 0) {
numberOfLiveNodes = numberOfNodes.get();
} else {
// numberOfNodes is not set by the user, therefore calculate it.
Set<String> liveNodes = voteCandidates.values().stream()
.map(voteCandidate -> voteCandidate.getVotes().values()).flatMap(Collection::stream)
.map(Vote::getLiveMembers).flatMap(List::stream).collect(Collectors.toSet());
numberOfLiveNodes = liveNodes.size();
}
for (VoteCandidate voteCandidate : voteCandidates.values()) {
result = result && voteCandidate.getVotes().size() == numberOfLiveNodes;
}
return result;
}
// Prevent the deadlock by giving up the votes
private void preventDeadLock(Map<String, VoteCandidate> voteCandidates) {
String myNodeId = gossipManager.getMyself().getId();
VoteCandidate myResults = voteCandidates.get(myNodeId);
if (myResults == null) {
return;
}
// Set of nodes that is going to receive this nodes votes
List<String> donateCandidateIds = voteCandidates.keySet().stream()
.filter(s -> s.compareTo(myNodeId) < 0).collect(Collectors.toList());
if (donateCandidateIds.size() == 0) {
return;
}
// Select a random node to donate
Random randomizer = new Random();
String selectedCandidateId = donateCandidateIds
.get(randomizer.nextInt(donateCandidateIds.size()));
VoteCandidate selectedCandidate = voteCandidates.get(selectedCandidateId);
Set<Vote> myVotes = new HashSet<>(myResults.getVotes().values());
Set<Vote> selectedCandidateVotes = new HashSet<>(selectedCandidate.getVotes().values());
// Exchange the votes
for (Vote myVote : myVotes) {
for (Vote candidateVote : selectedCandidateVotes) {
if (myVote.getVoteValue() && myVote.getVotingNode().equals(candidateVote.getVotingNode())) {
myVote.setVoteExchange(true);
candidateVote.setVoteExchange(true);
selectedCandidate.getVotes().put(myVote.getVotingNode(), myVote);
myResults.getVotes().put(candidateVote.getVotingNode(), candidateVote);
}
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Node " + myNodeId + " give up votes to node " + selectedCandidateId);
}
}
private String getVotedCandidateNodeId(String nodeId,
final Map<String, VoteCandidate> voteCandidates) {
for (VoteCandidate voteCandidate : voteCandidates.values()) {
Vote vote = voteCandidate.getVotes().get(nodeId);
if (vote != null && vote.getVoteValue()) {
return voteCandidate.getCandidateNodeId();
}
}
return null;
}
// Return true if the given candidate has passed the vote
private boolean isVoteSuccess(VoteCandidate voteCandidate) {
Set<String> liveNodes = new HashSet<>();
int voteCount = 0;
for (Vote vote : voteCandidate.getVotes().values()) {
liveNodes.addAll(vote.getLiveMembers());
if (vote.getVoteValue()) {
voteCount++;
}
}
int numberOfLiveNodes;
if (numberOfNodes.get() > 0) {
numberOfLiveNodes = numberOfNodes.get();
} else {
numberOfLiveNodes = liveNodes.size();
}
return numberOfLiveNodes > 0 && voteCount >= (numberOfLiveNodes / 2 + 1);
}
private String generateLockKey(String key){
return "lock/" + key;
}
public void shutdown(){
voteService.shutdown();
}
/**
* Get the voted node id from this node for a given key
* @param key key of the data object
* @return Voted node id
*/
public String getVotedCandidateNodeId(String key) {
SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key));
if (message == null || !(message.getPayload() instanceof MajorityVote)) {
return null;
}
MajorityVote majorityVote = (MajorityVote) message.getPayload();
return getVotedCandidateNodeId(gossipManager.getMyself().getId(), majorityVote.value());
}
/**
* Set the number of live nodes. If this value is negative, live nodes will be calculated
* @param numberOfNodes live node count or negative to calculate.
*/
public void setNumberOfNodes(int numberOfNodes) {
this.numberOfNodes.set(numberOfNodes);
}
}