blob: abd16186399ab4b224ba066e9b3a249a46d0601b [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Connection } from '../connection/Connection';
import { Config, DEFAULT_CONFIG } from '../utils/Config';
import { logger } from '../utils/Logger';
const ttypes = require('../thrift/generated/client_types');
/**
 * Result of a query statement as returned by `Session.executeQueryStatement`.
 */
export interface QueryResult {
/** Column names, copied from the server response (`[]` when the response has none). */
columns: string[];
/** Data type names, copied from the response's `dataTypeList` (`[]` when absent). */
dataTypes: string[];
/**
 * Result rows. As built by `Session.parseDataSet`, each row starts with the
 * row timestamp (a `bigint`) followed by one entry per value column.
 */
rows: any[][];
/** Query id reported by the server response, when present. */
queryId?: number;
}
/**
 * A batch of rows for one device, written with `Session.insertTablet`.
 */
export interface Tablet {
/** Device path; sent to the server as the request's `prefixPath`. */
deviceId: string;
/** Measurement names, one per value column. */
measurements: string[];
/**
 * Numeric type code for each measurement, in the same order as `measurements`.
 * Codes handled by the serializer: BOOLEAN(0), INT32(1), INT64(2), FLOAT(3),
 * DOUBLE(4), TEXT(5).
 */
dataTypes: number[];
/** Row timestamps; each must be a finite number (fractions are floored on insert). */
timestamps: number[];
/** Row-major cell values: `values[row][column]`, with columns ordered like `measurements`. */
values: any[][];
}
/**
 * A client session on top of a Thrift {@link Connection}.
 *
 * The session builds the TS* Thrift requests (statement execution, tablet
 * insertion, result fetching), wraps the callback-style generated client in
 * promises and converts between JavaScript values and the binary buffers
 * carried by those requests.
 *
 * Byte order: the TS* request types used here belong to the Apache IoTDB
 * Thrift RPC, whose binary payloads follow Java `ByteBuffer` conventions,
 * i.e. big-endian. All multi-byte values are therefore encoded and decoded
 * explicitly as big-endian instead of relying on the host's native order.
 */
export class Session {
  protected config: Config;
  protected connection: Connection;

  /**
   * Creates a session; no network activity happens until {@link open}.
   *
   * @param config Connection settings; merged over `DEFAULT_CONFIG`.
   * @throws Error when `host` or `port` is missing (or falsy).
   */
  constructor(config: Config) {
    if (!config.host || !config.port) {
      throw new Error('Host and port are required');
    }
    this.config = { ...DEFAULT_CONFIG, ...config } as Config;
    this.connection = new Connection(this.config);
  }

  /** Opens the underlying connection. */
  async open(): Promise<void> {
    await this.connection.open();
  }

  /** Closes the underlying connection. */
  async close(): Promise<void> {
    await this.connection.close();
  }

  /**
   * Executes a query statement and collects every result page.
   *
   * @param sql Statement text.
   * @param timeoutMs Timeout forwarded to the server in the request.
   * @returns Columns, data types and all rows of the result.
   * @throws Error when the transport fails or the server reports a status
   *   code other than 200.
   */
  async executeQueryStatement(sql: string, timeoutMs: number = 60000): Promise<QueryResult> {
    logger.debug(`Executing query: ${sql}`);
    const client = this.connection.getClient();
    const sessionId = this.connection.getSessionId();
    const statementId = this.connection.getStatementId();
    const req = new ttypes.TSExecuteStatementReq({
      sessionId: sessionId,
      statement: sql,
      statementId: statementId,
      fetchSize: this.config.fetchSize,
      timeout: timeoutMs,
      enableRedirectQuery: true,
      jdbcQuery: true,
    });
    const response = await this.invokeRpc(
      (callback) => client.executeQueryStatementV2(req, callback),
      (res) => res.status,
      'Query failed'
    );
    return this.parseQueryResult(response);
  }

  /**
   * Executes a statement that produces no result set.
   *
   * @param sql Statement text.
   * @throws Error when the transport fails or the server reports a status
   *   code other than 200.
   */
  async executeNonQueryStatement(sql: string): Promise<void> {
    logger.debug(`Executing non-query: ${sql}`);
    const client = this.connection.getClient();
    const sessionId = this.connection.getSessionId();
    const statementId = this.connection.getStatementId();
    const req = new ttypes.TSExecuteStatementReq({
      sessionId: sessionId,
      statement: sql,
      statementId: statementId,
    });
    await this.invokeRpc(
      (callback) => client.executeUpdateStatementV2(req, callback),
      (res) => res.status,
      'Statement execution failed'
    );
  }

