feat(flight-sql): add per-client session isolation and security hardening

- Add x-flight-sql-client-id header support for per-client USE database
  isolation (server: FlightSqlAuthHandler / FlightSqlSessionManager; IT
  client: ClientIdMiddlewareFactory injects the header on every call)
- Use \0 (null byte) as the delimiter in the clientSessionCache key to
  prevent username/clientId collision attacks
- Validate clientId: alphanumeric and dash only, max 64 chars; fail closed
  (SecurityException) for non-empty invalid values
- Add maximumSize(1000) to tokenCache and clientSessionCache to prevent
  resource exhaustion from arbitrary clientIds
- Remove LoginLockManager (userId=-1L caused cross-user lock collisions;
  getUserId() is a blocking RPC, incompatible with directExecutor())
- Stop calling registerSession(): its ThreadLocal-based session tracking is
  designed for the Thrift thread-per-client model, not the Netty event loop
- Remove unused flightClient field from IT
- Add directExecutor() + HTTP/2 flow control window tuning (1MB) on
  NettyServerBuilder to fix "end-of-stream mid-frame" errors
- Document all functional gaps vs SessionManager.login() (password
  expiration, login lock, checkUser cache-miss risk)

Tests (9/9 pass):
- 5 original Flight SQL query tests
- testUseDbSessionPersistence: USE context persists across connections
  that share a clientId
- testUseDbWithFullyQualifiedFallback: USE + qualified/unqualified queries
- testUseDbIsolationAcrossClients: Client B (different clientId) fails on
  an unqualified query because it has no USE context
