/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sling.discovery.impl.common.heartbeat;

import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import javax.jcr.Session;

import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.discovery.base.commons.BaseViewChecker;
import org.apache.sling.discovery.base.connectors.BaseConfig;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.impl.Config;
import org.apache.sling.discovery.impl.DiscoveryServiceImpl;
import org.apache.sling.discovery.impl.cluster.voting.VotingHandler;
import org.apache.sling.discovery.impl.cluster.voting.VotingHelper;
import org.apache.sling.discovery.impl.cluster.voting.VotingView;
import org.apache.sling.discovery.impl.common.View;
import org.apache.sling.discovery.impl.common.ViewHelper;
import org.apache.sling.launchpad.api.StartupListener;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleException;
import org.osgi.service.http.HttpService;

/**
 * The heartbeat handler is responsible and capable of issuing both local and
 * remote heartbeats and registers a periodic job with the scheduler for doing so.
 * <p>
 * Local heartbeats are stored in the repository. Remote heartbeats are POSTs to
 * remote TopologyConnectorServlets.
 */
@Component
@Service(value = { HeartbeatHandler.class, StartupListener.class })
@Reference(referenceInterface=HttpService.class,
           cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
           policy=ReferencePolicy.DYNAMIC)
public class HeartbeatHandler extends BaseViewChecker {

    private static final String PROPERTY_ID_LAST_HEARTBEAT = "lastHeartbeat";

    @Reference
    protected SlingSettingsService slingSettingsService;

    @Reference
    protected ResourceResolverFactory resourceResolverFactory;

    @Reference
    protected ConnectorRegistry connectorRegistry;

    @Reference
    protected AnnouncementRegistry announcementRegistry;

    @Reference
    protected Scheduler scheduler;

    @Reference
    private Config config;

    @Reference
    private VotingHandler votingHandler;

    /** the id which is to be used for the next voting **/
    private String nextVotingId = UUID.randomUUID().toString();

    /** whether or not to reset the leaderElectionId at next heartbeat time **/
    private volatile boolean resetLeaderElectionId = false;

    /** SLING-5030 : upon resetLeaderElectionId() a newLeaderElectionId is calculated 
     * and passed on to the VotingHandler - but the actual storing under ./clusterInstances
     * is only done on heartbeats - this field is used to temporarily store the new
     * leaderElectionId that the next heartbeat then stores
     */
    private volatile String newLeaderElectionId;
    
    /** SLING-2892: remember first heartbeat written to repository by this instance **/
    private long firstHeartbeatWritten = -1;

    /** SLING-2892: remember the value of the heartbeat this instance has written the last time **/
    private volatile Calendar lastHeartbeatWritten = null;

    private DiscoveryServiceImpl discoveryServiceImpl;

    private String lastEstablishedViewId;

    protected String failedEstablishedViewId;

    /** for testing only **/
    public static HeartbeatHandler testConstructor(
            SlingSettingsService slingSettingsService,
            ResourceResolverFactory factory, 
            AnnouncementRegistry announcementRegistry, 
            ConnectorRegistry connectorRegistry,
            Config config, 
            Scheduler scheduler,
            VotingHandler votingHandler) {
        HeartbeatHandler handler = new HeartbeatHandler();
        handler.slingSettingsService = slingSettingsService;
        handler.resourceResolverFactory = factory;
        handler.announcementRegistry = announcementRegistry;
        handler.connectorRegistry = connectorRegistry;
        handler.config = config;
        handler.scheduler = scheduler;
        handler.votingHandler = votingHandler;
        return handler;
    }
    
    @Override
    protected AnnouncementRegistry getAnnouncementRegistry() {
        return announcementRegistry;
    }
    
    @Override
    protected BaseConfig getConnectorConfig() {
        return config;
    }
    
    @Override
    protected ConnectorRegistry getConnectorRegistry() {
        return connectorRegistry;
    }
    
    @Override
    protected ResourceResolverFactory getResourceResolverFactory() {
        return resourceResolverFactory;
    }
    
    @Override
    protected Scheduler getScheduler() {
        return scheduler;
    }
    
    @Override
    protected SlingSettingsService getSlingSettingsService() {
        return slingSettingsService;
    }
    
