blob: 0b4093f3d8d03270166f26f61d98099340fb76bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.oak;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.discovery.DiscoveryService;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.PropertyProvider;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.base.commons.BaseDiscoveryService;
import org.apache.sling.discovery.base.commons.ClusterViewService;
import org.apache.sling.discovery.base.commons.DefaultTopologyView;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
import org.apache.sling.discovery.commons.providers.DefaultClusterView;
import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
import org.apache.sling.discovery.commons.providers.ViewStateManager;
import org.apache.sling.discovery.commons.providers.base.ViewStateManagerFactory;
import org.apache.sling.discovery.commons.providers.spi.ClusterSyncService;
import org.apache.sling.discovery.commons.providers.spi.base.ClusterSyncHistory;
import org.apache.sling.discovery.commons.providers.spi.base.ClusterSyncServiceChain;
import org.apache.sling.discovery.commons.providers.spi.base.IdMapService;
import org.apache.sling.discovery.commons.providers.spi.base.OakBacklogClusterSyncService;
import org.apache.sling.discovery.commons.providers.spi.base.SyncTokenService;
import org.apache.sling.discovery.commons.providers.util.PropertyNameHelper;
import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.oak.pinger.OakViewChecker;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This implementation of the cross-cluster service uses the view manager
* implementation for detecting changes in a cluster and only supports one
* cluster (the one this instance is part of).
*/
@Component(immediate = true)
@Service(value = { DiscoveryService.class, OakDiscoveryService.class })
public class OakDiscoveryService extends BaseDiscoveryService {
/** Logger used throughout this class. */
private final static Logger logger = LoggerFactory.getLogger(OakDiscoveryService.class);
/** Source of the local slingId, which is read in activate(). */
@Reference
private SlingSettingsService settingsService;
/**
* Declares the dynamic, optional-multiple reference to TopologyEventListener
* services. The array itself is neither read nor reassigned anywhere in this
* class: bound listeners are kept in pendingListeners (before activation) or
* handed to the viewStateManager (after activation).
*/
@Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = TopologyEventListener.class)
private TopologyEventListener[] eventListeners = new TopologyEventListener[0];
/**
* All property providers.
* The list is re-sorted on every bind and is only mutated from the
* bind/updated/unbind property provider methods, which hold {@link #lock}.
*/
@Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = PropertyProvider.class, updated = "updatedPropertyProvider")
private List<ProviderInfo> providerInfos = new ArrayList<ProviderInfo>();
/**
* lock object used for synching the property provider bind/updated/unbind
* methods and updateProperties(). Topology listener bind/unbind use
* viewStateManagerLock instead.
**/
private final Object lock = new Object();
/**
* whether or not this service is activated - necessary to avoid sending
* events to discovery awares before activate is done.
* Set to true near the end of activate() and back to false in deactivate(),
* both while holding viewStateManagerLock.
**/
private volatile boolean activated = false;
/** Used by doUpdateProperties() to obtain a service resource resolver. */
@Reference
private ResourceResolverFactory resourceResolverFactory;
/** Passed to the viewStateManager when a minimum event delay is configured. */
@Reference
private Scheduler scheduler;
/**
* Initialized with this service in activate(); also used to trigger connector
* pings and to reset the leader election id.
*/
@Reference
private OakViewChecker oakViewChecker;
/** Exposed to the base class via getAnnouncementRegistry(). */
@Reference
private AnnouncementRegistry announcementRegistry;
/** Used in activate() to register the configured outgoing topology connectors. */
@Reference
private ConnectorRegistry connectorRegistry;
/**
* Exposed to the base class via getClusterViewService() and passed along when
* registering outgoing connectors.
*/
@Reference
private ClusterViewService clusterViewService;
/**
* Configuration: read for the sync-token switch, the minimum event delay, the
* topology connector URLs and the cluster instances path.
*/
@Reference
private Config config;
/**
* Injected (or set by testConstructor) but not otherwise used by the code
* in this class.
*/
@Reference
private IdMapService idMapService;
/** Used in activate() to build the ClusterSyncService given to the viewStateManager. */
@Reference
private OakBacklogClusterSyncService oakBacklogClusterSyncService;
/** Chained after oakBacklogClusterSyncService in activate() when sync tokens are enabled. */
@Reference
private SyncTokenService syncTokenService;
/** the slingId of the local instance - assigned in activate() **/
private String slingId;
/**
* Never assigned in this class; deactivate() unregisters and clears it
* should it be non-null.
*/
private ServiceRegistration mbeanRegistration;
/** Created in activate(); receives listener bindings and view changes. */
private ViewStateManager viewStateManager;
/**
* Lock handed to the viewStateManager on creation; this class holds it around
* listener bind/unbind, the activation/deactivation hand-over and
* checkForTopologyChange().
*/
private final ReentrantLock viewStateManagerLock = new ReentrantLock();
/**
* Listeners that were bound before activation; activate() binds them to the
* viewStateManager and clears this list.
*/
private final List<TopologyEventListener> pendingListeners = new LinkedList<TopologyEventListener>();
/**
 * Listener that is bound to the viewStateManager in activate() and unbound
 * again in deactivate(). For TOPOLOGY_CHANGED and PROPERTIES_CHANGED events -
 * and only while this service is activated and an OakViewChecker is
 * available - it triggers an asynchronous connector ping.
 */
