blob: dc9782e9bdb59a0a7aaacb7fbbef569f579d2dd8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.engine.sorter;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.utils.DateTimeUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
@Ignore
public class StreamWindowBenchmarkTest {
private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class);
public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) {
LOGGER.info("Sending {} events to {} ({})",num,window.getClass().getSimpleName(),storageType);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
int i=0;
while(i<num) {
PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1",(window.startTime()+i));
window.add(event);
i++;
}
stopWatch.stop();
performanceReport.put(num+"\tInsertTime\t"+storageType,stopWatch.getTime());
LOGGER.info("Inserted {} events in {} ms",num,stopWatch.getTime());
stopWatch.reset();
stopWatch.start();
window.flush();
stopWatch.stop();
performanceReport.put(num+"\tReadTime\t"+storageType,stopWatch.getTime());
}
private ScheduledReporter metricReporter;
private Map<String,Long> performanceReport;
@Before
public void setUp(){
final MetricRegistry metrics = new MetricRegistry();
metrics.registerAll(new MemoryUsageGaugeSet());
metrics.registerAll(new GarbageCollectorMetricSet());
metricReporter = ConsoleReporter.forRegistry(metrics)
.filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
// .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
metricReporter.start(60,TimeUnit.SECONDS);
performanceReport = new TreeMap<>();
}
@After
public void after(){
StringBuilder sb = new StringBuilder();
for(Map.Entry<String,Long> entry:performanceReport.entrySet()){
sb.append(String.format("%-40s\t%s\n",entry.getKey(),entry.getValue()));
}
LOGGER.info("\n===== Benchmark Result Report =====\n\n{}",sb.toString());
}
private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000");
private final long margin = (stop - start)/3;
private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType){
StopWatch stopWatch = new StopWatch();
stopWatch.start();
LOGGER.info("\n===== Benchmark Test for {} ({}) =====",window.getClass().getSimpleName(),storageType);
metricReporter.report();
sendDESCOrderedEventsToWindow(window,storageType,1000);
metricReporter.report();
sendDESCOrderedEventsToWindow(window,storageType,10000);
metricReporter.report();
sendDESCOrderedEventsToWindow(window,storageType,100000);
metricReporter.report();
sendDESCOrderedEventsToWindow(window,storageType,1000000);
metricReporter.report();
stopWatch.stop();
LOGGER.info("\n===== Finished in total {} ms =====\n",stopWatch.getTime());
}
@Test @Ignore
public void testStreamWindowBenchmarkMain(){
testStreamSortedWindowOnHeap();
testStreamSortedWindowInSerializedMemory();
testStreamSortedWindowOffHeap();
testStreamSortedWindowFile();
}
@Test @Ignore
public void testStreamSortedWindowOnHeap() {
StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
benchmarkTest(window,StreamWindowRepository.StorageType.ONHEAP);
window.close();
}
@Test @Ignore
public void testStreamSortedWindowInSerializedMemory() {
StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
benchmarkTest(window,StreamWindowRepository.StorageType.MEMORY);
window.close();
}
@Test @Ignore
public void testStreamSortedWindowOffHeap() {
StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
benchmarkTest(window,StreamWindowRepository.StorageType.DIRECT_MEMORY);
window.close();
}
@Test @Ignore
public void testStreamSortedWindowFile() {
StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
benchmarkTest(window,StreamWindowRepository.StorageType.FILE_RAF);
window.close();
}
}