blob: 48cd976292a0163dccc8a6eb0ebcb3cf51c4006b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.oak.cluster;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.base.commons.ClusterViewService;
import org.apache.sling.discovery.base.commons.UndefinedClusterViewException;
import org.apache.sling.discovery.base.commons.UndefinedClusterViewException.Reason;
import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
import org.apache.sling.discovery.commons.providers.spi.LocalClusterView;
import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteDescriptor;
import org.apache.sling.discovery.commons.providers.spi.base.IdMapService;
import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.oak.Config;
import org.apache.sling.settings.SlingSettingsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Oak-based implementation of the ClusterViewService interface.
*/
@Component
@Service(value = ClusterViewService.class)
public class OakClusterViewService implements ClusterViewService {
/** Name of the property on the discovery resource that holds the persisted clusterId. */
private static final String PROPERTY_CLUSTER_ID = "clusterId";
/** Name of the property recording when the clusterId was defined (written as a Calendar value). */
private static final String PROPERTY_CLUSTER_ID_DEFINED_AT = "clusterIdDefinedAt";
/** Name of the property recording the slingId of the instance that defined the clusterId. */
private static final String PROPERTY_CLUSTER_ID_DEFINED_BY = "clusterIdDefinedBy";
/** Logger named after the runtime class of this service. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/** Source of the local instance's slingId; getSlingId() tolerates this being null. */
@Reference
private SlingSettingsService settingsService;
/** Factory used to obtain the service resource resolver for repository access. */
@Reference
private ResourceResolverFactory resourceResolverFactory;
/** Discovery configuration; supplies the cluster instances path. */
@Reference
private Config config;
/** Maps oak clusterNodeIds to slingIds; its cache is cleared when the descriptor's sequence number changes. */
@Reference
private IdMapService idMapService;
/**
 * The last sequence number read from the oak discovery-lite descriptor.
 * Starts at -1 and is updated by getLocalClusterView() whenever the
 * descriptor reports a different value.
 */
private long lastSeqNum = -1;
/**
 * Builds an instance with all collaborators assigned directly instead of
 * being injected through the {@code @Reference} fields.
 *
 * @param settingsService the settings service providing the slingId
 * @param resourceResolverFactory the factory used to open resource resolvers
 * @param idMapService the clusterNodeId-to-slingId mapping service
 * @param config the discovery configuration
 * @return a new service instance wired with the given collaborators
 */
public static OakClusterViewService testConstructor(SlingSettingsService settingsService,
        ResourceResolverFactory resourceResolverFactory,
        IdMapService idMapService,
        Config config) {
    final OakClusterViewService instance = new OakClusterViewService();
    instance.idMapService = idMapService;
    instance.config = config;
    instance.resourceResolverFactory = resourceResolverFactory;
    instance.settingsService = settingsService;
    return instance;
}
/**
 * Returns the slingId of the local instance.
 *
 * @return the slingId reported by the settings service, or {@code null}
 *         when no settings service is available
 */
@Override
public String getSlingId() {
    return (settingsService != null) ? settingsService.getSlingId() : null;
}
/**
 * Opens a service resource resolver via the resource resolver factory,
 * passing {@code null} as the authentication info.
 *
 * @return the newly obtained resource resolver; the caller must close it
 * @throws LoginException if the factory cannot provide a resolver
 */
protected ResourceResolver getResourceResolver() throws LoginException {
    final ResourceResolver serviceResolver = resourceResolverFactory.getServiceResourceResolver(null);
    return serviceResolver;
}
/**
 * Reads the discovery-lite descriptor and converts it into the local
 * cluster view.
 * <p>
 * Whenever the descriptor's sequence number differs from the one seen
 * last, the idmap cache is cleared before the view is built. The resource
 * resolver opened for this call is always closed again.
 *
 * @return the current local cluster view
 * @throws UndefinedClusterViewException if no view can be established;
 *         any other exception raised while processing the descriptor is
 *         logged and reported with {@code Reason.REPOSITORY_EXCEPTION}
 */
@Override
public LocalClusterView getLocalClusterView() throws UndefinedClusterViewException {
    logger.trace("getLocalClusterView: start");
    ResourceResolver resolver = null;
    try {
        resolver = getResourceResolver();
        final DiscoveryLiteDescriptor liteDescriptor =
                DiscoveryLiteDescriptor.getDescriptorFrom(resolver);
        final boolean seqNumChanged = lastSeqNum != liteDescriptor.getSeqNum();
        if (seqNumChanged) {
            logger.info("getLocalClusterView: sequence number change detected - clearing idmap cache");
            idMapService.clearCache();
            lastSeqNum = liteDescriptor.getSeqNum();
        }
        return asClusterView(liteDescriptor, resolver);
    } catch (UndefinedClusterViewException undefined) {
        // already the right exception type: log on info and pass it through
        logger.info("getLocalClusterView: undefined clusterView: "+undefined.getReason()+" - "+undefined.getMessage());
        throw undefined;
    } catch (Exception failure) {
        final String failureMessage = failure.getMessage();
        final boolean descriptorMissing = failureMessage != null
                && failureMessage.contains("No Descriptor value available");
        if (descriptorMissing) {
            // a missing descriptor is logged without stacktrace, on warn only
            logger.warn("getLocalClusterView: repository exception: "+failure);
        } else {
            logger.error("getLocalClusterView: repository exception: "+failure, failure);
        }
        throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION, "Exception while processing descriptor: "+failure);
    } finally {
        logger.trace("getLocalClusterView: end");
        if (resolver != null) {
            resolver.close();
        }
    }
}
/**
 * Converts the given discovery-lite descriptor into a {@link LocalClusterView}.
 * <p>
 * The cluster id is taken from the descriptor or, if the descriptor does
 * not provide one, read from (or defined in) the repository. Each active
 * oak clusterNodeId is resolved to its slingId and leaderElectionId
 * exactly once; the instances are then ordered by leaderElectionId so
 * that the first one becomes the leader.
 *
 * @param descriptor the discovery-lite descriptor, must not be null
 * @param resourceResolver the resolver used for all repository lookups,
 *        must not be null
 * @return the cluster view, which is guaranteed to contain the local instance
 * @throws IllegalArgumentException if either argument is null
 * @throws UndefinedClusterViewException if the descriptor is not final,
 *         lists no active ids, an active id has no slingId or no
 *         leaderElectionId yet, or the local instance is not part of the view
 * @throws Exception if reading or defining the cluster id fails
 */
