AMBARI-22740 : Rename ambari metrics collector package to org.apache.ambari.metrics. (Commit 2)
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
index 4165b1e..d21edfc 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
@@ -17,6 +17,8 @@
*/
package org.apache.ambari.metrics.core.timeline;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
import java.io.IOException;
@@ -92,7 +94,6 @@
/**
* Construct the service.
- *
*/
public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
super(HBaseTimelineMetricsService.class.getName());
@@ -159,8 +160,8 @@
}
}
- defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT, "20"));
- if (Boolean.parseBoolean(metricsConf.get(TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
+ defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
+ if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
}
@@ -587,7 +588,7 @@
aggregator.getSleepIntervalMillis(),
TimeUnit.MILLISECONDS);
LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
- +aggregator.getSleepIntervalMillis() + " milliseconds.");
+ + aggregator.getSleepIntervalMillis() + " milliseconds.");
} else {
LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
index a7793c0..0a8dcc5 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
@@ -18,6 +18,14 @@
package org.apache.ambari.metrics.core.timeline;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getJavaRegexFromSqlRegex;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
@@ -84,7 +92,7 @@
//TODO add config to disable logging
//enable ssl for ignite requests
- if (metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) {
+ if (metricConf.get(TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) {
SslContextFactory sslContextFactory = new SslContextFactory();
String keyStorePath = sslConf.get("ssl.server.keystore.location");
String keyStorePassword = sslConf.get("ssl.server.keystore.password");
@@ -100,11 +108,11 @@
//aggregation parameters
appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation();
- interpolationEnabled = Boolean.parseBoolean(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
- cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
- Long aggregationInterval = metricConf.getLong(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
+ interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
+ cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
+ Long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
- String filteredMetricPatterns = metricConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
+ String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
if (!StringUtils.isEmpty(filteredMetricPatterns)) {
LOG.info("Skipping aggregation for metric patterns : " + filteredMetricPatterns);
for (String patternString : filteredMetricPatterns.split(",")) {
@@ -113,10 +121,10 @@
}
}
- if (metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) {
+ if (metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) {
TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
- ipFinder.setAddresses(Arrays.asList(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(",")));
+ ipFinder.setAddresses(Arrays.asList(metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(",")));
LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses());
discoverySpi.setIpFinder(ipFinder);
igniteConfiguration.setDiscoverySpi(discoverySpi);
@@ -143,7 +151,7 @@
cacheConfiguration.setName("metrics_cache");
//set cache mode to partitioned with # of backups
cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
- cacheConfiguration.setBackups(metricConf.getInt(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1));
+ cacheConfiguration.setBackups(metricConf.getInt(TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1));
//disable throttling due to cpu impact
cacheConfiguration.setRebalanceThrottle(0);
//enable locks
@@ -214,7 +222,7 @@
putMetricIntoCache(metricDoubleEntry.getKey(), newMetricClusterAggregate);
if (hostMetadata != null) {
//calculate app host metric
- if (metric.getAppId().equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
+ if (metric.getAppId().equalsIgnoreCase(HOST_APP_ID)) {
// Candidate metric, update app aggregates
if (hostMetadata.containsKey(metric.getHostName())) {
updateAppAggregatesFromHostMetric(metricDoubleEntry.getKey(), newMetricClusterAggregate, hostMetadata.get(metric.getHostName()));
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
index 190ad9a..4c62366 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -36,6 +36,9 @@
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+
/**
* Aggregator responsible for providing app level host aggregates. This task
* is accomplished without doing a round trip to storage, rather
@@ -91,7 +94,7 @@
}
// If metric is a host metric and host has apps on it
- if (appId.equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
+ if (appId.equalsIgnoreCase(HOST_APP_ID)) {
// Candidate metric, update app aggregates
if (hostMetadata.containsKey(hostname)) {
updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
@@ -128,7 +131,7 @@
return;
}
- TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+ TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId());
ConcurrentHashMap<String, String> apps = hostMetadata.get(hostname).getHostedApps();
for (String appId : apps.keySet()) {
if (appIdsToAggregate.contains(appId)) {
@@ -136,7 +139,7 @@
appKey.setAppId(appId);
TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
if (appMetadata == null) {
- TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+ TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId());
TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key);
if (hostMetricMetadata != null) {
@@ -178,7 +181,7 @@
}
private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
- String appIds = metricsConf.get(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS);
+ String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
if (!StringUtils.isEmpty(appIds)) {
return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
index 371d9fa..b0aec2f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
@@ -33,6 +33,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricFilteringHostAggregator.class);
private TimelineMetricMetadataManager metricMetadataManager;
@@ -84,7 +86,7 @@
endTime, null, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
// Retaining order of the row-key avoids client side merge sort.
condition.addOrderByColumn("UUID");
condition.addOrderByColumn("SERVER_TIME");
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
index c25d6ce..6f2351b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -36,6 +36,8 @@
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
TimelineMetricReadHelper readHelper;
@@ -73,7 +75,7 @@
endTime, null, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
// Retaining order of the row-key avoids client side merge sort.
condition.addOrderByColumn("UUID");
condition.addOrderByColumn("SERVER_TIME");
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index 9e8df6d..f8757a4 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
+
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
public TimelineMetricHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
@@ -62,7 +64,7 @@
EmptyCondition condition = new EmptyCondition();
condition.setDoUpdate(true);
- condition.setStatement(String.format(PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+ condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
outputTableName, endTime, tableName,
getDownsampledMetricSkipClause(), startTime, endTime));
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
index 97eb7b1..ff24c10 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java
@@ -50,6 +50,12 @@
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
import org.apache.ambari.metrics.core.timeline.uuid.HashBasedUuidGenStrategy;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS;
+
public class TimelineMetricMetadataManager {
private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class);
private boolean isDisabled = false;
@@ -82,7 +88,7 @@
public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) {
this.metricsConf = metricsConf;
this.hBaseAccessor = hBaseAccessor;
- String patternStrings = metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS);
+ String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS);
if (!StringUtils.isEmpty(patternStrings)) {
metricNameFilters.addAll(Arrays.asList(patternStrings.split(",")));
}
@@ -98,14 +104,14 @@
* Initialize Metadata from the store
*/
public void initializeMetadata() {
- if (metricsConf.getBoolean(TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT, false)) {
+ if (metricsConf.getBoolean(DISABLE_METRIC_METADATA_MGMT, false)) {
isDisabled = true;
} else {
metricMetadataSync = new TimelineMetricMetadataSync(this);
// Schedule the executor to sync to store
executorService.scheduleWithFixedDelay(metricMetadataSync,
- metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
- metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
+ metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
+ metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
TimeUnit.SECONDS);
// Read from store and initialize map
try {
@@ -330,7 +336,7 @@
* @return
*/
private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) {
- String strategy = configuration.get(TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY, "");
+ String strategy = configuration.get(TIMELINE_METRICS_UUID_GEN_STRATEGY, "");
if ("random".equalsIgnoreCase(strategy)) {
return new RandomUuidGenStrategy();
} else {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index fda6214..819a20e 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -345,7 +345,6 @@
"MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND " +
"SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";
-
public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS";
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java
index 52abc1e..4c0920c 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
public class DefaultFSSinkProvider implements ExternalSinkProvider {
private static final Log LOG = LogFactory.getLog(DefaultFSSinkProvider.class);
TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
@@ -64,7 +66,7 @@
@Override
public int getFlushSeconds() {
try {
- return conf.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+ return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
} catch (Exception e) {
LOG.warn("Cannot read cache commit interval.");
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java
index 1ce624b..9935d38 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java
@@ -35,6 +35,15 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_ACKS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_BATCH_SIZE;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_BUFFER_MEM;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_LINGER_MS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_RETRIES;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_SERVERS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
/*
This will be used by the single Metrics committer thread. Hence it is
important to make this non-blocking export.
@@ -54,15 +63,15 @@
Properties configProperties = new Properties();
try {
- configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_SERVERS));
- configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_ACKS, "all"));
+ configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_SERVERS));
+ configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_ACKS, "all"));
// Avoid duplicates - No transactional semantics
- configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_RETRIES, 0));
- configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_BATCH_SIZE, 128));
- configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_LINGER_MS, 1));
- configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(TimelineMetricConfiguration.KAFKA_BUFFER_MEM, 33554432)); // 32 MB
- FLUSH_SECONDS = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
- TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS, 10);
+ configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(KAFKA_RETRIES, 0));
+ configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(KAFKA_BATCH_SIZE, 128));
+ configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(KAFKA_LINGER_MS, 1));
+ configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(KAFKA_BUFFER_MEM, 33554432)); // 32 MB
+ FLUSH_SECONDS = configuration.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+ TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(KAFKA_SINK_TIMEOUT_SECONDS, 10);
} catch (Exception e) {
LOG.error("Configuration error!", e);
throw new ExceptionInInitializerError(e);