blob: 769cfe9b23795739a27f2649104d114b605190d2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class PulsarFunctionTestUtils {
public static String getPrometheusMetrics(int metricsPort) throws IOException {
StringBuilder result = new StringBuilder();
URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line + System.lineSeparator());
}
rd.close();
return result.toString();
}
/**
* Hacky parsing of Prometheus text format. Sould be good enough for unit tests
*/
public static Map<String, Metric> parseMetrics(String metrics) {
final Map<String, Metric> parsed = new HashMap<>();
// Example of lines are
// jvm_threads_current{cluster="standalone",} 203.0
// or
// pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
// topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
Pattern pattern = Pattern.compile("^(\\w+)(\\{[^\\}]+\\})?\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
Arrays.asList(metrics.split("\n")).forEach(line -> {
if (line.isEmpty() || line.startsWith("#")) {
return;
}
Matcher matcher = pattern.matcher(line);
log.info("line: {}", line);
checkArgument(matcher.matches());
String name = matcher.group(1);
Metric m = new Metric();
String numericValue = matcher.group(3);
if (numericValue.equalsIgnoreCase("-Inf")) {
m.value = Double.NEGATIVE_INFINITY;
} else if (numericValue.equalsIgnoreCase("+Inf")) {
m.value = Double.POSITIVE_INFINITY;
} else {
m.value = Double.parseDouble(numericValue);
}
String tags = matcher.group(2);
if (tags != null) {
tags = tags.replace("{", "").replace("}", "");
Matcher tagsMatcher = tagsPattern.matcher(tags);
while (tagsMatcher.find()) {
String tag = tagsMatcher.group(1);
String value = tagsMatcher.group(2);
m.tags.put(tag, value);
}
}
parsed.put(name, m);
});
log.info("parsed metrics: {}", parsed);
return parsed;
}
@ToString
public static class Metric {
public final Map<String, String> tags = new TreeMap<>();
public double value;
}
}