private LocalClusterView asClusterView(DiscoveryLiteDescriptor descriptor, ResourceResolver resourceResolver) throws Exception {
    if (descriptor == null) {
        throw new IllegalArgumentException("descriptor must not be null");
    }
    if (resourceResolver==null) {
        throw new IllegalArgumentException("resourceResolver must not be null");
    }
    logger.trace("asClusterView: start");
    String clusterViewId = descriptor.getViewId();
    if (clusterViewId == null || clusterViewId.length() == 0) {
        logger.trace("asClusterView: no clusterId provided by discovery-lite descriptor - reading from repo.");
        clusterViewId = readOrDefineClusterId(resourceResolver);
    }
    // the sync token is derived from the sequence number only
    final String localClusterSyncTokenId = String.valueOf(descriptor.getSeqNum());
    if (!descriptor.isFinal()) {
        throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "descriptor is not yet final: "+descriptor);
    }
    final LocalClusterView cluster = new LocalClusterView(clusterViewId, localClusterSyncTokenId);
    final long me = descriptor.getMyId();
    final int[] activeIds = descriptor.getActiveIds();
    if (activeIds==null || activeIds.length==0) {
        throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "Descriptor contained no active ids: "+descriptor.getDescriptorStr());
    }

    // step 1: resolve every active id exactly once, so that sorting and
    // instance creation below work on the same slingId/leaderElectionId
    final List<Integer> activeIdsList = new LinkedList<Integer>();
    final Map<Integer, String> slingIds = new HashMap<Integer, String>();
    final Map<Integer, String> leaderElectionIds = new HashMap<Integer, String>();
    for (int activeId : activeIds) {
        final Integer id = Integer.valueOf(activeId);
        final String slingId = idMapService.toSlingId(activeId, resourceResolver);
        if (slingId == null) {
            idMapService.clearCache();
            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
                    "no slingId mapped for clusterNodeId="+id);
        }
        final String leaderElectionId = getLeaderElectionId(resourceResolver, slingId);
        if (leaderElectionId == null) {
            // without this check the comparator below would fail with a
            // NullPointerException; an instance that has no leaderElectionId
            // stored cannot be ordered, hence no view can be established
            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
                    "no leaderElectionId available yet for slingId="+slingId);
        }
        activeIdsList.add(id);
        slingIds.put(id, slingId);
        leaderElectionIds.put(id, leaderElectionId);
    }

    // step 2: sort activeIds by their leaderElectionId
    // serves two purposes: the first entry is then the leader
    // and the rest are properly sorted within the cluster
    Collections.sort(activeIdsList, new Comparator<Integer>() {
        @Override
        public int compare(Integer arg0, Integer arg1) {
            return leaderElectionIds.get(arg0)
                    .compareTo(leaderElectionIds.get(arg1));
        }
    });

    // step 3: create the instance descriptions in sorted order
    boolean isLeader = true; // thx to sorting above the first entry is leader indeed
    for (Integer id : activeIdsList) {
        final boolean isOwn = id.intValue() == me;
        final String slingId = slingIds.get(id);
        final Map<String, String> properties = readProperties(slingId, resourceResolver);
        // create a new instance (adds itself to the cluster in the constructor)
        new DefaultInstanceDescription(cluster, isLeader, isOwn, slingId, properties);
        isLeader = false;
    }
    logger.trace("asClusterView: returning {}", cluster);
    final InstanceDescription local = cluster.getLocalInstance();
    if (local != null) {
        return cluster;
    } else {
        logger.info("getClusterView: the local instance ("+getSlingId()+") is currently not included in the existing established view! "
                + "This is normal at startup. At other times is pseudo-network-partitioning is an indicator for repository/network-delays or clocks-out-of-sync (SLING-3432). "
                + "(increasing the heartbeatTimeout can help as a workaround too) "
                + "The local instance will stay in TOPOLOGY_CHANGING or pre _INIT mode until a new vote was successful.");
        throw new UndefinedClusterViewException(Reason.ISOLATED_FROM_TOPOLOGY,
                "established view does not include local instance - isolated");
    }
}
/**
 * oak's discovery-lite can opt to not provide a clusterViewId eg in the
 * single-VM case. (for clusters discovery-lite normally defines the
 * clusterViewId, as it is the one responsible for defining the membership
 * too) Thus if we're not getting an id here we have to define one here. (we
 * can typically assume that this corresponds to a singleVM case, but that's
 * not a 100% requirement). This id must be stored to ensure the contract
 * that the clusterId is stable across restarts. For that, the id is stored
 * on the parent of the configured cluster instances path (and to account
 * for odd/edgy cases we'll do a retry when storing the id, in case we'd
 * run into conflicts, even though they should not occur in singleVM cases)
 * <p>
 * After a failed commit the pending changes are reverted, and the resolver
 * is refreshed before the next attempt. Otherwise the retry would read
 * back its own uncommitted clusterId and return an id that was never
 * persisted.
 *
 * @param resourceResolver the ResourceResolver with which to read or write
 * the clusterId properties
 * @return the clusterId to be used - either the one read from the
 * repository or the one newly defined and successfully persisted
 * @throws PersistenceException when the discovery resource could not be
 * accessed or auto-created
 * @throws RuntimeException when the discovery resource cannot be created
 * or adapted to a ModifiableValueMap, or when the new clusterId could not
 * be persisted (all retries failed or the thread got interrupted)
 */
