| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import { TableSessionPool } from "../../src/client/TableSessionPool"; |
| import { TSDataType } from "../../src/utils/DataTypes"; |
| import { ColumnCategory } from "../../src/client/Session"; |
| |
| describe("TableSessionPool E2E Tests", () => { |
// Connection settings. Each value can be overridden through the environment;
// the fallbacks are used when the variable is unset or empty.
const IOTDB_HOST = process.env.IOTDB_HOST || "localhost";
// Explicit radix so the port string is always interpreted as decimal.
const IOTDB_PORT = Number.parseInt(process.env.IOTDB_PORT || "6667", 10);
const IOTDB_USER = process.env.IOTDB_USER || "root";
const IOTDB_PASSWORD = process.env.IOTDB_PASSWORD || "root";

// Pool shared by every test in this suite; assigned in beforeAll.
let pool: TableSessionPool;
// Becomes true once pool.init() succeeds; tests return early while it is false.
let isConnected = false;
| |
/**
 * Suite setup: builds the shared pool and attempts a single connection.
 *
 * On success, isConnected is set and a "test" database left over from an
 * earlier run is dropped (errors from the drop are ignored). On failure the
 * cause is reported, the pool is closed on a best-effort basis, and
 * isConnected stays false so every test returns early instead of failing.
 */
beforeAll(async () => {
  pool = new TableSessionPool(IOTDB_HOST, IOTDB_PORT, {
    username: IOTDB_USER,
    password: IOTDB_PASSWORD,
    database: "test",
    maxPoolSize: 5,
    minPoolSize: 2,
  });

  try {
    await pool.init();
  } catch (error: unknown) {
    console.warn("Could not connect to IoTDB. E2E tests will be skipped.");
    console.warn(
      "Set IOTDB_HOST, IOTDB_PORT to run E2E tests against a real instance.",
    );
    // Surface the underlying cause so a misconfiguration is not mistaken for
    // an intentionally absent server.
    console.warn(
      "Connection error:",
      error instanceof Error ? error.message : error,
    );
    try {
      await pool.close();
    } catch {
      // Ignore cleanup errors
    }
    return;
  }

  isConnected = true;

  // Cleanup from previous runs
  try {
    await pool.executeNonQueryStatement("DROP DATABASE test");
  } catch {
    // Ignore errors if database doesn't exist
  }
}, 60000);
| |
/**
 * Suite teardown: when a connection was established, drops the "test"
 * database (ignoring any failure) and then closes the shared pool.
 * Does nothing if the pool was never connected.
 */
afterAll(async () => {
  const nothingToTearDown = !pool || !isConnected;
  if (nothingToTearDown) {
    return;
  }

  try {
    await pool.executeNonQueryStatement("DROP DATABASE test");
  } catch {
    // Best-effort cleanup; a failed drop must not fail the suite.
  }

  await pool.close();
}, 60000);
| |
/**
 * Creates the "test" database plus two tables, then checks that SHOW TABLES
 * returns at least two rows. Returns early when no connection is available.
 */
test("Should create database and tables", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  // Create database
  await pool.executeNonQueryStatement("CREATE DATABASE test");

  // Use test database
  await pool.executeNonQueryStatement("USE test");

  // Create table1
  await pool.executeNonQueryStatement(
    "CREATE TABLE table1(" +
      "region_id STRING TAG, " +
      "plant_id STRING TAG, " +
      "device_id STRING TAG, " +
      "model STRING ATTRIBUTE, " +
      "temperature FLOAT FIELD, " +
      "humidity DOUBLE FIELD) " +
      "WITH (TTL=3600000)",
  );

  // Create table2
  await pool.executeNonQueryStatement(
    "CREATE TABLE table2(" +
      "region_id STRING TAG, " +
      "plant_id STRING TAG, " +
      "color STRING ATTRIBUTE, " +
      "temperature FLOAT FIELD, " +
      "speed DOUBLE FIELD) " +
      "WITH (TTL=6600000)",
  );

  // Show tables from current database (test)
  const dataSet = await pool.executeQueryStatement("SHOW TABLES");
  expect(dataSet).toBeDefined();

  let rowCount = 0;
  try {
    while (await dataSet.hasNext()) {
      dataSet.next();
      rowCount++;
    }
  } finally {
    // Close the result set even if iteration throws.
    await dataSet.close();
  }

  expect(rowCount).toBeGreaterThanOrEqual(2);
});
| |
/**
 * Creates a table, inserts 50 rows through a tablet with explicit column
 * categories, and checks that a filtered SELECT returns at least one row.
 * Returns early when no connection is available.
 */
test("Should insert and query table data (based on C# example)", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  const database = "test";
  const tableName = "testTable1";
  const totalRows = 50;

  // Ensure database exists and is in use
  try {
    await pool.executeNonQueryStatement(`CREATE DATABASE ${database}`);
  } catch (e: unknown) {
    // Tolerate only "already exists"-style failures; rethrow anything else.
    const message = String(
      (e as { message?: unknown } | null | undefined)?.message ?? "",
    );
    if (!message.includes("already")) {
      throw e;
    }
  }

  // Use test database for this session
  await pool.executeNonQueryStatement(`USE ${database}`);

  // Create table
  await pool.executeNonQueryStatement(
    `CREATE TABLE ${tableName}(` +
      "region_id STRING TAG, " +
      "plant_id STRING TAG, " +
      "device_id STRING TAG, " +
      "model STRING ATTRIBUTE, " +
      "temperature FLOAT FIELD, " +
      "humidity DOUBLE FIELD)",
  );

  // Tablet whose column names, types and categories mirror the CREATE TABLE
  // statement above, position by position.
  const tablet = {
    tableName: tableName, // Just use table name after USE database
    columnNames: [
      "region_id",
      "plant_id",
      "device_id",
      "model",
      "temperature",
      "humidity",
    ],
    columnTypes: [
      TSDataType.STRING,
      TSDataType.STRING,
      TSDataType.STRING,
      TSDataType.STRING,
      TSDataType.FLOAT,
      TSDataType.DOUBLE,
    ],
    columnCategories: [
      ColumnCategory.TAG, // region_id
      ColumnCategory.TAG, // plant_id
      ColumnCategory.TAG, // device_id
      ColumnCategory.ATTRIBUTE, // model
      ColumnCategory.FIELD, // temperature
      ColumnCategory.FIELD, // humidity
    ],
    // Timestamps 0..49, one row per timestamp.
    timestamps: Array.from({ length: totalRows }, (_, i) => i),
    values: Array.from({ length: totalRows }, (_, i) => [
      "1",
      "5",
      "3",
      "A",
      1.23 + i,
      111.1 + i,
    ]),
  };

  await pool.insertTablet(tablet);

  // Query the data
  const dataSet = await pool.executeQueryStatement(
    `SELECT * FROM ${tableName} WHERE region_id = '1' AND plant_id IN ('3', '5') AND device_id = '3' LIMIT 10`,
  );

  expect(dataSet).toBeDefined();

  let rowCount = 0;
  try {
    while (await dataSet.hasNext()) {
      dataSet.next();
      rowCount++;
    }
  } finally {
    // Close the result set even if iteration throws.
    await dataSet.close();
  }

  expect(rowCount).toBeGreaterThan(0);
});
| |
/**
 * Runs SHOW TABLES through the pool and expects at least one row.
 * Returns early when no connection is available.
 */
