| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.network.yarn; |
| |
| import java.util.Map; |
| |
| import com.codahale.metrics.*; |
| import org.apache.hadoop.metrics2.MetricsCollector; |
| import org.apache.hadoop.metrics2.MetricsInfo; |
| import org.apache.hadoop.metrics2.MetricsRecordBuilder; |
| import org.apache.hadoop.metrics2.MetricsSource; |
| |
| /** |
| * Forward {@link org.apache.spark.network.shuffle.ExternalBlockHandler.ShuffleMetrics} |
| * to hadoop metrics system. |
| * NodeManager by default exposes JMX endpoint where can be collected. |
| */ |
| class YarnShuffleServiceMetrics implements MetricsSource { |
| |
| private final String metricsNamespace; |
| private final MetricSet metricSet; |
| |
| YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet) { |
| this.metricsNamespace = metricsNamespace; |
| this.metricSet = metricSet; |
| } |
| |
| /** |
| * Get metrics from the source |
| * |
| * @param collector to contain the resulting metrics snapshot |
| * @param all if true, return all metrics even if unchanged. |
| */ |
| @Override |
| public void getMetrics(MetricsCollector collector, boolean all) { |
| MetricsRecordBuilder metricsRecordBuilder = collector.addRecord(metricsNamespace); |
| |
| for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) { |
| collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| /** |
| * The metric types used in |
| * {@link org.apache.spark.network.shuffle.ExternalBlockHandler.ShuffleMetrics}. |
| * Visible for testing. |
| */ |
| public static void collectMetric( |
| MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { |
| |
| if (metric instanceof Timer) { |
| // Timer records both the operations count and delay |
| // Snapshot inside the Timer provides the information for the operation delay |
| Timer t = (Timer) metric; |
| Snapshot snapshot = t.getSnapshot(); |
| metricsRecordBuilder |
| .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), |
| t.getCount()) |
| .addGauge( |
| new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), |
| t.getFifteenMinuteRate()) |
| .addGauge( |
| new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), |
| t.getFiveMinuteRate()) |
| .addGauge( |
| new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), |
| t.getOneMinuteRate()) |
| .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), |
| t.getMeanRate()) |
| .addGauge( |
| getShuffleServiceMetricsInfoForGenericValue(name, "max"), snapshot.getMax()) |
| .addGauge( |
| getShuffleServiceMetricsInfoForGenericValue(name, "min"), snapshot.getMin()) |
| .addGauge( |
| getShuffleServiceMetricsInfoForGenericValue(name, "mean"), snapshot.getMean()) |
| .addGauge( |
| getShuffleServiceMetricsInfoForGenericValue(name, "stdDev"), snapshot.getStdDev()); |
| for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) { |
| String percentileStr; |
| switch (percentileThousands) { |
| case 10: |
| percentileStr = "1stPercentile"; |
| break; |
| case 999: |
| percentileStr = "999thPercentile"; |
| break; |
| default: |
| percentileStr = String.format("%dthPercentile", percentileThousands / 10); |
| break; |
| } |
| metricsRecordBuilder.addGauge( |
| getShuffleServiceMetricsInfoForGenericValue(name, percentileStr), |
| snapshot.getValue(percentileThousands / 1000.0)); |
| } |
| } else if (metric instanceof Meter) { |
| Meter m = (Meter) metric; |
| metricsRecordBuilder |
| .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), |
| m.getCount()) |
| .addGauge( |
| new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), |
| m.getFifteenMinuteRate()) |
| .addGauge( |
| new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), |
| m.getFiveMinuteRate()) |
| .addGauge( |
| new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), |
| m.getOneMinuteRate()) |
| .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), |
| m.getMeanRate()); |
| } else if (metric instanceof Gauge) { |
| final Object gaugeValue = ((Gauge) metric).getValue(); |
| if (gaugeValue instanceof Integer) { |
| metricsRecordBuilder.addGauge( |
| getShuffleServiceMetricsInfoForGauge(name), (Integer) gaugeValue); |
| } else if (gaugeValue instanceof Long) { |
| metricsRecordBuilder.addGauge( |
| getShuffleServiceMetricsInfoForGauge(name), (Long) gaugeValue); |
| } else if (gaugeValue instanceof Float) { |
| metricsRecordBuilder.addGauge( |
| getShuffleServiceMetricsInfoForGauge(name), (Float) gaugeValue); |
| } else if (gaugeValue instanceof Double) { |
| metricsRecordBuilder.addGauge( |
| getShuffleServiceMetricsInfoForGauge(name), (Double) gaugeValue); |
| } else { |
| throw new IllegalStateException( |
| "Not supported class type of metric[" + name + "] for value " + gaugeValue); |
| } |
| } else if (metric instanceof Counter) { |
| Counter c = (Counter) metric; |
| long counterValue = c.getCount(); |
| metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfoForCounter(name), counterValue); |
| } |
| } |
| |
| private static MetricsInfo getShuffleServiceMetricsInfoForGauge(String name) { |
| return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name); |
| } |
| |
| private static ShuffleServiceMetricsInfo getShuffleServiceMetricsInfoForCounter(String name) { |
| return new ShuffleServiceMetricsInfo(name, "Value of counter " + name); |
| } |
| |
| private static ShuffleServiceMetricsInfo getShuffleServiceMetricsInfoForGenericValue( |
| String baseName, String valueName) { |
| return new ShuffleServiceMetricsInfo( |
| baseName + "_" + valueName, |
| valueName + " value of " + baseName); |
| } |
| |
| private static class ShuffleServiceMetricsInfo implements MetricsInfo { |
| |
| private final String name; |
| private final String description; |
| |
| ShuffleServiceMetricsInfo(String name, String description) { |
| this.name = name; |
| this.description = description; |
| } |
| |
| @Override |
| public String name() { |
| return name; |
| } |
| |
| @Override |
| public String description() { |
| return description; |
| } |
| } |
| } |