blob: 879f62ef88ced16a61929ae2b4ecbd1618895da1 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Connection } from "../connection/Connection";
import {
Config,
DEFAULT_CONFIG,
parseNodeUrls,
EndPoint,
} from "../utils/Config";
import { logger } from "../utils/Logger";
import { registerClosable, unregisterClosable } from "../utils/ProcessCleanup";
import { SessionDataSet } from "./SessionDataSet";
import { RowRecord } from "./RowRecord";
import { BaseColumnDecoder, ColumnEncoding, Column } from "./ColumnDecoder";
import { RedirectException } from "../utils/Errors";
import {
serializeColumnFast,
serializeTimestamps
} from "../utils/FastSerializer";
import { globalBufferPool } from "../utils/BufferPool";
const ttypes = require("../thrift/generated/client_types");
/**
 * Flat, fully-materialized query result container.
 *
 * @deprecated QueryResult is deprecated. Use SessionDataSet instead.
 */
export interface QueryResult {
  /** Column names, in result order. */
  columns: string[];
  /** Data type name per column (parallel to `columns`). */
  dataTypes: string[];
  /** Row-major values: one inner array per row. */
  rows: any[][];
  /** Server-side query id, when the server provided one. */
  queryId?: number;
}
// Re-export result-set types so consumers can import them from this module.
export { SessionDataSet, RowRecord };
/**
 * Column category for the table model.
 * Matches the C# and Java IoTDB client definitions.
 *
 * Note: TIME is reserved for internal use. When specifying columnCategories in
 * TableTablet, only use TAG, FIELD, and ATTRIBUTE. Timestamps are handled
 * separately via the timestamps array.
 */
export enum ColumnCategory {
  TAG = 0, // Tag column - indexed for WHERE clause filtering (e.g., device_id, region_id)
  FIELD = 1, // Field column - measurement values (e.g., temperature, humidity)
  ATTRIBUTE = 2, // Attribute column - metadata not indexed (e.g., model, firmware_version)
  TIME = 3, // Time column (reserved for internal use, do not use in columnCategories)
}
/**
 * Tree model tablet interface - for the timeseries model.
 * Uses deviceId as the full path (e.g., "root.sg.device").
 */
export interface ITreeTablet {
  /** Full device path, e.g. "root.sg.device". */
  deviceId: string;
  /** Measurement names, one per column. */
  measurements: string[];
  /** TSDataType code per measurement (parallel to `measurements`). */
  dataTypes: number[];
  /** One timestamp per row. */
  timestamps: number[];
  /** Row-major values: one inner array per row, parallel to `measurements`. */
  values: any[][];
}
/**
 * Table model tablet interface - for the relational/table model.
 * Uses tableName and includes column categories.
 */
export interface ITableTablet {
  /** Target table, optionally qualified with a database (e.g. "db.table"). */
  tableName: string;
  /** Column names, one per column. */
  columnNames: string[];
  /** TSDataType code per column (parallel to `columnNames`). */
  columnTypes: number[];
  /** Category (TAG/FIELD/ATTRIBUTE) per column (parallel to `columnNames`). */
  columnCategories: ColumnCategory[];
  /** One timestamp per row. */
  timestamps: number[];
  /** Row-major values: one inner array per row, parallel to `columnNames`. */
  values: any[][];
}
/**
 * Tree model tablet class with helper methods.
 * Convenient for building tablets row-by-row.
 */
export class TreeTablet implements ITreeTablet {
  deviceId: string;
  measurements: string[];
  dataTypes: number[];
  timestamps: number[];
  values: any[][];

  /**
   * @param deviceId - Full device path (e.g., "root.sg.device")
   * @param measurements - Measurement names, one per column
   * @param dataTypes - TSDataType code per measurement (parallel to measurements)
   * @throws Error if measurements and dataTypes differ in length
   */
  constructor(deviceId: string, measurements: string[], dataTypes: number[]) {
    // Validate parallel arrays up front (mirrors the check TableTablet
    // performs): a mismatch would otherwise silently pair values with the
    // wrong data type at serialization time.
    if (measurements.length !== dataTypes.length) {
      throw new Error(
        "measurements and dataTypes must have the same length",
      );
    }
    this.deviceId = deviceId;
    this.measurements = measurements;
    this.dataTypes = dataTypes;
    this.timestamps = [];
    this.values = [];
  }

  /**
   * Add a row to the tablet.
   * @param timestamp - The timestamp for this row
   * @param values - Array of values, must match the length of measurements
   * @throws Error if the values array length does not match measurements
   */
  addRow(timestamp: number, values: any[]): void {
    if (values.length !== this.measurements.length) {
      throw new Error(
        `Values array length (${values.length}) does not match measurements length (${this.measurements.length})`,
      );
    }
    this.timestamps.push(timestamp);
    this.values.push(values);
  }
}
/**
 * Table model tablet class with helper methods.
 * Convenient for building tablets row-by-row.
 */
export class TableTablet implements ITableTablet {
  tableName: string;
  columnNames: string[];
  columnTypes: number[];
  columnCategories: ColumnCategory[];
  timestamps: number[];
  values: any[][];

  /**
   * @param tableName - Target table name
   * @param columnNames - Column names, one per column
   * @param columnTypes - TSDataType code per column
   * @param columnCategories - Category (TAG/FIELD/ATTRIBUTE) per column
   * @throws Error if the three column arrays differ in length
   */
  constructor(
    tableName: string,
    columnNames: string[],
    columnTypes: number[],
    columnCategories: ColumnCategory[],
  ) {
    // The three arrays describe the same columns and must stay parallel.
    const columnCount = columnNames.length;
    if (
      columnTypes.length !== columnCount ||
      columnCategories.length !== columnCount
    ) {
      throw new Error(
        "columnNames, columnTypes, and columnCategories must have the same length",
      );
    }
    this.tableName = tableName;
    this.columnNames = columnNames;
    this.columnTypes = columnTypes;
    this.columnCategories = columnCategories;
    this.timestamps = [];
    this.values = [];
  }

  /**
   * Add a row to the tablet.
   * @param timestamp - The timestamp for this row
   * @param values - Array of values, must match the length of columnNames
   * @throws Error if the values array length does not match columnNames
   */
  addRow(timestamp: number, values: any[]): void {
    if (values.length !== this.columnNames.length) {
      throw new Error(
        `Values array length (${values.length}) does not match columnNames length (${this.columnNames.length})`,
      );
    }
    this.timestamps.push(timestamp);
    this.values.push(values);
  }
}
/**
 * Legacy tablet shape (structurally identical to ITreeTablet).
 *
 * @deprecated Use TreeTablet for tree model or TableTablet for table model instead
 */
