<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Logical plans via datafusion-proto
`SessionContext.fromProto(byte[])` accepts a serialized DataFusion
`LogicalPlanNode` and returns a lazy `DataFrame`. This is useful when you
already have a plan produced by another DataFusion-aware tool, or when
you want to construct the plan programmatically with finer-grained
control than the `sql` or DataFrame APIs.
The protobuf Java classes are generated by the build into two packages:
`org.apache.datafusion.protobuf` (plan and expression nodes) and
`datafusion_common` (scalar values, schema, column references, file
formats). On first build, the Maven build downloads pinned `.proto` files
from the matching upstream DataFusion tag and then generates the Java
classes locally. See the
[Contributor Guide](../contributor-guide/updating-datafusion-version.md)
for how to bump the version.
## A minimal plan
The smallest interesting plan is a projection of a literal over an
empty input. It is useful as a sanity check and exercises serialization
end-to-end without touching any storage.
```java
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.DataFrame;
import org.apache.datafusion.SessionContext;
import org.apache.datafusion.protobuf.EmptyRelationNode;
import org.apache.datafusion.protobuf.LogicalExprNode;
import org.apache.datafusion.protobuf.LogicalPlanNode;
import org.apache.datafusion.protobuf.ProjectionNode;
import datafusion_common.DatafusionCommon;

// Projection of the Int32 literal 1 over an empty relation that produces one row.
LogicalPlanNode plan =
    LogicalPlanNode.newBuilder()
        .setProjection(
            ProjectionNode.newBuilder()
                .setInput(
                    LogicalPlanNode.newBuilder()
                        .setEmptyRelation(
                            EmptyRelationNode.newBuilder().setProduceOneRow(true).build())
                        .build())
                .addExpr(
                    LogicalExprNode.newBuilder()
                        .setLiteral(
                            DatafusionCommon.ScalarValue.newBuilder().setInt32Value(1).build())
                        .build())
                .build())
        .build();

// Deserialize the plan, execute it, and read the single value back.
try (var allocator = new RootAllocator();
    SessionContext ctx = new SessionContext();
    DataFrame df = ctx.fromProto(plan.toByteArray());
    ArrowReader reader = df.collect(allocator)) {
  reader.loadNextBatch();
  VectorSchemaRoot batch = reader.getVectorSchemaRoot();
  IntVector col = (IntVector) batch.getVector(0);
  System.out.println(col.get(0)); // 1
}
```
A plan passed to `fromProto` is already a logical plan, so it skips SQL
parsing and then goes through the same optimization and
physical-planning pipeline as a plan produced by `sql`. The result is a lazy
[`DataFrame`](dataframe.md) that only executes when you pull results.
## Scanning a Parquet file via ListingTableScanNode
A `ListingTableScanNode` reads one or more files of the same format
from disk. Unlike `registerParquet`, it does not require the table to
be in the catalog. The scan node carries everything DataFusion needs:
the file paths, the schema, the projection, the file format, and the
target partition count.
The scan node's `schema` field is a `datafusion_common.Schema`, not an
Arrow `Schema`. Convert between the two with the helper in
`org.apache.datafusion.proto.SchemaConverter`:
```java
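import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.datafusion.proto.SchemaConverter;
import datafusion_common.DatafusionCommon;

// ctx is a SessionContext in which "lineitem" is already registered.
Schema arrow = ctx.tableSchema("lineitem");
DatafusionCommon.Schema schemaProto = SchemaConverter.toProto(arrow);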
```
The full example below registers the file once to introspect its schema,
then builds a plan that scans the same file, sorts by `l_orderkey`,
and fetches the first row. This is equivalent to
`SELECT l_orderkey FROM lineitem ORDER BY l_orderkey LIMIT 1`.
```java
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.DataFrame;
import org.apache.datafusion.SessionContext;
import org.apache.datafusion.proto.SchemaConverter;
import org.apache.datafusion.protobuf.BareTableReference;
import org.apache.datafusion.protobuf.ListingTableScanNode;
import org.apache.datafusion.protobuf.LogicalExprNode;
import org.apache.datafusion.protobuf.LogicalPlanNode;
import org.apache.datafusion.protobuf.ProjectionColumns;
import org.apache.datafusion.protobuf.SortExprNode;
import org.apache.datafusion.protobuf.SortNode;
import org.apache.datafusion.protobuf.TableReference;
import datafusion_common.DatafusionCommon;

String path = "/path/to/lineitem.parquet";

try (var allocator = new RootAllocator();
    SessionContext ctx = new SessionContext()) {
  // Register the file only to introspect its Arrow schema.
  ctx.registerParquet("lineitem", path);
  DatafusionCommon.Schema schemaProto =
      SchemaConverter.toProto(ctx.tableSchema("lineitem"));

  // Column reference used as the sort key.
  LogicalExprNode orderKeyCol =
      LogicalExprNode.newBuilder()
          .setColumn(DatafusionCommon.Column.newBuilder().setName("l_orderkey").build())
          .build();

  // Sort(fetch = 1) over a listing scan that projects only l_orderkey.
  LogicalPlanNode plan =
      LogicalPlanNode.newBuilder()
          .setSort(
              SortNode.newBuilder()
                  .setInput(
                      LogicalPlanNode.newBuilder()
                          .setListingScan(
                              ListingTableScanNode.newBuilder()
                                  .setTableName(
                                      TableReference.newBuilder()
                                          .setBare(
                                              BareTableReference.newBuilder()
                                                  .setTable("lineitem")
                                                  .build())
                                          .build())
                                  .addPaths(path)
                                  .setFileExtension(".parquet")
                                  .setSchema(schemaProto)
                                  .setProjection(
                                      ProjectionColumns.newBuilder()
                                          .addColumns("l_orderkey")
                                          .build())
                                  .setParquet(
                                      DatafusionCommon.ParquetFormat.getDefaultInstance())
                                  .setTargetPartitions(1)
                                  .build())
                          .build())
                  .addExpr(
                      SortExprNode.newBuilder()
                          .setExpr(orderKeyCol)
                          .setAsc(true)
                          .setNullsFirst(false)
                          .build())
                  .setFetch(1)
                  .build())
          .build();

  try (DataFrame df = ctx.fromProto(plan.toByteArray());
      ArrowReader reader = df.collect(allocator)) {
    reader.loadNextBatch();
    // ...
  }
}
```
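
Because a listing scan can cover several files of the same format, it may
help to collect the paths from a directory before building the node. The
following is a minimal sketch that uses only the JDK; `ParquetPaths` and its
`list` method are hypothetical names, not part of this project. It assumes
that each returned string can be passed to `addPaths` in the same way as the
single path in the example above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper, not part of the DataFusion Java bindings. */
final class ParquetPaths {
  private ParquetPaths() {}

  /**
   * Returns the absolute paths of the regular files directly inside {@code dir}
   * whose names end with {@code extension}, in sorted order.
   */
  static List<String> list(Path dir, String extension) throws IOException {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries
          .filter(Files::isRegularFile)
          .filter(p -> p.getFileName().toString().endsWith(extension))
          .map(p -> p.toAbsolutePath().toString())
          .sorted()
          .collect(Collectors.toList());
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Path.of(args.length > 0 ? args[0] : ".");
    // Assumption: each printed path is suitable for
    // ListingTableScanNode.Builder#addPaths.
    list(dir, ".parquet").forEach(System.out::println);
  }
}
```

The list is sorted so that the serialized plan is the same from run to run
for the same directory contents. The helper does not recurse into
subdirectories.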
## When to use proto plans
The `sql` and DataFrame APIs are the right choice for most workloads.
Reach for `fromProto` when you need one of:
- **Cross-tool interop.** Accept plans produced by another
DataFusion-based system (a planner, a scheduler, a query frontend).
- **Programmatic plan construction.** Build the plan node tree directly
instead of going through SQL parsing. This is useful for tools that
compile their own surface language to DataFusion.
- **Plan persistence.** Serialize a plan to bytes, store or transmit
it, and execute it later, possibly in a different process or on a
different machine.
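
For the persistence case, the serialized plan is an ordinary `byte[]`, so
it can be stored or moved with standard JDK tools. The sketch below is one
possible approach, not a feature of this project: `PlanStore` and its
methods are hypothetical names, and the bytes in `main` are placeholders
standing in for the result of `plan.toByteArray()`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Base64;

/** Hypothetical helper for storing and transporting serialized plans. */
final class PlanStore {
  private PlanStore() {}

  /** Writes the serialized plan bytes to a file, replacing any existing content. */
  static void save(Path file, byte[] planBytes) throws IOException {
    Files.write(file, planBytes);
  }

  /** Reads the bytes back; they can later be handed to SessionContext.fromProto. */
  static byte[] load(Path file) throws IOException {
    return Files.readAllBytes(file);
  }

  /** Encodes the bytes as Base64 text for channels that only carry strings. */
  static String toText(byte[] planBytes) {
    return Base64.getEncoder().encodeToString(planBytes);
  }

  /** Decodes text produced by toText back into the original bytes. */
  static byte[] fromText(String encoded) {
    return Base64.getDecoder().decode(encoded);
  }

  public static void main(String[] args) throws IOException {
    // Placeholder bytes; in real use this would be plan.toByteArray().
    byte[] planBytes = {0x0a, 0x02, 0x08, 0x01};
    Path file = Files.createTempFile("plan", ".pb");
    try {
      save(file, planBytes);
      System.out.println(Arrays.equals(planBytes, load(file)));
      System.out.println(Arrays.equals(planBytes, fromText(toText(planBytes))));
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```

The upstream `datafusion-proto` crate does not promise that serialized plans
stay compatible across DataFusion versions, so it is probably worth storing
the DataFusion version next to the bytes and checking it before execution.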
## Schema conversion support
`SchemaConverter.toProto` and `SchemaConverter.fromProto` support the
primitive Arrow types this project's tests exercise: `Bool`, signed and
unsigned integer types 8 through 64 bits, `Float32`, `Float64`, `Utf8`,
`Utf8View`, `LargeUtf8`, `Date32`, and `Decimal128`. Anything else
raises `UnsupportedOperationException` naming the offending type.
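
Since unsupported types surface as an exception, a caller that builds
plans from arbitrary tables may want to detect the failure and fall back
to the `sql` or DataFrame APIs. The following is a minimal sketch of that
pattern using only the JDK. `SafeConvert` and `tryConvert` are hypothetical
names, and the `converter` argument stands in for a method reference such as
`SchemaConverter::toProto`.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical wrapper; not part of the DataFusion Java bindings. */
final class SafeConvert {
  private SafeConvert() {}

  /**
   * Applies the converter and returns its result, or an empty Optional when
   * the converter rejects the input with UnsupportedOperationException.
   * The converter is assumed never to return null.
   */
  static <S, P> Optional<P> tryConvert(Function<S, P> converter, S schema) {
    try {
      return Optional.of(converter.apply(schema));
    } catch (UnsupportedOperationException e) {
      // The exception message is expected to name the offending type.
      System.err.println("Schema not convertible: " + e.getMessage());
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    // Stand-in converter that rejects one type name, for illustration only.
    Function<String, String> converter =
        typeName -> {
          if (typeName.equals("Struct")) {
            throw new UnsupportedOperationException(typeName);
          }
          return "proto:" + typeName;
        };
    System.out.println(tryConvert(converter, "Int32").isPresent());
    System.out.println(tryConvert(converter, "Struct").isPresent());
  }
}
```

With the real converter, the call would look something like
`SafeConvert.tryConvert(SchemaConverter::toProto, arrowSchema)`, with the
empty case routed to a code path that does not need a proto schema.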