blob: e8bbf220fcca1851dd58c7a6a520804bae99c2a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.base.commons;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.discovery.base.connectors.BaseConfig;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.http.HttpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The heartbeat handler is responsible and capable of issuing both local and
* remote heartbeats and registers a periodic job with the scheduler for doing so.
* <p>
* Local heartbeats are stored in the repository. Remote heartbeats are POSTs to
* remote TopologyConnectorServlets.
*/
public abstract class BaseViewChecker implements ViewChecker, Runnable {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
/** Official endpoint service registration property from Http Whiteboard spec */
private static final String REG_PROPERTY_ENDPOINTS = "osgi.http.endpoint";
/** Endpoint service registration property from RFC 189 */
private static final String REG_PROPERTY_ENDPOINTS_RFC = "osgi.http.service.endpoints";
protected static final String PROPERTY_ID_ENDPOINTS = "endpoints";
protected static final String PROPERTY_ID_SLING_HOME_PATH = "slingHomePath";
protected static final String PROPERTY_ID_RUNTIME = "runtimeId";
/** the name used for the period job with the scheduler **/
protected String NAME = "discovery.impl.heartbeat.runner.";
/** the sling id of the local instance **/
protected String slingId;
/** SLING-2901: the runtimeId is a unique id, set on activation, used for robust duplicate sling.id detection **/
protected String runtimeId;
/** lock object for synchronizing the run method **/
protected final Object lock = new Object();
/** SLING-2895: avoid heartbeats after deactivation **/
protected volatile boolean activated = false;
/** keep a reference to the component context **/
protected ComponentContext context;
/** SLING-3382 : force ping instructs the servlet to start the backoff from scratch again **/
private boolean forcePing;
/** SLING-4765 : store endpoints to /clusterInstances for more verbose duplicate slingId/ghost detection **/
protected final Map<Long, String[]> endpoints = new HashMap<>();
protected PeriodicBackgroundJob periodicPingJob;
protected abstract SlingSettingsService getSlingSettingsService();
protected abstract ResourceResolverFactory getResourceResolverFactory();
protected abstract ConnectorRegistry getConnectorRegistry();
protected abstract AnnouncementRegistry getAnnouncementRegistry();
protected abstract Scheduler getScheduler();
protected abstract BaseConfig getConnectorConfig();
@Activate
protected void activate(ComponentContext context) {
synchronized(lock) {
this.context = context;
slingId = getSlingSettingsService().getSlingId();
NAME = "discovery.connectors.common.runner." + slingId;
doActivate();
activated = true;
issueHeartbeat();
}
}
protected void doActivate() {
try {
final long interval = getConnectorConfig().getConnectorPingInterval();
logger.info("doActivate: starting periodic connectorPing job for "+slingId+" with interval "+interval+" sec.");
periodicPingJob = new PeriodicBackgroundJob(interval, NAME, this);
} catch (Exception e) {
logger.error("doActivate: Could not start connectorPing runner: " + e, e);
}
logger.info("doActivate: activated with slingId: {}, this: {}", slingId, this);
}
@Deactivate
protected void deactivate() {
// SLING-3365 : dont synchronize on deactivate
activated = false;
logger.info("deactivate: deactivated slingId: {}, this: {}", slingId, this);
if (periodicPingJob != null) {
periodicPingJob.stop();
periodicPingJob = null;
}
}
/** for testing only **/
@Override
public void checkView() {
synchronized(lock) {
doCheckView();
}
}
@Override
public void run() {
heartbeatAndCheckView();
}
@Override
public void heartbeatAndCheckView() {
logger.debug("heartbeatAndCheckView: start. [for slingId="+slingId+"]");
synchronized(lock) {
if (!activated) {
// SLING:2895: avoid heartbeats if not activated
logger.debug("heartbeatAndCheckView: not activated yet");
return;
}
// issue a heartbeat
issueHeartbeat();
// check the view
doCheckView();
}
logger.debug("heartbeatAndCheckView: end. [for slingId="+slingId+"]");
}
/** Trigger the issuance of the next heartbeat asap instead of at next heartbeat interval **/
public void triggerAsyncConnectorPing() {
forcePing = true;
try {
// then fire a job immediately
// use 'fireJobAt' here, instead of 'fireJob' to make sure the job can always be triggered
// 'fireJob' checks for a job from the same job-class to already exist
// 'fireJobAt' though allows to pass a name for the job - which can be made unique, thus does not conflict/already-exist
logger.debug("triggerAsyncConnectorPing: firing job to trigger heartbeat");
getScheduler().schedule(this, getScheduler().NOW().name(NAME+UUID.randomUUID()));
} catch (Exception e) {
logger.info("triggerAsyncConnectorPing: Could not trigger heartbeat: " + e);
}
}
/**
* Issue a heartbeat.
* <p>
* This action consists of first updating the local properties,
* then issuing a cluster-local heartbeat (within the repository)
* and then a remote heartbeat (to all the topology connectors
* which announce this part of the topology to others)
*/
protected void issueHeartbeat() {
updateProperties();
issueConnectorPings();
}
protected abstract void updateProperties();
/** Issue a remote heartbeat using the topology connectors **/
protected void issueConnectorPings() {
if (getConnectorRegistry() == null) {
logger.error("issueConnectorPings: connectorRegistry is null");
return;
}
if (logger.isDebugEnabled()) {
logger.debug("issueConnectorPings: pinging outgoing topology connectors (if there is any) for "+slingId);
}
getConnectorRegistry().pingOutgoingConnectors(forcePing);
forcePing = false;
}
/** Check whether the established view matches the reality, ie matches the
* heartbeats
*/
protected void doCheckView() {
// check the remotes first
if (getAnnouncementRegistry() == null) {
logger.info("announcementRegistry is null (will check view again later)");
return;
}
getAnnouncementRegistry().checkExpiredAnnouncements();
}
/**
* Bind a http service
*/
@Reference(service = HttpService.class,
cardinality = ReferenceCardinality.MULTIPLE,
policy = ReferencePolicy.DYNAMIC,
bind = "bindHttpService", unbind = "unbindHttpService")
protected void bindHttpService(final ServiceReference reference) {
String[] endpointUrls = toStringArray(reference.getProperty(REG_PROPERTY_ENDPOINTS));
if ( endpointUrls == null ) {
endpointUrls = toStringArray(reference.getProperty(REG_PROPERTY_ENDPOINTS_RFC));
}
if ( endpointUrls != null ) {
synchronized ( lock ) {
this.endpoints.put((Long)reference.getProperty(Constants.SERVICE_ID), endpointUrls);
}
}
}
/**
* Unbind a http service
*/
protected void unbindHttpService(final ServiceReference reference) {
synchronized ( lock ) {
if ( this.endpoints.remove(reference.getProperty(Constants.SERVICE_ID)) != null ) {
// do nothing
}
}
}
private String[] toStringArray(final Object propValue) {
if (propValue == null) {
// no value at all
return null;
} else if (propValue instanceof String) {
// single string
return new String[] { (String) propValue };
} else if (propValue instanceof String[]) {
// String[]
return (String[]) propValue;
} else if (propValue.getClass().isArray()) {
// other array
Object[] valueArray = (Object[]) propValue;
List<String> values = new ArrayList<>(valueArray.length);
for (Object value : valueArray) {
if (value != null) {
values.add(value.toString());
}
}
return values.toArray(new String[values.size()]);
} else if (propValue instanceof Collection<?>) {
// collection
Collection<?> valueCollection = (Collection<?>) propValue;
List<String> valueList = new ArrayList<>(valueCollection.size());
for (Object value : valueCollection) {
if (value != null) {
valueList.add(value.toString());
}
}
return valueList.toArray(new String[valueList.size()]);
}
return null;
}
protected String getEndpointsAsString() {
final StringBuilder sb = new StringBuilder();
boolean first = true;
for(final String[] points : endpoints.values()) {
for(final String point : points) {
if ( first ) {
first = false;
} else {
sb.append(",");
}
sb.append(point);
}
}
return sb.toString();
}
}