blob: d01ce2428e4586ddcf23019d27a7f99ab0250d8f [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { SessionPool } from "../../src/client/SessionPool";
import { TSDataType } from "../../src/utils/DataTypes";
describe("Multi-Node E2E Tests", () => {
const IOTDB_HOST = process.env.IOTDB_HOST || "localhost";
const IOTDB_PORT_1 = parseInt(process.env.IOTDB_PORT || "6667", 10);
const IOTDB_PORT_2 = parseInt(process.env.IOTDB_PORT_2 || "6668", 10);
const IOTDB_PORT_3 = parseInt(process.env.IOTDB_PORT_3 || "6669", 10);
const IOTDB_USER = process.env.IOTDB_USER || "root";
const IOTDB_PASSWORD = process.env.IOTDB_PASSWORD || "root";
const IS_MULTI_NODE = process.env.MULTI_NODE === "true";
let pool1: SessionPool;
let pool2: SessionPool;
let pool3: SessionPool;
let isConnected = false;
beforeAll(async () => {
// Skip multi-node tests if not in multi-node environment
if (!IS_MULTI_NODE) {
console.log(
"Skipping Multi-Node tests - not in multi-node environment (MULTI_NODE env var not set)",
);
return;
}
// For 3C3D setup, connect to all three DataNode ports
// This enables true multi-node load distribution and testing
console.log(`Connecting to IoTDB cluster:`);
console.log(` DataNode 1: ${IOTDB_HOST}:${IOTDB_PORT_1}`);
console.log(` DataNode 2: ${IOTDB_HOST}:${IOTDB_PORT_2}`);
console.log(` DataNode 3: ${IOTDB_HOST}:${IOTDB_PORT_3}`);
try {
// Create three pools, each connected to a different DataNode
pool1 = new SessionPool(IOTDB_HOST, IOTDB_PORT_1, {
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 5,
minPoolSize: 2,
});
pool2 = new SessionPool(IOTDB_HOST, IOTDB_PORT_2, {
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 5,
minPoolSize: 2,
});
pool3 = new SessionPool(IOTDB_HOST, IOTDB_PORT_3, {
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 5,
minPoolSize: 2,
});
await pool1.init();
await pool2.init();
await pool3.init();
isConnected = true;
console.log(
`Successfully connected to IoTDB cluster on ports ${IOTDB_PORT_1}, ${IOTDB_PORT_2}, ${IOTDB_PORT_3}`,
);
} catch (error) {
console.warn(
"Could not connect to IoTDB. Multi-node E2E tests will be skipped.",
);
console.warn("Error:", error);
await Promise.allSettled([
pool1?.close(),
pool2?.close(),
pool3?.close(),
]);
}
}, 60000);
// No beforeEach cleanup - tests handle "already exists" errors like other test files
afterAll(async () => {
if (!IS_MULTI_NODE) {
return;
}
if (isConnected) {
try {
await pool1.executeNonQueryStatement("DROP DATABASE root.test");
} catch (error) {
// Ignore cleanup errors
}
// Close pools in parallel with timeout protection
await Promise.allSettled([pool1.close(), pool2.close(), pool3.close()]);
}
}, 90000); // Increased timeout for multi-node cleanup
test("Should initialize pools with connections to all three DataNodes", async () => {
if (!IS_MULTI_NODE || !isConnected) {
console.log(
"Skipping test - not in multi-node environment or no IoTDB connection",
);
return;
}
expect(pool1.getPoolSize()).toBeGreaterThanOrEqual(2);
expect(pool2.getPoolSize()).toBeGreaterThanOrEqual(2);
expect(pool3.getPoolSize()).toBeGreaterThanOrEqual(2);
console.log(
`Pools initialized: DataNode1=${pool1.getPoolSize()}, DataNode2=${pool2.getPoolSize()}, DataNode3=${pool3.getPoolSize()} connections`,
);
});
test("Should create database and timeseries on first node", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Create database - ignore if already exists (like other tests)
try {
await pool1.executeNonQueryStatement("CREATE DATABASE root.test");
} catch (error: any) {
if (
!error.message?.includes("already exist") &&
!error.message?.includes("has already been created")
) {
throw error;
}
}
// Create timeseries - ignore if already exist
try {
await pool1.executeNonQueryStatement(
"CREATE TIMESERIES root.test.device1.temperature WITH DATATYPE=FLOAT",
);
} catch (error: any) {
if (!error.message.includes("already")) {
throw error;
}
}
try {
await pool1.executeNonQueryStatement(
"CREATE TIMESERIES root.test.device1.humidity WITH DATATYPE=FLOAT",
);
} catch (error: any) {
if (!error.message.includes("already")) {
throw error;
}
}
});
test("Should handle concurrent load distributed across all three DataNodes", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const operationsPerNode = 7;
const promises: Promise<any>[] = [];
// Distribute operations across all three DataNode pools
for (let i = 0; i < operationsPerNode; i++) {
// Insert to DataNode 1
promises.push(
pool1.insertTablet({
deviceId: "root.test.device1",
measurements: ["temperature", "humidity"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT],
timestamps: [Date.now() + i * 1000],
values: [[20 + i * 0.1, 60 + i * 0.2]],
}),
);
// Query from DataNode 2
promises.push(
pool2.executeQueryStatement("SELECT * FROM root.test.device1 LIMIT 10"),
);
// Insert to DataNode 3
promises.push(
pool3.insertTablet({
deviceId: "root.test.device1",
measurements: ["temperature", "humidity"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT],
timestamps: [Date.now() + i * 1000 + 500],
values: [[21 + i * 0.1, 61 + i * 0.2]],
}),
);
}
const results = await Promise.all(promises);
expect(results).toHaveLength(operationsPerNode * 3);
// Close all SessionDataSet objects from query operations
for (const result of results) {
if (result && typeof result.close === "function") {
await result.close();
}
}
console.log(
`Completed ${operationsPerNode * 3} concurrent operations across 3 DataNodes`,
);
console.log(
`Pool stats: DN1=${pool1.getPoolSize()}/${pool1.getAvailableSize()}, DN2=${pool2.getPoolSize()}/${pool2.getAvailableSize()}, DN3=${pool3.getPoolSize()}/${pool3.getAvailableSize()}`,
);
});
test("Should verify data replication across all DataNodes", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Insert data through DataNode 1
await pool1.insertTablet({
deviceId: "root.test.device1",
measurements: ["temperature", "humidity"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT],
timestamps: [Date.now()],
values: [[99.9, 99.9]],
});
// Wait briefly for replication (optimized from 2000ms to 200ms)
await new Promise((resolve) => setTimeout(resolve, 200));
// Query from all three DataNodes - should see the same data
const dataSet1 = await pool1.executeQueryStatement(
"SELECT COUNT(*) FROM root.test.device1",
);
const dataSet2 = await pool2.executeQueryStatement(
"SELECT COUNT(*) FROM root.test.device1",
);
const dataSet3 = await pool3.executeQueryStatement(
"SELECT COUNT(*) FROM root.test.device1",
);
let count1 = 0;
let count2 = 0;
let count3 = 0;
while (await dataSet1.hasNext()) {
await dataSet1.next();
count1++;
}
while (await dataSet2.hasNext()) {
await dataSet2.next();
count2++;
}
while (await dataSet3.hasNext()) {
await dataSet3.next();
count3++;
}
await dataSet1.close();
await dataSet2.close();
await dataSet3.close();
expect(count1).toBeGreaterThan(0);
expect(count2).toBeGreaterThan(0);
expect(count3).toBeGreaterThan(0);
console.log(
`Data replicated across all DataNodes - verified queries from ports 6667, 6668, 6669`,
);
});
test("Should handle large batch inserts across multiple DataNodes", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const batchSize = 100;
const timestamps: number[] = [];
const values: number[][] = [];
const baseTime = Date.now();
for (let i = 0; i < batchSize; i++) {
timestamps.push(baseTime + i * 1000);
values.push([25 + Math.random() * 5, 65 + Math.random() * 10]);
}
// Insert through different DataNodes
await pool1.insertTablet({
deviceId: "root.test.device1",
measurements: ["temperature", "humidity"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT],
timestamps,
values,
});
// Verify data from different DataNode
const dataSet = await pool2.executeQueryStatement(
"SELECT COUNT(*) FROM root.test.device1",
);
let rowCount = 0;
while (await dataSet.hasNext()) {
await dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThan(0);
console.log(
`Inserted ${batchSize} records via DataNode1, queried via DataNode2`,
);
});
test("Should maintain all pool healths under stress", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
const initialSize1 = pool1.getPoolSize();
const initialSize2 = pool2.getPoolSize();
const initialSize3 = pool3.getPoolSize();
// Execute operations across all DataNodes (reduced to 5 per pool to avoid timeout)
const promises: Promise<any>[] = [];
for (let i = 0; i < 5; i++) {
promises.push(pool1.executeQueryStatement("SHOW DATABASES"));
promises.push(pool2.executeQueryStatement("SHOW DATABASES"));
promises.push(pool3.executeQueryStatement("SHOW DATABASES"));
}
const dataSets = await Promise.all(promises);
// Close all SessionDataSets in parallel (much faster)
await Promise.all(dataSets.map((ds) => ds.close()));
// Pools should maintain their sizes
expect(pool1.getPoolSize()).toBeGreaterThanOrEqual(initialSize1);
expect(pool2.getPoolSize()).toBeGreaterThanOrEqual(initialSize2);
expect(pool3.getPoolSize()).toBeGreaterThanOrEqual(initialSize3);
console.log("All three pool healths maintained after stress test");
}, 30000); // Increased timeout from default to 30s
test("Should handle queries across all DataNodes simultaneously", async () => {
if (!isConnected) {
console.log("Skipping test - no IoTDB connection");
return;
}
// First insert some data to ensure there's something to query
await pool1.insertTablet({
deviceId: "root.test.device1",
measurements: ["temperature", "humidity"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT],
timestamps: [Date.now(), Date.now() + 1000, Date.now() + 2000],
values: [
[20.0, 60.0],
[21.0, 61.0],
[22.0, 62.0],
],
});
// Brief delay for data availability (optimized from 500ms to 100ms)
await new Promise((resolve) => setTimeout(resolve, 100));
// Execute queries simultaneously on all three DataNodes
const [dataSet1, dataSet2, dataSet3] = await Promise.all([
pool1.executeQueryStatement("SELECT * FROM root.test.device1 LIMIT 5"),
pool2.executeQueryStatement("SELECT * FROM root.test.device1 LIMIT 5"),
pool3.executeQueryStatement("SELECT * FROM root.test.device1 LIMIT 5"),
]);
// Count rows in parallel for better performance
const [count1, count2, count3] = await Promise.all([
(async () => {
let count = 0;
while (await dataSet1.hasNext()) {
await dataSet1.next();
count++;
}
return count;
})(),
(async () => {
let count = 0;
while (await dataSet2.hasNext()) {
await dataSet2.next();
count++;
}
return count;
})(),
(async () => {
let count = 0;
while (await dataSet3.hasNext()) {
await dataSet3.next();
count++;
}
return count;
})(),
]);
// Close all in parallel
await Promise.all([dataSet1.close(), dataSet2.close(), dataSet3.close()]);
expect(count1).toBeGreaterThan(0);
expect(count2).toBeGreaterThan(0);
expect(count3).toBeGreaterThan(0);
console.log(
`Simultaneous queries across 3 DataNodes: DN1=${count1}, DN2=${count2}, DN3=${count3} rows`,
);
}, 20000); // Increased timeout to 20s
test("Should support nodeUrls configuration for multi-node setup", async () => {
if (!IS_MULTI_NODE) {
console.log("Skipping test - not in multi-node environment");
return;
}
// Create a pool using nodeUrls in string format (RECOMMENDED)
const nodeUrlsPool = new SessionPool({
nodeUrls: [
`${IOTDB_HOST}:${IOTDB_PORT_1}`,
`${IOTDB_HOST}:${IOTDB_PORT_2}`,
`${IOTDB_HOST}:${IOTDB_PORT_3}`,
],
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 6,
minPoolSize: 3,
});
try {
await nodeUrlsPool.init();
expect(nodeUrlsPool.getPoolSize()).toBeGreaterThanOrEqual(3);
// Execute a query to verify it works
const dataSet =
await nodeUrlsPool.executeQueryStatement("SHOW DATABASES");
expect(dataSet.getColumnNames()).toBeDefined();
await dataSet.close();
console.log("nodeUrls string format configuration working correctly");
} finally {
await nodeUrlsPool.close();
}
}, 10000); // Add explicit 10s timeout
test("Should support nodeUrls configuration in object format", async () => {
if (!IS_MULTI_NODE) {
console.log("Skipping test - not in multi-node environment");
return;
}
// Create a pool using nodeUrls in object format (also supported)
const nodeUrlsPool = new SessionPool({
nodeUrls: [
{ host: IOTDB_HOST, port: IOTDB_PORT_1 },
{ host: IOTDB_HOST, port: IOTDB_PORT_2 },
{ host: IOTDB_HOST, port: IOTDB_PORT_3 },
],
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 6,
minPoolSize: 3,
});
try {
await nodeUrlsPool.init();
expect(nodeUrlsPool.getPoolSize()).toBeGreaterThanOrEqual(3);
// Execute a query to verify it works
const dataSet =
await nodeUrlsPool.executeQueryStatement("SHOW DATABASES");
expect(dataSet.getColumnNames()).toBeDefined();
await dataSet.close();
console.log("nodeUrls object format configuration working correctly");
} finally {
await nodeUrlsPool.close();
}
}, 10000); // Add explicit 10s timeout
});