<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
### SQL transform
In Apache Beam, you can use **SQL** transforms to query and manipulate data within a pipeline using SQL syntax. This is particularly useful if you already know SQL and want to apply that knowledge when processing data in a Beam pipeline.
Apache Beam **SQL** is built on top of Apache Calcite, an open-source SQL parser and optimizer framework.
To use **SQL** transforms in Apache Beam, perform the following steps:
* Create a `PCollection` of rows. Apache Beam performs SQL operations on `PCollection<Row>` objects, so you first convert your data into a `PCollection` of rows. This requires a schema that describes the structure of your data.
* Apply the SQL transform. Once you have a `PCollection<Row>`, apply a SQL transform to it (`SqlTransform.query()` in Java, `SqlTransform(...)` in Python) and pass the SQL query you want to run. Inside the query, the input is referenced as `PCOLLECTION`.
{{if (eq .Sdk "java")}}
Add the necessary dependencies to your project. For Java, you'll need to include the `beam-sdks-java-extensions-sql` dependency in your build configuration.
```
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Define the schema for your data
Schema schema = Schema.builder()
    .addField("id", Schema.FieldType.INT32)
    .addField("name", Schema.FieldType.STRING)
    .build();

// Create a PCollection<Row> with the above schema
// (`pipeline` is assumed to be an existing Pipeline object)
PCollection<Row> input = pipeline.apply(Create.of(
    Row.withSchema(schema).addValues(1, "Josh").build(),
    Row.withSchema(schema).addValues(103, "Anna").build()
).withRowSchema(schema));

// Apply the SQL transform; PCOLLECTION refers to the input PCollection
PCollection<Row> result = input.apply(
    SqlTransform.query("SELECT id, name FROM PCOLLECTION WHERE id > 100"));
```
{{end}}
{{if (eq .Sdk "python")}}
```
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Define sample input data as a list of dictionaries
input_data = [
    {'id': 1, 'name': 'Alice'},
    {'id': 2, 'name': 'Bob'},
    {'id': 101, 'name': 'Carol'},
    {'id': 102, 'name': 'David'},
]

# Create a pipeline
with beam.Pipeline() as p:
    # Create the input data and convert it to a PCollection of Rows
    input_rows = (
        p | 'Create input data' >> beam.Create(input_data)
          | 'Convert to Rows' >> beam.Map(
              lambda x: beam.Row(id=int(x['id']), name=str(x['name'])))
    )
    # Apply the SQL transform; PCOLLECTION refers to the input PCollection
    filtered_rows = input_rows | SqlTransform(
        "SELECT id, name FROM PCOLLECTION WHERE id > 100")
    # Print the results
    filtered_rows | 'Print results' >> beam.Map(print)
```
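To make the effect of the query concrete, the plain-Python sketch below applies the same predicate (`id > 100`) and projection (`id`, `name`) to the sample data without Beam. It only illustrates which rows the query selects and is not how Beam executes SQL. The helper name `select_id_name_where_id_gt_100` is hypothetical.

```python
# Same sample data as in the pipeline example
input_data = [
    {'id': 1, 'name': 'Alice'},
    {'id': 2, 'name': 'Bob'},
    {'id': 101, 'name': 'Carol'},
    {'id': 102, 'name': 'David'},
]


def select_id_name_where_id_gt_100(rows):
    """Plain-Python stand-in for:
    SELECT id, name FROM PCOLLECTION WHERE id > 100
    """
    # The WHERE clause becomes the `if`; the SELECT list becomes the tuple
    return [(r['id'], r['name']) for r in rows if r['id'] > 100]


expected = select_id_name_where_id_gt_100(input_data)
print(expected)  # [(101, 'Carol'), (102, 'David')]
```

In the Beam pipeline the selected rows are the same, but they arrive as `Row` objects and their order is not guaranteed.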
{{end}}
### Playground exercise
{{if (eq .Sdk "java")}}
You can use your own classes instead of `Schema`. Here, several customers are created and then queried with the `SqlTransform` transform:
```
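import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// JavaBeanSchema infers the schema fields from the getters below
@DefaultSchema(JavaBeanSchema.class)
public class Customer implements Serializable {
  private int id;
  private String name;
  private String countryOfResidence;

  // Tells Beam to build Customer objects with this constructor
  @SchemaCreate
  public Customer(int id, String name, String countryOfResidence) {
    this.id = id;
    this.name = name;
    this.countryOfResidence = countryOfResidence;
  }

  public int getId() { return id; }
  public String getName() { return name; }
  public String getCountryOfResidence() { return countryOfResidence; }
}

// `pipeline` is assumed to be an existing Pipeline object
PCollection<Customer> customers = pipeline.apply(Create.of(
    new Customer(1, "Foo", "Wonderland"),
    new Customer(2, "Bar", "Super Kingdom"),
    new Customer(3, "Baz", "Wonderland"),
    new Customer(4, "Grault", "Wonderland"),
    new Customer(5, "Qux", "Super Kingdom")));

PCollection<Row> customersFromWonderland =
    customers.apply(
        SqlTransform.query(
            "SELECT id, name "
                + " FROM PCOLLECTION "
                + " WHERE countryOfResidence = 'Wonderland'"));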
```
{{end}}
{{if (eq .Sdk "python")}}
You can use a `typing.NamedTuple` instead of `beam.Row`:
```
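import typing
import apache_beam as beam
from apache_beam import coders

Purchase = typing.NamedTuple('Purchase',
                             [('item_name', str), ('price', float)])
# Register RowCoder so that Beam treats Purchase as a type with a schema
coders.registry.register_coder(Purchase, coders.RowCoder)
with beam.Pipeline() as p:
    purchases = (p | beam.io...
                   | beam.Map(..).with_output_types(Purchase))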
```
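The fragment above leaves the source and the mapping function elided. As a hedged end-to-end sketch, the example below fills them in with hypothetical sample data and a hypothetical `run()` wrapper, then applies `SqlTransform` to the resulting `PCollection`. It assumes the Beam Python SDK is installed and a Java runtime is available, because the Python `SqlTransform` is a cross-language transform that is expanded by a Java service.

```python
import typing

# Schema'd type: the field names and types become the SQL columns
Purchase = typing.NamedTuple('Purchase',
                             [('item_name', str), ('price', float)])


def run():
    # Beam is imported here so that the Purchase definition above
    # can be inspected even where Beam is not installed.
    import apache_beam as beam
    from apache_beam import coders
    from apache_beam.transforms.sql import SqlTransform

    # Register RowCoder so that Beam treats Purchase as a type with a schema
    coders.registry.register_coder(Purchase, coders.RowCoder)

    # Hypothetical sample data standing in for a real source
    raw = [('apple', 0.5), ('laptop', 950.0), ('phone', 600.0)]

    with beam.Pipeline() as p:
        _ = (
            p
            | 'Create purchases' >> beam.Create(raw)
            | 'To Purchase' >> beam.Map(
                lambda t: Purchase(item_name=t[0], price=t[1])
            ).with_output_types(Purchase)
            | 'Expensive items' >> SqlTransform(
                "SELECT item_name, price FROM PCOLLECTION WHERE price > 100")
            | 'Print results' >> beam.Map(print))
```

Calling `run()` executes the pipeline. The `with_output_types(Purchase)` hint is what lets Beam attach the schema to the `PCollection` before the query runs.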
{{end}}