    @Override
    protected void doActivate() {
        // on activate the resetLeaderElectionId is set to true to ensure that
        // the 'leaderElectionId' property is reset on next heartbeat issuance.
        // the idea being that a node which leaves the cluster should not
        // become leader on next join - and by resetting the leaderElectionId
        // to the current time, this is ensured.
        resetLeaderElectionId = true;
        runtimeId = UUID.randomUUID().toString();

        // SLING-2895: reset variables to avoid unnecessary log.error
        firstHeartbeatWritten = -1;
        lastHeartbeatWritten = null;

        logger.info("doActivate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId);
    }
    
    @Override
    protected void deactivate() {
        super.deactivate();
        scheduler.removeJob(NAME+".checkForTopologyChange");        
    }
    
    /**
     * The initialize method is called by the DiscoveryServiceImpl.activate
     * as we require the discoveryService (and the discoveryService has
     * a reference on us - but we cant have circular references in osgi).
     * <p>
     * The initialVotingId is used to avoid an unnecessary topologyChanged event
     * when starting up an instance in a 1-node cluster: the instance
     * will wait until the first voting has been finished to send
     * the TOPOLOGY_INIT event - BUT even before that the API method
     * getTopology() is open - so if anyone asks for the topology
     * BEFORE the first voting in a 1-node cluster is done, it gets
     * a particular clusterId - that one we aim to reuse for the first
     * voting.
     */
    public void initialize(final DiscoveryServiceImpl discoveryService,
            final String initialVotingId) {
        synchronized(lock) {
            this.discoveryServiceImpl = discoveryService;
        	this.nextVotingId = initialVotingId;
        	logger.info("initialize: nextVotingId="+nextVotingId);
            issueHeartbeat();
        }

        try {
            long interval = config.getHeartbeatInterval();
            logger.info("initialize: starting periodic heartbeat job for "+slingId+" with interval "+interval+" sec.");
            if (interval==0) {
                logger.warn("initialize: Repeat interval cannot be zero. Defaulting to 10sec");
                interval = 10;
            }
            scheduler.addPeriodicJob(NAME, this,
                    null, interval, false);
        } catch (Exception e) {
            logger.error("activate: Could not start heartbeat runner: " + e, e);
        }

        // SLING-5195 - to account for repository delays, the writing of heartbeats and voting
        // should be done independently of getting the current clusterView and 
        // potentially sending a topology event.
        // so this second part is now done (additionally) in a 2nd runner here:
        try {
            long interval = config.getHeartbeatInterval();
            logger.info("initialize: starting periodic heartbeat job for "+slingId+" with interval "+interval+" sec.");
            if (interval==0) {
                logger.warn("initialize: Repeat interval cannot be zero. Defaulting to 10sec.");
                interval = 10;
            }
            scheduler.addPeriodicJob(NAME+".checkForTopologyChange", new Runnable() {

                @Override
                public void run() {
                    Calendar lastHb = lastHeartbeatWritten;
                    if (lastHb!=null) {
                        // check to see when we last wrote a heartbeat
                        // if it is older than the configured timeout,
                        // then mark ourselves as in topologyChanging automatically
                        long timeSinceHb = System.currentTimeMillis() - lastHb.getTimeInMillis();
                        if (timeSinceHb > config.getHeartbeatTimeoutMillis()) {
                            logger.info("checkForTopologyChange/.run: time since local instance last wrote a heartbeat is "+timeSinceHb+"ms. Flagging us as (still) changing");
                            // mark the current establishedView as faulty
                            invalidateCurrentEstablishedView();
                            
                            // then tell the listeners immediately
                            // note that just calling handleTopologyChanging alone - without the above invalidate -
                            // won't be sufficient, because that would only affect the listeners, not the
                            // getTopology() call.
                            discoveryService.handleTopologyChanging();
                            return;
                        }
                    }
                    // SLING-5195: guarantee frequent calls to checkForTopologyChange,
                    // independently of blocked write/save operations
                    logger.debug("checkForTopologyChange/.run: going to check for topology change...");
                    discoveryService.checkForTopologyChange();
                    logger.debug("checkForTopologyChange/.run: check for topology change done.");
                }
                
            },
                    null, interval, false);
        } catch (Exception e) {
            logger.error("activate: Could not start heartbeat runner: " + e, e);
        }
    }

    /** Get or create a ResourceResolver **/
    private ResourceResolver getResourceResolver() throws LoginException {
        if (resourceResolverFactory == null) {
            logger.error("getResourceResolver: resourceResolverFactory is null!");
            return null;
        }
        return resourceResolverFactory.getAdministrativeResourceResolver(null);
    }

    /** Calcualte the local cluster instance path **/
    private String getLocalClusterNodePath() {
        return config.getClusterInstancesPath() + "/" + slingId;
    }

    /**
     * Hook that will cause a reset of the leaderElectionId 
     * on next invocation of issueClusterLocalHeartbeat.
     * @return true if the leaderElectionId was reset - false if that was not
     * necessary as that happened earlier already and it has not propagated
     * yet to the ./clusterInstances in the meantime
     */
    public boolean resetLeaderElectionId() {
        if (resetLeaderElectionId) {
            // then we already have a reset pending
            // resetting twice doesn't work
            return false;
        }
        resetLeaderElectionId = true;
        ResourceResolver resourceResolver = null;
        try{
            resourceResolver = getResourceResolver();
            if (resourceResolver!=null) {
                newLeaderElectionId = newLeaderElectionId(resourceResolver);
                if (votingHandler!=null) {
                    logger.info("resetLeaderElectionId: set new leaderElectionId with votingHandler to: "+newLeaderElectionId);
                    votingHandler.setLeaderElectionId(newLeaderElectionId);
                } else {
                    logger.info("resetLeaderElectionId: no votingHandler, new leaderElectionId would be: "+newLeaderElectionId);
                }
            } else {
                logger.warn("resetLeaderElectionId: could not login, new leaderElectionId will be calculated upon next heartbeat only!");
            }
        } catch (LoginException e) {
            logger.error("resetLeaderElectionid: could not login: "+e, e);
        } finally {
            if (resourceResolver!=null) {
                resourceResolver.close();
            }
        }
        return true;
    }

    /**
     * Issue a heartbeat.
     * <p>
     * This action consists of first updating the local properties,
     * then issuing a cluster-local heartbeat (within the repository)
     * and then a remote heartbeat (to all the topology connectors
     * which announce this part of the topology to others)
     */
    protected void issueHeartbeat() {
        updateProperties();
        issueClusterLocalHeartbeat();
        issueConnectorPings();
    }

    protected void updateProperties() {
        if (discoveryServiceImpl == null) {
            logger.debug("updateProperties: discoveryService is null");
        } else {
            discoveryServiceImpl.updateProperties();
        }
    }
    
    /** Issue a cluster local heartbeat (into the repository) **/
    protected void issueClusterLocalHeartbeat() {
        if (logger.isDebugEnabled()) {
            logger.debug("issueClusterLocalHeartbeat: storing cluster-local heartbeat to repository for "+slingId);
        }
        ResourceResolver resourceResolver = null;
        final String myClusterNodePath = getLocalClusterNodePath();
        final Calendar currentTime = Calendar.getInstance();
        try {
            resourceResolver = getResourceResolver();
            if (resourceResolver == null) {
                logger.error("issueClusterLocalHeartbeat: no resourceresolver available!");
                return;
            }

            final Resource resource = ResourceHelper.getOrCreateResource(
                    resourceResolver, myClusterNodePath);
            final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);

            if (firstHeartbeatWritten!=-1 && lastHeartbeatWritten!=null) {
            	// SLING-2892: additional paranoia check
            	// after the first heartbeat, check if there's someone else using
            	// the same sling.id in this cluster
            	final long timeSinceFirstHeartbeat =
            			System.currentTimeMillis() - firstHeartbeatWritten;
            	if (timeSinceFirstHeartbeat > 2*config.getHeartbeatInterval()) {
            		// but wait at least 2 heartbeat intervals to handle the situation
            		// where a bundle is refreshed, and startup cases.
            		final Calendar lastHeartbeat = resourceMap.get(PROPERTY_ID_LAST_HEARTBEAT, Calendar.class);
            		if (lastHeartbeat!=null) {
            			// if there is a heartbeat value, check if it is what I've written
            			// the last time
            			if (!lastHeartbeatWritten.getTime().equals(lastHeartbeat.getTime())) {
            				// then we've likely hit the situation where there is another
            				// sling instance accessing the same repository (ie in the same cluster)
            				// using the same sling.id - hence writing to the same
            				// resource
            			    invalidateCurrentEstablishedView();
            			    discoveryServiceImpl.handleTopologyChanging();
            				logger.error("issueClusterLocalHeartbeat: SLING-2892: Detected unexpected, concurrent update of: "+
            						myClusterNodePath+" 'lastHeartbeat'. If not done manually, " +
            						"this likely indicates that there is more than 1 instance running in this cluster" +
            						" with the same sling.id. My sling.id is "+slingId+"." +
            						" Check for sling.id.file in your installation of all instances in this cluster " +
            						"to verify this! Duplicate sling.ids are not allowed within a cluster!");
            			}
            		}
            	}

            	// SLING-2901 : robust paranoia check: on first heartbeat write, the
            	//              'runtimeId' is set as a property (ignoring any former value).
            	//              If in subsequent calls the value of 'runtimeId' changes, then
            	//              there is someone else around with the same slingId.
            	final String readRuntimeId = resourceMap.get(PROPERTY_ID_RUNTIME, String.class);
            	if ( readRuntimeId == null ) { // SLING-3977
            	    // someone deleted the resource property
            	    firstHeartbeatWritten = -1;
            	} else if (!runtimeId.equals(readRuntimeId)) {
            	    invalidateCurrentEstablishedView();
            	    discoveryServiceImpl.handleTopologyChanging();
                    final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
                    final String endpointsAsString = getEndpointsAsString();
                    final String readEndpoints = resourceMap.get(PROPERTY_ID_ENDPOINTS, String.class);
                    final String readSlingHomePath = resourceMap.get(PROPERTY_ID_SLING_HOME_PATH, String.class);
            		logger.error("issueClusterLocalHeartbeat: SLING-2901: Detected more than 1 instance running in this cluster " +
            				" with the same sling.id. " +
            				"My sling.id: "+slingId+", my runtimeId: " + runtimeId+", my endpoints: "+endpointsAsString+", my slingHomePath: "+slingHomePath+
            				", other runtimeId: "+readRuntimeId+", other endpoints: "+readEndpoints+", other slingHomePath:"+readSlingHomePath+
    						" Check for sling.id.file in your installation of all instances in this cluster " +
    						"to verify this! Duplicate sling.ids are not allowed within a cluster!");
            		logger.error("issueClusterLocalHeartbeat: sending TOPOLOGY_CHANGING before self-disabling.");
            		discoveryServiceImpl.forcedShutdown();
            		logger.error("issueClusterLocalHeartbeat: disabling discovery.impl");
            		activated = false;
            		if (context!=null) {
            			// disable all components
            			try {
							context.getBundleContext().getBundle().stop();
						} catch (BundleException e) {
							logger.warn("issueClusterLocalHeartbeat: could not stop bundle: "+e, e);
							// then disable all compnoents instead
							context.disableComponent(null);
						}
            		}
            		return;
            	}
            }
            resourceMap.put(PROPERTY_ID_LAST_HEARTBEAT, currentTime);
            if (firstHeartbeatWritten==-1) {
            	resourceMap.put(PROPERTY_ID_RUNTIME, runtimeId);
            	// SLING-4765 : store more infos to be able to be more verbose on duplicate slingId/ghost detection
            	final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
                resourceMap.put(PROPERTY_ID_SLING_HOME_PATH, slingHomePath);
            	final String endpointsAsString = getEndpointsAsString();
                resourceMap.put(PROPERTY_ID_ENDPOINTS, endpointsAsString);
            	logger.info("issueClusterLocalHeartbeat: storing my runtimeId: {}, endpoints: {} and sling home path: {}", 
            	        new Object[]{runtimeId, endpointsAsString, slingHomePath});
            }
            if (resetLeaderElectionId || !resourceMap.containsKey("leaderElectionId")) {
                // the new leaderElectionId might have been 'pre set' in the field 'newLeaderElectionId'
                // if that's the case, use that one, otherwise calculate a new one now
                final String newLeaderElectionId = this.newLeaderElectionId!=null ? this.newLeaderElectionId : newLeaderElectionId(resourceResolver);
                this.newLeaderElectionId = null;
                resourceMap.put("leaderElectionId", newLeaderElectionId);
                resourceMap.put("leaderElectionIdCreatedAt", new Date());
                logger.info("issueClusterLocalHeartbeat: set leaderElectionId to "+newLeaderElectionId+" (resetLeaderElectionId: "+resetLeaderElectionId+")");
                if (votingHandler!=null) {
                    votingHandler.setLeaderElectionId(newLeaderElectionId);
                }
                resetLeaderElectionId = false;
            }
            resourceResolver.commit();

            // SLING-2892: only in success case: remember the last heartbeat value written
            lastHeartbeatWritten = currentTime;
            // and set the first heartbeat written value - if it is not already set
            if (firstHeartbeatWritten==-1) {
            	firstHeartbeatWritten = System.currentTimeMillis();
            }

        } catch (LoginException e) {
            logger.error("issueHeartbeat: could not log in administratively: "
                    + e, e);
        } catch (PersistenceException e) {
            logger.error("issueHeartbeat: Got a PersistenceException: "
                    + myClusterNodePath + " " + e, e);
        } finally {
            if (resourceResolver != null) {
                resourceResolver.close();
            }
        }
    }

