blob: 269754eaf69986ce179464a045ff02d43f063835 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.metrics.prometheus.tests;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.prometheus.PrometheusReporter;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.FlinkDistribution;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
/**
* End-to-end test for the PrometheusReporter.
*/
public class PrometheusReporterEndToEndITCase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String PROMETHEUS_VERSION = "2.4.3";
private static final String PROMETHEUS_FILE_NAME;
static {
final String base = "prometheus-" + PROMETHEUS_VERSION + '.';
switch (OperatingSystem.getCurrentOperatingSystem()) {
case MAC_OS:
PROMETHEUS_FILE_NAME = base + "darwin-amd64";
break;
case WINDOWS:
PROMETHEUS_FILE_NAME = base + "windows-amd64";
break;
default:
PROMETHEUS_FILE_NAME = base + "linux-amd64";
break;
}
}
private static final Pattern LOG_REPORTER_PORT_PATTERN = Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");
@BeforeClass
public static void checkOS() {
Assume.assumeFalse("This test does not run on Windows.", OperatingSystem.isWindows());
}
@Rule
public final FlinkDistribution dist = new FlinkDistribution();
@Rule
public final TemporaryFolder tmp = new TemporaryFolder();
@Test
public void testReporter() throws Exception {
dist.copyOptJarsToLib("flink-metrics-prometheus");
final Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getCanonicalName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100");
dist.appendConfiguration(config);
final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus");
final Path prometheusArchive = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME + ".tar.gz");
final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml");
final Path prometheusBinary = prometheusBinDir.resolve("prometheus");
Files.createDirectory(tmpPrometheusDir);
runBlocking(
"Download of Prometheus",
Duration.ofMinutes(5),
CommandLineWrapper
.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
.targetDir(tmpPrometheusDir)
.build());
runBlocking("Extraction of Prometheus archive",
CommandLineWrapper
.tar(prometheusArchive)
.extract()
.zipped()
.targetDir(tmpPrometheusDir)
.build());
runBlocking("Set Prometheus scrape interval",
CommandLineWrapper
.sed("s/\\(scrape_interval:\\).*/\\1 1s/", prometheusConfig)
.inPlace()
.build());
dist.startFlinkCluster();
final List<Integer> ports = dist
.searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> matcher.group(1))
.map(Integer::valueOf)
.collect(Collectors.toList());
final String scrapeTargets = ports.stream()
.map(port -> "'localhost:" + port + "'")
.collect(Collectors.joining(", "));
runBlocking("Set Prometheus scrape targets to (" + scrapeTargets + ")",
CommandLineWrapper
.sed("s/\\(targets:\\).*/\\1 [" + scrapeTargets + "]/", prometheusConfig)
.inPlace()
.build());
try (AutoClosableProcess prometheus = runNonBlocking(
"Start Prometheus server",
prometheusBinary.toAbsolutePath().toString(),
"--config.file=" + prometheusConfig.toAbsolutePath(),
"--storage.tsdb.path=" + prometheusBinDir.resolve("data").toAbsolutePath())) {
final OkHttpClient client = new OkHttpClient();
checkMetricAvailability(client, "flink_jobmanager_numRegisteredTaskManagers");
checkMetricAvailability(client, "flink_taskmanager_Status_Network_TotalMemorySegments");
}
}
private static void checkMetricAvailability(final OkHttpClient client, final String metric) throws InterruptedException {
final Request jobManagerRequest = new Request.Builder()
.get()
.url("http://localhost:9090/api/v1/query?query=" + metric)
.build();
Exception reportedException = null;
for (int x = 0; x < 30; x++) {
try (Response response = client.newCall(jobManagerRequest).execute()) {
if (response.isSuccessful()) {
final String json = response.body().string();
// Sample response:
//{
// "status": "success",
// "data": {
// "resultType": "vector",
// "result": [{
// "metric": {
// "__name__": "flink_jobmanager_numRegisteredTaskManagers",
// "host": "localhost",
// "instance": "localhost:9000",
// "job": "prometheus"
// },
// "value": [1540548500.107, "1"]
// }]
// }
//}
OBJECT_MAPPER.readTree(json)
.get("data")
.get("result")
.get(0)
.get("value")
.get(1).asInt();
// if we reach this point some value for the given metric was reported to prometheus
return;
} else {
LOG.info("Retrieving metric failed. Retrying... " + response.code() + ":" + response.message());
Thread.sleep(1000);
}
} catch (Exception e) {
reportedException = ExceptionUtils.firstOrSuppressed(e, reportedException);
Thread.sleep(1000);
}
}
throw new AssertionError("Could not retrieve metric " + metric + " from Prometheus.", reportedException);
}
}