test("Should query tables from database context", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  // Show tables from current database (test)
  const dataSet = await pool.executeQueryStatement("SHOW TABLES");
  expect(dataSet).toBeDefined();

  let rowCount = 0;
  try {
    while (await dataSet.hasNext()) {
      dataSet.next();
      rowCount++;
    }
  } finally {
    // Close the result set even if iteration throws.
    await dataSet.close();
  }

  expect(rowCount).toBeGreaterThan(0);
});
| |
/**
 * Inserts ten rows into a two-column table, five of which carry a null field
 * value, and checks that counting rows WHERE f1 IS NULL yields 5.
 * Returns early when no connection is available.
 */
test("Should handle insert with null values (based on C# example)", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  const tableName = "t1";

  // Ensure database exists and is in use
  try {
    await pool.executeNonQueryStatement("CREATE DATABASE test");
  } catch (e: unknown) {
    // Tolerate only "already exists"-style failures; rethrow anything else.
    const message = String(
      (e as { message?: unknown } | null | undefined)?.message ?? "",
    );
    if (!message.includes("already")) {
      throw e;
    }
  }

  // Use test database for this session
  await pool.executeNonQueryStatement("USE test");

  // Create table
  await pool.executeNonQueryStatement(
    `CREATE TABLE ${tableName}(t1 STRING TAG, f1 INT32 FIELD)`,
  );

  // Tablet with one tag column and one field column; the last five rows have
  // a null field value.
  const tablet = {
    tableName: tableName,
    columnNames: ["t1", "f1"],
    columnTypes: [TSDataType.STRING, TSDataType.INT32],
    columnCategories: [
      ColumnCategory.TAG, // t1
      ColumnCategory.FIELD, // f1
    ],
    timestamps: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    values: [
      ["t1", 100],
      ["t1", 200],
      ["t1", 300],
      ["t1", 400],
      ["t1", 500],
      ["t1", null],
      ["t1", null],
      ["t1", null],
      ["t1", null],
      ["t1", null],
    ],
  };

  await pool.insertTablet(tablet);

  // Query null count
  const dataSet = await pool.executeQueryStatement(
    `SELECT COUNT(*) as f1 FROM ${tableName} WHERE f1 IS NULL`,
  );

  expect(dataSet).toBeDefined();

  let count = 0;
  try {
    if (await dataSet.hasNext()) {
      const row = dataSet.next();
      count = row.getLongByIndex(0);
    }
  } finally {
    // Close the result set even if reading the row throws.
    await dataSet.close();
  }

  // Should have 5 null values
  expect(count).toBe(5);
});
| |
/**
 * Checks that the pool reports a positive size and that the number of
 * available sessions lies between zero and that size (inclusive).
 * Returns early when no connection is available.
 */