private TopologyEventListener changePropagationListener = new TopologyEventListener() {

    @Override
    public void handleTopologyEvent(TopologyEvent event) {
        // take a snapshot of the field so the null check and the call see the same object
        final OakViewChecker viewChecker = oakViewChecker;
        if (!activated || viewChecker == null) {
            return;
        }
        final boolean relevantEvent = event.getType() == Type.TOPOLOGY_CHANGED
                || event.getType() == Type.PROPERTIES_CHANGED;
        if (!relevantEvent) {
            return;
        }
        logger.info("changePropagationListener.handleTopologyEvent: topology changed - propagate through connectors");
        viewChecker.triggerAsyncConnectorPing();
    }
};
/**
 * Factory for tests: creates a new instance and assigns the given
 * collaborators directly to the fields that are otherwise declared as
 * references. The returned service is not activated by this method.
 *
 * @return the wired, not yet activated service
 */
public static OakDiscoveryService testConstructor(SlingSettingsService settingsService,
        AnnouncementRegistry announcementRegistry,
        ConnectorRegistry connectorRegistry,
        ClusterViewService clusterViewService,
        Config config,
        OakViewChecker connectorPinger,
        Scheduler scheduler,
        IdMapService idMapService,
        OakBacklogClusterSyncService oakBacklogClusterSyncService,
        SyncTokenService syncTokenService,
        ResourceResolverFactory factory) {
    final OakDiscoveryService service = new OakDiscoveryService();

    // environment
    service.settingsService = settingsService;
    service.config = config;
    service.scheduler = scheduler;
    service.resourceResolverFactory = factory;

    // topology / connector collaborators
    service.announcementRegistry = announcementRegistry;
    service.connectorRegistry = connectorRegistry;
    service.clusterViewService = clusterViewService;
    service.oakViewChecker = connectorPinger;

    // cluster sync collaborators
    service.idMapService = idMapService;
    service.oakBacklogClusterSyncService = oakBacklogClusterSyncService;
    service.syncTokenService = syncTokenService;

    return service;
}
/**
 * Resets the leaderElectionId via the OakViewChecker (if one is set) when
 * this instance is found to be isolated from the topology.
 */
