blob: c3af72c3ec8048e798bc8804ba3c8ca2cbf4f21b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.extensions.metrics;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Locale;
import java.util.regex.Pattern;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.metrics.MetricsSink;
/**
* Sink to push metrics to Graphite. Graphite requires a timestamp. So metrics are reported with the
* timestamp (seconds from epoch) when the push to the sink was done (except with gauges that
* already have a timestamp value). The graphite metric name will be in the form of
* beam.metricType.metricNamespace.metricName.[committed|attempted].metricValueType For example:
* {@code beam.counter.throughput.nbRecords.attempted.value} Or {@code
* beam.distribution.throughput.nbRecordsPerSec.attempted.mean}
*/
public class MetricsGraphiteSink implements MetricsSink {
private static final Charset UTF_8 = Charset.forName("UTF-8");
private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
private static final String SPACE_REPLACEMENT = "_";
private final String address;
private final int port;
private final Charset charset;
public MetricsGraphiteSink(MetricsOptions pipelineOptions) {
this.address = pipelineOptions.getMetricsGraphiteHost();
this.port = pipelineOptions.getMetricsGraphitePort();
this.charset = UTF_8;
}
@Experimental(Experimental.Kind.METRICS)
@Override
public void writeMetrics(MetricQueryResults metricQueryResults) throws Exception {
final long metricTimestamp = System.currentTimeMillis() / 1000L;
Socket socket = new Socket(InetAddress.getByName(address), port);
BufferedWriter writer =
new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), charset));
StringBuilder messagePayload = new StringBuilder();
Iterable<MetricResult<Long>> counters = metricQueryResults.getCounters();
Iterable<MetricResult<GaugeResult>> gauges = metricQueryResults.getGauges();
Iterable<MetricResult<DistributionResult>> distributions =
metricQueryResults.getDistributions();
for (MetricResult<Long> counter : counters) {
messagePayload.append(new CounterMetricMessage(counter, "value", metricTimestamp).toString());
}
for (MetricResult<GaugeResult> gauge : gauges) {
messagePayload.append(new GaugeMetricMessage(gauge, "value").toString());
}
for (MetricResult<DistributionResult> distribution : distributions) {
messagePayload.append(
new DistributionMetricMessage(distribution, "min", metricTimestamp).toString());
messagePayload.append(
new DistributionMetricMessage(distribution, "max", metricTimestamp).toString());
messagePayload.append(
new DistributionMetricMessage(distribution, "count", metricTimestamp).toString());
messagePayload.append(
new DistributionMetricMessage(distribution, "sum", metricTimestamp).toString());
messagePayload.append(
new DistributionMetricMessage(distribution, "mean", metricTimestamp).toString());
}
writer.write(messagePayload.toString());
writer.flush();
writer.close();
socket.close();
}
private abstract static class MetricMessage {
@Override
public String toString() {
StringBuilder messagePayload = new StringBuilder();
// if committed metrics are not supported, exception is thrown and we don't append the message
try {
messagePayload.append(createCommittedMessage());
} catch (UnsupportedOperationException e) {
if (!e.getMessage().contains("committed metrics")) {
throw e;
}
}
messagePayload.append(createAttemptedMessage());
return messagePayload.toString();
}
protected abstract String createCommittedMessage();
protected abstract String createAttemptedMessage();
}
private static class CounterMetricMessage extends MetricMessage {
private String valueType;
private MetricResult<Long> counter;
private long metricTimestamp;
private CounterMetricMessage(
MetricResult<Long> counter, String valueType, long metricTimestamp) {
this.valueType = valueType;
this.counter = counter;
this.metricTimestamp = metricTimestamp;
}
@SuppressFBWarnings(
value = "VA_FORMAT_STRING_USES_NEWLINE",
justification = "\\n is part of graphite protocol")
@Override
protected String createCommittedMessage() {
String metricMessage =
String.format(
Locale.US,
"%s %s %s\n",
createNormalizedMetricName(
counter, "counter", valueType, CommittedOrAttemped.COMMITTED),
counter.getCommitted(),
metricTimestamp);
return metricMessage;
}
@SuppressFBWarnings(
value = "VA_FORMAT_STRING_USES_NEWLINE",
justification = "\\n is part of graphite protocol")
@Override
protected String createAttemptedMessage() {
String metricMessage =
String.format(
Locale.US,
"%s %s %s\n",
createNormalizedMetricName(
counter, "counter", valueType, CommittedOrAttemped.ATTEMPTED),
counter.getAttempted(),
metricTimestamp);
return metricMessage;
}
}
private static class GaugeMetricMessage extends MetricMessage {
private String valueType;
private MetricResult<GaugeResult> gauge;
private GaugeMetricMessage(MetricResult<GaugeResult> gauge, String valueType) {
this.valueType = valueType;
this.gauge = gauge;
}
@SuppressFBWarnings(
value = "VA_FORMAT_STRING_USES_NEWLINE",
justification = "\\n is part of graphite protocol")
@Override
protected String createCommittedMessage() {
String metricMessage =
String.format(
Locale.US,
"%s %s %s\n",
createNormalizedMetricName(gauge, "gauge", valueType, CommittedOrAttemped.COMMITTED),
gauge.getCommitted().getValue(),
gauge.getCommitted().getTimestamp().getMillis() / 1000L);
return metricMessage;
}
@SuppressFBWarnings(
value = "VA_FORMAT_STRING_USES_NEWLINE",
justification = "\\n is part of graphite protocol")
@Override
protected String createAttemptedMessage() {
String metricMessage =
String.format(
Locale.US,
"%s %s %s\n",
createNormalizedMetricName(gauge, "gauge", valueType, CommittedOrAttemped.ATTEMPTED),
gauge.getAttempted().getValue(),
gauge.getAttempted().getTimestamp().getMillis() / 1000L);
return metricMessage;
}
}
private static class DistributionMetricMessage extends MetricMessage {
private String valueType;
private MetricResult<DistributionResult> distribution;
private long metricTimestamp;
public DistributionMetricMessage(
MetricResult<DistributionResult> distribution, String valueType, long metricTimestamp) {
this.valueType = valueType;
this.distribution = distribution;
this.metricTimestamp = metricTimestamp;
}
@SuppressFBWarnings(
value = "VA_FORMAT_STRING_USES_NEWLINE",
justification = "\\n is part of graphite protocol")
@Override
protected String createCommittedMessage() {
Number value = null;
switch (valueType) {
case "min":
value = distribution.getCommitted().getMin();
break;
case "max":
value = distribution.getCommitted().getMax();
break;
case "count":
value = distribution.getCommitted().getCount();
break;
case "sum":
value = distribution.getCommitted().getSum();
break;
case "mean":
value = distribution.getCommitted().getMean();
break;
default:
break;
}
String metricMessage =
String.format(
Locale.US,
"%s %s %s\n",
createNormalizedMetricName(
distribution, "distribution", valueType, CommittedOrAttemped.COMMITTED),
value,
metricTimestamp);
return metricMessage;
}
@SuppressFBWarnings(
value = "VA_FORMAT_STRING_USES_NEWLINE",
justification = "\\n is part of graphite protocol")
@Override
protected String createAttemptedMessage() {
Number value = null;
switch (valueType) {
case "min":
value = distribution.getAttempted().getMin();
break;
case "max":
value = distribution.getAttempted().getMax();
break;
case "count":
value = distribution.getAttempted().getCount();
break;
case "sum":
value = distribution.getAttempted().getSum();
break;
case "mean":
value = distribution.getAttempted().getMean();
break;
default:
break;
}
String metricMessage =
String.format(
Locale.US,
"%s %s %s\n",
createNormalizedMetricName(
distribution, "distribution", valueType, CommittedOrAttemped.ATTEMPTED),
value,
metricTimestamp);
return metricMessage;
}
}
private static <T> String createNormalizedMetricName(
MetricResult<T> metric,
String metricType,
String valueType,
CommittedOrAttemped committedOrAttemped) {
String metricName =
String.format(
"beam.%s.%s.%s.%s.%s",
metricType,
metric.getName().getNamespace(),
metric.getName().getName(),
committedOrAttemped,
valueType);
return WHITESPACE.matcher(metricName).replaceAll(SPACE_REPLACEMENT);
}
private enum CommittedOrAttemped {
COMMITTED("committed"),
ATTEMPTED("attempted");
private final String committedOrAttempted;
CommittedOrAttemped(String committedOrAttempted) {
this.committedOrAttempted = committedOrAttempted;
}
@Override
public String toString() {
return committedOrAttempted;
}
}
}