AMBARI-22343. Add ability in AMS to tee metrics to a set of configured Kafka brokers. (swagle)
diff --git a/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics-anomaly-detection-service/pom.xml
index 554d026..e96e957 100644
--- a/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics-anomaly-detection-service/pom.xml
@@ -296,6 +296,12 @@
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -419,6 +425,11 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
diff --git a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 9ca9e95..bd88d57 100644
--- a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -2,9 +2,6 @@
applicationConnectors:
- type: http
port: 9999
- adminConnectors:
- - type: http
- port: 9990
requestLog:
type: external
diff --git a/ambari-metrics-timelineservice/pom.xml b/ambari-metrics-timelineservice/pom.xml
index 3d119f9..7794a11 100644
--- a/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics-timelineservice/pom.xml
@@ -80,7 +80,8 @@
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<includeScope>compile</includeScope>
<excludeScope>test</excludeScope>
- <excludeArtifactIds>jasper-runtime,jasper-compiler</excludeArtifactIds>
+ <excludeArtifactIds>jasper-runtime,jasper-compiler
+ </excludeArtifactIds>
</configuration>
</execution>
</executions>
@@ -125,11 +126,13 @@
<source>
<location>target/lib</location>
<excludes>
- <exclude>*tests.jar</exclude>
+ <exclude>*tests.jar</exclude>
</excludes>
</source>
<source>
- <location>${project.build.directory}/${project.artifactId}-${project.version}.jar</location>
+ <location>
+ ${project.build.directory}/${project.artifactId}-${project.version}.jar
+ </location>
</source>
</sources>
</mapping>
@@ -214,7 +217,9 @@
<location>conf/unix/amshbase_metrics_whitelist</location>
</source>
<source>
- <location>target/embedded/${hbase.folder}/conf/hbase-site.xml</location>
+ <location>
+ target/embedded/${hbase.folder}/conf/hbase-site.xml
+ </location>
</source>
</sources>
</mapping>
@@ -287,7 +292,8 @@
<skip>true</skip>
<attach>false</attach>
<submodules>false</submodules>
- <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
+ <controlDir>${project.basedir}/../src/main/package/deb/control
+ </controlDir>
</configuration>
</plugin>
</plugins>
@@ -657,23 +663,29 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-testing-util</artifactId>
- <version>${hbase.version}</version>
- <scope>test</scope>
- <optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>zookeeper</artifactId>
- <groupId>org.apache.zookeeper</groupId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.11.0.1</version>
+ </dependency>
+
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
@@ -731,17 +743,17 @@
</goals>
<configuration>
<target name="Download HBase">
- <mkdir dir="${project.build.directory}/embedded" />
+ <mkdir dir="${project.build.directory}/embedded"/>
<get
- src="${hbase.tar}"
- dest="${project.build.directory}/embedded/hbase.tar.gz"
- usetimestamp="true"
- />
+ src="${hbase.tar}"
+ dest="${project.build.directory}/embedded/hbase.tar.gz"
+ usetimestamp="true"
+ />
<untar
- src="${project.build.directory}/embedded/hbase.tar.gz"
- dest="${project.build.directory}/embedded"
- compression="gzip"
- />
+ src="${project.build.directory}/embedded/hbase.tar.gz"
+ dest="${project.build.directory}/embedded"
+ compression="gzip"
+ />
</target>
</configuration>
</execution>
@@ -755,19 +767,19 @@
<target name="Download Phoenix">
<mkdir dir="${project.build.directory}/embedded"/>
<get
- src="${phoenix.tar}"
- dest="${project.build.directory}/embedded/phoenix.tar.gz"
- usetimestamp="true"
- />
+ src="${phoenix.tar}"
+ dest="${project.build.directory}/embedded/phoenix.tar.gz"
+ usetimestamp="true"
+ />
<untar
- src="${project.build.directory}/embedded/phoenix.tar.gz"
- dest="${project.build.directory}/embedded"
- compression="gzip"
- />
+ src="${project.build.directory}/embedded/phoenix.tar.gz"
+ dest="${project.build.directory}/embedded"
+ compression="gzip"
+ />
<move
- file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
- tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
- />
+ file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
+ tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
+ />
</target>
</configuration>
</execution>
@@ -798,24 +810,24 @@
</goals>
<configuration>
<target name="Download HBase">
- <mkdir dir="${project.build.directory}/embedded" />
+ <mkdir dir="${project.build.directory}/embedded"/>
<get
- src="${hbase.winpkg.zip}"
- dest="${project.build.directory}/embedded/hbase.zip"
- usetimestamp="true"
- />
+ src="${hbase.winpkg.zip}"
+ dest="${project.build.directory}/embedded/hbase.zip"
+ usetimestamp="true"
+ />
<unzip
- src="${project.build.directory}/embedded/hbase.zip"
- dest="${project.build.directory}/embedded/hbase.temp"
- />
+ src="${project.build.directory}/embedded/hbase.zip"
+ dest="${project.build.directory}/embedded/hbase.temp"
+ />
<unzip
- src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
- dest="${project.build.directory}/embedded"
- />
+ src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
+ dest="${project.build.directory}/embedded"
+ />
<copy
- file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
- tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
- />
+ file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
+ tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
+ />
</target>
</configuration>
</execution>
@@ -854,7 +866,8 @@
<!-- The configuration of the plugin -->
<configuration>
<!-- Configuration of the archiver -->
- <finalName>${project.artifactId}-simulator-${project.version}</finalName>
+ <finalName>${project.artifactId}-simulator-${project.version}
+ </finalName>
<archive>
<!-- Manifest specific configuration -->
<manifest>
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index f8d31f7..65b4614 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -120,7 +120,6 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
@@ -211,7 +210,7 @@
private HashMap<String, String> tableTTL = new HashMap<>();
private final TimelineMetricConfiguration configuration;
- private InternalMetricsSource rawMetricsSource;
+ private List<InternalMetricsSource> rawMetricsSources;
public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
this(TimelineMetricConfiguration.getInstance(), dataSource);
@@ -278,15 +277,17 @@
LOG.info("Initialized aggregator sink class " + metricSinkClass);
}
- ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider();
+ List<ExternalSinkProvider> externalSinkProviderList = configuration.getExternalSinkProviderList();
InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
- if (externalSinkProvider != null) {
- ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
- int interval = configuration.getExternalSinkInterval(RAW_METRICS);
- if (interval == -1){
- interval = cacheCommitInterval;
+ if (!externalSinkProviderList.isEmpty()) {
+ for (ExternalSinkProvider externalSinkProvider : externalSinkProviderList) {
+ ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
+ int interval = configuration.getExternalSinkInterval(externalSinkProvider.getClass().getSimpleName(), RAW_METRICS);
+ if (interval == -1) {
+ interval = cacheCommitInterval;
+ }
+ rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink));
}
- rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
}
TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
}
@@ -303,8 +304,10 @@
}
if (metricsList.size() > 0) {
commitMetrics(metricsList);
- if (rawMetricsSource != null) {
- rawMetricsSource.publishTimelineMetrics(metricsList);
+ if (!rawMetricsSources.isEmpty()) {
+ for (InternalMetricsSource rawMetricsSource : rawMetricsSources) {
+ rawMetricsSource.publishTimelineMetrics(metricsList);
+ }
}
}
}
@@ -316,10 +319,8 @@
private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
PreparedStatement metricRecordStmt = null;
try {
-
Map<String, String> metricMetadata = metric.getMetadata();
-
-
+
byte[] uuid = metadataManagerInstance.getUuid(metric);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 929fc8c..395ec7b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -26,6 +26,7 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -51,8 +52,6 @@
* Configuration class that reads properties from ams-site.xml. All values
* for time or intervals are given in seconds.
*/
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
public class TimelineMetricConfiguration {
private static final Log LOG = LogFactory.getLog(TimelineMetricConfiguration.class);
@@ -343,14 +342,22 @@
public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = "timeline.metrics.collector.ignite.nodes.backups";
public static final String INTERNAL_CACHE_HEAP_PERCENT =
- "timeline.metrics.service.cache.%s.heap.percent";
+ "timeline.metrics.internal.cache.%s.heap.percent";
public static final String EXTERNAL_SINK_INTERVAL =
- "timeline.metrics.service.external.sink.%s.interval";
+ "timeline.metrics.external.sink.%s.%s.interval";
public static final String DEFAULT_EXTERNAL_SINK_DIR =
- "timeline.metrics.service.external.sink.dir";
+ "timeline.metrics.external.sink.dir";
+ public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
+ public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
+ public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
+ public static final String KAFKA_BATCH_SIZE = "timeline.metrics.external.sink.kafka.batch.size";
+ public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
+ public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
+ public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";
+
private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration metricsSslConf;
@@ -601,8 +608,24 @@
return false;
}
- public int getExternalSinkInterval(SOURCE_NAME sourceName) {
- return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
+ /**
+ * Get the sink interval for a metrics source.
+ * Determines how often the metrics will be written to the sink.
+ * This determines whether any caching will be needed on the collector
+ * side, default interval disables caching by writing at the same time as
+ * we get data.
+ *
+ * @param sinkProviderClassName Simple name of your implementation of {@link ExternalSinkProvider}
+ * @param sourceName {@link SOURCE_NAME}
+ * @return seconds
+ */
+ public int getExternalSinkInterval(String sinkProviderClassName,
+ SOURCE_NAME sourceName) {
+ String sinkProviderSimpleClassName = sinkProviderClassName.substring(
+ sinkProviderClassName.lastIndexOf(".") + 1);
+
+ return Integer.parseInt(metricsConf.get(
+ String.format(EXTERNAL_SINK_INTERVAL, sinkProviderSimpleClassName, sourceName), "-1"));
}
public InternalSourceProvider getInternalSourceProvider() {
@@ -612,12 +635,18 @@
return ReflectionUtils.newInstance(providerClass, metricsConf);
}
- public ExternalSinkProvider getExternalSinkProvider() {
- Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
- if (providerClass != null) {
- return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
+ /**
+ * List of external sink provider classes. Comma-separated.
+ */
+ public List<ExternalSinkProvider> getExternalSinkProviderList() {
+ Class<?>[] providerClasses = metricsConf.getClasses(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+ List<ExternalSinkProvider> providerList = new ArrayList<>();
+ if (providerClasses != null) {
+ for (Class<?> providerClass : providerClasses) {
+ providerList.add((ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf));
+ }
}
- return null;
+ return providerList;
}
public String getInternalCacheHeapPercent(String instanceName) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
index 48887d9..7c7683b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
@@ -1,8 +1,3 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -21,9 +16,14 @@
* limitations under the License.
*/
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+
+
/**
* Configurable provider for sink classes that match the metrics sources.
- * Provider can return same sink of different sinks for each source.
+ * Provider can return same sink or different sinks for each source.
*/
public interface ExternalSinkProvider {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
index bb84c8a..9c2a93e 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
@@ -92,7 +92,7 @@
@Override
public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
- return null;
+ return new DefaultHttpMetricsSink();
}
protected HttpURLConnection getConnection(String spec) throws IOException {
@@ -147,7 +147,7 @@
@Override
public int getSinkTimeOutSeconds() {
try {
- return conf.getMetricsConf().getInt("timeline.metrics.service.external.http.sink.timeout.seconds", 10);
+ return conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds", 10);
} catch (Exception e) {
return 10;
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
new file mode 100644
index 0000000..3b34b55
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_ACKS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BATCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BUFFER_MEM;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_LINGER_MS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_RETRIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SERVERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/*
+ This will be used by the single Metrics committer thread. Hence it is
+ important to make this non-blocking export.
+ */
+public class KafkaSinkProvider implements ExternalSinkProvider {
+ private static String TOPIC_NAME = "ambari-metrics-topic";
+ private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);
+
+ private Producer producer;
+ private int TIMEOUT_SECONDS = 10;
+ private int FLUSH_SECONDS = 3;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ public KafkaSinkProvider() {
+ TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
+
+ Properties configProperties = new Properties();
+ try {
+ configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_SERVERS));
+ configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_ACKS, "all"));
+ // Avoid duplicates - No transactional semantics
+ configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(KAFKA_RETRIES, 0));
+ configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(KAFKA_BATCH_SIZE, 128));
+ configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(KAFKA_LINGER_MS, 1));
+ configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(KAFKA_BUFFER_MEM, 33554432)); // 32 MB
+ FLUSH_SECONDS = configuration.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+ TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(KAFKA_SINK_TIMEOUT_SECONDS, 10);
+ } catch (Exception e) {
+ LOG.error("Configuration error!", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
+ configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
+
+
+
+ producer = new KafkaProducer(configProperties);
+ }
+
+ @Override
+ public ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName) {
+ switch (sourceName) {
+ case RAW_METRICS:
+ return new KafkaRawMetricsSink();
+ default:
+ throw new UnsupportedOperationException("Provider does not support " +
+ "the expected source " + sourceName);
+ }
+ }
+
+ class KafkaRawMetricsSink implements ExternalMetricsSink {
+
+ @Override
+ public int getSinkTimeOutSeconds() {
+ return TIMEOUT_SECONDS;
+ }
+
+ @Override
+ public int getFlushSeconds() {
+ return FLUSH_SECONDS;
+ }
+
+ @Override
+ public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+ JsonNode jsonNode = objectMapper.valueToTree(metrics);
+ ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(TOPIC_NAME, jsonNode);
+ Future<RecordMetadata> f = producer.send(rec);
+ }
+ }
+
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
index b97c39f..c6b071f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
@@ -24,7 +24,7 @@
public class DefaultInternalMetricsSourceProvider implements InternalSourceProvider {
private static final Log LOG = LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);
- // TODO: Implement read based sources for higher level data
+ // TODO: Implement read based sources for higher order data
@Override
public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
if (sink == null) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
index 967d819..879577a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
@@ -63,21 +63,14 @@
}
private void initializeFixedRateScheduler() {
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- rawMetricsSink.sinkMetricData(cache.evictAll());
- }
- }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
+ executorService.scheduleAtFixedRate(() -> rawMetricsSink.sinkMetricData(cache.evictAll()),
+ rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
}
private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
- Future f = executorService.submit(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- rawMetricsSink.sinkMetricData(metrics);
- return null;
- }
+ Future f = executorService.submit(() -> {
+ rawMetricsSink.sinkMetricData(metrics);
+ return null;
});
try {
f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);