blob: 2c400e13a3715f63e306ec81f082766149189f78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.felix.hc.core.impl.executor;
import static org.apache.felix.hc.api.FormattingResultLog.msHumanReadable;
import static org.apache.felix.hc.core.impl.executor.HealthCheckExecutorImplConfiguration.LONGRUNNING_FUTURE_THRESHOLD_CRITICAL_DEFAULT_MS;
import static org.apache.felix.hc.core.impl.executor.HealthCheckExecutorImplConfiguration.RESULT_CACHE_TTL_DEFAULT_MS;
import static org.apache.felix.hc.core.impl.executor.HealthCheckExecutorImplConfiguration.TIMEOUT_DEFAULT_MS;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.felix.hc.api.FormattingResultLog;
import org.apache.felix.hc.api.HealthCheck;
import org.apache.felix.hc.api.Result;
import org.apache.felix.hc.api.ResultLog;
import org.apache.felix.hc.api.execution.HealthCheckExecutionOptions;
import org.apache.felix.hc.api.execution.HealthCheckExecutionResult;
import org.apache.felix.hc.api.execution.HealthCheckExecutor;
import org.apache.felix.hc.api.execution.HealthCheckMetadata;
import org.apache.felix.hc.api.execution.HealthCheckSelector;
import org.apache.felix.hc.core.impl.executor.async.AsyncHealthCheckExecutor;
import org.apache.felix.hc.core.impl.util.HealthCheckFilter;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.startlevel.FrameworkStartLevel;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Runs health checks for a given list of tags in parallel.
 * <p>
 * For every call, results are obtained in this order: from the {@link AsyncHealthCheckExecutor} and from the result
 * cache (both skipped when instant execution is forced), and finally by running the remaining checks as
 * {@link HealthCheckFuture}s on the {@link HealthCheckExecutorThreadPool}. A future that is still running from an
 * earlier call is reused instead of starting the same check twice.
 */
