This document provides a detailed design for implementing client-side redirection support in the Apache IoTDB Node.js client, based on the Java client implementation.
Status: ✅ Implemented and Tested
Reference Implementation:
In a multi-node IoTDB cluster, data for different devices may be stored on different nodes. When a client sends a write request to a node that doesn't own the target device's data, the server responds with a redirect recommendation (status code 400) containing the optimal endpoint.
The client should:
Benefits:
Foundation Classes:
- src/utils/Errors.ts: RedirectException class with proper status code (400)
- src/client/RedirectCache.ts: LRU cache with TTL for device→endpoint mappings

Configuration (src/utils/Config.ts):
- enableRedirection: Enable/disable redirection (default: true)
- redirectCacheTTL: Cache TTL in milliseconds (default: 300000 / 5 minutes)

Session Layer (src/client/Session.ts):
- insertTreeTabletInternal(): Stores redirect recommendation on code 400 responses
- insertTableTabletInternal(): Stores redirect recommendation on code 400 responses
- getAndClearLastRedirect(): Returns and clears stored redirect recommendation

Pool Layer (src/client/BaseSessionPool.ts):
- getSessionForEndpoint(): Get/create session for specific endpoint
- extractDeviceId(): Extract device ID from tree/table tablets
- insertTablet(): Check for redirect recommendations after successful writes and cache them

Testing:
- tests/e2e/Redirection.test.ts

When a multi-node IoTDB cluster returns status code 400 (REDIRECTION_RECOMMEND):
This behavior ensures:
```typescript
// TSStatusCode from Java client
export enum TSStatusCode {
  SUCCESS_STATUS = 200,
  REDIRECTION_RECOMMEND = 400, // ← The correct status code
  // ... other codes
}
```
Important: Earlier implementations incorrectly used 531. The correct code is 400.
When redirection is needed, the IoTDB server returns:
```typescript
interface TSStatus {
  code: number;             // 400 for REDIRECTION_RECOMMEND
  message?: string;         // Optional error message
  redirectNode?: TEndPoint; // The endpoint to redirect to
  subStatus?: TSStatus[];   // For batch operations
}

interface TEndPoint {
  ip: string;          // Public IP
  internalIp?: string; // Internal IP (preferred if available)
  port: number;        // RPC port
}
```
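To make the "internal IP preferred" rule concrete, here is a minimal sketch of turning a `TSStatus` into the `{ host, port }` shape used by the pool code in this document. The helper name `extractRedirectEndpoint` and the `EndPoint` interface are assumptions for illustration; they are not confirmed to exist in the codebase under these names.

```typescript
// Shapes copied from the interfaces above.
interface TEndPoint {
  ip: string;
  internalIp?: string;
  port: number;
}

interface TSStatus {
  code: number;
  message?: string;
  redirectNode?: TEndPoint;
  subStatus?: TSStatus[];
}

// Assumed client-side endpoint shape (host/port), as used by the pool code.
interface EndPoint {
  host: string;
  port: number;
}

const REDIRECTION_RECOMMEND = 400;

// Hypothetical helper: returns the endpoint to cache, or undefined when the
// status carries no redirect recommendation.
function extractRedirectEndpoint(status: TSStatus): EndPoint | undefined {
  if (status.code !== REDIRECTION_RECOMMEND || !status.redirectNode) {
    return undefined;
  }
  const node = status.redirectNode;
  // Prefer the internal IP when the server supplies one.
  return { host: node.internalIp || node.ip, port: node.port };
}
```

Returning `undefined` rather than throwing keeps the caller's success path simple, since a code 400 response is a recommendation and not a failure.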
┌─────────────────────────────────────────────────────────┐
│ Application: pool.insertTablet({ deviceId: "d1", ...}) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ SessionPool: Check cache for device "d1" │
│ - Has cached endpoint? → Use that connection │
│ - No cache? → Use round-robin connection │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Session.insertTablet(): Send request to node │
│ ┌────────────────────────────────────────────────┐ │
│ │ client.insertTablet(request) │ │
│ │ → Returns TSStatus │ │
│ └────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Check TSStatus.code │
│ - code === 200? → Success, return │
│ - code === 400 AND redirectNode? │
│ → Store redirect recommendation, return success │
│ - Other? → Error │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ SessionPool: Check for redirect recommendation │
│ - If present: Cache deviceId → endpoint │
│ - Future writes will use cached endpoint │
└─────────────────────────────────────────────────────────┘
The implemented approach for single-device operations:
```typescript
// In SessionPool
async insertTablet(tablet: TreeTablet | TableTablet): Promise<void> {
  const deviceId = this.extractDeviceId(tablet);

  // Check cache for optimal endpoint if redirection is enabled
  const cachedEndpoint = this.config.enableRedirection
    ? this.redirectCache.get(deviceId)
    : null;

  let session: Session;
  if (cachedEndpoint) {
    // Use cached endpoint for optimal routing
    session = await this.getSessionForEndpoint(cachedEndpoint);
  } else {
    // Use round-robin selection
    session = await this.getSession();
  }

  try {
    // Attempt insert
    await session.insertTablet(tablet);

    // Check if server recommended a redirect for future operations
    if (this.config.enableRedirection) {
      const redirectEndpoint = session.getAndClearLastRedirect();
      if (redirectEndpoint) {
        // Cache the recommended endpoint for future writes
        this.redirectCache.set(deviceId, redirectEndpoint);
        logger.info(
          `Cached redirect recommendation: ${deviceId} -> ${redirectEndpoint.host}:${redirectEndpoint.port}`
        );
      }
    }
  } finally {
    // Always release session
    this.releaseSession(session);
  }
}
```
For insertTablets() with multiple devices, the server can return:
- status.code = 400 (MULTIPLE_ERROR or REDIRECTION_RECOMMEND)
- status.subStatus[] - one TSStatus per device
- redirectNode

```typescript
async insertTablets(tablets: Tablet[]): Promise<void> {
  // Extract all device IDs
  const deviceIds = tablets.map(t => this.extractDeviceId(t));

  // ... similar pattern but handle subStatus array

  // If we get RedirectException with deviceEndPointMap:
  for (const [deviceId, endpoint] of Object.entries(deviceEndPointMap)) {
    this.redirectCache.set(deviceId, endpoint);
  }

  // Retry with appropriate connections
}
```
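The "handle subStatus array" step can be sketched as follows. This is an illustration only: the helper name `collectBatchRedirects` is hypothetical, and it assumes that `subStatus[i]` corresponds to the i-th device in the request, which the text above implies ("one TSStatus per device") but does not state explicitly.

```typescript
interface TEndPoint {
  ip: string;
  internalIp?: string;
  port: number;
}

interface TSStatus {
  code: number;
  redirectNode?: TEndPoint;
  subStatus?: TSStatus[];
}

// Assumed client-side endpoint shape (host/port).
interface EndPoint {
  host: string;
  port: number;
}

// Hypothetical helper: builds a deviceId → endpoint map from a batch response.
// Assumes subStatus[i] belongs to deviceIds[i].
function collectBatchRedirects(
  deviceIds: string[],
  status: TSStatus,
): Map<string, EndPoint> {
  const redirects = new Map<string, EndPoint>();
  const subs = status.subStatus || [];
  subs.forEach((sub, i) => {
    const deviceId = deviceIds[i];
    // Skip entries without a device or without a redirect recommendation.
    if (deviceId === undefined || sub.code !== 400 || !sub.redirectNode) {
      return;
    }
    redirects.set(deviceId, {
      host: sub.redirectNode.internalIp || sub.redirectNode.ip,
      port: sub.redirectNode.port,
    });
  });
  return redirects;
}
```

The resulting map could then be fed into `redirectCache.set()` one entry at a time, mirroring the loop over `deviceEndPointMap` in the snippet above.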
```typescript
// In Session.ts
private async insertTreeTabletInternal(tablet: TreeTablet): Promise<void> {
  // ... existing serialization code ...

  return new Promise((resolve, reject) => {
    client.insertTablet(req, (err: Error, response: any) => {
      if (err) {
        reject(err);
        return;
      }

      // Handle redirection recommendation (code 400)
      // Note: Code 400 means write SUCCEEDED but server recommends
      // a different endpoint for future operations
      if (response.code === 400 && response.redirectNode) {
        // Store redirect recommendation for pool to cache
        this.lastRedirectEndpoint = {
          host: response.redirectNode.internalIp || response.redirectNode.ip,
          port: response.redirectNode.port,
        };
        // Resolve successfully - the write already succeeded
        resolve();
        return;
      }

      if (response.code !== 200) {
        reject(new Error(`Insert failed: ${response.message}`));
        return;
      }

      resolve();
    });
  });
}
```
```typescript
// In BaseSessionPool
private endPointToSession: Map<string, PooledSession> = new Map();

protected async getSessionForEndpoint(endpoint: EndPoint): Promise<Session> {
  const key = `${endpoint.host}:${endpoint.port}`;
  let pooledSession = this.endPointToSession.get(key);

  if (pooledSession && pooledSession.session.isOpen()) {
    pooledSession.inUse = true;
    return pooledSession.session;
  }

  // Create new session for this endpoint
  const session = new Session({
    ...this.config,
    host: endpoint.host,
    port: endpoint.port,
  });
  await session.open();

  pooledSession = {
    session,
    lastUsed: Date.now(),
    inUse: true,
  };
  this.endPointToSession.set(key, pooledSession);
  this.pool.push(pooledSession);

  return session;
}
```
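One detail worth considering in the lookup above: because `session.open()` is awaited between the map lookup and the map update, two concurrent callers that both miss the map may each open a session for the same endpoint. A possible mitigation, sketched here under the assumption that sharing a single in-flight creation per endpoint key is acceptable, is to track pending creations. The class name `InFlightCreator` is hypothetical and not part of the described implementation.

```typescript
// Hypothetical helper: shares one in-flight creation per key so that
// concurrent callers do not trigger duplicate work.
class InFlightCreator<T> {
  private pending = new Map<string, Promise<T>>();

  getOrCreate(key: string, factory: () => Promise<T>): Promise<T> {
    const existing = this.pending.get(key);
    if (existing) {
      return existing;
    }
    // Drop the entry once the creation settles, so a failed attempt can be
    // retried and the caller's own map becomes the source of truth.
    const created = factory().then(
      (value) => {
        this.pending.delete(key);
        return value;
      },
      (err) => {
        this.pending.delete(key);
        throw err;
      },
    );
    this.pending.set(key, created);
    return created;
  }
}
```

In the pool, the factory would be the "create new session and open it" step, keyed by the same `host:port` string used for `endPointToSession`.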
Before implementing redirection, we need:
```typescript
interface PoolConfig {
  // ... existing config ...

  /**
   * Enable client-side redirection optimization.
   * When enabled, the client caches device→endpoint mappings
   * and routes writes directly to optimal nodes.
   *
   * @default true
   * @requires Multi-node IoTDB cluster
   */
  enableRedirection?: boolean;

  /**
   * Time-to-live for cached redirect mappings (milliseconds).
   * Set to 0 for no expiration.
   * Recommended: 300000 (5 minutes)
   *
   * @default 300000
   */
  redirectCacheTTL?: number;
}
```
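The TTL semantics documented above (expire after `redirectCacheTTL` milliseconds, never expire when set to 0) combined with LRU eviction could look roughly like the sketch below. This is not the contents of src/client/RedirectCache.ts; the class name `RedirectCacheSketch`, the `maxSize` bound of 1000, and the injectable clock are all assumptions made for illustration.

```typescript
// Assumed client-side endpoint shape (host/port).
interface EndPoint {
  host: string;
  port: number;
}

// Hypothetical sketch of an LRU cache with TTL for device→endpoint mappings.
class RedirectCacheSketch {
  // Map preserves insertion order, which is used here as recency order.
  private entries = new Map<string, { endpoint: EndPoint; expiresAt: number }>();

  constructor(
    private ttlMs: number = 300000,
    private maxSize: number = 1000, // assumed bound, not taken from the source
    private now: () => number = () => Date.now(),
  ) {}

  get(deviceId: string): EndPoint | undefined {
    const entry = this.entries.get(deviceId);
    if (!entry) {
      return undefined;
    }
    // A TTL of 0 means entries never expire.
    if (this.ttlMs > 0 && this.now() >= entry.expiresAt) {
      this.entries.delete(deviceId);
      return undefined;
    }
    // Re-insert to mark the entry as most recently used.
    this.entries.delete(deviceId);
    this.entries.set(deviceId, entry);
    return entry.endpoint;
  }

  set(deviceId: string, endpoint: EndPoint): void {
    this.entries.delete(deviceId);
    this.entries.set(deviceId, { endpoint, expiresAt: this.now() + this.ttlMs });
    if (this.entries.size > this.maxSize) {
      // The first key in iteration order is the least recently used.
      const oldest = this.entries.keys().next().value;
      if (oldest !== undefined) {
        this.entries.delete(oldest);
      }
    }
  }
}
```

Passing the clock as a constructor argument is a design choice that makes expiry testable without waiting for real time to pass.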
Since code 400 responses indicate successful writes with redirect recommendations (not errors), the error handling is simplified:
```typescript
// Standard error handling - redirect recommendations are not errors
class RedirectLoopDetectedError extends Error {
  constructor(deviceId: string, endpoints: EndPoint[]) {
    super(`Redirect loop detected for ${deviceId}: ${JSON.stringify(endpoints)}`);
  }
}

class RedirectTargetUnavailableError extends Error {
  constructor(endpoint: EndPoint) {
    super(`Redirect target unavailable: ${endpoint.host}:${endpoint.port}`);
  }
}
```
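The document does not show where `RedirectLoopDetectedError` would be raised. As one possible illustration, assuming a loop means "the same endpoint is recommended twice for one device within a single trail of recommendations", a small tracker could look like this. The `RedirectTrail` class is hypothetical and exists only for this example.

```typescript
// Assumed client-side endpoint shape (host/port).
interface EndPoint {
  host: string;
  port: number;
}

// Error class as defined above.
class RedirectLoopDetectedError extends Error {
  constructor(deviceId: string, endpoints: EndPoint[]) {
    super(`Redirect loop detected for ${deviceId}: ${JSON.stringify(endpoints)}`);
  }
}

// Hypothetical tracker: records each recommended endpoint for one device and
// throws when an endpoint shows up a second time.
class RedirectTrail {
  private visited: EndPoint[] = [];

  constructor(private deviceId: string) {}

  record(endpoint: EndPoint): void {
    const seen = this.visited.some(
      (e) => e.host === endpoint.host && e.port === endpoint.port,
    );
    // Keep the repeated endpoint in the trail so the error message shows the cycle.
    this.visited.push(endpoint);
    if (seen) {
      throw new RedirectLoopDetectedError(this.deviceId, this.visited);
    }
  }
}
```

Whether such a check is needed at all depends on whether the client ever follows recommendations within a single operation; with the cache-only approach described above, it may be unnecessary.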
- maxPoolSize high enough for redirect endpoints
- async-mutex for cache updates

The implementation has been tested with:
- tests/e2e/Redirection.test.ts
- docker-compose-1c3d.yml

```bash
# Unit tests
npm run test:unit

# Start multi-node cluster
docker-compose -f docker-compose-1c3d.yml up -d

# E2E tests with redirection
MULTI_NODE=true npm run test:e2e

# Cleanup
docker-compose -f docker-compose-1c3d.yml down
```
```typescript
import { SessionPool, TSDataType } from '@iotdb/client';

// Create pool with redirection enabled
const pool = new SessionPool({
  nodeUrls: ['node1:6667', 'node2:6667', 'node3:6667'],
  username: 'root',
  password: 'root',
  enableRedirection: true,
  maxRedirectRetries: 3,
  redirectCacheTTL: 300000, // 5 minutes
});

await pool.init();

// First write - may redirect
await pool.insertTablet({
  deviceId: 'root.sg.device1',
  measurements: ['temperature'],
  dataTypes: [TSDataType.FLOAT],
  timestamps: [Date.now()],
  values: [[25.5]],
});
// → Round-robin to Node A
// → Write to Node A (round-robin)
// → Write succeeds!
// → Server responds with code 400: "Recommend Node B for future writes"
// → Cache: device1 → Node B

// Second write - uses cache
await pool.insertTablet({
  deviceId: 'root.sg.device1',
  measurements: ['temperature'],
  dataTypes: [TSDataType.FLOAT],
  timestamps: [Date.now() + 1000],
  values: [[26.0]],
});
// → Check cache: device1 → Node B
// → Write directly to Node B
// → No redirect needed!

await pool.close();
```
When to Enable:
When to Disable:
Configuration Recommendations:
- enableRedirection: true (default) - Safe to leave enabled
- redirectCacheTTL: 300000 (5 min default) - Balance between freshness and performance
- maxPoolSize: 20+ - Higher values support more redirect endpoints

✅ Ready for Production Use
The redirection feature is fully implemented and tested:
The implementation is solid, well-tested, and ready for production deployment.