private String readOrDefineClusterId(ResourceResolver resourceResolver) throws PersistenceException {
    //TODO: if Config gets a specific, public getDiscoveryResourcePath, this can be simplified:
    final String clusterInstancesPath = config.getClusterInstancesPath();
    // parent of the cluster instances path (ignoring a trailing slash)
    final String discoveryResourcePath = clusterInstancesPath.substring(0,
            clusterInstancesPath.lastIndexOf("/", clusterInstancesPath.length()-2));
    final int MAX_RETRIES = 5;
    for(int retryCnt=0; retryCnt<MAX_RETRIES; retryCnt++) {
        Resource varDiscoveryOak = resourceResolver.getResource(discoveryResourcePath);
        if (varDiscoveryOak == null) {
            varDiscoveryOak = ResourceHelper.getOrCreateResource(resourceResolver, discoveryResourcePath);
        }
        if (varDiscoveryOak == null) {
            logger.error("readOrDefinedClusterId: Could not create: "+discoveryResourcePath);
            throw new RuntimeException("could not create " + discoveryResourcePath);
        }
        ModifiableValueMap props = varDiscoveryOak.adaptTo(ModifiableValueMap.class);
        if (props == null) {
            logger.error("readOrDefineClusterId: Could not adaptTo ModifiableValueMap: "+varDiscoveryOak);
            throw new RuntimeException("could not adaptTo ModifiableValueMap: " + varDiscoveryOak);
        }
        Object clusterIdObj = props.get(PROPERTY_CLUSTER_ID);
        String clusterId = (clusterIdObj == null) ? null : String.valueOf(clusterIdObj);
        if (clusterId != null && clusterId.length() > 0) {
            logger.trace("readOrDefineClusterId: read clusterId from repo as {}", clusterId);
            return clusterId;
        }
        // must now define a new clusterId and store it in the repository
        final String newClusterId = UUID.randomUUID().toString();
        props.put(PROPERTY_CLUSTER_ID, newClusterId);
        props.put(PROPERTY_CLUSTER_ID_DEFINED_BY, getSlingId());
        props.put(PROPERTY_CLUSTER_ID_DEFINED_AT, Calendar.getInstance());
        try {
            logger.info("readOrDefineClusterId: storing new clusterId as " + newClusterId);
            resourceResolver.commit();
            return newClusterId;
        } catch (PersistenceException e) {
            logger.warn("readOrDefineClusterId: could not persist clusterId "
                    + "(retrying in 1 sec max " + (MAX_RETRIES - retryCnt - 1) + " more times: " + e, e);
            // drop the uncommitted properties: they must neither be picked up
            // as "read from repo" by the next attempt nor stay pending in
            // the caller's resolver
            resourceResolver.revert();
            if (retryCnt == MAX_RETRIES - 1) {
                // no attempts left - do not wait for a retry that never happens
                break;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                logger.warn("readOrDefineClusterId: got interrupted: "+e1, e1);
                // keep the interrupt visible to the caller and stop retrying
                Thread.currentThread().interrupt();
                break;
            }
            // pick up what a concurrent writer might have stored in the meantime
            resourceResolver.refresh();
            logger.info("readOrDefineClusterId: retrying now.");
        }
    }
    throw new RuntimeException("failed to write new clusterId (see log file earlier for more details)");
}
/**
 * Reads the 'leaderElectionId' property stored for the given slingId
 * below the configured cluster instances path.
 *
 * @param resourceResolver the resolver used to read the instance resource
 * @param slingId the slingId of the instance, must not be null
 * @return the leaderElectionId, or {@code null} if the instance resource
 *         does not exist, cannot be adapted to a ValueMap, or has no
 *         'leaderElectionId' property
 * @throws IllegalStateException if slingId is null
 */
