| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.examples.kotlin.cookbook |
| |
| import com.google.api.services.bigquery.model.TableFieldSchema |
| import com.google.api.services.bigquery.model.TableRow |
| import com.google.api.services.bigquery.model.TableSchema |
| import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions |
| import org.apache.beam.sdk.Pipeline |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method |
| import org.apache.beam.sdk.io.gcp.bigquery.WriteResult |
| import org.apache.beam.sdk.options.* |
| import org.apache.beam.sdk.transforms.Count |
| import org.apache.beam.sdk.transforms.DoFn |
| import org.apache.beam.sdk.transforms.PTransform |
| import org.apache.beam.sdk.transforms.ParDo |
| import org.apache.beam.sdk.values.KV |
| import org.apache.beam.sdk.values.PCollection |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists |
| |
| |
/**
 * An example that reads the public samples of weather data from BigQuery, counts the number of
 * tornadoes that occur in each month, and writes the results to BigQuery.
 *
 * Concepts: reading/writing BigQuery; counting a PCollection; user-defined PTransforms.
 *
 * Note: before running this example, you must create a BigQuery dataset to contain your output
 * table.
 *
 * To execute this pipeline locally, specify the BigQuery table for the output with the form:
 *
 * ```
 * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
 * ```
 *
 * To change the runner, specify:
 *
 * ```
 * --runner=YOUR_SELECTED_RUNNER
 * ```
 *
 * See examples/java/README.md for instructions about how to configure different runners.
 *
 * The BigQuery input table defaults to `clouddataflow-readonly:samples.weather_stations`
 * and can be overridden with `--input`.
 */
object BigQueryTornadoes {
    // Default input: a 1000 row subset of the public weather station table publicdata:samples.gsod.
    private const val WEATHER_SAMPLES_TABLE = "clouddataflow-readonly:samples.weather_stations"

    /**
     * Examines each row in the input table. If a tornado was recorded in that sample, the month in
     * which it occurred is output.
     *
     * The `tornado` field is read as a nullable Boolean and the `month` field as a String that is
     * parsed into an Int.
     */
    internal class ExtractTornadoesFn : DoFn<TableRow, Int>() {
        @ProcessElement
        fun processElement(c: ProcessContext) {
            val row = c.element()
            // A missing/null "tornado" value is treated as "no tornado" instead of crashing the
            // bundle; a value of an unexpected type still fails loudly with a ClassCastException.
            val tornadoRecorded = (row["tornado"] as Boolean?) == true
            if (tornadoRecorded) {
                c.output((row["month"] as String).toInt())
            }
        }
    }

    /**
     * Prepares the data for writing to BigQuery by building a [TableRow] whose `month` field holds
     * the key and whose `tornado_count` field holds the value of the incoming [KV].
     */
    internal class FormatCountsFn : DoFn<KV<Int, Long>, TableRow>() {
        @ProcessElement
        fun processElement(c: ProcessContext) {
            val monthCount = c.element()
            c.output(
                TableRow()
                    .set("month", monthCount.key)
                    .set("tornado_count", monthCount.value)
            )
        }
    }

    /**
     * Takes rows from a table and generates a table of counts.
     *
     * The input schema is described by https://developers.google.com/bigquery/docs/dataset-gsod .
     * The output contains the total number of tornadoes found in each month in the following schema:
     *
     * * month: integer
     * * tornado_count: integer
     */
    internal class CountTornadoes : PTransform<PCollection<TableRow>, PCollection<TableRow>>() {
        override fun expand(rows: PCollection<TableRow>): PCollection<TableRow> {
            // row... => month...
            val tornadoMonths = rows.apply(ParDo.of(ExtractTornadoesFn()))

            // month... => <month,count>...
            val countsPerMonth = tornadoMonths.apply(Count.perElement())

            // <month,count>... => row...
            return countsPerMonth.apply(ParDo.of(FormatCountsFn()))
        }
    }

    /**
     * Options supported by [BigQueryTornadoes].
     *
     * Inherits standard configuration options.
     */
    interface Options : PipelineOptions {
        /** Source table; defaults to [WEATHER_SAMPLES_TABLE]. */
        @get:Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
        @get:Default.String(WEATHER_SAMPLES_TABLE)
        var input: String

        /** BigQuery read method; defaults to `EXPORT`. */
        @get:Description("Mode to use when reading from BigQuery")
        @get:Default.Enum("EXPORT")
        var readMethod: Method

        /** Destination table; required. */
        @get:Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
        @get:Validation.Required
        var output: String
    }

    /**
     * Builds and runs the pipeline: reads rows from [Options.input] using [Options.readMethod],
     * applies [CountTornadoes], writes the result to [Options.output] (creating the table if
     * needed and truncating existing contents), then blocks until the pipeline finishes.
     */
    private fun runBigQueryTornadoes(options: Options) {
        val pipeline = Pipeline.create(options)

        // Schema of the output table.
        val outputSchema = TableSchema().setFields(
            arrayListOf<TableFieldSchema>(
                TableFieldSchema().setName("month").setType("INTEGER"),
                TableFieldSchema().setName("tornado_count").setType("INTEGER")
            )
        )

        val readMethod = options.readMethod
        val baseRead = BigQueryIO.readTableRows()
            .from(options.input)
            .withMethod(readMethod)

        val rowsFromBigQuery: PCollection<TableRow> =
            if (readMethod == Method.DIRECT_READ) {
                // Direct reads additionally restrict the read to the two columns the pipeline uses.
                val tableReadOptions = TableReadOptions.newBuilder()
                    .addAllSelectedFields(listOf("month", "tornado"))
                    .build()
                pipeline.apply(baseRead.withReadOptions(tableReadOptions))
            } else {
                pipeline.apply(baseRead)
            }

        rowsFromBigQuery
            .apply(CountTornadoes())
            .apply<WriteResult>(
                BigQueryIO.writeTableRows()
                    .to(options.output)
                    .withSchema(outputSchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            )

        pipeline.run().waitUntilFinish()
    }

    /**
     * Program entry point: parses and validates the command-line [args] into [Options] and runs
     * the pipeline.
     */
    @JvmStatic
    fun main(args: Array<String>) {
        // withValidation() returns a PipelineOptionsFactory.Builder, not the options object, so it
        // must be converted with as(Class) rather than a Kotlin `as` cast (which would throw
        // ClassCastException at startup).
        val options = PipelineOptionsFactory.fromArgs(*args)
            .withValidation()
            .`as`(Options::class.java)

        runBigQueryTornadoes(options)
    }
}