export interface Tablet {
  /** Full device path, e.g. "root.sg.device". */
  deviceId: string;
  /** Measurement names, one per column. */
  measurements: string[];
  /** TSDataType code per measurement. */
  dataTypes: number[];
  /** One timestamp per row. */
  timestamps: number[];
  /** Row-major values: one inner array per row. */
  values: any[][];
}
export class Session {
protected config: Config;
protected connection: Connection;
private lastRedirectEndpoint: EndPoint | null = null;
/**
 * Validate the configuration, resolve the target endpoint, and create the
 * underlying connection (not yet opened — call open()).
 *
 * @param config - Either host/port or nodeUrls must be provided; nodeUrls may
 *                 contain "host:port" strings or EndPoint objects.
 * @throws Error when neither host/port nor nodeUrls is usable
 */
constructor(config: Config) {
  // Validate config - either host/port or nodeUrls must be provided.
  if (!config.host && !config.nodeUrls) {
    throw new Error("Either host/port or nodeUrls must be provided");
  }
  if (config.host && !config.port) {
    throw new Error("Port is required when host is provided");
  }
  if (config.nodeUrls && config.nodeUrls.length === 0) {
    throw new Error("nodeUrls array cannot be empty");
  }
  const merged = { ...DEFAULT_CONFIG, ...config };
  if (config.nodeUrls && config.nodeUrls.length > 0) {
    // nodeUrls entries may be "host:port" strings or EndPoint objects;
    // normalize, then pin this single session to the first node.
    const endpoints: EndPoint[] =
      typeof config.nodeUrls[0] === "string"
        ? parseNodeUrls(config.nodeUrls as string[])
        : (config.nodeUrls as EndPoint[]);
    this.config = {
      ...merged,
      host: endpoints[0].host,
      port: endpoints[0].port,
    } as Config;
  } else {
    this.config = merged as Config;
  }
  this.connection = new Connection(this.config);
  // Register for process-exit cleanup so the session is closed on shutdown.
  registerClosable(this);
}
/** Open the underlying connection. Rejects if the connection cannot be established. */
async open(): Promise<void> {
  await this.connection.open();
}
/**
 * Close the underlying connection and remove this session from the
 * process-exit cleanup registry.
 *
 * The unregister step runs in a finally block so that a failed close cannot
 * leave a stale closable registered (the original skipped unregistering when
 * connection.close() rejected).
 */
async close(): Promise<void> {
  try {
    await this.connection.close();
  } finally {
    unregisterClosable(this);
  }
}
/**
 * Get and clear the last redirect endpoint recommendation.
 *
 * @returns the endpoint the server recommended during the last write, or
 *          null if no redirect was recommended
 */
getAndClearLastRedirect(): EndPoint | null {
  const { lastRedirectEndpoint } = this;
  this.lastRedirectEndpoint = null;
  return lastRedirectEndpoint;
}
/**
 * Execute a query and return a SessionDataSet for iterating through results.
 * This method supports lazy loading and proper resource management.
 *
 * @param sql SQL query statement
 * @param timeoutMs Query timeout in milliseconds (default: 60000)
 * @returns SessionDataSet for iterating through query results
 * @throws Error if the RPC fails, the server returns a non-200 status, or the
 *         response payload cannot be parsed
 *
 * @example
 * ```typescript
 * const dataSet = await session.executeQueryStatement('SELECT * FROM root.test');
 * while (await dataSet.hasNext()) {
 *   const row = dataSet.next();
 *   console.log(row.getTimestamp(), row.getFields());
 * }
 * await dataSet.close();
 * ```
 */
async executeQueryStatement(
  sql: string,
  timeoutMs: number = 60000,
): Promise<SessionDataSet> {
  logger.debug(`Executing query: ${sql}`);
  const client = this.connection.getClient();
  const sessionId = this.connection.getSessionId();
  const statementId = this.connection.getStatementId();
  const req = new ttypes.TSExecuteStatementReq({
    sessionId: sessionId,
    statement: sql,
    statementId: statementId,
    fetchSize: this.config.fetchSize || 1024,
    timeout: timeoutMs,
    enableRedirectQuery: true,
    jdbcQuery: false, // Changed to false to use queryDataSet format
  });
  // Promise wrapper over the Thrift callback API.
  return new Promise((resolve, reject) => {
    client.executeQueryStatementV2(req, async (err: Error, response: any) => {
      if (err) {
        reject(err);
        return;
      }
      // 200 is treated as the success status throughout this file.
      if (response.status.code !== 200) {
        reject(new Error(response.status.message || "Query failed"));
        return;
      }
      try {
        let initialRows: any[][];
        const ignoreTimeStamp = response.ignoreTimeStamp || false;
        logger.debug(
          `executeQueryStatement response: ignoreTimeStamp=${ignoreTimeStamp}`,
        );
        logger.debug(
          `executeQueryStatement response: queryResult exists=${!!response.queryResult}, length=${response.queryResult?.length || 0}`,
        );
        logger.debug(
          `executeQueryStatement response: queryDataSet exists=${!!response.queryDataSet}`,
        );
        // Handle both queryDataSet and queryResult formats; the server may
        // return either depending on version/format.
        if (response.queryResult && response.queryResult.length > 0) {
          // New TsBlock format (queryResult is Buffer[])
          initialRows = await this.parseQueryResult(
            response.queryResult,
            response.columns?.length || 0,
            response.dataTypeList || [],
            ignoreTimeStamp,
          );
        } else if (response.queryDataSet) {
          // Old columnar format (TSQueryDataSet)
          initialRows = await this.parseDataSet(
            response.queryDataSet,
            response.columns?.length || 0,
            response.dataTypeList || [],
          );
        } else {
          // No data in response
          initialRows = [];
        }
        // Create SessionDataSet; it fetches further pages lazily when
        // moreData is true.
        const dataSet = new SessionDataSet(
          this,
          response.queryId,
          statementId,
          sql,
          response.columns || [],
          response.dataTypeList || [],
          initialRows,
          response.moreData || false,
          this.config.fetchSize || 1024,
          sessionId,
          ignoreTimeStamp,
          response.columnIndex2TsBlockColumnIndexList,
        );
        resolve(dataSet);
      } catch (parseError) {
        reject(parseError);
      }
    });
  });
}
/**
 * Execute a statement that produces no result set (DDL/DML such as
 * CREATE/INSERT/DELETE).
 *
 * @param sql SQL statement to execute
 * @throws Error if the RPC fails or the server reports a non-200 status
 */
async executeNonQueryStatement(sql: string): Promise<void> {
  logger.debug(`Executing non-query: ${sql}`);
  const connection = this.connection;
  const client = connection.getClient();
  const req = new ttypes.TSExecuteStatementReq({
    sessionId: connection.getSessionId(),
    statement: sql,
    statementId: connection.getStatementId(),
  });
  // Promise wrapper over the Thrift callback API.
  return new Promise((resolve, reject) => {
    client.executeUpdateStatementV2(req, (err: Error, response: any) => {
      if (err) {
        reject(err);
      } else if (response.status.code !== 200) {
        reject(
          new Error(response.status.message || "Statement execution failed"),
        );
      } else {
        resolve();
      }
    });
  });
}
/**
 * Insert a tablet (supports both tree and table models).
 * Dispatches on the presence of a `tableName` property: table-model tablets
 * carry one, tree-model tablets carry `deviceId` instead.
 *
 * @param tablet TreeTablet for tree model or TableTablet for table model
 */
async insertTablet(
  tablet: TreeTablet | ITreeTablet | TableTablet | ITableTablet,
): Promise<void> {
  return "tableName" in tablet
    ? this.insertTableTabletInternal(tablet as TableTablet | ITableTablet)
    : this.insertTreeTabletInternal(tablet as TreeTablet | ITreeTablet);
}
/**
 * Internal method to insert a tree model tablet.
 *
 * Serializes timestamps and column values into the binary buffers expected by
 * TSInsertTabletReq, sends the request, and handles the server's redirect
 * recommendation (code 400, which still means the write succeeded).
 *
 * @param tablet - tree model tablet (deviceId + measurements)
 * @throws Error if the RPC fails or the server reports a status other than
 *         200 (success) or 400 (success with redirect recommendation)
 */
private async insertTreeTabletInternal(
  tablet: TreeTablet | ITreeTablet,
): Promise<void> {
  const totalStartTime = Date.now();
  logger.debug(`[PERF] insertTreeTablet START for device: ${tablet.deviceId}, rows: ${tablet.timestamps.length}, cols: ${tablet.measurements.length}`);
  const client = this.connection.getClient();
  const sessionId = this.connection.getSessionId();
  // Serialize timestamps - use fast serializer if enabled
  const timestampStartTime = Date.now();
  const timestampBuffer = this.config.enableFastSerialization
    ? serializeTimestamps(tablet.timestamps)
    : this.serializeTimestampsLegacy(tablet.timestamps);
  const timestampDuration = Date.now() - timestampStartTime;
  logger.debug(`[PERF] Timestamp serialization (fast=${this.config.enableFastSerialization}): ${timestampDuration}ms`);
  // Serialize values (column payloads followed by null bitmaps)
  const serializeStartTime = Date.now();
  const valuesBuffer = this.serializeTabletValues(
    tablet.values,
    tablet.dataTypes,
    tablet.timestamps.length,
  );
  const serializeDuration = Date.now() - serializeStartTime;
  logger.debug(`[PERF] Values serialization: ${serializeDuration}ms, buffer size: ${valuesBuffer.length} bytes`);
  const req = new ttypes.TSInsertTabletReq({
    sessionId: sessionId,
    prefixPath: tablet.deviceId,
    measurements: tablet.measurements,
    values: valuesBuffer,
    timestamps: timestampBuffer,
    types: tablet.dataTypes,
    size: tablet.timestamps.length,
    isAligned: false,
  });
  const rpcStartTime = Date.now();
  return new Promise((resolve, reject) => {
    client.insertTablet(req, (err: Error, response: any) => {
      const rpcDuration = Date.now() - rpcStartTime;
      const totalDuration = Date.now() - totalStartTime;
      logger.debug(`[PERF] RPC call: ${rpcDuration}ms, Total: ${totalDuration}ms (serialize: ${timestampDuration + serializeDuration}ms)`);
      if (err) {
        reject(err);
        return;
      }
      // Handle redirection recommendation (code 400)
      // Note: Code 400 means write SUCCEEDED but server recommends a different endpoint for future operations
      if (response.code === 400) {
        // Store redirect recommendation if provided; callers can retrieve it
        // via getAndClearLastRedirect()
        if (response.redirectNode) {
          this.lastRedirectEndpoint = {
            host: response.redirectNode.internalIp || response.redirectNode.ip,
            port: response.redirectNode.port,
          };
          logger.debug(
            `Server recommends endpoint ${this.lastRedirectEndpoint.host}:${this.lastRedirectEndpoint.port} for future writes to ${tablet.deviceId}`
          );
        } else {
          logger.debug(`Server returned code 400 without redirect node for ${tablet.deviceId}`);
        }
        // Resolve successfully - the write already succeeded
        resolve();
        return;
      }
      if (response.code !== 200) {
        reject(new Error(response.message || "Insert tablet failed"));
        return;
      }
      resolve();
    });
  });
}
/**
 * Internal method to insert a table model tablet.
 *
 * Same wire flow as the tree-model insert, plus table-model specifics:
 * writeToTable=true and a per-column category list converted to signed bytes
 * for Thrift transport.
 *
 * @param tablet - table model tablet (tableName + columnNames/categories)
 * @throws Error if the RPC fails or the server reports a status other than
 *         200 (success) or 400 (success with redirect recommendation)
 */
private async insertTableTabletInternal(
  tablet: TableTablet | ITableTablet,
): Promise<void> {
  const totalStartTime = Date.now();
  logger.debug(`[PERF] insertTableTablet START for table: ${tablet.tableName}, rows: ${tablet.timestamps.length}, cols: ${tablet.columnNames.length}`);
  const client = this.connection.getClient();
  const sessionId = this.connection.getSessionId();
  // Serialize timestamps - use fast serializer if enabled
  const timestampStartTime = Date.now();
  const timestampBuffer = this.config.enableFastSerialization
    ? serializeTimestamps(tablet.timestamps)
    : this.serializeTimestampsLegacy(tablet.timestamps);
  const timestampDuration = Date.now() - timestampStartTime;
  logger.debug(`[PERF] Timestamp serialization (fast=${this.config.enableFastSerialization}): ${timestampDuration}ms`);
  // For table model, use database.tableName format for prefixPath
  // If tableName already contains database (e.g., "test.table1"), use as-is
  // Otherwise, we need to get current database context
  const prefixPath = tablet.tableName;
  // Convert columnCategories to signed bytes for Thrift
  const columnCategoriesBytes = tablet.columnCategories.map((category) => {
    // Convert unsigned to signed byte (-128 to 127)
    return category < 128 ? category : category - 256;
  });
  // Serialize values (column payloads followed by null bitmaps)
  const serializeStartTime = Date.now();
  const valuesBuffer = this.serializeTabletValues(
    tablet.values,
    tablet.columnTypes,
    tablet.timestamps.length,
  );
  const serializeDuration = Date.now() - serializeStartTime;
  logger.debug(`[PERF] Values serialization: ${serializeDuration}ms, buffer size: ${valuesBuffer.length} bytes`);
  const req = new ttypes.TSInsertTabletReq({
    sessionId: sessionId,
    prefixPath: prefixPath,
    measurements: tablet.columnNames,
    values: valuesBuffer,
    timestamps: timestampBuffer,
    types: tablet.columnTypes,
    size: tablet.timestamps.length,
    isAligned: false,
    writeToTable: true, // CRITICAL: Tell IoTDB this is table model data
    columnCategories: columnCategoriesBytes, // Pass column categories for table model
  });
  const rpcStartTime = Date.now();
  return new Promise((resolve, reject) => {
    client.insertTablet(req, (err: Error, response: any) => {
      const rpcDuration = Date.now() - rpcStartTime;
      const totalDuration = Date.now() - totalStartTime;
      logger.debug(`[PERF] RPC call: ${rpcDuration}ms, Total: ${totalDuration}ms (serialize: ${timestampDuration + serializeDuration}ms)`);
      if (err) {
        reject(err);
        return;
      }
      // Handle redirection recommendation (code 400)
      // Note: Code 400 means write SUCCEEDED but server recommends a different endpoint for future operations
      if (response.code === 400) {
        // Store redirect recommendation if provided; callers can retrieve it
        // via getAndClearLastRedirect()
        if (response.redirectNode) {
          this.lastRedirectEndpoint = {
            host: response.redirectNode.internalIp || response.redirectNode.ip,
            port: response.redirectNode.port,
          };
          logger.debug(
            `Server recommends endpoint ${this.lastRedirectEndpoint.host}:${this.lastRedirectEndpoint.port} for future writes to table ${tablet.tableName}`
          );
        } else {
          logger.debug(`Server returned code 400 without redirect node for table ${tablet.tableName}`);
        }
        // Resolve successfully - the write already succeeded
        resolve();
        return;
      }
      if (response.code !== 200) {
        const errorMsg = response.message || "Insert table tablet failed";
        logger.error(`Insert table tablet failed: code=${response.code}, message=${response.message}`);
        logger.error(`Request details: prefixPath=${prefixPath}, columns=${tablet.columnNames.length}, rows=${tablet.timestamps.length}`);
        reject(new Error(`${errorMsg} (code: ${response.code})`));
        return;
      }
      resolve();
    });
  });
}
/**
 * Legacy timestamp serialization (used when enableFastSerialization=false).
 * Encodes each timestamp as a big-endian signed 64-bit integer.
 *
 * @param timestamps - timestamps in milliseconds
 * @returns buffer of timestamps.length * 8 bytes
 * @throws Error if any timestamp is not a finite number
 */
private serializeTimestampsLegacy(timestamps: number[]): Buffer {
  const out = Buffer.alloc(timestamps.length * 8);
  timestamps.forEach((t, i) => {
    if (typeof t !== "number" || !Number.isFinite(t)) {
      throw new Error(`Invalid timestamp: ${t}`);
    }
    // Truncate fractional milliseconds before converting to BigInt.
    out.writeBigInt64BE(BigInt(Math.floor(t)), i * 8);
  });
  return out;
}
/**
 * Serialize tablet values into the tablet wire layout:
 * every column's payload in order, followed by the per-column null bitmaps.
 *
 * @param values - row-major values (values[row][col])
 * @param dataTypes - TSDataType code per column
 * @param rowCount - number of rows in the tablet
 * @returns concatenated column payloads + bitmap section
 */
protected serializeTabletValues(
  values: any[][],
  dataTypes: number[],
  rowCount: number,
): Buffer {
  const columnBuffers: Buffer[] = [];
  const nullBitmaps: (boolean[] | null)[] = [];
  dataTypes.forEach((dataType, colIndex) => {
    // Transpose: gather this column's value from every row.
    const columnValues = values.map((row) => row[colIndex]);
    // Record which entries are null/undefined for the bitmap section.
    const isNullFlags = columnValues.map(
      (v) => v === null || v === undefined,
    );
    const hasNull = isNullFlags.includes(true);
    // Fast serializer when enabled; legacy per-type switch otherwise.
    columnBuffers.push(
      this.config.enableFastSerialization
        ? serializeColumnFast(columnValues, dataType)
        : this.serializeColumn(columnValues, dataType),
    );
    // null entry means "column has no nulls" — the bitmap is then omitted.
    nullBitmaps.push(hasNull ? isNullFlags : null);
  });
  // Bitmap section trails all column payloads.
  columnBuffers.push(this.serializeBitMaps(nullBitmaps, rowCount));
  return Buffer.concat(columnBuffers);
}
/**
 * Serialize one column of values into its binary payload for a tablet insert.
 *
 * Fixed-width types are written big-endian; variable-length types (TEXT,
 * STRING, BLOB) are written as [int32 length][bytes] per entry. Null values
 * are written as zero-filled placeholders here — the actual null markers are
 * carried separately by serializeBitMaps().
 *
 * @param values - one value per row for this column (nulls allowed)
 * @param dataType - TSDataType code (see the list below)
 * @returns serialized column payload
 * @throws Error for unsupported data type codes (VECTOR, UNKNOWN, OBJECT)
 */
protected serializeColumn(values: any[], dataType: number): Buffer {
  // TSDataType from Apache TSFile:
  // BOOLEAN(0), INT32(1), INT64(2), FLOAT(3), DOUBLE(4), TEXT(5),
  // VECTOR(6), UNKNOWN(7), TIMESTAMP(8), DATE(9), BLOB(10), STRING(11), OBJECT(12)
  switch (dataType) {
    case 0: // BOOLEAN - 1 byte per value (null -> 0, truthy -> 1)
      return Buffer.from(
        values.map((v) => (v === null || v === undefined ? 0 : v ? 1 : 0)),
      );
    case 1: {
      // INT32 - Use big-endian
      const buffer = Buffer.alloc(values.length * 4);
      values.forEach((v, i) => {
        buffer.writeInt32BE(v === null || v === undefined ? 0 : v, i * 4);
      });
      return buffer;
    }
    case 2: {
      // INT64 - Use big-endian
      const buffer = Buffer.alloc(values.length * 8);
      values.forEach((v, i) => {
        buffer.writeBigInt64BE(
          v === null || v === undefined ? BigInt(0) : BigInt(v),
          i * 8,
        );
      });
      return buffer;
    }
    case 3: {
      // FLOAT - Use big-endian
      const buffer = Buffer.alloc(values.length * 4);
      values.forEach((v, i) => {
        buffer.writeFloatBE(v === null || v === undefined ? 0.0 : v, i * 4);
      });
      return buffer;
    }
    case 4: {
      // DOUBLE - Use big-endian
      const buffer = Buffer.alloc(values.length * 8);
      values.forEach((v, i) => {
        buffer.writeDoubleBE(v === null || v === undefined ? 0.0 : v, i * 8);
      });
      return buffer;
    }
    case 5: // TEXT
    case 11: {
      // STRING (similar to TEXT)
      // Optimized: Pre-calculate total size to avoid multiple Buffer.concat calls
      // Phase 1: Convert all values to buffers and calculate total size
      const strData: Buffer[] = [];
      let totalSize = 0;
      for (const v of values) {
        const str = v === null || v === undefined ? "" : String(v);
        const strBytes = Buffer.from(str, "utf8");
        strData.push(strBytes);
        totalSize += 4 + strBytes.length; // 4 bytes for length + string bytes
      }
      // Phase 2: Allocate single buffer and copy data
      // allocUnsafe is safe here: every byte is overwritten below.
      const result = Buffer.allocUnsafe(totalSize);
      let offset = 0;
      for (const strBytes of strData) {
        // Write length (4 bytes, big-endian)
        result.writeInt32BE(strBytes.length, offset);
        offset += 4;
        // Copy string bytes
        strBytes.copy(result, offset);
        offset += strBytes.length;
      }
      return result;
    }
    case 8: {
      // TIMESTAMP (stored as INT64 - milliseconds) - Use big-endian for consistency
      // Accepts Date objects (converted via getTime()) or numeric values.
      const buffer = Buffer.alloc(values.length * 8);
      values.forEach((v, i) => {
        let timestamp = BigInt(0);
        if (v !== null && v !== undefined) {
          if (v instanceof Date) {
            timestamp = BigInt(v.getTime());
          } else {
            timestamp = BigInt(v);
          }
        }
        buffer.writeBigInt64BE(timestamp, i * 8);
      });
      return buffer;
    }
    case 9: {
      // DATE (stored as INT32 - days since epoch) - Use big-endian
      // Accepts Date objects (converted to epoch days) or numeric day counts.
      const buffer = Buffer.alloc(values.length * 4);
      values.forEach((v, i) => {
        let days = 0;
        if (v !== null && v !== undefined) {
          if (v instanceof Date) {
            days = Math.floor(v.getTime() / (24 * 60 * 60 * 1000));
          } else {
            days = v;
          }
        }
        buffer.writeInt32BE(days, i * 4);
      });
      return buffer;
    }
    case 10: {
      // BLOB
      // Optimized: Pre-calculate total size to avoid multiple Buffer.concat calls
      // Phase 1: Convert all values to buffers and calculate total size
      const blobData: Buffer[] = [];
      let totalSize = 0;
      for (const v of values) {
        const blob =
          v === null || v === undefined
            ? Buffer.alloc(0)
            : Buffer.isBuffer(v)
              ? v
              : Buffer.from(v);
        blobData.push(blob);
        totalSize += 4 + blob.length; // 4 bytes for length + blob bytes
      }
      // Phase 2: Allocate single buffer and copy data
      // allocUnsafe is safe here: every byte is overwritten below.
      const result = Buffer.allocUnsafe(totalSize);
      let offset = 0;
      for (const blob of blobData) {
        // Write length (4 bytes, big-endian)
        result.writeInt32BE(blob.length, offset);
        offset += 4;
        // Copy blob bytes
        blob.copy(result, offset);
        offset += blob.length;
      }
      return result;
    }
    default:
      throw new Error(`Unsupported data type: ${dataType}`);
  }
}
/**
 * Serialize per-column null-indicator BitMaps.
 *
 * Layout per column (compatible with the Apache IoTDB Java client's
 * Tablet.writeBitMaps(), and the C++/Python clients):
 *   1. columnHasNull flag (1 byte): 0 = no nulls, 1 = has nulls.
 *   2. If it has nulls: Math.ceil(rowCount / 8) bitmap bytes packing 8 rows
 *      per byte, LSB-first — row index i maps to bit (i % 8) of byte
 *      floor(i / 8), so row 0 -> 0x01 and row 7 -> 0x80. Bit=1 means NULL;
 *      unused trailing bits in the last byte stay 0.
 *
 * Example: rowCount=10, nulls at [1, 4, 6, 9] -> bytes 0x52, 0x02.
 *
 * @param bitMaps - per-column null flags (true = null), or null when the
 *                  column contains no nulls
 * @param rowCount - number of rows in the tablet
 * @returns concatenated flag + bitmap bytes for all columns
 */
protected serializeBitMaps(
  bitMaps: (boolean[] | null)[],
  rowCount: number,
): Buffer {
  const parts: Buffer[] = [];
  for (const columnNulls of bitMaps) {
    if (columnNulls === null) {
      // Column has no nulls: flag byte only, no bitmap follows.
      parts.push(Buffer.from([0]));
      continue;
    }
    parts.push(Buffer.from([1]));
    // Buffer.alloc zero-fills, so padding bits are already 0.
    const packed = Buffer.alloc(Math.ceil(rowCount / 8));
    columnNulls.forEach((isNull, row) => {
      if (isNull) {
        // LSB-first packing: bit (row % 8) of byte (row / 8).
        packed[row >> 3] |= 1 << (row & 7);
      }
    });
    parts.push(packed);
  }
  return Buffer.concat(parts);
}
/**
 * Parse queryResult (TsBlock format) - the new format used by IoTDB.
 * Each buffer in queryResult is a complete TsBlock containing:
 * - Value column count (INT32)
 * - Value column data types (list of BYTE)
 * - Position count (INT32)
 * - Column encodings (list of BYTE)
 * - Time column data
 * - Value columns data
 *
 * @param queryResult - one Buffer per TsBlock
 * @param _columnCount - unused (column count comes from each TsBlock header)
 * @param dataTypes - data type names for logging/decoding context
 * @param ignoreTimeStamp - if true, timestamps are omitted from result rows
 * @returns all rows decoded from every TsBlock, in order
 */
async parseQueryResult(
  queryResult: Buffer[],
  _columnCount: number,
  dataTypes: string[],
  ignoreTimeStamp: boolean = false,
): Promise<any[][]> {
  if (!queryResult || queryResult.length === 0) {
    logger.debug("parseQueryResult: queryResult is null or empty");
    return [];
  }
  logger.debug(
    `parseQueryResult: queryResult has ${queryResult.length} TsBlocks, ignoreTimeStamp=${ignoreTimeStamp}`,
  );
  logger.debug(`parseQueryResult: dataTypes: ${JSON.stringify(dataTypes)}`);
  const rows: any[][] = [];
  for (const [blockIndex, rawBlock] of queryResult.entries()) {
    // Thrift may hand back plain byte arrays; normalize to Buffer.
    const tsBlockBuffer = Buffer.isBuffer(rawBlock)
      ? rawBlock
      : Buffer.from(rawBlock);
    logger.debug(
      `parseQueryResult: Processing TsBlock ${blockIndex}, size=${tsBlockBuffer.length} bytes`,
    );
    try {
      rows.push(
        ...this.parseTsBlock(tsBlockBuffer, dataTypes, ignoreTimeStamp),
      );
    } catch (error) {
      logger.error(`Error parsing TsBlock ${blockIndex}:`, error);
      throw error;
    }
  }
  logger.debug(`parseQueryResult: returning ${rows.length} total rows`);
  return rows;
}
/**
 * Parse a single TsBlock buffer into rows.
 * TsBlock format (from Apache IoTDB C# client):
 * +-------------+---------------+---------+------------+-----------+----------+
 * | val col cnt | val col types | pos cnt | encodings  | time col  | val col  |
 * +-------------+---------------+---------+------------+-----------+----------+
 * | int32       | list[byte]    | int32   | list[byte] | bytes     | bytes    |
 * +-------------+---------------+---------+------------+-----------+----------+
 *
 * IMPORTANT: TsBlock ALWAYS contains time column encoding and time column data,
 * regardless of the ignoreTimeStamp setting. The ignoreTimeStamp flag only
 * affects whether the timestamp is included in the returned row data, not the
 * TsBlock binary format. This matches the behavior of iotdb-client-csharp.
 *
 * @param buffer - one complete serialized TsBlock
 * @param dataTypes - data type names (currently used for context only; actual
 *                    column types are read from the TsBlock header)
 * @param ignoreTimeStamp - if true, rows start at the first value column
 * @returns row-major values; each row is [timestamp?, col0, col1, ...]
 */
private parseTsBlock(
  buffer: Buffer,
  dataTypes: string[],
  ignoreTimeStamp: boolean,
): any[][] {
  // Running cursor into the buffer; every read below advances it.
  let offset = 0;
  // Read value column count (INT32, 4 bytes, BIG ENDIAN)
  const valueColumnCount = buffer.readInt32BE(offset);
  offset += 4;
  logger.debug(`TsBlock: valueColumnCount=${valueColumnCount}`);
  // Read value column data types (valueColumnCount bytes, one per column)
  const valueColumnTypes: number[] = [];
  for (let i = 0; i < valueColumnCount; i++) {
    valueColumnTypes.push(buffer.readUInt8(offset));
    offset += 1;
  }
  logger.debug(
    `TsBlock: valueColumnTypes=${JSON.stringify(valueColumnTypes)}`,
  );
  // Read position count (INT32, 4 bytes, BIG ENDIAN) — the number of rows
  const positionCount = buffer.readInt32BE(offset);
  offset += 4;
  logger.debug(`TsBlock: positionCount=${positionCount}`);
  // Read encodings - time column + value columns
  // Time column encoding (1 byte)
  const timeEncoding: ColumnEncoding = buffer.readUInt8(offset);
  offset += 1;
  logger.debug(`TsBlock: timeEncoding=${timeEncoding}`);
  // Value column encodings (valueColumnCount bytes)
  const valueEncodings: ColumnEncoding[] = [];
  for (let i = 0; i < valueColumnCount; i++) {
    valueEncodings.push(buffer.readUInt8(offset));
    offset += 1;
  }
  logger.debug(`TsBlock: valueEncodings=${JSON.stringify(valueEncodings)}`);
  // Read time column using decoder
  // IMPORTANT: TsBlock ALWAYS contains time column data, even when ignoreTimeStamp=true
  // This matches the behavior of iotdb-client-csharp
  const timeDecoder = BaseColumnDecoder.getDecoder(timeEncoding);
  const { column: timeColumn, bytesRead: timeColumnBytesRead } =
    timeDecoder.readColumn(buffer, offset, 8 /* TIMESTAMP */, positionCount);
  offset += timeColumnBytesRead;
  logger.debug(
    `TsBlock: read time column, ${timeColumn.values.length} timestamps, ignoreTimeStamp=${ignoreTimeStamp}`,
  );
  // Read value columns using decoders; each decoder reports how many bytes
  // it consumed so the cursor can advance past variable-length columns.
  const valueColumns: Column[] = [];
  for (let i = 0; i < valueColumnCount; i++) {
    const dataType = valueColumnTypes[i];
    const encoding = valueEncodings[i];
    const decoder = BaseColumnDecoder.getDecoder(encoding);
    const { column, bytesRead } = decoder.readColumn(
      buffer,
      offset,
      dataType,
      positionCount,
    );
    valueColumns.push(column);
    offset += bytesRead;
    logger.debug(
      `TsBlock: read column ${i}, type=${dataType}, encoding=${encoding}, ${column.values.length} values`,
    );
  }
  // Build rows from columns (transpose columnar data to row-major)
  const rows: any[][] = [];
  for (let rowIndex = 0; rowIndex < positionCount; rowIndex++) {
    const row: any[] = [];
    // Add timestamp only if not ignoring timestamp
    // Note: timeColumn is always present in TsBlock, but we only include it in results when needed
    if (
      !ignoreTimeStamp &&
      timeColumn &&
      rowIndex < timeColumn.values.length
    ) {
      row.push(timeColumn.values[rowIndex]);
    }
    // Add column values
    for (let colIndex = 0; colIndex < valueColumns.length; colIndex++) {
      row.push(valueColumns[colIndex].values[rowIndex]);
    }
    rows.push(row);
  }
  return rows;
}
/**
 * Map a data-type name (e.g. "INT32", "TIMESTAMP") to its numeric TSDataType
 * code. Matching is by substring in a fixed order; unknown or missing names
 * fall back to TEXT (5).
 *
 * @param typeStr - data type name, case-insensitive, possibly undefined
 * @returns numeric TSDataType code
 */
private getDataTypeCode(typeStr: string | undefined): number {
  if (!typeStr) return 5; // Default to TEXT
  const type = String(typeStr).toUpperCase();
  // Ordered substring table: the first hit wins, preserving the original
  // if/else chain's precedence (e.g. INT32 before INT64).
  const substringToCode: Array<[string, number]> = [
    ["BOOLEAN", 0],
    ["INT32", 1],
    ["INT64", 2],
    ["FLOAT", 3],
    ["DOUBLE", 4],
    ["TEXT", 5],
    ["TIMESTAMP", 8],
    ["DATE", 9],
    ["BLOB", 10],
    ["STRING", 11],
  ];
  for (const [needle, code] of substringToCode) {
    if (type.includes(needle)) return code;
  }
  return 5; // Default to TEXT
}
/**
 * Determine how many values a serialized column buffer contains for a given
 * TSDataType code.
 *
 * Fixed-width types divide the buffer length by the element size.
 * Variable-length types (TEXT/BLOB/STRING) walk the 4-byte big-endian length
 * prefixes and count entries; a negative prefix (corrupt data) stops the walk
 * instead of stepping the cursor backwards, which could loop forever.
 *
 * @param buffer - serialized column payload (see serializeColumn)
 * @param dataType - TSDataType code
 * @returns number of values in the buffer, or 0 for unknown types
 */
private getRowCountFromBuffer(buffer: Buffer, dataType: number): number {
  const length = buffer.length;
  switch (dataType) {
    case 0: // BOOLEAN - 1 byte per value
      return length;
    case 1: // INT32 - 4 bytes per value
    case 3: // FLOAT - 4 bytes per value
    case 9: // DATE - 4 bytes per value
      return Math.floor(length / 4);
    case 2: // INT64 - 8 bytes per value
    case 4: // DOUBLE - 8 bytes per value
    case 8: // TIMESTAMP - 8 bytes per value
      return Math.floor(length / 8);
    case 5: // TEXT - variable length
    case 10: // BLOB - variable length
    case 11: {
      // STRING - variable length
      // Block scope so the counters are confined to this case (avoids the
      // no-case-declarations hazard of lexical declarations in a bare case).
      let count = 0;
      let offset = 0;
      while (offset + 4 <= length) {
        const entryLength = buffer.readInt32BE(offset);
        if (entryLength < 0) {
          // Corrupt length prefix — bail out rather than move the cursor
          // backwards and loop indefinitely.
          logger.warn(
            `Invalid negative length prefix ${entryLength} at offset ${offset}; stopping row count`,
          );
          break;
        }
        offset += 4 + entryLength;
        count++;
      }
      return count;
    }
    default:
      logger.warn(
        `Unknown data type ${dataType}, cannot determine row count`,
      );
      return 0;
  }
}
// parseDataSet is used by SessionDataSet for parsing query results
// Keep it as public for SessionDataSet to access
/**
 * Parses a TSQueryDataSet-style Thrift payload into rows of the form
 * [timestamp, value0, value1, ...].
 *
 * @param dataset Thrift dataset carrying `time`, `valueList` and
 *                `bitmapList` buffers (each may arrive as a plain byte
 *                array instead of a Buffer).
 * @param _columnCount Unused; the column count is taken from valueList.
 * @param dataTypes Per-column type names used to pick the decoder.
 * @returns Decoded rows; empty when the dataset or its time buffer is
 *          missing or malformed.
 */
async parseDataSet(
  dataset: any,
  _columnCount: number,
  dataTypes: string[],
): Promise<any[][]> {
  const rows: any[][] = [];
  if (!dataset) {
    logger.debug("parseDataSet: dataset is null or undefined");
    return rows;
  }
  // Handle case where dataset.time is not a Buffer (might be an array or null)
  if (!dataset.time) {
    logger.debug("parseDataSet: dataset.time is null or undefined");
    return rows;
  }
  logger.debug(
    `parseDataSet: dataset.time type: ${typeof dataset.time}, isBuffer: ${Buffer.isBuffer(dataset.time)}, length: ${dataset.time.length || "N/A"}`,
  );
  logger.debug(
    `parseDataSet: dataset.valueList type: ${typeof dataset.valueList}, isArray: ${Array.isArray(dataset.valueList)}, length: ${dataset.valueList?.length || "N/A"}`,
  );
  logger.debug(`parseDataSet: dataTypes: ${JSON.stringify(dataTypes)}`);
  // Convert time to Buffer if it's not already
  const timeBuffer = Buffer.isBuffer(dataset.time)
    ? dataset.time
    : Buffer.from(dataset.time);
  // Timestamps are 8 bytes each, so a valid buffer is a non-zero multiple of 8.
  const timeBufferLength = timeBuffer.length;
  if (timeBufferLength === 0 || timeBufferLength % 8 !== 0) {
    logger.warn("Invalid time buffer length:", timeBufferLength);
    return rows;
  }
  const rowCount = Math.floor(timeBufferLength / 8);
  logger.debug(`parseDataSet: rowCount = ${rowCount}`);
  // Parse value buffers, one decoded column per entry in valueList.
  const parsedColumns: any[][] = [];
  if (dataset.valueList && Array.isArray(dataset.valueList)) {
    for (let colIndex = 0; colIndex < dataset.valueList.length; colIndex++) {
      const valueBuffer = Buffer.isBuffer(dataset.valueList[colIndex])
        ? dataset.valueList[colIndex]
        : Buffer.from(dataset.valueList[colIndex]);
      const bitmap =
        dataset.bitmapList && dataset.bitmapList[colIndex]
          ? Buffer.isBuffer(dataset.bitmapList[colIndex])
            ? dataset.bitmapList[colIndex]
            : Buffer.from(dataset.bitmapList[colIndex])
          : null;
      // Map the type name to its numeric code via the shared helper
      // instead of duplicating the name->code chain inline; missing or
      // unknown entries default to TEXT (5), as before.
      const dataType = this.getDataTypeCode(dataTypes?.[colIndex]);
      logger.debug(
        `parseDataSet: column ${colIndex}, dataType = ${dataType}, valueBuffer.length = ${valueBuffer.length}`,
      );
      const columnValues = this.deserializeColumn(
        valueBuffer,
        dataType,
        rowCount,
        bitmap,
      );
      parsedColumns.push(columnValues);
    }
  }
  // Build rows: timestamp (little-endian BigInt) followed by column values.
  for (let i = 0; i < rowCount; i++) {
    const row: any[] = [];
    // Add timestamp
    if (i * 8 + 8 <= timeBufferLength) {
      row.push(timeBuffer.readBigInt64LE(i * 8));
    }
    // Add column values
    for (let colIndex = 0; colIndex < parsedColumns.length; colIndex++) {
      row.push(parsedColumns[colIndex][i]);
    }
    rows.push(row);
  }
  logger.debug(`parseDataSet: returning ${rows.length} rows`);
  return rows;
}
/**
 * Decodes one serialized value column into a JS value array of up to
 * rowCount entries.
 *
 * Fixed-width types are packed back-to-back in big-endian order;
 * variable-length types (TEXT, STRING, BLOB) are 4-byte big-endian
 * length-prefixed entries. Null positions are determined solely by the
 * optional bitmap (see isNull(): a cleared bit means null).
 *
 * On a decode error the remaining positions are filled with null instead
 * of throwing, so fixed-width columns always yield rowCount entries
 * (variable-length columns may come up short if the buffer is truncated).
 */
private deserializeColumn(
buffer: Buffer,
dataType: number,
rowCount: number,
bitmap: Buffer | null,
): any[] {
const values: any[] = [];
try {
switch (dataType) {
case 0: {
// BOOLEAN
for (let i = 0; i < rowCount; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
values.push(buffer[i] !== 0);
}
}
break;
}
case 1: {
// INT32 - TSQueryDataSet uses BIG ENDIAN
for (let i = 0; i < rowCount; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
values.push(buffer.readInt32BE(i * 4));
}
}
break;
}
case 2: {
// INT64 - TSQueryDataSet uses BIG ENDIAN
// Values are returned as BigInt to avoid precision loss.
for (let i = 0; i < rowCount; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
values.push(buffer.readBigInt64BE(i * 8));
}
}
break;
}
case 3: {
// FLOAT - TSQueryDataSet uses BIG ENDIAN
for (let i = 0; i < rowCount; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
values.push(buffer.readFloatBE(i * 4));
}
}
break;
}
case 4: {
// DOUBLE - TSQueryDataSet uses BIG ENDIAN
for (let i = 0; i < rowCount; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
values.push(buffer.readDoubleBE(i * 8));
}
}
break;
}
case 5: // TEXT
case 11: {
// STRING (similar to TEXT) - TSQueryDataSet uses BIG ENDIAN for length
// The read offset only advances for non-null rows — assumes the server
// omits null entries from the value buffer entirely (TODO confirm).
let offset = 0;
for (let i = 0; i < rowCount && offset < buffer.length; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
// Stop early (rather than throw) if the buffer is truncated.
if (offset + 4 > buffer.length) break;
const strLength = buffer.readInt32BE(offset);
offset += 4;
if (offset + strLength > buffer.length) break;
const str = buffer.toString("utf8", offset, offset + strLength);
values.push(str);
offset += strLength;
}
}
break;
}
case 8: {
// TIMESTAMP (stored as INT64 - milliseconds) - TSQueryDataSet uses BIG ENDIAN
for (let i = 0; i < rowCount; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
// Convert milliseconds to Date object
// (Number() narrows the BigInt; fine for realistic epoch-ms values.)
const timestamp = Number(buffer.readBigInt64BE(i * 8));
const date = new Date(timestamp);
values.push(date);
}
}
break;
}
case 9: {
// DATE (stored as INT32 - days since epoch) - TSQueryDataSet uses BIG ENDIAN
for (let i = 0; i < rowCount; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
// Convert days since epoch to Date object
const days = buffer.readInt32BE(i * 4);
const date = new Date(days * 24 * 60 * 60 * 1000);
values.push(date);
}
}
break;
}
case 10: {
// BLOB - TSQueryDataSet uses BIG ENDIAN for length
// Same variable-length layout as TEXT/STRING above.
let offset = 0;
for (let i = 0; i < rowCount && offset < buffer.length; i++) {
if (this.isNull(bitmap, i)) {
values.push(null);
} else {
if (offset + 4 > buffer.length) break;
const blobLength = buffer.readInt32BE(offset);
offset += 4;
if (offset + blobLength > buffer.length) break;
// NOTE(review): slice() returns a view that shares memory with
// `buffer`; copy the bytes if the source buffer is pooled/reused.
const blob = buffer.slice(offset, offset + blobLength);
values.push(blob);
offset += blobLength;
}
}
break;
}
default:
// Unknown type, return nulls
for (let i = 0; i < rowCount; i++) {
values.push(null);
}
}
} catch (error) {
logger.error("Error deserializing column:", error);
// Fill with nulls on error
for (let i = values.length; i < rowCount; i++) {
values.push(null);
}
}
return values;
}
/**
 * Tests the column bitmap for a null marker at the given row index.
 * A cleared bit means null; a set bit means the value is present.
 * Missing bitmap (or an index past its end) is treated as non-null.
 */
