blob: ccd0950b7c552c785c3f9a1cfcd106724b31b5af [file] [log] [blame] [view]
Prompt:
Write a sample Java code snippet that writes data to a Google BigQuery table using Apache Beam.
Response:
Your Apache Beam pipeline can write data to a BigQuery table using the Apache Beam BigQueryIO connector.
Here is an example of how to use the BigQueryIO connector to accomplish this:
```java
package bigquery;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Pipeline that writes sample rows to a BigQuery table using Apache Beam.
public class WriteBigQueryTable {
  private static final Logger LOG = LoggerFactory.getLogger(WriteBigQueryTable.class);

  /** Pipeline options to configure the pipeline. */
  public interface WriteBigQueryTableOptions extends PipelineOptions {
    @Description("Table name in BigQuery to write to, e.g. project:dataset.table")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {
    // Parse the pipeline options from the command line.
    WriteBigQueryTableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteBigQueryTableOptions.class);

    // Sample data to write to the BigQuery table: "id, month, amount".
    final List<String> elements =
        Arrays.asList("1, January, $1000", "2, February, $2000", "3, March, $3000");

    // Define the schema for the BigQuery table.
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("id").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("month").setType("STRING"));
    fields.add(new TableFieldSchema().setName("amount").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    // Create a pipeline using the Apache Beam SDK.
    Pipeline p = Pipeline.create(options);
    p
        // Create a PCollection from the in-memory sample strings.
        .apply(Create.of(elements))
        // Convert each comma-separated string into a TableRow.
        .apply("to BigQuery TableRow", ParDo.of(new DoFn<String, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String[] columns = c.element().split(", ");
            // BUG FIX: the schema declares "id" as INTEGER, so parse it to an int
            // rather than relying on BigQuery to coerce the raw string.
            TableRow row =
                new TableRow()
                    .set("id", Integer.parseInt(columns[0].trim()))
                    .set("month", columns[1])
                    .set("amount", columns[2]);
            c.output(row);
          }
        }))
        // Write the rows to the BigQuery table via the BigQueryIO connector.
        .apply(
            BigQueryIO.writeTableRows()
                .to(options.getTableName())
                .withSchema(schema)
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                // BUG FIX: withNumFileShards(1000) was removed. Beam only allows
                // numFileShards when withTriggeringFrequency is set (streaming
                // FILE_LOADS); in this bounded pipeline it fails validation at
                // pipeline construction time.
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    // BUG FIX: block until the run completes so batch failures are surfaced
    // (a bare run() can return before the job finishes on some runners).
    p.run().waitUntilFinish();
  }
}
```
This code snippet uses the PipelineOptions pattern to parse command-line arguments; run it with `--tableName=project:dataset.table` to specify the destination table.