blob: 7a503e1ee657a31c649c6b5a709d806389dbaa5a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.engine.sorter;
import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowInMapDB;
import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowOnHeap;
import com.google.common.base.Preconditions;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* ===== Benchmark Result Report =====<br/><br/>
*
* <p>Num. Operation Type Time<br/>
* ---- --------- ---- ----<br/>
* 1000 FlushTime DIRECT_MEMORY : 55<br/>
* 1000 FlushTime FILE_RAF : 63<br/>
* 1000 FlushTime MEMORY : 146<br/>
* 1000 FlushTime ONHEAP : 17<br/>
* 1000 InsertTime DIRECT_MEMORY : 68<br/>
* 1000 InsertTime FILE_RAF : 223<br/>
* 1000 InsertTime MEMORY : 273<br/>
* 1000 InsertTime ONHEAP : 20<br/>
* 10000 FlushTime DIRECT_MEMORY : 551<br/>
* 10000 FlushTime FILE_RAF : 668<br/>
* 10000 FlushTime MEMORY : 643<br/>
* 10000 FlushTime ONHEAP : 5<br/>
* 10000 InsertTime DIRECT_MEMORY : 446<br/>
* 10000 InsertTime FILE_RAF : 2095<br/>
* 10000 InsertTime MEMORY : 784<br/>
* 10000 InsertTime ONHEAP : 29<br/>
* 100000 FlushTime DIRECT_MEMORY : 6139<br/>
* 100000 FlushTime FILE_RAF : 6237<br/>
* 100000 FlushTime MEMORY : 6238<br/>
* 100000 FlushTime ONHEAP : 18<br/>
* 100000 InsertTime DIRECT_MEMORY : 4499<br/>
* 100000 InsertTime FILE_RAF : 22343<br/>
* 100000 InsertTime MEMORY : 4962<br/>
* 100000 InsertTime ONHEAP : 107<br/>
* 1000000 FlushTime DIRECT_MEMORY : 61356<br/>
* 1000000 FlushTime FILE_RAF : 63025<br/>
* 1000000 FlushTime MEMORY : 61380<br/>
* 1000000 FlushTime ONHEAP : 47<br/>
* 1000000 InsertTime DIRECT_MEMORY : 43637<br/>
* 1000000 InsertTime FILE_RAF : 464481<br/>
* 1000000 InsertTime MEMORY : 44367<br/>
* 1000000 InsertTime ONHEAP : 2040<br/>
* </p>
* @see StreamSortedWindowOnHeap
* @see org.mapdb.DBMaker
*/
public class StreamWindowRepository {
public enum StorageType {
/**
* Creates new in-memory database which stores all data on heap without serialization.
* This mode should be very fast, but data will affect Garbage PartitionedEventCollector the same way as traditional Java Collections.
*/
ONHEAP,
/**
* Creates new in-memory database. Changes are lost after JVM exits.
* This option serializes data into {@code byte[]},
* so they are not affected by Garbage PartitionedEventCollector.
*/
MEMORY,
/**
* <p>
* Creates new in-memory database. Changes are lost after JVM exits.
* </p><p>
* This will use {@code DirectByteBuffer} outside of HEAP, so Garbage Collector is not affected
* You should increase ammount of direct memory with
* {@code -XX:MaxDirectMemorySize=10G} JVM param
* </p>
*/
DIRECT_MEMORY,
/**
* By default use File.createTempFile("streamwindows","temp")
*/
FILE_RAF
}
private static final Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);
private final Map<StorageType, DB> dbPool;
private StreamWindowRepository() {
dbPool = new HashMap<>();
}
private static StreamWindowRepository repository;
/**
* Close automatically when JVM exists.
*
* @return StreamWindowRepository singletonInstance
*/
public static StreamWindowRepository getSingletonInstance() {
synchronized (StreamWindowRepository.class) {
if (repository == null) {
repository = new StreamWindowRepository();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
repository.close();
}
});
}
return repository;
}
}
private DB createMapDB(StorageType storageType) {
synchronized (dbPool) {
if (!dbPool.containsKey(storageType)) {
DB db;
switch (storageType) {
case ONHEAP:
db = DBMaker.heapDB().closeOnJvmShutdown().make();
LOG.info("Create ONHEAP mapdb");
break;
case MEMORY:
db = DBMaker.memoryDB().closeOnJvmShutdown().make();
LOG.info("Create MEMORY mapdb");
break;
case DIRECT_MEMORY:
db = DBMaker.memoryDirectDB().closeOnJvmShutdown().make();
LOG.info("Create DIRECT_MEMORY mapdb");
break;
case FILE_RAF:
try {
File file = File.createTempFile("window-", ".map");
file.delete();
file.deleteOnExit();
Preconditions.checkNotNull(file, "file is null");
db = DBMaker.fileDB(file).deleteFilesAfterClose().make();
LOG.info("Created FILE_RAF map file at {}", file.getAbsolutePath());
} catch (IOException e) {
throw new IllegalStateException(e);
}
break;
default:
throw new IllegalArgumentException("Illegal storage type: " + storageType);
}
dbPool.put(storageType, db);
return db;
}
return dbPool.get(storageType);
}
}
public StreamWindow createWindow(long start, long end, long margin, StorageType type) {
StreamWindow ret;
switch (type) {
case ONHEAP:
ret = new StreamSortedWindowOnHeap(start, end, margin);
break;
default:
ret = new StreamSortedWindowInMapDB(
start, end, margin,
createMapDB(type),
UUID.randomUUID().toString()
);
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Created new {}, type: {}", ret, type);
}
return ret;
}
public StreamWindow createWindow(long start, long end, long margin, StreamWindowStrategy strategy) {
return strategy.createWindow(start, end, margin, this);
}
public StreamWindow createWindow(long start, long end, long margin) {
return OnHeapStrategy.INSTANCE.createWindow(start, end, margin, this);
}
public void close() {
for (Map.Entry<StorageType, DB> entry : dbPool.entrySet()) {
entry.getValue().close();
}
dbPool.clear();
}
public interface StreamWindowStrategy {
StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository);
}
public static class OnHeapStrategy implements StreamWindowStrategy {
public static final OnHeapStrategy INSTANCE = new OnHeapStrategy();
@Override
public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
return repository.createWindow(start, end, margin, StorageType.ONHEAP);
}
}
public static class WindowSizeStrategy implements StreamWindowStrategy {
private static final long ONE_HOUR = 3600 * 1000;
private static final long FIVE_HOURS = 5 * 3600 * 1000;
private final long onheapWindowSizeLimit;
private final long offheapWindowSizeLimit;
public static WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR, FIVE_HOURS);
public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit) {
this.offheapWindowSizeLimit = offheapWindowSizeLimit;
this.onheapWindowSizeLimit = onheapWindowSizeLimit;
if (this.offheapWindowSizeLimit < this.onheapWindowSizeLimit) {
throw new IllegalStateException("offheapWindowSizeLimit " + this.offheapWindowSizeLimit + " < onheapWindowSizeLimit " + this.onheapWindowSizeLimit);
}
}
@Override
public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
long windowLength = end - start;
if (windowLength <= onheapWindowSizeLimit) {
return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.ONHEAP);
} else if (windowLength > onheapWindowSizeLimit & windowLength <= offheapWindowSizeLimit) {
return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
} else {
return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.FILE_RAF);
}
}
}
}