@Override
protected void handleIsolatedFromTopology() {
    if (oakViewChecker == null) {
        return;
    }
    // SLING-5030 part 2: an isolated instance should move to the end of
    // the leader-election queue. Resetting the leaderElectionId achieves
    // that; it takes effect on the next round of voting (and is also
    // double-checked when the local instance votes).
    //
    //TODO:
    // If the local instance does not notice an 'ISOLATED_FROM_TOPOLOGY'
    // case, the leaderElectionId is not reset - meaning the instance
    // could potentially regain leadership.
    final boolean idWasReset = oakViewChecker.resetLeaderElectionId();
    if (idWasReset) {
        logger.info("getTopology: reset leaderElectionId to force this instance to the end of the instance order (thus incl not to remain leader)");
    }
}
/**
 * Activate this service.
 * <p>
 * In order: reads the local slingId, creates the viewStateManager with the
 * configured cluster sync service(s), installs an isolated "pre-voting"
 * topology as the old view, initializes the OakViewChecker, then - under
 * the viewStateManagerLock - marks the service as activated, hands pending
 * listeners to the viewStateManager, and finally registers the configured
 * outgoing topology connectors.
 *
 * @param bundleContext the bundle context (not used by this method)
 * @throws IllegalStateException if settingsService or oakViewChecker is not set
 */
@Activate
protected void activate(final BundleContext bundleContext) {
    logger.debug("OakDiscoveryService activating...");

    // fail fast if mandatory collaborators are missing
    if (settingsService == null) {
        throw new IllegalStateException("settingsService not found");
    }
    if (oakViewChecker == null) {
        throw new IllegalStateException("heartbeatHandler not found");
    }

    slingId = settingsService.getSlingId();

    // choose the cluster sync service: backlog only, or backlog chained with sync tokens
    final ClusterSyncService syncService;
    if (!config.getSyncTokenEnabled()) {
        syncService = oakBacklogClusterSyncService;
    } else {
        //TODO: ConsistencyHistory is implemented a little bit hacky ..
        final ClusterSyncHistory sharedHistory = new ClusterSyncHistory();
        oakBacklogClusterSyncService.setConsistencyHistory(sharedHistory);
        syncTokenService.setConsistencyHistory(sharedHistory);
        syncService = new ClusterSyncServiceChain(oakBacklogClusterSyncService, syncTokenService);
    }

    viewStateManager = ViewStateManagerFactory.newViewStateManager(viewStateManagerLock, syncService);
    if (config.getMinEventDelay() > 0) {
        viewStateManager.installMinEventDelayHandler(this, scheduler, config.getMinEventDelay());
    }

    // Create a pre-voting/isolated topology view that is used until the
    // first voting has finished. This way, for the single-instance case,
    // the clusterId can remain the same between a getTopology() invoked
    // before the first TOPOLOGY_INIT and one invoked afterwards.
    final DefaultClusterView isolatedCluster = new DefaultClusterView(UUID.randomUUID().toString());
    final DefaultInstanceDescription localInstance = new DefaultInstanceDescription(
            isolatedCluster, true, true, slingId, new HashMap<String, String>());
    final Collection<InstanceDescription> isolatedInstances = new ArrayList<InstanceDescription>();
    isolatedInstances.add(localInstance);
    final DefaultTopologyView isolatedTopology = new DefaultTopologyView();
    isolatedTopology.addInstances(isolatedInstances);
    isolatedTopology.setNotCurrent();
    setOldView(isolatedTopology);

    // replace the old view by whatever getTopology() reports now, flagged as not current
    final DefaultTopologyView initialView = (DefaultTopologyView) getTopology();
    setOldView(initialView);
    getOldView().setNotCurrent();

    // Make sure the first heartbeat is issued as soon as possible, i.e.
    // right after this service starts. As discovery service and
    // heartbeat handler need to know each other, this service is passed
    // to the handler in this initialize call.
    oakViewChecker.initialize(this);

    viewStateManagerLock.lock();
    try {
        viewStateManager.handleActivated();
        doUpdateProperties();

        final DefaultTopologyView newView = (DefaultTopologyView) getTopology();
        if (!newView.isCurrent()) {
            // SLING-3750: just issue a log.info about the delaying
            logger.info("activate: this instance is in isolated mode and must yet finish voting before it can send out TOPOLOGY_INIT.");
        } else {
            viewStateManager.handleNewView(newView);
        }
        activated = true;
        setOldView(newView);

        // listeners bound before activation were parked - bind them now
        for (final TopologyEventListener parked : pendingListeners) {
            viewStateManager.bind(parked);
        }
        pendingListeners.clear();

        viewStateManager.bind(changePropagationListener);
    } finally {
        viewStateManagerLock.unlock();
    }

    // register the configured outgoing connectors; a failure for one URL
    // is logged and does not prevent the remaining ones from being tried
    final URL[] connectorUrls = config.getTopologyConnectorURLs();
    if (connectorUrls != null) {
        for (final URL connectorUrl : connectorUrls) {
            if (connectorUrl == null) {
                continue;
            }
            try {
                logger.info("activate: registering outgoing topology connector to "+connectorUrl);
                connectorRegistry.registerOutgoingConnector(clusterViewService, connectorUrl);
            } catch (final Exception e) {
                logger.info("activate: could not register url: "+connectorUrl+" due to: "+e, e);
            }
        }
    }

    logger.debug("OakDiscoveryService activated.");
}
/**
 * Deactivate this service.
 * <p>
 * Under the viewStateManagerLock the internal change-propagation listener
 * is unbound, the viewStateManager is told about the deactivation and the
 * activated flag is cleared. Afterwards the mbean registration - if there
 * is one - is unregistered; errors doing so are logged, not propagated.
 */