@Component(service = { HealthCheckExecutor.class, ExtendedHealthCheckExecutor.class },
        immediate = true // immediate = true to keep the cache!
)
@Designate(ocd = HealthCheckExecutorImplConfiguration.class)
public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor, ServiceListener {

    private static final String HC_LOGGING_SYS_PROP = "org.apache.felix.hc.autoLogging";

    /**
     * TTL (one year) used to look up the last cached result when enriching a timeout result. Computed in long
     * arithmetic on purpose: the int product 1000 * 60 * 60 * 24 * 365 overflows to roughly 17 days.
     */
    private static final long LAST_RESULT_LOOKUP_TTL_MS = 1000L * 60 * 60 * 24 * 365;

    /** Maximum time in ms to wait on {@link #stillRunningFutures} before re-checking whether all futures are done. */
    private static final long FUTURE_POLL_INTERVAL_MS = 50L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private long timeoutInMs;
    private long longRunningFutureThresholdForRedMs;
    private long resultCacheTtlInMs;
    private String[] defaultTags;

    private HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache();
    private TempUnavailableGracePeriodEvaluator tempUnavailableGracePeriodEvaluator;

    /**
     * Futures that have been started and not yet finished, keyed by health check metadata. All access is synchronized
     * on this map, which also serves as the monitor that finished futures use to wake up waiting callers
     * (notifyAll).
     */
    private final Map<HealthCheckMetadata, HealthCheckFuture> stillRunningFutures = new HashMap<HealthCheckMetadata, HealthCheckFuture>();

    @Reference
    private AsyncHealthCheckExecutor asyncHealthCheckExecutor;

    @Reference
    HealthCheckExecutorThreadPool healthCheckExecutorThreadPool;

    private BundleContext bundleContext;

    /**
     * Applies the configuration and registers this instance as a listener for {@link HealthCheck} service events.
     *
     * @param configuration the component configuration
     * @param bundleContext the bundle context of this component
     */
    @Activate
    protected final void activate(final HealthCheckExecutorImplConfiguration configuration, final BundleContext bundleContext) {
        this.bundleContext = bundleContext;
        configure(configuration);
        try {
            this.bundleContext.addServiceListener(this, "("
                    + Constants.OBJECTCLASS + "=" + HealthCheck.class.getName() + ")");
        } catch (final InvalidSyntaxException ise) {
            // this should really never happen as the expression above is constant
            throw new RuntimeException("Unexpected problem with filter syntax", ise);
        }
        logger.info("HealthCheckExecutor active at start level {}", getCurrentStartLevel());
    }

    /** Re-applies a changed configuration. */
    @Modified
    protected final void modified(final HealthCheckExecutorImplConfiguration configuration) {
        configure(configuration);
    }

    /** Unregisters the service listener and clears the result cache. */
    @Deactivate
    protected final void deactivate() {
        this.bundleContext.removeServiceListener(this);
        this.healthCheckResultCache.clear();
        logger.info("HealthCheckExecutor shutdown at start level {}", getCurrentStartLevel());
    }

    /** @return the current start level of the framework (read from the system bundle) */
    private int getCurrentStartLevel() {
        return bundleContext.getBundle(Constants.SYSTEM_BUNDLE_ID).adapt(FrameworkStartLevel.class).getStartLevel();
    }

    /**
     * Copies the configuration into fields. Non-positive timing values fall back to their defaults. Also sets the
     * {@code org.apache.felix.hc.autoLogging} system property from the configuration.
     *
     * @param configuration the configuration to apply
     */
    protected final void configure(final HealthCheckExecutorImplConfiguration configuration) {
        this.timeoutInMs = positiveOrDefault(configuration.timeoutInMs(), TIMEOUT_DEFAULT_MS);
        this.longRunningFutureThresholdForRedMs = positiveOrDefault(configuration.longRunningFutureThresholdForCriticalMs(),
                LONGRUNNING_FUTURE_THRESHOLD_CRITICAL_DEFAULT_MS);
        this.resultCacheTtlInMs = positiveOrDefault(configuration.resultCacheTtlInMs(), RESULT_CACHE_TTL_DEFAULT_MS);
        this.defaultTags = configuration.defaultTags();
        tempUnavailableGracePeriodEvaluator = new TempUnavailableGracePeriodEvaluator(configuration.temporarilyAvailableGracePeriodInMs());
        System.setProperty(HC_LOGGING_SYS_PROP, String.valueOf(configuration.autoLogging()));
    }

    /** Returns {@code configuredValue} if it is positive, otherwise {@code defaultValue}. */
    private static long positiveOrDefault(final long configuredValue, final long defaultValue) {
        return configuredValue > 0L ? configuredValue : defaultValue;
    }

    /** Drops the cached result of a health check service when that service is unregistered. */
    @Override
    public void serviceChanged(final ServiceEvent event) {
        if (event.getType() == ServiceEvent.UNREGISTERING) {
            final Long serviceId = (Long) event.getServiceReference().getProperty(Constants.SERVICE_ID);
            this.healthCheckResultCache.removeCachedResult(serviceId);
        }
    }

    /** Executes the selected checks with default execution options. */
    @Override
    public List<HealthCheckExecutionResult> execute(HealthCheckSelector selector) {
        return execute(selector, new HealthCheckExecutionOptions());
    }

    /**
     * Executes the selected checks. If the selector has no tags, the configured default tags are applied to it via
     * {@code selector.withTags(defaultTags)}.
     *
     * @param selector the selector of the checks to run
     * @param options the execution options
     * @return the results, sorted
     */
    @Override
    public List<HealthCheckExecutionResult> execute(HealthCheckSelector selector, HealthCheckExecutionOptions options) {
        logger.debug("Starting executing checks for filter selector {} and execution options {}", selector, options);
        if (ArrayUtils.isEmpty(selector.tags())) {
            logger.debug("Using default tags");
            selector.withTags(defaultTags);
        }
        final ServiceReference<HealthCheck>[] healthCheckReferences = selectHealthCheckReferences(selector, options);
        return this.execute(healthCheckReferences, options);
    }

    /** @see org.apache.felix.hc.core.impl.executor.ExtendedHealthCheckExecutor#selectHealthCheckReferences(HealthCheckSelector,
     * HealthCheckExecutionOptions) */
    @Override
    public ServiceReference<HealthCheck>[] selectHealthCheckReferences(HealthCheckSelector selector, HealthCheckExecutionOptions options) {
        final HealthCheckFilter filter = new HealthCheckFilter(this.bundleContext);
        return filter.getHealthCheckServiceReferences(selector, options.isCombineTagsWithOr());
    }

    /** @see org.apache.felix.hc.core.impl.executor.ExtendedHealthCheckExecutor#execute(org.osgi.framework.ServiceReference) */
    @Override
    public HealthCheckExecutionResult execute(final ServiceReference<HealthCheck> ref) {
        final HealthCheckMetadata metadata = this.getHealthCheckMetadata(ref);
        return createResultsForDescriptor(metadata);
    }

    /** @see org.apache.felix.hc.core.impl.executor.ExtendedHealthCheckExecutor#execute(ServiceReference[], HealthCheckExecutionOptions) */
    @Override
    public List<HealthCheckExecutionResult> execute(final ServiceReference<HealthCheck>[] healthCheckReferences,
            HealthCheckExecutionOptions options) {
        final long startTime = System.currentTimeMillis();
        final List<HealthCheckExecutionResult> results = new ArrayList<HealthCheckExecutionResult>();
        final List<HealthCheckMetadata> healthCheckDescriptors = getHealthCheckMetadata(healthCheckReferences);
        createResultsForDescriptors(healthCheckDescriptors, results, options);
        if (logger.isDebugEnabled()) {
            logger.debug("Time consumed for all checks: {}", msHumanReadable(System.currentTimeMillis() - startTime));
        }
        // sort result (all results produced here are ExecutionResult instances)
        Collections.sort(results, new Comparator<HealthCheckExecutionResult>() {
            @Override
            public int compare(final HealthCheckExecutionResult arg0,
                    final HealthCheckExecutionResult arg1) {
                return ((ExecutionResult) arg0).compareTo((ExecutionResult) arg1);
            }
        });
        return results;
    }

    /**
     * Gets the result for a single check, using the generic multi-check method under the hood.
     *
     * @throws IllegalStateException if not exactly one result was produced
     */
    private HealthCheckExecutionResult createResultsForDescriptor(final HealthCheckMetadata metadata) {
        final List<HealthCheckExecutionResult> results = new ArrayList<HealthCheckExecutionResult>();
        final List<HealthCheckMetadata> healthCheckDescriptors = new ArrayList<HealthCheckMetadata>();
        healthCheckDescriptors.add(metadata);
        createResultsForDescriptors(healthCheckDescriptors, results, new HealthCheckExecutionOptions());
        if (results.size() != 1) {
            throw new IllegalStateException("Execute method for a single service reference unexpectedly resulted in "+results.size()+ " results: "+results);
        }
        return results.get(0);
    }

    /**
     * Turns descriptors into results and appends them to {@code results}.
     * <p>
     * Each stage below may take a descriptor out of {@code healthCheckDescriptors} and add the corresponding result.
     * Whatever remains at the end is executed via futures.
     */
    private void createResultsForDescriptors(final List<HealthCheckMetadata> healthCheckDescriptors,
            final List<HealthCheckExecutionResult> results, HealthCheckExecutionOptions options) {
        if (!options.isForceInstantExecution()) {
            // get async results
            if (asyncHealthCheckExecutor != null) {
                asyncHealthCheckExecutor.collectAsyncResults(healthCheckDescriptors, results, healthCheckResultCache);
            }
            // reuse cached results where possible
            healthCheckResultCache.useValidCacheResults(healthCheckDescriptors, results, resultCacheTtlInMs);
        }

        // everything else is executed in parallel via futures
        final List<HealthCheckFuture> futures = createOrReuseFutures(healthCheckDescriptors);
        // wait for futures at most until timeout (but will return earlier if all futures are finished)
        waitForFuturesRespectingTimeout(futures, options);
        collectResultsFromFutures(futures, results);

        // respect sticky results if configured via HealthCheck.KEEP_NON_OK_RESULTS_STICKY_FOR_SEC
        appendStickyResultLogIfConfigured(results);

        // ensure long standing TEMPORARILY_UNAVAILABLE results are marked as CRITICAL
        tempUnavailableGracePeriodEvaluator.evaluateGracePeriodForTemporarilyUnavailableResults(results);
    }

    /** Replaces results of checks that have a positive "keep non-OK results sticky" period with their sticky variant. */
    private void appendStickyResultLogIfConfigured(List<HealthCheckExecutionResult> results) {
        final ListIterator<HealthCheckExecutionResult> resultsIt = results.listIterator();
        while (resultsIt.hasNext()) {
            final HealthCheckExecutionResult result = resultsIt.next();
            final Long nonOkStickyForSec = result.getHealthCheckMetadata().getKeepNonOkResultsStickyForSec();
            if (nonOkStickyForSec != null && nonOkStickyForSec > 0) {
                resultsIt.set(healthCheckResultCache.createExecutionResultWithStickyResults(result));
            }
        }
    }

    /** Create the health check meta data for all given references (returned list is mutable). */
    private List<HealthCheckMetadata> getHealthCheckMetadata(final ServiceReference... healthCheckReferences) {
        final List<HealthCheckMetadata> descriptors = new LinkedList<HealthCheckMetadata>();
        for (final ServiceReference serviceReference : healthCheckReferences) {
            descriptors.add(getHealthCheckMetadata(serviceReference));
        }
        return descriptors;
    }

    /** Create the health check meta data for a single reference. */
    private HealthCheckMetadata getHealthCheckMetadata(final ServiceReference healthCheckReference) {
        return new HealthCheckMetadata(healthCheckReference);
    }

    /** Create or reuse futures for the list of health checks (atomically with respect to other callers). */
    private List<HealthCheckFuture> createOrReuseFutures(final List<HealthCheckMetadata> healthCheckDescriptors) {
        final List<HealthCheckFuture> futuresForResultOfThisCall = new LinkedList<HealthCheckFuture>();
        synchronized (this.stillRunningFutures) {
            for (final HealthCheckMetadata md : healthCheckDescriptors) {
                futuresForResultOfThisCall.add(createOrReuseFuture(md));
            }
        }
        return futuresForResultOfThisCall;
    }

    /**
     * Create or reuse the future for a health check. A new future is submitted to the thread pool. When it finishes,
     * it updates the result cache, the async executor and the grace period evaluator, then removes itself from
     * {@link #stillRunningFutures}.
     * <p>
     * This method must be synchronized by the caller(!) on stillRunningFutures.
     */
    private HealthCheckFuture createOrReuseFuture(final HealthCheckMetadata metadata) {
        HealthCheckFuture future = this.stillRunningFutures.get(metadata);
        if (future != null) {
            logger.debug("Found a future that is still running for {}", metadata);
            return future;
        }

        logger.debug("Creating future for {}", metadata);
        future = new HealthCheckFuture(metadata, bundleContext, new HealthCheckFuture.Callback() {
            @Override
            public void finished(final HealthCheckExecutionResult result) {
                healthCheckResultCache.updateWith(result);
                asyncHealthCheckExecutor.updateWith(result);
                tempUnavailableGracePeriodEvaluator.updateTemporarilyUnavailableTimestampWith(result);
                synchronized (stillRunningFutures) {
                    stillRunningFutures.remove(metadata);
                }
            }
        });
        this.stillRunningFutures.put(metadata, future);

        final HealthCheckFuture newFuture = future;
        healthCheckExecutorThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                newFuture.run();
                synchronized (stillRunningFutures) {
                    // notify executor threads that newFuture is finished. Wrapping it in another runnable
                    // ensures that newFuture.isDone() will return true (if e.g. done in callback above, there are
                    // still a few lines of code until the future is really done and hence then the executor thread
                    // is sometime notified a bit too early, still receives the result isDone()=false and then waits
                    // for another poll interval, even though the future was about to be done one ms later)
                    stillRunningFutures.notifyAll();
                }
            }
        });
        return future;
    }

    /**
     * Wait for the futures until all are done or the timeout is reached. The timeout is the configured global timeout,
     * unless {@code options} overrides it with a positive value. If the waiting thread is interrupted, its interrupt
     * status is restored and waiting stops early. Futures that have not finished are then reported as timed out.
     */
    private void waitForFuturesRespectingTimeout(final List<HealthCheckFuture> futuresForResultOfThisCall,
            HealthCheckExecutionOptions options) {
        if (futuresForResultOfThisCall.isEmpty()) {
            return; // nothing to wait for (usually because of cached results)
        }

        long effectiveTimeout = this.timeoutInMs;
        if (options != null && options.getOverrideGlobalTimeout() > 0) {
            effectiveTimeout = options.getOverrideGlobalTimeout();
        }

        final StopWatch callExecutionTimeStopWatch = new StopWatch();
        callExecutionTimeStopWatch.start();
        boolean allFuturesDone;
        do {
            try {
                synchronized (stillRunningFutures) {
                    stillRunningFutures.wait(FUTURE_POLL_INTERVAL_MS); // wait for notifications of callbacks of HealthCheckFutures
                }
            } catch (final InterruptedException ie) {
                logger.warn("Unexpected InterruptedException while waiting for healthCheckContributors", ie);
                // Restore the interrupt flag instead of swallowing it. Continuing to loop would make wait() throw
                // immediately again and spin until the timeout.
                Thread.currentThread().interrupt();
                return;
            }
            allFuturesDone = true;
            for (final HealthCheckFuture healthCheckFuture : futuresForResultOfThisCall) {
                if (!healthCheckFuture.isDone()) {
                    allFuturesDone = false;
                    break;
                }
            }
        } while (!allFuturesDone && callExecutionTimeStopWatch.getTime() < effectiveTimeout);
    }

    /** Collect the results from all futures
     *
     * @param futuresForResultOfThisCall The list of futures (each future is removed from this list once collected)
     * @param results The result collection */
    void collectResultsFromFutures(final List<HealthCheckFuture> futuresForResultOfThisCall,
            final Collection<HealthCheckExecutionResult> results) {
        final Set<HealthCheckExecutionResult> resultsFromFutures = new HashSet<HealthCheckExecutionResult>();
        final Iterator<HealthCheckFuture> futuresIt = futuresForResultOfThisCall.iterator();
        while (futuresIt.hasNext()) {
            final HealthCheckFuture future = futuresIt.next();
            resultsFromFutures.add(this.collectResultFromFuture(future));
            futuresIt.remove();
        }
        logger.debug("Adding {} results from futures", resultsFromFutures.size());
        results.addAll(resultsFromFutures);
    }

    /** Collect the result from a single future
     *
     * @param future The future
     * @return The execution result or a result for a reached timeout */
    HealthCheckExecutionResult collectResultFromFuture(final HealthCheckFuture future) {
        final HealthCheckMetadata hcMetadata = future.getHealthCheckMetadata();
        if (future.isDone()) {
            logger.debug("Health Check is done: {}", hcMetadata);
            try {
                return future.get();
            } catch (final Exception e) {
                logger.warn("Unexpected Exception during future.get(): " + e, e);
                return new ExecutionResult(hcMetadata, Result.Status.HEALTH_CHECK_ERROR,
                        "Unexpected Exception during future.get(): " + e, getElapsedTimeMs(future), false);
            }
        }

        logger.debug("Health Check timed out: {}", hcMetadata);
        // Futures must not be cancelled as interrupting a health check might leave the system in invalid state
        // (worst case could be a corrupted repository index if using write operations)
        // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain
        // future we turn the result CRITICAL
        final long futureElapsedTimeMs = getElapsedTimeMs(future);
        final FormattingResultLog resultLog = new FormattingResultLog();
        if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) {
            resultLog.warn("Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs));
        } else {
            resultLog.critical("Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs)
                    + " (exceeding the configured threshold for CRITICAL: "
                    + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")");
        }
        appendLastCachedResultLog(hcMetadata, resultLog);
        return new ExecutionResult(hcMetadata, new Result(resultLog), futureElapsedTimeMs, true);
    }

    /** Add the log entries of the previous, cached result (if any, looked up with a one year TTL) to {@code resultLog}. */
    private void appendLastCachedResultLog(final HealthCheckMetadata hcMetadata, final FormattingResultLog resultLog) {
        final HealthCheckExecutionResult lastCachedResult = healthCheckResultCache.getValidCacheResult(hcMetadata, LAST_RESULT_LOOKUP_TTL_MS);
        if (lastCachedResult == null) {
            return;
        }
        // created per call: SimpleDateFormat is not thread-safe
        final DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
        resultLog.info("*** Result log of last execution finished at {} after {} ***",
                df.format(lastCachedResult.getFinishedAt()),
                msHumanReadable(lastCachedResult.getElapsedTimeInMs()));
        for (ResultLog.Entry entry : lastCachedResult.getHealthCheckResult()) {
            resultLog.add(entry);
        }
    }

    /** @return milliseconds elapsed since the given future was created */
    private static long getElapsedTimeMs(final HealthCheckFuture future) {
        return System.currentTimeMillis() - future.getCreatedTime().getTime();
    }

    public void setTimeoutInMs(final long timeoutInMs) {
        this.timeoutInMs = timeoutInMs;
    }

    public void setLongRunningFutureThresholdForRedMs(
            final long longRunningFutureThresholdForRedMs) {
        this.longRunningFutureThresholdForRedMs = longRunningFutureThresholdForRedMs;
    }
}