blob: f7fbb784bf6d0f350cc97e50fb97027e271423fd [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { TableSessionPool } from "../../src/client/TableSessionPool";
import { TSDataType } from "../../src/utils/DataTypes";
import { ColumnCategory } from "../../src/client/Session";
describe("TableSessionPool E2E Tests", () => {
const IOTDB_HOST = process.env.IOTDB_HOST || "localhost";
const IOTDB_PORT = parseInt(process.env.IOTDB_PORT || "6667");
const IOTDB_USER = process.env.IOTDB_USER || "root";
const IOTDB_PASSWORD = process.env.IOTDB_PASSWORD || "root";
let pool: TableSessionPool;
let isConnected = false;
beforeAll(async () => {
pool = new TableSessionPool(IOTDB_HOST, IOTDB_PORT, {
username: IOTDB_USER,
password: IOTDB_PASSWORD,
database: "test",
maxPoolSize: 5,
minPoolSize: 2,
});
try {
await pool.init();
isConnected = true;
// Cleanup from previous runs
try {
await pool.executeNonQueryStatement("DROP DATABASE test");
} catch (e) {
// Ignore errors if database doesn't exist
}
} catch (error) {
console.warn("Could not connect to IoTDB. E2E tests will be skipped.");
console.warn(
"Set IOTDB_HOST, IOTDB_PORT to run E2E tests against a real instance.",
);
try {
await pool.close();
} catch {
// Ignore cleanup errors
}
}
}, 60000);
afterAll(async () => {
if (pool && isConnected) {
// Cleanup
try {
await pool.executeNonQueryStatement("DROP DATABASE test");
} catch (e) {
// Ignore cleanup errors
}
await pool.close();
}
}, 60000);
test("Should create database and tables", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Create database
await pool.executeNonQueryStatement("CREATE DATABASE test");
// Use test database
await pool.executeNonQueryStatement("USE test");
// Create table1
await pool.executeNonQueryStatement(
"CREATE TABLE table1(" +
"region_id STRING TAG, " +
"plant_id STRING TAG, " +
"device_id STRING TAG, " +
"model STRING ATTRIBUTE, " +
"temperature FLOAT FIELD, " +
"humidity DOUBLE FIELD) " +
"WITH (TTL=3600000)",
);
// Create table2
await pool.executeNonQueryStatement(
"CREATE TABLE table2(" +
"region_id STRING TAG, " +
"plant_id STRING TAG, " +
"color STRING ATTRIBUTE, " +
"temperature FLOAT FIELD, " +
"speed DOUBLE FIELD) " +
"WITH (TTL=6600000)",
);
// Show tables from current database (test)
const dataSet = await pool.executeQueryStatement("SHOW TABLES");
expect(dataSet).toBeDefined();
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThanOrEqual(2);
});
test("Should insert and query table data (based on C# example)", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const database = "test";
const tableName = "testTable1";
// Ensure database exists and is in use
try {
await pool.executeNonQueryStatement(`CREATE DATABASE ${database}`);
} catch (e: any) {
// Ignore if already exists
if (!e.message?.includes("already")) {
throw e;
}
}
// Use test database for this session
await pool.executeNonQueryStatement(`USE ${database}`);
// Create table
await pool.executeNonQueryStatement(
`CREATE TABLE ${tableName}(` +
"region_id STRING TAG, " +
"plant_id STRING TAG, " +
"device_id STRING TAG, " +
"model STRING ATTRIBUTE, " +
"temperature FLOAT FIELD, " +
"humidity DOUBLE FIELD)",
);
// Insert data using tablet with explicit column categories
// Each column's category determines its indexing and query behavior
const tablet = {
tableName: tableName, // Just use table name after USE database
columnNames: [
"region_id",
"plant_id",
"device_id",
"model",
"temperature",
"humidity",
],
columnTypes: [
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.FLOAT,
TSDataType.DOUBLE,
],
columnCategories: [
ColumnCategory.TAG, // region_id - indexed tag for filtering
ColumnCategory.TAG, // plant_id - indexed tag for filtering
ColumnCategory.TAG, // device_id - indexed tag for filtering
ColumnCategory.ATTRIBUTE, // model - metadata (not indexed)
ColumnCategory.FIELD, // temperature - measurement value
ColumnCategory.FIELD, // humidity - measurement value
],
timestamps: [] as number[],
values: [] as any[][],
};
for (let i = 0; i < 50; i++) {
tablet.timestamps.push(i);
tablet.values.push(["1", "5", "3", "A", 1.23 + i, 111.1 + i]);
}
await pool.insertTablet(tablet);
// Query the data
const dataSet = await pool.executeQueryStatement(
`SELECT * FROM ${tableName} WHERE region_id = '1' AND plant_id IN ('3', '5') AND device_id = '3' LIMIT 10`,
);
expect(dataSet).toBeDefined();
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThan(0);
});
test("Should query tables from database context", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Show tables from current database (test)
const dataSet = await pool.executeQueryStatement("SHOW TABLES");
expect(dataSet).toBeDefined();
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThan(0);
});
test("Should handle insert with null values (based on C# example)", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const tableName = "t1";
// Ensure database exists and is in use
try {
await pool.executeNonQueryStatement("CREATE DATABASE test");
} catch (e: any) {
// Ignore if already exists
if (!e.message?.includes("already")) {
throw e;
}
}
// Use test database for this session
await pool.executeNonQueryStatement("USE test");
// Create table
await pool.executeNonQueryStatement(
`CREATE TABLE ${tableName}(t1 STRING TAG, f1 INT32 FIELD)`,
);
// Insert data with explicit ColumnCategory enum showing best practices
const tablet = {
tableName: tableName,
columnNames: ["t1", "f1"],
columnTypes: [TSDataType.STRING, TSDataType.INT32],
columnCategories: [
ColumnCategory.TAG, // Tag column - indexed for efficient filtering in WHERE clauses
ColumnCategory.FIELD, // Field column - stores measurement/sensor values
],
timestamps: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
values: [
["t1", 100],
["t1", 200],
["t1", 300],
["t1", 400],
["t1", 500],
["t1", null],
["t1", null],
["t1", null],
["t1", null],
["t1", null],
],
};
await pool.insertTablet(tablet);
// Query null count
const dataSet = await pool.executeQueryStatement(
`SELECT COUNT(*) as f1 FROM ${tableName} WHERE f1 IS NULL`,
);
expect(dataSet).toBeDefined();
let count = 0;
if (await dataSet.hasNext()) {
const row = dataSet.next();
count = row.getLongByIndex(0);
}
await dataSet.close();
// Should have 5 null values
expect(count).toBe(5);
});
test("Should report pool statistics", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const poolSize = pool.getPoolSize();
const availableSize = pool.getAvailableSize();
expect(poolSize).toBeGreaterThan(0);
expect(availableSize).toBeGreaterThanOrEqual(0);
expect(availableSize).toBeLessThanOrEqual(poolSize);
});
test("Should support explicit session management with getSession/releaseSession", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Get a session from the pool
const session = await pool.getSession();
expect(session).toBeDefined();
expect(session.isOpen()).toBe(true);
try {
// Execute operations with the explicit session
const dataSet = await session.executeQueryStatement("SHOW DATABASES");
expect(dataSet).toBeDefined();
expect(dataSet.getColumnNames()).toBeDefined();
// Iterate through results
while (await dataSet.hasNext()) {
dataSet.next();
}
await dataSet.close();
// Session should still be open
expect(session.isOpen()).toBe(true);
} finally {
// Release the session back to the pool
pool.releaseSession(session);
}
// Verify session is back in pool
expect(pool.getAvailableSize()).toBeGreaterThan(0);
});
test("Should support nodeUrls in string format", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Create a table pool using nodeUrls in string format
const stringNodeUrlsPool = new TableSessionPool({
nodeUrls: [`${IOTDB_HOST}:${IOTDB_PORT}`],
username: IOTDB_USER,
password: IOTDB_PASSWORD,
database: "test",
maxPoolSize: 3,
minPoolSize: 1,
});
try {
await stringNodeUrlsPool.init();
expect(stringNodeUrlsPool.getPoolSize()).toBeGreaterThanOrEqual(1);
const dataSet =
await stringNodeUrlsPool.executeQueryStatement("SHOW DATABASES");
expect(dataSet).toBeDefined();
// Iterate through results
while (await dataSet.hasNext()) {
dataSet.next();
}
await dataSet.close();
console.log(
"TableSessionPool with string format nodeUrls working correctly",
);
} finally {
await stringNodeUrlsPool.close();
}
});
test("Should insert multiple tablets concurrently using insertTabletsParallel", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const tableName = "parallel_test_table";
// Ensure database and table exist
await pool.executeNonQueryStatement("USE test");
try {
await pool.executeNonQueryStatement(
`CREATE TABLE IF NOT EXISTS ${tableName}(device_id STRING TAG, value FLOAT FIELD)`
);
} catch (e: any) {
// Ignore if already exists
}
// Create multiple tablets for different devices
const tablets = [];
const baseTime = Date.now();
for (let i = 0; i < 10; i++) {
tablets.push({
tableName: tableName,
columnNames: ["device_id", "value"],
columnTypes: [TSDataType.STRING, TSDataType.FLOAT],
columnCategories: [ColumnCategory.TAG, ColumnCategory.FIELD],
timestamps: [baseTime + i * 1000],
values: [[`parallel_device_${i}`, 25.5 + i]],
});
}
// Insert tablets concurrently
await pool.insertTabletsParallel(tablets, { concurrency: 5 });
// Verify data was inserted
const dataSet = await pool.executeQueryStatement(
`SELECT COUNT(*) FROM ${tableName}`
);
let count = 0;
if (await dataSet.hasNext()) {
const row = dataSet.next();
count = row.getLongByIndex(0);
}
await dataSet.close();
expect(count).toBeGreaterThanOrEqual(10);
});
test("Should execute multiple operations in parallel using executeParallel", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
await pool.executeNonQueryStatement("USE test");
const tableNames = ["exec_table1", "exec_table2", "exec_table3"];
// Create tables in parallel
const results = await pool.executeParallel(
tableNames,
async (session, tableName) => {
try {
await session.executeNonQueryStatement(
`CREATE TABLE IF NOT EXISTS ${tableName}(device_id STRING TAG, value FLOAT FIELD)`
);
} catch (e: any) {
// Ignore if already exists
}
return tableName;
},
{ concurrency: 3 }
);
expect(results).toHaveLength(3);
expect(results).toContain("exec_table1");
expect(results).toContain("exec_table2");
expect(results).toContain("exec_table3");
// Verify tables exist
const dataSet = await pool.executeQueryStatement("SHOW TABLES");
const tableList: string[] = [];
while (await dataSet.hasNext()) {
const row = dataSet.next();
tableList.push(row.getStringByIndex(0));
}
await dataSet.close();
// Should have at least our 3 tables
const foundTables = tableNames.filter(t => tableList.includes(t));
expect(foundTables.length).toBe(3);
});
});