blob: 96c9046a3414f16f13b49102ad354f77a67883b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.felix.hc.core.impl.monitor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import org.apache.felix.hc.api.Result;
import org.apache.felix.hc.api.condition.Healthy;
import org.apache.felix.hc.api.condition.SystemReady;
import org.apache.felix.hc.api.condition.Unhealthy;
import org.apache.felix.hc.api.execution.HealthCheckExecutionResult;
import org.apache.felix.hc.api.execution.HealthCheckExecutor;
import org.apache.felix.hc.api.execution.HealthCheckSelector;
import org.apache.felix.hc.core.impl.executor.CombinedExecutionResult;
import org.apache.felix.hc.core.impl.executor.HealthCheckExecutorThreadPool;
import org.apache.felix.hc.core.impl.scheduling.AsyncIntervalJob;
import org.apache.felix.hc.core.impl.scheduling.AsyncJob;
import org.apache.felix.hc.core.impl.scheduling.CronJobFactory;
import org.apache.felix.hc.core.impl.util.lang.StringUtils;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentConstants;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Monitors health check tags and/or names and, depending on configuration:
 * <ul>
 * <li>Activates the condition marker services {@link SystemReady},
 * {@link Healthy}, {@link Unhealthy}</li>
 * <li>Sends OSGi events</li>
 * </ul>
 * <p>
 * The monitor is scheduled either via a cron expression or via a fixed interval in seconds.
 * Every run executes one selector per configured tag/name through the
 * {@link HealthCheckExecutor} and feeds the result into the matching {@code HealthState}.
 */
