<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Logical plans via datafusion-proto

`SessionContext.fromProto(byte[])` accepts a serialized DataFusion
`LogicalPlanNode` and returns a lazy `DataFrame`. This is useful when you
already have a plan produced by another DataFusion-aware tool, or when
you want to construct the plan programmatically with finer-grained
control than the `sql` or DataFrame APIs offer.

The build generates the protobuf Java classes into two packages:
`org.apache.datafusion.protobuf` (plan and expression nodes) and
`datafusion_common` (scalar values, schemas, column references, and file
formats). On the first build, Maven downloads pinned `.proto` files from
the matching upstream DataFusion tag and then generates the Java classes
locally. See the
[Contributor Guide](../contributor-guide/updating-datafusion-version.md)
for how to bump the version.

## A minimal plan

The smallest interesting plan is a projection of a literal over an
empty relation configured to produce a single row. It makes a useful
sanity check and exercises serialization end-to-end without touching
any storage.

```java
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.DataFrame;
import org.apache.datafusion.SessionContext;
import org.apache.datafusion.protobuf.EmptyRelationNode;
import org.apache.datafusion.protobuf.LogicalExprNode;
import org.apache.datafusion.protobuf.LogicalPlanNode;
import org.apache.datafusion.protobuf.ProjectionNode;

import datafusion_common.DatafusionCommon;

LogicalPlanNode plan =
    LogicalPlanNode.newBuilder()
        .setProjection(
            ProjectionNode.newBuilder()
                // Input: an empty relation that still produces exactly one row.
                .setInput(
                    LogicalPlanNode.newBuilder()
                        .setEmptyRelation(
                            EmptyRelationNode.newBuilder().setProduceOneRow(true).build())
                        .build())
                // Output: a single Int32 literal column with the value 1.
                .addExpr(
                    LogicalExprNode.newBuilder()
                        .setLiteral(
                            DatafusionCommon.ScalarValue.newBuilder().setInt32Value(1).build())
                        .build())
                .build())
        .build();

// Serialize the plan, hand the bytes to fromProto, and read the result.
// loadNextBatch() throws IOException, so run this inside a method that handles it.
try (var allocator = new RootAllocator();
    SessionContext ctx = new SessionContext();
    DataFrame df = ctx.fromProto(plan.toByteArray());
    ArrowReader reader = df.collect(allocator)) {
  // loadNextBatch() returns false once the stream is exhausted.
  if (reader.loadNextBatch()) {
    VectorSchemaRoot batch = reader.getVectorSchemaRoot();
    IntVector col = (IntVector) batch.getVector(0);
    System.out.println(col.get(0)); // 1
  }
}
```

`fromProto` skips SQL parsing and SQL-to-logical planning, because its
input is already a logical plan. From there, the plan goes through the
same optimization and physical-planning pipeline as a plan produced by
`sql`. The result is a lazy [`DataFrame`](dataframe.md) that executes
only when you pull results.

## Scanning a Parquet file via ListingTableScanNode

A `ListingTableScanNode` reads one or more files of the same format
from disk. Unlike `registerParquet`, it does not require the table to
be registered in the catalog: the scan node carries everything
DataFusion needs, namely the file paths, the schema, the projection,
the file format, and the target partition count.

The scan node's `schema` field is a `datafusion_common.Schema`, not an
Arrow `Schema`. Convert between the two with the helper in
`org.apache.datafusion.proto.SchemaConverter`:

```java
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.datafusion.proto.SchemaConverter;

import datafusion_common.DatafusionCommon;

// Assumes "lineitem" is already registered on ctx, for example via registerParquet.
Schema arrow = ctx.tableSchema("lineitem");
DatafusionCommon.Schema schemaProto = SchemaConverter.toProto(arrow);
```

The full example registers the file once to introspect its schema,
then builds a plan that scans the same file, sorts by `l_orderkey`,
and keeps only the first row. This is equivalent to
`SELECT l_orderkey FROM lineitem ORDER BY l_orderkey LIMIT 1`.

```java
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.DataFrame;
import org.apache.datafusion.SessionContext;
import org.apache.datafusion.proto.SchemaConverter;
import org.apache.datafusion.protobuf.BareTableReference;
import org.apache.datafusion.protobuf.ListingTableScanNode;
import org.apache.datafusion.protobuf.LogicalExprNode;
import org.apache.datafusion.protobuf.LogicalPlanNode;
import org.apache.datafusion.protobuf.ProjectionColumns;
import org.apache.datafusion.protobuf.SortExprNode;
import org.apache.datafusion.protobuf.SortNode;
import org.apache.datafusion.protobuf.TableReference;

import datafusion_common.DatafusionCommon;

String path = "/path/to/lineitem.parquet";

try (var allocator = new RootAllocator();
    SessionContext ctx = new SessionContext()) {

  // Register the file only to read its Arrow schema, then convert it for the scan node.
  ctx.registerParquet("lineitem", path);
  DatafusionCommon.Schema schemaProto =
      SchemaConverter.toProto(ctx.tableSchema("lineitem"));

  // Column reference used as the sort key.
  LogicalExprNode orderKeyCol =
      LogicalExprNode.newBuilder()
          .setColumn(DatafusionCommon.Column.newBuilder().setName("l_orderkey").build())
          .build();

  // Scan: read only l_orderkey from the Parquet file, using one partition.
  LogicalPlanNode scan =
      LogicalPlanNode.newBuilder()
          .setListingScan(
              ListingTableScanNode.newBuilder()
                  .setTableName(
                      TableReference.newBuilder()
                          .setBare(BareTableReference.newBuilder().setTable("lineitem").build())
                          .build())
                  .addPaths(path)
                  .setFileExtension(".parquet")
                  .setSchema(schemaProto)
                  .setProjection(ProjectionColumns.newBuilder().addColumns("l_orderkey").build())
                  .setParquet(DatafusionCommon.ParquetFormat.getDefaultInstance())
                  .setTargetPartitions(1)
                  .build())
          .build();

  // Sort: ORDER BY l_orderkey ASC NULLS LAST, keeping only the first row (LIMIT 1).
  LogicalPlanNode plan =
      LogicalPlanNode.newBuilder()
          .setSort(
              SortNode.newBuilder()
                  .setInput(scan)
                  .addExpr(
                      SortExprNode.newBuilder()
                          .setExpr(orderKeyCol)
                          .setAsc(true)
                          .setNullsFirst(false)
                          .build())
                  .setFetch(1)
                  .build())
          .build();

  try (DataFrame df = ctx.fromProto(plan.toByteArray());
      ArrowReader reader = df.collect(allocator)) {
    if (reader.loadNextBatch()) {
      VectorSchemaRoot batch = reader.getVectorSchemaRoot();
      // The batch has a single column, l_orderkey; print it as tab-separated text.
      System.out.println(batch.contentToTSVString());
    }
  }
}
```
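
The `asc`, `nulls_first`, and `fetch` fields in the plan above follow ordinary SQL ordering semantics. As a plain-Java illustration (not DataFusion code; the class name `SortFetchSketch` and the key values are hypothetical), the class below applies the same rules to a list of nullable `Long` keys: sort by value, place nulls according to `nulls_first`, then keep the first `fetch` rows.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical illustration of SortExprNode(asc, nulls_first) plus SortNode.fetch
 * semantics on an in-memory list. This is not how DataFusion executes a sort.
 */
public final class SortFetchSketch {

  /** Sorts keys by value, positions nulls first or last, then keeps the first fetch rows. */
  public static List<Long> sortAndFetch(
      List<Long> keys, boolean asc, boolean nullsFirst, int fetch) {
    Comparator<Long> byValue =
        asc ? Comparator.<Long>naturalOrder() : Comparator.<Long>reverseOrder();
    Comparator<Long> order =
        nullsFirst ? Comparator.nullsFirst(byValue) : Comparator.nullsLast(byValue);
    return keys.stream().sorted(order).limit(fetch).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Hypothetical l_orderkey values, including one null to show null placement.
    List<Long> keys = Arrays.asList(7L, null, 3L, 42L);

    // asc = true, nulls_first = false, fetch = 1: the settings used in the plan above.
    System.out.println(sortAndFetch(keys, true, false, 1)); // [3]

    // asc = false, nulls_first = true, fetch = 2.
    System.out.println(sortAndFetch(keys, false, true, 2)); // [null, 42]

    // Full ascending order with nulls last.
    System.out.println(sortAndFetch(keys, true, false, keys.size())); // [3, 7, 42, null]
  }
}
```

With `asc = true` and `nulls_first = false`, as in the plan above, a null `l_orderkey` can only come back when every key in the file is null.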

## When to use proto plans

The `sql` and DataFrame APIs are the right choice for most workloads.
Reach for `fromProto` when you need one of the following:

- **Cross-tool interop.** Accept plans produced by another
  DataFusion-based system (a planner, a scheduler, a query frontend).
- **Programmatic plan construction.** Build the plan node tree directly
  instead of going through SQL parsing. This suits tools that compile
  their own surface language to DataFusion.
- **Plan persistence.** Serialize a plan to bytes, store or transmit
  it, and execute it later, possibly in a different process or on a
  different machine.
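
Because `fromProto` takes a plain `byte[]`, persisting or transmitting a plan needs nothing DataFusion-specific. The following is a stdlib-only sketch of the storage side, assuming you already hold the bytes returned by `plan.toByteArray()`. The class name `PlanBytesStore` and the use of Base64 for text-only channels are illustrative choices, not part of the bindings.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Base64;

/**
 * Hypothetical helper: serialized plan bytes are opaque, so they can be stored or
 * sent unchanged and later passed to SessionContext.fromProto.
 */
public final class PlanBytesStore {

  /** Writes the plan bytes to a file, creating it or replacing its contents. */
  public static void save(Path file, byte[] planBytes) throws IOException {
    Files.write(file, planBytes);
  }

  /** Reads the plan bytes back exactly as written. */
  public static byte[] load(Path file) throws IOException {
    return Files.readAllBytes(file);
  }

  /** Base64-encodes the bytes for text-only channels such as JSON fields. */
  public static String toText(byte[] planBytes) {
    return Base64.getEncoder().encodeToString(planBytes);
  }

  /** Decodes Base64 text produced by toText back into the original bytes. */
  public static byte[] fromText(String text) {
    return Base64.getDecoder().decode(text);
  }

  public static void main(String[] args) throws IOException {
    // Arbitrary stand-in for plan.toByteArray(); any byte[] round-trips the same way.
    byte[] planBytes = {1, 2, 3, -1};
    Path file = Files.createTempFile("plan", ".bin");
    try {
      save(file, planBytes);
      byte[] restored = load(file);
      System.out.println(Arrays.equals(planBytes, restored)); // true
      System.out.println(Arrays.equals(planBytes, fromText(toText(planBytes)))); // true
      // With the DataFusion bindings on the classpath, pass `restored` to ctx.fromProto.
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```

To inspect a stored plan before running it, the generated `LogicalPlanNode.parseFrom(byte[])` decodes the bytes back into a protobuf message.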

## Schema conversion support

`SchemaConverter.toProto` and `SchemaConverter.fromProto` support only
the Arrow types this project's tests exercise: `Bool`, signed and
unsigned integers from 8 to 64 bits, `Float32`, `Float64`, `Utf8`,
`Utf8View`, `LargeUtf8`, `Date32`, and `Decimal128`. Any other type
raises an `UnsupportedOperationException` that names the offending type.