blob: 688a6769267fa33e427c105b164dfa4905474713 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metrics;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import com.codahale.metrics.MetricRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* This is the main class of the metrics system.
*/
public class Metrics {
private static final Logger LOG = LoggerFactory.getLogger(Metrics.class);
private static final Map<String, Metrics> METRICS_INSTANCE_PER_BASEPATH = new ConcurrentHashMap<>();
private final MetricRegistry registry;
private final List<MetricsReporter> reporters;
private final String commonMetricPrefix;
private final String basePath;
private boolean initialized = false;
private transient Thread shutdownThread = null;
private final HoodieStorage storage;
public Metrics(HoodieMetricsConfig metricConfig, HoodieStorage storage) {
this.storage = storage;
registry = new MetricRegistry();
commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix();
reporters = new ArrayList<>();
Option<MetricsReporter> defaultReporter = MetricsReporterFactory.createReporter(metricConfig, registry);
defaultReporter.ifPresent(reporters::add);
if (StringUtils.nonEmpty(metricConfig.getMetricReporterFileBasedConfigs())) {
reporters.addAll(addAdditionalMetricsExporters(metricConfig));
}
if (reporters.size() == 0) {
throw new RuntimeException("Cannot initialize Reporters.");
}
reporters.forEach(MetricsReporter::start);
basePath = getBasePath(metricConfig);
shutdownThread = new Thread(() -> shutdown(true));
Runtime.getRuntime().addShutdownHook(shutdownThread);
this.initialized = true;
}
private void registerHoodieCommonMetrics() {
registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix));
}
public static synchronized Metrics getInstance(HoodieMetricsConfig metricConfig, HoodieStorage storage) {
String basePath = getBasePath(metricConfig);
if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
return METRICS_INSTANCE_PER_BASEPATH.get(basePath);
}
Metrics metrics = new Metrics(metricConfig, storage);
METRICS_INSTANCE_PER_BASEPATH.put(basePath, metrics);
return metrics;
}
public static synchronized void shutdownAllMetrics() {
METRICS_INSTANCE_PER_BASEPATH.values().forEach(Metrics::shutdown);
// to avoid reusing already stopped metrics
METRICS_INSTANCE_PER_BASEPATH.clear();
}
private List<MetricsReporter> addAdditionalMetricsExporters(HoodieMetricsConfig metricConfig) {
List<MetricsReporter> reporterList = new ArrayList<>();
List<String> propPathList = StringUtils.split(metricConfig.getMetricReporterFileBasedConfigs(), ",");
try (HoodieStorage newStorage = storage.newInstance(new StoragePath(propPathList.get(0)), storage.getConf())) {
for (String propPath : propPathList) {
HoodieMetricsConfig secondarySourceConfig = HoodieMetricsConfig.newBuilder().fromInputStream(
newStorage.open(new StoragePath(propPath))).withPath(metricConfig.getBasePath()).build();
Option<MetricsReporter> reporter = MetricsReporterFactory.createReporter(secondarySourceConfig, registry);
if (reporter.isPresent()) {
reporterList.add(reporter.get());
} else {
LOG.error(String.format("Could not create reporter using properties path %s base path %s",
propPath, metricConfig.getBasePath()));
}
}
} catch (IOException e) {
LOG.error("Failed to add MetricsExporters", e);
}
LOG.info("total additional metrics reporters added =" + reporterList.size());
return reporterList;
}
public void shutdown() {
shutdown(false);
}
private synchronized void shutdown(boolean fromShutdownHook) {
if (!fromShutdownHook) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
} else {
LOG.warn("Shutting down the metrics reporter from shutdown hook.");
}
if (initialized) {
try {
registerHoodieCommonMetrics();
reporters.forEach(MetricsReporter::report);
LOG.info("Stopping the metrics reporter...");
reporters.forEach(MetricsReporter::stop);
} catch (Exception e) {
LOG.warn("Error while closing reporter", e);
} finally {
METRICS_INSTANCE_PER_BASEPATH.remove(basePath);
initialized = false;
}
}
}
public synchronized void flush() {
try {
LOG.info("Reporting and flushing all metrics");
registerHoodieCommonMetrics();
reporters.forEach(MetricsReporter::report);
registry.getNames().forEach(this.registry::remove);
registerHoodieCommonMetrics();
} catch (Exception e) {
LOG.error("Error while reporting and flushing metrics", e);
}
}
public void registerGauges(Map<String, Long> metricsMap, Option<String> prefix) {
String metricPrefix = prefix.isPresent() ? prefix.get() + "." : "";
metricsMap.forEach((k, v) -> registerGauge(metricPrefix + k, v));
}
public Option<HoodieGauge<Long>> registerGauge(String metricName, final long value) {
HoodieGauge<Long> gauge = null;
try {
gauge = (HoodieGauge) registry.gauge(metricName, () -> new HoodieGauge<>(value));
gauge.setValue(value);
} catch (Exception e) {
// Here we catch all exception, so the major upsert pipeline will not be affected if the
// metrics system has some issues.
LOG.error("Failed to send metrics: ", e);
}
return Option.ofNullable(gauge);
}
public Option<HoodieGauge<Long>> registerGauge(String metricName) {
return registerGauge(metricName, 0);
}
public MetricRegistry getRegistry() {
return registry;
}
public static boolean isInitialized(String basePath) {
if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) {
return METRICS_INSTANCE_PER_BASEPATH.get(basePath).initialized;
}
return false;
}
/**
* Use the same base path as the hudi table so that Metrics instance is shared.
*/
private static String getBasePath(HoodieMetricsConfig metricsConfig) {
String basePath = metricsConfig.getBasePath();
if (basePath.endsWith(HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH)) {
String toRemoveSuffix = StoragePath.SEPARATOR + HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH;
basePath = basePath.substring(0, basePath.length() - toRemoveSuffix.length());
}
return basePath;
}
}