@Deactivate
protected void deactivate() {
    logger.debug("OakDiscoveryService deactivated.");

    viewStateManagerLock.lock();
    try {
        viewStateManager.unbind(changePropagationListener);
        viewStateManager.handleDeactivated();
        activated = false;
    } finally {
        viewStateManagerLock.unlock();
    }

    try {
        final ServiceRegistration registration = this.mbeanRegistration;
        if (registration != null) {
            registration.unregister();
            this.mbeanRegistration = null;
        }
    } catch (Exception e) {
        logger.error("deactivate: Error on unregister: "+e, e);
    }
}
/**
 * Bind a topology event listener.
 * <p>
 * Once the service is activated the listener goes straight to the
 * viewStateManager; before that it is parked in pendingListeners.
 * Runs under the viewStateManagerLock.
 */
protected void bindTopologyEventListener(final TopologyEventListener eventListener) {
    viewStateManagerLock.lock();
    try {
        if (activated) {
            viewStateManager.bind(eventListener);
        } else {
            pendingListeners.add(eventListener);
        }
    } finally {
        viewStateManagerLock.unlock();
    }
}
/**
 * Unbind a topology event listener.
 * <p>
 * Once the service is activated the listener is unbound from the
 * viewStateManager; before that it is removed from pendingListeners.
 * Runs under the viewStateManagerLock.
 */
protected void unbindTopologyEventListener(final TopologyEventListener eventListener) {
    viewStateManagerLock.lock();
    try {
        if (activated) {
            viewStateManager.unbind(eventListener);
        } else {
            pendingListeners.remove(eventListener);
        }
    } finally {
        viewStateManagerLock.unlock();
    }
}
/**
 * Bind a new property provider.
 * <p>
 * Logs the binding and then delegates to the internal bind method while
 * holding {@code lock}.
 *
 * @param propertyProvider the provider being bound
 * @param props the service properties of the provider
 */
protected void bindPropertyProvider(final PropertyProvider propertyProvider,
        final Map<String, Object> props) {
    logger.debug("bindPropertyProvider: Binding PropertyProvider {}", propertyProvider);

    synchronized (lock) {
        bindPropertyProviderInteral(propertyProvider, props);
    }
}
/**
 * Registers the given provider in the (sorted) provider list, refreshes the
 * stored properties if the service is activated, and then checks for a
 * topology change. Callers in this class hold {@code lock}.
 *
 * @param propertyProvider the provider being bound
 * @param props the service properties of the provider
 */
private void bindPropertyProviderInteral(final PropertyProvider propertyProvider,
        final Map<String, Object> props) {
    this.providerInfos.add(new ProviderInfo(propertyProvider, props));
    Collections.sort(this.providerInfos);

    if (activated) {
        doUpdateProperties();
    }
    checkForTopologyChange();
}
/**
 * Update a property provider.
 * <p>
 * Implemented as an unbind (without property update / topology check)
 * immediately followed by a bind, both inside one {@code lock} section.
 *
 * @param propertyProvider the provider whose service properties changed
 * @param props the new service properties of the provider
 */
