blob: e5464fbb73837654d8911661ed9ad65198c4fe50 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { SessionPool } from "../../src/client/SessionPool";
import { TSDataType } from "../../src/utils/DataTypes";
describe("SessionPool E2E Tests", () => {
const IOTDB_HOST = process.env.IOTDB_HOST || "localhost";
const IOTDB_PORT = parseInt(process.env.IOTDB_PORT || "6667");
const IOTDB_USER = process.env.IOTDB_USER || "root";
const IOTDB_PASSWORD = process.env.IOTDB_PASSWORD || "root";
let pool: SessionPool;
let isConnected = false;
beforeAll(async () => {
pool = new SessionPool(IOTDB_HOST, IOTDB_PORT, {
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 5,
minPoolSize: 2,
});
try {
await pool.init();
isConnected = true;
} catch (error) {
console.warn("Could not connect to IoTDB. E2E tests will be skipped.");
console.warn(
"Set IOTDB_HOST, IOTDB_PORT to run E2E tests against a real instance.",
);
try {
await pool.close();
} catch {
// Ignore cleanup errors
}
}
}, 60000);
afterAll(async () => {
if (pool && isConnected) {
// Cleanup test data
try {
await pool.executeNonQueryStatement("DROP DATABASE root.test");
} catch (error) {
// Ignore cleanup errors
}
await pool.close();
}
}, 60000);
test("Should initialize pool with minimum connections", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
expect(pool.getPoolSize()).toBeGreaterThanOrEqual(2);
});
test("Should execute query using pool", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const dataSet = await pool.executeQueryStatement("SHOW DATABASES");
expect(dataSet).toBeDefined();
expect(dataSet.getColumnNames()).toBeDefined();
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThan(0);
});
test("Should execute non-query using pool", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
try {
await pool.executeNonQueryStatement("CREATE DATABASE root.test");
// Should not throw
} catch (error: any) {
// Might fail if database already exists, that's ok
if (
!error.message.includes("already exists") &&
!error.message.includes("has already been created as database")
) {
throw error;
}
}
});
test("Should handle multiple concurrent queries", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(pool.executeQueryStatement("SHOW DATABASES"));
}
const dataSets = await Promise.all(promises);
expect(dataSets).toHaveLength(10);
// Close all datasets
for (const dataSet of dataSets) {
expect(dataSet).toBeDefined();
expect(dataSet.getColumnNames()).toBeDefined();
await dataSet.close();
}
});
test("Should insert tablet using pool", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const tablet = {
deviceId: "root.test.device1",
measurements: ["temperature"],
dataTypes: [TSDataType.FLOAT],
timestamps: [Date.now()],
values: [[25.5]],
};
try {
await pool.insertTablet(tablet);
// Should not throw
} catch (error: any) {
console.warn("Insert tablet via pool failed:", error.message);
}
});
test("Should support multi-node configuration", async () => {
// Skip this test in 1C1D setup - only run in 3C3D
if (!process.env.MULTI_NODE) {
console.log("Skipping multi-node test in 1C1D configuration");
return;
}
const multiNodePool = new SessionPool(
[IOTDB_HOST, "localhost"],
IOTDB_PORT,
{
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 3,
minPoolSize: 1,
},
);
try {
await multiNodePool.init();
expect(multiNodePool.getPoolSize()).toBeGreaterThanOrEqual(1);
await multiNodePool.close();
} catch (error) {
console.warn(
"Multi-node test failed, this is expected if IoTDB is not available",
);
}
});
test("Should report pool statistics", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const poolSize = pool.getPoolSize();
const availableSize = pool.getAvailableSize();
expect(poolSize).toBeGreaterThan(0);
expect(availableSize).toBeGreaterThanOrEqual(0);
expect(availableSize).toBeLessThanOrEqual(poolSize);
});
test("Should support explicit session management with getSession/releaseSession", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Get a session from the pool
const session = await pool.getSession();
expect(session).toBeDefined();
expect(session.isOpen()).toBe(true);
// Track pool statistics
const inUseBefore = pool.getInUseSize();
const availableBefore = pool.getAvailableSize();
try {
// Execute operations with the explicit session
const dataSet = await session.executeQueryStatement("SHOW DATABASES");
expect(dataSet).toBeDefined();
expect(dataSet.getColumnNames()).toBeDefined();
await dataSet.close();
// Session should still be open
expect(session.isOpen()).toBe(true);
} finally {
// Release the session back to the pool
pool.releaseSession(session);
}
// Verify pool statistics updated correctly
const inUseAfter = pool.getInUseSize();
const availableAfter = pool.getAvailableSize();
expect(inUseAfter).toBe(inUseBefore - 1);
expect(availableAfter).toBe(availableBefore + 1);
});
test("Should handle multiple explicit sessions concurrently", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const sessionPromises = [];
// Get multiple sessions concurrently
for (let i = 0; i < 3; i++) {
sessionPromises.push(
pool.getSession().then(async (session) => {
try {
// Execute a query with this session
const dataSet =
await session.executeQueryStatement("SHOW DATABASES");
expect(dataSet).toBeDefined();
await dataSet.close();
return session;
} catch (error) {
pool.releaseSession(session);
throw error;
}
}),
);
}
const sessions = await Promise.all(sessionPromises);
// All sessions should be valid
expect(sessions.length).toBe(3);
sessions.forEach((session) => {
expect(session.isOpen()).toBe(true);
});
// Release all sessions
sessions.forEach((session) => {
pool.releaseSession(session);
});
// All sessions should be available again
expect(pool.getAvailableSize()).toBeGreaterThanOrEqual(3);
});
test("Should support enhanced metrics", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Test new getter properties
expect(pool.totalCount).toBe(pool.getPoolSize());
expect(pool.idleCount).toBe(pool.getAvailableSize());
expect(pool.activeCount).toBe(pool.getInUseSize());
expect(pool.waitingCount).toBeGreaterThanOrEqual(0);
// Test getPoolStats method
const stats = pool.getPoolStats();
expect(stats).toHaveProperty("total");
expect(stats).toHaveProperty("idle");
expect(stats).toHaveProperty("active");
expect(stats).toHaveProperty("waiting");
expect(stats).toHaveProperty("endpoints");
expect(stats).toHaveProperty("redirectCacheSize");
expect(stats.total).toBe(pool.totalCount);
expect(stats.idle).toBe(pool.idleCount);
expect(stats.active).toBe(pool.activeCount);
expect(stats.waiting).toBe(pool.waitingCount);
});
test("Should insert multiple tablets concurrently using insertTabletsParallel", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Create multiple tablets for different devices
const tablets = [];
const baseTime = Date.now();
for (let i = 0; i < 10; i++) {
tablets.push({
deviceId: `root.test.parallel_device${i}`,
measurements: ["temperature", "humidity"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT],
timestamps: [baseTime + i * 1000],
values: [[25.5 + i, 60.0 + i]],
});
}
// Insert tablets concurrently
await pool.insertTabletsParallel(tablets, { concurrency: 5 });
// Verify data was inserted by querying one device
const dataSet = await pool.executeQueryStatement(
"SELECT * FROM root.test.parallel_device0"
);
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThan(0);
});
test("Should execute multiple operations in parallel using executeParallel", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const deviceNames = ["exec_d1", "exec_d2", "exec_d3"];
// Create timeseries in parallel
const results = await pool.executeParallel(
deviceNames,
async (session, deviceName) => {
try {
await session.executeNonQueryStatement(
`CREATE TIMESERIES root.test.${deviceName}.value WITH DATATYPE=FLOAT`
);
} catch (e: any) {
// Ignore if already exists
if (!e.message.includes("already exists") && !e.message.includes("already exist")) {
throw e;
}
}
return deviceName;
},
{ concurrency: 3 }
);
expect(results).toHaveLength(3);
expect(results).toContain("exec_d1");
expect(results).toContain("exec_d2");
expect(results).toContain("exec_d3");
});
});