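Partial fix (work in progress) for Arrow Flight SQL result streaming in IoTDBFlightSqlProducer.streamQueryResults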
diff --git a/TestArrowOffset.class b/TestArrowOffset.class new file mode 100644 index 0000000..fecab16 --- /dev/null +++ b/TestArrowOffset.class Binary files differ
diff --git a/external-service-impl/flight-sql/pom.xml b/external-service-impl/flight-sql/pom.xml index e116032..bb9536a 100644 --- a/external-service-impl/flight-sql/pom.xml +++ b/external-service-impl/flight-sql/pom.xml
@@ -41,8 +41,18 @@ </dependency> <dependency> <groupId>org.apache.arrow</groupId> - <artifactId>arrow-memory-netty</artifactId> - <scope>runtime</scope> + <artifactId>flight-core</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${arrow.version}</version> </dependency> <!-- IoTDB dependencies (provided at runtime by DataNode classloader) --> <dependency> @@ -53,6 +63,24 @@ </dependency> <dependency> <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-thrift</artifactId> + <scope>provided</scope> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>service-rpc</artifactId> + <scope>provided</scope> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-thrift-commons</artifactId> + <scope>provided</scope> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> <artifactId>node-commons</artifactId> <version>2.0.7-SNAPSHOT</version> <scope>provided</scope> @@ -70,6 +98,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>3.25.1</version> + </dependency> + <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> <scope>provided</scope> @@ -113,14 +146,6 @@ <configuration> <ignoredDependencies> <ignoredDependency>org.apache.tsfile:common</ignoredDependency> - <ignoredDependency>org.apache.iotdb:iotdb-thrift-commons</ignoredDependency> - <ignoredDependency>com.google.protobuf:protobuf-java</ignoredDependency> - <ignoredDependency>org.apache.iotdb:iotdb-thrift</ignoredDependency> - 
<ignoredDependency>org.apache.arrow:flight-core</ignoredDependency> - <ignoredDependency>org.apache.arrow:arrow-vector</ignoredDependency> - <ignoredDependency>org.apache.arrow:arrow-memory-core</ignoredDependency> - <ignoredDependency>org.apache.iotdb:service-rpc</ignoredDependency> - <ignoredDependency>org.apache.arrow:arrow-memory-netty</ignoredDependency> </ignoredDependencies> </configuration> </plugin>
diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java index fa8286d..4a4227e 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
@@ -210,7 +210,7 @@ LOGGER.warn("getStreamStatement called for queryId={}", queryId); try { streamQueryResults(queryId, listener); - } catch (Exception e) { + } catch (Throwable e) { LOGGER.error("getStreamStatement failed for queryId={}", queryId, e); listener.error( CallStatus.INTERNAL @@ -230,28 +230,62 @@ return; } - VectorSchemaRoot root = null; - try { - root = TsBlockToArrowConverter.createVectorSchemaRoot(ctx.header, allocator); + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(ctx.header, allocator)) { listener.start(root); + LOGGER.warn("streamQueryResults: listener started for queryId={}", queryId); + int batchCount = 0; while (true) { + LOGGER.warn("streamQueryResults: fetching batch {} for queryId={}", batchCount, queryId); Optional<TsBlock> optionalTsBlock = ctx.queryExecution.getBatchResult(); - if (!optionalTsBlock.isPresent() || optionalTsBlock.get().isEmpty()) { + if (!optionalTsBlock.isPresent()) { + LOGGER.warn( + "streamQueryResults: optionalTsBlock not present for queryId={}, breaking", queryId); break; } TsBlock tsBlock = optionalTsBlock.get(); - root.clear(); + if (tsBlock.isEmpty()) { + LOGGER.warn("streamQueryResults: tsBlock isEmpty for queryId={}, continuing", queryId); + continue; + } + + LOGGER.warn( + "streamQueryResults: filling root with batch {} ({} rows)", + batchCount, + tsBlock.getPositionCount()); TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, ctx.header); listener.putNext(); + LOGGER.warn("streamQueryResults: putNext done for batch {}", batchCount); + + while (!listener.isReady() && !listener.isCancelled()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + + if (listener.isCancelled()) { + LOGGER.warn("Flight stream cancelled by client for queryId={}", queryId); + break; + } + batchCount++; } + LOGGER.warn( + "streamQueryResults: completing listener for queryId={}, total batches={}", + queryId, + batchCount); + 
// Detach buffers from root so it's not freed while gRPC sends the last batch + root.allocateNew(); listener.completed(); } catch (IoTDBException e) { LOGGER.error("Error streaming query results for queryId={}", queryId, e); listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException()); - } catch (Exception e) { + } catch (Throwable e) { LOGGER.error("Unexpected error streaming query results for queryId={}", queryId, e); listener.error( CallStatus.INTERNAL @@ -260,9 +294,6 @@ } finally { coordinator.cleanupQueryExecution(queryId); activeQueries.remove(queryId); - if (root != null) { - root.close(); - } } }
diff --git a/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TestArrowOffset.java b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TestArrowOffset.java new file mode 100644 index 0000000..f7d68e5 --- /dev/null +++ b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TestArrowOffset.java
@@ -0,0 +1,55 @@
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+/**
+ * Scratch program: fills a three-slot {@link VarCharVector} (value, null, value), prints its
+ * offset-buffer entries, then prints the body length of the record batch unloaded from it.
+ */
+public class TestArrowOffset {
+  private static final int VALUE_COUNT = 3;
+
+  /**
+   * Runs the experiment and writes the results to standard output.
+   *
+   * @param args ignored
+   */
+  public static void main(String[] args) {
+    try (BufferAllocator allocator = new RootAllocator();
+        VarCharVector vector =
+            new VarCharVector("test", FieldType.nullable(new ArrowType.Utf8()), allocator)) {
+      vector.allocateNew();
+
+      // Encode explicitly as UTF-8 to match the vector's Utf8 type; a bare getBytes() would use
+      // the platform default charset. Slot 1 is left null between the two values.
+      vector.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8));
+      vector.setNull(1);
+      vector.setSafe(2, "world".getBytes(StandardCharsets.UTF_8));
+      vector.setValueCount(VALUE_COUNT);
+
+      // Read the four-byte offset entries at indices 0..VALUE_COUNT inclusive.
+      for (int i = 0; i <= VALUE_COUNT; i++) {
+        long byteIndex = (long) i * Integer.BYTES;
+        System.out.println("Offset at " + i + ": " + vector.getOffsetBuffer().getInt(byteIndex));
+      }
+
+      // The root only wraps `vector`, which the enclosing try-with-resources owns and closes.
+      VectorSchemaRoot root =
+          new VectorSchemaRoot(
+              Collections.singletonList(vector.getField()),
+              Collections.singletonList(vector),
+              VALUE_COUNT);
+      try (ArrowRecordBatch batch = new VectorUnloader(root).getRecordBatch()) {
+        System.out.println("Record batch length: " + batch.computeBodyLength());
+      }
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java index 1710c15..a3c8025 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
@@ -163,6 +163,7 @@ Schema schema; List<Field> fields; // 1. Query with all data types + System.out.println("Executing query..."); flightInfo = flightSqlClient.execute( "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM " + TABLE + " ORDER BY time", @@ -174,6 +175,7 @@ rows = fetchAllRows(flightInfo); assertEquals("Should have 3 rows", 3, rows.size()); // 2. Query with filter + System.out.println("Executing query..."); flightInfo = flightSqlClient.execute( "SELECT id1, s1 FROM " + TABLE + " WHERE id1 = 'device1' ORDER BY time", credentials); @@ -181,6 +183,7 @@ assertEquals("Should have 2 rows for device1", 2, rows.size()); // 3. Query with aggregation + System.out.println("Executing query..."); flightInfo = flightSqlClient.execute( "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum " @@ -192,6 +195,7 @@ assertEquals("Should have 2 groups", 2, rows.size()); // 4. Empty result query + System.out.println("Executing query..."); flightInfo = flightSqlClient.execute( "SELECT * FROM " + TABLE + " WHERE id1 = 'nonexistent'", credentials); @@ -199,6 +203,7 @@ assertEquals("Should have 0 rows", 0, rows.size()); // 5. 
Show databases + System.out.println("Executing query..."); flightInfo = flightSqlClient.execute("SHOW DATABASES", credentials); rows = fetchAllRows(flightInfo); assertTrue("Should have at least 1 database", rows.size() >= 1); @@ -222,17 +227,16 @@ private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws Exception { List<List<String>> rows = new ArrayList<>(); for (FlightEndpoint endpoint : flightInfo.getEndpoints()) { - try (FlightStream stream = flightSqlClient.getStream(endpoint.getTicket())) { + try (FlightStream stream = flightSqlClient.getStream(endpoint.getTicket(), credentials)) { while (stream.next()) { - try (VectorSchemaRoot root = stream.getRoot()) { - for (int i = 0; i < root.getRowCount(); i++) { - List<String> row = new ArrayList<>(); - for (FieldVector vector : root.getFieldVectors()) { - Object value = vector.getObject(i); - row.add(value == null ? "null" : value.toString()); - } - rows.add(row); + VectorSchemaRoot root = stream.getRoot(); + for (int i = 0; i < root.getRowCount(); i++) { + List<String> row = new ArrayList<>(); + for (FieldVector vector : root.getFieldVectors()) { + Object value = vector.getObject(i); + row.add(value == null ? "null" : value.toString()); } + rows.add(row); } } }