protected void updatedPropertyProvider(final PropertyProvider propertyProvider,
        final Map<String, Object> props) {
    logger.debug("bindPropertyProvider: Updating PropertyProvider {}", propertyProvider);

    synchronized (lock) {
        // 'false': skip the update on removal - the bind right after takes care of it
        unbindPropertyProviderInternal(propertyProvider, props, false);
        bindPropertyProviderInteral(propertyProvider, props);
    }
}
/**
 * Unbind a property provider.
 * <p>
 * Logs the release and then delegates to the internal unbind method (with
 * updating enabled) while holding {@code lock}.
 *
 * @param propertyProvider the provider being released
 * @param props the service properties of the provider
 */
protected void unbindPropertyProvider(final PropertyProvider propertyProvider,
        final Map<String, Object> props) {
    logger.debug("unbindPropertyProvider: Releasing PropertyProvider {}", propertyProvider);

    synchronized (lock) {
        unbindPropertyProviderInternal(propertyProvider, props, true);
    }
}
/**
 * Removes the provider from the provider list. Only if it was actually
 * removed and {@code update} is set are the stored properties refreshed
 * (when activated) and the topology checked for changes.
 *
 * @param propertyProvider the provider being released
 * @param props the service properties of the provider
 * @param update whether to refresh properties / check the topology after removal
 */
private void unbindPropertyProviderInternal(
        final PropertyProvider propertyProvider,
        final Map<String, Object> props, final boolean update) {
    final ProviderInfo toRemove = new ProviderInfo(propertyProvider, props);
    final boolean wasRemoved = this.providerInfos.remove(toRemove);
    if (!wasRemoved || !update) {
        return;
    }
    if (activated) {
        doUpdateProperties();
    }
    checkForTopologyChange();
}
/**
 * Update the properties by inquiring the PropertyProvider's current values.
 * <p>
 * The properties of all bound providers are collected and written to the
 * resource Config.getClusterInstancesPath() + "/" + slingId + "/properties":
 * keys no longer provided are removed (keys containing ':' are left
 * untouched), new or changed values are written, and the result is
 * committed only if something was actually removed or changed.
 * <p>
 * Within this class it is invoked from activate(), from the property
 * provider bind/unbind paths and from updateProperties().
 * <p>
 * @see Config#getClusterInstancesPath()
 * @throws RuntimeException if logging in to the repository or persisting fails
 */