    /**
     * Calculate a new leaderElectionId based on the current config and system time
     */
    private String newLeaderElectionId(ResourceResolver resourceResolver) {
        int maxLongLength = String.valueOf(Long.MAX_VALUE).length();
        String currentTimeMillisStr = String.format("%0"
                + maxLongLength + "d", System.currentTimeMillis());

        final boolean shouldInvertRepositoryDescriptor = config.shouldInvertRepositoryDescriptor();
        String prefix = (shouldInvertRepositoryDescriptor ? "1" : "0");

        String leaderElectionRepositoryDescriptor = config.getLeaderElectionRepositoryDescriptor();
        if (leaderElectionRepositoryDescriptor!=null && leaderElectionRepositoryDescriptor.length()!=0) {
            // when this property is configured, check the value of the repository descriptor
            // and if that value is set, include it in the leader election id

            final Session session = resourceResolver.adaptTo(Session.class);
            if ( session != null ) {
                String value = session.getRepository()
                        .getDescriptor(leaderElectionRepositoryDescriptor);
                if (value != null) {
                    if (value.equalsIgnoreCase("true")) {
                        if (!shouldInvertRepositoryDescriptor) {
                            prefix = "1";
                        } else {
                            prefix = "0";
                        }
                    }
                }
            }
        }
        final String newLeaderElectionId = prefix + "_"
                + currentTimeMillisStr + "_" + slingId;
        return newLeaderElectionId;
    }

