blob: 8cdddf62587f9b6964854d136bb20d73674c0c66 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.cache;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.OSInfo;
import com.alibaba.jstorm.utils.RunCounter;
public class RocksDBTest {
private static Logger LOG = LoggerFactory.getLogger(RocksDBTest.class);
public static String rootDir = "rocksdb_test";
// ATTENTIME
// DUE to windows can't load RocksDB jni libray,
// So these junit tests can't run under window
//
RocksTTLDBCache cache = new RocksTTLDBCache();
{
Map conf = new HashMap();
// TemporaryFolder cacheRoot = new TemporaryFolder()
conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, rootDir);
conf.put(RocksDBCache.ROCKSDB_RESET, true);
List<Integer> list = new ArrayList<Integer>() {
{
add(60);
add(120);
}
};
conf.put(RocksDBCache.TAG_TIMEOUT_LIST, list);
try {
cache.init(conf);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void testSimple() {
List<Map<String, Object>> dataList = new ArrayList<Map<String, Object>>();
for (int i = 0; i < 4; i++) {
dataList.add(new HashMap<String, Object>());
}
for (int i = 0; i < 10; i++) {
String key = "key" + i;
for (int j = 0; j < 4; j++) {
Map<String, Object> data = dataList.get(j);
String value = "value--" + i + "--" + j;
data.put(key, value);
}
}
Map<String, Object> data = dataList.get(0);
for (Entry<String, Object> entry : data.entrySet()) {
cache.put(entry.getKey(), entry.getValue());
}
for (Entry<String, Object> entry : data.entrySet()) {
String rawValue = (String) entry.getValue();
String fetchValue = (String) cache.get(entry.getKey());
Assert.assertEquals(rawValue, fetchValue);
}
for (Entry<String, Object> entry : data.entrySet()) {
cache.remove(entry.getKey());
}
for (Entry<String, Object> entry : data.entrySet()) {
String fetchValue = (String) cache.get(entry.getKey());
Assert.assertEquals(fetchValue, null);
}
Map<String, Object> data1 = dataList.get(1);
cache.putBatch(data1);
Map<String, Object> getBatch = new HashMap<String, Object>();
for (Entry<String, Object> entry : data1.entrySet()) {
getBatch.put(entry.getKey(), null);
}
Map<String, Object> emptyBatch = new HashMap<String, Object>();
emptyBatch.putAll(getBatch);
cache.getBatch(getBatch);
for (String key : getBatch.keySet()) {
String rawValue = (String) data1.get(key);
String fetchValue = (String) getBatch.get(key);
Assert.assertEquals(rawValue, fetchValue);
}
cache.removeBatch(data1.keySet());
getBatch.clear();
getBatch.putAll(emptyBatch);
cache.getBatch(getBatch);
for (String key : getBatch.keySet()) {
String rawValue = null;
String fetchValue = (String) getBatch.get(key);
Assert.assertEquals(rawValue, fetchValue);
}
Map<String, Object> data2 = dataList.get(2);
for (Entry<String, Object> entry : data2.entrySet()) {
cache.put(entry.getKey(), entry.getValue(), 60);
}
for (Entry<String, Object> entry : data2.entrySet()) {
String rawValue = (String) entry.getValue();
String fetchValue = (String) cache.get(entry.getKey());
Assert.assertEquals(rawValue, fetchValue);
}
for (Entry<String, Object> entry : data2.entrySet()) {
cache.remove(entry.getKey());
}
for (Entry<String, Object> entry : data2.entrySet()) {
String fetchValue = (String) cache.get(entry.getKey());
Assert.assertEquals(fetchValue, null);
}
Map<String, Object> data3 = dataList.get(3);
cache.putBatch(data3, 60);
for (Entry<String, Object> entry : data3.entrySet()) {
getBatch.put(entry.getKey(), null);
}
cache.getBatch(getBatch);
for (String key : getBatch.keySet()) {
String rawValue = (String) data3.get(key);
String fetchValue = (String) getBatch.get(key);
Assert.assertEquals(rawValue, fetchValue);
}
cache.removeBatch(data3.keySet());
getBatch.clear();
getBatch.putAll(emptyBatch);
cache.getBatch(getBatch);
for (String key : getBatch.keySet()) {
String rawValue = null;
String fetchValue = (String) getBatch.get(key);
Assert.assertEquals(rawValue, fetchValue);
}
for (Entry<String, Object> entry : data.entrySet()) {
cache.put(entry.getKey(), entry.getValue());
}
cache.putBatch(data3, 60);
for (Entry<String, Object> entry : data3.entrySet()) {
String rawValue = (String) entry.getValue();
String fetchValue = (String) cache.get(entry.getKey());
Assert.assertEquals(rawValue, fetchValue);
}
cache.removeBatch(data3.keySet());
for (Entry<String, Object> entry : data3.entrySet()) {
String rawValue = null;
String fetchValue = (String) cache.get(entry.getKey());
Assert.assertEquals(rawValue, fetchValue);
}
// double delete
cache.removeBatch(data3.keySet());
}
public void visitorAccross() throws RocksDBException, InterruptedException {
DBOptions dbOptions = null;
TtlDB ttlDB = null;
List<ColumnFamilyDescriptor> cfNames = new ArrayList<ColumnFamilyDescriptor>();
List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<ColumnFamilyHandle>();
cfNames.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
cfNames.add(new ColumnFamilyDescriptor("new_cf".getBytes()));
List<Integer> ttlValues = new ArrayList<Integer>();
// new column family with 1 second ttl
ttlValues.add(1);
// Default column family with infinite lifetime
ttlValues.add(0);
try {
System.out.println("Begin to open db");
dbOptions = new DBOptions().setCreateMissingColumnFamilies(true).setCreateIfMissing(true);
ttlDB = TtlDB.open(dbOptions, rootDir, cfNames, columnFamilyHandleList, ttlValues, false);
System.out.println("Successfully open db " + rootDir);
List<String> keys = new ArrayList<String>();
keys.add("key");
ttlDB.put("key".getBytes(), "key".getBytes());
for (int i = 0; i < 2; i++) {
String key = "key" + i;
keys.add(key);
ttlDB.put(columnFamilyHandleList.get(i), key.getBytes(), key.getBytes());
}
try {
byte[] value = ttlDB.get("others".getBytes());
if (value != null) {
System.out.println("Raw get :" + new String(value));
} else {
System.out.println("No value of other");
}
} catch (Exception e) {
System.out.println("Occur exception other");
}
for (String key : keys) {
try {
byte[] value = ttlDB.get(key.getBytes());
if (value != null) {
System.out.println("Raw get :" + new String(value));
} else {
System.out.println("No value of " + key);
}
} catch (Exception e) {
System.out.println("Occur exception " + key + ", Raw");
}
for (int i = 0; i < 2; i++) {
try {
byte[] value = ttlDB.get(columnFamilyHandleList.get(i), key.getBytes());
if (value != null) {
System.out.println("handler index" + i + " get :" + new String(value));
} else {
System.out.println("No value of index" + i + " get :" + key);
}
} catch (Exception e) {
System.out.println("Occur exception " + key + ", handler index:" + i);
}
}
}
} finally {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
columnFamilyHandle.dispose();
}
if (ttlDB != null) {
ttlDB.close();
}
if (dbOptions != null) {
dbOptions.dispose();
}
}
}
public void ttlDbOpenWithColumnFamilies() throws RocksDBException, InterruptedException {
DBOptions dbOptions = null;
TtlDB ttlDB = null;
List<ColumnFamilyDescriptor> cfNames = new ArrayList<ColumnFamilyDescriptor>();
List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<ColumnFamilyHandle>();
cfNames.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
cfNames.add(new ColumnFamilyDescriptor("new_cf".getBytes()));
List<Integer> ttlValues = new ArrayList<Integer>();
// Default column family with infinite lifetime
ttlValues.add(0);
// new column family with 1 second ttl
ttlValues.add(1);
try {
System.out.println("Begin to open db");
dbOptions = new DBOptions().setCreateMissingColumnFamilies(true).setCreateIfMissing(true);
ttlDB = TtlDB.open(dbOptions, rootDir, cfNames, columnFamilyHandleList, ttlValues, false);
System.out.println("Successfully open db " + rootDir);
ttlDB.put("key".getBytes(), "value".getBytes());
assertThat(ttlDB.get("key".getBytes())).isEqualTo("value".getBytes());
ttlDB.put(columnFamilyHandleList.get(1), "key".getBytes(), "value".getBytes());
assertThat(ttlDB.get(columnFamilyHandleList.get(1), "key".getBytes())).isEqualTo("value".getBytes());
TimeUnit.SECONDS.sleep(2);
ttlDB.compactRange();
ttlDB.compactRange(columnFamilyHandleList.get(1));
assertThat(ttlDB.get("key".getBytes())).isNotNull();
assertThat(ttlDB.get(columnFamilyHandleList.get(1), "key".getBytes())).isNull();
} finally {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
columnFamilyHandle.dispose();
}
if (ttlDB != null) {
ttlDB.close();
}
if (dbOptions != null) {
dbOptions.dispose();
}
}
}
public void performanceTest() {
final LinkedBlockingDeque<String> getQueue = new LinkedBlockingDeque<String>();
final LinkedBlockingDeque<String> rmQueue = new LinkedBlockingDeque<String>();
final int TIMES = 10000000;
Thread putThread = new Thread(new Runnable() {
RunCounter runCounter = new RunCounter("Put");
@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < TIMES; i++) {
String key = "Key" + i;
cache.put(key, key);
runCounter.count(1l);
getQueue.offer(key);
}
getQueue.offer(null);
System.out.println("Shutdown ");
}
});
Thread getThread = new Thread(new Runnable() {
RunCounter runCounter = new RunCounter("Get");
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
String key = null;
try {
key = getQueue.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (key == null) {
break;
}
cache.get(key);
runCounter.count(1l);
rmQueue.offer(key);
}
System.out.println("Shutdown get");
}
});
Thread rmThread = new Thread(new Runnable() {
RunCounter runCounter = new RunCounter("Rm");
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
String key = null;
try {
key = rmQueue.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (key == null) {
break;
}
cache.remove(key);
runCounter.count(1l);
}
System.out.println("Shutdown rm");
}
});
System.out.println("Start performance test");
putThread.start();
getThread.start();
rmThread.start();
JStormUtils.sleepMs(1000000);
}
public static void main(String[] args) throws Exception {
if (OSInfo.isLinux() || OSInfo.isMac()) {
RocksDB.loadLibrary();
RocksDBTest instance = new RocksDBTest();
// instance.testSimple();
// instance.ttlDbOpenWithColumnFamilies();
// instance.visitorAccross();
instance.performanceTest();
}
}
}