blob: b0ed8438f929706e20d1c6575ee75b82b068c66d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.oak.pinger;
import java.util.Calendar;
import java.util.UUID;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.discovery.base.commons.BaseViewChecker;
import org.apache.sling.discovery.base.commons.PeriodicBackgroundJob;
import org.apache.sling.discovery.base.connectors.BaseConfig;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.oak.Config;
import org.apache.sling.discovery.oak.OakDiscoveryService;
import org.apache.sling.launchpad.api.StartupListener;
import org.apache.sling.launchpad.api.StartupMode;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.service.http.HttpService;
/**
* The OakViewChecker is taking care of checking the oak discovery-lite
* descriptor when checking the local cluster view and passing that
* on to the ViewStateManager which will then detect whether there was
* any change or not. Unlike discovery.impl's HeartbeatHandler this one
* does not store any heartbeats in the repository anymore.
* <p>
* Remote heartbeats are POSTs to remote TopologyConnectorServlets using
* discovery.base
*/
@Component
@Service(value = { OakViewChecker.class, StartupListener.class })
@Reference(referenceInterface=HttpService.class,
cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
policy=ReferencePolicy.DYNAMIC)
public class OakViewChecker extends BaseViewChecker {
@Reference
protected SlingSettingsService slingSettingsService;
@Reference
protected ResourceResolverFactory resourceResolverFactory;
@Reference
protected ConnectorRegistry connectorRegistry;
@Reference
protected AnnouncementRegistry announcementRegistry;
@Reference
protected Scheduler scheduler;
@Reference
private Config config;
private OakDiscoveryService discoveryService;
protected PeriodicBackgroundJob periodicCheckViewJob;
/** for testing only **/
public static OakViewChecker testConstructor(
SlingSettingsService slingSettingsService,
ResourceResolverFactory resourceResolverFactory,
ConnectorRegistry connectorRegistry,
AnnouncementRegistry announcementRegistry,
Scheduler scheduler,
Config config) {
OakViewChecker pinger = new OakViewChecker();
pinger.slingSettingsService = slingSettingsService;
pinger.resourceResolverFactory = resourceResolverFactory;
pinger.connectorRegistry = connectorRegistry;
pinger.announcementRegistry = announcementRegistry;
pinger.scheduler = scheduler;
pinger.config = config;
return pinger;
}
@Override
protected AnnouncementRegistry getAnnouncementRegistry() {
return announcementRegistry;
}
@Override
protected BaseConfig getConnectorConfig() {
return config;
}
@Override
protected ConnectorRegistry getConnectorRegistry() {
return connectorRegistry;
}
@Override
protected ResourceResolverFactory getResourceResolverFactory() {
return resourceResolverFactory;
}
@Override
protected Scheduler getScheduler() {
return scheduler;
}
@Override
protected SlingSettingsService getSlingSettingsService() {
return slingSettingsService;
}
@Override
protected void doActivate() {
// on activate the resetLeaderElectionId is set to true to ensure that
// the 'leaderElectionId' property is reset on next heartbeat issuance.
// the idea being that a node which leaves the cluster should not
// become leader on next join - and by resetting the leaderElectionId
// to the current time, this is ensured.
runtimeId = UUID.randomUUID().toString();
logger.info("doActivate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId);
resetLeaderElectionId();
}
@Override
protected void deactivate() {
super.deactivate();
if (periodicCheckViewJob != null) {
periodicCheckViewJob.stop();
periodicCheckViewJob = null;
}
}
@Override
public void startupFinished(StartupMode mode) {
super.startupFinished(mode);
synchronized(lock) {
if (activated) {
// only reset if activated
resetLeaderElectionId();
}
}
}
/**
* The initialize method is called by the OakDiscoveryService.activate
* as we require the discoveryService (and the discoveryService has
* a reference on us - but we cant have circular references in osgi).
*/
public void initialize(final OakDiscoveryService discoveryService) {
logger.info("initialize: initializing.");
synchronized(lock) {
this.discoveryService = discoveryService;
issueHeartbeat();
}
// start the (less frequent) periodic job that does the
// connector pings and checks the connector/topology view
try {
final long interval = config.getConnectorPingInterval();
logger.info("initialize: starting periodic connectorPing job for "+slingId+" with interval "+interval+" sec.");
periodicPingJob = new PeriodicBackgroundJob(interval, NAME+".connectorPinger", this);
} catch (Exception e) {
logger.error("activate: Could not start heartbeat runner: " + e, e);
}
// start the (more frequent) periodic job that checks
// the discoveryLite descriptor - that can be more frequent
// since it is only reading an oak repository descriptor
// which is designed to be read very frequently (it caches
// the value and only updates it on change, so reading is very cheap)
// and because doing this more frequently means that the
// reaction time is faster
try{
final long interval = config.getDiscoveryLiteCheckInterval();
logger.info("initialize: starting periodic discoveryLiteCheck job for "+slingId+" with interval "+interval+" sec.");
periodicCheckViewJob = new PeriodicBackgroundJob(interval, NAME+".discoveryLiteCheck", new Runnable() {
@Override
public void run() {
discoveryLiteCheck();
}
});
} catch (Exception e) {
logger.error("activate: Could not start heartbeat runner: " + e, e);
}
}
private void discoveryLiteCheck() {
logger.debug("discoveryLiteCheck: start. [for slingId="+slingId+"]");
synchronized(lock) {
if (!activated) {
// SLING:2895: avoid checks if not activated
logger.debug("discoveryLiteCheck: not activated yet");
return;
}
// check the view
// discovery.oak relies on oak's discovery-lite descriptor
// to be updated independently in case of cluster view change.
// all that we can therefore do here is assume something
// might have changed and let discoveryService/viewStateManager
// filter out the 99.99% of unchanged cases.
discoveryService.checkForTopologyChange();
}
logger.debug("discoveryLiteCheck: end. [for slingId="+slingId+"]");
}
/** Get or create a ResourceResolver **/
private ResourceResolver getResourceResolver() throws LoginException {
if (resourceResolverFactory == null) {
logger.error("getResourceResolver: resourceResolverFactory is null!");
return null;
}
return resourceResolverFactory.getAdministrativeResourceResolver(null);
}
/** Calcualte the local cluster instance path **/
private String getLocalClusterNodePath() {
return config.getClusterInstancesPath() + "/" + slingId;
}
/**
* Hook that will cause a reset of the leaderElectionId
* on next invocation of issueClusterLocalHeartbeat.
* @return true if the leaderElectionId was reset - false if that was not
* necessary as that happened earlier already and it has not propagated
* yet to the ./clusterInstances in the meantime
*/
public boolean resetLeaderElectionId() {
ResourceResolver resourceResolver = null;
try{
final String myClusterNodePath = getLocalClusterNodePath();
resourceResolver = getResourceResolver();
if (resourceResolver==null) {
logger.warn("resetLeaderElectionId: could not login, new leaderElectionId will be calculated upon next heartbeat only!");
return false;
}
String newLeaderElectionId = newLeaderElectionId();
final Resource resource = ResourceHelper.getOrCreateResource(
resourceResolver, myClusterNodePath);
final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);
resourceMap.put(PROPERTY_ID_RUNTIME, runtimeId);
// SLING-4765 : store more infos to be able to be more verbose on duplicate slingId/ghost detection
final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
resourceMap.put(PROPERTY_ID_SLING_HOME_PATH, slingHomePath);
final String endpointsAsString = getEndpointsAsString();
resourceMap.put(PROPERTY_ID_ENDPOINTS, endpointsAsString);
Calendar leaderElectionCreatedAt = Calendar.getInstance();
resourceMap.put("leaderElectionId", newLeaderElectionId);
resourceMap.put("leaderElectionIdCreatedAt", leaderElectionCreatedAt);
logger.info("resetLeaderElectionId: storing my runtimeId: {}, endpoints: {}, sling home path: {}, new leaderElectionId: {}, created at: {}",
new Object[]{runtimeId, endpointsAsString, slingHomePath, newLeaderElectionId, leaderElectionCreatedAt});
resourceResolver.commit();
} catch (LoginException e) {
logger.error("resetLeaderElectionid: could not login: "+e, e);
} catch (PersistenceException e) {
logger.error("resetLeaderElectionid: got PersistenceException: "+e, e);
} finally {
if (resourceResolver!=null) {
resourceResolver.close();
}
}
return true;
}
/**
* Calculate a new leaderElectionId based on the current config and system time
*/
private String newLeaderElectionId() {
int maxLongLength = String.valueOf(Long.MAX_VALUE).length();
String currentTimeMillisStr = String.format("%0"
+ maxLongLength + "d", System.currentTimeMillis());
String prefix = "1";
final String newLeaderElectionId = prefix + "_"
+ currentTimeMillisStr + "_" + slingId;
return newLeaderElectionId;
}
@Override
protected void doCheckView() {
super.doCheckView();
// discovery.oak relies on oak's discovery-lite descriptor
// to be updated independently in case of cluster view change.
// all that we can therefore do here is assume something
// might have changed and let discoveryService/viewStateManager
// filter out the 99.99% of unchanged cases.
discoveryService.checkForTopologyChange();
}
protected void updateProperties() {
if (discoveryService == null) {
logger.error("issueHeartbeat: discoveryService is null");
} else {
discoveryService.updateProperties();
}
}}