private void doUpdateProperties() {
    // SLING-5382 : the caller must ensure that this method
    // is not invoked after deactivation or before activation.
    // so this method doesn't have to do any further synchronization.
    // what we do nevertheless is a paranoia way of checking if
    // all variables are available and do a NOOP if that's not the case.
    final ResourceResolverFactory rrf = resourceResolverFactory;
    final Config c = config;
    final String sid = slingId;
    if (rrf == null || c == null || sid == null) {
        // cannot update the properties then..
        logger.debug("doUpdateProperties: too early to update the properties. "
                + "resourceResolverFactory ({}), config ({}) or slingId ({}) not yet set.",
                new Object[] {rrf, c, sid});
        return;
    } else {
        logger.debug("doUpdateProperties: updating properties now..");
    }

    // collect the current values of all providers; later entries of the
    // provider list overwrite earlier ones with the same key
    final Map<String, String> newProps = new HashMap<String, String>();
    for (final ProviderInfo info : this.providerInfos) {
        info.refreshProperties();
        newProps.putAll(info.properties);
    }

    ResourceResolver resourceResolver = null;
    try {
        resourceResolver = rrf.getServiceResourceResolver(null);

        final Resource myInstance = ResourceHelper.getOrCreateResource(
                resourceResolver,
                c.getClusterInstancesPath() + "/" + sid + "/properties");
        // SLING-2879 - revert/refresh resourceResolver here to work
        // around a potential issue with jackrabbit in a clustered environment
        resourceResolver.revert();
        resourceResolver.refresh();

        final ModifiableValueMap myInstanceMap = myInstance.adaptTo(ModifiableValueMap.class);

        // tracks removals as well as additions/modifications: either kind
        // is only transient until committed, so both must trigger the commit
        boolean anyChanges = false;

        // iterate over a copy of the key set as entries are removed while iterating
        final Set<String> keys = new HashSet<String>(myInstanceMap.keySet());
        for (final String key : keys) {
            if (newProps.containsKey(key)) {
                // perfect - still provided, value is handled below
                continue;
            }
            if (key.indexOf(":") != -1) {
                // ignore - presumably namespaced properties not owned by any provider
                continue;
            }
            // no longer provided: remove and remember that a commit is needed
            if (logger.isDebugEnabled()) {
                logger.debug("doUpdateProperties: removed: {}", key);
            }
            myInstanceMap.remove(key);
            anyChanges = true;
        }

        for (final Entry<String, String> entry : newProps.entrySet()) {
            final Object existingValue = myInstanceMap.get(entry.getKey());
            if (entry.getValue().equals(existingValue)) {
                // SLING-3389: dont rewrite the properties if nothing changed!
                if (logger.isDebugEnabled()) {
                    logger.debug("doUpdateProperties: unchanged: {}={}", entry.getKey(), entry.getValue());
                }
                continue;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("doUpdateProperties: changed: {}={}", entry.getKey(), entry.getValue());
            }
            anyChanges = true;
            myInstanceMap.put(entry.getKey(), entry.getValue());
        }

        if (anyChanges) {
            resourceResolver.commit();
        }
    } catch (LoginException e) {
        logger.error(
                "handleEvent: could not log in administratively: " + e, e);
        throw new RuntimeException("Could not log in to repository (" + e
                + ")", e);
    } catch (PersistenceException e) {
        logger.error("handleEvent: got a PersistenceException: " + e, e);
        throw new RuntimeException(
                "Exception while talking to repository (" + e + ")", e);
    } finally {
        if (resourceResolver != null) {
            resourceResolver.close();
        }
    }
    logger.debug("doUpdateProperties: updating properties done.");
}
/**
 * Update the properties and send a topology event if applicable.
 * <p>
 * While holding {@code lock}: refreshes the stored properties if the
 * service is activated, then - in either case - checks for a topology
 * change.
 */
public void updateProperties() {
    synchronized (lock) {
        if (activated) {
            logger.debug("updateProperties: calling doUpdateProperties.");
            doUpdateProperties();
        } else {
            logger.debug("updateProperties: not yet activated, not calling doUpdateProperties this time.");
        }

        logger.debug("updateProperties: calling handlePotentialTopologyChange.");
        checkForTopologyChange();

        logger.debug("updateProperties: done.");
    }
}
/**
 * Internal class caching some provider infos like service id and ranking.
 * <p>
 * Identity ({@link #equals(Object)} / {@link #hashCode()}) is defined by the
 * service id alone. The natural order is: service ranking ascending, then
 * service id descending; two infos with the same ranking and service id
 * compare as 0.
 */
private final static class ProviderInfo implements Comparable<ProviderInfo> {

    /** the provider that is asked for property values */
    public final PropertyProvider provider;

    /**
     * raw value of the PropertyProvider.PROPERTY_PROPERTIES service property;
     * only evaluated if it is a String or a String[]
     */
    public final Object propertyProperties;

    /** service ranking; 0 if the service property is absent or not an Integer */
    public final int ranking;

    /** value of the Constants.SERVICE_ID service property */
    public final long serviceId;

    /** snapshot of the provider's valid properties, rebuilt by refreshProperties() */
    public final Map<String, String> properties = new HashMap<String, String>();

    /**
     * Reads ranking, service id and the list of property names from the
     * service properties and takes a first snapshot of the property values.
     *
     * @param provider the property provider
     * @param serviceProps the service properties; must contain a Long
     *        under Constants.SERVICE_ID (unboxing fails otherwise)
     */
    public ProviderInfo(final PropertyProvider provider,
            final Map<String, Object> serviceProps) {
        this.provider = provider;
        this.propertyProperties = serviceProps.get(PropertyProvider.PROPERTY_PROPERTIES);
        final Object sr = serviceProps.get(Constants.SERVICE_RANKING);
        // instanceof is false for null, so this also covers an absent ranking
        this.ranking = (sr instanceof Integer) ? (Integer) sr : 0;
        this.serviceId = (Long) serviceProps.get(Constants.SERVICE_ID);
        refreshProperties();
    }