    /** Check whether the established view matches the reality, ie matches the
     * heartbeats
     */
    protected void doCheckView() {
        super.doCheckView();

        ResourceResolver resourceResolver = null;
        try {
            resourceResolver = getResourceResolver();
            doCheckViewWith(resourceResolver);
        } catch (LoginException e) {
            logger.error("checkView: could not log in administratively: " + e,
                    e);
        } catch (PersistenceException e) {
            logger.error(
                    "checkView: encountered a persistence exception during view check: "
                            + e, e);
        } finally {
            if (resourceResolver != null) {
                resourceResolver.close();
            }
        }
    }

    /** do the established-against-heartbeat view check using the given resourceResolver.
     */
    private void doCheckViewWith(final ResourceResolver resourceResolver) throws PersistenceException {

        if (votingHandler==null) {
            logger.info("doCheckViewWith: votingHandler is null! slingId="+slingId);
        } else {
            votingHandler.analyzeVotings(resourceResolver);
            try{
                votingHandler.cleanupTimedoutVotings(resourceResolver);
            } catch(Exception e) {
                logger.warn("doCheckViewWith: Exception occurred while cleaning up votings: "+e, e);
            }
        }

        final View establishedView = ViewHelper.getEstablishedView(resourceResolver, config);
        lastEstablishedViewId = establishedView == null ? null : establishedView.getResource().getName();

        final VotingView winningVoting = VotingHelper.getWinningVoting(
                resourceResolver, config);
        int numOpenNonWinningVotes = VotingHelper.listOpenNonWinningVotings(
                resourceResolver, config).size();
        if (winningVoting != null || (numOpenNonWinningVotes > 0)) {
            // then there are votings pending and I shall wait for them to
            // settle
            
            // but first: make sure we sent the TOPOLOGY_CHANGING
            logger.info("doCheckViewWith: there are pending votings, marking topology as changing...");
            invalidateCurrentEstablishedView();
            discoveryServiceImpl.handleTopologyChanging();
            
        	if (logger.isDebugEnabled()) {
	            logger.debug("doCheckViewWith: "
	                    + numOpenNonWinningVotes
	                    + " ongoing votings, no one winning yet - I shall wait for them to settle.");
        	}
            return;
        }

        final Resource clusterNodesRes = ResourceHelper.getOrCreateResource(
                resourceResolver, config.getClusterInstancesPath());
        final Set<String> liveInstances = ViewHelper.determineLiveInstances(
                clusterNodesRes, config);

        boolean establishedViewMatches;
        if (lastEstablishedViewId != null && failedEstablishedViewId != null
                && lastEstablishedViewId.equals(failedEstablishedViewId)) {
            // SLING-5195 : heartbeat-self-check caused this establishedViewId
            // to be declared as failed - so we must now cause a new voting
            logger.info("doCheckView: current establishedViewId ({}) was declared as failed earlier already.", lastEstablishedViewId);
            establishedViewMatches = false;
        } else {
            if (establishedView == null) {
                establishedViewMatches = false;
            } else {
                String mismatchDetails;
                try{
                    mismatchDetails = establishedView.matches(liveInstances);
                } catch(Exception e) {
                    logger.error("doCheckViewWith: could not compare established view with live ones: "+e, e);
                    invalidateCurrentEstablishedView();
                    discoveryServiceImpl.handleTopologyChanging();
                    return;
                }
                if (mismatchDetails != null) {
                    logger.info("doCheckView: established view does not match. (details: " + mismatchDetails + ")");
                } else {
                    logger.debug("doCheckView: established view matches with expected.");
                }
                establishedViewMatches = mismatchDetails == null;
            }
        }
        
        if (establishedViewMatches) {
            // that's the normal case. the established view matches what we're
            // seeing.
            // all happy and fine
            logger.debug("doCheckViewWith: no pending nor winning votes. view is fine. we're all happy.");
            return;
        }
        
        // immediately send a TOPOLOGY_CHANGING - could already be sent, but just to be sure
        logger.info("doCheckViewWith: no matching established view, marking topology as changing");
        invalidateCurrentEstablishedView();
        discoveryServiceImpl.handleTopologyChanging();
        
        List<VotingView> myYesVotes = VotingHelper.getYesVotingsOf(resourceResolver, config, slingId);
        if (myYesVotes != null && myYesVotes.size() > 0) {
            logger.info("doCheckViewWith: I have voted yes (" + myYesVotes.size() + "x)- the vote was not yet promoted but expecting it to be soon. Not voting again in the meantime. My yes vote was for: "+myYesVotes);
            return;
        }
        
    	if (logger.isDebugEnabled()) {
	        logger.debug("doCheckViewWith: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting");
	        Iterator<String> it = liveInstances.iterator();
	        while (it.hasNext()) {
	            logger.debug("doCheckViewWith: one of the live instances is: "
	                    + it.next());
	        }
    	}

        // we seem to be the first to realize that the currently established
        // view doesnt match
        // the currently live instances.

        // initiate a new voting
        doStartNewVoting(resourceResolver, liveInstances);
    }

