GH-309: [Java] Initial Substrait Plan documentation (Query Dataset) (#310)
Co-authored-by: David Li <li.davidm96@gmail.com>
diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml
index a2bee04..376db3a 100644
--- a/java/source/demo/pom.xml
+++ b/java/source/demo/pom.xml
@@ -97,5 +97,15 @@
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
+ <dependency>
+ <groupId>io.substrait</groupId>
+ <artifactId>isthmus</artifactId>
+ <version>0.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.substrait</groupId>
+ <artifactId>core</artifactId>
+ <version>0.11.0</version>
+ </dependency>
</dependencies>
</project>
diff --git a/java/source/flight.rst b/java/source/flight.rst
index 5e18167..53017b2 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -287,7 +287,7 @@
S1: Server (Location): Listening on port 33333
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
C2: Client (Populate Data): Wrote 2 batches with 3 rows each
- C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6}
+ C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false}
C4: Client (Get Stream):
Client Received batch #1, Data:
name
@@ -299,7 +299,7 @@
Manuel
Felipe
JJ
- C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6}
+ C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false}
C6: Client (Do Delete Action): Delete completed
C7: Client (List Flights Info): After delete - No records
C8: Server shut down successfully
diff --git a/java/source/index.rst b/java/source/index.rst
index 1ae98bb..63f94c0 100644
--- a/java/source/index.rst
+++ b/java/source/index.rst
@@ -39,6 +39,7 @@
io
flight
dataset
+ substrait
data
avro
jdbc
diff --git a/java/source/substrait.rst b/java/source/substrait.rst
new file mode 100644
index 0000000..ee87371
--- /dev/null
+++ b/java/source/substrait.rst
@@ -0,0 +1,206 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. _arrow-substrait:
+
+=========
+Substrait
+=========
+
+Arrow can use `Substrait`_ to integrate with other languages.
+
+.. contents::
+
+Querying Datasets
+=================
+
+The Substrait support in Arrow combines :doc:`Dataset <dataset>` and
+`substrait-java`_ to query datasets using `Acero`_ as a backend.
+
+Acero currently supports:
+
+- Reading Arrow, CSV, ORC, and Parquet files
+- Filters
+- Projections
+- Joins
+- Aggregates
+
+Here is an example of a Java program that queries a Parquet file:
+
+.. testcode::
+
+ import com.google.common.collect.ImmutableList;
+ import io.substrait.isthmus.SqlToSubstrait;
+ import io.substrait.proto.Plan;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+ import org.apache.calcite.sql.parser.SqlParseException;
+
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
+
+ static Plan queryTableNation() throws SqlParseException {
+ String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
+ String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
+ "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+ SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+ Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
+ return plan;
+ }
+
+ static void queryDatasetThruSubstraitPlanDefinition() {
+ String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+ mapTableToArrowReader.put("NATION", reader);
+ // get binary plan
+ Plan plan = queryTableNation();
+ ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
+ substraitPlan.put(plan.toByteArray());
+ // run query
+ try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
+ substraitPlan,
+ mapTableToArrowReader
+ )) {
+ while (arrowReader.loadNextBatch()) {
+ System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ queryDatasetThruSubstraitPlanDefinition();
+
+.. testoutput::
+
+ N_NATIONKEY N_NAME N_REGIONKEY N_COMMENT
+ 17 PERU 1 platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
+
+It is also possible to query multiple datasets and join them based on some criteria.
+For example, we can join the nation and customer tables from the TPC-H benchmark:
+
+.. testcode::
+
+ import com.google.common.collect.ImmutableList;
+ import io.substrait.isthmus.SqlToSubstrait;
+ import io.substrait.proto.Plan;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+ import org.apache.calcite.sql.parser.SqlParseException;
+
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
+
+ static Plan queryTableNationJoinCustomer() throws SqlParseException {
+ String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
+ "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
+ "GROUP BY n.n_name";
+ String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
+ "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+ String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, " +
+ "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, " +
+ "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
+ "C_COMMENT VARCHAR(117) )";
+ SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+ Plan plan = sqlToSubstrait.execute(sql,
+ ImmutableList.of(nation, customer));
+ return plan;
+ }
+
+ static void queryTwoDatasetsThruSubstraitPlanDefinition() {
+ String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+ String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uriNation);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader readerNation = scanner.scanBatches();
+ DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uriCustomer);
+ Dataset datasetCustomer = datasetFactoryCustomer.finish();
+ Scanner scannerCustomer = datasetCustomer.newScan(options);
+ ArrowReader readerCustomer = scannerCustomer.scanBatches()
+ ) {
+ // map table to reader
+ Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+ mapTableToArrowReader.put("NATION", readerNation);
+ mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+ // get binary plan
+ Plan plan = queryTableNationJoinCustomer();
+ ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
+ plan.toByteArray().length);
+ substraitPlan.put(plan.toByteArray());
+ // run query
+ try (ArrowReader arrowReader = new AceroSubstraitConsumer(
+ allocator).runQuery(
+ substraitPlan,
+ mapTableToArrowReader
+ )) {
+ while (arrowReader.loadNextBatch()) {
+ System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ queryTwoDatasetsThruSubstraitPlanDefinition();
+
+.. testoutput::
+
+ N_NAME NUMBER_CUSTOMER
+ PERU 573
+
+.. _`Substrait`: https://substrait.io/
+.. _`substrait-java`: https://github.com/substrait-io/substrait-java
+.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
\ No newline at end of file
diff --git a/java/thirdpartydeps/tpch/customer.parquet b/java/thirdpartydeps/tpch/customer.parquet
new file mode 100644
index 0000000..ba017f2
--- /dev/null
+++ b/java/thirdpartydeps/tpch/customer.parquet
Binary files differ
diff --git a/java/thirdpartydeps/tpch/nation.parquet b/java/thirdpartydeps/tpch/nation.parquet
new file mode 100644
index 0000000..0189118
--- /dev/null
+++ b/java/thirdpartydeps/tpch/nation.parquet
Binary files differ