test("Should report pool statistics", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  const total = pool.getPoolSize();
  const idle = pool.getAvailableSize();

  // The pool must hold at least one session.
  expect(total).toBeGreaterThan(0);

  // Available sessions are bounded by [0, total].
  expect(idle).toBeGreaterThanOrEqual(0);
  expect(idle).toBeLessThanOrEqual(total);
});
| |
/**
 * Borrows a session with getSession(), runs SHOW DATABASES on it, and hands
 * it back with releaseSession(). Every assertion made while the session is
 * borrowed sits inside the try block so the session is released even when
 * one of them fails. Returns early when no connection is available.
 */
test("Should support explicit session management with getSession/releaseSession", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  // Get a session from the pool
  const session = await pool.getSession();
  expect(session).toBeDefined();

  try {
    expect(session.isOpen()).toBe(true);

    // Execute operations with the explicit session
    const dataSet = await session.executeQueryStatement("SHOW DATABASES");
    expect(dataSet).toBeDefined();

    try {
      expect(dataSet.getColumnNames()).toBeDefined();

      // Iterate through results
      while (await dataSet.hasNext()) {
        dataSet.next();
      }
    } finally {
      // Close the result set even if an assertion or iteration throws.
      await dataSet.close();
    }

    // Session should still be open
    expect(session.isOpen()).toBe(true);
  } finally {
    // Release the session back to the pool
    pool.releaseSession(session);
  }

  // Verify session is back in pool
  expect(pool.getAvailableSize()).toBeGreaterThan(0);
});
| |
/**
 * Builds a second pool from a "host:port" entry in nodeUrls, initializes it,
 * runs SHOW DATABASES, and always closes the pool afterwards.
 * Returns early when no connection is available.
 */
test("Should support nodeUrls in string format", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  // Create a table pool using nodeUrls in string format
  const stringNodeUrlsPool = new TableSessionPool({
    nodeUrls: [`${IOTDB_HOST}:${IOTDB_PORT}`],
    username: IOTDB_USER,
    password: IOTDB_PASSWORD,
    database: "test",
    maxPoolSize: 3,
    minPoolSize: 1,
  });

  try {
    await stringNodeUrlsPool.init();
    expect(stringNodeUrlsPool.getPoolSize()).toBeGreaterThanOrEqual(1);

    const dataSet =
      await stringNodeUrlsPool.executeQueryStatement("SHOW DATABASES");
    expect(dataSet).toBeDefined();

    try {
      // Iterate through results
      while (await dataSet.hasNext()) {
        dataSet.next();
      }
    } finally {
      // Close the result set even if iteration throws.
      await dataSet.close();
    }

    console.log(
      "TableSessionPool with string format nodeUrls working correctly",
    );
  } finally {
    await stringNodeUrlsPool.close();
  }
});
| |
/**
 * Inserts ten single-row tablets through insertTabletsParallel and checks
 * that the table then holds at least ten rows.
 * Returns early when no connection is available.
 */