    /**
     * Discards the cached values and re-reads every configured property
     * from the provider; null values and invalid names are skipped.
     */
    public void refreshProperties() {
        properties.clear();
        if (this.propertyProperties instanceof String) {
            final String name = (String) this.propertyProperties;
            final String val = provider.getProperty(name);
            if (val != null) {
                putPropertyIfValid(name, val);
            }
        } else if (this.propertyProperties instanceof String[]) {
            for (final String name : (String[]) this.propertyProperties) {
                final String val = provider.getProperty(name);
                if (val != null) {
                    putPropertyIfValid(name, val);
                }
            }
        }
    }

    /** SLING-2883 : put property only if valid **/
    private void putPropertyIfValid(final String name, final String val) {
        if (PropertyNameHelper.isValidPropertyName(name)) {
            this.properties.put(name, val);
        }
    }

    /**
     * @see java.lang.Comparable#compareTo(java.lang.Object)
     */
    @Override
    public int compareTo(final ProviderInfo o) {
        // Sort by rank in ascending order.
        if (this.ranking != o.ranking) {
            return (this.ranking < o.ranking) ? -1 : 1;
        }
        // Same rank and same service id: report equality, so that the
        // ordering is antisymmetric and consistent with equals().
        if (this.serviceId == o.serviceId) {
            return 0;
        }
        // If ranks are equal, then sort by service id in descending order.
        return (this.serviceId < o.serviceId) ? 1 : -1;
    }

    @Override
    public boolean equals(final Object obj) {
        if (obj instanceof ProviderInfo) {
            return ((ProviderInfo) obj).serviceId == this.serviceId;
        }
        return false;
    }

    @Override
    public int hashCode() {
        // derived from the same field equals() compares, so equal infos
        // are guaranteed to have equal hash codes
        return (int) (this.serviceId ^ (this.serviceId >>> 32));
    }
}
/**
 * Check the current topology for any potential change.
 * <p>
 * Does nothing unless the service is activated. Otherwise the current
 * topology is passed to the viewStateManager: a current view is handed
 * over as new view (and remembered as old view), a non-current one is
 * reported as "changing". Runs under the viewStateManagerLock.
 */
public void checkForTopologyChange() {
    viewStateManagerLock.lock();
    try {
        if (!activated) {
            logger.debug("checkForTopologyChange: not yet activated, ignoring");
            return;
        }
        final DefaultTopologyView currentView = (DefaultTopologyView) getTopology();
        if (!currentView.isCurrent()) {
            // no valid view: a CHANGING event might have to be sent -
            // the viewStateManager decides about that
            viewStateManager.handleChanging();
        } else {
            // valid view: the viewStateManager does the comparison and
            // sends an event, if necessary
            viewStateManager.handleNewView(currentView);
            setOldView(currentView);
        }
    } finally {
        viewStateManagerLock.unlock();
    }
}
/**
 * Handle the fact that the topology has started to change - inform the
 * listeners asap by forwarding to the viewStateManager.
 */
public void handleTopologyChanging() {
    logger.debug("handleTopologyChanging: invoking viewStateManager.handlechanging");
    this.viewStateManager.handleChanging();
}
/**
 * @return the ClusterViewService held by this service
 */
@Override
protected ClusterViewService getClusterViewService() {
    return this.clusterViewService;
}
/**
 * @return the AnnouncementRegistry held by this service
 */
@Override
protected AnnouncementRegistry getAnnouncementRegistry() {
    return this.announcementRegistry;
}
/**
 * For testing only.
 *
 * @return the viewStateManager; null as long as activate() has not created it
 */
public ViewStateManager getViewStateManager() {
    return this.viewStateManager;
}
}