blob: c57c1cdf8010f768a3c19a072ff3203e1355bbb0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics.reporter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistryWithSource;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.MetricsVisitor;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.serializers.Serializer;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * MetricsSnapshotReporter is a generic metrics reporter that periodically sends snapshots of its registered
 * metrics registries to a {@link SystemStream}.
 *
 * <p>It is configured with values such as:
 * <pre>
 * jobName       // my-samza-job
 * jobId         // an id that differentiates multiple executions of the same job
 * containerName // container_567890
 * host          // eat1-app128.grid
 * version       // 0.0.1
 * blacklist     // regex of metrics to ignore when flushing
 * </pre>
 *
 * <p>Reporting runs on a single daemon thread owned by this reporter. One snapshot is emitted per registered
 * source on each run, and only when that source has at least one non-blacklisted metric.
 */
public class MetricsSnapshotReporter implements MetricsReporter, Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(MetricsSnapshotReporter.class);
  private final SystemProducer producer;
  private final SystemStream out;
  private final Duration reportingInterval;
  private final String jobName;
  private final String jobId;
  private final String containerName;
  private final String version;
  private final String samzaVersion;
  private final String host;
  private final Serializer<MetricsSnapshot> serializer;
  private final Optional<Pattern> blacklist;
  private final Clock clock;
  private final String executionEnvContainerId;
  private final String samzaEpochId;
  private final ScheduledExecutorService executor;
  private final long resetTime;
  // register() may run on a caller's thread while innerRun() iterates this list on the reporter thread.
  // A copy-on-write list keeps iteration safe from ConcurrentModificationException.
  private final List<MetricsRegistryWithSource> registries = new CopyOnWriteArrayList<>();
  // Full metric names ("group.name") already matched by the blacklist regex. Caching them avoids re-running
  // the regex for those metrics.
  private final Set<String> blacklistedMetrics = new HashSet<>();

  /**
   * Creates a reporter. The producer is not started and nothing is scheduled until {@link #start()} is called.
   *
   * @param producer producer used to send snapshots; started in {@link #start()}, stopped in {@link #stop()}
   * @param out stream that snapshots are sent to
   * @param reportingInterval delay between the end of one reporting run and the start of the next
   * @param jobName job name placed in every snapshot header
   * @param jobId job id placed in every snapshot header
   * @param containerName container name placed in every snapshot header
   * @param version job version placed in every snapshot header
   * @param samzaVersion Samza version placed in every snapshot header
   * @param host host name placed in the header; also used as the message key
   * @param serializer serializer applied to each snapshot; if {@code null}, the snapshot object is sent as-is
   * @param blacklist optional regex; metrics whose "group.name" fully matches it are not reported
   * @param clock clock used for the reset time and for each header's timestamp
   */
  public MetricsSnapshotReporter(SystemProducer producer, SystemStream out, Duration reportingInterval, String jobName,
      String jobId, String containerName, String version, String samzaVersion, String host,
      Serializer<MetricsSnapshot> serializer, Optional<Pattern> blacklist, Clock clock) {
    this.producer = producer;
    this.out = out;
    this.reportingInterval = reportingInterval;
    this.jobName = jobName;
    this.jobId = jobId;
    this.containerName = containerName;
    this.version = version;
    this.samzaVersion = samzaVersion;
    this.host = host;
    this.serializer = serializer;
    this.blacklist = blacklist;
    this.clock = clock;
    this.executionEnvContainerId =
        Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).orElse("");
    this.samzaEpochId = Optional.ofNullable(System.getenv(EnvironmentVariables.SAMZA_EPOCH_ID)).orElse("");
    this.executor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build());
    this.resetTime = this.clock.currentTimeMillis();
    LOG.info(
        "got metrics snapshot reporter properties [job name: {}, job id: {}, containerName: {}, version: {}, samzaVersion: {}, host: {}, reportingInterval {}]",
        jobName, jobId, containerName, version, samzaVersion, host, reportingInterval);
  }

  /**
   * Starts the producer and schedules periodic reporting.
   * The first report runs immediately; each later one runs {@code reportingInterval} after the previous one ends.
   */
  @Override
  public void start() {
    LOG.info("Starting producer.");
    this.producer.start();
    LOG.info("Starting reporter timer.");
    // Schedule in milliseconds. Duration#getSeconds() truncates, so a sub-second interval would become a 0
    // delay, which scheduleWithFixedDelay rejects, and fractional intervals would be shortened.
    this.executor.scheduleWithFixedDelay(this, 0, this.reportingInterval.toMillis(), TimeUnit.MILLISECONDS);
  }

  /**
   * Adds a registry whose metrics will be reported under {@code source}, and registers that source with the
   * producer.
   */
  @Override
  public void register(String source, ReadableMetricsRegistry registry) {
    this.registries.add(new MetricsRegistryWithSource(source, registry));
    LOG.info("Registering {} with producer.", source);
    this.producer.register(source);
  }

  /**
   * Flushes metrics one last time, shuts down the reporter thread (waiting up to 60 seconds) and stops the
   * producer.
   *
   * @throws SamzaException if interrupted while waiting for the reporter thread; the interrupt flag is restored
   */
  @Override
  public void stop() {
    // Schedule an event with 0 delay so metrics are flushed one last time before shutdown.
    this.executor.schedule(this, 0, TimeUnit.SECONDS);
    LOG.info("Stopping reporter timer.");
    // Let the task scheduled above finish, then block for termination (at most 60 seconds).
    this.executor.shutdown();
    try {
      this.executor.awaitTermination(60, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new SamzaException(e);
    }
    LOG.info("Stopping producer.");
    this.producer.stop();
    if (!this.executor.isTerminated()) {
      LOG.warn("Unable to shutdown reporter timer.");
    }
  }

  /** Runs one reporting pass. Exceptions are logged rather than propagated. */
  @Override
  public void run() {
    try {
      innerRun();
    } catch (Exception e) {
      // Never let an exception escape: the executor suppresses all later executions of a periodic task
      // once it throws, which would silently stop metrics reporting.
      LOG.warn("Error while reporting metrics. Will retry in {} ms.", this.reportingInterval.toMillis(), e);
    }
  }

  /**
   * Emits one snapshot per registered source that has at least one non-blacklisted metric.
   * Send/flush failures for one source are logged and do not stop the remaining sources.
   */
  public void innerRun() {
    LOG.debug("Begin flushing metrics.");
    for (MetricsRegistryWithSource metricsRegistryWithSource : this.registries) {
      String source = metricsRegistryWithSource.getSource();
      LOG.debug("Flushing metrics for {}.", source);
      Map<String, Map<String, Object>> metricsMsg = collectMetrics(metricsRegistryWithSource.getRegistry());
      // Only publish when the message carries at least one metric.
      if (!metricsMsg.isEmpty()) {
        publish(source, metricsMsg);
      }
    }
    LOG.debug("Finished flushing metrics.");
  }

  /**
   * Builds a group -> (metric name -> value) map for one registry.
   * Blacklisted metrics and groups left empty after filtering are omitted.
   */
  private Map<String, Map<String, Object>> collectMetrics(ReadableMetricsRegistry registry) {
    Map<String, Map<String, Object>> metricsMsg = new HashMap<>();
    registry.getGroups().forEach(group -> {
      Map<String, Object> groupMsg = new HashMap<>();
      registry.getGroup(group).forEach((name, metric) -> {
        if (!shouldIgnore(group, name)) {
          metric.visit(new MetricsVisitor() {
            @Override
            public void counter(Counter counter) {
              groupMsg.put(name, counter.getCount());
            }

            @Override
            public <T> void gauge(Gauge<T> gauge) {
              groupMsg.put(name, gauge.getValue());
            }

            @Override
            public void timer(Timer timer) {
              // Timers are reported as the average of their current snapshot.
              groupMsg.put(name, timer.getSnapshot().getAverage());
            }
          });
        }
      });
      // Don't emit empty groups.
      if (!groupMsg.isEmpty()) {
        metricsMsg.put(group, groupMsg);
      }
    });
    return metricsMsg;
  }

  /**
   * Wraps the metrics in a snapshot with a fresh header, sends it to the output stream and flushes immediately.
   * Serialization errors propagate to the caller; send/flush errors are logged.
   */
  private void publish(String source, Map<String, Map<String, Object>> metricsMsg) {
    MetricsHeader header =
        new MetricsHeader(this.jobName, this.jobId, this.containerName, this.executionEnvContainerId,
            Optional.of(this.samzaEpochId), source, this.version, this.samzaVersion, this.host,
            this.clock.currentTimeMillis(), this.resetTime);
    Metrics metrics = new Metrics(metricsMsg);
    LOG.debug("Flushing metrics for {} to {} with header and map: header={}, map={}.", source, out,
        header.getAsMap(), metrics.getAsMap());
    MetricsSnapshot metricsSnapshot = new MetricsSnapshot(header, metrics);
    // Without a serializer, the snapshot object itself is handed to the producer.
    Object maybeSerialized = (this.serializer != null) ? this.serializer.toBytes(metricsSnapshot) : metricsSnapshot;
    try {
      this.producer.send(source, new OutgoingMessageEnvelope(this.out, this.host, null, maybeSerialized));
      // Always flush, since we don't want metrics to get batched up.
      this.producer.flush(source);
    } catch (Exception e) {
      LOG.error("Exception when flushing metrics for source {}", source, e);
    }
  }

  /**
   * Returns whether the metric {@code group.metricName} fully matches the blacklist regex.
   * Matches are cached so the regex is evaluated only once for each blacklisted metric.
   *
   * @return {@code false} when no blacklist is configured
   */
  protected boolean shouldIgnore(String group, String metricName) {
    if (!this.blacklist.isPresent()) {
      return false;
    }
    String fullMetricName = group + "." + metricName;
    if (this.blacklistedMetrics.contains(fullMetricName)) {
      return true;
    }
    Pattern pattern = this.blacklist.get();
    if (!pattern.matcher(fullMetricName).matches()) {
      return false;
    }
    this.blacklistedMetrics.add(fullMetricName);
    LOG.debug("Samza diagnostics: blacklisted metric {} because it matched blacklist regex: {}", fullMetricName,
        pattern);
    return true;
  }
}