| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ambari.metrics.core.timeline; |
| |
| import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; |
| import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; |
| |
| import java.io.IOException; |
| import java.net.MalformedURLException; |
| import java.net.URISyntaxException; |
| import java.net.UnknownHostException; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.ambari.metrics.core.timeline.aggregators.Function; |
| import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator; |
| import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregatorFactory; |
| import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController; |
| import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata; |
| import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; |
| import org.apache.ambari.metrics.core.timeline.function.SeriesAggregateFunction; |
| import org.apache.ambari.metrics.core.timeline.function.TimelineMetricsSeriesAggregateFunction; |
| import org.apache.ambari.metrics.core.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory; |
| import org.apache.ambari.metrics.core.timeline.query.Condition; |
| import org.apache.ambari.metrics.core.timeline.query.ConditionBuilder; |
| import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL; |
| import org.apache.ambari.metrics.core.timeline.query.TopNCondition; |
| import org.apache.commons.collections.MapUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; |
| import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; |
| import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; |
| import org.apache.hadoop.metrics2.sink.timeline.Precision; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; |
| import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; |
| import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; |
| import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; |
| |
| import com.google.common.collect.ArrayListMultimap; |
| import com.google.common.collect.Multimap; |
| |
| public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore { |
| |
| static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class); |
| private final TimelineMetricConfiguration configuration; |
| private TimelineMetricDistributedCache cache; |
| private PhoenixHBaseAccessor hBaseAccessor; |
| private static volatile boolean isInitialized = false; |
| private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor(); |
| private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); |
| private TimelineMetricMetadataManager metricMetadataManager; |
| private MetricCollectorHAController haController; |
| private boolean containerMetricsDisabled = false; |
| |
| /** |
| * Construct the service. |
| */ |
| public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) { |
| super(HBaseTimelineMetricsService.class.getName()); |
| this.configuration = configuration; |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| super.serviceInit(conf); |
| initializeSubsystem(); |
| } |
| |
| private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException { |
| //TODO make configurable |
| return new TimelineMetricsIgniteCache(metricMetadataManager); |
| } |
| |
| |
| private synchronized void initializeSubsystem() { |
| if (!isInitialized) { |
| hBaseAccessor = new PhoenixHBaseAccessor(null); |
| |
| // Initialize metadata |
| try { |
| metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor); |
| } catch (MalformedURLException | URISyntaxException e) { |
| throw new ExceptionInInitializerError("Unable to initialize metadata manager"); |
| } |
| metricMetadataManager.initializeMetadata(); |
| |
| // Initialize metric schema |
| hBaseAccessor.initMetricSchema(); |
| |
| // Initialize policies before TTL update |
| hBaseAccessor.initPoliciesAndTTL(); |
| // Start HA service |
| // Start the controller |
| if (!configuration.isDistributedCollectorModeDisabled()) { |
| haController = new MetricCollectorHAController(configuration); |
| try { |
| haController.initializeHAController(); |
| } catch (Exception e) { |
| LOG.error(e); |
| throw new MetricsSystemInitializationException("Unable to " + |
| "initialize HA controller", e); |
| } |
| } else { |
| LOG.info("Distributed collector mode disabled"); |
| } |
| |
| //Initialize whitelisting & blacklisting if needed |
| TimelineMetricsFilter.initializeMetricFilter(configuration); |
| |
| Configuration metricsConf = null; |
| try { |
| metricsConf = configuration.getMetricsConf(); |
| } catch (Exception e) { |
| throw new ExceptionInInitializerError("Cannot initialize configuration."); |
| } |
| |
| if (configuration.isCollectorInMemoryAggregationEnabled()) { |
| try { |
| cache = startCacheNode(); |
| } catch (Exception e) { |
| throw new MetricsSystemInitializationException("Unable to " + |
| "start cache node", e); |
| } |
| } |
| |
| if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { |
| LOG.info("Using group by aggregators for aggregating host and cluster metrics."); |
| } |
| |
| // Start the cluster aggregator second |
| TimelineMetricAggregator secondClusterAggregator = |
| TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( |
| hBaseAccessor, metricsConf, metricMetadataManager, haController, cache); |
| scheduleAggregatorThread(secondClusterAggregator); |
| |
| // Start the minute cluster aggregator |
| TimelineMetricAggregator minuteClusterAggregator = |
| TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( |
| hBaseAccessor, metricsConf, metricMetadataManager, haController); |
| scheduleAggregatorThread(minuteClusterAggregator); |
| |
| // Start the hourly cluster aggregator |
| TimelineMetricAggregator hourlyClusterAggregator = |
| TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( |
| hBaseAccessor, metricsConf, metricMetadataManager, haController); |
| scheduleAggregatorThread(hourlyClusterAggregator); |
| |
| // Start the daily cluster aggregator |
| TimelineMetricAggregator dailyClusterAggregator = |
| TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily( |
| hBaseAccessor, metricsConf, metricMetadataManager, haController); |
| scheduleAggregatorThread(dailyClusterAggregator); |
| |
| // Start the minute host aggregator |
| if (!configuration.isHostInMemoryAggregationEnabled()) { |
| TimelineMetricAggregator minuteHostAggregator = |
| TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( |
| hBaseAccessor, metricsConf, metricMetadataManager, haController); |
| scheduleAggregatorThread(minuteHostAggregator); |
| } |
| |
| // Start the hourly host aggregator |
| TimelineMetricAggregator hourlyHostAggregator = |
| TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly( |
| hBaseAccessor, metricsConf, metricMetadataManager, haController); |
| scheduleAggregatorThread(hourlyHostAggregator); |
| |
| // Start the daily host aggregator |
| TimelineMetricAggregator dailyHostAggregator = |
| TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily( |
| hBaseAccessor, metricsConf, metricMetadataManager, haController); |
| scheduleAggregatorThread(dailyHostAggregator); |
| |
| if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { |
| int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); |
| int delay = configuration.getTimelineMetricsServiceWatcherDelay(); |
| // Start the watchdog |
| watchdogExecutorService.scheduleWithFixedDelay( |
| new TimelineMetricStoreWatcher(this, configuration), |
| initDelay, delay, TimeUnit.SECONDS); |
| LOG.info("Started watchdog for timeline metrics store with initial " + |
| "delay = " + initDelay + ", delay = " + delay); |
| } |
| containerMetricsDisabled = configuration.isContainerMetricsDisabled(); |
| isInitialized = true; |
| } |
| |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| super.serviceStop(); |
| } |
| |
| @Override |
| public TimelineMetrics getTimelineMetrics(List<String> metricNames, |
| List<String> hostnames, String applicationId, String instanceId, |
| Long startTime, Long endTime, Precision precision, Integer limit, |
| boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException { |
| |
| if (metricNames == null || metricNames.isEmpty()) { |
| throw new IllegalArgumentException("No metric name filter specified."); |
| } |
| if ((startTime == null && endTime != null) |
| || (startTime != null && endTime == null)) { |
| throw new IllegalArgumentException("Open ended query not supported "); |
| } |
| if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT) { |
| throw new IllegalArgumentException("Limit too big"); |
| } |
| |
| TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null; |
| if (!StringUtils.isEmpty(seriesAggregateFunction)) { |
| SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction); |
| seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func); |
| } |
| |
| Multimap<String, List<Function>> metricFunctions = |
| parseMetricNamesToAggregationFunctions(metricNames); |
| |
| TimelineMetrics metrics = new TimelineMetrics(); |
| List<String> transientMetricNames = new ArrayList<>(); |
| |
| List<byte[]> uuids = metricMetadataManager.getUuidsForGetMetricQuery(metricFunctions.keySet(), |
| hostnames, |
| applicationId, |
| instanceId, |
| transientMetricNames); |
| |
| if (uuids.isEmpty() && transientMetricNames.isEmpty()) { |
| LOG.trace("No metrics satisfy the query: " + Arrays.asList(metricNames).toString()); |
| return metrics; |
| } |
| |
| ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())) |
| .hostnames(hostnames) |
| .appId(applicationId) |
| .instanceId(instanceId) |
| .startTime(startTime) |
| .endTime(endTime) |
| .precision(precision) |
| .limit(limit) |
| .grouped(groupedByHosts) |
| .uuid(uuids) |
| .transientMetricNames(transientMetricNames); |
| |
| applyTopNCondition(conditionBuilder, topNConfig, metricNames, hostnames); |
| |
| Condition condition = conditionBuilder.build(); |
| |
| if (hostnames == null || hostnames.isEmpty()) { |
| metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions); |
| } else { |
| metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions); |
| } |
| |
| metrics = postProcessMetrics(metrics); |
| |
| if (metrics.getMetrics().size() == 0) { |
| return metrics; |
| } |
| |
| return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics); |
| } |
| |
| private void applyTopNCondition(ConditionBuilder conditionBuilder, |
| TopNConfig topNConfig, |
| List<String> metricNames, |
| List<String> hostnames) { |
| if (topNConfig != null) { |
| if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true. |
| TopNCondition.isTopNMetricCondition(metricNames, hostnames)) { |
| conditionBuilder.topN(topNConfig.getTopN()); |
| conditionBuilder.isBottomN(topNConfig.getIsBottomN()); |
| Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction()); |
| Function function = new Function(readFunction, null); |
| conditionBuilder.topNFunction(function); |
| } else { |
| LOG.info("Invalid Input for TopN query. Ignoring TopN Request."); |
| } |
| } |
| } |
| |
| private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) { |
| List<TimelineMetric> metricsList = metrics.getMetrics(); |
| |
| for (TimelineMetric metric : metricsList) { |
| String name = metric.getMetricName(); |
| if (name.contains("._rate")) { |
| updateValuesAsRate(metric.getMetricValues(), false); |
| } else if (name.contains("._diff")) { |
| updateValuesAsRate(metric.getMetricValues(), true); |
| } |
| } |
| |
| return metrics; |
| } |
| |
| private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance, |
| TimelineMetrics metrics) { |
| if (seriesAggrFuncInstance != null) { |
| TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics); |
| metrics.setMetrics(Collections.singletonList(appliedMetric)); |
| } |
| return metrics; |
| } |
| |
| static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) { |
| Long prevTime = null; |
| Double prevVal = null; |
| long step; |
| Double diff; |
| |
| for (Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) { |
| Map.Entry<Long, Double> timeValueEntry = it.next(); |
| Long currTime = timeValueEntry.getKey(); |
| Double currVal = timeValueEntry.getValue(); |
| |
| if (prevTime != null) { |
| step = currTime - prevTime; |
| diff = currVal - prevVal; |
| if (diff < 0) { |
| it.remove(); //Discard calculating rate when the metric counter has been reset. |
| } else { |
| Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step)); |
| timeValueEntry.setValue(rate); |
| } |
| } else { |
| it.remove(); |
| } |
| |
| prevTime = currTime; |
| prevVal = currVal; |
| } |
| |
| return metricValues; |
| } |
| |
| static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { |
| Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create(); |
| |
| for (String metricName : metricNames) { |
| Function function = Function.DEFAULT_VALUE_FUNCTION; |
| String cleanMetricName = metricName; |
| |
| try { |
| function = Function.fromMetricName(metricName); |
| int functionStartIndex = metricName.indexOf("._"); |
| if (functionStartIndex > 0) { |
| cleanMetricName = metricName.substring(0, functionStartIndex); |
| } |
| } catch (Function.FunctionFormatException ffe) { |
| // unknown function so |
| // fallback to VALUE, and fullMetricName |
| } |
| |
| List<Function> functionsList = new ArrayList<>(); |
| functionsList.add(function); |
| metricsFunctions.put(cleanMetricName, functionsList); |
| } |
| |
| return metricsFunctions; |
| } |
| |
| public void putMetricsSkipCache(TimelineMetrics metrics) throws SQLException, IOException { |
| hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, true); |
| } |
| |
| @Override |
| public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException { |
| // Error indicated by the Sql exception |
| TimelinePutResponse response = new TimelinePutResponse(); |
| |
| hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false); |
| |
| if (configuration.isCollectorInMemoryAggregationEnabled()) { |
| cache.putMetrics(metrics.getMetrics()); |
| } |
| |
| return response; |
| } |
| |
| @Override |
| public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) |
| throws SQLException, IOException { |
| |
| if (containerMetricsDisabled) { |
| LOG.debug("Ignoring submitted container metrics according to configuration. Values will not be stored."); |
| return new TimelinePutResponse(); |
| } |
| |
| hBaseAccessor.insertContainerMetrics(metrics); |
| return new TimelinePutResponse(); |
| } |
| |
| @Override |
| public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern, |
| boolean includeBlacklistedMetrics) throws SQLException, IOException { |
| return metricMetadataManager.getTimelineMetricMetadataByAppId(appId, metricPattern, includeBlacklistedMetrics); |
| } |
| |
| @Override |
| public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { |
| return metricMetadataManager.getUuid(metricName, appId, instanceId, hostname, false); |
| } |
| |
| @Override |
| public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException { |
| Map<String, TimelineMetricHostMetadata> hostsMetadata = metricMetadataManager.getHostedAppsCache(); |
| Map<String, Set<String>> hostAppMap = new HashMap<>(); |
| for (String hostname : hostsMetadata.keySet()) { |
| hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps().keySet()); |
| } |
| return hostAppMap; |
| } |
| |
| @Override |
| public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException { |
| Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); |
| String hostname = null; |
| for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) { |
| aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate()); |
| hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname; |
| break; |
| } |
| long timestamp = aggregationResult.getTimeInMilis(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format("Adding host %s to aggregated by in-memory aggregator. Timestamp : %s", hostname, timestamp)); |
| } |
| hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME); |
| |
| return new TimelinePutResponse(); |
| } |
| |
| @Override |
| public Map<String, Map<String, Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) |
| throws SQLException, IOException { |
| |
| Map<String, TimelineMetricHostMetadata> hostedApps = metricMetadataManager.getHostedAppsCache(); |
| Map<String, Set<String>> instanceHosts = new HashMap<>(); |
| if (configuration.getTimelineMetricsMultipleClusterSupport()) { |
| instanceHosts = metricMetadataManager.getHostedInstanceCache(); |
| } |
| |
| Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>(); |
| |
| if (MapUtils.isEmpty(instanceHosts)) { |
| Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); |
| for (String host : hostedApps.keySet()) { |
| for (String app : hostedApps.get(host).getHostedApps().keySet()) { |
| if (!appHostMap.containsKey(app)) { |
| appHostMap.put(app, new HashSet<String>()); |
| } |
| appHostMap.get(app).add(host); |
| } |
| } |
| instanceAppHosts.put("", appHostMap); |
| } else { |
| for (String instance : instanceHosts.keySet()) { |
| |
| if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) { |
| continue; |
| } |
| Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); |
| instanceAppHosts.put(instance, appHostMap); |
| |
| Set<String> hostsWithInstance = instanceHosts.get(instance); |
| for (String host : hostsWithInstance) { |
| for (String app : hostedApps.get(host).getHostedApps().keySet()) { |
| if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) { |
| continue; |
| } |
| |
| if (!appHostMap.containsKey(app)) { |
| appHostMap.put(app, new HashSet<String>()); |
| } |
| appHostMap.get(app).add(host); |
| } |
| } |
| } |
| } |
| |
| return instanceAppHosts; |
| } |
| |
| @Override |
| public List<String> getLiveInstances() { |
| |
| List<String> instances = null; |
| try { |
| if (haController == null) { |
| // Always return current host as live (embedded operation mode) |
| return Collections.singletonList(configuration.getInstanceHostnameFromEnv()); |
| } |
| instances = haController.getLiveInstanceHostNames(); |
| if (instances == null || instances.isEmpty()) { |
| // fallback |
| instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv()); |
| } |
| } catch (UnknownHostException e) { |
| LOG.debug("Exception on getting hostname from env.", e); |
| } |
| return instances; |
| } |
| |
| @Override |
| public TimelineMetricServiceSummary getTimelineMetricServiceSummary() { |
| return new TimelineMetricServiceSummary(metricMetadataManager, haController); |
| } |
| |
| private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) { |
| if (!aggregator.isDisabled()) { |
| ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactory() { |
| @Override |
| public Thread newThread(Runnable r) { |
| return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName())); |
| } |
| } |
| ); |
| scheduledExecutors.put(aggregator.getName(), executorService); |
| executorService.scheduleAtFixedRate(aggregator, |
| 0l, |
| aggregator.getSleepIntervalMillis(), |
| TimeUnit.MILLISECONDS); |
| LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " + |
| + aggregator.getSleepIntervalMillis() + " milliseconds."); |
| } else { |
| LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled."); |
| } |
| } |
| } |