blob: fee55409e95f022304ac92f1cbacc9acc0919fc0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.metric;
import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;
import junit.framework.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.generated.MetricInfo;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.common.metric.Counter;
import com.alibaba.jstorm.common.metric.Histogram;
import com.alibaba.jstorm.common.metric.LongCounter;
import com.alibaba.jstorm.common.metric.Meter;
import com.alibaba.jstorm.common.metric.MetricRegistry;
import com.alibaba.jstorm.common.metric.window.Metric;
import com.alibaba.jstorm.utils.JStormUtils;
/**
 * Manual stress harness for the windowed metrics (counter, meter, histogram).
 *
 * <p>The harness hammers a metric with {@code update(1.0d)} calls for a
 * configurable number of seconds, optionally pauses, hammers it again and then
 * prints the metric snapshot. Nothing here is annotated with {@code @Test}: the
 * scenarios run for minutes, so they are started through {@link #main(String[])}.
 *
 * <p>Metric kinds are selected with an integer code throughout this class:
 * {@code 0} = counter, {@code 1} = meter, {@code 2} = histogram.
 */
public class MetricTest {
    private static final Logger LOG = LoggerFactory.getLogger(MetricTest.class);

    /** Number of {@code update} calls issued between two reads of the clock. */
    private static final long UPDATES_PER_BATCH = 1000000L;

    /** Names under which the worker metrics are registered and unregistered. */
    private static final String COUNTER_NAME = "Counter";
    private static final String METER_NAME = "Meter";
    // The spelling is kept as-is: it is the name the metric is registered under
    // and therefore shows up in the reported metric output.
    private static final String HISTOGRAM_NAME = "Historgram";

    /** Window sizes in seconds: 60, 120, 240, 480 (each one doubles the previous). */
    static int[] windows = new int[4];
    static {
        windows[0] = 60;
        for (int i = 1; i < windows.length; i++) {
            windows[i] = windows[i - 1] * 2;
        }
        System.out.println("Start Test " + new Date());
    }

    /** One twentieth of the smallest window, in seconds. */
    static int interval = windows[0] / 20;

    /**
     * Asserts that every value of {@code map} equals the expected value at the
     * same position of {@code result}.
     *
     * <p>Positions follow the iteration order of {@code map}, so callers should
     * pass a map with a deterministic order.
     *
     * @param map    actual values keyed by time window
     * @param result expected values, one per map entry, in map iteration order
     */
    public void check_value(Map<Integer, Number> map, Long[] result) {
        Assert.assertTrue("Expected at least " + map.size() + " values, but got " + result.length,
                result.length >= map.size());

        int i = 0;
        for (Entry<Integer, Number> entry : map.entrySet()) {
            // Advance the index for every window; the previous code always
            // compared against result[0].
            Long expected = result[i++];
            Number actual = entry.getValue();
            String message = "Time window " + entry.getKey() + " should be " + expected
                    + ", but it is " + actual;
            Assert.assertTrue(message, actual != null && actual.equals(expected));
        }
    }

    /**
     * Runs the two update phases described by {@code input} and captures the
     * metric snapshot afterwards.
     *
     * @param input metric under test plus the duration of each phase in seconds
     * @return the snapshot and the number of updates issued in each phase
     */
    public Output testTimes(Input input) {
        return runWorkload(input, true);
    }

    /**
     * Runs the same two update phases as {@link #testTimes(Input)} but does not
     * take a snapshot of the metric.
     *
     * @param input metric under test plus the duration of each phase in seconds
     */
    public void testUpdates(Input input) {
        runWorkload(input, false);
    }

    /**
     * Shared driver: update for {@code firstRunSeconds}, pause for
     * {@code interruptSeconds}, update for {@code endRunSeconds}.
     *
     * @param input        scenario description
     * @param takeSnapshot whether to store {@code metric.getSnapshot()} in the result
     * @return per-phase update counts and, if requested, the snapshot
     */
    private Output runWorkload(Input input, boolean takeSnapshot) {
        System.out.println("Start test " + new Date() + ", first:" + input.firstRunSeconds + ", interrupt:" + input.interruptSeconds + ", endRun:" + input.endRunSeconds);

        Metric metric = input.metric;
        long start = System.currentTimeMillis();
        long firstCounter = updateFor(metric, input.firstRunSeconds);
        pause(input.interruptSeconds);
        long secondCounter = updateFor(metric, input.endRunSeconds);
        long end = System.currentTimeMillis();

        Output output = new Output();
        if (takeSnapshot) {
            output.output = metric.getSnapshot();
        }
        output.firstCounter = firstCounter;
        output.secondCounter = secondCounter;

        long totalCounter = firstCounter + secondCounter;
        System.out.println("update " + totalCounter + " cost " + (end - start) / 1000 + ", firstCounter:" + firstCounter + ", endCounter:" + secondCounter);
        System.out.println("End test " + new Date());
        return output;
    }

