[AMBARI-24557] Remove legacy storm sink module from ambari-metrics. (#2193)
diff --git a/ambari-metrics-assembly/pom.xml b/ambari-metrics-assembly/pom.xml
index f89b9df..4f36e5a 100644
--- a/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics-assembly/pom.xml
@@ -39,7 +39,6 @@
<grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
<hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
<storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
- <storm-sink-legacy.dir>${project.basedir}/../ambari-metrics-storm-sink-legacy</storm-sink-legacy.dir>
<flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
<kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
<python.ver>python >= 2.6</python.ver>
@@ -53,7 +52,6 @@
<deb.dependency.list>${deb.python.ver},python-dev,gcc</deb.dependency.list>
<hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
<storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
- <storm.sink.legacy.jar>ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</storm.sink.legacy.jar>
<flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
<kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
</properties>
@@ -424,14 +422,6 @@
</sources>
</mapping>
<mapping>
- <directory>/usr/lib/storm/lib</directory>
- <sources>
- <source>
- <location>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</location>
- </source>
- </sources>
- </mapping>
- <mapping>
<directory>/usr/lib/ambari-metrics-kafka-sink</directory>
<sources>
<source>
@@ -1019,16 +1009,6 @@
<prefix>/usr/lib/storm/lib</prefix>
</mapper>
</data>
- <data>
- <src>${storm-sink-legacy.dir}/target/${storm.sink.legacy.jar}</src>
- <type>file</type>
- <mapper>
- <type>perm</type>
- <filemode>644</filemode>
- <dirmode>755</dirmode>
- <prefix>/usr/lib/storm/lib</prefix>
- </mapper>
- </data>
<!-- kafka sink -->
@@ -1337,11 +1317,6 @@
</dependency>
<dependency>
<groupId>org.apache.ambari</groupId>
- <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.ambari</groupId>
<artifactId>ambari-metrics-kafka-sink</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/ambari-metrics-assembly/src/main/assembly/sink-windows.xml b/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
index 14b49b2..e82d2d4 100644
--- a/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
+++ b/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
@@ -39,10 +39,6 @@
<outputDirectory>hadoop-sink/conf</outputDirectory>
</fileSet>
<fileSet>
- <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
- <outputDirectory>hadoop-sink/conf</outputDirectory>
- </fileSet>
- <fileSet>
<directory>${kafka-sink.dir}/target/lib</directory>
<outputDirectory>hadoop-sink/lib</outputDirectory>
</fileSet>
@@ -62,10 +58,6 @@
<outputDirectory>hadoop-sink</outputDirectory>
</file>
<file>
- <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
- <outputDirectory>hadoop-sink</outputDirectory>
- </file>
- <file>
<source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
<outputDirectory>hadoop-sink</outputDirectory>
</file>
diff --git a/ambari-metrics-assembly/src/main/assembly/sink.xml b/ambari-metrics-assembly/src/main/assembly/sink.xml
index 34cdbc3..1400c7b 100644
--- a/ambari-metrics-assembly/src/main/assembly/sink.xml
+++ b/ambari-metrics-assembly/src/main/assembly/sink.xml
@@ -39,10 +39,6 @@
<outputDirectory>hadoop-sink/conf</outputDirectory>
</fileSet>
<fileSet>
- <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
- <outputDirectory>hadoop-sink/conf</outputDirectory>
- </fileSet>
- <fileSet>
<directory>${kafka-sink.dir}/target/lib</directory>
<outputDirectory>hadoop-sink/lib</outputDirectory>
</fileSet>
@@ -66,11 +62,6 @@
</file>
<file>
<fileMode>644</fileMode>
- <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
- <outputDirectory>hadoop-sink</outputDirectory>
- </file>
- <file>
- <fileMode>644</fileMode>
<source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
<outputDirectory>hadoop-sink</outputDirectory>
</file>
diff --git a/ambari-metrics-storm-sink-legacy/pom.xml b/ambari-metrics-storm-sink-legacy/pom.xml
deleted file mode 100644
index 4fc4d17..0000000
--- a/ambari-metrics-storm-sink-legacy/pom.xml
+++ /dev/null
@@ -1,207 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>ambari-metrics</artifactId>
- <groupId>org.apache.ambari</groupId>
- <version>2.0.0.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
- <version>2.0.0.0-SNAPSHOT</version>
- <name>Ambari Metrics Storm Sink (Legacy)</name>
- <packaging>jar</packaging>
-
- <properties>
- <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
- <storm.version>0.10.0.2.3.0.0-2557</storm.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.0</version>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
- <executions>
- <execution>
- <id>parse-version</id>
- <phase>validate</phase>
- <goals>
- <goal>parse-version</goal>
- </goals>
- </execution>
- <execution>
- <id>regex-property</id>
- <goals>
- <goal>regex-property</goal>
- </goals>
- <configuration>
- <name>ambariVersion</name>
- <value>${project.version}</value>
- <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
- <replacement>$1.$2.$3.$4</replacement>
- <failIfNoMatch>false</failIfNoMatch>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>com.github.goldin</groupId>
- <artifactId>copy-maven-plugin</artifactId>
- <version>0.2.5</version>
- <executions>
- <execution>
- <id>create-archive</id>
- <phase>none</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <outputFile>${project.build.directory}/${project.artifactId}-with-common-${project.version}.jar</outputFile>
- <minimizeJar>false</minimizeJar>
- <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
- <artifactSet>
- <includes>
- <include>org.apache.ambari:ambari-metrics-common</include>
- <include>org.codehaus.jackson:jackson-mapper-asl</include>
- <include>org.codehaus.jackson:jackson-core-asl</include>
- <include>org.codehaus.jackson:jackson-xc</include>
- <include>org.apache.hadoop:hadoop-annotations</include>
- <include>commons-logging:commons-logging</include>
- <include>org.apache.commons:commons-lang3</include>
- <include>commons-codec:commons-codec</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>org.apache.commons.logging</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.logging</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.hadoop.classification</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.hadoop.classification</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.codehaus.jackson</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jackson</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.commons.lang3</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.lang3</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.commons.codec</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.codec</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.vafer</groupId>
- <artifactId>jdeb</artifactId>
- <version>1.0.1</version>
- <executions>
- <execution>
- <!--Stub execution on direct plugin call - workaround for ambari deb build process-->
- <id>stub-execution</id>
- <phase>none</phase>
- <goals>
- <goal>jdeb</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <skip>true</skip>
- <attach>false</attach>
- <submodules>false</submodules>
- <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.3.2</version>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>1.8</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${storm.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.ambari</groupId>
- <artifactId>ambari-metrics-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- <version>4.10</version>
- </dependency>
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <version>3.2</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-easymock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
diff --git a/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml b/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml
deleted file mode 100644
index 35738b1..0000000
--- a/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly>
- <id>empty</id>
- <formats/>
-</assembly>
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
deleted file mode 100644
index 842fad8..0000000
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.storm;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.Validate;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
-
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.metric.IClusterReporter;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
- implements IClusterReporter {
-
- public static final String METRICS_COLLECTOR_CATEGORY = "metrics_collector";
- public static final String APP_ID = "appId";
-
- private String hostname;
- private String collectorUri;
- private String port;
- private Collection<String> collectorHosts;
- private String zkQuorum;
- private String protocol;
- private boolean setInstanceId;
- private String instanceId;
- private NimbusClient nimbusClient;
- private String applicationId;
- private int timeoutSeconds;
- private boolean hostInMemoryAggregationEnabled;
- private int hostInMemoryAggregationPort;
- private String hostInMemoryAggregationProtocol;
-
- public StormTimelineMetricsReporter() {
-
- }
-
- @Override
- protected String getCollectorUri(String host) {
- return constructTimelineMetricUri(protocol, host, port);
- }
-
- @Override
- protected String getCollectorProtocol() {
- return protocol;
- }
-
- @Override
- protected int getTimeoutSeconds() {
- return timeoutSeconds;
- }
-
- @Override
- protected String getZookeeperQuorum() {
- return zkQuorum;
- }
-
- @Override
- protected Collection<String> getConfiguredCollectorHosts() {
- return collectorHosts;
- }
-
- @Override
- protected String getCollectorPort() {
- return port;
- }
-
- @Override
- protected String getHostname() {
- return hostname;
- }
-
- @Override
- protected boolean isHostInMemoryAggregationEnabled() {
- return hostInMemoryAggregationEnabled;
- }
-
- @Override
- protected int getHostInMemoryAggregationPort() {
- return hostInMemoryAggregationPort;
- }
-
- @Override
- protected String getHostInMemoryAggregationProtocol() {
- return hostInMemoryAggregationProtocol;
- }
-
- @Override
- public void prepare(Map conf) {
- LOG.info("Preparing Storm Metrics Reporter");
- try {
- try {
- hostname = InetAddress.getLocalHost().getHostName();
- // If not FQDN , call DNS
- if ((hostname == null) || (!hostname.contains("."))) {
- hostname = InetAddress.getLocalHost().getCanonicalHostName();
- }
- } catch (UnknownHostException e) {
- LOG.error("Could not identify hostname.");
- throw new RuntimeException("Could not identify hostname.", e);
- }
- Validate.notNull(conf.get(METRICS_COLLECTOR_CATEGORY), METRICS_COLLECTOR_CATEGORY + " can not be null");
- Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
- Map stormConf = Utils.readStormConfig();
- this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
-
- collectorHosts = parseHostsStringIntoCollection(cf.get(COLLECTOR_HOSTS_PROPERTY).toString());
- protocol = cf.get(COLLECTOR_PROTOCOL) != null ? cf.get(COLLECTOR_PROTOCOL).toString() : "http";
- port = cf.get(COLLECTOR_PORT) != null ? cf.get(COLLECTOR_PORT).toString() : "6188";
- Object zkQuorumObj = cf.get(COLLECTOR_ZOOKEEPER_QUORUM);
- if (zkQuorumObj != null) {
- zkQuorum = zkQuorumObj.toString();
- } else {
- zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null;
- }
-
- timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
- Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
- DEFAULT_POST_TIMEOUT_SECONDS;
- applicationId = cf.get(APP_ID).toString();
- if (cf.containsKey(SET_INSTANCE_ID_PROPERTY)) {
- setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
- instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
- }
- hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY) != null ?
- cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString() : "false");
- hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY) != null ?
- cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString() : "61888");
- hostInMemoryAggregationProtocol = cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY) != null ?
- cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY).toString() : "http";
-
- collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
- if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
- String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
- String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
- String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
- loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
- }
- } catch (Exception e) {
- LOG.warn("Could not initialize metrics collector, please specify " +
- "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);
- }
- // Initialize the collector write strategy
- super.init();
- }
-
- @Override
- public void reportMetrics() throws Exception {
- List<TimelineMetric> totalMetrics = new ArrayList<TimelineMetric>(7);
- ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
- long currentTimeMillis = System.currentTimeMillis();
- totalMetrics.add(createTimelineMetric(currentTimeMillis,
- applicationId, "Supervisors", String.valueOf(cs.get_supervisors_size())));
- totalMetrics.add(createTimelineMetric(currentTimeMillis,
- applicationId, "Topologies", String.valueOf(cs.get_topologies_size())));
-
- List<SupervisorSummary> sups = cs.get_supervisors();
- int totalSlots = 0;
- int usedSlots = 0;
- for (SupervisorSummary ssum : sups) {
- totalSlots += ssum.get_num_workers();
- usedSlots += ssum.get_num_used_workers();
- }
- int freeSlots = totalSlots - usedSlots;
-
- totalMetrics.add(createTimelineMetric(currentTimeMillis,
- applicationId, "Total Slots", String.valueOf(totalSlots)));
- totalMetrics.add(createTimelineMetric(currentTimeMillis,
- applicationId, "Used Slots", String.valueOf(usedSlots)));
- totalMetrics.add(createTimelineMetric(currentTimeMillis,
- applicationId, "Free Slots", String.valueOf(freeSlots)));
-
- List<TopologySummary> topos = cs.get_topologies();
- int totalExecutors = 0;
- int totalTasks = 0;
- for (TopologySummary topo : topos) {
- totalExecutors += topo.get_num_executors();
- totalTasks += topo.get_num_tasks();
- }
-
- totalMetrics.add(createTimelineMetric(currentTimeMillis,
- applicationId, "Total Executors", String.valueOf(totalExecutors)));
- totalMetrics.add(createTimelineMetric(currentTimeMillis,
- applicationId, "Total Tasks", String.valueOf(totalTasks)));
-
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- timelineMetrics.setMetrics(totalMetrics);
-
- try {
- emitMetrics(timelineMetrics);
- } catch (UnableToConnectException e) {
- LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
- }
-
- }
-
- private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
- TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(attributeName);
- timelineMetric.setHostName(hostname);
- if (setInstanceId) {
- timelineMetric.setInstanceId(instanceId);
- }
- timelineMetric.setAppId(component);
- timelineMetric.setStartTime(currentTimeMillis);
- timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
- return timelineMetric;
- }
-
-}
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
deleted file mode 100644
index e3494fd..0000000
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.storm;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
-import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
-
-public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
- private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = { ".", "_" };
-
- // create String manually in order to not rely on Guava Joiner or having our own
- private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\"";
-
- public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
- public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
- public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset.";
-
- private String collectorUri;
- private TimelineMetricsCache metricsCache;
- private String hostname;
- private int timeoutSeconds;
- private Collection<String> collectorHosts;
- private String zkQuorum;
- private String protocol;
- private String port;
- private String topologyName;
- private String applicationId;
- private boolean setInstanceId;
- private String instanceId;
- private boolean hostInMemoryAggregationEnabled;
- private int hostInMemoryAggregationPort;
- private String hostInMemoryAggregationProtocol;
-
- @Override
- protected String getCollectorUri(String host) {
- return constructTimelineMetricUri(protocol, host, port);
- }
-
- @Override
- protected String getCollectorProtocol() {
- return protocol;
- }
-
- @Override
- protected int getTimeoutSeconds() {
- return timeoutSeconds;
- }
-
- @Override
- protected String getZookeeperQuorum() {
- return zkQuorum;
- }
-
- @Override
- protected Collection<String> getConfiguredCollectorHosts() {
- return collectorHosts;
- }
-
- @Override
- protected String getCollectorPort() {
- return port;
- }
-
- @Override
- protected String getHostname() {
- return hostname;
- }
-
- @Override
- protected boolean isHostInMemoryAggregationEnabled() {
- return hostInMemoryAggregationEnabled;
- }
-
- @Override
- protected int getHostInMemoryAggregationPort() {
- return hostInMemoryAggregationPort;
- }
-
- @Override
- protected String getHostInMemoryAggregationProtocol() {
- return hostInMemoryAggregationProtocol;
- }
-
- @Override
- public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
- LOG.info("Preparing Storm Metrics Sink");
- try {
- hostname = InetAddress.getLocalHost().getHostName();
- //If not FQDN , call DNS
- if ((hostname == null) || (!hostname.contains("."))) {
- hostname = InetAddress.getLocalHost().getCanonicalHostName();
- }
- } catch (UnknownHostException e) {
- LOG.error("Could not identify hostname.");
- throw new RuntimeException("Could not identify hostname.", e);
- }
- Configuration configuration = new Configuration("/storm-metrics2.properties");
- timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
- String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
- int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
- String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
- int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
- String.valueOf(MAX_EVICTION_TIME_MILLIS)));
- applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
- metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
- collectorHosts = parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY));
- zkQuorum = configuration.getProperty("zookeeper.quorum");
- protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
- port = configuration.getProperty(COLLECTOR_PORT, "6188");
-
- instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
- setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
- hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
- hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
- hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
- // Initialize the collector write strategy
- super.init();
-
- if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
- String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
- String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
- String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
- loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
- }
- this.topologyName = removeNonce(topologyContext.getStormId());
- warnIfTopologyNameContainsWarnString(topologyName);
- }
-
- @Override
- public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
- List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-
- for (DataPoint dataPoint : dataPoints) {
- LOG.debug(dataPoint.name + " = " + dataPoint.value);
- List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
-
- for (DataPoint populatedDataPoint : populatedDataPoints) {
- String metricName;
- if (populatedDataPoint.name.startsWith(METRIC_NAME_PREFIX_KAFKA_OFFSET)) {
- metricName = createKafkaOffsetMetricName(populatedDataPoint.name);
- } else {
- metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
- taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
- }
-
- LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
-
- TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
- taskInfo.srcWorkerHost, metricName, Double.valueOf(populatedDataPoint.value.toString()));
-
- // Put intermediate values into the cache until it is time to send
- metricsCache.putTimelineMetric(timelineMetric);
-
- TimelineMetric cachedMetric = metricsCache.getTimelineMetric(timelineMetric.getMetricName());
-
- if (cachedMetric != null) {
- metricList.add(cachedMetric);
- }
- }
- }
-
- if (!metricList.isEmpty()) {
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- timelineMetrics.setMetrics(metricList);
-
- try {
- emitMetrics(timelineMetrics);
- } catch (UnableToConnectException uce) {
- LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
- }
- }
- }
-
- @Override
- public void cleanup() {
- LOG.info("Stopping Storm Metrics Sink");
- }
-
- // purpose just for testing
- void setTopologyName(String topologyName) {
- this.topologyName = topologyName;
- }
-
- private String removeNonce(String topologyId) {
- return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
- }
-
- private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
- List<DataPoint> dataPoints = new ArrayList<>();
-
- if (dataPoint.value == null) {
- LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name);
- } else if (dataPoint.value instanceof Map) {
- Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value;
-
- for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
- Double value = convertValueToDouble(entry.getKey(), entry.getValue());
- if (value != null) {
- dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), value));
- }
- }
- } else {
- Double value = convertValueToDouble(dataPoint.name, dataPoint.value);
- if (value != null) {
- dataPoints.add(new DataPoint(dataPoint.name, value));
- }
- }
-
- return dataPoints;
- }
-
- private Double convertValueToDouble(String metricName, Object value) {
- if (value instanceof Number) {
- return ((Number) value).doubleValue();
- } else if (value instanceof String) {
- try {
- return Double.parseDouble((String) value);
- } catch (NumberFormatException e) {
- LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
- value + ". Discarding.");
- }
-
- return null;
- } else {
- LOG.warn("Data point with name " + metricName + " has value " + value +
- " which is not supported. Discarding.");
-
- return null;
- }
- }
-
- private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
- String attributeName, Double attributeValue) {
- TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(attributeName);
- timelineMetric.setHostName(hostName);
- if (setInstanceId) {
- timelineMetric.setInstanceId(instanceId);
- }
- timelineMetric.setAppId(applicationId);
- timelineMetric.setStartTime(currentTimeMillis);
- timelineMetric.setType(ClassUtils.getShortCanonicalName(
- attributeValue, "Number"));
- timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
- return timelineMetric;
- }
-
- private String createMetricName(String componentId, String workerHost, int workerPort, int taskId,
- String attributeName) {
- // <topology name>.<component name>.<worker host>.<worker port>.<task id>.<metric name>
- String metricName = "topology." + topologyName + "." + componentId + "." + workerHost + "." + workerPort +
- "." + taskId + "." + attributeName;
-
- // since '._' is treat as special character (separator) so it should be replaced
- return metricName.replace('_', '-');
- }
-
- private String createKafkaOffsetMetricName(String kafkaOffsetMetricName) {
- // get rid of "kafkaOffset."
- // <topic>/<metric name (starts with total)> or <topic>/partition_<partition_num>/<metricName>
- String tempMetricName = kafkaOffsetMetricName.substring(METRIC_NAME_PREFIX_KAFKA_OFFSET.length());
-
- String[] slashSplittedNames = tempMetricName.split("/");
-
- if (slashSplittedNames.length == 1) {
- // unknown metrics
- throw new IllegalArgumentException("Unknown metrics for kafka offset metric: " + kafkaOffsetMetricName);
- }
-
- String topic = slashSplittedNames[0];
- String metricName = "topology." + topologyName + ".kafka-topic." + topic;
- if (slashSplittedNames.length > 2) {
- // partition level
- metricName = metricName + "." + slashSplittedNames[1] + "." + slashSplittedNames[2];
- } else {
- // topic level
- metricName = metricName + "." + slashSplittedNames[1];
- }
-
- // since '._' is treat as special character (separator) so it should be replaced
- return metricName.replace('_', '-');
- }
-
- private void warnIfTopologyNameContainsWarnString(String name) {
- for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) {
- if (name.contains(warn)) {
- LOG.warn("Topology name \"" + name + "\" contains \"" + warn + "\" which can be problematic for AMS.");
- LOG.warn("Encouraged to not using any of these strings: " + JOINED_WARN_STRINGS_FOR_MESSAGE);
- LOG.warn("Same suggestion applies to component name.");
- }
- }
- }
-
- public void setMetricsCache(TimelineMetricsCache metricsCache) {
- this.metricsCache = metricsCache;
- }
-
-}
diff --git a/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
deleted file mode 100644
index 3b3e236..0000000
--- a/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.storm;
-
-import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMockBuilder;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-
-public class StormTimelineMetricsSinkTest {
- @Test
- public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
- StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
- stormTimelineMetricsSink.setTopologyName("topology1");
- TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
- stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
- replay(timelineMetricsCache);
- stormTimelineMetricsSink.handleDataPoints(
- new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
- Collections.singleton(new IMetricsConsumer.DataPoint("key1", "value1")));
- verify(timelineMetricsCache);
- }
-
- @Test
- @Ignore // TODO: Fix for failover
- public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
- StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
- stormTimelineMetricsSink.setTopologyName("topology1");
- TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
- expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1"))
- .andReturn(new TimelineMetric()).once();
- timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
- expectLastCall().once();
- stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
- replay(timelineMetricsCache);
- stormTimelineMetricsSink.handleDataPoints(
- new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
- Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
- verify(timelineMetricsCache);
- }
-
- @Test
- @Ignore // TODO: Fix for failover
- public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
- StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
- stormTimelineMetricsSink.setTopologyName("topology1");
- TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
- expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset"))
- .andReturn(new TimelineMetric()).once();
- timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
- expectLastCall().once();
- stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
- replay(timelineMetricsCache);
- stormTimelineMetricsSink.handleDataPoints(
- new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
- Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42)));
- verify(timelineMetricsCache);
- }
-
- @Test
- @Ignore // TODO: Fix for failover
- public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
- StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
- stormTimelineMetricsSink.setTopologyName("topology1");
- TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
- expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset"))
- .andReturn(new TimelineMetric()).once();
- timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
- expectLastCall().once();
- stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
- replay(timelineMetricsCache);
- stormTimelineMetricsSink.handleDataPoints(
- new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
- Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42)));
- verify(timelineMetricsCache);
- }
-
- @Test
- @Ignore // TODO: Fix for failover
- public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
- StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
- stormTimelineMetricsSink.setTopologyName("topology1");
- TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
- expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1"))
- .andReturn(new TimelineMetric()).once();
- expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2"))
- .andReturn(new TimelineMetric()).once();
- timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
- expectLastCall().once();
- stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
- replay(timelineMetricsCache);
-
- Map<String, Object> valueMap = new HashMap<>();
- valueMap.put("field1", 53);
- valueMap.put("field2", 64.12);
- stormTimelineMetricsSink.handleDataPoints(
- new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
- Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
- verify(timelineMetricsCache);
- }
-}
diff --git a/pom.xml b/pom.xml
index c6e9335..7b5f02f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,6 @@
<module>ambari-metrics-flume-sink</module>
<module>ambari-metrics-kafka-sink</module>
<module>ambari-metrics-storm-sink</module>
- <module>ambari-metrics-storm-sink-legacy</module>
<module>ambari-metrics-timelineservice</module>
<module>ambari-metrics-host-monitoring</module>
<module>ambari-metrics-grafana</module>