    private void doStartNewVoting(final ResourceResolver resourceResolver,
            final Set<String> liveInstances) throws PersistenceException {
        String votingId = nextVotingId;
        nextVotingId = UUID.randomUUID().toString();

        VotingView.newVoting(resourceResolver, config, votingId, slingId, liveInstances);
    }
    
    /**
     * Mark the current establishedView as invalid - requiring it to be
     * replaced with a new one, be it by another instance or this one,
     * via a new vote
     */
    public void invalidateCurrentEstablishedView() {
        if (lastEstablishedViewId == null) {
            logger.info("invalidateCurrentEstablishedView: cannot invalidate, lastEstablishedViewId==null");
            return;
        }
        logger.info("invalidateCurrentEstablishedView: invalidating slingId=" + slingId
                + ", lastEstablishedViewId=" + lastEstablishedViewId);
        failedEstablishedViewId = lastEstablishedViewId;
        discoveryServiceImpl.getClusterViewServiceImpl().invalidateEstablishedViewId(lastEstablishedViewId);
    }
    
    /**
     * Management function to trigger the otherwise algorithm-dependent
     * start of a new voting.
     * This can make sense when explicitly trying to force a leader
     * change (which is otherwise not allowed by the discovery API)
     */
    public void startNewVoting() {
        logger.info("startNewVoting: explicitly starting new voting...");
        ResourceResolver resourceResolver = null;
        try {
            resourceResolver = getResourceResolver();
            final Resource clusterNodesRes = ResourceHelper.getOrCreateResource(
                    resourceResolver, config.getClusterInstancesPath());
            final Set<String> liveInstances = ViewHelper.determineLiveInstances(
                    clusterNodesRes, config);
            doStartNewVoting(resourceResolver, liveInstances);
            logger.info("startNewVoting: explicit new voting was started.");
        } catch (LoginException e) {
            logger.error("startNewVoting: could not log in administratively: " + e,
                    e);
        } catch (PersistenceException e) {
            logger.error(
                    "startNewVoting: encountered a persistence exception during view check: "
                            + e, e);
        } finally {
            if (resourceResolver != null) {
                resourceResolver.close();
            }
        }
    }
    
}