private String getLeaderElectionId(ResourceResolver resourceResolver, String slingId) {
    if (slingId==null) {
        throw new IllegalStateException("slingId must not be null");
    }
    final String myClusterNodePath = config.getClusterInstancesPath()+"/"+slingId;
    final Resource myClusterNode = resourceResolver.getResource(myClusterNodePath);
    if (myClusterNode == null) {
        // the instance resource may not exist; treat it like a missing
        // property instead of failing with a NullPointerException
        return null;
    }
    final ValueMap resourceMap = myClusterNode.adaptTo(ValueMap.class);
    if (resourceMap == null) {
        return null;
    }
    return resourceMap.get("leaderElectionId", String.class);
}
/**
 * Collects the properties stored in the 'properties' child of the given
 * instance's resource below the configured cluster instances path.
 * The 'jcr:primaryType' entry is left out; all other values are read as
 * Strings.
 *
 * @param slingId the slingId of the instance whose properties are read
 * @param resourceResolver the resolver used to read the resources
 * @return the properties found - empty if the instance resource, its
 *         'properties' child or the child's ValueMap is not available
 */
private Map<String, String> readProperties(String slingId, ResourceResolver resourceResolver) {
    final String instancePath = config.getClusterInstancesPath() + "/" + slingId;
    final Resource instanceResource = resourceResolver.getResource(instancePath);
    final Map<String, String> result = new HashMap<String, String>();
    if (instanceResource == null) {
        return result;
    }
    final Resource propertiesResource = instanceResource.getChild("properties");
    if (propertiesResource == null) {
        return result;
    }
    final ValueMap valueMap = propertiesResource.adaptTo(ValueMap.class);
    if (valueMap == null) {
        return result;
    }
    for (String name : valueMap.keySet()) {
        if (name.equals("jcr:primaryType")) {
            continue;
        }
        result.put(name, valueMap.get(name, String.class));
    }
    return result;
}
}