  /**
   * Inserts a batch of rows for a single device (non-aligned).
   *
   * @param tablet Rows to write; `values` is row-major and must contain one
   *   row per timestamp.
   * @throws Error when a timestamp is not a finite number, when the tablet's
   *   arrays have inconsistent lengths, when a data type is unsupported, when
   *   the transport fails or when the server reports a code other than 200.
   */
  async insertTablet(tablet: Tablet): Promise<void> {
    logger.debug(`Inserting tablet for device: ${tablet.deviceId}`);
    const client = this.connection.getClient();
    const sessionId = this.connection.getSessionId();

    // Validate timestamps and encode them as big-endian 64-bit integers.
    const rowCount = tablet.timestamps.length;
    const timeView = new DataView(new ArrayBuffer(rowCount * 8));
    for (let row = 0; row < rowCount; row++) {
      const t = tablet.timestamps[row];
      if (typeof t !== 'number' || !Number.isFinite(t)) {
        throw new Error(`Invalid timestamp: ${t}`);
      }
      timeView.setBigInt64(row * 8, BigInt(Math.floor(t)), false);
    }

    // A shape mismatch would otherwise be serialized into a payload whose
    // column sections do not line up with `size`; fail early instead.
    if (tablet.dataTypes.length !== tablet.measurements.length) {
      throw new Error(
        `Tablet has ${tablet.measurements.length} measurements but ${tablet.dataTypes.length} data types`
      );
    }
    if (tablet.values.length !== rowCount) {
      throw new Error(
        `Tablet has ${rowCount} timestamps but ${tablet.values.length} value rows`
      );
    }

    const req = new ttypes.TSInsertTabletReq({
      sessionId: sessionId,
      prefixPath: tablet.deviceId,
      measurements: tablet.measurements,
      values: this.serializeTabletValues(tablet),
      timestamps: Buffer.from(timeView.buffer),
      types: tablet.dataTypes,
      size: rowCount,
      isAligned: false,
    });
    // insertTablet answers with the status object itself, not a wrapper.
    await this.invokeRpc(
      (callback) => client.insertTablet(req, callback),
      (res) => res,
      'Insert tablet failed'
    );
  }

  /**
   * Wraps one callback-style Thrift call in a promise and checks its status.
   *
   * Rejects with the transport error, or with the status message (falling
   * back to `fallbackMessage`) when the status is missing or its code is not
   * 200. Never throws inside the Thrift callback, so the promise always
   * settles.
   *
   * @param send Performs the call, handing the given callback to the client.
   * @param statusOf Extracts the status object from a response.
   * @param fallbackMessage Error text used when the status has no message.
   * @returns The raw response of a successful call.
   */
  private invokeRpc(
    send: (callback: (err: Error | null | undefined, response: any) => void) => void,
    statusOf: (response: any) => any,
    fallbackMessage: string
  ): Promise<any> {
    const successCode = 200;
    return new Promise((resolve, reject) => {
      send((err, response) => {
        if (err) {
          reject(err);
          return;
        }
        const status = response == null ? undefined : statusOf(response);
        if (!status || status.code !== successCode) {
          reject(new Error(status?.message || fallbackMessage));
          return;
        }
        resolve(response);
      });
    });
  }

  /**
   * Serializes the tablet column by column: all values of the first
   * measurement, then all values of the second, and so on.
   */
  private serializeTabletValues(tablet: Tablet): Buffer {
    const buffers: Buffer[] = [];
    for (let colIndex = 0; colIndex < tablet.measurements.length; colIndex++) {
      const dataType = tablet.dataTypes[colIndex];
      const columnValues = tablet.values.map((row) => row[colIndex]);
      buffers.push(this.serializeColumn(columnValues, dataType));
    }
    return Buffer.concat(buffers);
  }

