本文档针对以下三个核心技术领域提供详细的实现规划:
Java 参考实现特性:
deviceIdToEndpoint 映射缓存endPointToSessionConnection 连接池RedirectException 捕获和重试逻辑enableRedirection 配置开关Node.js 当前实现:
性能影响: 缺少 Redirection 支持导致:
Java 参考实现特性:
Node.js 当前实现:
代码位置:
serializeTabletValues(), serializeColumn(), serializeBitMaps()Java 参考实现特性:
Node.js 当前实现:
代码位置:
parseQueryResult(), parseTsBlock()┌─────────────────────────────────────────────────────┐
│ SessionPool (with Redirection) │
│ ┌──────────────────────────────────────────────┐ │
│ │ deviceIdToEndpoint: Map<string, EndPoint> │ │
│ │ endPointToConnection: Map<string, Session> │ │
│ │ enableRedirection: boolean │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
↓
Try insertTablet(deviceId)
↓
┌──────────────────────┐
│ Check cache │
│ deviceIdToEndpoint │
└──────────────────────┘
↓
Has cached? ───Yes──→ Use optimal connection
│
No
↓
Use default connection (round-robin)
↓
┌──────────────────────┐
│ Execute insert │
└──────────────────────┘
↓
Catch RedirectException?
│ │
No Yes
│ │
↓ ↓
Success ┌──────────────────────┐
│ handleRedirection │
│ - Cache endpoint │
│ - Get/create conn │
│ - Retry with conn │
└──────────────────────┘
文件: Config.ts
export interface PoolConfig extends Config { // ... existing fields ... /** * Enable automatic redirection for write operations. * When enabled, the client caches device-to-endpoint mappings * and routes subsequent writes directly to optimal nodes. * * @default true */ enableRedirection?: boolean; /** * Maximum number of redirect retries before failing. * * @default 3 */ maxRedirectRetries?: number; /** * Time-to-live for cached redirect mappings (ms). * Set to 0 for no expiration. * * @default 300000 (5 minutes) */ redirectCacheTTL?: number; } export const DEFAULT_POOL_CONFIG: Partial<PoolConfig> = { ...DEFAULT_CONFIG, maxPoolSize: 10, minPoolSize: 1, maxIdleTime: 60000, waitTimeout: 60000, enableRedirection: true, maxRedirectRetries: 3, redirectCacheTTL: 300000, };
文件: src/utils/Errors.ts (新建)
/** * Represents a redirect recommendation from the server. * Thrown when the server suggests a better endpoint for a device. */ export class RedirectException extends Error { public readonly endpoint: EndPoint; public readonly deviceId: string; constructor(deviceId: string, endpoint: EndPoint, message?: string) { super( message || `Redirect recommended for device ${deviceId} to ${endpoint.host}:${endpoint.port}`, ); this.name = "RedirectException"; this.deviceId = deviceId; this.endpoint = endpoint; } static fromThriftStatus( status: any, deviceId: string, ): RedirectException | null { // Check for REDIRECTION_RECOMMEND status code (Java uses 531) if (status.code === 531 && status.redirectNode) { return new RedirectException( deviceId, { host: status.redirectNode.internalIp || status.redirectNode.ip, port: status.redirectNode.port, }, status.message, ); } return null; } } export enum TSStatusCode { SUCCESS_STATUS = 200, REDIRECTION_RECOMMEND = 531, // ... other codes }
文件: src/client/RedirectCache.ts (新建)
import { EndPoint } from "../utils/Config"; import { logger } from "../utils/Logger"; interface CacheEntry { endpoint: EndPoint; timestamp: number; } /** * Cache for device-to-endpoint redirect mappings. * Supports TTL-based expiration and LRU eviction. */ export class RedirectCache { private deviceToEndpoint: Map<string, CacheEntry> = new Map(); private ttl: number; private maxSize: number; constructor(ttl: number = 300000, maxSize: number = 10000) { this.ttl = ttl; this.maxSize = maxSize; } /** * Get cached endpoint for a device. * Returns null if not cached or expired. */ get(deviceId: string): EndPoint | null { const entry = this.deviceToEndpoint.get(deviceId); if (!entry) { return null; } // Check expiration if (this.ttl > 0 && Date.now() - entry.timestamp > this.ttl) { this.deviceToEndpoint.delete(deviceId); logger.debug(`Redirect cache expired for device: ${deviceId}`); return null; } return entry.endpoint; } /** * Cache endpoint for a device. */ set(deviceId: string, endpoint: EndPoint): void { // Evict oldest entry if cache is full (simple LRU) if (this.deviceToEndpoint.size >= this.maxSize) { const firstKey = this.deviceToEndpoint.keys().next().value; if (firstKey) { this.deviceToEndpoint.delete(firstKey); logger.debug(`Evicted oldest redirect cache entry: ${firstKey}`); } } this.deviceToEndpoint.set(deviceId, { endpoint, timestamp: Date.now(), }); logger.debug( `Cached redirect: ${deviceId} -> ${endpoint.host}:${endpoint.port}`, ); } /** * Remove cached endpoint for a device. */ remove(deviceId: string): void { this.deviceToEndpoint.delete(deviceId); logger.debug(`Removed redirect cache for device: ${deviceId}`); } /** * Clear all cached mappings. */ clear(): void { this.deviceToEndpoint.clear(); logger.debug("Cleared all redirect cache entries"); } /** * Get cache statistics. */ getStats(): { size: number; maxSize: number; ttl: number } { return { size: this.deviceToEndpoint.size, maxSize: this.maxSize, ttl: this.ttl, }; } }
文件: BaseSessionPool.ts
修改点:
export abstract class BaseSessionPool { protected config: PoolConfig; protected endPoints: EndPoint[]; protected pool: PooledSession[] = []; // NEW: Redirection support protected redirectCache: RedirectCache; protected endPointToSession: Map<string, PooledSession> = new Map(); constructor(/* ... */) { // ... existing code ... // Initialize redirect cache this.redirectCache = new RedirectCache( this.config.redirectCacheTTL || 300000, 10000, ); } /** * Get endpoint key for map lookup. */ private getEndPointKey(endpoint: EndPoint): string { return `${endpoint.host}:${endpoint.port}`; } /** * Get or create session for specific endpoint. * Used for redirected connections. */ protected async getSessionForEndpoint(endpoint: EndPoint): Promise<Session> { const key = this.getEndPointKey(endpoint); // Check if we already have a connection to this endpoint let pooledSession = this.endPointToSession.get(key); if (pooledSession && pooledSession.session.isOpen()) { return pooledSession.session; } // Create new session for this endpoint logger.debug(`Creating new session for redirect endpoint: ${key}`); const session = new Session({ ...this.config, host: endpoint.host, port: endpoint.port, } as InternalConfig); await session.open(); pooledSession = { session, lastUsed: Date.now(), inUse: false, }; this.endPointToSession.set(key, pooledSession); this.pool.push(pooledSession); return session; } /** * Handle redirect exception and retry insert. */ protected async handleRedirection( deviceId: string, endpoint: EndPoint, insertFn: (session: Session) => Promise<void>, retryCount: number = 0, ): Promise<void> { if (!this.config.enableRedirection) { throw new Error("Redirection is disabled"); } const maxRetries = this.config.maxRedirectRetries || 3; if (retryCount >= maxRetries) { throw new Error( `Max redirect retries (${maxRetries}) exceeded for device: ${deviceId}`, ); } // Ignore "no redirect" indicator (0.0.0.0) if (endpoint.host === "0.0.0.0") { logger.debug(`No redirection needed for device: ${deviceId}`); return; } try { // Cache the redirect mapping this.redirectCache.set(deviceId, endpoint); // Get or create session for redirect endpoint const session = await this.getSessionForEndpoint(endpoint); // Retry insert with redirected connection await insertFn(session); logger.info( `Successfully inserted via redirect: ${deviceId} -> ${endpoint.host}:${endpoint.port}`, ); } catch (error: any) { // If redirect connection fails, remove from cache and retry with default this.redirectCache.remove(deviceId); logger.warn( `Redirect connection failed for ${deviceId}, removed from cache:`, error.message, ); throw error; } } /** * Insert tablet with redirection support. */ async insertTablet( tablet: TreeTablet | ITreeTablet | TableTablet | ITableTablet, ): Promise<void> { // Extract device ID from tablet const deviceId = "deviceId" in tablet ? tablet.deviceId : tablet.tableName; // Check redirect cache const cachedEndpoint = this.config.enableRedirection ? this.redirectCache.get(deviceId) : null; let session: Session; if (cachedEndpoint) { // Use cached redirect endpoint logger.debug( `Using cached redirect for ${deviceId}: ${cachedEndpoint.host}:${cachedEndpoint.port}`, ); session = await this.getSessionForEndpoint(cachedEndpoint); } else { // Use default round-robin session session = await this.getSession(); } try { await session.insertTablet(tablet); if (!cachedEndpoint) { // Release session back to pool if not using cached endpoint this.releaseSession(session); } } catch (error: any) { // Check if this is a redirect exception if (this.config.enableRedirection && this.isRedirectError(error)) { const endpoint = this.extractRedirectEndpoint(error); if (endpoint) { logger.info( `Received redirect recommendation: ${deviceId} -> ${endpoint.host}:${endpoint.port}`, ); // Release current session if (!cachedEndpoint) { this.releaseSession(session); } // Handle redirection and retry await this.handleRedirection( deviceId, endpoint, async (redirectSession) => { await redirectSession.insertTablet(tablet); }, ); return; } } // Not a redirect error or redirect disabled, release and rethrow if (!cachedEndpoint) { this.releaseSession(session); } throw error; } } /** * Check if error is a redirect recommendation. */ private isRedirectError(error: any): boolean { // Check for redirect status code (Java uses 531) return ( error.message && (error.message.includes("REDIRECTION_RECOMMEND") || error.message.includes("redirect") || error.code === 531) ); } /** * Extract redirect endpoint from error. */ private extractRedirectEndpoint(error: any): EndPoint | null { try { // Parse redirect info from error message or status // Format varies, need to check actual IoTDB response if (error.redirectNode) { return { host: error.redirectNode.internalIp || error.redirectNode.ip, port: error.redirectNode.port, }; } // TODO: Add more parsing logic based on actual error format return null; } catch (e) { logger.warn("Failed to extract redirect endpoint:", e); return null; } } }
文件: Session.ts
修改 insertTreeTabletInternal 和 insertTableTabletInternal 以捕获 Thrift 响应并检查 redirect 状态:
private async insertTreeTabletInternal(tablet: TreeTablet | ITreeTablet): Promise<void> { logger.debug(`Inserting tree tablet for device: ${tablet.deviceId}`); const client = this.connection.getClient(); const sessionId = this.connection.getSessionId(); // ... existing serialization code ... const req = new ttypes.TSInsertTabletReq({ sessionId: sessionId, prefixPath: tablet.deviceId, measurements: tablet.measurements, values: this.serializeTabletValues(/*...*/), timestamps: timestampBuffer, types: tablet.dataTypes, size: tablet.timestamps.length, isAligned: false, }); return new Promise((resolve, reject) => { client.insertTablet(req, (err: Error, response: any) => { if (err) { reject(err); return; } // NEW: Check for redirect recommendation if (response.code === 531 && response.redirectNode) { const redirectError: any = new Error('REDIRECTION_RECOMMEND'); redirectError.code = 531; redirectError.redirectNode = { ip: response.redirectNode.ip, internalIp: response.redirectNode.internalIp, port: response.redirectNode.port, }; redirectError.deviceId = tablet.deviceId; reject(redirectError); return; } if (response.code !== 200) { reject(new Error(response.message || 'Insert tablet failed')); return; } resolve(); }); }); }
文件: tests/unit/RedirectCache.test.ts (新建)
describe("RedirectCache", () => { test("should cache and retrieve endpoint", () => { const cache = new RedirectCache(60000); const endpoint = { host: "node1", port: 6667 }; cache.set("device1", endpoint); const cached = cache.get("device1"); expect(cached).toEqual(endpoint); }); test("should expire entries after TTL", async () => { const cache = new RedirectCache(100); // 100ms TTL cache.set("device1", { host: "node1", port: 6667 }); await new Promise((resolve) => setTimeout(resolve, 150)); const cached = cache.get("device1"); expect(cached).toBeNull(); }); test("should evict oldest when full", () => { const cache = new RedirectCache(60000, 2); // maxSize=2 cache.set("device1", { host: "node1", port: 6667 }); cache.set("device2", { host: "node2", port: 6667 }); cache.set("device3", { host: "node3", port: 6667 }); expect(cache.get("device1")).toBeNull(); // Evicted expect(cache.get("device2")).not.toBeNull(); expect(cache.get("device3")).not.toBeNull(); }); });
文件: tests/e2e/Redirection.test.ts (新建)
describe("Redirection E2E Tests", () => { test("should handle redirect and cache endpoint", async () => { const pool = new SessionPool({ nodeUrls: ["node1:6667", "node2:6667", "node3:6667"], enableRedirection: true, maxPoolSize: 10, }); await pool.init(); // First insert may trigger redirect await pool.insertTablet({ deviceId: "root.redirect.device1", measurements: ["s1"], dataTypes: [TSDataType.FLOAT], timestamps: [Date.now()], values: [[25.5]], }); // Second insert should use cached endpoint (check logs) await pool.insertTablet({ deviceId: "root.redirect.device1", measurements: ["s1"], dataTypes: [TSDataType.FLOAT], timestamps: [Date.now() + 1000], values: [[26.0]], }); await pool.close(); }); });
预期改进:
基准测试计划:
# Without redirection ENABLE_REDIRECTION=false CLIENT_NUMBER=50 DEVICE_NUMBER=1000 node benchmark/benchmark-tree.js # With redirection ENABLE_REDIRECTION=true CLIENT_NUMBER=50 DEVICE_NUMBER=1000 node benchmark/benchmark-tree.js
已实现:
优化空间:
文件: src/utils/TSEncoding.ts (新建)
/** * TSEncoding types supported by Apache IoTDB */ export enum TSEncoding { PLAIN = 0, // No compression RLE = 2, // Run-Length Encoding TS_2DIFF = 3, // Two-level delta encoding (for timestamps) GORILLA = 4, // Gorilla compression (for float/double) DICTIONARY = 5, // Dictionary encoding (for strings) // ... other encodings } /** * Encoder interface for different compression strategies */ export interface ValueEncoder { encode( values: any[], dataType: number, nullIndicators: boolean[] | null, ): Buffer; } /** * PLAIN encoder - no compression, just serialize values as-is */ export class PlainEncoder implements ValueEncoder { encode( values: any[], dataType: number, nullIndicators: boolean[] | null, ): Buffer { // Delegate to existing serializeColumn logic // This is the default implementation return serializeColumnPlain(values, dataType); } } /** * RLE encoder - compresses runs of identical values * Format: [count: INT32][value: TYPE-SPECIFIC] */ export class RleEncoder implements ValueEncoder { encode( values: any[], dataType: number, nullIndicators: boolean[] | null, ): Buffer { const runs: Array<{ value: any; count: number }> = []; // Find runs of identical values let currentValue = values[0]; let currentCount = 1; for (let i = 1; i < values.length; i++) { if (this.valuesEqual(values[i], currentValue, dataType)) { currentCount++; } else { runs.push({ value: currentValue, count: currentCount }); currentValue = values[i]; currentCount = 1; } } runs.push({ value: currentValue, count: currentCount }); // Serialize runs const buffers: Buffer[] = []; // Write number of runs const runCountBuffer = Buffer.alloc(4); runCountBuffer.writeInt32BE(runs.length); buffers.push(runCountBuffer); // Write each run for (const run of runs) { // Write count const countBuffer = Buffer.alloc(4); countBuffer.writeInt32BE(run.count); buffers.push(countBuffer); // Write value (single value serialization) buffers.push(this.serializeSingleValue(run.value, dataType)); } return Buffer.concat(buffers); } private valuesEqual(a: any, b: any, dataType: number): boolean { if (a === null || b === null) return a === b; switch (dataType) { case 0: // BOOLEAN case 1: // INT32 case 9: // DATE return a === b; case 2: // INT64 case 8: // TIMESTAMP return BigInt(a) === BigInt(b); case 3: // FLOAT case 4: // DOUBLE return Math.abs(a - b) < 1e-9; // Float comparison case 5: // TEXT case 11: // STRING return String(a) === String(b); case 10: // BLOB return Buffer.isBuffer(a) && Buffer.isBuffer(b) && a.equals(b); default: return false; } } private serializeSingleValue(value: any, dataType: number): Buffer { // Similar to serializeColumn but for single value // ... implementation } } /** * GORILLA encoder - for floating point compression * Uses XOR-based compression (Facebook's Gorilla algorithm) */ export class GorillaEncoder implements ValueEncoder { encode( values: any[], dataType: number, nullIndicators: boolean[] | null, ): Buffer { if (dataType !== 3 && dataType !== 4) { throw new Error("GORILLA encoding only supports FLOAT and DOUBLE"); } // Implement Gorilla compression algorithm // - Store first value as-is // - For subsequent values: // - XOR with previous value // - If XOR = 0, write single bit 0 // - If leading/trailing zeros match previous, write compact format // - Otherwise, write full XOR value // TODO: Full Gorilla implementation // This is complex, may defer to future PR throw new Error("GORILLA encoding not yet implemented"); } }
配置支持:
// src/utils/Config.ts export interface TabletConfig { /** * Encoding strategy for tablet values. * @default TSEncoding.PLAIN */ encoding?: TSEncoding; /** * Enable RLE compression for tablets with repeated values. * Only applies if encoding=PLAIN. * @default false */ enableAutoCompression?: boolean; }
文件: Session.ts
当前实现使用多次 Buffer.concat,可优化为预先计算大小:
protected serializeTabletValues( values: any[][], dataTypes: number[], rowCount: number, ): Buffer { // NEW: Pre-calculate total buffer size let totalSize = 0; const columnBuffers: Buffer[] = []; const bitMaps: (boolean[] | null)[] = []; // Phase 1: Serialize columns and calculate sizes for (let colIndex = 0; colIndex < dataTypes.length; colIndex++) { const dataType = dataTypes[colIndex]; const columnValues = values.map((row) => row[colIndex]); // Track null values const nullBitmap: boolean[] = []; let hasNull = false; for (let rowIndex = 0; rowIndex < columnValues.length; rowIndex++) { const isNull = columnValues[rowIndex] === null || columnValues[rowIndex] === undefined; nullBitmap.push(isNull); if (isNull) hasNull = true; } const buffer = this.serializeColumn(columnValues, dataType); columnBuffers.push(buffer); bitMaps.push(hasNull ? nullBitmap : null); totalSize += buffer.length; } // Phase 2: Calculate bitmap size const bitmapBuffer = this.serializeBitMaps(bitMaps, rowCount); totalSize += bitmapBuffer.length; // Phase 3: Allocate single buffer and copy const result = Buffer.allocUnsafe(totalSize); let offset = 0; for (const buffer of columnBuffers) { buffer.copy(result, offset); offset += buffer.length; } bitmapBuffer.copy(result, offset); return result; }
性能改进:
文件: Session.ts
增强 BitMap 序列化的代码注释:
protected serializeBitMaps( bitMaps: (boolean[] | null)[], rowCount: number, ): Buffer { /** * BitMap Serialization Format (matches Apache IoTDB Java client): * * For each column: * 1. columnHasNull flag (1 byte): 0=no nulls, 1=has nulls * 2. If has nulls: bitmap array * - 8 values packed per byte (big-endian bit ordering) * - Bit=1 means NULL, Bit=0 means NOT NULL * - Size = Math.ceil(rowCount / 8) bytes * - Padding: Remaining bits in last byte are set to 0 * * Example for rowCount=10: * Row indices: 0 1 2 3 4 5 6 7 | 8 9 (padding 6 bits) * Null values: 0 1 0 0 1 0 1 0 | 0 1 0 0 0 0 0 0 * Packed byte: 0b01001010 | 0b01000000 * Hex: 0x4A | 0x40 * * This matches the bit packing in: * - Java: InsertTabletNode.writeBitMaps() (lines 562-581) * - C++: Session::getValue() with BitMap serialization * - Python: Tablet.get_binary_values() with struct.pack */ const buffers: Buffer[] = []; for (const bitMap of bitMaps) { const columnHasNull = bitMap !== null; // Write columnHasNull flag (1 byte) buffers.push(Buffer.from([columnHasNull ? 1 : 0])); if (columnHasNull && bitMap) { // Calculate bitmap byte count // Example: 10 rows → Math.ceil(10/8) = 2 bytes const bitmapByteCount = Math.ceil(rowCount / 8); const bitmapBytes = Buffer.alloc(bitmapByteCount); // Pack 8 bits per byte (big-endian ordering) for (let i = 0; i < bitMap.length; i++) { if (bitMap[i]) { const byteIndex = Math.floor(i / 8); const bitIndex = i % 8; // Set bit from left to right (MSB first) // Bit 0 → 0x80 (10000000) // Bit 1 → 0x40 (01000000) // Bit 2 → 0x20 (00100000) // ... bitmapBytes[byteIndex] |= (1 << (7 - bitIndex)); } } buffers.push(bitmapBytes); } } return Buffer.concat(buffers); }
注意: 检查当前实现的位序是否与 Java 匹配:
1 << bitIndex (LSB first)1 << bitIndex (matches Java)文件: tests/unit/TabletSerialization.test.ts (新建)
describe("Tablet Serialization", () => { test("should serialize bitmap with correct bit packing", () => { const session = new Session({ host: "localhost", port: 6667 }); // Test data: 10 rows, column 0 has nulls at indices 1, 4, 6, 9 const bitMaps = [ [false, true, false, false, true, false, true, false, false, true], // Column 0 null, // Column 1: no nulls ]; const buffer = (session as any).serializeBitMaps(bitMaps, 10); // Expected format: // Column 0: [0x01] (has null) + [0x4A, 0x40] (bitmap) // Bits: 01001010 01000000 // Explanation: bit 1=1, bit 4=1, bit 6=1, bit 9=1 // Column 1: [0x00] (no null) expect(buffer[0]).toBe(0x01); // Column 0 has null expect(buffer[1]).toBe(0x4a); // First byte: 01001010 expect(buffer[2]).toBe(0x40); // Second byte: 01000000 (bit 9 set) expect(buffer[3]).toBe(0x00); // Column 1 no null }); test("should handle edge case: rowCount not multiple of 8", () => { // Test with 13 rows (needs 2 bytes, last 3 bits unused) const bitMaps = [ [ true, false, false, true, false, false, false, false, true, false, true, false, false, ], ]; const buffer = (session as any).serializeBitMaps(bitMaps, 13); // Expected: [0x01] + [0b10010000, 0b10100000] expect(buffer[0]).toBe(0x01); expect(buffer[1]).toBe(0x90); // 10010000 expect(buffer[2]).toBe(0xa0); // 10100000 }); });
文件: tests/performance/TabletSerialization.bench.ts (新建)
describe("Tablet Serialization Performance", () => { test("benchmark: large tablet serialization", () => { const session = new Session({ host: "localhost", port: 6667 }); // Create large tablet: 1000 rows x 100 columns const rowCount = 1000; const colCount = 100; const tablet = { deviceId: "root.perf.device1", measurements: Array.from({ length: colCount }, (_, i) => `s${i}`), dataTypes: Array(colCount).fill(TSDataType.FLOAT), timestamps: Array.from( { length: rowCount }, (_, i) => Date.now() + i * 1000, ), values: Array.from({ length: rowCount }, () => Array.from({ length: colCount }, () => Math.random() * 100), ), }; const startTime = Date.now(); const buffer = (session as any).serializeTabletValues( tablet.values, tablet.dataTypes, rowCount, ); const duration = Date.now() - startTime; console.log(`Serialized ${rowCount}x${colCount} tablet in ${duration}ms`); console.log(`Buffer size: ${buffer.length} bytes`); console.log( `Throughput: ${(buffer.length / duration / 1024).toFixed(2)} MB/s`, ); // Performance target: < 50ms for 1000x100 tablet expect(duration).toBeLessThan(50); }); });
已实现:
优化空间:
文件: ColumnDecoder.ts
添加 DICTIONARY 编码支持:
/** * Decoder for DICTIONARY encoding (encoding=5) * Uses integer keys to reference a string dictionary. * Efficient for columns with low cardinality (many repeated values). * * Format: * [dictionarySize: INT32] * [key1: String, key2: String, ..., keyN: String] * [indices: INT32 array with positionCount entries] * [nullIndicators: bitmap] */ class DictionaryColumnDecoder implements ColumnDecoder { readColumn( buffer: Buffer, offset: number, dataType: number, positionCount: number, ): { column: Column; bytesRead: number } { if (dataType !== 5 && dataType !== 11) { throw new Error("DICTIONARY encoding only supports TEXT/STRING"); } let currentOffset = offset; // Read dictionary size const dictionarySize = buffer.readInt32BE(currentOffset); currentOffset += 4; // Read dictionary entries const dictionary: string[] = new Array(dictionarySize); for (let i = 0; i < dictionarySize; i++) { const length = buffer.readInt32BE(currentOffset); currentOffset += 4; dictionary[i] = buffer.toString( "utf8", currentOffset, currentOffset + length, ); currentOffset += length; } logger.debug(`Dictionary decoded: ${dictionarySize} unique values`); // Read indices array const indices: number[] = new Array(positionCount); for (let i = 0; i < positionCount; i++) { indices[i] = buffer.readInt32BE(currentOffset); currentOffset += 4; } // Read null indicators const { nullIndicators, bytesRead: nullBytes } = ColumnDeserializer.deserializeNullIndicators( buffer, currentOffset, positionCount, ); currentOffset += nullBytes; // Map indices to dictionary values const values: any[] = new Array(positionCount); for (let i = 0; i < positionCount; i++) { if (nullIndicators && nullIndicators[i]) { values[i] = null; } else { const dictIndex = indices[i]; if (dictIndex < 0 || dictIndex >= dictionarySize) { throw new Error(`Invalid dictionary index: ${dictIndex}`); } values[i] = dictionary[dictIndex]; } } return { column: { dataType, encoding: ColumnEncoding.Dictionary, values, nullIndicators, positionCount, }, bytesRead: currentOffset - offset, }; } } // Add to BaseColumnDecoder export enum ColumnEncoding { ByteArray = 0, Int32Array = 1, Int64Array = 2, BinaryArray = 3, Rle = 4, Dictionary = 5, // NEW } export class BaseColumnDecoder { private static decoders: Map<ColumnEncoding, ColumnDecoder> = new Map([ [ColumnEncoding.Int32Array, new Int32ArrayColumnDecoder()], [ColumnEncoding.Int64Array, new Int64ArrayColumnDecoder()], [ColumnEncoding.ByteArray, new ByteArrayColumnDecoder()], [ColumnEncoding.BinaryArray, new BinaryArrayColumnDecoder()], [ColumnEncoding.Rle, new RunLengthColumnDecoder()], [ColumnEncoding.Dictionary, new DictionaryColumnDecoder()], // NEW ]); // ... rest of implementation }
文件: RowRecord.ts
添加列值缓存和类型转换优化:
export class RowRecord { private timestamp: number; private fields: any[]; private columnNames: string[]; private columnNameIndexMap: Map<string, number>; // NEW: Cache for type-converted values private convertedValuesCache: Map<number, any> = new Map(); constructor( timestamp: number, fields: any[], columnNames: string[], columnNameIndexMap: Map<string, number>, ) { this.timestamp = timestamp; this.fields = fields; this.columnNames = columnNames; this.columnNameIndexMap = columnNameIndexMap; } /** * Get FLOAT value with caching. */ getFloat(columnName: string): number { const index = this.columnNameIndexMap.get(columnName); if (index === undefined) { throw new Error(`Column not found: ${columnName}`); } // Check cache const cacheKey = index * 1000 + 3; // Encode index + type if (this.convertedValuesCache.has(cacheKey)) { return this.convertedValuesCache.get(cacheKey); } const value = this.fields[index]; if (value === null || value === undefined) { return NaN; } const converted = Number(value); this.convertedValuesCache.set(cacheKey, converted); return converted; } // Similar optimizations for getInt, getDouble, etc. }
文件: SessionDataSet.ts
新增批量列访问方法以提升大查询性能:
export class SessionDataSet { // ... existing fields ... /** * Get entire column as array (efficient for columnar operations). * Only fetches data up to current position. * * @param columnName Column name * @returns Array of column values (may include nulls) */ getColumnArray(columnName: string): any[] { const columnIndex = this.columnNameIndexMap.get(columnName); if (columnIndex === undefined) { throw new Error(`Column not found: ${columnName}`); } // Extract column from all cached rows return this.currentRows.map((row) => { // Account for timestamp column if present const dataColumnIndex = this.ignoreTimeStamp ? columnIndex : columnIndex + 1; return row[dataColumnIndex]; }); } /** * Get multiple columns as arrays in single call. * More efficient than multiple getColumnArray calls. * * @param columnNames Array of column names * @returns Map of column name to value array */ getColumnsAsArrays(columnNames: string[]): Map<string, any[]> { const result = new Map<string, any[]>(); for (const columnName of columnNames) { result.set(columnName, this.getColumnArray(columnName)); } return result; } /** * Convert current batch to columnar format (for analytics). * Returns { timestamp: number[], column1: any[], column2: any[], ... } */ toColumnarBatch(): Record<string, any[]> { const result: Record<string, any[]> = {}; // Extract timestamps if present if (!this.ignoreTimeStamp) { result.timestamp = this.currentRows.map((row) => row[0]); } // Extract each column for (let i = 0; i < this.columnNames.length; i++) { const columnName = this.columnNames[i]; const columnIndex = this.ignoreTimeStamp ? i : i + 1; result[columnName] = this.currentRows.map((row) => row[columnIndex]); } return result; } }
文件: tests/unit/ColumnDecoder.test.ts (扩展)
describe("ColumnDecoder - DICTIONARY encoding", () => { test("should decode dictionary-encoded column", () => { // Create test buffer with dictionary encoding const buffer = Buffer.alloc(1000); let offset = 0; // Dictionary size: 3 buffer.writeInt32BE(3, offset); offset += 4; // Dictionary entries: "RED", "GREEN", "BLUE" const entries = ["RED", "GREEN", "BLUE"]; for (const entry of entries) { const entryBuffer = Buffer.from(entry, "utf8"); buffer.writeInt32BE(entryBuffer.length, offset); offset += 4; entryBuffer.copy(buffer, offset); offset += entryBuffer.length; } // Indices: [0, 1, 2, 0, 1, 0] (6 values) const indices = [0, 1, 2, 0, 1, 0]; for (const index of indices) { buffer.writeInt32BE(index, offset); offset += 4; } // No nulls buffer.writeUInt8(0, offset); offset += 1; // Decode const decoder = BaseColumnDecoder.getDecoder(ColumnEncoding.Dictionary); const { column } = decoder.readColumn(buffer, 0, TSDataType.TEXT, 6); expect(column.values).toEqual([ "RED", "GREEN", "BLUE", "RED", "GREEN", "RED", ]); }); });
文件: tests/performance/QueryDeserialization.bench.ts (新建)
describe("Query Deserialization Performance", () => { test("benchmark: large result set iteration", async () => { const session = new Session({ host: process.env.IOTDB_HOST || "localhost", port: parseInt(process.env.IOTDB_PORT || "6667"), fetchSize: 10000, // Large fetch size }); await session.open(); // Query large dataset const startTime = Date.now(); const dataSet = await session.executeQueryStatement( "SELECT * FROM root.test.** LIMIT 100000", ); let rowCount = 0; let totalDeserializeTime = 0; while (await dataSet.hasNext()) { const batchStart = Date.now(); const row = dataSet.next(); totalDeserializeTime += Date.now() - batchStart; rowCount++; } await dataSet.close(); await session.close(); const totalTime = Date.now() - startTime; console.log(`Deserialized ${rowCount} rows in ${totalTime}ms`); console.log( `Average per row: ${(totalDeserializeTime / rowCount).toFixed(3)}ms`, ); console.log( `Throughput: ${(rowCount / (totalTime / 1000)).toFixed(0)} rows/sec`, ); // Performance target: > 50,000 rows/sec expect(rowCount / (totalTime / 1000)).toBeGreaterThan(50000); }); test("benchmark: columnar access vs row-by-row", async () => { const dataSet = await session.executeQueryStatement( "SELECT * FROM root.test.** LIMIT 10000", ); // Method 1: Row-by-row access const rowStartTime = Date.now(); const rowResults = []; while (await dataSet.hasNext()) { const row = dataSet.next(); rowResults.push(row.getValue("temperature")); } const rowTime = Date.now() - rowStartTime; // Method 2: Columnar access const colStartTime = Date.now(); const colResults = dataSet.getColumnArray("temperature"); const colTime = Date.now() - colStartTime; console.log(`Row-by-row: ${rowTime}ms, Columnar: ${colTime}ms`); console.log(`Speedup: ${(rowTime / colTime).toFixed(2)}x`); // Columnar access should be faster expect(colTime).toBeLessThan(rowTime); }); });
| 功能 | 优先级 | 预计工作量 | 性能影响 | 用户影响 |
|---|---|---|---|---|
| Redirection 支持 | P0 (最高) | 5-7 天 | +++++ (40-60%) | High |
| Tablet 序列化优化 | P1 (高) | 3-4 天 | +++ (15-25%) | Medium |
| 查询反序列化增强 | P2 (中) | 4-5 天 | ++ (10-20%) | Medium |
优先级理由:
Milestone 1 (Week 2): Redirection 支持完成
Milestone 2 (Week 3): Tablet 序列化优化完成
Milestone 3 (Week 4): 查询反序列化增强完成
| 风险 | 可能性 | 影响 | 缓解措施 |
|---|---|---|---|
| Redirection 协议变更 | 低 | 高 | 参考最新 Java 源码,保持同步更新 |
| 编码实现不兼容 | 中 | 中 | 充分测试 Java 互操作性 |
| 性能回归 | 低 | 高 | 每个 PR 运行性能基准测试 |
| 内存泄漏 | 中 | 高 | 压力测试和内存分析 |
外部依赖:
内部依赖:
单元测试:
集成测试:
性能测试:
兼容性测试:
Redirection:
Tablet 序列化:
查询反序列化:
Redirection:
Tablet 序列化:
查询反序列化:
Java 实现:
C++ 实现:
Python 实现:
本规划文档详细分析了 Apache IoTDB Node.js 客户端在以下三个核心技术领域的现状、差距和实施路径:
完整的 Redirection 支持:通过实现 RedirectCache、RedirectException 处理和智能连接路由,预期将写入性能提升 40-60%
Tablet 序列化优化:通过引入 TSEncoding 压缩、优化内存分配和增强 BitMap 处理,预期将序列化性能提升 15-25%
查询结果反序列化增强:通过扩展列编码支持、添加列式访问 API 和优化类型转换,预期将查询性能提升 10-20%
按照 4 周的实施时间表,将分阶段交付这些核心功能,显著提升 Node.js 客户端在生产环境中的性能和可用性。