blob: ef6f80224b7dfe58cf8f8a6a769731cffa6ca17c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.common.cache.compress.impl;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
import org.apache.rocketmq.streams.common.cache.compress.ByteStore;
import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
import org.apache.rocketmq.streams.common.utils.NumberUtils;
import org.junit.Assert;
/**
* 支持key是string,value是int的场景,支持size不大于10000000.只支持int,long,boolean,string类型
*/
public class IntValueKV extends CacheKV<Integer> {
protected ByteStore conflicts = new ByteStore(4);
@Override
public Integer get(String key) {
ByteArray byteArray = super.getInner(key);
if (byteArray == null) {
return null;
}
int value = byteArray.castInt(0, 4);
return value;
}
@Override
public void put(String key, Integer value) {
byte[] bytes = NumberUtils.toByte(value, 5);
bytes[4] = (byte) 0;
super.putInner(key, new ByteArray(bytes), true);
}
@Override
public boolean contains(String key) {
Integer value = get(key);
if (value == null) {
return false;
}
return true;
}
@Override
public int calMemory() {
return super.calMemory() + (this.conflicts.getConflictIndex() + 1) * this.conflicts.getBlockSize();
}
/**
* TODO remove the key from the sinkcache and return the removed value
*
* @return
*/
public IntValueKV(int capacity) {
super(capacity, true);
}
public static void main(String[] args) throws Exception {
IntValueKV cache = new IntValueKV(5);
cache.put("A", 0);
cache.put("B", 1);
cache.put("C", 2);
cache.put("D", 3);
cache.put("E", 4);
cache.put("F", 5);
cache.put("G", 6);
System.exit(0);
int size = 10000000;
int sampleSize = 1024;
int dataSize = 3974534;
IntValueKV compressByteMap = new IntValueKV(size);
Map<String, Integer> dataMap = new HashMap<>(size);
Set<Integer> whiteSet = new HashSet<>(1024);
Map<String, Integer> sample1Map = new HashMap<>(1024);
Map<String, Integer> sample2Map = new HashMap<>(1024);
//init data
Random random = new Random();
while (true) {
if (whiteSet.size() >= sampleSize) {
break;
}
int seed = random.nextInt(dataSize);
if (!whiteSet.contains(seed)) {
whiteSet.add(seed);
}
}
long originWriteCounter = 0;
long compressWriteCounter = 0;
String path = "/Users/arthur/Downloads/";
String blackFile = "2020-11-11-14-08-32_EXPORT_CSV_16231630_392_0.csv";
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path + blackFile)))) {
reader.readLine();
String line = null;
int counter = 0;
while ((line = reader.readLine()) != null) {
line = line.replaceAll("\"", "");
String[] parts = line.split(",", 2);
long begin = System.nanoTime();
dataMap.put(parts[1].trim(), Integer.parseInt(parts[0]));
originWriteCounter += (System.nanoTime() - begin);
if (whiteSet.contains(counter++)) {
sample1Map.put(parts[1].trim(), Integer.parseInt(parts[0]));
}
}
}
for (int i = 0; i < sampleSize * 100; i++) {
sample2Map.put(UUID.randomUUID().toString(), -1);
}
System.out.println("sample1 size:\t" + sample1Map.size());
System.out.println("sample2 size:\t" + sample2Map.size());
//System.out.println(
// "origin map size(computed by third party):\t" + RamUsageEstimator.humanSizeOf(dataMap) + "\tline's\t"
// + dataMap.size());
//
Iterator<Entry<String, Integer>> iterator = dataMap.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, Integer> entry = iterator.next();
long begin = System.nanoTime();
compressByteMap.put(entry.getKey(), entry.getValue());
compressWriteCounter += (System.nanoTime() - begin);
}
//System.out.println(
// "compressed map size(computed by third party):\t" + RamUsageEstimator.humanSizeOf(compressByteMap)
// + "\tline's\t"
// + compressByteMap.size);
System.out.println("compressed map size(computed by it's self)\t" + compressByteMap.calMemory() + " MB");
System.out.println(
"origin write cost:\t" + originWriteCounter / 1000 + "\tcompress write cost:\t"
+ compressWriteCounter / 1000);
//
long originSearchCounter = 0;
long compressCounter = 0;
Iterator<Entry<String, Integer>> iterator1 = sample1Map.entrySet().iterator();
Iterator<Entry<String, Integer>> iterator2 = sample2Map.entrySet().iterator();
while (iterator1.hasNext() && iterator2.hasNext()) {
Entry<String, Integer> entry1 = iterator1.next();
String key1 = entry1.getKey();
Integer value1 = entry1.getValue();
Entry<String, Integer> entry2 = iterator2.next();
String key2 = entry2.getKey();
Integer value2 = entry2.getValue();
long begin = System.nanoTime();
Assert.assertEquals(value1, dataMap.get(key1));
Assert.assertNotEquals(value2, dataMap.get(key2));
originSearchCounter += (System.nanoTime() - begin);
begin = System.nanoTime();
Assert.assertEquals(value1, compressByteMap.get(key1));
Assert.assertNotEquals(value2, compressByteMap.get(key2));
compressCounter += (System.nanoTime() - begin);
}
System.out.println(
"origin search cost:\t" + originSearchCounter / 1000 + "\tcompress search cost:\t"
+ compressCounter / 1000);
}
}