  /**
   * Serializes one column of values in big-endian byte order.
   *
   * @param values Cell values of the column, one per row.
   * @param dataType Type code: BOOLEAN(0), INT32(1), INT64(2), FLOAT(3),
   *   DOUBLE(4), TEXT(5).
   * @throws Error for any other type code.
   */
  private serializeColumn(values: any[], dataType: number): Buffer {
    // DataView applies the same numeric coercions as the typed arrays would,
    // but lets the byte order be chosen explicitly (false = big-endian).
    const packFixed = (
      width: number,
      write: (view: DataView, offset: number, value: any) => void
    ): Buffer => {
      const view = new DataView(new ArrayBuffer(values.length * width));
      for (let row = 0; row < values.length; row++) {
        write(view, row * width, values[row]);
      }
      return Buffer.from(view.buffer);
    };

    switch (dataType) {
      case 0: // BOOLEAN: one byte per value
        return Buffer.from(values.map((v) => (v ? 1 : 0)));
      case 1: // INT32
        return packFixed(4, (view, offset, v) => view.setInt32(offset, v, false));
      case 2: // INT64
        return packFixed(8, (view, offset, v) => view.setBigInt64(offset, BigInt(v), false));
      case 3: // FLOAT
        return packFixed(4, (view, offset, v) => view.setFloat32(offset, v, false));
      case 4: // DOUBLE
        return packFixed(8, (view, offset, v) => view.setFloat64(offset, v, false));
      case 5: { // TEXT: 4-byte length prefix followed by the UTF-8 bytes
        const parts: Buffer[] = [];
        for (const v of values) {
          const utf8 = Buffer.from(String(v), 'utf8');
          const len = Buffer.alloc(4);
          // The prefix counts encoded bytes, not UTF-16 code units.
          len.writeInt32BE(utf8.length);
          parts.push(len, utf8);
        }
        return Buffer.concat(parts);
      }
      default:
        throw new Error(`Unsupported data type: ${dataType}`);
    }
  }

  /**
   * Converts the first response of a query into a {@link QueryResult} and
   * appends the remaining pages when the server signals `moreData`.
   */
  private async parseQueryResult(response: any): Promise<QueryResult> {
    const columns: string[] = response.columns || [];
    const dataTypes: string[] = response.dataTypeList || [];
    const result: QueryResult = {
      columns: columns,
      dataTypes: dataTypes,
      rows: [],
      queryId: response.queryId,
    };
    if (response.queryDataSet) {
      result.rows = await this.parseDataSet(response.queryDataSet, columns.length, dataTypes);
    }
    // Fetch more data if available
    if (response.moreData) {
      const moreRows = await this.fetchResults(response.queryId);
      // Append one by one: spreading a large array into push() can exceed
      // the engine's argument limit.
      for (const row of moreRows) {
        result.rows.push(row);
      }
    }
    return result;
  }

  /**
   * Fetches all remaining pages of a query, in order, until the server stops
   * reporting `moreData`.
   *
   * @param queryId Id of the query whose results are fetched.
   * @throws Error when a fetch fails or reports a status code other than 200.
   */
  private async fetchResults(queryId: number): Promise<any[][]> {
    const rows: any[][] = [];
    let moreData = true;
    while (moreData) {
      const client = this.connection.getClient();
      const sessionId = this.connection.getSessionId();
      const req = new ttypes.TSFetchResultsReq({
        sessionId: sessionId,
        statement: '',
        fetchSize: this.config.fetchSize,
        queryId: queryId,
        isAlign: true,
      });
      const response = await this.invokeRpc(
        (callback) => client.fetchResultsV2(req, callback),
        (res) => res.status,
        'Fetch results failed'
      );
      const page = await this.parseDataSet(
        response.queryDataSet,
        response.queryDataSet?.valueList?.length || 0,
        []
      );
      for (const row of page) {
        rows.push(row);
      }
      moreData = Boolean(response.moreData);
    }
    return rows;
  }

  /**
   * Decodes the time column of a data set into rows.
   *
   * Each row is `[timestamp, ...values]` where the timestamp is a `bigint`
   * read as a big-endian 64-bit integer. Value decoding is not implemented
   * yet: every value column is represented by a `null` placeholder.
   *
   * @param dataset Data set of a response; may be missing.
   * @param _columnCount Currently unused.
   * @param _dataTypes Currently unused.
   * @returns The decoded rows; empty when there is no usable time buffer.
   */
  private async parseDataSet(
    dataset: any,
    _columnCount: number,
    _dataTypes: string[]
  ): Promise<any[][]> {
    const rows: any[][] = [];
    if (!dataset || !dataset.time || !Buffer.isBuffer(dataset.time)) {
      return rows;
    }
    const time: Buffer = dataset.time;
    // An empty buffer is simply an empty page, not an error.
    if (time.length === 0) {
      return rows;
    }
    if (time.length % 8 !== 0) {
      logger.warn('Invalid time buffer length:', time.length);
      return rows;
    }
    const valueColumnCount: number = dataset.valueList?.length ?? 0;
    const rowCount = time.length / 8;
    for (let i = 0; i < rowCount; i++) {
      const row: any[] = [time.readBigInt64BE(i * 8)];
      // Add values (simplified - actual implementation would parse based on type)
      for (let j = 0; j < valueColumnCount; j++) {
        row.push(null); // Placeholder
      }
      rows.push(row);
    }
    return rows;
  }

  /** Reports whether the underlying connection is open. */
  isOpen(): boolean {
    return this.connection.isOpen();
  }
}