blob: 3d3b8220c4d1d1709faea3c84fc73cd1f832d52b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.impl.cluster.voting;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.SlingConstants;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.impl.Config;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The OSGi event handler responsible for following any ongoing votings and
* voting accordingly.
*/
@Component(immediate = true)
@Service(value = {EventHandler.class, VotingHandler.class})
@Properties({
@Property(name = Constants.SERVICE_DESCRIPTION, value = "New Voting Event Listener."),
@Property(name = EventConstants.EVENT_TOPIC, value = {
SlingConstants.TOPIC_RESOURCE_ADDED,
SlingConstants.TOPIC_RESOURCE_CHANGED,
SlingConstants.TOPIC_RESOURCE_REMOVED }) })
// ,
// @Property(name = EventConstants.EVENT_FILTER, value = "(path="
// + org.apache.sling.discovery.viewmgr.Constants.ROOT_PATH + ")") })
public class VotingHandler implements EventHandler {
/**
 * Outcome recorded for a single voting by
 * {@link VotingHandler#analyzeVotings(ResourceResolver)}.
 */
public static enum VotingDetail {
/** the winning voting was initiated by the local instance and has been promoted */
PROMOTED,
/** a winning voting exists that was not initiated by the local instance; nothing was done locally */
WINNING,
/** the local instance has just cast a yes vote on this voting */
VOTED_YES,
/** the local instance has just cast a no vote on this voting */
VOTED_NO,
/** the local instance had already cast the applicable vote, so nothing was changed */
UNCHANGED,
/** the voting had timed out and was removed */
TIMEDOUT
}
/**
 * Orders votings by their voting id. A {@code null} entry is sorted after
 * any non-null entry; two {@code null}s (or the same instance) are equal.
 */
private final static Comparator<VotingView> VOTING_COMPARATOR = new Comparator<VotingView>() {
    public int compare(VotingView left, VotingView right) {
        if (left == right) {
            // same instance - or both null
            return 0;
        }
        if (left == null) {
            // right is necessarily non-null here: nulls go last
            return 1;
        }
        if (right == null) {
            return -1;
        }
        // both sides are non-null: the voting id decides
        return left.getVotingId().compareTo(right.getVotingId());
    }
};
/** the name used for the periodic job with the scheduler (not referenced by the code visible in this class) **/
protected String NAME = "discovery.impl.analyzeVotings.runner.";
/** logger; activate() replaces it with a logger whose name additionally carries the local sling id **/
private Logger logger = LoggerFactory.getLogger(this.getClass());
/** source of the local sling id, read in activate() **/
@Reference
private SlingSettingsService slingSettingsService;
/** used by handleEvent() to obtain the administrative resource resolver **/
@Reference
private ResourceResolverFactory resolverFactory;
/** supplies the ongoing-votings, established-view and previous-view paths and is handed to the VotingHelper/VotingView calls **/
@Reference
private Config config;
/** the sling id of the local instance, set in activate() **/
private String slingId;
/** the HeartbeatHandler sets the leaderElectionId - this is subsequently used
* to ensure the leaderElectionId is correctly set upon voting
* (it is passed along when analyzeVotings() casts a yes vote)
*/
private volatile String leaderElectionId;
/** true between activate() and deactivate(); while false, handleEvent() ignores events and analyzeVotings() returns null **/
private volatile boolean activated;
/**
 * For testing only: creates a handler and wires the given collaborators
 * into it directly, without going through OSGi injection.
 * <p>
 * The returned handler is not activated - this method sets neither the
 * sling id nor the activated flag.
 */
public static VotingHandler testConstructor(SlingSettingsService settingsService,
        ResourceResolverFactory factory, Config config) {
    final VotingHandler votingHandler = new VotingHandler();
    votingHandler.config = config;
    votingHandler.resolverFactory = factory;
    votingHandler.slingSettingsService = settingsService;
    return votingHandler;
}
/**
 * Marks this handler as deactivated. From then on handleEvent() returns
 * immediately and analyzeVotings() returns null.
 */
@Deactivate
protected void deactivate() {
activated = false;
logger.info("deactivate: deactivated slingId: {}, this: {}", slingId, this);
}
/**
 * Activates this handler: reads the local sling id, switches to a logger
 * whose name is the canonical class name suffixed with that sling id, and
 * then marks the handler as activated.
 *
 * @param context the component context (not used by this method)
 */
@Activate
protected void activate(final ComponentContext context) {
    final String localSlingId = slingSettingsService.getSlingId();
    this.slingId = localSlingId;
    // one logger per instance, so that log lines can be told apart by sling id
    final String loggerName = this.getClass().getCanonicalName() + "." + localSlingId;
    this.logger = LoggerFactory.getLogger(loggerName);
    this.activated = true;
    logger.info("activated: activated ("+slingId+")");
}
/**
 * Handles a resource event: if the handler is activated and the event's
 * "path" property lies below the configured ongoing-votings path, an
 * administrative resource resolver is opened, the votings are analyzed
 * with it, and the resolver is closed again.
 * <p>
 * A failing login or a PersistenceException during the analysis is logged
 * and not propagated.
 *
 * @param event the OSGi event carrying the affected resource path
 */
public void handleEvent(final Event event) {
    if (!activated) {
        return;
    }
    final String resourcePath = (String) event.getProperty("path");
    final String ongoingVotingsPath = config.getOngoingVotingsPath();
    if (resourcePath == null || !resourcePath.startsWith(ongoingVotingsPath)) {
        // not of my business
        return;
    }

    final ResourceResolver resourceResolver;
    try {
        resourceResolver = resolverFactory.getAdministrativeResourceResolver(null);
    } catch (LoginException e) {
        logger.error("handleEvent: could not log in administratively: " + e, e);
        return;
    }

    try {
        if (logger.isDebugEnabled()) {
            logger.debug("handleEvent: path = "+resourcePath+", event = "+event);
        }
        analyzeVotings(resourceResolver);
    } catch (PersistenceException e) {
        logger.error("handleEvent: got a PersistenceException during votings analysis: " + e, e);
    } finally {
        // always release the resolver opened above
        if (resourceResolver != null) {
            resourceResolver.close();
        }
    }
}
/**
 * Analyze any ongoing voting in the repository and vote accordingly.
 * <p>
 * If there is a winning voting, it is promoted when it was initiated by
 * the local instance and otherwise left alone. If there is none, the
 * ongoing votings are walked in voting-id order: timed out votings are
 * removed; votings that already carry a no vote, that do not match the
 * live view, or that come after the chosen yes candidate get a no vote
 * (unless already voted no); the first remaining ongoing voting gets the
 * yes vote (unless already voted yes).
 * <p>
 * SLING-2885: this method must be synchronized as it can be called concurrently
 * by the HearbeatHandler.doCheckView and the VotingHandler.handleEvent.
 *
 * @param resourceResolver the resolver to work with; it is refreshed at
 *        the start and committed after promoting or voting
 * @return what was done per voting (for test verification), or
 *         {@code null} if this handler is not activated
 * @throws PersistenceException if promoting or committing fails
 */
public synchronized Map<VotingView,VotingDetail> analyzeVotings(final ResourceResolver resourceResolver) throws PersistenceException {
    if (!activated) {
        logger.info("analyzeVotings: VotingHandler not yet initialized, can't vote.");
        return null;
    }
    final Map<VotingView,VotingDetail> outcome = new HashMap<VotingView,VotingDetail>();
    // SLING-3406: refreshing resourceResolver/session here to get the latest state from the repository
    logger.debug("analyzeVotings: start. slingId: {}", slingId);
    resourceResolver.refresh();

    final VotingView winner = VotingHelper.getWinningVoting(resourceResolver, config);
    if (winner != null) {
        if (!winner.isInitiatedBy(slingId)) {
            logger.info("analyzeVotings: there is a winning vote. No need to vote any further. Expecting it to get promoted to established: "
                    + winner);
            outcome.put(winner, VotingDetail.WINNING);
            return outcome;
        }
        logger.info("analyzeVotings: my voting was winning. I'll mark it as established then! "
                + winner);
        promote(resourceResolver, winner.getResource());
        // SLING-3406: committing resourceResolver/session here, while we're in the synchronized
        resourceResolver.commit();
        // for test verification
        outcome.put(winner, VotingDetail.PROMOTED);
        return outcome;
    }

    final List<VotingView> votings = VotingHelper.listVotings(resourceResolver, config);
    if (votings == null || votings.isEmpty()) {
        logger.debug("analyzeVotings: no ongoing votings at the moment. done.");
        return outcome;
    }
    Collections.sort(votings, VOTING_COMPARATOR);

    VotingView yesCandidate = null;
    for (final VotingView voting : votings) {
        final Boolean myVote = voting.getVote(slingId);
        final boolean votedNo = Boolean.FALSE.equals(myVote);
        final boolean votedYes = Boolean.TRUE.equals(myVote);

        if (voting.isTimedoutVoting(config)) {
            // if a voting has timed out, delete it
            logger.info("analyzeVotings: deleting a timed out voting: "+voting);
            voting.remove(false);
            outcome.put(voting, VotingDetail.TIMEDOUT);
            continue;
        }

        if (voting.hasNoVotes()) {
            if (votedNo) {
                // already voted no - ignore silently
                outcome.put(voting, VotingDetail.UNCHANGED);
            } else {
                logger.info("analyzeVotings: vote already has no votes, so I shall also vote no: "+voting);
                voting.vote(slingId, false, null);
                outcome.put(voting, VotingDetail.VOTED_NO);
            }
            continue;
        }

        final String liveComparison = voting.matchesLiveView(config);
        if (liveComparison != null) {
            // a non-null comparison result means the voting differs from the live view
            if (votedNo) {
                outcome.put(voting, VotingDetail.UNCHANGED);
            } else {
                logger.info("analyzeVotings: vote doesnt match my live view, voting no. "
                        + "comparison result: "+liveComparison+", vote: "+voting);
                voting.vote(slingId, false, null);
                outcome.put(voting, VotingDetail.VOTED_NO);
            }
            continue;
        }

        if (!voting.isOngoingVoting(config)) {
            logger.debug("analyzeVotings: vote is not ongoing (ignoring): "+voting);
            continue;
        }

        if (yesCandidate != null) {
            // as soon as I found the one I should vote yes for,
            // vote no for the rest
            if (votedNo) {
                // already voted no - ignore silently
                outcome.put(voting, VotingDetail.UNCHANGED);
            } else {
                logger.info("analyzeVotings: already voted yes, so voting no for: "+voting);
                voting.vote(slingId, false, null);
                outcome.put(voting, VotingDetail.VOTED_NO);
            }
            continue;
        }

        if (!votedYes) {
            logger.info("analyzeVotings: not timed out, no no-votes, matches live, still ongoing, "
                    + "I have not yet voted yes, so noting candidate for yes as: "+voting);
        }
        yesCandidate = voting;
    }

    if (yesCandidate != null) {
        final boolean alreadyVotedYes = Boolean.TRUE.equals(yesCandidate.getVote(slingId));
        if (alreadyVotedYes) {
            // don't double vote / log
            outcome.put(yesCandidate, VotingDetail.UNCHANGED);
        } else {
            logger.info("analyzeVotings: declaring my personal winner: "+yesCandidate);
            yesCandidate.vote(slingId, true, leaderElectionId);
            outcome.put(yesCandidate, VotingDetail.VOTED_YES);
        }
    }
    resourceResolver.commit();
    logger.debug("analyzeVotings: result: my yes vote was for: " + yesCandidate);
    return outcome;
}
/**
 * Removes every voting that VotingHelper reports as timed out.
 * <p>
 * This method itself does not call commit on the given resolver.
 *
 * @param resourceResolver the resolver used to look up the timed out votings
 */
public void cleanupTimedoutVotings(final ResourceResolver resourceResolver) {
    final List<VotingView> expiredVotings =
            VotingHelper.listTimedoutVotings(resourceResolver, config);
    for (final VotingView expired : expiredVotings) {
        if (expired == null) {
            continue;
        }
        logger.info("cleanupTimedoutVotings: removing a timed out voting: "+expired);
        expired.remove(false);
    }
}
/**
 * Promote a particular voting to be the new established view.
 * <p>
 * The steps, all done through the given resolver and committed at the end:
 * <ol>
 * <li>delete all children of the previous-views node (if a delete fails,
 * the node is moved aside to a "_trash_" path and recreated),</li>
 * <li>move the first established view under the previous-views node and
 * delete any additional established views,</li>
 * <li>store leaderId, leaderElectionId, promotedAt and promotedBy on the
 * winning voting and move it under the established-views node,</li>
 * <li>delete all children of the ongoing-votings node,</li>
 * <li>delete siblings of the ongoing-votings node whose path starts with,
 * but is not equal to, the ongoing-votings path.</li>
 * </ol>
 *
 * @param resourceResolver the resolver used for all repository changes
 * @param winningVoteResource the resource of the voting to promote
 * @throws PersistenceException if a repository modification or the commit fails
 */
private void promote(final ResourceResolver resourceResolver,
        final Resource winningVoteResource) throws PersistenceException {
    Resource previousViewsResource = ResourceHelper.getOrCreateResource(
            resourceResolver, config.getPreviousViewPath());
    final Resource establishedViewsResource = ResourceHelper.getOrCreateResource(
            resourceResolver, config.getEstablishedViewPath());
    final Resource ongoingVotingsResource = ResourceHelper.getOrCreateResource(
            resourceResolver, config.getOngoingVotingsPath());

    if (logger.isDebugEnabled()) {
        logger.debug("promote: previousViewsResource="
                + previousViewsResource.getPath());
        logger.debug("promote: establishedViewsResource="
                + establishedViewsResource.getPath());
        logger.debug("promote: ongoingVotingsResource="
                + ongoingVotingsResource.getPath());
        logger.debug("promote: winningVoteResource="
                + winningVoteResource.getPath());
    }

    // step 1: remove any nodes under previousViews
    try {
        for (final Resource previousView : previousViewsResource.getChildren()) {
            resourceResolver.delete(previousView);
        }
    } catch (PersistenceException e) {
        // if we cannot delete, apply workaround suggested in SLING-3785
        logger.error("promote: Could not delete a previous view - trying move next: "+e, e);
        ResourceHelper.moveResource(previousViewsResource, config.getPreviousViewPath()+"_trash_"+UUID.randomUUID().toString());
        logger.info("promote: recreating the previousviews node");
        previousViewsResource = ResourceHelper.getOrCreateResource(
                resourceResolver, config.getPreviousViewPath());
    }

    // step 2: retire the existing established view.
    // Note that there must always only be one. But if there's more, retire
    // them all now.
    boolean retiredOne = false;
    for (final Resource retiredView : establishedViewsResource.getChildren()) {
        if (retiredOne) {
            // only the first one is kept as previous view, the rest is dropped
            if (logger.isDebugEnabled()) {
                logger.debug("promote: retiring an erroneously additionally established node "
                        + retiredView.getPath());
            }
            resourceResolver.delete(retiredView);
            continue;
        }
        retiredOne = true;
        if (logger.isDebugEnabled()) {
            logger.debug("promote: moving the old established view to previous views: "
                    + retiredView.getPath());
        }
        ResourceHelper.moveResource(retiredView,
                previousViewsResource.getPath() + "/" + retiredView.getName());
    }

    // step 3: move the winning vote resource under the
    // establishedViewsResource
    // 3a: set the leaderid - the member with the lowest leaderElectionId leads
    final Resource membersResource = winningVoteResource.getChild("members");
    String lowestLeaderElectionId = null;
    String leaderSlingId = null;
    for (final Resource member : membersResource.getChildren()) {
        final String candidateId = member.adaptTo(ValueMap.class).get(
                "leaderElectionId", String.class);
        final boolean takeCandidate = lowestLeaderElectionId == null
                || (candidateId != null && candidateId.compareTo(lowestLeaderElectionId) < 0);
        if (takeCandidate) {
            lowestLeaderElectionId = candidateId;
            leaderSlingId = member.getName();
        }
    }
    if (logger.isDebugEnabled()) {
        logger.debug("promote: leader is " + leaderSlingId
                + " - with leaderElectionId=" + lowestLeaderElectionId);
    }
    final ModifiableValueMap winningVoteMap = winningVoteResource.adaptTo(ModifiableValueMap.class);
    winningVoteMap.put("leaderId", leaderSlingId);
    winningVoteMap.put("leaderElectionId", lowestLeaderElectionId);
    winningVoteMap.put("promotedAt", Calendar.getInstance());
    winningVoteMap.put("promotedBy", slingId);

    // 3b: move the result under /established
    final String newEstablishedViewPath = establishedViewsResource.getPath()
            + "/" + winningVoteResource.getName();
    logger.info("promote: promote to new established node "
            + newEstablishedViewPath);
    ResourceHelper.moveResource(winningVoteResource, newEstablishedViewPath);

    // step 4: delete all ongoing votings...
    final Iterable<Resource> ongoingVotingsChildren = ongoingVotingsResource.getChildren();
    if (ongoingVotingsChildren != null) {
        for (final Resource anOngoingVoting : ongoingVotingsChildren) {
            resourceResolver.delete(anOngoingVoting);
        }
    }

    // step 5: make sure there are no duplicate ongoingVotings nodes
    // created. if so, cleanup
    for (final Resource sibling : ongoingVotingsResource.getParent().getChildren()) {
        final String siblingPath = sibling.getPath();
        if (!siblingPath.startsWith(config.getOngoingVotingsPath())) {
            continue;
        }
        if (siblingPath.equals(config.getOngoingVotingsPath())) {
            // then it's [0] so to speak .. which we're not cleaning up
            continue;
        }
        logger.warn("promote: cleaning up a duplicate ongoingVotingPath: "
                + siblingPath);
        resourceResolver.delete(sibling);
    }

    logger.debug("promote: done with promotiong. saving.");
    resourceResolver.commit();
}
/**
 * Stores the leaderElectionId of the local instance; analyzeVotings()
 * passes this value along when it casts a yes vote.
 *
 * @param leaderElectionId the leaderElectionId to use for subsequent yes votes
 */
public void setLeaderElectionId(String leaderElectionId) {
logger.info("setLeaderElectionId: leaderElectionId="+leaderElectionId);
this.leaderElectionId = leaderElectionId;
}
}