blob: b5c81c225c96c97acd65a1a0d9204db093cc28b6 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Session } from "../../src/client/Session";
import { TSDataType } from "../../src/utils/DataTypes";
describe("Large Query E2E Tests", () => {
const IOTDB_HOST = process.env.IOTDB_HOST || "localhost";
const IOTDB_PORT = parseInt(process.env.IOTDB_PORT || "6667");
const IOTDB_USER = process.env.IOTDB_USER || "root";
const IOTDB_PASSWORD = process.env.IOTDB_PASSWORD || "root";
let session: Session;
beforeAll(async () => {
session = new Session({
host: IOTDB_HOST,
port: IOTDB_PORT,
username: IOTDB_USER,
password: IOTDB_PASSWORD,
fetchSize: 100, // Small fetch size to force multiple fetchResult calls
});
try {
await session.open();
console.log("Connected to IoTDB for large query tests");
} catch (error) {
console.warn("Could not connect to IoTDB. E2E tests will be skipped.");
console.warn(
"Set IOTDB_HOST, IOTDB_PORT to run E2E tests against a real instance.",
);
try {
await session.close();
} catch {
// Ignore cleanup errors
}
}
}, 60000);
afterAll(async () => {
if (session && session.isOpen()) {
// Cleanup test data
try {
await session.executeNonQueryStatement("DROP DATABASE root.test");
} catch (error) {
// Ignore cleanup errors
}
await session.close();
}
}, 60000);
test("Should prepare test database and timeseries", async () => {
if (!session.isOpen()) {
console.log("Skipping test - no IoTDB connection");
return;
}
try {
await session.executeNonQueryStatement("CREATE DATABASE root.test");
} catch (error: any) {
if (!error.message?.includes("already")) {
throw error;
}
}
// Create timeseries for large dataset - handle if they already exist
try {
await session.executeNonQueryStatement(
"CREATE TIMESERIES root.test.device1.sensor1 WITH DATATYPE=FLOAT, ENCODING=RLE",
);
} catch (error: any) {
if (!error.message?.includes("already exists")) {
throw error;
}
}
try {
await session.executeNonQueryStatement(
"CREATE TIMESERIES root.test.device1.sensor2 WITH DATATYPE=FLOAT, ENCODING=RLE",
);
} catch (error: any) {
if (!error.message?.includes("already exists")) {
throw error;
}
}
try {
await session.executeNonQueryStatement(
"CREATE TIMESERIES root.test.device1.sensor3 WITH DATATYPE=FLOAT, ENCODING=RLE",
);
} catch (error: any) {
if (!error.message?.includes("already exists")) {
throw error;
}
}
});
test("Should insert large dataset (5,000 records)", async () => {
if (!session.isOpen()) {
console.log("Skipping test - no IoTDB connection");
return;
}
const batchSize = 250;
const totalRecords = 5000;
const baseTime = Date.now();
// Insert data in batches
for (let i = 0; i < totalRecords; i += batchSize) {
const timestamps: number[] = [];
const values: number[][] = [];
for (let j = 0; j < batchSize && i + j < totalRecords; j++) {
timestamps.push(baseTime + (i + j)); // 1ms interval
values.push([
20 + Math.random() * 10, // sensor1
50 + Math.random() * 20, // sensor2
100 + Math.random() * 50, // sensor3
]);
}
await session.insertTablet({
deviceId: "root.test.device1",
measurements: ["sensor1", "sensor2", "sensor3"],
dataTypes: [TSDataType.FLOAT, TSDataType.FLOAT, TSDataType.FLOAT],
timestamps,
values,
});
}
console.log(`Inserted ${totalRecords} records for large query test`);
});
test("Should query large dataset requiring multiple fetchResult calls", async () => {
if (!session.isOpen()) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Query all data - with fetchSize=100, this should require multiple fetchResult calls
const dataSet = await session.executeQueryStatement(
"SELECT * FROM root.test.device1",
);
expect(dataSet).toBeDefined();
const columnNames = dataSet.getColumnNames();
expect(columnNames).toBeDefined();
expect(columnNames.length).toBeGreaterThan(0);
const rows = [];
while (await dataSet.hasNext()) {
const row = await dataSet.next();
rows.push(row);
}
await dataSet.close();
expect(rows.length).toBeGreaterThanOrEqual(5000);
console.log(`Retrieved ${rows.length} rows with fetchSize=100`);
console.log(`Columns: ${columnNames.join(", ")}`);
}, 30000);
test("Should query with filters on large dataset", async () => {
if (!session.isOpen()) {
console.log("Skipping test - no IoTDB connection");
return;
}
const dataSet = await session.executeQueryStatement(
"SELECT sensor1, sensor2 FROM root.test.device1 WHERE sensor1 > 25",
);
expect(dataSet).toBeDefined();
const columnNames = dataSet.getColumnNames();
expect(columnNames).toBeDefined();
const rows = [];
while (await dataSet.hasNext()) {
const row = await dataSet.next();
rows.push(row);
}
await dataSet.close();
expect(rows.length).toBeGreaterThan(0);
console.log(`Filtered query returned ${rows.length} rows`);
}, 30000);
test("Should query with aggregation on large dataset", async () => {
if (!session.isOpen()) {
console.log("Skipping test - no IoTDB connection");
return;
}
// IoTDB 2.x requires separate aggregation queries
const countDataSet = await session.executeQueryStatement(
"SELECT COUNT(sensor1) FROM root.test.device1",
);
expect(countDataSet).toBeDefined();
const countColumns = countDataSet.getColumnNames();
expect(countColumns).toBeDefined();
const countRows = [];
while (await countDataSet.hasNext()) {
const row = await countDataSet.next();
countRows.push(row);
}
await countDataSet.close();
expect(countRows.length).toBeGreaterThan(0);
console.log("COUNT result:", countRows[0]);
const avgDataSet = await session.executeQueryStatement(
"SELECT AVG(sensor1) FROM root.test.device1",
);
expect(avgDataSet).toBeDefined();
const avgRows = [];
while (await avgDataSet.hasNext()) {
const row = await avgDataSet.next();
avgRows.push(row);
}
await avgDataSet.close();
expect(avgRows.length).toBeGreaterThan(0);
console.log("AVG result:", avgRows[0]);
}, 30000);
test("Should query with LIMIT on large dataset", async () => {
if (!session.isOpen()) {
console.log("Skipping test - no IoTDB connection");
return;
}
// Query the first 100 records using LIMIT
const dataSet = await session.executeQueryStatement(
`SELECT * FROM root.test.device1 LIMIT 100`,
);
expect(dataSet).toBeDefined();
const rows = [];
while (await dataSet.hasNext()) {
const row = await dataSet.next();
rows.push(row);
}
await dataSet.close();
expect(rows.length).toBeGreaterThan(0);
console.log(`LIMIT query returned ${rows.length} rows`);
}, 30000);
test("Should handle multiple concurrent large queries", async () => {
if (!session.isOpen()) {
console.log("Skipping test - no IoTDB connection");
return;
}
const queries = [
"SELECT sensor1 FROM root.test.device1",
"SELECT sensor2 FROM root.test.device1",
"SELECT sensor3 FROM root.test.device1",
"SELECT COUNT(*) FROM root.test.device1",
];
const dataSets = await Promise.all(
queries.map((query) => session.executeQueryStatement(query)),
);
expect(dataSets).toHaveLength(4);
for (let index = 0; index < dataSets.length; index++) {
const dataSet = dataSets[index];
expect(dataSet).toBeDefined();
const rows = [];
while (await dataSet.hasNext()) {
const row = await dataSet.next();
rows.push(row);
}
await dataSet.close();
console.log(`Query ${index + 1} returned ${rows.length} rows`);
}
}, 60000); // Increased timeout for concurrent queries
});