blob: b81a172a40994480eb452069720d5fcd0717a1e1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import struct
from iotdb.utils.IoTDBConstants import TSDataType
from iotdb.utils.BitMap import BitMap
class NumpyTablet(object):
    """A batch of rows for a single device, stored column-wise in numpy arrays.

    The tablet holds one timestamp array plus one value array per measurement
    and can serialize both into the byte strings returned by
    :meth:`get_binary_timestamps` and :meth:`get_binary_values`.
    """

    def __init__(self, device_id, measurements, data_types, values, timestamps):
        """
        Create a numpy tablet for insertion.

        For example, considering device: root.sg1.d1
            timestamps,     m1,    m2,     m3
                     1,  125.3,  True,  text1
                     2,  111.6, False,  text2
                     3,  688.6,  True,  text3

        If the timestamps are not already in non-decreasing order, the
        timestamps and every value column are reordered by ascending
        timestamp. Arrays whose dtype differs from the dtype reported by the
        corresponding ``TSDataType`` are converted with ``astype``.

        The ``values`` list passed in is never modified: the tablet works on
        its own shallow copy, so the caller's list keeps referencing the
        original (unsorted, un-cast) arrays.

        NOTE(review): earlier documentation stated that from 0.13.0 a tablet
        can contain empty cells; nothing in this class's serialization handles
        missing values - confirm before relying on it.

        :param device_id: String, IoTDB time series path to device layer (without sensor)
        :param measurements: List, sensors
        :param data_types: TSDataType List, specify value types for sensors
        :param values: List of numpy array, one inner numpy array per column
        :param timestamps: Numpy array, the timestamps
        :raises RuntimeError: if any column's length differs from
            ``len(timestamps)``, or if ``len(values) != len(data_types)``
        """
        # Shallow copy: reordering/casting below replaces list slots, and doing
        # that on the caller's list would leave it out of step with the
        # caller's own (untouched) timestamps array.
        values = list(values)

        # Every column must have exactly one cell per timestamp.
        for i, column in enumerate(values):
            if len(column) != len(timestamps):
                raise RuntimeError(
                    "Input error! len(timestamps) does not equal to "
                    "len(values[{}])!".format(i)
                )
        if len(values) != len(data_types):
            raise RuntimeError(
                "Input error! len(values) does not equal to len(data_types)!"
            )

        if not self.check_sorted(timestamps):
            # Apply the same permutation to the timestamps and to each column
            # so that rows stay aligned.
            index = timestamps.argsort()
            timestamps = timestamps[index]
            values = [column[index] for column in values]

        # Normalize dtypes to the ones the declared TSDataTypes report.
        timestamp_dtype = TSDataType.INT64.np_dtype()
        if timestamps.dtype != timestamp_dtype:
            timestamps = timestamps.astype(timestamp_dtype)
        for i, (column, data_type) in enumerate(zip(values, data_types)):
            column_dtype = data_type.np_dtype()
            if column.dtype != column_dtype:
                values[i] = column.astype(column_dtype)

        self.__values = values
        self.__timestamps = timestamps
        self.__device_id = device_id
        self.__measurements = measurements
        self.__data_types = data_types
        self.__row_number = len(timestamps)
        self.__column_number = len(measurements)

    @staticmethod
    def check_sorted(timestamps):
        """
        Tell whether ``timestamps`` is in non-decreasing order.

        :param timestamps: indexable sequence of comparable values
        :return: False if any element is smaller than its predecessor,
            True otherwise (including for empty and single-element input)
        """
        return not any(
            timestamps[i] < timestamps[i - 1] for i in range(1, len(timestamps))
        )

    def get_measurements(self):
        """Return the measurements given at construction."""
        return self.__measurements

    def get_data_types(self):
        """Return the data type list given at construction."""
        return self.__data_types

    def get_row_number(self):
        """Return the number of rows, i.e. the number of timestamps."""
        return self.__row_number

    def get_device_id(self):
        """Return the device id given at construction."""
        return self.__device_id

    def get_timestamps(self):
        """Return the (sorted, dtype-normalized) timestamp array."""
        return self.__timestamps

    def get_values(self):
        """Return the list of (sorted, dtype-normalized) value columns."""
        return self.__values

    def get_binary_timestamps(self):
        """Return the raw bytes of the timestamp array (``ndarray.tobytes()``)."""
        return self.__timestamps.tobytes()

    def get_binary_values(self):
        """
        Serialize all value columns, in column order, into one buffer.

        TEXT columns are written cell by cell as a 4-byte big-endian signed
        length followed by the UTF-8 encoded bytes of the cell. Every other
        column contributes the raw bytes of its array (``tobytes()``).

        :return: memoryview over a bytearray holding the concatenated columns
        """
        chunks = []
        for data_type, column in zip(self.__data_types, self.__values):
            if data_type == TSDataType.TEXT:
                # For TEXT, it's the same as the original (non-numpy) solution:
                # build one struct format for the whole column and pack once.
                format_parts = [">"]
                pack_args = []
                for text in column:
                    encoded = bytes(text, "utf-8")
                    format_parts.append("i{}s".format(len(encoded)))
                    pack_args.append(len(encoded))
                    pack_args.append(encoded)
                chunk = struct.pack("".join(format_parts), *pack_args)
            else:
                chunk = column.tobytes()
            chunks.append(chunk)
        return memoryview(bytearray(b"".join(chunks)))

    def __mark_none_value(self, bitmaps, bitmap, column, row):
        """
        Mark ``row`` in ``bitmap``, creating the bitmap first if it is None.

        A newly created ``BitMap`` is sized to the tablet's row number and
        inserted into ``bitmaps`` at position ``column``.

        NOTE(review): no method visible in this class calls this helper.
        """
        if bitmap is None:
            bitmap = BitMap(self.__row_number)
            bitmaps.insert(column, bitmap)
        bitmap.mark(row)