private isNull(bitmap: Buffer | null, index: number): boolean {
  if (!bitmap) return false;
  const byteIndex = index >> 3; // index / 8
  const bitIndex = index & 7; // index % 8
  if (byteIndex >= bitmap.length) return false;
  return ((bitmap[byteIndex] >> bitIndex) & 1) === 0;
}
/**
 * Reports whether the underlying connection is currently open.
 *
 * @returns true if the session's connection is open.
 */
isOpen(): boolean {
return this.connection.isOpen();
}
/**
* Insert multiple tablets in a single RPC call (batch insert).
* This is more efficient than calling insertTablet multiple times,
* especially for high-throughput scenarios.
*
* Note: All tablets must be tree model tablets (have deviceId).
* For table model tablets, use insertTabletsTable().
*
* @param tablets Array of TreeTablets to insert
* @example
* ```typescript
* const tablets = [
* { deviceId: 'root.sg.d1', measurements: ['temp'], dataTypes: [3], timestamps: [Date.now()], values: [[25.5]] },
* { deviceId: 'root.sg.d2', measurements: ['temp'], dataTypes: [3], timestamps: [Date.now()], values: [[26.0]] },
* ];
* await session.insertTablets(tablets);
* ```
*/
async insertTablets(tablets: (TreeTablet | ITreeTablet)[]): Promise<void> {
  // An empty batch needs no RPC at all.
  if (tablets.length === 0) {
    return;
  }
  const startedAt = Date.now();
  logger.debug(`[PERF] insertTablets START for ${tablets.length} tablets`);
  const client = this.connection.getClient();
  const sessionId = this.connection.getSessionId();
  // TSInsertTabletsReq carries one entry per tablet in parallel arrays;
  // build them all up front.
  const deviceIds: string[] = [];
  const measurementNames: string[][] = [];
  const encodedValues: Buffer[] = [];
  const encodedTimestamps: Buffer[] = [];
  const typeCodes: number[][] = [];
  const rowCounts: number[] = [];
  const serializeStarted = Date.now();
  for (const tablet of tablets) {
    deviceIds.push(tablet.deviceId);
    measurementNames.push(tablet.measurements);
    typeCodes.push(tablet.dataTypes);
    rowCounts.push(tablet.timestamps.length);
    // Timestamps: use the fast binary serializer when enabled in config.
    encodedTimestamps.push(
      this.config.enableFastSerialization
        ? serializeTimestamps(tablet.timestamps)
        : this.serializeTimestampsLegacy(tablet.timestamps),
    );
    // Column values, encoded per their declared data types.
    encodedValues.push(
      this.serializeTabletValues(
        tablet.values,
        tablet.dataTypes,
        tablet.timestamps.length,
      ),
    );
  }
  const serializeDuration = Date.now() - serializeStarted;
  logger.debug(`[PERF] Tablets serialization: ${serializeDuration}ms`);
  const req = new ttypes.TSInsertTabletsReq({
    sessionId: sessionId,
    prefixPaths: deviceIds,
    measurementsList: measurementNames,
    valuesList: encodedValues,
    timestampsList: encodedTimestamps,
    typesList: typeCodes,
    sizeList: rowCounts,
    isAligned: false,
  });
  const rpcStarted = Date.now();
  return new Promise((resolve, reject) => {
    client.insertTablets(req, (err: Error, response: any) => {
      const rpcDuration = Date.now() - rpcStarted;
      const totalDuration = Date.now() - startedAt;
      logger.debug(`[PERF] insertTablets RPC: ${rpcDuration}ms, Total: ${totalDuration}ms`);
      if (err) {
        reject(err);
      } else if (response.code === 400) {
        // Code 400: the write SUCCEEDED; the server is only recommending
        // a different endpoint for future operations. Remember the first
        // recommendation, if any, then resolve.
        const node = response.redirectNode;
        if (node) {
          this.lastRedirectEndpoint = {
            host: node.internalIp || node.ip,
            port: node.port,
          };
          logger.debug(
            `Server recommends endpoint ${this.lastRedirectEndpoint.host}:${this.lastRedirectEndpoint.port} for future writes`
          );
        } else {
          logger.debug(`Server returned code 400 without redirect node`);
        }
        resolve();
      } else if (response.code !== 200) {
        reject(new Error(response.message || "Insert tablets failed"));
      } else {
        resolve();
      }
    });
  });
}
/**
* Insert multiple tablets concurrently using Promise.all.
* This is optimized for Node.js async patterns and maximizes throughput
* by leveraging the event loop for parallel execution.
*
* @param tablets Array of tablets to insert
* @param concurrency Maximum number of concurrent insertions (default: 10)
* @returns Promise that resolves when all inserts complete
*
* @example
* ```typescript
* const tablets = generateTablets(100);
* // Insert 100 tablets with max 20 concurrent operations
* await session.insertTabletsParallel(tablets, 20);
* ```
*/
async insertTabletsParallel(
  tablets: (TreeTablet | ITreeTablet | TableTablet | ITableTablet)[],
  concurrency: number = 10,
): Promise<void> {
  if (tablets.length === 0) {
    return;
  }
  const startedAt = Date.now();
  logger.debug(`[PERF] insertTabletsParallel START: ${tablets.length} tablets, concurrency: ${concurrency}`);
  // Shared cursor consumed by a fixed pool of workers. The bump below is
  // safe without locks: it happens synchronously on the single JS thread,
  // between awaits.
  const workerCount = Math.min(concurrency, tablets.length);
  let nextIndex = 0;
  const failures: Error[] = [];
  // Each worker pulls the next unprocessed tablet until none remain,
  // recording failures instead of aborting the whole batch.
  const runWorker = async (): Promise<void> => {
    for (;;) {
      const current = nextIndex;
      if (current >= tablets.length) return;
      nextIndex = current + 1;
      try {
        await this.insertTablet(tablets[current]);
      } catch (err) {
        failures.push(err instanceof Error ? err : new Error(String(err)));
      }
    }
  };
  await Promise.all(Array.from({ length: workerCount }, runWorker));
  const totalDuration = Date.now() - startedAt;
  logger.debug(`[PERF] insertTabletsParallel COMPLETE: ${totalDuration}ms, ${tablets.length / (totalDuration / 1000)} tablets/sec`);
  if (failures.length > 0) {
    throw new Error(`${failures.length} of ${tablets.length} tablet inserts failed. First error: ${failures[0].message}`);
  }
}
}