@Component(immediate = true, configurationPolicy = ConfigurationPolicy.REQUIRE)
@Designate(ocd = HealthCheckMonitor.Config.class, factory = true)
public class HealthCheckMonitor implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(HealthCheckMonitor.class);

    /** Tag that additionally causes the {@link SystemReady} marker service to be registered. */
    public static final String TAG_SYSTEMREADY = "systemready";

    /** Prefix of all event topics posted by this monitor. */
    public static final String EVENT_TOPIC_PREFIX = "org/apache/felix/healthchange";
    /** Event property holding the {@link HealthCheckExecutionResult} that triggered the event. */
    public static final String EVENT_PROP_EXECUTION_RESULT = "executionResult";
    /** Event property holding the new {@link Result.Status}. */
    public static final String EVENT_PROP_STATUS = "status";
    /** Event property holding the previous {@link Result.Status} (absent on the first result). */
    public static final String EVENT_PROP_PREVIOUS_STATUS = "previousStatus";

    // Stateless singleton instances used as the service objects of the marker registrations.
    static final Healthy MARKER_SERVICE_HEALTHY = new Healthy() {
    };
    static final Unhealthy MARKER_SERVICE_UNHEALTHY = new Unhealthy() {
    };
    static final SystemReady MARKER_SERVICE_SYSTEMREADY = new SystemReady() {
    };

    /** Factory configuration of a single monitor instance. */
    @ObjectClassDefinition(name = "Health Check Monitor", description = "Regularly executes health checks according to given interval/cron expression")
    public @interface Config {

        @AttributeDefinition(name = "Tags", description = "List of tags to query regularly")
        String[] tags() default {};

        @AttributeDefinition(name = "Names", description = "List of health check names to query regularly")
        String[] names() default {};

        @AttributeDefinition(name = "Interval (Sec)", description = "Will execute the checks for give tags every n seconds (either use intervalInSec or cronExpression )")
        long intervalInSec() default 0;

        @AttributeDefinition(name = "Interval (Cron Expresson)", description = "Will execute the checks for give tags according to cron expression")
        String cronExpression() default "";

        @AttributeDefinition(name = "Register Healthy Marker Service", description = "For the case a given tag/name is healthy, will register a service Healthy with property tag=<tagname> (or name=<hc.name>) that other services can depend on")
        boolean registerHealthyMarkerService() default true;

        @AttributeDefinition(name = "Register Unhealthy Marker Service", description = "For the case a given tag/name is unhealthy, will register a service Unhealthy with property tag=<tagname> (or name=<hc.name>) that other services can depend on")
        boolean registerUnhealthyMarkerService() default false;

        @AttributeDefinition(name = "Treat WARN as Healthy", description = "Whether to treat status WARN as healthy (it normally should because WARN indicates a working system that only possibly might become unavailable if no action is taken")
        boolean treatWarnAsHealthy() default true;

        @AttributeDefinition(name = "Send Events", description = "Whether to send OSGi events for the case a status has changed")
        boolean sendEvents() default true;

        @AttributeDefinition
        String webconsole_configurationFactory_nameHint() default "Health Monitor for '{tags}'/'{names}', {intervalInSec}sec/{cronExpression}, Marker Service Healthy:{registerHealthyMarkerService} Unhealthy:{registerUnhealthyMarkerService}, Send Events {sendEvents}";
    }

    @Reference
    HealthCheckExecutor executor;

    @Reference
    HealthCheckExecutorThreadPool healthCheckExecutorThreadPool;

    @Reference
    CronJobFactory cronJobFactory;

    @Reference
    private EventAdmin eventAdmin;

    // component state
    AsyncJob monitorJob = null;
    List<String> tags;
    List<String> names;
    List<HealthState> healthStates = new ArrayList<>();

    private long intervalInSec;
    private String cronExpression;

    private boolean registerHealthyMarkerService;
    private boolean registerUnhealthyMarkerService;
    private boolean treatWarnAsHealthy;
    private boolean sendEvents;

    private BundleContext bundleContext;

    /**
     * Reads the configuration, creates one {@code HealthState} per non-blank tag/name and
     * schedules the monitor job. A non-blank cron expression takes precedence over the
     * interval.
     *
     * @param bundleContext    context used later on to register the marker services
     * @param config           the monitor configuration
     * @param componentContext used to derive a job name from the component id
     * @throws IllegalArgumentException if neither a cron expression nor a positive interval is
     *                                  configured, or if the cron job cannot be created
     * @throws InvalidSyntaxException   declared for compatibility; not thrown by the code in
     *                                  this method itself
     */
    @Activate
    protected final void activate(BundleContext bundleContext, Config config, ComponentContext componentContext)
            throws InvalidSyntaxException {

        this.bundleContext = bundleContext;

        this.tags = Arrays.asList(config.tags());
        this.tags.stream().filter(StringUtils::isNotBlank).forEach(tag -> healthStates.add(new HealthState(tag, true)));

        this.names = Arrays.asList(config.names());
        this.names.stream().filter(StringUtils::isNotBlank)
                .forEach(name -> healthStates.add(new HealthState(name, false)));

        this.registerHealthyMarkerService = config.registerHealthyMarkerService();
        this.registerUnhealthyMarkerService = config.registerUnhealthyMarkerService();
        this.treatWarnAsHealthy = config.treatWarnAsHealthy();
        this.sendEvents = config.sendEvents();

        this.intervalInSec = config.intervalInSec();
        this.cronExpression = config.cronExpression();

        if (StringUtils.isNotBlank(cronExpression)) {
            try {
                monitorJob = cronJobFactory.getAsyncCronJob(this,
                        "job-hc-monitor-" + componentContext.getProperties().get(ComponentConstants.COMPONENT_ID),
                        "healthcheck-monitor", cronExpression);
            } catch (UnsupportedOperationException e) {
                // Report the underlying reason and keep the original exception as cause so the
                // missing dependency can be diagnosed from the stack trace.
                throw new IllegalArgumentException("Cannot use cron expression " + cronExpression
                        + " while class is not available: " + e.getMessage(), e);
            }
        } else if (intervalInSec > 0) {
            monitorJob = new AsyncIntervalJob(this, healthCheckExecutorThreadPool, intervalInSec);
        } else {
            throw new IllegalArgumentException("Either cronExpression or intervalInSec needs to be set");
        }

        monitorJob.schedule();
        LOG.info("HealthCheckMonitor active for tags {} and names {}", this.tags, this.names);
    }

    @Override
    public String toString() {
        return "[HealthCheckMonitor tags=" + tags + "/names=" + names + ", intervalInSec=" + intervalInSec + "/cron="
                + cronExpression + "]";
    }

    /**
     * Stops the monitor job and unregisters all marker services.
     * <p>
     * The job is unscheduled <em>before</em> the health states are cleaned up so that no new
     * run can be triggered between the cleanup and the unscheduling and re-register marker
     * services that would never be removed again.
     */
    @Deactivate
    protected final void deactivate() {
        if (monitorJob != null) {
            monitorJob.unschedule();
        }
        healthStates.stream().forEach(HealthState::cleanUp);
        healthStates.clear();
        LOG.info("HealthCheckMonitor deactivated for tags {} and names {}", this.tags, this.names);
    }

    /**
     * Executes the health checks for all configured tags/names and updates the corresponding
     * health states. Exceptions are logged and never propagated to the scheduler.
     */
    @Override
    public void run() {
        try {
            // run in tags/names in parallel
            healthStates.parallelStream().forEach(healthState -> {
                HealthCheckSelector selector = healthState.isTag ? HealthCheckSelector.tags(healthState.tagOrName)
                        : HealthCheckSelector.names(healthState.tagOrName);

                List<HealthCheckExecutionResult> executionResults = executor.execute(selector);

                // A single result is used as is; zero or several results are wrapped in a
                // CombinedExecutionResult constructed with TEMPORARILY_UNAVAILABLE.
                HealthCheckExecutionResult result = executionResults.size() == 1 ? executionResults.get(0)
                        : new CombinedExecutionResult(executionResults, Result.Status.TEMPORARILY_UNAVAILABLE);

                LOG.trace("Result of '{}' => {}", healthState.tagOrName, result.getHealthCheckResult().getStatus());

                healthState.update(result);
            });
            LOG.trace("HealthCheckMonitor: updated results for tags {} and names {}", this.tags, this.names);
        } catch (Exception e) {
            LOG.error("Exception HealthCheckMonitor run(): " + e, e);
        }
    }

    /**
     * Tracks the last known status of one monitored tag or name and owns the marker service
     * registrations derived from it. {@link #update} and {@link #cleanUp} are synchronized on
     * the instance.
     */
    class HealthState {

        private final String tagOrName;
        private final boolean isTag;
        /** Service/event property key: {@code "tag"} for tags, {@code "name"} for names. */
        private final String propertyName;

        private ServiceRegistration<?> healthyRegistration = null;
        private ServiceRegistration<Unhealthy> unhealthyRegistration = null;

        private Result.Status status = null;
        private boolean isHealthy = false;
        private boolean statusChanged = false;
        /** Set by {@link #cleanUp()}; later updates are ignored so no registration can leak. */
        private boolean disposed = false;

        HealthState(String tagOrName, boolean isTag) {
            this.tagOrName = tagOrName;
            this.isTag = isTag;
            this.propertyName = isTag ? "tag" : "name";
        }

        @Override
        public String toString() {
            return "[HealthState tagOrName=" + tagOrName + ", isTag=" + isTag + ", status=" + status + ", isHealthy="
                    + isHealthy + ", statusChanged=" + statusChanged + "]";
        }

        /**
         * Applies a new execution result: recomputes the health flags, (un)registers the marker
         * services accordingly and posts change events if enabled.
         *
         * @param executionResult the latest result for this tag/name
         */
        synchronized void update(HealthCheckExecutionResult executionResult) {
            if (disposed) {
                // A run that was still in flight while the component got deactivated must not
                // register services again after cleanUp() removed them.
                LOG.debug("HealthCheckMonitor: ignoring result for {} '{}' after clean up", propertyName, tagOrName);
                return;
            }

            Result.Status previousStatus = status;
            status = executionResult.getHealthCheckResult().getStatus();

            isHealthy = (status == Result.Status.OK || (treatWarnAsHealthy && status == Result.Status.WARN));
            statusChanged = previousStatus != status;
            LOG.trace(" {}: isHealthy={} statusChanged={}", tagOrName, isHealthy, statusChanged);

            registerMarkerServices();
            sendEvent(executionResult, previousStatus);
        }

        /** Brings the marker service registrations in line with the current health flag. */
        private void registerMarkerServices() {
            if (registerHealthyMarkerService) {
                if (isHealthy && healthyRegistration == null) {
                    registerHealthyService();
                } else if (!isHealthy && healthyRegistration != null) {
                    unregisterHealthyService();
                }
            }
            if (registerUnhealthyMarkerService) {
                if (!isHealthy && unhealthyRegistration == null) {
                    registerUnhealthyService();
                } else if (isHealthy && unhealthyRegistration != null) {
                    unregisterUnhealthyService();
                }
            }
        }

        /**
         * Builds the service properties shared by both marker services: the tag/name under the
         * matching property key plus the activation timestamp.
         */
        private Dictionary<String, String> createRegistrationProps() {
            Dictionary<String, String> registrationProps = new Hashtable<>();
            registrationProps.put(propertyName, tagOrName);
            // SimpleDateFormat is not thread-safe, hence a fresh instance per call.
            registrationProps.put("activated", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            return registrationProps;
        }

        private void registerHealthyService() {
            if (healthyRegistration == null) {
                LOG.debug("HealthCheckMonitor: registerHealthyService() {} ", tagOrName);
                Dictionary<String, String> registrationProps = createRegistrationProps();

                if (TAG_SYSTEMREADY.equals(tagOrName)) {
                    LOG.debug("HealthCheckMonitor: SYSTEM READY");
                    // The system ready marker is published under both service interfaces.
                    healthyRegistration = bundleContext.registerService(
                            new String[] { SystemReady.class.getName(), Healthy.class.getName() },
                            MARKER_SERVICE_SYSTEMREADY, registrationProps);
                } else {
                    healthyRegistration = bundleContext.registerService(Healthy.class, MARKER_SERVICE_HEALTHY,
                            registrationProps);
                }
                LOG.debug("HealthCheckMonitor: Healthy service for {} '{}' registered", propertyName, tagOrName);
            }
        }

        private void unregisterHealthyService() {
            if (healthyRegistration != null) {
                healthyRegistration.unregister();
                healthyRegistration = null;
                LOG.debug("HealthCheckMonitor: Healthy service for {} '{}' unregistered", propertyName, tagOrName);
            }
        }

        private void registerUnhealthyService() {
            if (unhealthyRegistration == null) {
                // Uses the same property key as the Healthy marker ("tag" or "name") so that
                // name based monitors can be filtered by (name=...) as documented.
                Dictionary<String, String> registrationProps = createRegistrationProps();
                unhealthyRegistration = bundleContext.registerService(Unhealthy.class, MARKER_SERVICE_UNHEALTHY,
                        registrationProps);
                LOG.debug("HealthCheckMonitor: Unhealthy service for {} '{}' registered", propertyName, tagOrName);
            }
        }

        private void unregisterUnhealthyService() {
            if (unhealthyRegistration != null) {
                unhealthyRegistration.unregister();
                unhealthyRegistration = null;
                LOG.debug("HealthCheckMonitor: Unhealthy service for {} '{}' unregistered", propertyName, tagOrName);
            }
        }

        /**
         * Posts status change events if events are enabled and the status differs from the
         * previous one: one event on the tag/name topic and, for non-combined results whose
         * service reference carries a component name, a second one on the component topic.
         *
         * @param executionResult the result that caused the status change
         * @param previousStatus  the status before this update, {@code null} on the first update
         */
        private void sendEvent(HealthCheckExecutionResult executionResult, Result.Status previousStatus) {
            if (sendEvents && statusChanged) {
                Map<String, Object> properties = new HashMap<>();
                properties.put(EVENT_PROP_STATUS, status);
                if (previousStatus != null) {
                    properties.put(EVENT_PROP_PREVIOUS_STATUS, previousStatus);
                }
                properties.put(EVENT_PROP_EXECUTION_RESULT, executionResult);

                // Whitespace in the tag/name is replaced by underscores when building the topic.
                String topic = EVENT_TOPIC_PREFIX + "/" + propertyName + "/" + tagOrName.replaceAll("\\s+", "_");
                eventAdmin.postEvent(new Event(topic, properties));
                LOG.debug("HealthCheckMonitor: Posted event for topic '{}': Status change from {} to {}", topic,
                        previousStatus, status);

                if (!(executionResult instanceof CombinedExecutionResult)) {
                    String componentName = (String) executionResult.getHealthCheckMetadata().getServiceReference()
                            .getProperty(ComponentConstants.COMPONENT_NAME);
                    if (StringUtils.isNotBlank(componentName)) {
                        String topicClass = EVENT_TOPIC_PREFIX + "/component/" + componentName.replace(".", "/");
                        eventAdmin.postEvent(new Event(topicClass, properties));
                        LOG.debug("HealthCheckMonitor: Posted event for topic '{}': Status change from {} to {}",
                                topicClass, previousStatus, status);
                    }
                }
            }
        }

        /** Unregisters both marker services and marks this state as disposed. */
        synchronized void cleanUp() {
            disposed = true;
            unregisterHealthyService();
            unregisterUnhealthyService();
        }
    }
}