test("Should insert multiple tablets concurrently using insertTabletsParallel", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  const tableName = "parallel_test_table";
  const tabletCount = 10;

  // Ensure database and table exist
  await pool.executeNonQueryStatement("USE test");
  try {
    await pool.executeNonQueryStatement(
      `CREATE TABLE IF NOT EXISTS ${tableName}(device_id STRING TAG, value FLOAT FIELD)`,
    );
  } catch (e: unknown) {
    // Tolerate only "already exists"-style failures; any other error would
    // otherwise resurface later as a confusing insert or query failure.
    const message = String(
      (e as { message?: unknown } | null | undefined)?.message ?? "",
    );
    if (!message.includes("already")) {
      throw e;
    }
  }

  // One tablet per device, each holding a single row spaced 1000 ms apart.
  const baseTime = Date.now();
  const tablets = Array.from({ length: tabletCount }, (_, i) => ({
    tableName: tableName,
    columnNames: ["device_id", "value"],
    columnTypes: [TSDataType.STRING, TSDataType.FLOAT],
    columnCategories: [ColumnCategory.TAG, ColumnCategory.FIELD],
    timestamps: [baseTime + i * 1000],
    values: [[`parallel_device_${i}`, 25.5 + i]],
  }));

  // Insert tablets concurrently
  await pool.insertTabletsParallel(tablets, { concurrency: 5 });

  // Verify data was inserted
  const dataSet = await pool.executeQueryStatement(
    `SELECT COUNT(*) FROM ${tableName}`,
  );

  let count = 0;
  try {
    if (await dataSet.hasNext()) {
      const row = dataSet.next();
      count = row.getLongByIndex(0);
    }
  } finally {
    // Close the result set even if reading the row throws.
    await dataSet.close();
  }

  expect(count).toBeGreaterThanOrEqual(10);
});
| |
/**
 * Creates three tables through executeParallel, checks that the callback
 * results contain every table name, and confirms via SHOW TABLES that all
 * three tables are listed. Returns early when no connection is available.
 */
test("Should execute multiple operations in parallel using executeParallel", async () => {
  if (!isConnected) {
    console.log("Skipping test - no IoTDB connection");
    return;
  }

  await pool.executeNonQueryStatement("USE test");

  const tableNames = ["exec_table1", "exec_table2", "exec_table3"];

  // Create tables in parallel
  const results = await pool.executeParallel(
    tableNames,
    async (session, tableName) => {
      try {
        await session.executeNonQueryStatement(
          `CREATE TABLE IF NOT EXISTS ${tableName}(device_id STRING TAG, value FLOAT FIELD)`,
        );
      } catch (e: unknown) {
        // Tolerate only "already exists"-style failures so that a genuine
        // creation error fails here rather than being reported as success.
        const message = String(
          (e as { message?: unknown } | null | undefined)?.message ?? "",
        );
        if (!message.includes("already")) {
          throw e;
        }
      }
      return tableName;
    },
    { concurrency: 3 },
  );

  expect(results).toHaveLength(3);
  expect(results).toContain("exec_table1");
  expect(results).toContain("exec_table2");
  expect(results).toContain("exec_table3");

  // Verify tables exist
  const dataSet = await pool.executeQueryStatement("SHOW TABLES");
  const tableList: string[] = [];
  try {
    while (await dataSet.hasNext()) {
      const row = dataSet.next();
      tableList.push(row.getStringByIndex(0));
    }
  } finally {
    // Close the result set even if iteration throws.
    await dataSet.close();
  }

  // Should have at least our 3 tables
  const foundTables = tableNames.filter((name) => tableList.includes(name));
  expect(foundTables.length).toBe(3);
});
| }); |