blob: e9a41a67e824665617fbf5102abd0f9c9784c525 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { SessionPool } from "../../src/client/SessionPool";
import { TableSessionPool } from "../../src/client/TableSessionPool";
import { TSDataType } from "../../src/utils/DataTypes";
import { ColumnCategory } from "../../src/client/Session";
describe("Redirection E2E Tests", () => {
const IOTDB_HOST = process.env.IOTDB_HOST || "localhost";
const IOTDB_PORT_1 = parseInt(process.env.IOTDB_PORT || "6667", 10);
const IOTDB_PORT_2 = parseInt(process.env.IOTDB_PORT_2 || "6668", 10);
const IOTDB_PORT_3 = parseInt(process.env.IOTDB_PORT_3 || "6669", 10);
const IOTDB_USER = process.env.IOTDB_USER || "root";
const IOTDB_PASSWORD = process.env.IOTDB_PASSWORD || "root";
const IS_MULTI_NODE = process.env.MULTI_NODE === "true";
let treePool: SessionPool;
let tablePool: TableSessionPool;
let isConnected = false;
beforeAll(async () => {
// Skip redirection tests if not in multi-node environment
if (!IS_MULTI_NODE) {
console.log(
"Skipping Redirection tests - not in multi-node environment (MULTI_NODE env var not set)",
);
return;
}
console.log(`Testing redirection with multi-node IoTDB cluster:`);
console.log(` DataNode 1: ${IOTDB_HOST}:${IOTDB_PORT_1}`);
console.log(` DataNode 2: ${IOTDB_HOST}:${IOTDB_PORT_2}`);
console.log(` DataNode 3: ${IOTDB_HOST}:${IOTDB_PORT_3}`);
try {
// Create tree pool with redirection enabled
treePool = new SessionPool({
nodeUrls: [
`${IOTDB_HOST}:${IOTDB_PORT_1}`,
`${IOTDB_HOST}:${IOTDB_PORT_2}`,
`${IOTDB_HOST}:${IOTDB_PORT_3}`,
],
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 10,
minPoolSize: 1,
enableRedirection: true,
maxRedirectRetries: 3,
redirectCacheTTL: 300000,
});
// Create table pool with redirection enabled
tablePool = new TableSessionPool({
nodeUrls: [
`${IOTDB_HOST}:${IOTDB_PORT_1}`,
`${IOTDB_HOST}:${IOTDB_PORT_2}`,
`${IOTDB_HOST}:${IOTDB_PORT_3}`,
],
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 10,
minPoolSize: 1,
enableRedirection: true,
maxRedirectRetries: 3,
redirectCacheTTL: 300000,
});
await treePool.init();
await tablePool.init();
isConnected = true;
console.log(
`Successfully connected to IoTDB cluster on ports ${IOTDB_PORT_1}, ${IOTDB_PORT_2}, ${IOTDB_PORT_3}`,
);
} catch (error) {
console.warn(
"Could not connect to IoTDB. Redirection E2E tests will be skipped.",
);
console.warn("Error:", error);
await Promise.allSettled([
treePool?.close(),
tablePool?.close(),
]);
}
}, 60000);
afterAll(async () => {
if (!IS_MULTI_NODE) {
return;
}
if (isConnected) {
// Cleanup test data
try {
await treePool?.executeNonQueryStatement("DROP DATABASE root.test_redirect");
} catch (error: any) {
if (!error.message?.includes("not exist")) {
console.warn("Cleanup error:", error);
}
}
try {
await tablePool?.executeNonQueryStatement("DROP DATABASE test_redirect");
} catch (error: any) {
if (!error.message?.includes("not exist")) {
console.warn("Cleanup error:", error);
}
}
await Promise.allSettled([
treePool?.close(),
tablePool?.close(),
]);
}
}, 60000);
describe("Tree Model Redirection", () => {
test("Should handle writes to multiple devices with potential redirects", async () => {
if (!IS_MULTI_NODE || !isConnected) {
console.log("Skipping test - no multi-node IoTDB connection");
return;
}
// Create database
try {
await treePool.executeNonQueryStatement("CREATE DATABASE root.test_redirect");
} catch (error: any) {
if (!error.message?.includes("already exists")) {
throw error;
}
}
// Write to multiple devices - these may trigger redirects
const devices = [
"root.test_redirect.device1",
"root.test_redirect.device2",
"root.test_redirect.device3",
"root.test_redirect.device4",
"root.test_redirect.device5",
];
for (const deviceId of devices) {
const tablet = {
deviceId,
measurements: ["temperature", "humidity"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT],
timestamps: [Date.now()],
values: [[25.5 + Math.random() * 5, 60.0 + Math.random() * 10]],
};
// This should automatically handle redirects if needed
await treePool.insertTablet(tablet);
}
// Verify data was written successfully by querying
const dataSet = await treePool.executeQueryStatement(
"SELECT * FROM root.test_redirect.**"
);
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
// We should have at least 5 rows (one per device)
expect(rowCount).toBeGreaterThanOrEqual(5);
});
test("Should use cached endpoint for subsequent writes to same device", async () => {
if (!IS_MULTI_NODE || !isConnected) {
console.log("Skipping test - no multi-node IoTDB connection");
return;
}
const deviceId = "root.test_redirect.cached_device";
// First write - may trigger redirect
const tablet1 = {
deviceId,
measurements: ["temperature"],
dataTypes: [TSDataType.FLOAT],
timestamps: [Date.now()],
values: [[25.5]],
};
await treePool.insertTablet(tablet1);
// Wait a bit
await new Promise((resolve) => setTimeout(resolve, 100));
// Second write - should use cached endpoint (no redirect)
const tablet2 = {
deviceId,
measurements: ["temperature"],
dataTypes: [TSDataType.FLOAT],
timestamps: [Date.now() + 1000],
values: [[26.0]],
};
await treePool.insertTablet(tablet2);
// Verify both writes succeeded
const dataSet = await treePool.executeQueryStatement(
`SELECT * FROM ${deviceId}`
);
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThanOrEqual(2);
});
});
describe("Table Model Redirection", () => {
test("Should handle writes to multiple tables with potential redirects", async () => {
if (!IS_MULTI_NODE || !isConnected) {
console.log("Skipping test - no multi-node IoTDB connection");
return;
}
// Create database
try {
await tablePool.executeNonQueryStatement("CREATE DATABASE test_redirect");
} catch (error: any) {
if (!error.message?.includes("already exists")) {
throw error;
}
}
// Switch to database
try {
await tablePool.executeNonQueryStatement("USE test_redirect");
} catch (error) {
console.warn("USE DATABASE error (may be expected):", error);
}
// Create table
try {
await tablePool.executeNonQueryStatement(
"CREATE TABLE sensor_data (device_id STRING TAG, temperature FLOAT FIELD)"
);
} catch (error: any) {
if (!error.message?.includes("already exists")) {
throw error;
}
}
// Write to table multiple times - may trigger redirects
for (let i = 0; i < 5; i++) {
const tablet = {
tableName: "sensor_data",
columnNames: ["device_id", "temperature"],
columnTypes: [TSDataType.STRING, TSDataType.FLOAT],
columnCategories: [ColumnCategory.TAG, ColumnCategory.FIELD],
timestamps: [Date.now() + i * 1000],
values: [[`device_${i}`, 25.5 + i]],
};
// This should automatically handle redirects if needed
await tablePool.insertTablet(tablet);
}
// Verify data was written successfully
const dataSet = await tablePool.executeQueryStatement(
"SELECT * FROM sensor_data"
);
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThanOrEqual(5);
});
});
describe("Redirection Configuration", () => {
test("Should respect enableRedirection=false setting", async () => {
if (!IS_MULTI_NODE || !isConnected) {
console.log("Skipping test - no multi-node IoTDB connection");
return;
}
// Create a pool with redirection disabled
const noRedirectPool = new SessionPool({
nodeUrls: [`${IOTDB_HOST}:${IOTDB_PORT_1}`],
username: IOTDB_USER,
password: IOTDB_PASSWORD,
maxPoolSize: 5,
minPoolSize: 1,
enableRedirection: false,
});
try {
await noRedirectPool.init();
// Write should still work (but may be less optimal)
const tablet = {
deviceId: "root.test_redirect.no_redirect_device",
measurements: ["temperature"],
dataTypes: [TSDataType.FLOAT],
timestamps: [Date.now()],
values: [[25.5]],
};
// This should NOT use redirection even if server suggests it
await noRedirectPool.insertTablet(tablet);
// Verify write succeeded
const dataSet = await noRedirectPool.executeQueryStatement(
"SELECT * FROM root.test_redirect.no_redirect_device"
);
let rowCount = 0;
while (await dataSet.hasNext()) {
dataSet.next();
rowCount++;
}
await dataSet.close();
expect(rowCount).toBeGreaterThanOrEqual(1);
} finally {
await noRedirectPool.close();
}
});
});
});