    /**
     * Calls {@code metric.update(1.0d)} in batches until {@code seconds} have elapsed.
     *
     * @return the number of updates issued (a multiple of the batch size)
     */
    private long updateFor(Metric metric, int seconds) {
        // 1000L keeps the multiplication in long arithmetic.
        long stop = System.currentTimeMillis() + seconds * 1000L;
        long issued = 0L;
        // The clock is only consulted once per batch to keep the loop tight.
        while (System.currentTimeMillis() < stop) {
            for (long n = 0; n < UPDATES_PER_BATCH; n++) {
                metric.update(1.0d);
            }
            issued += UPDATES_PER_BATCH;
        }
        return issued;
    }

    /** Sleeps for {@code seconds}; non-positive durations are skipped. */
    private void pause(int seconds) {
        if (seconds <= 0) {
            return;
        }
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers, then carry on with the
            // next phase as before.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while pausing between update phases", e);
        }
    }

    /**
     * Creates a standalone metric configured with {@link #windows} and initialised.
     *
     * @param type 0 = counter, 1 = meter, 2 = histogram
     * @return the new metric, or {@code null} for any other type code
     */
    public Metric getMetric(int type) {
        Metric metric;
        switch (type) {
        case 0:
            metric = new Counter<Double>(0.0d);
            break;
        case 1:
            metric = new Meter();
            break;
        case 2:
            metric = new Histogram();
            break;
        default:
            return null;
        }
        metric.setWindowSeconds(windows);
        metric.init();
        return metric;
    }

    /**
     * Registers a worker-level metric through {@code JStormMetrics}, then
     * configures it with {@link #windows} and initialises it.
     *
     * @param type 0 = counter, 1 = meter, 2 = histogram
     * @return the registered metric, or {@code null} for any other type code
     */
    public Metric getJStormMetric(int type) {
        Metric metric;
        switch (type) {
        case 0:
            metric = JStormMetrics.registerWorkerCounter(COUNTER_NAME);
            break;
        case 1:
            metric = JStormMetrics.registerWorkerMeter(METER_NAME);
            break;
        case 2:
            metric = JStormMetrics.registerWorkerHistogram(HISTOGRAM_NAME);
            break;
        default:
            return null;
        }
        metric.setWindowSeconds(windows);
        metric.init();
        return metric;
    }

    /**
     * Registers a worker-level metric using the two-argument registration
     * methods of {@code JStormMetrics}.
     *
     * <p>NOTE(review): unlike {@link #getJStormMetric(int)} this variant neither
     * sets the windows nor calls {@code init()} - confirm that is intended.
     *
     * @param type 0 = counter, 1 = meter, 2 = histogram
     * @param name second argument passed to the registration call
     * @return the registered metric, or {@code null} for any other type code
     */
    public Metric getJStormMetric(int type, String name) {
        switch (type) {
        case 0:
            return JStormMetrics.registerWorkerCounter(COUNTER_NAME, name);
        case 1:
            return JStormMetrics.registerWorkerMeter(METER_NAME, name);
        case 2:
            return JStormMetrics.registerWorkerHistogram(HISTOGRAM_NAME, name);
        default:
            return null;
        }
    }

    /**
     * Unregisters the worker metric registered by {@link #getJStormMetric(int)}.
     * Unknown type codes are ignored.
     *
     * @param type 0 = counter, 1 = meter, 2 = histogram
     */
    public void unregisterJStormMetric(int type) {
        switch (type) {
        case 0:
            JStormMetrics.unregisterWorkerMetric(COUNTER_NAME);
            break;
        case 1:
            JStormMetrics.unregisterWorkerMetric(METER_NAME);
            break;
        case 2:
            JStormMetrics.unregisterWorkerMetric(HISTOGRAM_NAME);
            break;
        default:
            break;
        }
    }

    /** Updates a standalone counter and meter for 20 seconds each and prints the snapshots. */
    public void test_speed() {
        for (int m = 0; m < 2; m++) {
            Input input = new Input();
            input.metric = getMetric(m);
            input.firstRunSeconds = 20;
            input.interruptSeconds = 0;
            input.endRunSeconds = 0;

            Output result = testTimes(input);
            System.out.println(input.metric.getClass().getSimpleName() + ":"
                    + result.output);
        }
    }

    /**
     * Runs every metric kind through all first-phase/second-phase duration
     * combinations on the calling thread, without a pause in between.
     *
     * <p>Deliberately not annotated with {@code @Test}: this takes too long to run.
     */
    public void test_single_thread() {
        LOG.info("Begin to test");

        // A fuller matrix would be:
        // {0, interval/2, interval, interval + 1, windows[0]/2, windows[0], windows[0] + 1}
        int[] timeWindow = {0, windows[0]};
        // The pause dimension is pinned to its first entry (no pause).
        final int j = 0;

        for (int m = 2; m >= 0; m--) {
            for (int i = 0; i < timeWindow.length; i++) {
                for (int l = 0; l < timeWindow.length; l++) {
                    Input input = new Input();
                    input.metric = getJStormMetric(m);
                    input.firstRunSeconds = timeWindow[i];
                    input.interruptSeconds = timeWindow[j];
                    input.endRunSeconds = timeWindow[l];

                    Output result = testTimes(input);
                    System.out.println(input.metric.getClass()
                            .getSimpleName() + ":" + result.output);

                    unregisterJStormMetric(m);
                }
            }
        }
    }

    /** Runs one scenario: a worker meter updated for two full smallest windows back to back. */
    public void test_one_case() {
        int[] timeWindow = {0, windows[0]};
        int m = 1, i = 1, j = 0, l = 1;

        Input input = new Input();
        input.metric = getJStormMetric(m);
        input.metric.setWindowSeconds(windows);
        input.firstRunSeconds = timeWindow[i];
        input.interruptSeconds = timeWindow[j];
        input.endRunSeconds = timeWindow[l];

        Output result = testTimes(input);
        System.out.println(input.metric.getClass()
                .getSimpleName() + ":" + result.output);

        unregisterJStormMetric(m);
    }

    /** Entry point used by {@link #main(String[])}; only the single-case scenario is enabled. */
    public void alltest() {
        test_one_case();
        // test_single_thread() and testMultipleThread() are left out because of
        // their run time.
    }

    /**
     * Collects the snapshot of every metric in {@code workerMetrics} into a
     * {@link MetricInfo}, prints it as pretty JSON and returns it.
     *
     * @param workerMetrics registry whose metrics are snapshotted
     * @return the populated metric info
     */
    public MetricInfo computWorkerMetrics(MetricRegistry workerMetrics) {
        MetricInfo workerMetricInfo = MetricThrift.mkMetricInfo();
        Map<String, Metric> workerMetricMap = workerMetrics.getMetrics();

        for (Entry<String, Metric> entry : workerMetricMap.entrySet()) {
            String name = entry.getKey();
            Map<Integer, Double> snapshot = entry.getValue().getSnapshot();
            MetricThrift.insert(workerMetricInfo, name, snapshot);
        }

        System.out.println(Utils.toPrettyJsonString(workerMetricInfo));
        return workerMetricInfo;
    }

    /**
     * Starts one thread per first/pause/second duration combination, each
     * updating its own worker meter, then dumps the worker metrics once a
     * minute for four minutes and finally calls
     * {@code JStormUtils.haltProcess(0)}.
     *
     * <p>The worker threads are not joined; the reporting loop decides how long
     * the scenario runs.
     */
    public void testMultipleThread() {
        LOG.info("Begin to test");

        // A fuller matrix would be:
        // {0, interval/2, interval, interval + 1, windows[0]/2, windows[0], windows[0] + 1}
        int[] timeWindow = {0, windows[0]};

        for (int i = 0; i < timeWindow.length; i++) {
            for (int j = 0; j < timeWindow.length; j++) {
                for (int l = 0; l < timeWindow.length; l++) {
                    final Input input = new Input();
                    // Type 1 = meter; the index triple is passed as the name argument.
                    input.metric = getJStormMetric(1, String.valueOf(i) + ":" + String.valueOf(j) + ":" + String.valueOf(l));
                    input.firstRunSeconds = timeWindow[i];
                    input.interruptSeconds = timeWindow[j];
                    input.endRunSeconds = timeWindow[l];

                    Thread thread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            testUpdates(input);
                        }
                    });
                    thread.start();
                }
            }
        }

        for (int i = 0; i < 4 * windows[0] / 60; i++) {
            computWorkerMetrics(JStormMetrics.workerMetrics);
            JStormUtils.sleepMs(60 * 1000);
        }

        JStormUtils.haltProcess(0);
    }

    /** Scenario description: the metric under test and the phase durations in seconds. */
    public static class Input {
        public Metric metric;
        public int firstRunSeconds;
        public int interruptSeconds;
        public int endRunSeconds;
    }

    /** Scenario result: the metric snapshot (if taken) and the update count of each phase. */
    public static class Output {
        public Object output;
        public long firstCounter;
        public long secondCounter;
    }

    /** Runs the enabled scenarios from the command line. */
    public static void main(String[] args) {
        MetricTest test = new MetricTest();
        test.alltest();
    }
}