In Apache Beam, you can use SQL transforms to query and manipulate data within your pipeline using SQL syntax. This is particularly useful if you're already familiar with SQL and want to leverage that knowledge while processing data in your Apache Beam pipeline.
Apache Beam SQL is built on top of Apache Calcite, an open-source SQL parser and optimizer framework.
To use SQL transforms in Apache Beam, you'll need to perform the following steps:
Create a PCollection of rows. Apache Beam performs SQL operations on PCollection<Row> objects, so you'll need to convert your data into a PCollection of rows. This requires defining a schema that describes the structure of your data.
Apply the SQL transform. Once you have your PCollection<Row>, you can apply a SQL transform using the SqlTransform.query() method (in Python, by constructing SqlTransform directly), providing the SQL query you want to execute. Inside the query, a single input PCollection is available as a table named PCOLLECTION.
{{if (eq .Sdk "java")}} Add the necessary dependencies to your project. For Java, you'll need to include the beam-sdks-java-extensions-sql dependency in your build configuration.
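For example, with Maven the dependency looks like this (a minimal sketch, assuming a beam.version property is defined elsewhere in your pom.xml):

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-sql</artifactId>
  <!-- assumption: beam.version is set to your Beam release -->
  <version>${beam.version}</version>
</dependency>

With the dependency in place, you can define a schema and apply the transform: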
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Define the schema for your data
Schema schema = Schema.builder()
    .addField("id", Schema.FieldType.INT32)
    .addField("name", Schema.FieldType.STRING)
    .build();

// Build a PCollection<Row> with the above schema
PCollection<Row> input = pipeline.apply(Create.of(
        Row.withSchema(schema).addValues(1, "Josh").build(),
        Row.withSchema(schema).addValues(103, "Anna").build())
    .withRowSchema(schema));

// Apply the SQL transform; the input is queried as the PCOLLECTION table
PCollection<Row> result = input.apply(
    SqlTransform.query("SELECT id, name FROM PCOLLECTION WHERE id > 100"));
{{end}}
{{if (eq .Sdk "python")}} In Python, SqlTransform is a cross-language transform backed by Beam's Java SQL expansion service, so a Java runtime must be available when the pipeline is constructed:
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Define sample input data as a list of dictionaries
input_data = [
    {'id': 1, 'name': 'Alice'},
    {'id': 2, 'name': 'Bob'},
    {'id': 101, 'name': 'Carol'},
    {'id': 102, 'name': 'David'},
]

# Create a pipeline
with beam.Pipeline() as p:
    # Convert the input data to a PCollection of Rows
    input_rows = (
        p
        | 'Create input data' >> beam.Create(input_data)
        | 'Convert to Rows' >> beam.Map(
            lambda x: beam.Row(id=int(x['id']), name=str(x['name'])))
    )

    # Apply the SQL transform
    filtered_rows = input_rows | SqlTransform(
        "SELECT id, name FROM PCOLLECTION WHERE id > 100")

    # Print the results
    filtered_rows | 'Print results' >> beam.Map(print)
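SqlTransform is not limited to filtering. A minimal sketch of a GROUP BY aggregation over the same input_rows, inside the same with block:

    # Count how many rows share each name; COUNT(*) is aliased to cnt
    counts = input_rows | 'Count per name' >> SqlTransform(
        "SELECT name, COUNT(*) AS cnt FROM PCOLLECTION GROUP BY name")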
{{end}}
{{if (eq .Sdk "java")}} You can also use your own classes instead of building rows against a hand-written Schema. Here we create several Customer objects and query them with the SqlTransform transform. JavaBeanSchema infers the schema from the getters, and the @SchemaCreate annotation tells Beam which constructor to use:
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

@DefaultSchema(JavaBeanSchema.class)
public class Customer implements Serializable {
    private final int id;
    private final String name;
    private final String countryOfResidence;

    @SchemaCreate
    public Customer(int id, String name, String countryOfResidence) {
        this.id = id;
        this.name = name;
        this.countryOfResidence = countryOfResidence;
    }

    public int getId() { return id; }
    public String getName() { return name; }
    public String getCountryOfResidence() { return countryOfResidence; }
}
PCollection<Customer> customers = pipeline.apply(Create.of(
    new Customer(1, "Foo", "Wonderland"),
    new Customer(2, "Bar", "Super Kingdom"),
    new Customer(3, "Baz", "Wonderland"),
    new Customer(4, "Grault", "Wonderland"),
    new Customer(5, "Qux", "Super Kingdom")));

PCollection<Row> customersFromWonderland =
    customers.apply(
        SqlTransform.query(
            "SELECT id, name "
                + " FROM PCOLLECTION "
                + " WHERE countryOfResidence = 'Wonderland'"));
{{end}}
{{if (eq .Sdk "python")}} You can use a typing.NamedTuple instead of beam.Row; register a RowCoder for it so Beam can infer the schema:
import typing
import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

Purchase = typing.NamedTuple('Purchase', [('item_name', str), ('price', float)])
coders.registry.register_coder(Purchase, coders.RowCoder)

with beam.Pipeline() as p:
    # In-memory sample data stands in for a real source (e.g. a beam.io read)
    purchases = (p | beam.Create([('cereal', 5.0), ('burger', 12.5)])
                   | beam.Map(lambda t: Purchase(*t)).with_output_types(Purchase))
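Once the coder is registered, SqlTransform can query the typed PCollection directly, and the column names come from the NamedTuple fields. A minimal sketch, continuing inside the same with block:

    # Query the typed PCollection; columns map to Purchase fields
    expensive = purchases | SqlTransform(
        "SELECT item_name, price FROM PCOLLECTION WHERE price > 10.0")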
{{end}}