- testInvalidClientIdRejected: non-empty invalid clientId is rejected
diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java index c22d935..66da06f 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
@@ -93,9 +93,10 @@ String username = decoded.substring(0, colonIdx); String password = decoded.substring(colonIdx + 1); - LOGGER.debug("Validating credentials for user: {}", username); + String clientId = headers.get("x-flight-sql-client-id"); + LOGGER.debug("Validating credentials for user: {}, clientId: {}", username, clientId); try { - String token = sessionManager.authenticate(username, password, "unknown"); + String token = sessionManager.authenticate(username, password, "unknown", clientId); return bearerResult(token); } catch (SecurityException e) { throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException();
diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java index 9ed91d5..c775b84 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java
@@ -86,11 +86,9 @@ flightServer = FlightServer.builder(allocator, location, producer) .headerAuthenticator(authHandler) - // Configure Netty server for DataNode JVM environment: - // - directExecutor: run gRPC handlers in the Netty event loop thread to - // avoid thread scheduling issues with the default executor - // - flowControlWindow: explicit HTTP/2 flow control prevents framing errors - // when standalone Netty JARs coexist on the classpath + // directExecutor: run gRPC handlers in the Netty event loop thread to + // avoid thread scheduling issues with the default executor that cause + // "end-of-stream mid-frame" errors on subsequent RPCs. .transportHint( "grpc.builderConsumer", (java.util.function.Consumer<io.grpc.netty.NettyServerBuilder>)
diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java index 0178dc2..825c930 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory; import java.security.SecureRandom; -import java.time.ZoneId; import java.util.Base64; import java.util.concurrent.TimeUnit; @@ -48,18 +47,32 @@ private static final String AUTHORIZATION_HEADER = "authorization"; private static final String BEARER_PREFIX = "Bearer "; private static final SecureRandom SECURE_RANDOM = new SecureRandom(); + private static final int MAX_CLIENT_ID_LENGTH = 64; + private static final int MAX_SESSIONS = 1000; + private static final java.util.regex.Pattern CLIENT_ID_PATTERN = + java.util.regex.Pattern.compile("^[a-zA-Z0-9\\-]+$"); private final SessionManager sessionManager = SessionManager.getInstance(); /** Cache of Bearer token -> IClientSession with configurable TTL. */ private final Cache<String, IClientSession> tokenCache; - /** Cache of username -> Bearer token for session reuse with Basic auth on every call. */ - private final Cache<String, String> userTokenCache; + /** + * Cache of (username, clientId) -> Bearer token for session reuse. Avoids repeated session + * creation on every RPC — necessary because the Arrow Flight client middleware does not always + * cache the Bearer token, causing Basic auth to be re-sent on every call. + * + * <p>Keyed by the username and the clientId joined with a NUL ({@code \0}) delimiter, where + * clientId comes from the {@code x-flight-sql-client-id} header and has passed validation + * (alphanumeric and dash only, at most 64 characters). This ensures different logical clients + * (even with the same username) get independent sessions with separate USE database contexts. + * If the header is absent or empty, the key falls back to the bare username (shared session); a + * non-empty but invalid clientId is rejected instead of falling back. 
+ */ + private final Cache<String, String> clientSessionCache; public FlightSqlSessionManager(long sessionTimeoutMinutes) { this.tokenCache = Caffeine.newBuilder() + .maximumSize(MAX_SESSIONS) .expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES) .removalListener( (String token, IClientSession session, RemovalCause cause) -> { @@ -78,8 +91,11 @@ } }) .build(); - this.userTokenCache = - Caffeine.newBuilder().expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES).build(); + this.clientSessionCache = + Caffeine.newBuilder() + .maximumSize(MAX_SESSIONS) + .expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES) + .build(); } /** @@ -87,46 +103,77 @@ * * @param username the username * @param password the password - * @param clientAddress the client's IP address + * @param clientAddress the client's IP address (for logging) + * @param clientId optional client identifier from x-flight-sql-client-id header (may be null) * @return the Bearer token if authentication succeeds * @throws SecurityException if authentication fails */ - public String authenticate(String username, String password, String clientAddress) { - // Check if this user already has an active session (reuse it) - String existingToken = userTokenCache.getIfPresent(username); + public String authenticate( + String username, String password, String clientAddress, String clientId) { + // NOTE: We intentionally do NOT call SessionManager.login() here because it performs + // blocking I/O that is incompatible with directExecutor() on the Netty event loop: + // - DataNodeAuthUtils.checkPasswordExpiration: executes SELECT via Coordinator + // - AuthorityChecker.getUserId: sends RPC to ConfigNode on cache miss + // Blocking the event loop corrupts HTTP/2 connection state and causes "end-of-stream + // mid-frame" errors on subsequent RPCs. 
+ // + // Functional gaps vs SessionManager.login(): + // - Password expiration checks (requires Coordinator query) + // - Login lock / brute-force protection (LoginLockManager is in-memory but keys + // by userId; AuthorityChecker.getUserId() is a blocking RPC, so we cannot obtain + // a correct userId without risking event loop stalls) + // + // Risk: AuthorityChecker.checkUser() may perform a one-time blocking RPC to ConfigNode + // on cache miss (ClusterAuthorityFetcher.login). After the first successful auth, the + // credential is cached locally. Note that checkUser() below runs on EVERY authenticate() + // call, including calls that end up reusing a cached session: clientSessionCache only + // avoids re-creating the session, not re-verifying the password. + // + // TODO: Support password expiration and login lock. This requires either: + // (a) async auth support in Arrow Flight (not yet available), or + // (b) resolving the Netty classpath conflict so directExecutor() is no longer needed. + + // Always verify credentials — never skip password verification even if a cached + // session exists for this client. + org.apache.iotdb.common.rpc.thrift.TSStatus status; + try { + status = AuthorityChecker.checkUser(username, password); + } catch (Exception e) { + throw new SecurityException("Authentication failed", e); + } + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Authentication failed for client: {}", clientAddress); + throw new SecurityException("Authentication failed: wrong username or password"); + } + + // Reuse existing session for this client. + // The key is the bare username when no clientId is sent; otherwise it is the username, a \0 + // (null byte) delimiter, then the clientId. validateClientId() only accepts [a-zA-Z0-9-], + // so a clientId can never contain \0. The mapping (username, clientId) -> cacheKey is + // therefore injective as long as usernames cannot contain \0 either (otherwise user "a\0b" + // without a clientId would collide with user "a" sending clientId "b"). + // NOTE(review): confirm IoTDB username validation rejects \0. + String validClientId = validateClientId(clientId); + String cacheKey = validClientId != null ?
username + "\0" + validClientId : username; + String existingToken = clientSessionCache.getIfPresent(cacheKey); if (existingToken != null && tokenCache.getIfPresent(existingToken) != null) { return existingToken; } - // Verify credentials (REST pattern) - try { - org.apache.iotdb.common.rpc.thrift.TSStatus status = - AuthorityChecker.checkUser(username, password); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn("Authentication failed for client: {}", clientAddress); - throw new SecurityException("Authentication failed: wrong username or password"); - } - } catch (SecurityException e) { - throw e; - } catch (Exception e) { - throw new SecurityException("Authentication failed", e); - } - - // Create and register session (REST pattern) + // Create session. Do NOT call registerSession() — it sets a ThreadLocal (currSession) + // designed for the client-thread model (Thrift). gRPC with directExecutor() runs all + // handlers on the Netty event loop, so ThreadLocal-based session tracking would pollute. IClientSession session = new InternalClientSession("FlightSQL-" + clientAddress); session.setSqlDialect(IClientSession.SqlDialect.TABLE); - sessionManager.registerSession(session); - - long userId = AuthorityChecker.getUserId(username).orElse(-1L); + // Pass -1L for userId — getUserId() sends blocking RPC to ConfigNode. 
sessionManager.supplySession( - session, userId, username, ZoneId.systemDefault(), IoTDBConstant.ClientVersion.V_1_0); + session, + -1L, + username, + java.time.ZoneId.systemDefault(), + IoTDBConstant.ClientVersion.V_1_0); // Generate cryptographically secure Bearer token (32 bytes = 256 bits) byte[] tokenBytes = new byte[32]; SECURE_RANDOM.nextBytes(tokenBytes); String token = Base64.getUrlEncoder().withoutPadding().encodeToString(tokenBytes); tokenCache.put(token, session); - userTokenCache.put(username, token); + clientSessionCache.put(cacheKey, token); LOGGER.info("Flight SQL authentication successful for client: {}", clientAddress); return token; } @@ -162,6 +209,29 @@ return session; } + /** + * Validates the client ID from the x-flight-sql-client-id header. Returns the validated clientId, + * or null if the header was absent (null/empty). Non-empty invalid clientIds are rejected + * (fail-closed) to prevent silent fallback to shared username-only sessions, which would break + * USE database isolation. + * + * @throws SecurityException if clientId is non-empty but invalid (too long or bad characters) + */ + private static String validateClientId(String clientId) { + if (clientId == null || clientId.isEmpty()) { + return null; + } + if (clientId.length() > MAX_CLIENT_ID_LENGTH) { + throw new SecurityException( + "Client ID exceeds maximum length of " + MAX_CLIENT_ID_LENGTH + " characters"); + } + if (!CLIENT_ID_PATTERN.matcher(clientId).matches()) { + throw new SecurityException( + "Client ID contains invalid characters (only alphanumeric and dash allowed)"); + } + return clientId; + } + /** Invalidates all sessions and cleans up resources. */ public void close() { tokenCache.invalidateAll();
diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java index ca5532f..84c6984 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java
@@ -123,9 +123,7 @@ for (int colIdx = 0; colIdx < columnNames.size(); colIdx++) { String colName = columnNames.get(colIdx); int sourceIdx = - (headerMap != null && headerMap.containsKey(colName)) - ? headerMap.get(colName) - : colIdx; + (headerMap != null && headerMap.containsKey(colName)) ? headerMap.get(colName) : colIdx; Column column = tsBlock.getColumn(sourceIdx); TSDataType dataType = dataTypes.get(colIdx); FieldVector fieldVector = root.getVector(colIdx);
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java index 94d531c4..625337a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
@@ -25,7 +25,11 @@ import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.arrow.flight.CallHeaders; +import org.apache.arrow.flight.CallInfo; +import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightClientMiddleware; import org.apache.arrow.flight.FlightEndpoint; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.FlightStream; @@ -49,6 +53,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.UUID; import static org.junit.Assert.*; @@ -66,8 +71,8 @@ private static final String USER = "root"; private static final String PASSWORD = "root"; + private String clientId; private BufferAllocator allocator; - private FlightClient flightClient; private FlightSqlClient flightSqlClient; private CredentialCallOption bearerToken; @@ -82,10 +87,8 @@ allocator = new RootAllocator(Long.MAX_VALUE); Location location = Location.forGrpcInsecure("127.0.0.1", port); - ClientIncomingAuthHeaderMiddleware.Factory authFactory = - new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); - flightClient = FlightClient.builder(allocator, location).intercept(authFactory).build(); - flightSqlClient = new FlightSqlClient(flightClient); + clientId = UUID.randomUUID().toString(); + flightSqlClient = createFlightSqlClient(clientId); bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); // Create test data via native session (not Flight SQL) @@ -123,13 +126,6 @@ // ignore } } - if (flightClient != null) { - try { - flightClient.close(); - } catch (Exception e) { - // ignore - } - } if (allocator != null) { allocator.close(); } @@ -177,8 +173,7 @@ public void testQueryWithFilter() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( - "SELECT id1, s1 FROM " + TABLE + " WHERE id1 = 'device1' ORDER BY time", - bearerToken); + "SELECT id1, s1 FROM " + 
TABLE + " WHERE id1 = 'device1' ORDER BY time", bearerToken); List<List<String>> rows = fetchAllRows(flightInfo); assertEquals("Should have 2 rows for device1", 2, rows.size()); @@ -189,7 +184,9 @@ FlightInfo flightInfo = flightSqlClient.execute( "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum " - + "FROM " + TABLE + " GROUP BY id1 ORDER BY id1", + + "FROM " + + TABLE + + " GROUP BY id1 ORDER BY id1", bearerToken); List<List<String>> rows = fetchAllRows(flightInfo); @@ -206,6 +203,111 @@ assertEquals("Should have 0 rows", 0, rows.size()); } + @Test + public void testUseDbSessionPersistence() throws Exception { + // Connection 1: USE database (same clientId shares the session) + flightSqlClient.execute("USE " + DATABASE, bearerToken); + + // Connection 2: query without fully-qualified table name. + // Same clientId ensures the same session is reused, so USE context persists. + FlightSqlClient client2 = createFlightSqlClient(clientId); + try { + CredentialCallOption token2 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + FlightInfo flightInfo = client2.execute("SELECT * FROM test_table ORDER BY time", token2); + List<List<String>> rows = fetchAllRows(flightInfo, client2, token2); + assertEquals("Should have 3 rows from unqualified query after USE", 3, rows.size()); + } finally { + client2.close(); + } + } + + @Test + public void testUseDbWithFullyQualifiedFallback() throws Exception { + // Connection 1: USE database + flightSqlClient.execute("USE " + DATABASE, bearerToken); + + // Connection 2: unqualified query (same clientId → same session) + FlightSqlClient client2 = createFlightSqlClient(clientId); + try { + CredentialCallOption token2 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + FlightInfo infoUnqualified = + client2.execute("SELECT * FROM test_table ORDER BY time", token2); + List<List<String>> rowsUnqualified = fetchAllRows(infoUnqualified, client2, token2); + assertEquals("Unqualified query 
should return 3 rows", 3, rowsUnqualified.size()); + } finally { + client2.close(); + } + + // Connection 3: fully-qualified query + FlightSqlClient client3 = createFlightSqlClient(clientId); + try { + CredentialCallOption token3 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + FlightInfo infoQualified = + client3.execute("SELECT * FROM " + TABLE + " ORDER BY time", token3); + List<List<String>> rowsQualified = fetchAllRows(infoQualified, client3, token3); + assertEquals("Fully-qualified query should also return 3 rows", 3, rowsQualified.size()); + } finally { + client3.close(); + } + } + + @Test + public void testUseDbIsolationAcrossClients() throws Exception { + // Client A (clientId from setUp): USE DATABASE + flightSqlClient.execute("USE " + DATABASE, bearerToken); + + // Client B (different clientId): gets its own independent session with NO USE context. + // Querying an unqualified table name should fail because no database is selected. + String clientIdB = UUID.randomUUID().toString(); + FlightSqlClient clientB = createFlightSqlClient(clientIdB); + CredentialCallOption tokenB = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + try { + clientB.execute("SELECT * FROM test_table", tokenB); + fail("Client B should fail on unqualified table query without USE"); + } catch (Exception expected) { + // Expected: Client B has no database context, so unqualified table query fails. + // Arrow Flight wraps the actual error, so we just verify the query did fail. 
+ assertNotNull("Exception should have a message", expected.getMessage()); + } finally { + clientB.close(); + } + + // Client A's USE context is preserved (same clientId → same session) + FlightSqlClient clientA2 = createFlightSqlClient(clientId); + CredentialCallOption tokenA2 = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + try { + FlightInfo infoA = clientA2.execute("SELECT * FROM test_table ORDER BY time", tokenA2); + List<List<String>> rowsA = fetchAllRows(infoA, clientA2, tokenA2); + assertEquals("Client A should still see 3 rows after Client B's queries", 3, rowsA.size()); + } finally { + clientA2.close(); + } + } + + @Test + public void testInvalidClientIdRejected() throws Exception { + // A non-empty clientId with invalid characters (contains @) should be rejected (fail-closed). + // Only null/empty clientId should fall back to shared session keying. + String invalidClientId = "bad@client!id"; + FlightSqlClient invalidClient = createFlightSqlClient(invalidClientId); + CredentialCallOption token = + new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + try { + invalidClient.execute("SHOW DATABASES", token); + fail("Server should reject invalid clientId during authentication"); + } catch (Exception expected) { + // Expected: server rejects the invalid clientId + assertNotNull("Exception should have a message", expected.getMessage()); + } finally { + invalidClient.close(); + } + } + // ===================== Helper Methods ===================== /** @@ -213,9 +315,14 @@ * of string representations of the column values. 
*/ private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws Exception { + return fetchAllRows(flightInfo, flightSqlClient, bearerToken); + } + + private List<List<String>> fetchAllRows( + FlightInfo flightInfo, FlightSqlClient client, CredentialCallOption token) throws Exception { List<List<String>> rows = new ArrayList<>(); for (FlightEndpoint endpoint : flightInfo.getEndpoints()) { - try (FlightStream stream = flightSqlClient.getStream(endpoint.getTicket(), bearerToken)) { + try (FlightStream stream = client.getStream(endpoint.getTicket(), token)) { while (stream.next()) { VectorSchemaRoot root = stream.getRoot(); int rowCount = root.getRowCount(); @@ -232,4 +339,50 @@ } return rows; } + + private FlightSqlClient createFlightSqlClient(String flightClientId) { + int port = EnvFactory.getEnv().getArrowFlightSqlPort(); + Location location = Location.forGrpcInsecure("127.0.0.1", port); + ClientIncomingAuthHeaderMiddleware.Factory authFactory = + new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); + FlightClient client = + FlightClient.builder(allocator, location) + .intercept(authFactory) + .intercept(new ClientIdMiddlewareFactory(flightClientId)) + .build(); + return new FlightSqlClient(client); + } + + /** + * FlightClientMiddleware that injects the x-flight-sql-client-id header on every call. This + * allows the server to key sessions per logical client, enabling per-client USE database + * isolation. 
+ */ + private static class ClientIdMiddlewareFactory implements FlightClientMiddleware.Factory { + private final String flightClientId; + + ClientIdMiddlewareFactory(String flightClientId) { + this.flightClientId = flightClientId; + } + + @Override + public FlightClientMiddleware onCallStarted(CallInfo info) { + return new FlightClientMiddleware() { + @Override + public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { + outgoingHeaders.insert("x-flight-sql-client-id", flightClientId); + } + + @Override + public void onHeadersReceived(CallHeaders incomingHeaders) { + // no-op + } + + @Override + public void onCallCompleted(CallStatus status) { + // no-op + } + }; + } + } }