Add quantile summary for tsfile (#8889)
diff --git a/client-py/SessionSyn.py b/client-py/SessionSyn.py index 2649c7a..7059cd3 100644 --- a/client-py/SessionSyn.py +++ b/client-py/SessionSyn.py
@@ -24,6 +24,7 @@ from iotdb.utils.Tablet import Tablet import pandas as pd import numpy as np +import time # creating session connection. ip = "127.0.0.1" @@ -33,12 +34,7 @@ session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") session.open(False) -grp = "tx_syn_04" -# syn_01: 1024 -# syn_02: 512 -# syn_03: 256 -# syn_04: 128 -# syn_05: false +grp = "bt_syn_01" # set and delete storage groups session.delete_storage_group("root." + grp) @@ -55,34 +51,52 @@ session.check_time_series_exists("root." + grp + ".d_01.s_01"), ) -# df = pd.read_csv("~/LSM-Quantile/wh.csv") +df = pd.read_csv("D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2\\1_bitcoin.csv") +data = df["bitcoin dataset"].tolist() +# df = pd.read_csv("../../4_wh.csv") # data = df["value"].tolist() -# data = [[datum] for datum in data] -df = pd.read_csv("~/LSM-Quantile/taxi.txt") -data = (np.array(df)).tolist() -data = [[datum[0]] for datum in data] -data = data[:50000000] -batch = 81920 +data = [[datum] for datum in data] +# df = pd.read_csv("../../SpacecraftThruster.txt") +# df = pd.read_csv("../../4_taxipredition8M.txt") +# data = (np.array(df)).tolist() +# data = [[datum[0]] for datum in data] +batch = 8192 print(data[:10]) print(type(data[0])) +print(len(data)) measurements_ = ["s_01"] data_types_ = [ TSDataType.DOUBLE ] -for i in range(int(len(data) / batch)): +values_ = range(batch) +timestamps_ = range(1<<40,(1<<40)+batch) +tablet_ = Tablet( + "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_ +) +session.insert_tablet(tablet_) +session.execute_non_query_statement("flush") + +total_time = 0 +for i in range(6713): if i % 100 == 0: print("Iter: " + str(i)) -# insert one tablet into the database. + # insert one tablet into the database. values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. 
timestamps_ = list(range(i * batch, (i + 1) * batch)) + if len(timestamps_) != len(values_): + break tablet_ = Tablet( "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_ ) + curr_time = time.time() session.insert_tablet(tablet_) + total_time += (time.time() - curr_time) # session.execute_non_query_statement("flush") +print(total_time) + # close session connection. session.close()
diff --git a/client-py/SessionSynUnseq.py b/client-py/SessionSynUnseq.py new file mode 100644 index 0000000..06c8a10 --- /dev/null +++ b/client-py/SessionSynUnseq.py
@@ -0,0 +1,290 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Uncomment the following line to use apache-iotdb module installed by pip3 +# + +from iotdb.Session import Session +from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor +from iotdb.utils.Tablet import Tablet +import pandas as pd +import numpy as np +import time + +mu, sig = 2.0, 2.6 +grpSuf = '_sig'+str(int(sig*10)) + +# creating session connection. +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session.open(False) + +grp = "bt_syn_01"+grpSuf + +# set and delete storage groups +session.delete_storage_group("root." + grp) +session.set_storage_group("root." + grp) + +# setting time series. +session.create_time_series( + "root." + grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY +) + +# checking time series +print( + "s_01 expecting True, checking result: ", + session.check_time_series_exists("root." 
+ grp + ".d_01.s_01"), +) + +df = pd.read_csv("D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2\\1_bitcoin.csv") +data = df["bitcoin dataset"].tolist() +# df = pd.read_csv("../../4_wh.csv") +# data = df["value"].tolist() +data = [[datum] for datum in data] +# df = pd.read_csv("../../SpacecraftThruster.txt") +# df = pd.read_csv("../../4_taxipredition8M.txt") +# data = (np.array(df)).tolist() +# data = [[datum[0]] for datum in data] +batch = 8192 +print(data[:10]) +print(type(data[0])) +print(len(data)) + +n=6713*8192 +lat_data = [] +import random +import math + +random.seed(233) +for i in range(n): + lat_data.append( ( int(i+math.exp(mu + sig * random.gauss(0, 1))), i, data[i] ) ) +lat_data=sorted(lat_data,key=lambda x:x[0]) + + +measurements_ = ["s_01"] +data_types_ = [ + TSDataType.DOUBLE +] + +values_ = range(batch) +timestamps_ = range(1<<40,(1<<40)+batch) +tablet_ = Tablet( + "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_ +) +session.insert_tablet(tablet_) +session.execute_non_query_statement("flush") + +total_time = 0 +for i in range(int(n/batch)): + if i % 100 == 0: + print("Iter: " + str(i)) + # insert one tablet into the database. + values_ = data[i * batch : (i + 1) * batch][1] + timestamps_ = list(range(i * batch, (i + 1) * batch)) + values_ = [] + timestamps_ = [] + for j in range(i*batch,(i+1)*batch): + timestamps_.append(lat_data[j][1]) + values_.append(lat_data[j][2]) + # print('\t\t'+str(lat_data[j][1])) + if len(timestamps_) != len(values_): + break + tablet_ = Tablet( + "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_ + ) + curr_time = time.time() + session.insert_tablet(tablet_) + total_time += (time.time() - curr_time) + +print(total_time) +# +# +# # -------------------------------------------------------------------------------------------------------------------- +# +# grp = "tx_syn_01" +# +# # set and delete storage groups +# session.delete_storage_group("root." 
+ grp) +# session.set_storage_group("root." + grp) +# +# # setting time series. +# session.create_time_series( +# "root." + grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY +# ) +# +# # checking time series +# print( +# "s_01 expecting True, checking result: ", +# session.check_time_series_exists("root." + grp + ".d_01.s_01"), +# ) +# +# # df = pd.read_csv("../../1_bitcoin.csv") +# # data = df["bitcoin dataset"].tolist() +# # df = pd.read_csv("../../4_wh.csv") +# # data = df["value"].tolist() +# # data = [[datum] for datum in data] +# # df = pd.read_csv("../../SpacecraftThruster.txt") +# df = pd.read_csv("../../4_taxipredition8M.txt") +# data = (np.array(df)).tolist() +# data = [[datum[0]] for datum in data] +# batch = 8192 +# print(data[:10]) +# print(type(data[0])) +# print(len(data)) +# +# measurements_ = ["s_01"] +# data_types_ = [ +# TSDataType.DOUBLE +# ] +# +# total_time = 0 +# for i in range(6713): +# if i % 100 == 0: +# print("Iter: " + str(i)) +# # insert one tablet into the database. +# values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +# timestamps_ = list(range(i * batch, (i + 1) * batch)) +# if len(timestamps_) != len(values_): +# break +# tablet_ = Tablet( +# "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_ +# ) +# curr_time = time.time() +# session.insert_tablet(tablet_) +# total_time += (time.time() - curr_time) +# +# print(total_time) +# +# # -------------------------------------------------------------------------------------------------------------------- +# +# grp = "th_syn_01" +# +# # set and delete storage groups +# session.delete_storage_group("root." + grp) +# session.set_storage_group("root." + grp) +# +# # setting time series. +# session.create_time_series( +# "root." 
+ grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY +# ) +# +# # checking time series +# print( +# "s_01 expecting True, checking result: ", +# session.check_time_series_exists("root." + grp + ".d_01.s_01"), +# ) +# +# # df = pd.read_csv("../../1_bitcoin.csv") +# # data = df["bitcoin dataset"].tolist() +# # df = pd.read_csv("../../4_wh.csv") +# # data = df["value"].tolist() +# # data = [[datum] for datum in data] +# df = pd.read_csv("../../SpacecraftThruster.txt") +# # df = pd.read_csv("../../4_taxipredition8M.txt") +# data = (np.array(df)).tolist() +# data = [[datum[0]] for datum in data] +# batch = 8192 +# print(data[:10]) +# print(type(data[0])) +# print(len(data)) +# +# measurements_ = ["s_01"] +# data_types_ = [ +# TSDataType.DOUBLE +# ] +# +# total_time = 0 +# for i in range(6713): +# if i % 100 == 0: +# print("Iter: " + str(i)) +# # insert one tablet into the database. +# values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +# timestamps_ = list(range(i * batch, (i + 1) * batch)) +# if len(timestamps_) != len(values_): +# break +# tablet_ = Tablet( +# "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_ +# ) +# curr_time = time.time() +# session.insert_tablet(tablet_) +# total_time += (time.time() - curr_time) +# +# print(total_time) +# +# # -------------------------------------------------------------------------------------------------------------------- +# +# grp = "wh_syn_01" +# +# # set and delete storage groups +# session.delete_storage_group("root." + grp) +# session.set_storage_group("root." + grp) +# +# # setting time series. +# session.create_time_series( +# "root." + grp + ".d_01.s_01", TSDataType.DOUBLE, TSEncoding.PLAIN, Compressor.SNAPPY +# ) +# +# # checking time series +# print( +# "s_01 expecting True, checking result: ", +# session.check_time_series_exists("root." 
+ grp + ".d_01.s_01"), +# ) +# +# # df = pd.read_csv("../../1_bitcoin.csv") +# # data = df["bitcoin dataset"].tolist() +# df = pd.read_csv("../../4_wh.csv") +# data = df["value"].tolist() +# data = [[datum] for datum in data] +# # df = pd.read_csv("../../SpacecraftThruster.txt") +# # df = pd.read_csv("../../4_taxipredition8M.txt") +# # data = (np.array(df)).tolist() +# # data = [[datum[0]] for datum in data] +# batch = 8192 +# print(data[:10]) +# print(type(data[0])) +# print(len(data)) +# +# measurements_ = ["s_01"] +# data_types_ = [ +# TSDataType.DOUBLE +# ] +# +# total_time = 0 +# for i in range(6713): +# if i % 100 == 0: +# print("Iter: " + str(i)) +# # insert one tablet into the database. +# values_ = data[i * batch : (i + 1) * batch] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +# timestamps_ = list(range(i * batch, (i + 1) * batch)) +# if len(timestamps_) != len(values_): +# break +# tablet_ = Tablet( +# "root." + grp + ".d_01", measurements_, data_types_, values_, timestamps_ +# ) +# curr_time = time.time() +# session.insert_tablet(tablet_) +# total_time += (time.time() - curr_time) +# +# print(total_time) + +session.close() + +print("All executions done!!") \ No newline at end of file
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 4a26cf2..1a41fcb 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -20,32 +20,45 @@ #################### ### RPC Configuration #################### +no_update=false +only_use_page_synopsis=false +synopsis_for_whole_chunk_when_flush=true +sketch_size_ratio=4 +target_chunk_point_num=8192 +max_number_of_points_in_chunk=8192 +max_number_of_points_in_page=8192 +avg_series_point_number_threshold=8192 enable_synopsis=true -synopsis_size_in_byte=1024 -aggregator_memory_in_kb=1024 +enable_SST_sketch=true +synopsis_size_in_byte=4096 +aggregator_memory_in_kb=256 summary_type=0 quantile=0.5 -enable_seq_space_compaction=false -enable_unseq_space_compaction=false +enable_seq_space_compaction=true +enable_unseq_space_compaction=true enable_cross_space_compaction=false -meta_data_cache_enable=true +meta_data_cache_enable=false +# Read memory Allocation Ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query. +# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 1:100:200:300:400 +chunk_timeseriesmeta_free_memory_proportion=100:1:20000:30000:40000 + aggregation_strategy=0 bloom_filter_bits_per_key=16 enable_bloom_filter=false -max_number_of_points_in_chunk=8000 -flush_wal_threshold=4096 +flush_wal_threshold=8192 # The memory size for each series writer to pack page, default value is 64KB # Datatype: int -page_size_in_byte=65536 +page_size_in_byte=131072 +#65536 # The maximum number of data points in a page, default 1024*1024 # Datatype: int -max_number_of_points_in_page=4096 +#max_number_of_points_in_page=8192 # Datatype: String rpc_address=0.0.0.0
diff --git a/server/src/assembly/resources/conf/iotdb-env.bat b/server/src/assembly/resources/conf/iotdb-env.bat index df6b0d6..e53a65e 100644 --- a/server/src/assembly/resources/conf/iotdb-env.bat +++ b/server/src/assembly/resources/conf/iotdb-env.bat
@@ -61,7 +61,7 @@ set /a half_=%system_memory_in_mb%/2 set /a quarter_=%half_%/2 -if %half_% GTR 1024 set half_=1024 +if %half_% GTR 2048 set half_=2048 if %quarter_% GTR 65536 set quarter_=65536 if %half_% GTR %quarter_% (
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 9d5f479..a9dff42 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -149,7 +149,7 @@ * When a certain amount of write ahead logs is reached, they will be flushed to the disk. It is * possible to lose at most flush_wal_threshold operations. */ - private int flushWalThreshold = 10000; + private int flushWalThreshold = 8192; /** this variable set timestamp precision as millisecond, microsecond or nanosecond */ private String timestampPrecision = "ms"; @@ -2545,4 +2545,24 @@ public int getAggregationStrategy() { return aggregationStrategy; } + + private boolean noUpdate = true; + + public void setNoUpdate(boolean no) { + this.noUpdate = no; + } + + public boolean getNoUpdate() { + return this.noUpdate; + } + + private boolean onlyUsePageSynopsis = true; + + public void setOnlyUsePageSynopsis(boolean only) { + this.onlyUsePageSynopsis = only; + } + + public boolean getOnlyUsePageSynopsis() { + return this.onlyUsePageSynopsis; + } }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index a06f2ef..22d25a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -158,6 +158,15 @@ properties.getProperty( "aggregation_strategy", Integer.toString(conf.getAggregationStrategy())))); + conf.setNoUpdate( + Boolean.parseBoolean( + properties.getProperty("no_update", Boolean.toString(conf.getNoUpdate())))); + + conf.setOnlyUsePageSynopsis( + Boolean.parseBoolean( + properties.getProperty( + "only_use_page_synopsis", Boolean.toString(conf.getOnlyUsePageSynopsis())))); + conf.setRpcThriftCompressionEnable( Boolean.parseBoolean( properties.getProperty( @@ -1015,6 +1024,24 @@ TSFileDescriptor.getInstance() .getConfig() + .setSketchSizeRatio( + Integer.parseInt( + properties.getProperty( + "sketch_size_ratio", + Integer.toString( + TSFileDescriptor.getInstance().getConfig().getSketchSizeRatio())))); + TSFileDescriptor.getInstance() + .getConfig() + .setSynopsisForWholeChunkWhenFlush( + Boolean.parseBoolean( + properties.getProperty( + "synopsis_for_whole_chunk_when_flush", + Boolean.toString( + TSFileDescriptor.getInstance() + .getConfig() + .getSynopsisForWholeChunkWhenFlush())))); + TSFileDescriptor.getInstance() + .getConfig() .setEnableSynopsis( Boolean.parseBoolean( properties.getProperty( @@ -1023,6 +1050,14 @@ TSFileDescriptor.getInstance().getConfig().isEnableSynopsis())))); TSFileDescriptor.getInstance() .getConfig() + .setEnableSSTSketch( + Boolean.parseBoolean( + properties.getProperty( + "enable_SST_sketch", + Boolean.toString( + TSFileDescriptor.getInstance().getConfig().isEnableSSTSketch())))); + TSFileDescriptor.getInstance() + .getConfig() .setEnableBloomFilter( Boolean.parseBoolean( properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java index 7b4a51c..2e0a387 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.service.metrics.Tag; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Path; @@ -45,12 +44,7 @@ import java.io.IOException; import java.lang.ref.WeakReference; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.WeakHashMap; +import java.util.*; import java.util.concurrent.atomic.AtomicLong; /** @@ -97,13 +91,14 @@ + RamUsageEstimator.shallowSizeOf(value) + RamUsageEstimator.sizeOf(value.getMeasurementId()) + RamUsageEstimator.shallowSizeOf(value.getStatistics()) - + (value.getChunkMetadataList().get(0) == null - ? 0 - : ((ChunkMetadata) value.getChunkMetadataList().get(0)) - .calculateRamSize() - + RamUsageEstimator.NUM_BYTES_OBJECT_REF) - * value.getChunkMetadataList().size() - + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList()))) + + value.getDataSizeOfChunkMetaDataList() + /*(value.getChunkMetadataList().get(0) == null + ? 0 + : ((ChunkMetadata) value.getChunkMetadataList().get(0)) + .calculateRamSize() + + RamUsageEstimator.NUM_BYTES_OBJECT_REF) + * value.getChunkMetadataList().size() + + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList())*/ )) .recordStats() .build();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java index b6737a6..bfd6c3b 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -101,6 +101,8 @@ public static final String TDIGEST_STAT_SINGLE = "tdigest_quantile"; public static final String SAMPLING_STAT_SINGLE = "sampling_quantile"; public static final String STRICT_KLL_STAT_SINGLE = "kll_quantile"; + public static final String DDSKETCH_SINGLE = "ddsketch_quantile"; + public static final String CHUNK_STAT_AVAIL = "chunk_stat_available"; public static final String ALL = "all"; @@ -140,7 +142,9 @@ EXACT_MEDIAN_KLL_STAT_OVERLAP_SINGLE, TDIGEST_STAT_SINGLE, SAMPLING_STAT_SINGLE, - STRICT_KLL_STAT_SINGLE)); + STRICT_KLL_STAT_SINGLE, + DDSKETCH_SINGLE, + CHUNK_STAT_AVAIL)); public static final int TOK_WHERE = 23; public static final int TOK_INSERT = 24;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java index 8e5951f..f887c8e 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/AggregationQueryOperator.java
@@ -148,6 +148,8 @@ case SQLConstant.TDIGEST_STAT_SINGLE: case SQLConstant.SAMPLING_STAT_SINGLE: case SQLConstant.STRICT_KLL_STAT_SINGLE: + case SQLConstant.DDSKETCH_SINGLE: + case SQLConstant.CHUNK_STAT_AVAIL: return dataType.isNumeric(); case SQLConstant.COUNT: case SQLConstant.MIN_TIME: @@ -197,6 +199,8 @@ case SQLConstant.TDIGEST_STAT_SINGLE: case SQLConstant.SAMPLING_STAT_SINGLE: case SQLConstant.STRICT_KLL_STAT_SINGLE: + case SQLConstant.DDSKETCH_SINGLE: + case SQLConstant.CHUNK_STAT_AVAIL: return dataTypes.stream().allMatch(dataTypes.get(0)::equals); default: return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java index e31c0d7..6c4d45d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -59,7 +59,9 @@ EXACT_MEDIAN_KLL_STAT_OVERLAP_SINGLE, TDIGEST_STAT_SINGLE, SAMPLING_STAT_SINGLE, - STRICT_KLL_STAT_SINGLE; + STRICT_KLL_STAT_SINGLE, + DDSKETCH_SINGLE, + CHUNK_STAT_AVAIL; /** * give an integer to return a data type. @@ -137,6 +139,10 @@ return SAMPLING_STAT_SINGLE; case 33: return STRICT_KLL_STAT_SINGLE; + case 34: + return DDSKETCH_SINGLE; + case 35: + return CHUNK_STAT_AVAIL; default: throw new IllegalArgumentException("Invalid Aggregation Type: " + i); } @@ -247,6 +253,12 @@ case STRICT_KLL_STAT_SINGLE: i = 33; break; + case DDSKETCH_SINGLE: + i = 34; + break; + case CHUNK_STAT_AVAIL: + i = 35; + break; default: throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name()); }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java index f344d55..2802bdf 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -46,7 +46,13 @@ @Override public void updateResultFromStatistics(Statistics statistics) { - System.out.println("\t\t[DEBUG Count] update from statistics:" + statistics.getCount()); + // System.out.println( + // "\t\t[DEBUG Count] update from statistics:" + // + statistics.getCount() + // + "\tT:" + // + statistics.getStartTime() + // + "..." + // + statistics.getEndTime()); setLongValue(getLongValue() + statistics.getCount()); }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DDSketchSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DDSketchSingleAggrResult.java new file mode 100644 index 0000000..e6f13d1 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/DDSketchSingleAggrResult.java
@@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.aggregation.impl; + +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.AggregationType; +import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; +import org.apache.iotdb.tsfile.utils.DDSketchForQuantile; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE; + +public class DDSketchSingleAggrResult extends AggregateResult { + private TSDataType seriesDataType; + private int iteration; + private long pageKLLNum, statNum; + private long cntL, cntR, lastL; + private long n, K1, heapN; + private DDSketchForQuantile DDSketch; + private boolean hasFinalResult; 
+ long DEBUG = 0; + + private int getBitsOfDataType() { + switch (seriesDataType) { + case INT32: + case FLOAT: + return 32; + case INT64: + case DOUBLE: + return 64; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation MEDIAN : %s", seriesDataType)); + } + } + + // private long approximateDataAvgError() { + // long dataAvgError = (long) Math.ceil(2.0 * heapN / heapKLL.getMaxMemoryNum()) + 1; + // return dataAvgError; + // } + // + // private long approximateStatAvgError() { + // if (SKETCH_SIZE < 0) return 0; + // double pageAvgError = 1.0 * TOT_SKETCH_N / TOT_SKETCH_SIZE / 3.0; + // double rate = 1.0 * SKETCH_SIZE * pageKLLNum / (maxMemoryByte ); + // long pageStatAvgError; + // if (rate < 1.0) { + // pageStatAvgError = (long) Math.ceil(pageAvgError * Math.pow(pageKLLNum, 0.5)); + // if (pageKLLNum <= 10) pageStatAvgError += pageAvgError * 3.0; + // } else { + // int memKLLNum = (maxMemoryByte ) / SKETCH_SIZE; + // long memErr = (long) Math.ceil(pageAvgError * Math.pow(memKLLNum, 0.5)); + // pageStatAvgError = (long) Math.ceil(rate * 0.5 * memErr + 0.5 * memErr); + // } + // return pageStatAvgError; + // } + // + // private long approximateMaxError() { + // return 0; + // } + + private boolean hasTwoMedians() { + return (n & 1) == 0; + } + + public DDSketchSingleAggrResult(TSDataType seriesDataType) throws UnSupportedDataTypeException { + super(DOUBLE, AggregationType.DDSKETCH_SINGLE); + this.seriesDataType = seriesDataType; + reset(); + } + + private long dataToLong(Object data) throws UnSupportedDataTypeException { + long result; + switch (seriesDataType) { + case INT32: + return (int) data; + case FLOAT: + result = Float.floatToIntBits((float) data); + return (float) data >= 0f ? result : result ^ Long.MAX_VALUE; + case INT64: + return (long) data; + case DOUBLE: + result = Double.doubleToLongBits((double) data); + return (double) data >= 0d ? 
result : result ^ Long.MAX_VALUE; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation MEDIAN : %s", seriesDataType)); + } + } + + private double longToResult(long result) throws UnSupportedDataTypeException { + switch (seriesDataType) { + case INT32: + return (double) (result); + case FLOAT: + result = (result >>> 31) == 0 ? result : result ^ Long.MAX_VALUE; + return Float.intBitsToFloat((int) (result)); + case INT64: + return (double) (result); + case DOUBLE: + result = (result >>> 63) == 0 ? result : result ^ Long.MAX_VALUE; + return Double.longBitsToDouble(result); + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation MEDIAN : %s", seriesDataType)); + } + } + + private void updateStatusFromData(Object data) { + long dataL = dataToLong(data); + if (iteration == 0) n++; + if (cntL <= dataL && dataL <= cntR) { + DDSketch.insert(longToResult(dataL)); + heapN++; + } else if (lastL <= dataL && dataL < cntL) K1--; + } + + @Override + public void startIteration() { + heapN = statNum = 0; + if (iteration == 0) { // first iteration + + int dataset_V = 40000, limit = maxMemoryByte / 42; + double DDSketch_ALPHA = Math.pow(10, Math.log10(dataset_V) / limit) - 1; + DDSketch = new DDSketchForQuantile(DDSketch_ALPHA, limit); + lastL = cntL = Long.MIN_VALUE; + cntR = Long.MAX_VALUE; + n = 0; + pageKLLNum = 0; + } + } + + @Override + public void finishIteration() { + System.out.println( + "\t[DDSKETCH SINGLE DEBUG]" + + "finish iteration " + + iteration + + " cntL,R:" + + "[" + + cntL + + "," + + cntR + + "]" + + "\tlastL:" + + lastL + + "\tK1:" + + K1); + iteration++; + if (n == 0) { + hasFinalResult = true; + return; + } + lastL = cntL; + + if (iteration == 1) { // first iteration over + K1 = (n + 1) >> 1; + } + long K2 = hasTwoMedians() ? 
(K1 + 1) : K1; + + System.out.println("\t[DDSKETCH SINGLE DEBUG]" + " K1,K2:" + K1 + ", " + K2); + double ans = DDSketch.getQuantile(QUANTILE); + setDoubleValue(ans); + hasFinalResult = true; + } + + @Override + protected boolean hasCandidateResult() { + return hasFinalResult && n > 0; + } + + @Override + public Double getResult() { + return hasCandidateResult() ? getDoubleValue() : null; + } + + @Override + public void updateResultFromStatistics(Statistics statistics) { + switch (statistics.getType()) { + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + break; + case TEXT: + case BOOLEAN: + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type in aggregation MEDIAN : %s", statistics.getType())); + } + } + + @Override + public void updateResultFromPageData(IBatchDataIterator batchIterator) { + updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE); + } + + @Override + public void updateResultFromPageData( + IBatchDataIterator batchIterator, long minBound, long maxBound) { + // System.out.print("\t[KLL STAT DEBUG]\tupdateResultFromPageData:"); + // int tmp_tot = 0; + while (batchIterator.hasNext()) { + if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) { + break; + } + // System.out.print( + // " (" + batchIterator.currentTime() + "," + batchIterator.currentValue() + ")"); + // tmp_tot++; + updateStatusFromData(batchIterator.currentValue()); + batchIterator.next(); + } + // System.out.println(" tot:" + tmp_tot); + } + + @Override + public void updateResultUsingTimestamps( + long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException { + // System.out.print("\t[KLL STAT DEBUG]\tupdateResultUsingTimestamps:"); + // int tmp_tot = 0; + Object[] values = dataReader.getValuesInTimestamps(timestamps, length); + for (int i = 0; i < length; i++) { + if (values[i] != null) { + updateStatusFromData(values[i]); + // tmp_tot++; + } + } + // 
System.out.println(" tot:" + tmp_tot); + } + + @Override + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { + // List<Object> tmp = new ArrayList<>(); + while (valueIterator.hasNext()) { + updateStatusFromData(valueIterator.next()); + // Object tmpObj = valueIterator.next(); + // updateStatusFromData(tmpObj, 1); + // tmp.add(tmpObj); + } + // + // System.out.println("\t\t[MEDIAN]"+this.hashCode()+"[updateResultUsingValues]"+tmp.toString()); + } + + @Override + public int maxIteration() { + return 1; + } + + @Override + public boolean hasFinalResult() { + return hasFinalResult; + } + + @Override + public void merge(AggregateResult another) { + // System.out.println("[DEBUG] [merge] " + this.getResult() + " " + another.getResult()); + // merge not supported + // throw new QueryProcessException("Can't merge MedianAggregateResult"); + } + + @Override + protected void deserializeSpecificFields(ByteBuffer buffer) { + this.seriesDataType = TSDataType.deserialize(buffer.get()); + // TODO + } + + @Override + protected void serializeSpecificFields(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(seriesDataType, outputStream); + // TODO + } + + public long getN() { + return n; + } + + @Override + public void reset() { + super.reset(); + DDSketch = null; + lastL = cntL = Long.MIN_VALUE; + cntR = Long.MAX_VALUE; + n = 0; + iteration = 0; + hasFinalResult = false; + } + + @Override + public boolean canUpdateFromStatistics(Statistics statistics) { + return false; + } + + @Override + public boolean groupByLevelBeforeAggregation() { + return true; + } +}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java index a5eb166..ab44728 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLDebugResult.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch; import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile; -import org.apache.iotdb.tsfile.utils.LongKLLSketch; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; @@ -348,7 +347,7 @@ if (statistics.getType() == DOUBLE) { DoubleStatistics stat = (DoubleStatistics) statistics; if (stat.getKllSketchNum() > 0) { - for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch); return; } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!"); }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatChunkAvailAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatChunkAvailAggrResult.java new file mode 100644 index 0000000..b6a660a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatChunkAvailAggrResult.java
@@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.aggregation.impl; + +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.AggregationType; +import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; +import org.apache.iotdb.tsfile.utils.HeapLongStrictKLLSketch; +import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile; +import org.apache.iotdb.tsfile.utils.LongKLLSketch; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE; + +public class KLLStatChunkAvailAggrResult extends 
AggregateResult { + private TSDataType seriesDataType; + private int iteration; + private long pageKLLNum, statNum; + private long cntL, cntR, lastL; + private long n, K1, heapN; + private HeapLongStrictKLLSketch heapKLL; + private boolean hasFinalResult; + private List<KLLSketchForQuantile> pageKLL; + private int pageKLLIndex; + private long TOT_SKETCH_N = 0, TOT_SKETCH_SIZE = 0; + private int SKETCH_SIZE = -1; + private int pageKLLMaxIndex; + long DEBUG = 0; + + private int getBitsOfDataType() { + switch (seriesDataType) { + case INT32: + case FLOAT: + return 32; + case INT64: + case DOUBLE: + return 64; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation MEDIAN : %s", seriesDataType)); + } + } + + private long approximateDataAvgError() { + long dataAvgError = (long) Math.ceil(2.0 * heapN / heapKLL.getMaxMemoryNum()) + 1; + return dataAvgError; + } + + private long approximateStatAvgError() { + if (SKETCH_SIZE < 0) return 0; + double pageAvgError = 1.0 * TOT_SKETCH_N / TOT_SKETCH_SIZE / 3.0; + double rate = 1.0 * SKETCH_SIZE * pageKLLNum / (maxMemoryByte); + long pageStatAvgError; + if (rate < 1.0) { + pageStatAvgError = (long) Math.ceil(pageAvgError * Math.pow(pageKLLNum, 0.5)); + if (pageKLLNum <= 10) pageStatAvgError += pageAvgError * 3.0; + } else { + int memKLLNum = (maxMemoryByte) / SKETCH_SIZE; + long memErr = (long) Math.ceil(pageAvgError * Math.pow(memKLLNum, 0.5)); + pageStatAvgError = (long) Math.ceil(rate * 0.5 * memErr + 0.5 * memErr); + } + return pageStatAvgError; + } + + private long approximateMaxError() { + return 0; + } + + private boolean hasTwoMedians() { + return (n & 1) == 0; + } + + public KLLStatChunkAvailAggrResult(TSDataType seriesDataType) + throws UnSupportedDataTypeException { + super(DOUBLE, AggregationType.CHUNK_STAT_AVAIL); + this.seriesDataType = seriesDataType; + reset(); + } + + private long dataToLong(Object data) throws UnSupportedDataTypeException { + long result; + 
switch (seriesDataType) { + case INT32: + return (int) data; + case FLOAT: + result = Float.floatToIntBits((float) data); + return (float) data >= 0f ? result : result ^ Long.MAX_VALUE; + case INT64: + return (long) data; + case DOUBLE: + result = Double.doubleToLongBits((double) data); + return (double) data >= 0d ? result : result ^ Long.MAX_VALUE; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation MEDIAN : %s", seriesDataType)); + } + } + + private double longToResult(long result) throws UnSupportedDataTypeException { + switch (seriesDataType) { + case INT32: + return (double) (result); + case FLOAT: + result = (result >>> 31) == 0 ? result : result ^ Long.MAX_VALUE; + return Float.intBitsToFloat((int) (result)); + case INT64: + return (double) (result); + case DOUBLE: + result = (result >>> 63) == 0 ? result : result ^ Long.MAX_VALUE; + return Double.longBitsToDouble(result); + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation MEDIAN : %s", seriesDataType)); + } + } + + private void updateStatusFromData(Object data) { + long dataL = dataToLong(data); + if (iteration == 0) n++; + if (cntL <= dataL && dataL <= cntR) { + heapKLL.update(dataL); + heapN++; + } else if (lastL <= dataL && dataL < cntL) K1--; + } + + private long getRankInKLL(long val) { + // long rank = 0; + // if (pageKLL != null) { + // for (HeapLongKLLSketch heapLongKLLSketch : pageKLL) + // if (heapLongKLLSketch != null) rank += heapLongKLLSketch.getApproxRank(val); + // } + // rank += heapKLL.getApproxRank(val); + // return rank; + return heapKLL.getApproxRank(val); + } + + public long findMaxValueWithRankLE(long K) { + long L = Long.MIN_VALUE, R = Long.MAX_VALUE, mid; + while (L < R) { + mid = L + ((R - L) >>> 1); + if (mid == L) mid++; + // System.out.println( + // "\t\t\t" + L + "\t" + R + "\t\t\tmid:" + mid + "\trank:" + getRankInKLL(mid)); + if (getRankInKLL(mid) <= K) L = mid; + 
else R = mid - 1; + // System.out.println("\t mid:"+mid+" mid_rank:"+getRankInKLL(mid)); + } + return L; + } + + public long findMinValueWithRankGE(long K) { + long L = Long.MIN_VALUE, R = Long.MAX_VALUE, mid; + while (L < R) { + mid = L + ((R - L) >>> 1); + if (mid == R) mid--; + // System.out.println( + // "\t\t\t" + L + "\t" + R + "\t\t\tmid:" + mid + "\trank:" + getRankInKLL(mid)); + if (getRankInKLL(mid) >= K) R = mid; + else L = mid + 1; + } + return L; + } + + @Override + public void startIteration() { + heapN = statNum = 0; + if (iteration == 0) { // first iteration + heapKLL = new HeapLongStrictKLLSketch(maxMemoryByte); + lastL = cntL = Long.MIN_VALUE; + cntR = Long.MAX_VALUE; + n = 0; + pageKLLNum = 0; + pageKLLIndex = 0; + } else { + heapKLL = new HeapLongStrictKLLSketch(maxMemoryByte); + pageKLLNum = 0; + pageKLL = null; + System.out.println( + "\t[KLL STAT DEBUG] start iteration " + + iteration + + " cntL,R:" + + "[" + + cntL + + "," + + cntR + + "]" + + "\tlastL:" + + lastL + + "\tK1:" + + K1); + } + } + + @Override + public void finishIteration() { + System.out.println( + "\t[STAT CHUNK AVAIL DEBUG]" + + "finish iteration " + + iteration + + " cntL,R:" + + "[" + + cntL + + "," + + cntR + + "]" + + "\tlastL:" + + lastL + + "\tK1:" + + K1); + System.out.println( + "\t[STAT CHUNK AVAIL DEBUG]" + + " statNum:" + + statNum + + " pageKllNum:" + + pageKLLNum + + " heapN:" + + heapN); + iteration++; + if (n == 0) { + setDoubleValue(0.0); + hasFinalResult = true; + return; + } + lastL = cntL; + + if (iteration == 1) { // first iteration over + K1 = (long) Math.floor((n + 1) * QUANTILE); + } + long K2 = K1 + 1; // hasTwoMedians() ? (K1 + 1) : K1; + + System.out.println("\t[STAT CHUNK AVAIL DEBUG]" + " K1,K2:" + K1 + ", " + K2); + if (pageKLLNum == 0) { // all in heap + System.out.println("\t[STAT CHUNK AVAIL DEBUG]" + " calc by heap only. 
N:" + heapKLL.getN()); + heapKLL.show(); + + double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1)); + // System.out.println("\t[KLL STAT DEBUG]" + "v1:" + v1); + double v2 = longToResult(heapKLL.findMinValueWithRank(K2 - 1)); + double ans = 0.5 * (v1 + v2); + setDoubleValue(0.0); + hasFinalResult = true; + return; + } + // iteration = 0 && there are page KLL statistics + // heapKLL.show(); + // System.out.println("\t[KLL STAT DEBUG] remaining pageKLLSize:" + pageKLLIndex); + mergePageKLL(); + heapKLL.show(); + System.out.println( + "\t[STAT CHUNK AVAIL DEBUG] after merge. heapN:" + heapKLL.getN() + "\tn_true:" + n); + double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1)); + double v2 = longToResult(heapKLL.findMinValueWithRank(K2 - 1)); + double ans = 0.5 * (v1 + v2); + setDoubleValue(1.0 - 1.0 * heapN / heapKLL.getN()); + hasFinalResult = true; + System.out.println( + "\t[STAT CHUNK AVAIL DEBUG]" + " est_stats_err:" + approximateStatAvgError()); + } + + @Override + protected boolean hasCandidateResult() { + return hasFinalResult && n > 0; + } + + @Override + public Double getResult() { + return hasCandidateResult() ? getDoubleValue() : null; + } + + // private void addSketch(KLLSketchForQuantile sketch, List<HeapLongKLLSketch> a, int baseByte) { + // int pos0 = 0; + // while (pos0 < pageKLLMaxLen && a.get(pos0) != null) pos0++; + // HeapLongKLLSketch bigger_sketch = new HeapLongKLLSketch(baseByte << pos0); + // bigger_sketch.mergeWithTempSpace(sketch); + // for (int i = 0; i < pos0; i++) { + // bigger_sketch.mergeWithTempSpace(a.get(i)); + // a.set(i, null); + // } + // if (pos0 == pageKLLMaxLen) { // mem of pageKLL list is too large. 
+ // heapKLL.mergeWithTempSpace(bigger_sketch); + // } else a.set(pos0, bigger_sketch); + // } + public void addSketch(KLLSketchForQuantile sketch) { + n += sketch.getN(); + pageKLLNum++; + TOT_SKETCH_N += sketch.getN(); + TOT_SKETCH_SIZE += sketch.getNumLen(); + if (SKETCH_SIZE < 0) { + SKETCH_SIZE = sketch.getNumLen() * 8; + pageKLLMaxIndex = (int) Math.floor((0.5 * maxMemoryByte / SKETCH_SIZE)); + pageKLL = new ArrayList<>(pageKLLMaxIndex); + for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.add(null); + } + if (pageKLLIndex < pageKLLMaxIndex) pageKLL.set(pageKLLIndex++, sketch); + else { + heapKLL.mergeWithTempSpace(pageKLL); + for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.set(i, null); + pageKLLIndex = 0; + pageKLL.set(pageKLLIndex++, sketch); + // System.out.println( + // "\t[KLL STAT DEBUG]\theapKLL merge pageKLLList. newN: " + // + heapKLL.getN() + // + " n_true:" + // + n); + // heapKLL.show(); + } + } + + private void mergePageKLL() { + HeapLongStrictKLLSketch tmpSketch = heapKLL; + heapKLL = new HeapLongStrictKLLSketch(maxMemoryByte); + heapKLL.mergeWithTempSpace(tmpSketch); + heapKLL.mergeWithTempSpace(pageKLL); + } + + @Override + public void updateResultFromStatistics(Statistics statistics) { + switch (statistics.getType()) { + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + break; + case TEXT: + case BOOLEAN: + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type in aggregation MEDIAN : %s", statistics.getType())); + } + if (iteration == 0) { + // n += statistics.getCount(); + // if (statistics.getType() == DOUBLE) { + // } + if (statistics.getType() == DOUBLE) { + DoubleStatistics stat = (DoubleStatistics) statistics; + if (stat.getSummaryNum() > 0) { + // pageKLLNum += stat.getSummaryNum(); + statNum += 1; + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) { + // System.out.println("\t[STRICT STAT CHUNK AVAIL DEBUG] pageTime:"+); + // if (sketch.getN() > 10000) + // System.out.println( 
+ // "\t[STRICT KLL STAT DEBUG] updateResultFromStatistics\tstatN:" + // + sketch.getN() + // + "\tstatNumLen:" + // + sketch.getNumLen()); + + ((LongKLLSketch) sketch).deserializeFromBuffer(); + addSketch(sketch); + } + // System.out.println( + // "\t[STAT CHUNK AVAIL DEBUG] updateResultFromStatistics. pageN:" + // + stat.getKllSketch().getN()); + // stat.getKllSketch().show(); + return; + } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!"); + } + } + long minVal = dataToLong(statistics.getMinValue()); + long maxVal = dataToLong(statistics.getMaxValue()); + // System.out.println( + // "\t[KLL STAT DEBUG] no KLL in stat. update from statistics:\t" + // + "min,max:" + // + minVal + // + "," + // + maxVal + // + " n:" + // + statistics.getCount()); + // out of range + if (minVal > cntR || maxVal < lastL) return; + if (lastL <= minVal && maxVal < cntL) { + K1 -= statistics.getCount(); + return; + } + if (minVal == maxVal) { // min == max + for (int i = 0; i < statistics.getCount(); i++) updateStatusFromData(longToResult(minVal)); + return; + } + } + + @Override + public void updateResultFromPageData(IBatchDataIterator batchIterator) { + updateResultFromPageData(batchIterator, Long.MIN_VALUE, Long.MAX_VALUE); + } + + @Override + public void updateResultFromPageData( + IBatchDataIterator batchIterator, long minBound, long maxBound) { + // System.out.print("\t[KLL STAT DEBUG]\tupdateResultFromPageData:"); + // int tmp_tot = 0; + while (batchIterator.hasNext()) { + if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) { + break; + } + // System.out.print( + // " (" + batchIterator.currentTime() + "," + batchIterator.currentValue() + ")"); + // tmp_tot++; + updateStatusFromData(batchIterator.currentValue()); + batchIterator.next(); + } + // System.out.println(" tot:" + tmp_tot); + } + + @Override + public void updateResultUsingTimestamps( + long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException { + 
// System.out.print("\t[KLL STAT DEBUG]\tupdateResultUsingTimestamps:"); + // int tmp_tot = 0; + Object[] values = dataReader.getValuesInTimestamps(timestamps, length); + for (int i = 0; i < length; i++) { + if (values[i] != null) { + updateStatusFromData(values[i]); + // tmp_tot++; + } + } + // System.out.println(" tot:" + tmp_tot); + } + + @Override + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { + // List<Object> tmp = new ArrayList<>(); + while (valueIterator.hasNext()) { + updateStatusFromData(valueIterator.next()); + // Object tmpObj = valueIterator.next(); + // updateStatusFromData(tmpObj, 1); + // tmp.add(tmpObj); + } + // + // System.out.println("\t\t[MEDIAN]"+this.hashCode()+"[updateResultUsingValues]"+tmp.toString()); + } + + @Override + public int maxIteration() { + return 1; + } + + @Override + public boolean hasFinalResult() { + return hasFinalResult; + } + + @Override + public void merge(AggregateResult another) { + // System.out.println("[DEBUG] [merge] " + this.getResult() + " " + another.getResult()); + // merge not supported + // throw new QueryProcessException("Can't merge MedianAggregateResult"); + } + + @Override + protected void deserializeSpecificFields(ByteBuffer buffer) { + this.seriesDataType = TSDataType.deserialize(buffer.get()); + // TODO + } + + @Override + protected void serializeSpecificFields(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(seriesDataType, outputStream); + // TODO + } + + public long getN() { + return n; + } + + @Override + public void reset() { + super.reset(); + heapKLL = null; + lastL = cntL = Long.MIN_VALUE; + cntR = Long.MAX_VALUE; + n = 0; + iteration = 0; + hasFinalResult = false; + TOT_SKETCH_N = TOT_SKETCH_SIZE = 0; + SKETCH_SIZE = -1; + } + + @Override + public boolean canUpdateFromStatistics(Statistics statistics) { + if ((seriesDataType == DOUBLE) && iteration == 0) { + DoubleStatistics doubleStats = (DoubleStatistics) 
statistics; + // if (statistics.getCount() > 10000) + // System.out.println( + // "\t[DEBUG][STRICT KLL STAT DEBUG]\tcanUseStat? count:" + // + doubleStats.getCount() + // + " summaryNum:" + // + doubleStats.getSummaryNum()); + if (doubleStats.getSummaryNum() > 0) return true; + } + return false; + } + + @Override + public boolean groupByLevelBeforeAggregation() { + return true; + } +}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java index 4be7500..3216788 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugFullReadingAggrResult.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch; import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile; -import org.apache.iotdb.tsfile.utils.LongKLLSketch; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; @@ -385,7 +384,7 @@ // stat.getKllSketch().getN()); // addSketch(stat.getKllSketch(), pageKLL, pageKLLMemoryByte); pageKLLNum += stat.getKllSketchNum(); - for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch); // heapKLL.mergeWithTempSpace(stat.getKllSketch()); return; } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java index 9740898..955e894 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatDebugPageDemandRateAggrResult.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch; import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile; -import org.apache.iotdb.tsfile.utils.LongKLLSketch; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; @@ -398,7 +397,7 @@ // stat.getKllSketch().getN()); // addSketch(stat.getKllSketch(), pageKLL, pageKLLMemoryByte); pageKLLNum += stat.getKllSketchNum(); - for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch); // heapKLL.mergeWithTempSpace(stat.getKllSketch()); return; } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java index 8275def..499918e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatMedianAggrResult.java
@@ -398,7 +398,11 @@ // if(stat.getBfNum()>1) // System.out.println("\t\tFK\tstat:" + // stat.getStartTime() + "..." + stat.getEndTime()); - for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) { + + ((LongKLLSketch) sketch).deserializeFromBuffer(); + addSketch(sketch); + } // heapKLL.mergeWithTempSpace(stat.getKllSketch()); return; } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!");
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java index c233815..560cd2b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatOverlapSingleAggrResult.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch; import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile; -import org.apache.iotdb.tsfile.utils.LongKLLSketch; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.eclipse.collections.api.iterator.MutableLongIterator; @@ -370,7 +369,7 @@ DoubleStatistics stat = (DoubleStatistics) statistics; if (stat.getKllSketchNum() > 0) { pageKLLNum += stat.getKllSketchNum(); - for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) addSketch(sketch); // System.out.println( // "\t[KLL STAT Single DEBUG] updateResultFromStatistics. pageN:" // + stat.getKllSketch().getN());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java index 98e7211..0853062 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleAggrResult.java
@@ -361,7 +361,11 @@ if (stat.getSummaryNum() > 0) { pageKLLNum += stat.getSummaryNum(); statNum += 1; - for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) { + + ((LongKLLSketch) sketch).deserializeFromBuffer(); + addSketch(sketch); + } // System.out.println( // "\t[KLL STAT Single DEBUG] updateResultFromStatistics. pageN:" // + stat.getKllSketch().getN());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java index 45c6dc5..de7220a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/KLLStatSingleReadAggrResult.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.aggregation.impl; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -28,26 +29,34 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; +import org.apache.iotdb.tsfile.utils.HeapLongKLLSketch; +import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE; -import static org.apache.iotdb.tsfile.file.metadata.statistics.Statistics.SYNOPSIS_SIZE_IN_BYTE; public class KLLStatSingleReadAggrResult extends AggregateResult { private TSDataType seriesDataType; private int iteration; - private long pageKLLNum; + private long pageKLLNum, statNum; private long cntL, cntR, lastL; private long n, K1, heapN; + private HeapLongKLLSketch heapKLL; private boolean hasFinalResult; - private final int pageAvgError = 50, pageMaxError = 127; - private final int pageKLLMemoryByte = (68 + 15) * 8, pageKLLNumMemoryByte = SYNOPSIS_SIZE_IN_BYTE; - // private List<HeapLongKLLSketch> pageKLL; + private List<KLLSketchForQuantile> pageKLL; + private int pageKLLIndex; + private long TOT_SKETCH_N = 0, TOT_SKETCH_SIZE = 0; + private int SKETCH_SIZE = -1; + private int pageKLLMaxIndex; long DEBUG = 0; + public final boolean onlyUsePageSynopsis = + IoTDBDescriptor.getInstance().getConfig().getOnlyUsePageSynopsis(); private int getBitsOfDataType() { switch (seriesDataType) { @@ -111,21 +120,25 @@ long dataL = dataToLong(data); if (iteration == 0) 
n++; if (cntL <= dataL && dataL <= cntR) { - // heapKLL.update(dataL); + heapKLL.update(dataL); heapN++; } else if (lastL <= dataL && dataL < cntL) K1--; } @Override public void startIteration() { - heapN = 0; + heapN = statNum = 0; if (iteration == 0) { // first iteration + heapKLL = new HeapLongKLLSketch(maxMemoryByte); lastL = cntL = Long.MIN_VALUE; cntR = Long.MAX_VALUE; n = 0; pageKLLNum = 0; + pageKLLIndex = 0; } else { + heapKLL = new HeapLongKLLSketch(maxMemoryByte); pageKLLNum = 0; + pageKLL = null; System.out.println( "\t[KLL STAT DEBUG] start iteration " + iteration @@ -145,26 +158,15 @@ @Override public void finishIteration() { System.out.println( - "\t[KLL STAT DEBUG]" + "\t[KLL STAT SINGLE READ DEBUG]" + "finish iteration " - + iteration - + " cntL,R:" - + "[" - + cntL - + "," - + cntR - + "]" - + "\tlastL:" - + lastL - + "\tK1:" - + K1); - System.out.println("\t[KLL STAT DEBUG]" + " statNum:" + pageKLLNum); - iteration++; - if (n == 0) { - hasFinalResult = true; - return; - } - setDoubleValue(-233.0); + + " statNum:" + + statNum + + " pageKllNum:" + + pageKLLNum + + " heapN:" + + heapN); + setDoubleValue(heapN); hasFinalResult = true; } @@ -178,6 +180,43 @@ return hasCandidateResult() ? getDoubleValue() : null; } + // private void addSketch(KLLSketchForQuantile sketch, List<HeapLongKLLSketch> a, int baseByte) { + // int pos0 = 0; + // while (pos0 < pageKLLMaxLen && a.get(pos0) != null) pos0++; + // HeapLongKLLSketch bigger_sketch = new HeapLongKLLSketch(baseByte << pos0); + // bigger_sketch.mergeWithTempSpace(sketch); + // for (int i = 0; i < pos0; i++) { + // bigger_sketch.mergeWithTempSpace(a.get(i)); + // a.set(i, null); + // } + // if (pos0 == pageKLLMaxLen) { // mem of pageKLL list is too large. 
+ // heapKLL.mergeWithTempSpace(bigger_sketch); + // } else a.set(pos0, bigger_sketch); + // } + private void addSketch(KLLSketchForQuantile sketch) { + TOT_SKETCH_N += sketch.getN(); + TOT_SKETCH_SIZE += sketch.getNumLen(); + if (SKETCH_SIZE < 0) { + SKETCH_SIZE = sketch.getNumLen() * 8; + pageKLLMaxIndex = (int) Math.floor((0.5 * maxMemoryByte / SKETCH_SIZE)); + pageKLL = new ArrayList<>(pageKLLMaxIndex); + for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.add(null); + } + if (pageKLLIndex < pageKLLMaxIndex) pageKLL.set(pageKLLIndex++, sketch); + else { + heapKLL.mergeWithTempSpace(pageKLL); + for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.set(i, null); + pageKLLIndex = 0; + pageKLL.set(pageKLLIndex++, sketch); + // System.out.println( + // "\t[KLL STAT DEBUG]\theapKLL merge pageKLLList. newN: " + // + heapKLL.getN() + // + " n_true:" + // + n); + // heapKLL.show(); + } + } + @Override public void updateResultFromStatistics(Statistics statistics) { switch (statistics.getType()) { @@ -195,28 +234,29 @@ } if (iteration == 0) { n += statistics.getCount(); - if (statistics.getType() == DOUBLE && ((DoubleStatistics) statistics).getKllSketchNum() > 0) - pageKLLNum += ((DoubleStatistics) statistics).getKllSketchNum(); - } - long minVal = dataToLong(statistics.getMinValue()); - long maxVal = dataToLong(statistics.getMaxValue()); - // System.out.println( - // "\t[KLL STAT DEBUG] update from statistics:\t" - // + "min,max:" - // + minVal - // + "," - // + maxVal - // + " n:" - // + statistics.getCount()); - // out of range - if (minVal > cntR || maxVal < lastL) return; - if (lastL <= minVal && maxVal < cntL) { - K1 -= statistics.getCount(); - return; - } - if (minVal == maxVal) { // min == max - for (int i = 0; i < statistics.getCount(); i++) updateStatusFromData(minVal); - return; + // if (statistics.getType() == DOUBLE) { + // } + if (statistics.getType() == DOUBLE) { + DoubleStatistics stat = (DoubleStatistics) statistics; + if (stat.getSummaryNum() > 0) { + 
pageKLLNum += stat.getSummaryNum(); + statNum += 1; + // for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); + System.out.println( + "\t[KLL STAT SINGLE READ DEBUG] updateResultFromStatistics. N:" + + stat.getCount() + + "\tT:" + + stat.getStartTime() + + "..." + + stat.getEndTime()); + System.out.print("\t\t\t\tPageN:"); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) + System.out.print("\t" + sketch.getN()); + System.out.println(); + // stat.getKllSketch().show(); + return; + } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!"); + } } } @@ -228,24 +268,34 @@ @Override public void updateResultFromPageData( IBatchDataIterator batchIterator, long minBound, long maxBound) { + // System.out.print("\t[KLL STAT DEBUG]\tupdateResultFromPageData:"); + // int tmp_tot = 0; while (batchIterator.hasNext()) { if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) { break; } + // System.out.print( + // " (" + batchIterator.currentTime() + "," + batchIterator.currentValue() + ")"); + // tmp_tot++; updateStatusFromData(batchIterator.currentValue()); batchIterator.next(); } + // System.out.println(" tot:" + tmp_tot); } @Override public void updateResultUsingTimestamps( long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException { + // System.out.print("\t[KLL STAT DEBUG]\tupdateResultUsingTimestamps:"); + // int tmp_tot = 0; Object[] values = dataReader.getValuesInTimestamps(timestamps, length); for (int i = 0; i < length; i++) { if (values[i] != null) { updateStatusFromData(values[i]); + // tmp_tot++; } } + // System.out.println(" tot:" + tmp_tot); } @Override @@ -297,28 +347,29 @@ @Override public void reset() { super.reset(); + heapKLL = null; lastL = cntL = Long.MIN_VALUE; cntR = Long.MAX_VALUE; n = 0; iteration = 0; hasFinalResult = false; + TOT_SKETCH_N = TOT_SKETCH_SIZE = 0; + SKETCH_SIZE = -1; } @Override public boolean canUpdateFromStatistics(Statistics statistics) { + 
System.out.println( + "\t\t[DEBUG single read]\tcanUpdateFromStatistics\tT:" + + statistics.getStartTime() + + "..." + + statistics.getEndTime()); if ((seriesDataType == DOUBLE) && iteration == 0) { DoubleStatistics doubleStats = (DoubleStatistics) statistics; - if (doubleStats.getKllSketchNum() > 0) return true; + if (onlyUsePageSynopsis ? doubleStats.getSummaryNum() == 1 : doubleStats.getSummaryNum() >= 1) + return true; // DEBUG } - if (iteration > 0) { - long minVal = dataToLong(statistics.getMinValue()); - long maxVal = dataToLong(statistics.getMaxValue()); - if (minVal > cntR || maxVal < lastL) return true; - if (lastL <= minVal && maxVal < cntL) return true; - } - Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue(); - Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue(); - return (minVal.compareTo(maxVal) == 0); // min==max + return false; } @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java index 24bfb02..2119371 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/StrictKLLStatSingleAggrResult.java
@@ -46,11 +46,10 @@ private long n, K1, heapN; private HeapLongStrictKLLSketch heapKLL; private boolean hasFinalResult; - private List<KLLSketchForQuantile> pageKLL; - private int pageKLLIndex; + private List<KLLSketchForQuantile> preComputedSketch; + private int preComputedSketchSize = 0; private long TOT_SKETCH_N = 0, TOT_SKETCH_SIZE = 0; private int SKETCH_SIZE = -1; - private int pageKLLMaxIndex; long DEBUG = 0; private int getBitsOfDataType() { @@ -196,11 +195,10 @@ cntR = Long.MAX_VALUE; n = 0; pageKLLNum = 0; - pageKLLIndex = 0; } else { heapKLL = new HeapLongStrictKLLSketch(maxMemoryByte); pageKLLNum = 0; - pageKLL = null; + preComputedSketch = null; System.out.println( "\t[KLL STAT DEBUG] start iteration " + iteration @@ -220,7 +218,7 @@ @Override public void finishIteration() { System.out.println( - "\t[KLL STAT Single DEBUG]" + "\t[KLL STAT SINGLE" + "finish iteration " + iteration + " cntL,R:" @@ -234,10 +232,10 @@ + "\tK1:" + K1); System.out.println( - "\t[KLL STAT Single DEBUG]" + "\t[KLL STAT SINGLE" + " statNum:" + statNum - + " pageKllNum:" + + " summaryNum:" + pageKLLNum + " heapN:" + heapN); @@ -253,9 +251,9 @@ } long K2 = K1 + 1; // hasTwoMedians() ? (K1 + 1) : K1; - System.out.println("\t[KLL STAT Single DEBUG]" + " K1,K2:" + K1 + ", " + K2); + System.out.println("\t[KLL STAT SINGLE" + " K1,K2:" + K1 + ", " + K2); if (pageKLLNum == 0) { // all in heap - System.out.println("\t[KLL STAT Single DEBUG]" + " calc by heap only. N:" + heapKLL.getN()); + System.out.println("\t[KLL STAT SINGLE" + " calc by heap only. N:" + heapKLL.getN()); heapKLL.show(); double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1)); @@ -271,14 +269,13 @@ // System.out.println("\t[KLL STAT DEBUG] remaining pageKLLSize:" + pageKLLIndex); mergePageKLL(); heapKLL.show(); - System.out.println( - "\t[KLL STAT Single DEBUG] after merge. heapN:" + heapKLL.getN() + "\tn_true:" + n); + System.out.println("\t[KLL STAT SINGLE after merge. 
heapN:" + heapKLL.getN() + "\tn_true:" + n); double v1 = longToResult(heapKLL.findMinValueWithRank(K1 - 1)); double v2 = longToResult(heapKLL.findMinValueWithRank(K2 - 1)); double ans = 0.5 * (v1 + v2); setDoubleValue(ans); hasFinalResult = true; - System.out.println("\t[KLL STAT Single DEBUG]" + " est_stats_err:" + approximateStatAvgError()); + // System.out.println("\t[KLL STAT SINGLE" + " est_stats_err:" + approximateStatAvgError()); } @Override @@ -304,35 +301,43 @@ // heapKLL.mergeWithTempSpace(bigger_sketch); // } else a.set(pos0, bigger_sketch); // } - private void addSketch(KLLSketchForQuantile sketch) { + public void addSketch(KLLSketchForQuantile sketch) { + n += sketch.getN(); + pageKLLNum++; TOT_SKETCH_N += sketch.getN(); TOT_SKETCH_SIZE += sketch.getNumLen(); if (SKETCH_SIZE < 0) { SKETCH_SIZE = sketch.getNumLen() * 8; - pageKLLMaxIndex = (int) Math.floor((0.5 * maxMemoryByte / SKETCH_SIZE)); - pageKLL = new ArrayList<>(pageKLLMaxIndex); - for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.add(null); + preComputedSketch = new ArrayList<>(); + preComputedSketchSize = 0; } - if (pageKLLIndex < pageKLLMaxIndex) pageKLL.set(pageKLLIndex++, sketch); - else { - heapKLL.mergeWithTempSpace(pageKLL); - for (int i = 0; i < pageKLLMaxIndex; i++) pageKLL.set(i, null); - pageKLLIndex = 0; - pageKLL.set(pageKLLIndex++, sketch); - // System.out.println( - // "\t[KLL STAT DEBUG]\theapKLL merge pageKLLList. 
newN: " - // + heapKLL.getN() - // + " n_true:" - // + n); - // heapKLL.show(); + preComputedSketch.add(sketch); + preComputedSketchSize += sketch.getNumLen() * 8; + if (preComputedSketchSize >= maxMemoryByte / 2) { + heapKLL.mergeWithTempSpace(preComputedSketch); + preComputedSketch.clear(); + preComputedSketchSize = 0; } + // if (pageKLLIndex < pageKLLMaxIndex) preComputedSketch.set(pageKLLIndex++, sketch); + // else { + // heapKLL.mergeWithTempSpace(preComputedSketch); + // for (int i = 0; i < pageKLLMaxIndex; i++) preComputedSketch.set(i, null); + // pageKLLIndex = 0; + // preComputedSketch.set(pageKLLIndex++, sketch); + // // System.out.println( + // // "\t[KLL STAT DEBUG]\theapKLL merge pageKLLList. newN: " + // // + heapKLL.getN() + // // + " n_true:" + // // + n); + // // heapKLL.show(); + // } } private void mergePageKLL() { HeapLongStrictKLLSketch tmpSketch = heapKLL; heapKLL = new HeapLongStrictKLLSketch(maxMemoryByte); heapKLL.mergeWithTempSpace(tmpSketch); - heapKLL.mergeWithTempSpace(pageKLL); + heapKLL.mergeWithTempSpace(preComputedSketch); } @Override @@ -351,19 +356,29 @@ "Unsupported data type in aggregation MEDIAN : %s", statistics.getType())); } if (iteration == 0) { - n += statistics.getCount(); + // n += statistics.getCount(); // if (statistics.getType() == DOUBLE) { // } if (statistics.getType() == DOUBLE) { DoubleStatistics stat = (DoubleStatistics) statistics; if (stat.getSummaryNum() > 0) { - pageKLLNum += stat.getSummaryNum(); + // pageKLLNum += stat.getSummaryNum(); statNum += 1; - for (LongKLLSketch sketch : stat.getKllSketchList()) addSketch(sketch); - // System.out.println( - // "\t[KLL STAT Single DEBUG] updateResultFromStatistics. 
pageN:" - // + stat.getKllSketch().getN()); - // stat.getKllSketch().show(); + for (KLLSketchForQuantile sketch : stat.getKllSketchList()) { + // System.out.println("\t[STRICT KLL STAT Single DEBUG] pageTime:"+); + // if (sketch.getN() > 10000) + // System.out.println( + // "\t[STRICT KLL STAT DEBUG] updateResultFromStatistics\tstatN:" + // + sketch.getN() + // + "\tstatNumLen:" + // + sketch.getNumLen()); + ((LongKLLSketch) sketch).deserializeFromBuffer(); + addSketch(sketch); + } + // System.out.println( + // "\t[KLL STAT SINGLE updateResultFromStatistics. pageN:" + // + stat.getKllSketch().getN()); + // stat.getKllSketch().show(); return; } else System.out.println("\t\t\t\t!!!!!![ERROR!] no KLL in stat!"); } @@ -485,17 +500,14 @@ hasFinalResult = false; TOT_SKETCH_N = TOT_SKETCH_SIZE = 0; SKETCH_SIZE = -1; + preComputedSketch = new ArrayList<>(); + preComputedSketchSize = 0; } @Override public boolean canUpdateFromStatistics(Statistics statistics) { if ((seriesDataType == DOUBLE) && iteration == 0) { DoubleStatistics doubleStats = (DoubleStatistics) statistics; - // System.out.println( - // "\t[DEBUG][KLL STAT SINGLE]\tcanUseStat? count:" - // + doubleStats.getCount() - // + " KLLNum:" - // + doubleStats.getKllSketchNum()); if (doubleStats.getSummaryNum() > 0) return true; } return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 0c6cdca..cd912d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.SingleDataSet; @@ -315,7 +316,7 @@ allUseStatistics &= aggregateResult.useStatisticsIfPossible(); IAggregateReader seriesReader; int strategy = IoTDBDescriptor.getInstance().getConfig().getAggregationStrategy(); - if (!allUseStatistics || strategy == 0) + if (!allUseStatistics || strategy == 0) { seriesReader = new SeriesAggregateReader( seriesPath, @@ -327,7 +328,13 @@ null, null, true); - else { + if (ascAggregateResultList.get(0).getAggregationType() + == AggregationType.STRICT_KLL_STAT_SINGLE) { + (((SeriesAggregateReader) seriesReader).getSeriesReader()).quantileAggrResult = + ascAggregateResultList.get(0); + (((SeriesAggregateReader) seriesReader).getSeriesReader()).aggrSST = true; + } + } else { if (strategy == 1) seriesReader = new SeriesAggregateReaderForStatChain( @@ -614,9 +621,29 @@ while (seriesReader.hasNextFile()) { aggregateResultList = findUnfinishedAggregateResults(aggregateResultList, resultToGroupedAhead); + // System.out.println( + // "\t\t[DEBUG nextFile In AggrExe] startT:" + // + seriesReader.currentFileStatistics().getStartTime() + // + "\tN:" + // + seriesReader.currentFileStatistics().getCount()); // try to calc by file statistics if (seriesReader.canUseCurrentFileStatistics()) { Statistics fileStatistics = seriesReader.currentFileStatistics(); + // { + // DoubleStatistics doubleStats = (DoubleStatistics) fileStatistics; + // System.out.println( + // "\t\t\t[aggr files from reader]\tfileStat\tN:" + // + doubleStats.getCount() + // + "\tT:" + // + doubleStats.getStartTime() + // + "..." 
+ // + doubleStats.getEndTime() + // + "\tsummaryNum:" + // + doubleStats.getSummaryNum()); + // if (doubleStats.getSummaryNum() > 0) + // System.out.println( + // "\t\t\t\t\t\tsketch numLen:" + doubleStats.getOneKllSketch().getNumLen()); + // } List<AggregateResult> remainingAggregateResultList = tryToAggregateFromStatistics(aggregateResultList, fileStatistics, resultToGroupedAhead); if (remainingAggregateResultList.isEmpty()) { @@ -644,6 +671,7 @@ while (seriesReader.hasNextFile()) { // cal by file statistics // TODO + // if (seriesReader.canUseCurrentFileStatistics()) { // while (seriesReader.hasNextSubSeries()) { // Statistics fileStatistics = seriesReader.currentFileStatistics();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java index 602ca68..a829a6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -115,6 +115,10 @@ return new SamplingStatSingleAggrResult(dataType); case SQLConstant.STRICT_KLL_STAT_SINGLE: return new StrictKLLStatSingleAggrResult(dataType); + case SQLConstant.DDSKETCH_SINGLE: + return new DDSketchSingleAggrResult(dataType); + case SQLConstant.CHUNK_STAT_AVAIL: + return new KLLStatChunkAvailAggrResult(dataType); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName); } @@ -194,6 +198,10 @@ return new SamplingStatSingleAggrResult(dataType); case SQLConstant.STRICT_KLL_STAT_SINGLE: return new StrictKLLStatSingleAggrResult(dataType); + case SQLConstant.DDSKETCH_SINGLE: + return new DDSketchSingleAggrResult(dataType); + case SQLConstant.CHUNK_STAT_AVAIL: + return new KLLStatChunkAvailAggrResult(dataType); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName); } @@ -274,6 +282,10 @@ return new SamplingStatSingleAggrResult(dataType); case STRICT_KLL_STAT_SINGLE: return new StrictKLLStatSingleAggrResult(dataType); + case DDSKETCH_SINGLE: + return new DDSketchSingleAggrResult(dataType); + case CHUNK_STAT_AVAIL: + return new KLLStatChunkAvailAggrResult(dataType); default: throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name()); }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java index aef6f55..0d434c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java
@@ -41,6 +41,12 @@ private final SeriesReader seriesReader; + public SeriesReader getSeriesReader() { + return seriesReader; + } + + // public AggregateResult quantileAggrResult; + public SeriesAggregateReader( PartialPath seriesPath, Set<String> allSensors, @@ -100,6 +106,19 @@ @Override public boolean canUseCurrentFileStatistics() throws IOException { Statistics fileStatistics = currentFileStatistics(); + // if (fileStatistics.getCount() > 10000) { + // System.out.println( + // "\t\t\t\t[SeriesAggrReader checkFileStat]\toverlap:" + // + seriesReader.isFileOverlapped() + // + "\t" + // + fileStatistics.getStartTime() + // + "..." + // + fileStatistics.getEndTime() + // + "containedByT:" + // + containedByTimeFilter(fileStatistics) + // + "\tmodified:" + // + seriesReader.currentFileModified()); + // } return !seriesReader.isFileOverlapped() && containedByTimeFilter(fileStatistics) && !seriesReader.currentFileModified();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 2e7eea5..1f852f2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -18,10 +18,14 @@ */ package org.apache.iotdb.db.query.reader.series; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.idtable.IDTable; import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.impl.KLLStatChunkAvailAggrResult; +import org.apache.iotdb.db.query.aggregation.impl.StrictKLLStatSingleAggrResult; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryTimeManager; import org.apache.iotdb.db.query.control.tracing.TracingManager; @@ -37,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; @@ -45,18 +50,18 @@ import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter; import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader; import org.apache.iotdb.tsfile.read.reader.IPageReader; +import org.apache.iotdb.tsfile.utils.KLLSketchForQuantile; +import org.apache.iotdb.tsfile.utils.LongKLLSketch; +import org.apache.iotdb.tsfile.utils.SegTreeBySketch; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; import java.io.Serializable; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; -import 
java.util.PriorityQueue; -import java.util.Set; +import java.util.*; import java.util.function.ToLongFunction; import java.util.stream.Collectors; @@ -122,6 +127,12 @@ protected boolean hasCachedNextOverlappedPage; protected BatchData cachedBatchData; + public final boolean NoUpdate = IoTDBDescriptor.getInstance().getConfig().getNoUpdate(); + + public boolean aggrSST = false; + public AggregateResult quantileAggrResult; + private long unpackedTSEndTime = Long.MIN_VALUE; + /** * @param seriesPath For querying aligned series, the seriesPath should be AlignedPath. All * selected series belonging to one aligned device should be all in this one AlignedPath's @@ -264,7 +275,11 @@ // init first time series metadata whose startTime is minimum tryToUnpackAllOverlappedFilesToTimeSeriesMetadata(); } - + // if (firstTimeSeriesMetadata != null) { + // System.out.println("\tHasNextFile:" + + // firstTimeSeriesMetadata.getStatistics().getStartTime()); + // // System.out.println("\t\tunpacked other seq:" + seqTimeSeriesMetadata.size()); + // } return firstTimeSeriesMetadata != null; } @@ -272,6 +287,7 @@ if (firstTimeSeriesMetadata == null) { throw new IOException("no first file"); } + if (NoUpdate) return false; Statistics fileStatistics = firstTimeSeriesMetadata.getStatistics(); return !seqTimeSeriesMetadata.isEmpty() @@ -335,7 +351,9 @@ return true; } - while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) { + if (firstChunkMetadata == null + && (!cachedChunkMetadata.isEmpty() + || /*hasNextFile()*/ firstTimeSeriesMetadata != null)) { // TO CHECK initFirstChunkMetadata(); } return firstChunkMetadata != null; @@ -344,28 +362,37 @@ /** construct first chunk metadata */ private void initFirstChunkMetadata() throws IOException { if (firstTimeSeriesMetadata != null) { + // System.out.println("\t??? 
initChunkMD by firstTSMD."); /* * try to unpack all overlapped TimeSeriesMetadata to cachedChunkMetadata */ - unpackAllOverlappedTsFilesToTimeSeriesMetadata( - orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics())); + + if (!NoUpdate) { + unpackAllOverlappedTsFilesToTimeSeriesMetadata( + orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics())); + } unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( orderUtils.getOverlapCheckTime(firstTimeSeriesMetadata.getStatistics()), true); } else { /* * first time series metadata is already unpacked, consume cached ChunkMetadata */ - while (!cachedChunkMetadata.isEmpty()) { - firstChunkMetadata = cachedChunkMetadata.first(); - unpackAllOverlappedTsFilesToTimeSeriesMetadata( - orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics())); - unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( - orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false); - if (firstChunkMetadata.equals(cachedChunkMetadata.first())) { + if (!NoUpdate) { + while (!cachedChunkMetadata.isEmpty()) { firstChunkMetadata = cachedChunkMetadata.first(); - cachedChunkMetadata.remove(firstChunkMetadata); - break; + unpackAllOverlappedTsFilesToTimeSeriesMetadata( + orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics())); + unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( + orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false); + if (firstChunkMetadata.equals(cachedChunkMetadata.first())) { + firstChunkMetadata = cachedChunkMetadata.first(); + cachedChunkMetadata.remove(firstChunkMetadata); + break; + } } + } else { + firstChunkMetadata = cachedChunkMetadata.first(); + cachedChunkMetadata.remove(firstChunkMetadata); } } if (valueFilter != null @@ -376,15 +403,100 @@ } } + // protected void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( + // long endpointTime, boolean init) throws IOException { + // if (!NoUpdate) { + // while 
(!seqTimeSeriesMetadata.isEmpty() + // && orderUtils.isOverlapped(endpointTime, + // seqTimeSeriesMetadata.get(0).getStatistics())) { + // unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0)); + // } + // while (!unSeqTimeSeriesMetadata.isEmpty() + // && orderUtils.isOverlapped( + // endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) { + // unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll()); + // } + // } + // + // if (firstTimeSeriesMetadata != null + // && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) { + // unpackOneTimeSeriesMetadata(firstTimeSeriesMetadata); + // firstTimeSeriesMetadata = null; + // } + // + // if (init && firstChunkMetadata == null && !cachedChunkMetadata.isEmpty()) { + // firstChunkMetadata = cachedChunkMetadata.first(); + // cachedChunkMetadata.remove(firstChunkMetadata); + // } + // } protected void unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( long endpointTime, boolean init) throws IOException { - while (!seqTimeSeriesMetadata.isEmpty() - && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) { - unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0)); + ObjectArrayList<ITimeSeriesMetadata> allTSMD = new ObjectArrayList<>(); + if (!NoUpdate && aggrSST) { + while (!seqTimeSeriesMetadata.isEmpty() + && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) { + // unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0)); + allTSMD.add(seqTimeSeriesMetadata.remove(0)); + } + while (!unSeqTimeSeriesMetadata.isEmpty() + && orderUtils.isOverlapped( + endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) { + // unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll()); + allTSMD.add(unSeqTimeSeriesMetadata.poll()); + } + if (firstTimeSeriesMetadata != null + && orderUtils.isOverlapped(endpointTime, firstTimeSeriesMetadata.getStatistics())) { + allTSMD.add(firstTimeSeriesMetadata); + 
firstTimeSeriesMetadata = null; + } + for (ITimeSeriesMetadata tsmd : allTSMD) { + System.out.println( + "\t\t\t\t[debug unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata]\t\ttsmd_T:" + + tsmd.getStatistics().getStartTime() + + "..." + + tsmd.getStatistics().getEndTime() + + "\t\t\tunpackedTSEndTime=" + + unpackedTSEndTime + + " statN=" + + tsmd.getStatistics().getCount()); + LongArrayList usedChunkL = new LongArrayList(), usedChunkR = new LongArrayList(); + ObjectArrayList<ITimeSeriesMetadata> otherTSMD = new ObjectArrayList<>(allTSMD); + otherTSMD.remove(tsmd); + if (((DoubleStatistics) tsmd.getStatistics()).hasSegTreeBySketch) { + SegTreeBySketch segT = ((DoubleStatistics) tsmd.getStatistics()).segTreeBySketch; + ObjectArrayList<KLLSketchForQuantile> queriedSketch = new ObjectArrayList<>(); + segT.range_query_in_SST_sketches( + queriedSketch, timeFilter, unpackedTSEndTime + 1, Long.MAX_VALUE, otherTSMD); + for (KLLSketchForQuantile sketch : queriedSketch) { + ((LongKLLSketch) sketch).deserializeFromBuffer(); + System.out.println("\t\t\t\t\ta sketch in SegT queried. sketchN=" + sketch.getN()); + if (quantileAggrResult instanceof StrictKLLStatSingleAggrResult) + ((StrictKLLStatSingleAggrResult) quantileAggrResult).addSketch(sketch); + if (quantileAggrResult instanceof KLLStatChunkAvailAggrResult) + ((KLLStatChunkAvailAggrResult) quantileAggrResult).addSketch(sketch); + } + usedChunkL.addAll(segT.queriedChunkL); + usedChunkR.addAll(segT.queriedChunkR); + } + unpackOneTimeSeriesMetadata(tsmd, usedChunkL, usedChunkR); + } + for (ITimeSeriesMetadata tsmd : allTSMD) + unpackedTSEndTime = Math.max(unpackedTSEndTime, tsmd.getStatistics().getEndTime()); + // if (!allTSMD.isEmpty()) + // System.out.println( + // "\t\t\t\t\tafter try to use sgt. 
unpackedTSEndTime<--" + unpackedTSEndTime); } - while (!unSeqTimeSeriesMetadata.isEmpty() - && orderUtils.isOverlapped(endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) { - unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll()); + if (!aggrSST) { + + while (!seqTimeSeriesMetadata.isEmpty() + && orderUtils.isOverlapped(endpointTime, seqTimeSeriesMetadata.get(0).getStatistics())) { + unpackOneTimeSeriesMetadata(seqTimeSeriesMetadata.remove(0)); + } + while (!unSeqTimeSeriesMetadata.isEmpty() + && orderUtils.isOverlapped( + endpointTime, unSeqTimeSeriesMetadata.peek().getStatistics())) { + unpackOneTimeSeriesMetadata(unSeqTimeSeriesMetadata.poll()); + } } if (firstTimeSeriesMetadata != null @@ -399,8 +511,47 @@ } } + protected void unpackOneTimeSeriesMetadata( + ITimeSeriesMetadata timeSeriesMetadata, LongArrayList usedChunkL, LongArrayList usedChunkR) + throws IOException { + List<IChunkMetadata> chunkMetadataList = + FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata); + chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq())); + int last = 0, beforeUnpack = cachedChunkMetadata.size(), skippedN = 0; + for (int i = 0; i < usedChunkL.size(); i++) { + while (last < chunkMetadataList.size() + && chunkMetadataList.get(last).getEndTime() < usedChunkL.getLong(i)) + cachedChunkMetadata.add(chunkMetadataList.get(last++)); + while (last < chunkMetadataList.size() + && chunkMetadataList.get(last).getEndTime() <= usedChunkR.getLong(i)) { + // System.out.println( + // "\t\t\t\t\t\ta chunkMD skipped since queried in seg. chunk T:" + // + chunkMetadataList.get(last).getStartTime() + // + "..." 
+ // + chunkMetadataList.get(last).getEndTime() + // + "\t\tchunk_sketch_num=" + // + ((DoubleStatistics) + // chunkMetadataList.get(last).getStatistics()).getSummaryNum()); + skippedN += chunkMetadataList.get(last).getStatistics().getCount(); + last++; + } + } + while (last < chunkMetadataList.size()) cachedChunkMetadata.add(chunkMetadataList.get(last++)); + System.out.println( + "\t\tin the partial TS_unpacking, " + + (cachedChunkMetadata.size() - beforeUnpack) + + " of all " + + chunkMetadataList.size() + + "chunks unpacked."); + System.out.println("\t\t\tskippedN:" + skippedN); + } + protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata timeSeriesMetadata) throws IOException { + unpackedTSEndTime = + Math.max(unpackedTSEndTime, timeSeriesMetadata.getStatistics().getEndTime()); + System.out.println( + "\t\t\t\t\tunpackOneTimeSeriesMetadata. unpackedTSEndTime<--" + unpackedTSEndTime); List<IChunkMetadata> chunkMetadataList = FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata); chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq())); @@ -426,6 +577,7 @@ if (firstChunkMetadata == null) { throw new IOException("no first chunk"); } + if (NoUpdate) return false; Statistics chunkStatistics = firstChunkMetadata.getStatistics(); return !cachedChunkMetadata.isEmpty() @@ -545,6 +697,7 @@ if (firstPageReader == null) { return false; } + if (NoUpdate) return false; long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime); @@ -570,18 +723,20 @@ unpackOneChunkMetaData(firstChunkMetadata); firstChunkMetadata = null; } - // In case unpacking too many sequence chunks - boolean hasMeetSeq = false; - while (!cachedChunkMetadata.isEmpty() - && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.first().getStatistics())) { - if (cachedChunkMetadata.first().isSeq() && hasMeetSeq) { - break; - } else if (cachedChunkMetadata.first().isSeq()) 
{ - hasMeetSeq = true; + if (!NoUpdate) { + // In case unpacking too many sequence chunks + boolean hasMeetSeq = false; + while (!cachedChunkMetadata.isEmpty() + && orderUtils.isOverlapped(endpointTime, cachedChunkMetadata.first().getStatistics())) { + if (cachedChunkMetadata.first().isSeq() && hasMeetSeq) { + break; + } else if (cachedChunkMetadata.first().isSeq()) { + hasMeetSeq = true; + } + IChunkMetadata tmp = cachedChunkMetadata.first(); + cachedChunkMetadata.remove(tmp); + unpackOneChunkMetaData(tmp); } - IChunkMetadata tmp = cachedChunkMetadata.first(); - cachedChunkMetadata.remove(tmp); - unpackOneChunkMetaData(tmp); } if (init && firstPageReader == null @@ -655,6 +810,7 @@ if (hasCachedNextOverlappedPage) { return true; } + if (NoUpdate) return false; /* * has a non-overlapped page in firstPageReader @@ -749,6 +905,8 @@ return true; } + if (NoUpdate) return false; + tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); while (true) { @@ -935,10 +1093,12 @@ // unpack overlapped page using current page reader if (firstPageReader != null) { - long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); - unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime); - unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false); - unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false); + if (!NoUpdate) { + long overlapCheckTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); + unpackAllOverlappedTsFilesToTimeSeriesMetadata(overlapCheckTime); + unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(overlapCheckTime, false); + unpackAllOverlappedChunkMetadataToPageReaders(overlapCheckTime, false); + } // this page after unpacking must be the first page if (firstPageReader.equals(getFirstPageReaderFromCachedReaders())) { @@ -1032,9 +1192,11 @@ /* * Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty */ - while 
(unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) { - unpackUnseqTsFileResource(); - } + if (!NoUpdate || seqTimeSeriesMetadata.isEmpty()) { + while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) { + unpackUnseqTsFileResource(); + } + } // if NoUpdate, consume all seq first. /* * find end time of the first TimeSeriesMetadata @@ -1057,8 +1219,10 @@ /* * unpack all directly overlapped seq/unseq files with first TimeSeriesMetadata */ - if (endTime != -1) { - unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime); + if (!NoUpdate) { + if (endTime != -1) { + unpackAllOverlappedTsFilesToTimeSeriesMetadata(endTime); + } } /* @@ -1086,6 +1250,9 @@ && !valueFilter.satisfy(firstTimeSeriesMetadata.getStatistics())) { firstTimeSeriesMetadata = null; } + // System.out.println("[tryToUnpackAllOverlappedFilesToTimeSeriesMetadata]"+ + // "\tFirst:"+firstTimeSeriesMetadata.getStatistics().getStartTime()+ + // "--"+firstTimeSeriesMetadata.getStatistics().getEndTime()); } protected void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java index d2329d01..480bfb6 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -216,6 +216,8 @@ case SQLConstant.TDIGEST_STAT_SINGLE: case SQLConstant.SAMPLING_STAT_SINGLE: case SQLConstant.STRICT_KLL_STAT_SINGLE: + case SQLConstant.DDSKETCH_SINGLE: + case SQLConstant.CHUNK_STAT_AVAIL: return TSDataType.DOUBLE; case SQLConstant.LAST_VALUE: case SQLConstant.FIRST_VALUE:
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java index aaf30fa..1af9616 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -145,6 +145,8 @@ case SQLConstant.TDIGEST_STAT_SINGLE: case SQLConstant.SAMPLING_STAT_SINGLE: case SQLConstant.STRICT_KLL_STAT_SINGLE: + case SQLConstant.DDSKETCH_SINGLE: + case SQLConstant.CHUNK_STAT_AVAIL: return TSDataType.DOUBLE; default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java index d8d97ac..3d3fa94 100644 --- a/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java +++ b/session/src/test/java/org/apache/iotdb/session/InsertCsvDataIT.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -49,8 +50,8 @@ private static int originCompactionThreadNum; private static final List<String> deviceList = new ArrayList<>(); private static final List<Integer> sizeList = new ArrayList<>(); - private static final int baseSize = 4096 * 20000; - private static final int TABLET_SIZE = 4096; + private static final int baseSize = 8192 * 6713; + private static final int TABLET_SIZE = 8192; private static final int deviceNumL = 0, deviceNumR = 1; private static final List<String> seriesList = new ArrayList<>(); private static final List<TSDataType> dataTypeList = new ArrayList<>(); @@ -163,69 +164,145 @@ private static void insertDataFromTXT() throws IoTDBConnectionException, StatementExecutionException, IOException { - long START_TIME = new Date().getTime(); + final int TEST_CASE = 1; + String[] fileList = new String[10], sgName = new String[10]; + String sketch_size = "4096T4"; + fileList[0] = "1_bitcoin.csv"; + sgName[0] = "root.bitcoin" + sketch_size; + fileList[1] = "2_SpacecraftThruster.txt"; + sgName[1] = "root.thruster" + sketch_size; + fileList[2] = "3_taxipredition8M.txt"; + sgName[2] = "root.taxi" + sketch_size; + fileList[3] = "4_wh.csv"; + sgName[3] = "root.wh" + sketch_size; + // fileList[1] = "tmp_3_55.txt"; + // fileList[2] = "tmp_0_55.txt"; + // fileList[3] = "tmp_2_55.txt"; + // fileList[4] = "tmp_1_60.txt"; + // fileList[5] = "tmp_0_131.txt"; + // fileList[6] = "tmp_1_131.txt"; + // fileList[7] = "tmp_0_356.txt"; + for (int fileID : new int[] {0}) { + System.out.println("\t\t" + fileList[fileID] + "\t" + sketch_size + "\t\t"); + System.out.print("\t\t\t"); - String[] fileList = new String[10]; - fileList[0] = "tmp_1_55.txt"; - fileList[1] = "tmp_3_55.txt"; - fileList[2] = 
"tmp_0_55.txt"; - fileList[3] = "tmp_2_55.txt"; - fileList[4] = "tmp_1_60.txt"; - fileList[5] = "tmp_0_131.txt"; - fileList[6] = "tmp_1_131.txt"; - fileList[7] = "tmp_0_356.txt"; + String filename = fileList[fileID]; + String folder = "D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2"; + String filepath = folder + "\\" + filename; + DoubleArrayList vv = new DoubleArrayList(); - for (int i = 0; i < 8; i++) { - String series = "s" + i; - session.createTimeseries( - "root.real.d0." + series, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY); + for (int T = 0; T < TEST_CASE; T++) { + try { + session.executeNonQueryStatement("delete storage group " + sgName[fileID]); + Thread.sleep(4000); + } catch (Exception e) { + // no-op + } + File file = new File(filepath); + BufferedInputStream fis = null; + fis = new BufferedInputStream(new FileInputStream(file)); + BufferedReader reader = + new BufferedReader( + new InputStreamReader(fis, StandardCharsets.UTF_8), 50 * 1024 * 1024); + reader.readLine(); // ignore first line! 
+ + long START_TIME = new Date().getTime(); + + String series = "s0"; + session.createTimeseries( + sgName[fileID] + ".d0.s0", TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY); + MeasurementSchema schema = + new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN); + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(schema); + + // Random random = new Random(233); + + int chunk_num = 0; + String device = sgName[fileID] + ".d0"; + long CNT_TIME = 0; // new Date().getTime(); + long INGEST_TIME = 0; + while (true) { + vv.clear(); + for (String tmps = reader.readLine(); + tmps != null && vv.size() < TABLET_SIZE * 1000; + tmps = reader.readLine()) vv.add(Double.parseDouble(tmps)); + + INGEST_TIME -= new Date().getTime(); + for (int i = 0; i < vv.size() / TABLET_SIZE; i++) { + Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + for (int j = 0; j < TABLET_SIZE; j++) { + int row = tablet.rowSize++; + timestamps[row] = CNT_TIME++; + ((double[]) values[0])[row] = vv.getDouble(i * TABLET_SIZE + j); + } + session.insertTablet(tablet); + if (++chunk_num == 6713) break; + } + INGEST_TIME += new Date().getTime(); + if (chunk_num == 6713) break; + if (vv.size() < TABLET_SIZE) break; + } + System.out.print("\t" + INGEST_TIME); + System.out.flush(); + } + System.out.println(); + } + } + + private static void append(int chunkToAppend) + throws IoTDBConnectionException, StatementExecutionException, IOException { + final int TEST_CASE = 1; + String[] fileList = new String[10], sgName = new String[10]; + String sketch_size = "4096T32"; + fileList[0] = "1_bitcoin.csv"; + sgName[0] = "root.bitcoin" + sketch_size; + fileList[1] = "2_SpacecraftThruster.txt"; + sgName[1] = "root.thruster" + sketch_size; + fileList[2] = "3_taxipredition8M.txt"; + sgName[2] = "root.taxi" + sketch_size; + fileList[3] = "4_wh.csv"; + sgName[3] = "root.wh" + sketch_size; + for (int 
fileID : new int[] {1}) { + System.out.println("APPEND to\t\t" + fileList[fileID] + "\t" + sketch_size + "\t\t"); + System.out.print("\t\t\t"); + + String series = "s0"; MeasurementSchema schema = new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN); List<MeasurementSchema> schemaList = new ArrayList<>(); schemaList.add(schema); - Random random = new Random(233); - String filename = fileList[i]; - String folder = "E:\\real-world data\\Kaggle"; - String filepath = folder + "\\" + filename; - File file = new File(filepath); - BufferedInputStream fis = null; - fis = new BufferedInputStream(new FileInputStream(file)); - BufferedReader reader = - new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8), 50 * 1024 * 1024); - reader.readLine(); // ignore first line! + // Random random = new Random(233); - String device = "root.real.d0"; - long CNT_TIME = i * real_data_series_base_time; - String tmps; - boolean over_flag = false; - while (!over_flag) { + int chunk_num = 0; + String device = sgName[fileID] + ".d0"; + long CNT_TIME = new Date().getTime(); + long INGEST_TIME = 0; + Random random = new Random(233); + + INGEST_TIME -= new Date().getTime(); + for (int i = 0; i < chunkToAppend; i++) { Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE); long[] timestamps = tablet.timestamps; Object[] values = tablet.values; for (int j = 0; j < TABLET_SIZE; j++) { - if ((tmps = reader.readLine()) != null) { - int row = tablet.rowSize++; - timestamps[row] = CNT_TIME; - ((double[]) values[0])[row] = Double.parseDouble(tmps); - CNT_TIME++; - } else { - over_flag = true; - break; - } + int row = tablet.rowSize++; + timestamps[row] = CNT_TIME++; + ((double[]) values[0])[row] = random.nextGaussian(); } - if (!over_flag) { - session.insertTablet(tablet); - } + session.insertTablet(tablet); } } - System.out.println("\t\t[WRITE FINISH]:\t" + (new Date().getTime() - START_TIME)); } @Test public void insertDATA() { try { - prepareTimeSeriesData(); - // 
insertDataFromTXT(); + // prepareTimeSeriesData(); + insertDataFromTXT(); + // append(5); // insertDataFromTXT(); // insertDataFromTXT(3, 3, 0); } catch (IoTDBConnectionException | StatementExecutionException | IOException e) {
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java index 269032a..7aca826 100644 --- a/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java +++ b/session/src/test/java/org/apache/iotdb/session/InsertDataIT.java
@@ -49,8 +49,8 @@ private static int originCompactionThreadNum; private static final List<String> deviceList = new ArrayList<>(); private static final List<Integer> sizeList = new ArrayList<>(); - private static final int baseSize = 4096 * 20000; - private static final int TABLET_SIZE = 4096 * 20; + private static final int TABLET_SIZE = 8192; + private static final int baseSize = TABLET_SIZE * 1500; // (30 * 30 * 30 - 1); private static final int deviceNumL = 0, deviceNumR = 1; private static final List<String> seriesList = new ArrayList<>(); private static final List<TSDataType> dataTypeList = new ArrayList<>(); @@ -61,7 +61,7 @@ @BeforeClass public static void setUp() throws Exception { for (int i = deviceNumL; i < deviceNumR; i++) { - deviceList.add("root.Summary1.d" + i); + deviceList.add("root.test1500.SSTd" + i); sizeList.add(baseSize * (i + 1)); } for (int i = 0; i < series_num; i++) {
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java index 160ff7e..6b5166e 100644 --- a/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java +++ b/session/src/test/java/org/apache/iotdb/session/InsertLatencyDataIT.java
@@ -35,11 +35,9 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Random; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.*; import static org.junit.Assert.fail; @@ -49,20 +47,22 @@ private static int originCompactionThreadNum; private static final List<String> deviceList = new ArrayList<>(); private static final List<Integer> sizeList = new ArrayList<>(); - private static final int baseSize = 100000 * 200; - private static final int TABLET_SIZE = 100000; + private static final int baseSize = 4096 * 64; + private static final int TABLET_SIZE = 4096 * 16; private static final int device_num = 1; private static final List<String> seriesList = new ArrayList<>(); private static final List<TSDataType> dataTypeList = new ArrayList<>(); private static final int series_num = 1; private static final int Long_Series_Num = 0; private static final boolean inMemory = false; - static final double mu = 2, sig = 3; + static final double mu = 2, sig = 3.0; + static final String muS = Integer.toString((int) (Math.round(mu * 10))); + static final String sigS = Integer.toString((int) (Math.round(sig * 10))); @BeforeClass public static void setUp() throws Exception { for (int i = 0; i < device_num; i++) { - deviceList.add("root.disorder_16_23.d" + i); + deviceList.add("root.latency_" + muS + "_" + sigS + ".d" + i); sizeList.add(baseSize * (i + 1)); } for (int i = 0; i < series_num; i++) { @@ -97,6 +97,12 @@ for (String device : deviceList) { for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) { String series = seriesList.get(seriesID); + try { + session.executeNonQueryStatement( + "delete storage group " + device.substring(0, device.length() - 3)); + } catch (Exception e) { + // no-op + } session.createTimeseries( device + "." 
+ series, dataTypeList.get(seriesID), @@ -114,17 +120,6 @@ } Random random = new Random(233); - long REVERSE_TIME = 1L << 40; - - // List<LongLongPair> a = new ArrayList<>(baseSize); - // for (int i = 0; i < baseSize; i++) { - // a.add( - // PrimitiveTuples.pair( - // (long) i, (long) Math.round(i + Math.exp(mu + sig * random.nextGaussian())))); - // // System.out.println("\t\t"+a.get(i).getOne()+"\t"+a.get(i).getTwo()); - // } - // a.sort(Comparator.comparingLong(LongLongPair::getTwo)); - long[] aa; long[] bb; IntArrayList cc; @@ -140,13 +135,19 @@ for (int deviceID = 0; deviceID < device_num; deviceID++) { String device = deviceList.get(deviceID); + + // ArrayList<Double> tmpList = new ArrayList<>(); + // for (int seriesID = 0; seriesID < series_num; seriesID++) tmpList.add(-233.0); + // session.insertRecord(device, 1L << 40, seriesList, dataTypeList, tmpList.toArray()); + // session.executeNonQueryStatement("flush"); + int TABLET_NUM = (baseSize / TABLET_SIZE) * (deviceID + 1); long TOTAL_SIZE = baseSize * (deviceID + 1); long index = 0; for (int i = 0; i < TABLET_NUM; i++) { - long BASE_TIME; - if (i == 0) BASE_TIME = REVERSE_TIME; - else BASE_TIME = (long) (i - 1) * TABLET_SIZE + 1; + // long BASE_TIME; + // if (i == 0) BASE_TIME = REVERSE_TIME; + // else BASE_TIME = (long) (i - 1) * TABLET_SIZE + 1; Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE); @@ -155,9 +156,9 @@ for (long time = 0; time < TABLET_SIZE; index++, time++) { int row = tablet.rowSize++; - timestamps[row] = aa[cc.get((int) index)]; + timestamps[row] = aa[cc.getInt((int) index)]; // if (index < 100) System.out.println("\t" + timestamps[row]); - if (i == 0) timestamps[row] += 1L << 30; + // if (i == 0) timestamps[row] += 1L << 30; for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) { String series = seriesList.get(seriesID); @@ -169,6 +170,100 @@ } } session.insertTablet(tablet); + // session.executeNonQueryStatement("flush"); + } + 
session.executeNonQueryStatement("flush"); + } + System.out.println( + "\t\t create designed data cost time:" + (System.currentTimeMillis() - START_TIME)); + } + + private static void insertDataFromFile() + throws IoTDBConnectionException, StatementExecutionException, IOException { + long START_TIME = System.currentTimeMillis(); + + Random random = new Random(233); + long[] aa; + long[] bb; + IntArrayList cc; + aa = new long[baseSize]; + bb = new long[baseSize]; + cc = new IntArrayList(baseSize); + for (int i = 0; i < baseSize; i++) { + cc.add(i); + aa[i] = i; + bb[i] = (long) Math.round(i + Math.exp(mu + sig * random.nextGaussian())); + } + cc.sort((x, y) -> (Long.compare(bb[x], bb[y]))); + + String[] fileList = new String[10]; + fileList[1] = "1_bitcoin.csv"; + fileList[2] = "2_physiological_stress.txt"; + fileList[3] = "4_taxipredition8M.txt"; + fileList[4] = "5_wh.csv"; + + for (int fileID = 2; fileID <= 2; fileID++) { + String series = "s0"; + String storage_group = "root.real_" + fileID + "_latency_" + muS + "_" + sigS; + String device = storage_group + ".d0"; + try { + session.executeNonQueryStatement("delete storage group " + storage_group); + } catch (Exception e) { + // no-op + } + session.createTimeseries( + device + "." 
+ series, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY); + + MeasurementSchema schema = new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN); + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(schema); + + session.insertRecord( + device, + 1L << 40, + Collections.singletonList(series), + Collections.singletonList(TSDataType.DOUBLE), + Collections.singletonList(-233.0).toArray()); + session.executeNonQueryStatement("flush"); + String filename = fileList[fileID]; + String folder = "F:\\real-world data\\LSM-quantile"; + String filepath = folder + "\\" + filename; + File file = new File(filepath); + BufferedInputStream fis = null; + fis = new BufferedInputStream(new FileInputStream(file)); + BufferedReader reader = + new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8), 50 * 1024 * 1024); + reader.readLine(); // ignore first line! + String tmps; + boolean over_flag = false; + long index = 0; + double[] vv = new double[baseSize]; + while (!over_flag && index < baseSize) { + Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + for (int j = 0; j < TABLET_SIZE; index++, j++) { + if ((tmps = reader.readLine()) != null) { + vv[(int) index] = Double.parseDouble(tmps); + } else { + over_flag = true; + break; + } + } + } + + index = 0; + for (int i = 0; i < baseSize / TABLET_SIZE; i++) { + Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + for (long time = 0; time < TABLET_SIZE; index++, time++) { + int row = tablet.rowSize++; + timestamps[row] = aa[cc.getInt((int) index)]; + double num = random.nextGaussian(); + ((double[]) values[0])[row] = vv[cc.getInt((int) index)]; + } + session.insertTablet(tablet); session.executeNonQueryStatement("flush"); } } @@ -176,16 +271,11 @@ "\t\t create designed data cost time:" + 
(System.currentTimeMillis() - START_TIME)); } - static final long real_data_series_base_time = 1L << 32; - - private static void insertDataFromTXT() - throws IoTDBConnectionException, StatementExecutionException, IOException {} - @Test public void insertDATA() { try { prepareTimeSeriesData(mu, sig); - // insertDataFromTXT(); + // insertDataFromFile(); // insertDataFromTXT(); // insertDataFromTXT(3, 3, 0); } catch (IoTDBConnectionException | StatementExecutionException | IOException e) { @@ -194,20 +284,19 @@ } } - @Test - public void executeStatement() - throws IoTDBConnectionException, StatementExecutionException, IOException { - SessionDataSet dataSet; - dataSet = session.executeQueryStatement("show timeseries"); - while (dataSet.hasNext()) System.out.println("[DEBUG]" + dataSet.next().getFields().toString()); - long ST; - - ST = new Date().getTime(); - for (int i = 0; i < 1; i++) - dataSet = - session.executeQueryStatement( - "select exact_median_kll_stat_single(s0) from " + deviceList.get(0)); - System.out.println("[DEBUG]" + dataSet.next().getFields().toString()); - System.out.println("\t\ttime:" + (new Date().getTime() - ST)); - } + // @Test + // public void executeStatement() + // throws IoTDBConnectionException, StatementExecutionException, IOException { + // SessionDataSet dataSet; + // dataSet = session.executeQueryStatement("show timeseries"); + // while (dataSet.hasNext()) System.out.println("[DEBUG]" + + // dataSet.next().getFields().toString()); + // long ST; + // + // ST = new Date().getTime(); + // for (int i = 0; i < 1; i++) + // dataSet = session.executeQueryStatement("select count(s0) from " + deviceList.get(0)); + // System.out.println("[DEBUG]" + dataSet.next().getFields().toString()); + // System.out.println("\t\ttime:" + (new Date().getTime() - ST)); + // } }
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertUnseqDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertUnseqDataIT.java new file mode 100644 index 0000000..cc9aed4 --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/InsertUnseqDataIT.java
@@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.fail; + +public class InsertUnseqDataIT { + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static Session session; + private static int originCompactionThreadNum; + private static final List<String> deviceList = 
new ArrayList<>(); + private static final List<Integer> sizeList = new ArrayList<>(); + private static final int TABLET_SIZE = 8192; + private static final int baseSize = TABLET_SIZE * 6713; // (30 * 30 * 30 - 1); + private static final int deviceNumL = 0, deviceNumR = 1; + private static final List<String> seriesList = new ArrayList<>(); + private static final List<TSDataType> dataTypeList = new ArrayList<>(); + private static final int series_num = 1; + private static final int Long_Series_Num = 0; + private static final boolean inMemory = false; + + @BeforeClass + public static void setUp() throws Exception { + for (int i = deviceNumL; i < deviceNumR; i++) { + deviceList.add("root.testU1.d" + i); + sizeList.add(baseSize * (i + 1)); + } + for (int i = 0; i < series_num; i++) { + seriesList.add("s" + i); + if (i < Long_Series_Num) dataTypeList.add(TSDataType.INT64); + else dataTypeList.add(TSDataType.DOUBLE); + } + originCompactionThreadNum = CONFIG.getConcurrentCompactionThread(); + CONFIG.setConcurrentCompactionThread(0); + if (inMemory) EnvironmentUtils.envSetUp(); + session = new Session("127.0.0.1", 6667, "root", "root"); + session.open(); + if (inMemory) { + prepareTimeSeriesData(); + // insertDataFromTXT(5); + } + } + + @AfterClass + public static void tearDown() throws Exception { + session.close(); + if (inMemory) EnvironmentUtils.cleanEnv(); + CONFIG.setConcurrentCompactionThread(originCompactionThreadNum); + } + + private static void prepareTimeSeriesData() + throws IoTDBConnectionException, StatementExecutionException, IOException { + System.out.println("\t\t????" + deviceList + "||||" + seriesList); + long START_TIME = System.currentTimeMillis(); + final int START_SERIES = 0; + for (String device : deviceList) { + for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) { + String series = seriesList.get(seriesID); + session.createTimeseries( + device + "." 
+ series, + dataTypeList.get(seriesID), + TSEncoding.PLAIN, + CompressionType.SNAPPY); + } + } + + List<MeasurementSchema> schemaList = new ArrayList<>(); + + for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) { + schemaList.add( + new MeasurementSchema( + seriesList.get(seriesID), dataTypeList.get(seriesID), TSEncoding.PLAIN)); + } + + Random random = new Random(233); + + for (int deviceID = 0; deviceID < deviceNumR - deviceNumL; deviceID++) { + String device = deviceList.get(deviceID); + int TABLET_NUM = (baseSize / TABLET_SIZE) * (deviceID + 1); + long TOTAL_SIZE = baseSize * (deviceID + 1); + + // Tablet unSeqTablet = new Tablet(device, schemaList, 1); + // unSeqTablet.timestamps[0]=1L<<40; + // ((double[]) unSeqTablet.values[0])[0]=-2.33; + // session.insertTablet(unSeqTablet); + // session.executeNonQueryStatement("flush"); + + for (int i = -1; i < TABLET_NUM; i++) { + long BASE_TIME = (long) i * TABLET_SIZE; + if (i < 0) BASE_TIME = 1L << 40; + Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE); + + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + + for (long time = 0; time < TABLET_SIZE; time++) { + int row = tablet.rowSize++; + // if (i % 2 == 1) + // timestamps[row] = /*BASE_TIME + time*/ +baseSize + random.nextInt(baseSize); + // else + timestamps[row] = BASE_TIME + time; + long index = timestamps[row]; + + for (int seriesID = START_SERIES; seriesID < series_num; seriesID++) { + String series = seriesList.get(seriesID); + + if (seriesID == 0) { + double num = random.nextDouble(); + ((double[]) values[seriesID])[row] = num; + } else if (seriesID == 1) { + double num = index; + ((double[]) values[seriesID])[row] = num; + } else if (seriesID == 2) { + long num = 1; // = ((random.nextInt() & 1) == 1) ? 
1 : -1; + num = num * (long) (Math.pow(10, 1 + random.nextDouble() * 17.5)); // iid log-uniform + ((double[]) values[seriesID])[row] = Double.longBitsToDouble(num); + } else if (seriesID == 3) { + double num = (index % 7989) * Math.sin(index % 7989); + ((double[]) values[seriesID])[row] = num; + } + } + } + session.insertTablet(tablet); + // session.executeNonQueryStatement("flush"); + } + } + System.out.println( + "\t\t create designed data cost time:" + (System.currentTimeMillis() - START_TIME)); + } + + static final long real_data_series_base_time = 1L << 32; + + private static void insertDataFromTXT() + throws IoTDBConnectionException, StatementExecutionException, IOException { + long START_TIME = new Date().getTime(); + + String[] fileList = new String[10]; + fileList[0] = "tmp_1_55.txt"; + fileList[1] = "tmp_3_55.txt"; + fileList[2] = "tmp_0_55.txt"; + fileList[3] = "tmp_2_55.txt"; + fileList[4] = "tmp_1_60.txt"; + fileList[5] = "tmp_0_131.txt"; + fileList[6] = "tmp_1_131.txt"; + fileList[7] = "tmp_0_356.txt"; + + for (int i = 0; i < 8; i++) { + String series = "s" + i; + session.createTimeseries( + "root.real.d0." + series, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY); + MeasurementSchema schema = new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN); + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(schema); + + Random random = new Random(233); + String filename = fileList[i]; + String folder = "E:\\real-world data\\Kaggle"; + String filepath = folder + "\\" + filename; + File file = new File(filepath); + BufferedInputStream fis = null; + fis = new BufferedInputStream(new FileInputStream(file)); + BufferedReader reader = + new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8), 50 * 1024 * 1024); + reader.readLine(); // ignore first line! 
+ + String device = "root.real.d0"; + long CNT_TIME = i * real_data_series_base_time; + String tmps; + boolean over_flag = false; + while (!over_flag) { + Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + for (int j = 0; j < TABLET_SIZE; j++) { + if ((tmps = reader.readLine()) != null) { + int row = tablet.rowSize++; + timestamps[row] = CNT_TIME; + ((double[]) values[0])[row] = Double.parseDouble(tmps); + CNT_TIME++; + } else { + over_flag = true; + break; + } + } + if (!over_flag) { + session.insertTablet(tablet); + } + } + } + System.out.println("\t\t[WRITE FINISH]:\t" + (new Date().getTime() - START_TIME)); + } + + @Test + public void insertDATA() { + try { + prepareTimeSeriesData(); + // insertDataFromTXT(); + // insertDataFromTXT(); + // insertDataFromTXT(3, 3, 0); + } catch (IoTDBConnectionException | StatementExecutionException | IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // @Test + // public void run() throws IoTDBConnectionException, StatementExecutionException, IOException { + // // prepareTimeSeriesData(); + // insertDataFromTXT(); + // } +}
diff --git a/session/src/test/java/org/apache/iotdb/session/InsertUnseqLatencyDataIT.java b/session/src/test/java/org/apache/iotdb/session/InsertUnseqLatencyDataIT.java new file mode 100644 index 0000000..781511c --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/InsertUnseqLatencyDataIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Writes one or more series whose points are inserted out of time order, for latency experiments
+ * on unsequence data.
+ *
+ * <p>Point {@code i} gets the arrival key {@code round(i + exp(mu + sig * g))} with {@code g}
+ * drawn from {@code Random.nextGaussian()}, and points are inserted in ascending key order, so a
+ * larger {@code sig} spreads the delays further. Before that data, a tablet of placeholder points
+ * with very large timestamps is inserted and flushed so that all later points are classified as
+ * unsequence data.
+ *
+ * <p>Needs an IoTDB server at 127.0.0.1:6667 unless {@link #inMemory} is true. The synthetic
+ * generator {@link #prepareTimeSeriesData} only runs when {@code inMemory} is true; the test
+ * method loads a real dataset through {@link #insertDataFromFile}.
+ */
+public class InsertUnseqLatencyDataIT {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+  private static int originCompactionThreadNum;
+  private static final List<String> deviceList = new ArrayList<>();
+  /** Number of points written to each device (device i gets baseSize * (i + 1)). */
+  private static final List<Integer> sizeList = new ArrayList<>();
+  private static final int TABLET_SIZE = 8192;
+  /** Points per device; kept a whole multiple of TABLET_SIZE. */
+  private static final int baseSize = TABLET_SIZE * 6713;
+  private static final int device_num = 1;
+  private static final List<String> seriesList = new ArrayList<>();
+  private static final List<TSDataType> dataTypeList = new ArrayList<>();
+  private static final int series_num = 1;
+  /** The first Long_Series_Num series are INT64, the remaining ones DOUBLE. */
+  private static final int Long_Series_Num = 0;
+  /** When true, run against an embedded environment and generate synthetic data in setUp. */
+  private static final boolean inMemory = false;
+  /** Delay distribution parameters; sig values used so far: 0, 1.0, 2.1, 2.4, 2.6. */
+  static final double mu = 3.0, sig = 2.6;
+  static final String muS = Integer.toString((int) (Math.round(mu * 10)));
+  static final String sigS = Integer.toString((int) (Math.round(sig * 10)));
+  /** First timestamp of the placeholder tablet that pushes later data into unsequence space. */
+  private static final long FAR_FUTURE_TIME = 1L << 40;
+  /** Value written for every placeholder point. */
+  private static final double PLACEHOLDER_VALUE = -2.33;
+  /** Folder containing the real-world dataset files. */
+  private static final String DATA_FOLDER =
+      "D:\\Study\\Lab\\iotdb\\add_quantile_to_aggregation\\test_project_2";
+  /** Dataset file names, indexed by dataset ID (index 0 is unused). */
+  private static final String[] DATA_FILES = {
+    null, "1_bitcoin.csv", "2_SpacecraftThruster.txt", "3_taxipredition8M.txt", "4_wh.csv"
+  };
+  /** Short dataset names used in storage group names, indexed like DATA_FILES. */
+  private static final String[] DATA_NAMES = {null, "bitcoin", "thruster", "taxi", "wh"};
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    printMemoryUsage();
+    for (int i = 0; i < device_num; i++) {
+      deviceList.add("root.sst_latency_" + muS + "_" + sigS + ".d" + i);
+      sizeList.add(baseSize * (i + 1));
+    }
+    for (int i = 0; i < series_num; i++) {
+      seriesList.add("s" + i);
+      dataTypeList.add(i < Long_Series_Num ? TSDataType.INT64 : TSDataType.DOUBLE);
+    }
+    // Run with zero compaction threads; the original value is restored in tearDown.
+    originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+    CONFIG.setConcurrentCompactionThread(0);
+    if (inMemory) {
+      EnvironmentUtils.envSetUp();
+    }
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    if (inMemory) {
+      prepareTimeSeriesData(mu, sig);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (session != null) {
+      session.close();
+    }
+    if (inMemory) {
+      EnvironmentUtils.cleanEnv();
+    }
+    CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+  }
+
+  /** Prints the free, max and total JVM heap in MiB. */
+  private static void printMemoryUsage() {
+    Runtime runtime = Runtime.getRuntime();
+    System.out.println("\t\tfreeMem\t" + runtime.freeMemory() / (1024 * 1024.0));
+    System.out.println("\t\tmaxMem\t" + runtime.maxMemory() / (1024 * 1024.0));
+    System.out.println("\t\ttotalMem\t" + runtime.totalMemory() / (1024 * 1024.0));
+  }
+
+  /**
+   * Returns the indices {@code 0 .. size-1} sorted by the arrival key {@code round(i + exp(mu +
+   * sig * g))}, where each {@code g} is the next {@code random.nextGaussian()}.
+   */
+  private static IntArrayList arrivalOrder(int size, double mu, double sig, Random random) {
+    long[] arrival = new long[size];
+    IntArrayList order = new IntArrayList(size);
+    for (int i = 0; i < size; i++) {
+      order.add(i);
+      arrival[i] = Math.round(i + Math.exp(mu + sig * random.nextGaussian()));
+    }
+    order.sort((x, y) -> Long.compare(arrival[x], arrival[y]));
+    return order;
+  }
+
+  /** Deletes {@code storageGroup}; a statement failure (e.g. it does not exist) is ignored. */
+  private static void dropStorageGroupIfExists(String storageGroup)
+      throws IoTDBConnectionException {
+    try {
+      session.executeNonQueryStatement("delete storage group " + storageGroup);
+    } catch (StatementExecutionException e) {
+      // Nothing to delete.
+    }
+  }
+
+  /**
+   * Inserts one tablet of placeholder points, with timestamps from FAR_FUTURE_TIME upward, into
+   * the first measurement of {@code device} and flushes it. Every point inserted afterwards has a
+   * smaller timestamp, so it is classified as unsequence data.
+   */
+  private static void insertFarFutureTabletAndFlush(
+      String device, List<MeasurementSchema> schemaList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
+    double[] column = (double[]) tablet.values[0];
+    for (int i = 0; i < TABLET_SIZE; i++) {
+      int row = tablet.rowSize++;
+      tablet.timestamps[row] = FAR_FUTURE_TIME + i;
+      column[row] = PLACEHOLDER_VALUE;
+    }
+    session.insertTablet(tablet);
+    session.executeNonQueryStatement("flush");
+  }
+
+  /**
+   * Creates {@code series_num} PLAIN/SNAPPY series on every device (after dropping the storage
+   * groups that contain them) and writes {@code sizeList.get(d)} points to device {@code d} in
+   * delayed arrival order. Series 0 receives Gaussian values; timestamps are 0 .. size-1.
+   */
+  private static void prepareTimeSeriesData(double mu, double sig)
+      throws IoTDBConnectionException, StatementExecutionException {
+    System.out.println("\t\t????" + deviceList + "||||" + seriesList);
+    long startTime = System.currentTimeMillis();
+
+    // Drop each storage group once, before any series is created. Dropping per device (or per
+    // series) would delete the series just created for earlier devices of the same group.
+    Set<String> storageGroups = new LinkedHashSet<>();
+    for (String device : deviceList) {
+      storageGroups.add(device.substring(0, device.lastIndexOf('.')));
+    }
+    for (String storageGroup : storageGroups) {
+      dropStorageGroupIfExists(storageGroup);
+    }
+    for (String device : deviceList) {
+      for (int seriesID = 0; seriesID < series_num; seriesID++) {
+        session.createTimeseries(
+            device + "." + seriesList.get(seriesID),
+            dataTypeList.get(seriesID),
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY);
+      }
+    }
+
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    for (int seriesID = 0; seriesID < series_num; seriesID++) {
+      schemaList.add(
+          new MeasurementSchema(
+              seriesList.get(seriesID), dataTypeList.get(seriesID), TSEncoding.PLAIN));
+    }
+
+    Random random = new Random(233);
+    for (int deviceID = 0; deviceID < device_num; deviceID++) {
+      String device = deviceList.get(deviceID);
+      insertFarFutureTabletAndFlush(device, schemaList);
+
+      // One arrival order per device, sized to that device's point count.
+      int totalSize = sizeList.get(deviceID);
+      printMemoryUsage();
+      IntArrayList order = arrivalOrder(totalSize, mu, sig, random);
+      int index = 0;
+      for (int t = 0; t < totalSize / TABLET_SIZE; t++) {
+        Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
+        double[] column = (double[]) tablet.values[0];
+        for (int j = 0; j < TABLET_SIZE; j++, index++) {
+          int row = tablet.rowSize++;
+          tablet.timestamps[row] = order.getInt(index);
+          column[row] = random.nextGaussian();
+        }
+        session.insertTablet(tablet);
+      }
+    }
+    System.out.println(
+        "\t\t create designed data cost time:" + (System.currentTimeMillis() - startTime));
+  }
+
+  /**
+   * Reads up to {@code limit} doubles, one per line, after skipping the first (header) line. If
+   * the file is shorter, the missing tail stays 0.0 and a warning is printed.
+   */
+  private static double[] readValues(File file, int limit) throws IOException {
+    double[] values = new double[limit];
+    int count = 0;
+    try (BufferedReader reader =
+        new BufferedReader(
+            new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8),
+            50 * 1024 * 1024)) {
+      reader.readLine(); // header line
+      String line;
+      while (count < limit && (line = reader.readLine()) != null) {
+        values[count++] = Double.parseDouble(line);
+      }
+    }
+    if (count < limit) {
+      System.out.println(
+          "\t\t[WARN] " + file + " holds only " + count + " values; the rest are written as 0.0");
+    }
+    return values;
+  }
+
+  /**
+   * Writes dataset {@code fileID} as series {@code root.real_<name>_latency_<mu>_<sig>.d0.s0}:
+   * the point with timestamp {@code i} carries the i-th value of the file, and points are
+   * inserted in delayed arrival order after the placeholder tablet.
+   *
+   * @param fileID dataset ID, 1 to 4 (see DATA_FILES)
+   * @throws IllegalArgumentException if {@code fileID} names no dataset
+   * @throws IOException if the dataset file cannot be read
+   */
+  private static void insertDataFromFile(int fileID)
+      throws IoTDBConnectionException, StatementExecutionException, IOException {
+    if (fileID <= 0 || fileID >= DATA_FILES.length) {
+      throw new IllegalArgumentException("Unknown dataset id: " + fileID);
+    }
+    long startTime = System.currentTimeMillis();
+    IntArrayList order = arrivalOrder(baseSize, mu, sig, new Random(233));
+
+    String series = "s0";
+    String storageGroup = "root.real_" + DATA_NAMES[fileID] + "_latency_" + muS + "_" + sigS;
+    String device = storageGroup + ".d0";
+    dropStorageGroupIfExists(storageGroup);
+    session.createTimeseries(
+        device + "." + series, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.SNAPPY);
+
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema(series, TSDataType.DOUBLE, TSEncoding.PLAIN));
+    insertFarFutureTabletAndFlush(device, schemaList);
+
+    double[] values = readValues(new File(DATA_FOLDER, DATA_FILES[fileID]), baseSize);
+    int index = 0;
+    for (int t = 0; t < baseSize / TABLET_SIZE; t++) {
+      Tablet tablet = new Tablet(device, schemaList, TABLET_SIZE);
+      double[] column = (double[]) tablet.values[0];
+      for (int j = 0; j < TABLET_SIZE; j++, index++) {
+        int row = tablet.rowSize++;
+        int time = order.getInt(index);
+        tablet.timestamps[row] = time;
+        column[row] = values[time];
+      }
+      session.insertTablet(tablet);
+    }
+    System.out.println(
+        "\t\t create designed data cost time:" + (System.currentTimeMillis() - startTime));
+  }
+
+  /** Loads dataset 1; call prepareTimeSeriesData(mu, sig) instead for synthetic data. */
+  @Test
+  public void insertDATA() {
+    try {
+      insertDataFromFile(1);
+    } catch (IoTDBConnectionException | StatementExecutionException | IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/QueryLSMIT.java b/session/src/test/java/org/apache/iotdb/session/QueryLSMIT.java new file mode 100644 index 0000000..903ff5a --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/QueryLSMIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Measures the latency of {@code kll_quantile} and {@code count} on series s0 over the time range
+ * {@code [0, queryN)} of several storage groups, for increasing {@code queryN}.
+ *
+ * <p>Each measurement runs TEST_CASE identical queries after a warm-up and reports the mean of the
+ * middle half of the sorted per-query times, in ms. Needs an IoTDB server at 127.0.0.1:6667.
+ */
+public class QueryLSMIT {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+  private static int originCompactionThreadNum;
+  /** When true, an embedded environment is set up and cleaned around the test. */
+  private static final boolean inMemory = false;
+  private static final List<String> storageGroupList = new ArrayList<>();
+  /** Timed queries per (storage group, aggregation) pair. */
+  int TEST_CASE = 512;
+  /** Exclusive upper bound of the queried time range; updated by run(). */
+  int queryN = 100000000;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    storageGroupList.add("root.Summary" + "3");
+    storageGroupList.add("root.Summary" + "0");
+    // Run with zero compaction threads; the original value is restored in tearDown.
+    originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+    CONFIG.setConcurrentCompactionThread(0);
+    if (inMemory) {
+      EnvironmentUtils.envSetUp();
+    }
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (session != null) {
+      session.close();
+    }
+    if (inMemory) {
+      EnvironmentUtils.cleanEnv();
+    }
+    CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+  }
+
+  /** Appends the time filter {@code L <= time < R} to {@code body}. */
+  private String getQueryStatement(String body, long L, long R) {
+    return body + " where time>=" + L + " and time<" + R;
+  }
+
+  /** Runs {@code sql} and closes its result set so the server-side query is released. */
+  private static void runAndClose(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    session.executeQueryStatement(sql).closeOperationHandle();
+  }
+
+  /**
+   * Runs {@code sql} TEST_CASE times and returns the mean of the middle half of the sorted
+   * per-query latencies in ms. Closing each result set is not included in its latency.
+   */
+  private double trimmedMeanLatency(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    LongArrayList times = new LongArrayList(TEST_CASE);
+    for (int t = 0; t < TEST_CASE; t++) {
+      long start = System.nanoTime();
+      SessionDataSet dataSet = session.executeQueryStatement(sql);
+      times.add(System.nanoTime() - start);
+      dataSet.closeOperationHandle();
+    }
+    times.sort(Long::compare);
+    long sum = 0;
+    int cnt = 0;
+    for (int i = TEST_CASE / 4; i < TEST_CASE * 3 / 4; i++, cnt++) {
+      sum += times.getLong(i);
+    }
+    return sum / 1e6 / cnt;
+  }
+
+  /**
+   * Prints one result row: queryN, then the trimmed mean latency of every aggregation on every
+   * storage group.
+   */
+  private void testTime() throws IoTDBConnectionException, StatementExecutionException {
+    List<String> aggrList = new ArrayList<>();
+    aggrList.add("kll_quantile");
+    aggrList.add("count");
+
+    System.out.print(queryN);
+    for (String sg : storageGroupList) {
+      for (String aggr : aggrList) {
+        String queryBody = "select " + aggr + "(" + "s0" + ") from " + sg + ".d0";
+        String rangeQuery = getQueryStatement(queryBody, 0, queryN);
+        // Warm up with one unfiltered query and a few range queries.
+        runAndClose(queryBody);
+        for (int i = 0; i < TEST_CASE / 8 + 4; i++) {
+          runAndClose(rangeQuery);
+        }
+        System.out.print("\t\t" + trimmedMeanLatency(rangeQuery));
+      }
+    }
+    System.out.println();
+  }
+
+  @Test
+  public void run() throws IoTDBConnectionException, StatementExecutionException {
+    long start = System.currentTimeMillis();
+    // Last header note: "the two count columns differ only in the data".
+    System.out.println(
+        "queryN\t\ttime_lsm\t\ttime_count_lsm\t\ttime_chunk\t\ttime_count_chunk\t\t两个count只是数据不同");
+    for (int x = 20000000; x <= 100000000; x += 20000000) {
+      queryN = x;
+      testTime();
+    }
+    System.out.println("\t\tALL_TIME:" + (System.currentTimeMillis() - start));
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/QueryLatencyIT.java b/session/src/test/java/org/apache/iotdb/session/QueryLatencyIT.java new file mode 100644 index 0000000..a5b2238 --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/QueryLatencyIT.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Measures latency and result values of aggregations on series s0 of the storage groups
+ * {@code root.real_<datasetID>_latency_<mu>_<sig>}, over TEST_CASE random time ranges of length
+ * queryN. Needs an IoTDB server at 127.0.0.1:6667.
+ */
+public class QueryLatencyIT {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+  private static int originCompactionThreadNum;
+  /** When true, an embedded environment is set up and cleaned around the test. */
+  private static final boolean inMemory = false;
+  private static final List<String> storageGroupList = new ArrayList<>();
+  private static final int datasetID = 1;
+  /** Number of timed queries per (storage group, aggregation) pair. */
+  int TEST_CASE = 1024;
+  /** queryN: length of each time range; ranges are drawn to end at or before seriesN. */
+  int queryN = 100000, seriesN = 4096 * 5000;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    for (double mu = 2, sig = 1.5; sig <= 3.6; sig += 0.5) {
+      String muS = Integer.toString((int) (Math.round(mu * 10)));
+      String sigS = Integer.toString((int) (Math.round(sig * 10)));
+      storageGroupList.add("root.real_" + datasetID + "_latency_" + muS + "_" + sigS);
+    }
+    // Run with zero compaction threads; the original value is restored in tearDown.
+    originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+    CONFIG.setConcurrentCompactionThread(0);
+    if (inMemory) {
+      EnvironmentUtils.envSetUp();
+    }
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (session != null) {
+      session.close();
+    }
+    if (inMemory) {
+      EnvironmentUtils.cleanEnv();
+    }
+    CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+  }
+
+  /** Appends the time filter {@code L <= time < R} to {@code body}. */
+  private String getQueryStatement(String body, long L, long R) {
+    return body + " where time>=" + L + " and time<" + R;
+  }
+
+  /** Returns TEST_CASE range starts drawn from [0, seriesN - queryN] with a fixed seed. */
+  private long[] randomStarts() {
+    Random random = new Random(233);
+    long[] starts = new long[TEST_CASE];
+    for (int i = 0; i < TEST_CASE; i++) {
+      starts[i] = random.nextInt(seriesN - queryN + 1);
+    }
+    return starts;
+  }
+
+  /** Runs {@code sql} and closes its result set so the server-side query is released. */
+  private static void runAndClose(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    session.executeQueryStatement(sql).closeOperationHandle();
+  }
+
+  /** Runs the unfiltered query once, then TEST_CASE / 8 + 4 range queries, ignoring results. */
+  private void warmUp(String queryBody, long[] starts)
+      throws IoTDBConnectionException, StatementExecutionException {
+    runAndClose(queryBody);
+    for (int i = 0; i < TEST_CASE / 8 + 4; i++) {
+      long start = starts[i % starts.length];
+      runAndClose(getQueryStatement(queryBody, start, start + queryN));
+    }
+  }
+
+  /** Prints, per storage group, the mean latency in ms of each aggregation. */
+  private void testTime() throws IoTDBConnectionException, StatementExecutionException {
+    List<String> aggrList = new ArrayList<>();
+    aggrList.add("kll_quantile");
+    aggrList.add("count");
+
+    System.out.println("\t\t\tqueryN:" + queryN + "\tDataset:" + datasetID);
+    long[] starts = randomStarts();
+    for (String latencyData : storageGroupList) {
+      System.out.print(
+          "\tlatency=" + latencyData.substring(latencyData.lastIndexOf("y_") + 2) + "\t\t");
+      for (String aggr : aggrList) {
+        String queryBody = "select " + aggr + "(" + "s0" + ") from " + latencyData + ".d0";
+        warmUp(queryBody, starts);
+
+        // Only the query itself is timed; closing the result set is excluded.
+        long totalNanos = 0;
+        for (int t = 0; t < TEST_CASE; t++) {
+          String sql = getQueryStatement(queryBody, starts[t], starts[t] + queryN);
+          long begin = System.nanoTime();
+          SessionDataSet dataSet = session.executeQueryStatement(sql);
+          totalNanos += System.nanoTime() - begin;
+          dataSet.closeOperationHandle();
+        }
+        System.out.print("\t" + totalNanos / 1e6 / TEST_CASE);
+      }
+      System.out.println();
+    }
+  }
+
+  /**
+   * Prints, per storage group, the mean first-column value of each aggregation over the random
+   * ranges, and that mean divided by queryN.
+   */
+  private void testValue() throws IoTDBConnectionException, StatementExecutionException {
+    List<String> aggrList = new ArrayList<>();
+    aggrList.add("exact_median_kll_stat_single_read");
+
+    System.out.println("\t\t\tqueryN:" + queryN + "\tDataset:" + datasetID);
+    long[] starts = randomStarts();
+    for (String latencyData : storageGroupList) {
+      System.out.print(
+          "\tlatency=" + latencyData.substring(latencyData.lastIndexOf("y_") + 2) + "\t\t");
+      for (String aggr : aggrList) {
+        String queryBody = "select " + aggr + "(" + "s0" + ") from " + latencyData + ".d0";
+        warmUp(queryBody, starts);
+
+        double sum = 0;
+        for (int t = 0; t < TEST_CASE; t++) {
+          SessionDataSet dataSet =
+              session.executeQueryStatement(
+                  getQueryStatement(queryBody, starts[t], starts[t] + queryN));
+          try {
+            sum += Double.parseDouble(dataSet.next().getFields().get(0).getStringValue());
+          } finally {
+            dataSet.closeOperationHandle();
+          }
+        }
+        System.out.print("\t" + sum / TEST_CASE + "\t" + sum / TEST_CASE / queryN);
+      }
+      System.out.println();
+    }
+  }
+
+  @Test
+  public void run() throws IoTDBConnectionException, StatementExecutionException {
+    testTime();
+    testValue();
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/QuerySSTSketchWithDifferentTs.java b/session/src/test/java/org/apache/iotdb/session/QuerySSTSketchWithDifferentTs.java new file mode 100644 index 0000000..f78112e --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/QuerySSTSketchWithDifferentTs.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Measures the latency of an unfiltered {@code kll_quantile(s0)} query on the storage groups
+ * {@code root.<dataset>4096T<T>} for T = 1, 2, 4, 8, 16, 32, printing the median and mean
+ * per-query latency in ms. Needs an IoTDB server at 127.0.0.1:6667.
+ */
+public class QuerySSTSketchWithDifferentTs {
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static Session session;
+  private static int originCompactionThreadNum;
+  /** When true, an embedded environment is set up and cleaned around the test. */
+  private static final boolean inMemory = false;
+  /** Number of timed queries per storage group. */
+  int TEST_CASE = 64;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // Run with zero compaction threads; the original value is restored in tearDown.
+    originCompactionThreadNum = CONFIG.getConcurrentCompactionThread();
+    CONFIG.setConcurrentCompactionThread(0);
+    if (inMemory) {
+      EnvironmentUtils.envSetUp();
+    }
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (session != null) {
+      session.close();
+    }
+    if (inMemory) {
+      EnvironmentUtils.cleanEnv();
+    }
+    CONFIG.setConcurrentCompactionThread(originCompactionThreadNum);
+  }
+
+  /** Runs {@code sql} and closes its result set so the server-side query is released. */
+  private static void runAndClose(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    session.executeQueryStatement(sql).closeOperationHandle();
+  }
+
+  /**
+   * For each T, warms up with TEST_CASE / 8 + 4 queries, then prints T, the median and the mean of
+   * TEST_CASE timed queries (ms); finally prints the total elapsed time.
+   */
+  private void testTime() throws IoTDBConnectionException, StatementExecutionException {
+    String dataset = "thruster";
+    System.out.println("\t\t\tDataset:" + dataset);
+    long allStart = System.currentTimeMillis();
+    System.out.println();
+    for (int T : new int[] {1, 2, 4, 8, 16, 32}) {
+      String sgName = "root." + dataset + "4096" + "T" + T;
+      String queryBody = "select " + "kll_quantile" + "(s0)" + " from " + sgName + ".d0";
+      for (int i = 0; i < TEST_CASE / 8 + 4; i++) {
+        runAndClose(queryBody);
+      }
+
+      // Only the query itself is timed; closing the result set is excluded.
+      LongArrayList times = new LongArrayList(TEST_CASE);
+      long totalNanos = 0;
+      for (int t = 0; t < TEST_CASE; t++) {
+        long begin = System.nanoTime();
+        SessionDataSet dataSet = session.executeQueryStatement(queryBody);
+        long elapsed = System.nanoTime() - begin;
+        dataSet.closeOperationHandle();
+        times.add(elapsed);
+        totalNanos += elapsed;
+      }
+      times.sort(Long::compare);
+      System.out.print(T + "\t\t" + times.getLong(times.size() / 2) / 1e6 + "\t");
+      System.out.print("" + totalNanos / 1e6 / TEST_CASE + "\n");
+    }
+
+    long allTime = System.currentTimeMillis() - allStart;
+    System.out.println("\n\n\t\tTEST_CASE=" + TEST_CASE + "\t\tALL_TIME=" + allTime);
+  }
+
+  @Test
+  public void run() throws IoTDBConnectionException, StatementExecutionException {
+    testTime();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java index 28c9923..0d6baff 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -73,8 +73,8 @@
   private int groupSizeInByte = 128 * 1024 * 1024;
   /** The memory size for each series writer to pack page, default value is 64KB. */
   private int pageSizeInByte = 64 * 1024;
-  /** The maximum number of data points in a page, default value is 1024 * 1024. */
-  private int maxNumberOfPointsInPage = 1000;
+  /** The maximum number of data points in a page, default value is 4096. */
+  private int maxNumberOfPointsInPage = 4096;
   /** The maximum degree of a metadataIndex node, default value is 256 */
   private int maxDegreeOfIndexNode = 256;
   /** Data type for input timestamp, TsFile supports INT64. */
@@ -422,6 +422,17 @@
     this.enableSynopsis = enableSynopsis;
   }
 
+  /** SST-sketch switch, false by default. NOTE(review): its effect is defined by callers. */
+  private boolean enableSSTSketch = false;
+
+  public boolean isEnableSSTSketch() {
+    return enableSSTSketch;
+  }
+
+  public void setEnableSSTSketch(boolean enableSSTSketch) {
+    this.enableSSTSketch = enableSSTSketch;
+  }
+
   private boolean enableBloomFilter = false;
 
   public boolean isEnableBloomFilter() {
@@ -501,4 +512,26 @@
   public void setQuantileFile(String file) {
     this.quantileFile = file;
   }
+
+  /** Default true. NOTE(review): presumably whether a flush builds a whole-chunk synopsis. */
+  private boolean synopsisForWholeChunkWhenFlush = true;
+
+  public void setSynopsisForWholeChunkWhenFlush(boolean x) {
+    this.synopsisForWholeChunkWhenFlush = x;
+  }
+
+  public boolean getSynopsisForWholeChunkWhenFlush() {
+    return this.synopsisForWholeChunkWhenFlush;
+  }
+
+  /** Default 1. NOTE(review): presumably scales the sketch size; confirm with its readers. */
+  private int sketchSizeRatio = 1;
+
+  public void setSketchSizeRatio(int x) {
+    this.sketchSizeRatio = x;
+  }
+
+  public int getSketchSizeRatio() {
+    return this.sketchSizeRatio;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java index f7589de..c791312 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -72,6 +72,14 @@
 
   private ArrayList<IChunkMetadata> chunkMetadataList;
 
+  /**
+   * Serialized chunk metadata list, decoded lazily by getChunkMetadataList(); only meaningful
+   * while hasCMDBuffer is true.
+   */
+  public ByteBuffer cMDBuffer;
+  /** True while cMDBuffer holds chunk metadata that has not been decoded yet. */
+  public boolean hasCMDBuffer = false;
+
   public TimeseriesMetadata() {}
 
   public TimeseriesMetadata(
@@ -96,7 +104,15 @@
     this.dataType = timeseriesMetadata.dataType;
     this.statistics = timeseriesMetadata.statistics;
     this.modified = timeseriesMetadata.modified;
-    this.chunkMetadataList = new ArrayList<>(timeseriesMetadata.chunkMetadataList);
+    if (timeseriesMetadata.hasCMDBuffer) {
+      // Give the copy its own position/limit over the same bytes so that neither instance's
+      // decoding can move the other's buffer position.
+      this.cMDBuffer = timeseriesMetadata.cMDBuffer.duplicate();
+      this.hasCMDBuffer = true;
+    } else {
+      List<IChunkMetadata> source = timeseriesMetadata.getChunkMetadataList();
+      this.chunkMetadataList = source == null ? null : new ArrayList<>(source);
+    }
   }
 
   public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata)
@@ -109,15 +125,10 @@
     timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
     timeseriesMetaData.setStatistics(Statistics.deserialize(buffer, timeseriesMetaData.dataType));
     if (needChunkMetadata) {
-      ByteBuffer byteBuffer = buffer.slice();
-      byteBuffer.limit(chunkMetaDataListDataSize);
-      timeseriesMetaData.chunkMetadataList = new ArrayList<>();
-      while (byteBuffer.hasRemaining()) {
-        timeseriesMetaData.chunkMetadataList.add(
-            ChunkMetadata.deserializeFrom(byteBuffer, timeseriesMetaData));
-      }
-      // minimize the storage of an ArrayList instance.
-      timeseriesMetaData.chunkMetadataList.trimToSize();
+      // Keep the chunk metadata list serialized; getChunkMetadataList() decodes it on first use.
+      timeseriesMetaData.cMDBuffer = buffer.slice();
+      timeseriesMetaData.cMDBuffer.limit(chunkMetaDataListDataSize);
+      timeseriesMetaData.hasCMDBuffer = true;
     }
     buffer.position(buffer.position() + chunkMetaDataListDataSize);
     return timeseriesMetaData;
@@ -137,7 +148,7 @@
     byteLen += ReadWriteIOUtils.write(dataType, outputStream);
     byteLen +=
         ReadWriteForEncodingUtils.writeUnsignedVarInt(chunkMetaDataListDataSize, outputStream);
-    byteLen += statistics.serialize(outputStream);
+    byteLen += statistics.serialize(outputStream, true);
     chunkMetadataListBuffer.writeTo(outputStream);
     byteLen += chunkMetadataListBuffer.size();
     return byteLen;
@@ -206,6 +217,24 @@
   }
 
   public List<IChunkMetadata> getChunkMetadataList() {
+    if (hasCMDBuffer) {
+      // Decode lazily on first access. Work on a duplicate so that a decoding failure leaves
+      // cMDBuffer and hasCMDBuffer untouched.
+      ByteBuffer pending = cMDBuffer.duplicate();
+      ArrayList<IChunkMetadata> decoded = new ArrayList<>();
+      try {
+        while (pending.hasRemaining()) {
+          decoded.add(ChunkMetadata.deserializeFrom(pending, this));
+        }
+      } catch (IOException e) {
+        // Fail loudly: a silently truncated list would hide chunks from every reader.
+        throw new java.io.UncheckedIOException("Failed to decode chunk metadata list", e);
+      }
+      // minimize the storage of an ArrayList instance.
+      decoded.trimToSize();
+      chunkMetadataList = decoded;
+      hasCMDBuffer = false;
+    }
     return chunkMetadataList;
   }
 
@@ -260,7 +289,7 @@
         + ", isSeq="
         + isSeq
         + ", chunkMetadataList="
-        + chunkMetadataList
+        + getChunkMetadataList()
         + '}';
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java index 046f897..be7448c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java
@@ -20,10 +20,7 @@ import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.LongKLLSketch; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.iotdb.tsfile.utils.SamplingHeapForStatMerge; -import org.apache.iotdb.tsfile.utils.TDigestForStatMerge; +import org.apache.iotdb.tsfile.utils.*; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; @@ -48,7 +45,9 @@ private double sumValue; private int summaryNum = 0; private LongKLLSketch kllSketch = null; - private List<LongKLLSketch> kllSketchList = null; + private List<KLLSketchForQuantile> kllSketchList = null; + // public ByteBuffer chunkSketchBuffer; + // public boolean hasChunkSketchBuffer = false; private TDigestForStatMerge tDigest = null; private List<TDigestForStatMerge> tDigestList = null; private SamplingHeapForStatMerge sampling = null; @@ -57,6 +56,13 @@ private BloomFilter<Long> bf = null; private List<BloomFilter<Long>> bfList = null; private MutableLongList MinTimeMaxTimeCountList = null; + private boolean LSMFile = false; + + public boolean hasSegTreeBySketch = false; + public SegTreeBySketch segTreeBySketch = null; + // private int lsmLevel = 0; + // public List<KLLSketchForQuantile> lsmSketch = null; + // private MutableLongList lsmMinTMaxT = null; static final int DOUBLE_STATISTICS_FIXED_RAM_SIZE = 90; @@ -170,7 +176,18 @@ // public LongKLLSketch getKllSketch() { // return kllSketch; // } - public List<LongKLLSketch> getKllSketchList() { + public void setKLLSketch(KLLSketchForQuantile sketch) { + LSMFile = true; + summaryNum = 1; + kllSketchList = new ArrayList<>(); + kllSketchList.add(sketch); + } + + public KLLSketchForQuantile getOneKllSketch() { + return kllSketchList.get(0); + } + + public List<KLLSketchForQuantile> getKllSketchList() { return kllSketchList; } @@ -337,7 +354,7 @@ if (doubleStat.summaryNum > 0) { this.summaryNum = 
1; if (SUMMARY_TYPE == 0) { - this.kllSketch = doubleStat.kllSketchList.get(pageID); + this.kllSketch = (LongKLLSketch) (doubleStat.kllSketchList.get(pageID)); this.kllSketchList = new ArrayList<>(1); this.kllSketchList.add(this.kllSketch); } @@ -493,7 +510,7 @@ } @Override - int serializeChunkMetadataStat(OutputStream outputStream) throws IOException { + int serializeSketchStat(OutputStream outputStream) throws IOException { // System.out.println("\t\t\t\t\t[DEBUG DOUBLE stat] serializeStats // hashmap:"+serializeHashMap); int byteLen = 0; @@ -504,10 +521,17 @@ byteLen += ReadWriteIOUtils.write(sumValue, outputStream); byteLen += ReadWriteIOUtils.write(summaryNum, outputStream); byteLen += ReadWriteIOUtils.write(bfNum, outputStream); + byteLen += ReadWriteIOUtils.write(hasSegTreeBySketch, outputStream); + if (hasSegTreeBySketch) { + byteLen += segTreeBySketch.serializeSegTree(outputStream); + return byteLen; + } if (summaryNum > 0) { if (SUMMARY_TYPE == 0) - for (LongKLLSketch sketch : kllSketchList) { - int tmp = sketch.serialize(outputStream); + for (KLLSketchForQuantile sketch : kllSketchList) { + LongKLLSketch diskSketch = + !LSMFile ? 
((LongKLLSketch) sketch) : (new LongKLLSketch(sketch)); + int tmp = diskSketch.serialize(outputStream); byteLen += tmp; // System.out.println("\t[DEBUG][DoubleStat serializeStats]:\tbytes:" + tmp); // sketch.show(); @@ -554,6 +578,7 @@ byteLen += ReadWriteIOUtils.write(sumValue, outputStream); byteLen += ReadWriteIOUtils.write(0, outputStream); byteLen += ReadWriteIOUtils.write(0, outputStream); + byteLen += ReadWriteIOUtils.write(false, outputStream); return byteLen; } @@ -566,6 +591,7 @@ this.sumValue = ReadWriteIOUtils.readDouble(inputStream); this.summaryNum = ReadWriteIOUtils.readInt(inputStream); this.bfNum = ReadWriteIOUtils.readInt(inputStream); + this.hasSegTreeBySketch = ReadWriteIOUtils.readBool(inputStream); if (this.summaryNum > 0) { if (SUMMARY_TYPE == 0) { this.kllSketchList = new ArrayList<>(summaryNum); @@ -601,10 +627,21 @@ this.sumValue = ReadWriteIOUtils.readDouble(byteBuffer); this.summaryNum = ReadWriteIOUtils.readInt(byteBuffer); this.bfNum = ReadWriteIOUtils.readInt(byteBuffer); + this.hasSegTreeBySketch = ReadWriteIOUtils.readBool(byteBuffer); + if (hasSegTreeBySketch) { + this.segTreeBySketch = new SegTreeBySketch(byteBuffer); + return; + } if (this.summaryNum > 0) { + // System.out.println( + // "\t\t\t[Stat Deserialize]\tsketch\tsumNum=" + summaryNum + "\tstartT:" + + // getStartTime()); if (SUMMARY_TYPE == 0) { this.kllSketchList = new ArrayList<>(summaryNum); for (int i = 0; i < summaryNum; i++) this.kllSketchList.add(new LongKLLSketch(byteBuffer)); + // for (int i = 0; i < summaryNum; i++) + // System.out.println("\t\t\t\t\tsketch numLen:" + + // this.kllSketchList.get(i).getNumLen()); } if (SUMMARY_TYPE == 1) { this.tDigestList = new ArrayList<>(summaryNum);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 63ba0e3..20bf4b0 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java
@@ -74,6 +74,8 @@ public static int BLOOM_FILTER_SIZE = 0; public static int PAGE_SIZE_IN_BYTE = 65536; public static int SUMMARY_TYPE = 0; + // public static boolean ENABLE_LSM_SKETCH = + // TSFileDescriptor.getInstance().getConfig().getCompressionPerLSMLevel(); protected static double getFPP(double bitsPerKey) { return Math.exp(-1 * bitsPerKey * Math.pow(Math.log(2.0D), 2)); @@ -181,20 +183,20 @@ return serialize(outputStream, false); } - public int serialize(OutputStream outputStream, boolean isChunkMetaData) throws IOException { + public int serialize(OutputStream outputStream, boolean sketch) throws IOException { int byteLen = 0; byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(count, outputStream); byteLen += ReadWriteIOUtils.write(startTime, outputStream); byteLen += ReadWriteIOUtils.write(endTime, outputStream); // value statistics of different data type - if (!isChunkMetaData) byteLen += serializeStats(outputStream); - else byteLen += serializeChunkMetadataStat(outputStream); + if (!sketch) byteLen += serializeStats(outputStream); + else byteLen += serializeSketchStat(outputStream); return byteLen; } abstract int serializeStats(OutputStream outputStream) throws IOException; - int serializeChunkMetadataStat(OutputStream outputStream) throws IOException { + int serializeSketchStat(OutputStream outputStream) throws IOException { return serializeStats(outputStream); }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/DDSketchForQuantile.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/DDSketchForQuantile.java new file mode 100644 index 0000000..3774714 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/DDSketchForQuantile.java
@@ -0,0 +1,183 @@ +package org.apache.iotdb.tsfile.utils; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +public class DDSketchForQuantile implements Serializable { + private double alpha; + private double gamma; + private double multiplier; + private int bucket_num_limit; + private int threshold_for_compression; + + private Map<Integer, Long> positive_buckets; + private Map<Integer, Long> negative_buckets; + private double collapse_bound; + private long zero_count; + + private transient double beta; + private final transient double[] valid_range; + + private static double MIN_POSITIVE_VALUE = 1e-6; + private static double COEFFICIENT = 1.5; + boolean valid_buckets = false; + Bucket[] buckets; + + public DDSketchForQuantile(double alpha, int bucket_num_limit) { + // System.out.println(alpha); + this.alpha = alpha; + this.bucket_num_limit = Math.max(bucket_num_limit, 2); + this.threshold_for_compression = (int) (bucket_num_limit * COEFFICIENT); + // + // System.out.println("\t\t\t\tcompression:"+threshold_for_compression+"\t\tlimit="+bucket_num_limit); + + this.gamma = 2 * alpha / (1 - alpha) + 1; + this.multiplier = Math.log(Math.E) / (Math.log1p(gamma - 1)); + this.positive_buckets = new HashMap<>((int) (bucket_num_limit * 0.75)); + this.negative_buckets = new HashMap<>((int) (bucket_num_limit * 0.25)); + this.zero_count = 0; + this.collapse_bound = -Double.MAX_VALUE; + this.valid_range = new double[6]; + } + + public void insert(double v) { + valid_buckets = false; + if (v < collapse_bound) { + v = collapse_bound; + } + if (v > MIN_POSITIVE_VALUE) { + int i = (int) Math.ceil(Math.log(v) * multiplier); + positive_buckets.put(i, positive_buckets.getOrDefault(i, 0L) + 1); + } else if (v < -MIN_POSITIVE_VALUE) { + int i = (int) Math.ceil(Math.log(-v) * multiplier); + negative_buckets.put(i, negative_buckets.getOrDefault(i, 0L) + 1); + } else { + zero_count++; + } + 
collapse(threshold_for_compression); + } + + private void collapse(int limit) { + if (sketch_size() > limit) { + int exceed = sketch_size() - bucket_num_limit; + Integer[] indices = negative_buckets.keySet().toArray(new Integer[0]); + Arrays.sort(indices); + long count = 0; + for (int i = Math.max(0, indices.length - exceed); i < indices.length; ++i) { + count += negative_buckets.remove(indices[i]); + } + if (count > 0) { + int i = indices.length - exceed - 1; + if (i >= 0) { + negative_buckets.put(indices[i], negative_buckets.get(indices[i]) + count); + collapse_bound = -Math.pow(gamma, indices[i]); + } else { + zero_count += count; + collapse_bound = 0; + } + } + exceed -= (indices.length - Math.max(0, indices.length - exceed)); + if (exceed > 0) { + count = zero_count; + if (zero_count > 0) { + exceed--; + } + indices = positive_buckets.keySet().toArray(new Integer[0]); + Arrays.sort(indices); + for (int i = exceed - 1; i >= 0; --i) { + count += positive_buckets.remove(indices[i]); + } + positive_buckets.put(indices[exceed], positive_buckets.get(indices[exceed]) + count); + collapse_bound = Math.pow(gamma, indices[exceed] - 1); + } + } + } + + static final int DIVIDE_DELTA = 1000000000, DIVIDE_HALF = DIVIDE_DELTA / 2; + + private double getL(int index) { + return index > DIVIDE_HALF + ? Math.pow(gamma, index - DIVIDE_DELTA - 1) + : (index == DIVIDE_HALF ? 0 : -Math.pow(gamma, index)); + } + + private double getR(int index) { + return index > DIVIDE_HALF + ? Math.pow(gamma, index - DIVIDE_DELTA) + : (index == DIVIDE_HALF ? 0 : -Math.pow(gamma, index - 1)); + } + + private long getCount(int index) { + // System.out.println("\t\t\t\t\t-index="+(-index)); + // System.out.println("\t\t\t\t\t\t\texist"+negative_buckets.containsKey(-index)); + return index > DIVIDE_HALF + ? positive_buckets.get(index - DIVIDE_DELTA) + : (index == DIVIDE_HALF ? 
zero_count : negative_buckets.get(index)); + } + + private void union_buckets() { + buckets = new Bucket[sketch_size()]; + int i = 0; + for (Map.Entry<Integer, Long> e : positive_buckets.entrySet()) { + buckets[i++] = new Bucket(e.getKey() + DIVIDE_DELTA); + } + for (Map.Entry<Integer, Long> e : negative_buckets.entrySet()) { + buckets[i++] = new Bucket(e.getKey()); + } + if (zero_count > 0) { + buckets[i] = new Bucket(DIVIDE_HALF); + } + Arrays.sort(buckets, Comparator.comparingDouble(o -> (getL(o.bucketIndex)))); + long sum = 0; + for (i = 0; i < sketch_size(); i++) { + sum += getCount(buckets[i].bucketIndex); + buckets[i].prefixSum = sum; + } + valid_buckets = true; + } + + private long total_count() { + return positive_buckets.values().stream().mapToLong(l -> l).sum() + + negative_buckets.values().stream().mapToLong(l -> l).sum() + + zero_count; + } + + private int find_p_index(Bucket[] buckets, long total_count, double q) { + double rank = q * (total_count - 1); + int tmp1 = Integer.highestOneBit(buckets.length); + int p = -1; + while (tmp1 > 0) { + if (p + tmp1 < buckets.length && buckets[p + tmp1].prefixSum <= rank) p += tmp1; + tmp1 /= 2; + } + return p + 1; + } + + public double getQuantile(double q) { + if (!valid_buckets) union_buckets(); + long total_count = total_count(); + Bucket p = buckets[find_p_index(buckets, total_count, q)]; + if (getL(p.bucketIndex) < 0) { + return 2 * getL(p.bucketIndex) / (1 + gamma); + } else { + return 2 * getR(p.bucketIndex) / (1 + gamma); + } + } + + public int sketch_size() { + return positive_buckets.size() + negative_buckets.size() + (zero_count == 0 ? 0 : 1); + } + + private static class Bucket { + public int bucketIndex; + public long prefixSum; + + Bucket(int bucketIndex) { + this.bucketIndex = bucketIndex; + this.prefixSum = 0; + } + } +}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForLSMFile.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForLSMFile.java new file mode 100644 index 0000000..4ba7a86 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForLSMFile.java
@@ -0,0 +1,193 @@ +package org.apache.iotdb.tsfile.utils; + +import it.unimi.dsi.fastutil.objects.ObjectArrayList; + +import java.util.Arrays; +import java.util.List; + +public class KLLSketchForLSMFile extends KLLSketchForQuantile { + ObjectArrayList<KLLSketchForQuantile> subSketchList; + + /** Built with sorted MemTable(ordered by t). */ + public KLLSketchForLSMFile(KLLSketchForQuantile subSketch) { + subSketchList = new ObjectArrayList<>(); + subSketchList.add(subSketch); + } + + public KLLSketchForLSMFile() { + subSketchList = new ObjectArrayList<>(); + } + + public void addSubSketch(KLLSketchForQuantile subSketch) { + this.N += subSketch.getN(); + subSketchList.add(subSketch); + } + + public void compactSubSketches(int additionalLevel) { + // int additionalLevel = 1; + // while((1<<additionalLevel)<subSketchList.size())additionalLevel++; + assert !subSketchList.isEmpty(); + int subLevel = subSketchList.get(0).cntLevel, + targetLevel = subLevel + additionalLevel, + tmpNumLen = 0; + levelPos = new int[targetLevel + 1]; + // System.out.println("\t\t[CompactSubSketchesInLSM:] addLV:"+additionalLevel); + for (KLLSketchForQuantile subSketch : subSketchList) { + assert subSketch.cntLevel == subLevel; + assert subSketch.levelPos[0] == subSketch.levelPos[subLevel - 1]; + tmpNumLen += subSketch.levelPos[subLevel] - subSketch.levelPos[subLevel - 1]; + } + long[] tmpNum = new long[tmpNumLen * 2]; + int cntNumLen = 0; + for (KLLSketchForQuantile subSketch : subSketchList) { + System.arraycopy( + subSketch.num, + subSketch.levelPos[subLevel - 1], + tmpNum, + cntNumLen, + subSketch.getLevelSize(subLevel - 1)); + cntNumLen += subSketch.getLevelSize(subLevel - 1); + } + Arrays.sort(tmpNum, 0, cntNumLen); + num = tmpNum; + for (int i = 0; i < subLevel - 1; i++) levelPos[i] = 0; + cntLevel = subLevel; + levelPos[cntLevel] = tmpNumLen; + // showNum(); + for (int i = 0; i < additionalLevel; i++) compactOneLevel(cntLevel - 1); + num = Arrays.copyOfRange(num, 0, levelPos[cntLevel]); 
+ // showNum(); + } + + private void compactOneLevel(int level) { // compact half of data when numToReduce is small + if (level == cntLevel - 1) calcLevelMaxSize(cntLevel + 1); + int L1 = levelPos[level], R1 = levelPos[level + 1]; // [L,R) + // System.out.println("T_T\t"+(R1-L1)); + if (level == 0 && !level0Sorted) { + Arrays.sort(num, L1, R1); + level0Sorted = true; + } + L1 += (R1 - L1) & 1; + if (L1 == R1) return; + + randomlyHalveDownToLeft(L1, R1); + + int mid = (L1 + R1) >>> 1; + mergeSortWithoutSpace(L1, mid, levelPos[level + 1], levelPos[level + 2]); + levelPos[level + 1] = mid; + int newP = levelPos[level + 1] - 1, oldP = L1 - 1; + for (int i = oldP; i >= levelPos[0]; i--) num[newP--] = num[oldP--]; + + levelPos[level] = levelPos[level + 1] - (L1 - levelPos[level]); + int numReduced = (R1 - L1) >>> 1; + for (int i = level - 1; i >= 0; i--) levelPos[i] += numReduced; + // if(levelPos[level+1]-levelPos[level]>levelMaxSize[level+1]){ + // compactOneLevel(level+1); + // } + } + + @Override + protected void calcLevelMaxSize(int setLevel) { // set cntLevel. cntLevel won't decrease + int[] tmpArr = new int[setLevel + 1]; + int maxPos = cntLevel > 0 ? Math.max(maxMemoryNum, levelPos[cntLevel]) : maxMemoryNum; + for (int i = 0; i < setLevel + 1; i++) tmpArr[i] = i < cntLevel ? levelPos[i] : maxPos; + levelPos = tmpArr; + cntLevel = setLevel; + levelMaxSize = new int[cntLevel]; + int newK = 0; + for (int addK = 1 << 28; addK > 0; addK >>>= 1) { // find a new K to fit the memory limit. 
+ int need = 0; + for (int i = 0; i < cntLevel; i++) + need += + Math.max(1, (int) Math.round(((newK + addK) * Math.pow(2.0 / 3, cntLevel - i - 1)))); + if (need <= maxMemoryNum) newK += addK; + } + for (int i = 0; i < cntLevel; i++) + levelMaxSize[i] = Math.max(1, (int) Math.round((newK * Math.pow(2.0 / 3, cntLevel - i - 1)))); + // show(); + } + + public void mergeWithTempSpace(List<KLLSketchForQuantile> otherList) { + // System.out.println("[MERGE]"); + // show(); + // + // System.out.println("[mergeWithTempSpace]\t???\t"+num.length+"\t??\t"+cntLevel+"\t??\toldPos0:"+levelPos[0]); + // System.out.println("[mergeWithTempSpace]\t???\tmaxMemNum:"+maxMemoryNum); + // another.show(); + int[] oldLevelPos = Arrays.copyOf(levelPos, cntLevel + 1); + int oldCntLevel = cntLevel; + int otherNumLen = 0; + long otherN = 0; + // System.out.print("\t\t\t\t[mergeWithTempSpace] others:"); + for (KLLSketchForQuantile another : otherList) + if (another != null) { + // System.out.print("\t"+another.getN()); + if (another.cntLevel > cntLevel) calcLevelMaxSize(another.cntLevel); + otherNumLen += another.getNumLen(); + otherN += another.getN(); + } + // System.out.println(); + // System.out.println("[mergeWithTempSpace]\totherNumLen:"+otherNumLen); + if (getNumLen() + otherNumLen <= maxMemoryNum) { + int cntPos = oldLevelPos[0] - otherNumLen; + for (int i = 0; i < cntLevel; i++) { + levelPos[i] = cntPos; + if (i < oldCntLevel) { + System.arraycopy(num, oldLevelPos[i], num, cntPos, oldLevelPos[i + 1] - oldLevelPos[i]); + cntPos += oldLevelPos[i + 1] - oldLevelPos[i]; + } + for (KLLSketchForQuantile another : otherList) + if (another != null && i < another.cntLevel) { + System.arraycopy( + another.num, another.levelPos[i], num, cntPos, another.getLevelSize(i)); + cntPos += another.getLevelSize(i); + } + Arrays.sort(num, levelPos[i], cntPos); + // System.out.println("\t\t!!\t"+cntPos); + } + levelPos[cntLevel] = cntPos; + this.N += otherN; + } else { + long[] oldNum = num; + num = new 
long[getNumLen() + otherNumLen]; + // System.out.println("\t\t\t\ttmp_num:"+num.length+" + // old_num:"+levelPos[0]+"..."+levelPos[oldCntLevel]); + int numLen = 0; + for (int i = 0; i < cntLevel; i++) { + levelPos[i] = numLen; + if (i < oldCntLevel) { + // System.out.println("\t\t\tlv"+i+"\toldPos:"+oldLevelPos[i]+"\t"+numLen+" + // this_level_old_len:"+(oldLevelPos[i + 1] - oldLevelPos[i])); + // System.out.println("\t\t\t"+oldNum[oldLevelPos[i + 1]-1]); + System.arraycopy( + oldNum, oldLevelPos[i], num, numLen, oldLevelPos[i + 1] - oldLevelPos[i]); + numLen += oldLevelPos[i + 1] - oldLevelPos[i]; + } + for (KLLSketchForQuantile another : otherList) + if (another != null && i < another.cntLevel) { + System.arraycopy( + another.num, another.levelPos[i], num, numLen, another.getLevelSize(i)); + numLen += another.getLevelSize(i); + } + Arrays.sort(num, levelPos[i], numLen); + } + levelPos[cntLevel] = numLen; + this.N += otherN; + // System.out.println("-------------------------------.............---------"); + // show();System.out.println("\t?\t"+levelPos[0]); + while (getNumLen() > maxMemoryNum) compact(); + // show();System.out.println("\t?\t"+levelPos[0]); + // System.out.println("\t\t??\t\t"+Arrays.toString(num)); + int newPos0 = maxMemoryNum - getNumLen(); + System.arraycopy(num, levelPos[0], oldNum, newPos0, getNumLen()); + for (int i = cntLevel; i >= 0; i--) levelPos[i] += newPos0 - levelPos[0]; + num = oldNum; + } + // System.out.println("\t\t??\t\t"+Arrays.toString(num)); + // System.out.println("\t\t??\t\t"+Arrays.toString(levelPos)); + // System.out.println("-------------------------------.............---------"); + // System.out.println("[MERGE result]"); + // show(); + // System.out.println(); + } +}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java index bbc88d9..8bc5e19 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForQuantile.java
@@ -17,9 +17,11 @@ public KLLSketchForQuantile() {} - protected abstract int calcMaxMemoryNum(int maxMemoryByte); + protected int calcMaxMemoryNum(int maxMemoryByte) { + return 0; + } - protected abstract void calcLevelMaxSize(int setLevel); + protected void calcLevelMaxSize(int setLevel) {} public int getLevelSize(int level) { return levelPos[level + 1] - levelPos[level]; @@ -55,7 +57,7 @@ // System.out.println("\t\t\t"+x); } - protected abstract void compact(); + protected void compact() {} protected int getNextRand01() { // xor shift * XORSHIFT ^= XORSHIFT >>> 12; @@ -151,4 +153,8 @@ } return num[L + K]; } + + public void active() { + // no-op + } }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForSST.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForSST.java new file mode 100644 index 0000000..6e025bb --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/KLLSketchForSST.java
@@ -0,0 +1,104 @@ +package org.apache.iotdb.tsfile.utils; + +import it.unimi.dsi.fastutil.objects.ObjectArrayList; + +import java.util.Arrays; + +public class KLLSketchForSST extends KLLSketchForQuantile { + ObjectArrayList<KLLSketchForQuantile> subSketchList; + + /** Built with other sketches */ + public KLLSketchForSST(KLLSketchForQuantile subSketch) { + subSketchList = new ObjectArrayList<>(); + subSketchList.add(subSketch); + } + + public KLLSketchForSST() { + subSketchList = new ObjectArrayList<>(); + } + + public void addSubSketch(KLLSketchForQuantile subSketch) { + this.N += subSketch.getN(); + subSketchList.add(subSketch); + } + + public void compactSubSketches(int SketchSizeRatio) { + assert !subSketchList.isEmpty(); + int subLevel = subSketchList.get(0).cntLevel, tmpNumLen = 0, subSizeSum = 0; + for (KLLSketchForQuantile sketch : subSketchList) { + subSizeSum += sketch.getNumLen(); + subLevel = Math.max(subLevel, sketch.cntLevel); + } + maxMemoryNum = subSizeSum * SketchSizeRatio / subSketchList.size(); + num = new long[subSizeSum]; + levelPos = new int[subLevel + 1]; + cntLevel = subLevel; + for (int lv = levelPos[0] = 0; lv < cntLevel; lv++) { + levelPos[lv + 1] = levelPos[lv]; + for (KLLSketchForQuantile sketch : subSketchList) + if (sketch.cntLevel > lv && sketch.getLevelSize(lv) > 0) { + System.arraycopy( + sketch.num, sketch.levelPos[lv], num, levelPos[lv + 1], sketch.getLevelSize(lv)); + levelPos[lv + 1] += sketch.getLevelSize(lv); + } + if (getLevelSize(lv) > 0) Arrays.sort(num, levelPos[lv], levelPos[lv + 1]); + } + level0Sorted = true; + // System.out.println("\t\t before compact"); + // show(); + for (int lv = 0; getNumLen() > maxMemoryNum; lv++) { + compactOneLevel(lv); + } + // show(); + // showNum(); + // System.out.println("\t\t compact over"); + } + + private void compactOneLevel(int level) { // compact half of data when numToReduce is small + if (level == cntLevel - 1) calcLevelMaxSize(cntLevel + 1); + int L1 = levelPos[level], R1 = 
levelPos[level + 1]; // [L,R) + // System.out.println("T_T\t"+(R1-L1)); + if (level == 0 && !level0Sorted) { + Arrays.sort(num, L1, R1); + level0Sorted = true; + } + L1 += (R1 - L1) & 1; + if (L1 == R1) return; + + randomlyHalveDownToLeft(L1, R1); + + int mid = (L1 + R1) >>> 1; + mergeSortWithoutSpace(L1, mid, levelPos[level + 1], levelPos[level + 2]); + levelPos[level + 1] = mid; + int newP = levelPos[level + 1] - 1, oldP = L1 - 1; + for (int i = oldP; i >= levelPos[0]; i--) num[newP--] = num[oldP--]; + + levelPos[level] = levelPos[level + 1] - (L1 - levelPos[level]); + int numReduced = (R1 - L1) >>> 1; + for (int i = level - 1; i >= 0; i--) levelPos[i] += numReduced; + // if(levelPos[level+1]-levelPos[level]>levelMaxSize[level+1]){ + // compactOneLevel(level+1); + // } + } + + @Override + protected void calcLevelMaxSize(int setLevel) { // set cntLevel. cntLevel won't decrease + int[] tmpArr = new int[setLevel + 1]; + int maxPos = cntLevel > 0 ? Math.max(maxMemoryNum, levelPos[cntLevel]) : maxMemoryNum; + for (int i = 0; i < setLevel + 1; i++) tmpArr[i] = i < cntLevel ? levelPos[i] : maxPos; + levelPos = tmpArr; + cntLevel = setLevel; + levelMaxSize = new int[cntLevel]; + int newK = 0; + for (int addK = 1 << 28; addK > 0; addK >>>= 1) { // find a new K to fit the memory limit. + int need = 0; + for (int i = 0; i < cntLevel; i++) + need += + Math.max(1, (int) Math.round(((newK + addK) * Math.pow(2.0 / 3, cntLevel - i - 1)))); + if (need <= maxMemoryNum) newK += addK; + } + for (int i = 0; i < cntLevel; i++) + levelMaxSize[i] = Math.max(1, (int) Math.round((newK * Math.pow(2.0 / 3, cntLevel - i - 1)))); + // show(); + } +}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java index 16f933b..1be9776 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/LongKLLSketch.java
@@ -17,9 +17,9 @@ int maxN, maxSerializeNum; int K; int maxLevel; + ByteBuffer sketchBuffer = null; - public LongKLLSketch( - int maxN, int maxMemoryByte, int maxSerializeByte) { // maxN=7000 for PAGE, 1.6e6 for CHUNK + public LongKLLSketch(int maxN, int maxMemoryByte, int maxSerializeByte) { this.maxN = maxN; N = 0; maxLevel = calcMaxLevel(maxN, maxSerializeByte); @@ -27,6 +27,24 @@ calcLevelMaxSize(1); } + public LongKLLSketch(KLLSketchForQuantile sketch) { + N = sketch.getN(); + maxN = (int) N; + cntLevel = maxLevel = sketch.cntLevel; + maxSerializeNum = Integer.MAX_VALUE; + num = sketch.num; + levelPos = sketch.levelPos; + } + + @Override + public void update(long x) { // signed long + deserializeFromBuffer(); + if (levelPos[0] == 0) compact(); + num[--levelPos[0]] = x; + N++; + level0Sorted = false; + } + // public int getCntLevel(){return cntLevel;} // public int[] getLevelPos(){return levelPos;} // public long[] getNum(){return num;} @@ -97,7 +115,7 @@ int numLEN = getNumLen(); System.out.println( "\t\tCOMPACT_SIZE:" - + (13 + (maxLevel) * (numLEN < 256 ? 1 : 2) + numLEN * 8) + + (13 + (maxLevel) * (numLEN < 256 ? 1 : 4) + numLEN * 8) + "\t//maxMemNum:" + maxMemoryNum + ",maxSeriNum:" @@ -173,19 +191,20 @@ } public int serialize(OutputStream outputStream) throws IOException { // 15+1*?+8*? 
+ deserializeFromBuffer(); compactBeforeSerialization(); // if N==maxN int byteLen = 0; byteLen += ReadWriteIOUtils.write(N, outputStream); byteLen += ReadWriteIOUtils.write(maxN, outputStream); byteLen += ReadWriteIOUtils.write((byte) maxLevel, outputStream); int numLEN = getNumLen(); - byteLen += ReadWriteIOUtils.write((short) numLEN, outputStream); + byteLen += ReadWriteIOUtils.write(numLEN, outputStream); if (numLEN < 256) for (int i = 0; i < maxLevel; i++) byteLen += ReadWriteIOUtils.write((byte) (levelPos[i + 1] - levelPos[i]), outputStream); else for (int i = 0; i < maxLevel; i++) - byteLen += ReadWriteIOUtils.write((short) (levelPos[i + 1] - levelPos[i]), outputStream); + byteLen += ReadWriteIOUtils.write((levelPos[i + 1] - levelPos[i]), outputStream); for (int i = levelPos[0]; i < levelPos[maxLevel]; i++) byteLen += ReadWriteIOUtils.write(num[i], outputStream); return byteLen; @@ -197,11 +216,11 @@ this.maxN = ReadWriteIOUtils.readInt(inputStream); this.maxLevel = ReadWriteIOUtils.readByte(inputStream); calcParameters(maxMemoryByte, maxSerializeByte); - int numLEN = ReadWriteIOUtils.readShort(inputStream); + int numLEN = ReadWriteIOUtils.readInt(inputStream); for (int i = 0, tmp = 0; i < maxLevel; i++) { levelPos[i] = maxMemoryNum - (numLEN - tmp); if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(inputStream)); - else tmp += ReadWriteIOUtils.readShort(inputStream); + else tmp += ReadWriteIOUtils.readInt(inputStream); } int actualLevel = maxLevel - 1; while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--; @@ -215,11 +234,11 @@ this.maxN = ReadWriteIOUtils.readInt(byteBuffer); this.maxLevel = ReadWriteIOUtils.readByte(byteBuffer); calcParameters(maxMemoryByte, maxSerializeByte); - int numLEN = ReadWriteIOUtils.readShort(byteBuffer); + int numLEN = ReadWriteIOUtils.readInt(byteBuffer); for (int i = 0, tmp = 0; i < maxLevel; i++) { levelPos[i] = maxMemoryNum - (numLEN - tmp); if (numLEN < 256) tmp += 
UnsignedBytes.toInt(ReadWriteIOUtils.readByte(byteBuffer)); - else tmp += ReadWriteIOUtils.readShort(byteBuffer); + else tmp += ReadWriteIOUtils.readInt(byteBuffer); } int actualLevel = maxLevel - 1; while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--; @@ -232,7 +251,7 @@ this.N = ReadWriteIOUtils.readLong(inputStream); this.maxN = ReadWriteIOUtils.readInt(inputStream); this.maxLevel = ReadWriteIOUtils.readByte(inputStream); - int numLEN = ReadWriteIOUtils.readShort(inputStream); + int numLEN = ReadWriteIOUtils.readInt(inputStream); this.maxSerializeNum = numLEN; K = calcK(maxN, maxLevel); @@ -246,7 +265,7 @@ // System.out.println("\t\ttmp:"+tmp+"\t\tnumLen:"+numLEN+"\t\tmemMemNum:"+maxMemoryNum); levelPos[i] = maxMemoryNum - (numLEN - tmp); if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(inputStream)); - else tmp += ReadWriteIOUtils.readShort(inputStream); + else tmp += ReadWriteIOUtils.readInt(inputStream); } int actualLevel = maxLevel - 1; while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--; @@ -259,10 +278,26 @@ this.N = ReadWriteIOUtils.readLong(byteBuffer); this.maxN = ReadWriteIOUtils.readInt(byteBuffer); this.maxLevel = ReadWriteIOUtils.readByte(byteBuffer); + // System.out.println("\t\t[DEBUG deserialize sketch]\tmaxLevel=" + maxLevel); - int numLEN = ReadWriteIOUtils.readShort(byteBuffer); + int numLEN = ReadWriteIOUtils.readInt(byteBuffer); maxSerializeNum = numLEN; + sketchBuffer = byteBuffer.slice(); + int bytesToRead = 0; + if (numLEN < 256) bytesToRead = maxLevel; + else bytesToRead = maxLevel * 4; + bytesToRead += numLEN * 8; + sketchBuffer.limit(bytesToRead); + byteBuffer.position(byteBuffer.position() + bytesToRead); + // return; + + } + + public void deserializeFromBuffer() { + if (sketchBuffer == null) return; + // System.out.println("\t\tdelay deseri sketch. 
bytes:" + sketchBuffer.capacity()); + int numLEN = maxSerializeNum; K = calcK(maxN, maxLevel); maxMemoryNum = numLEN; num = new long[maxMemoryNum]; @@ -272,13 +307,15 @@ for (int i = 0, tmp = 0; i < maxLevel; i++) { levelPos[i] = maxMemoryNum - (numLEN - tmp); - if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(byteBuffer)); - else tmp += ReadWriteIOUtils.readShort(byteBuffer); + if (numLEN < 256) tmp += UnsignedBytes.toInt(ReadWriteIOUtils.readByte(sketchBuffer)); + else tmp += ReadWriteIOUtils.readInt(sketchBuffer); } int actualLevel = maxLevel - 1; while (levelPos[actualLevel] == levelPos[actualLevel + 1]) actualLevel--; calcLevelMaxSize(actualLevel + 1); for (int i = 0; i < numLEN; i++) - num[maxMemoryNum - numLEN + i] = ReadWriteIOUtils.readLong(byteBuffer); + num[maxMemoryNum - numLEN + i] = ReadWriteIOUtils.readLong(sketchBuffer); + + sketchBuffer = null; } }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/SegTreeBySketch.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/SegTreeBySketch.java new file mode 100644 index 0000000..83fa1e9 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/SegTreeBySketch.java
@@ -0,0 +1,211 @@ +package org.apache.iotdb.tsfile.utils; + +import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +public class SegTreeBySketch { + int level, leafNum; // ,sizeRatio; + public LongArrayList queriedChunkL, queriedChunkR; + public ObjectArrayList<LongArrayList> sketchMinT, sketchMaxT; + public ObjectArrayList<ObjectArrayList<KLLSketchForQuantile>> sketch; + public static final int LSM_T = 30; + + public void show() { + System.out.println("\t\tshowing a Seg Tree sketches for an SSTable.\tlevel=" + level); + for (int i = 1; i <= level; i++) + System.out.print("\t\t[ " + sketch.get(i).size() + " sketches ]"); + System.out.println(); + } + + public SegTreeBySketch( + ObjectArrayList<KLLSketchForQuantile> leaves, + LongArrayList leafMinT, + LongArrayList leafMaxT, + int sketch_size_ratio) { + leafNum = leaves.size(); + // sizeRatio = sketch_size_ratio; + int tmpNum = leaves.size(); + level = 0; + sketch = new ObjectArrayList<>(); + sketch.add(new ObjectArrayList<>()); + sketchMinT = new ObjectArrayList<>(); + sketchMinT.add(new LongArrayList()); + sketchMaxT = new ObjectArrayList<>(); + sketchMaxT.add(new LongArrayList()); + while (tmpNum >= LSM_T) { + level++; + sketch.add(new ObjectArrayList<>()); + sketchMinT.add(new LongArrayList()); + sketchMaxT.add(new LongArrayList()); + for (int i = 0; i < tmpNum / LSM_T; i++) { + KLLSketchForSST segSketch = new KLLSketchForSST(); + for (int j = 0; j < LSM_T; j++) + segSketch.addSubSketch( + level == 1 ? leaves.get(i * LSM_T + j) : sketch.get(level - 1).get(i * LSM_T + j)); + segSketch.compactSubSketches(sketch_size_ratio); + sketch.get(level).add(segSketch); + sketchMinT + .get(level) + .add( + level == 1 + ? 
leafMinT.getLong(i * LSM_T) + : sketchMinT.get(level - 1).getLong(i * LSM_T)); + sketchMaxT + .get(level) + .add( + level == 1 + ? leafMaxT.getLong(i * LSM_T + LSM_T - 1) + : sketchMaxT.get(level - 1).getLong(i * LSM_T + LSM_T - 1)); + } + tmpNum /= LSM_T; + } + } + + public int serializeSegTree(OutputStream outputStream) throws IOException { + // System.out.println("\t\t\t\t\t[DEBUG DOUBLE stat] serializeStats + // hashmap:"+serializeHashMap); + int byteLen = 0; + byteLen += ReadWriteIOUtils.write(level, outputStream); + byteLen += ReadWriteIOUtils.write(leafNum, outputStream); // how many chunks + // byteLen += ReadWriteIOUtils.write(sizeRatio, outputStream); + + for (int i = 1; i <= level; i++) { + for (int j = 0; j < sketch.get(i).size(); j++) { + byteLen += ReadWriteIOUtils.write(sketchMinT.get(i).getLong(j), outputStream); + byteLen += ReadWriteIOUtils.write(sketchMaxT.get(i).getLong(j), outputStream); + byteLen += (new LongKLLSketch(sketch.get(i).get(j))).serialize(outputStream); + } + } + return byteLen; + } + + public SegTreeBySketch(ByteBuffer byteBuffer) { + level = ReadWriteIOUtils.readInt(byteBuffer); + leafNum = ReadWriteIOUtils.readInt(byteBuffer); + // sizeRatio = ReadWriteIOUtils.readInt(byteBuffer); + + sketch = new ObjectArrayList<>(); + sketch.add(new ObjectArrayList<>()); + sketchMinT = new ObjectArrayList<>(); + sketchMinT.add(new LongArrayList()); + sketchMaxT = new ObjectArrayList<>(); + sketchMaxT.add(new LongArrayList()); + + for (int i = 1, tmpNum = leafNum; i <= level; i++) { + sketch.add(new ObjectArrayList<>()); + sketchMinT.add(new LongArrayList()); + sketchMaxT.add(new LongArrayList()); + for (int j = 0; j < tmpNum / LSM_T; j++) { + sketchMinT.get(i).add(ReadWriteIOUtils.readLong(byteBuffer)); + sketchMaxT.get(i).add(ReadWriteIOUtils.readLong(byteBuffer)); + sketch.get(i).add(new LongKLLSketch(byteBuffer)); + } + tmpNum /= LSM_T; + } + } + + private boolean inInterval(long x, long y, long L, long R) { + return x >= L && y <= R; + } + 
+ private boolean inInterval(long x, long L, long R) { + return x >= L && x <= R; + } + + private boolean overlapInterval(long x, long y, long L, long R) { // [L,R] + return !(y < L || x > R); + } + + private boolean timeFilter_contains(Filter timeFilter, long L, long R) { + return timeFilter == null || timeFilter.containStartEndTime(L, R); + } + + // start from a node without father in this SegTree + private void range_query_in_node( + int lv, + int p, + ObjectArrayList<KLLSketchForQuantile> queriedSketch, + Filter timeFilter, + long otherL, + long otherR, + ObjectArrayList<ITimeSeriesMetadata> overlappedTSMD) { + // other SST may partially overlap with this SSTable. only [otherL,otherR] in this SST don't + // overlap. + if (lv <= 0) return; + long cntL = sketchMinT.get(lv).getLong(p), cntR = sketchMaxT.get(lv).getLong(p); + // if (!timeFilter.o overlapInterval(cntL, cntR, L, R)) return; + System.out.println( + "\t\t\t\t\tcnt seg node:" + + "lv=" + + lv + + cntL + + "..." + + cntR + + "\t\t\t\tqueryTimeFilter:" + + timeFilter + + "\t\tcontainedInQuery:" + + timeFilter_contains(timeFilter, cntL, cntR) + + "\t\tcontainedInQuery:" + + timeFilter_contains(timeFilter, cntL, cntR)); + if (timeFilter_contains(timeFilter, cntL, cntR) && inInterval(cntL, cntR, otherL, otherR)) { + boolean node_non_overlap = true; + for (ITimeSeriesMetadata tsmd : overlappedTSMD) + if (overlapInterval( + cntL, cntR, tsmd.getStatistics().getStartTime(), tsmd.getStatistics().getEndTime())) + node_non_overlap = false; + if (node_non_overlap) { + // System.out.println("\t\tmerge with + // T:"+cntL+"..."+cntR+"\t\t\tlv="+lv+"\t\tcntN:"+query_sketch.getN()); + ((LongKLLSketch) sketch.get(lv).get(p)).deserializeFromBuffer(); + queriedSketch.add(sketch.get(lv).get(p)); + + int weight = 1; + for (int i = 1; i <= lv; i++) weight *= LSM_T; + System.out.println( + "\t\t\tmerge with the " + + (p * weight) + + "..." 
+ + (p * weight + weight - 1) + + " chunks in SST sketches."); + queriedChunkL.add(sketchMinT.get(lv).getLong(p)); + queriedChunkR.add(sketchMaxT.get(lv).getLong(p)); + return; + } + } + for (int i = 0; i < LSM_T; i++) + range_query_in_node( + lv - 1, p * LSM_T + i, queriedSketch, timeFilter, otherL, otherR, overlappedTSMD); + } + + public void range_query_in_SST_sketches( + ObjectArrayList<KLLSketchForQuantile> queriesSketch, + Filter timeFilter, + long otherL, + long otherR, + ObjectArrayList<ITimeSeriesMetadata> + overlappedTSMD) { // other SST may partially overlap with this SSTable. only + // [otherL,otherR] in + // this SST don't overlap. + queriedChunkL = new LongArrayList(); + queriedChunkR = new LongArrayList(); + int last = 0; + for (int i = level; i >= 1; i--) { + for (int j = last * LSM_T; j < sketch.get(i).size(); j++) { + // if (timeFilter.containStartEndTime( + // sketchMinT.get(i).getLong(j), sketchMaxT.get(i).getLong(j)) + // && overlapInterval( + // sketchMinT.get(i).getLong(j), sketchMaxT.get(i).getLong(j), otherL, + // otherR)) + range_query_in_node(i, j, queriesSketch, timeFilter, otherL, otherR, overlappedTSMD); + } + last = sketch.get(i).size(); + } + } +}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java index f2ced03..8be013c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -506,6 +506,7 @@ * @throws WriteProcessException exception in write process */ public boolean write(Tablet tablet) throws IOException, WriteProcessException { + // System.out.println("\t\t[TsFileWriter write_tablet]:N=" + tablet.rowSize); // make sure the ChunkGroupWriter for this Tablet exist checkIsTimeseriesExist(tablet, false); // get corresponding ChunkGroupWriter and write this Tablet
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java index 822e0a8..76798bd 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -198,6 +198,8 @@ if (isSdtEncoding && isLastPoint) { pageWriter.write(time, value); } + // if (pageWriter.getPointNumber() % 4000 == 0) + // System.out.println("\t\t[write point] 4000 points written"); checkPageSizeAndMayOpenANewPage(); } @@ -241,6 +243,7 @@ } pageWriter.write(timestamps, values, batchSize); checkPageSizeAndMayOpenANewPage(); + System.out.println("\t\t[write batch] " + batchSize + " points written"); } public void write(long[] timestamps, Binary[] values, int batchSize) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java index 8b6038e..62c6579 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -88,6 +88,7 @@ @Override public int write(Tablet tablet) throws WriteProcessException { + System.out.println("\t\t[NonAlignedCGWriter] write tablet:N=" + tablet.rowSize); int pointCount = 0; List<MeasurementSchema> timeseries = tablet.getSchemas(); for (int row = 0; row < tablet.rowSize; row++) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 92e64f6..3fbb33c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -23,37 +23,26 @@ import org.apache.iotdb.tsfile.file.MetaMarker; import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; import org.apache.iotdb.tsfile.file.header.ChunkHeader; -import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor; -import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; +import org.apache.iotdb.tsfile.file.metadata.*; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.apache.iotdb.tsfile.utils.PublicBAOS; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.utils.*; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; /** * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream. 
@@ -328,6 +317,9 @@ */ private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList) throws IOException { + int chunkNum = chunkMetadataList.size(), + lsmLevel = (int) Math.round(Math.log(chunkNum) / Math.log(30)); + System.out.println("\t[DEBUG flushOneChunkMetadata]\tchunkNum=" + chunkMetadataList.size()); // create TimeseriesMetaData PublicBAOS publicBAOS = new PublicBAOS(); TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType(); @@ -343,6 +335,58 @@ chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic); seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); } + // int addKLLLevel = + // lsmLevel + // * (int) + // Math.round( + // Math.log(30.0 / + // TSFileDescriptor.getInstance().getConfig().getSketchSizeRatio()) + // / Math.log(2)); + // if (TSFileDescriptor.getInstance().getConfig().isEnableSSTSketch() + // && lsmLevel > 0 + // && addKLLLevel > 0) { + // KLLSketchForLSMFile lsmSketch = new KLLSketchForLSMFile(); + // for (IChunkMetadata chunkMetadata : chunkMetadataList) + // lsmSketch.addSubSketch( + // ((DoubleStatistics) (chunkMetadata.getStatistics())).getOneKllSketch()); + // lsmSketch.compactSubSketches(addKLLLevel); + // lsmSketch.show(); + // System.out.println( + // "\t\t[DEBUG] compactingSketchForLSM\tN:" + // + lsmSketch.getN() + // + "\tnumLen:" + // + lsmSketch.getNumLen()); + // ((DoubleStatistics) seriesStatistics).setKLLSketch(lsmSketch); + // } + if (TSFileDescriptor.getInstance().getConfig().isEnableSSTSketch() && lsmLevel > 0) { + ObjectArrayList<KLLSketchForQuantile> leaves = + new ObjectArrayList<>(chunkMetadataList.size()); + LongArrayList leafMinT = new LongArrayList(), leafMaxT = new LongArrayList(); + for (IChunkMetadata chunkMetadata : chunkMetadataList) { + ObjectArrayList<KLLSketchForQuantile> chunk_sketches = new ObjectArrayList<>(); + int tmpNumLen = 0; + for (KLLSketchForQuantile chunkSketch : + ((DoubleStatistics) 
(chunkMetadata.getStatistics())).getKllSketchList()) { + ((LongKLLSketch) chunkSketch).deserializeFromBuffer(); + chunk_sketches.add(chunkSketch); + tmpNumLen += chunkSketch.getNumLen(); + } + HeapLongKLLSketch leafSketch = new HeapLongKLLSketch(tmpNumLen * 8); + leafSketch.mergeWithTempSpace(chunk_sketches); + leaves.add(leafSketch); + leafMinT.add(chunkMetadata.getStartTime()); + leafMaxT.add(chunkMetadata.getEndTime()); + } + SegTreeBySketch segTreeBySketch = + new SegTreeBySketch( + leaves, + leafMinT, + leafMaxT, + TSFileDescriptor.getInstance().getConfig().getSketchSizeRatio()); + segTreeBySketch.show(); + ((DoubleStatistics) seriesStatistics).segTreeBySketch = segTreeBySketch; + ((DoubleStatistics) seriesStatistics).hasSegTreeBySketch = true; + } TimeseriesMetadata timeseriesMetadata = new TimeseriesMetadata(