/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

/**
* This is an example that demonstrates several approaches to filtering, and the use of the
* {@code Mean} transform. It shows how to set parameters dynamically by defining new pipeline
* options, and how to use a value computed by the pipeline itself as a side input.
*
* <p>Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side
* input; approaches to filtering, selection, and projection.
*
* <p>The example reads public samples of weather data from BigQuery. It performs a
* projection on the data, finds the global mean of the temperature readings, filters on readings
* for a single given month, and then outputs only data (for that month) that has a mean temp
* smaller than the derived global mean.
*
* <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
* table.
*
* <p>To execute this pipeline locally, specify the BigQuery table for the output:
* <pre>{@code
* --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
* [--monthFilter=<month_number>]
* }
* </pre>
* where optional parameter {@code --monthFilter} is set to a number 1-12.
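*
* <p>For example, a complete set of local execution arguments might look like the following,
* where the project, dataset, and table names are placeholders rather than real resources:
* <pre>{@code
* --output=my-project:my_dataset.filtered_weather --monthFilter=7
* }
* </pre>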
*
* <p>To change the runner, specify:
* <pre>{@code
* --runner=YOUR_SELECTED_RUNNER
* }
* </pre>
* See examples/java/README.md for instructions about how to configure different runners.
*
* <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
* and can be overridden with {@code --input}.
*/
public class FilterExamples {
// Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
private static final String WEATHER_SAMPLES_TABLE =
"clouddataflow-readonly:samples.weather_stations";
static final Logger LOG = Logger.getLogger(FilterExamples.class.getName());
static final int MONTH_TO_FILTER = 7;
/**
* Examines each row in the input table. Outputs only the subset of the cells this example is
* interested in (the mean_temp, year, month, and day) as a BigQuery table row.
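*
* <p>As an illustration (the field values shown here are made up), a raw reading such as
* {@code {"year": "2009", "month": "7", "day": "4", "mean_temp": "75.4", ...}} would be emitted
* as {@code {"year": 2009, "month": 7, "day": 4, "mean_temp": 75.4}}.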
*/
static class ProjectionFn extends DoFn<TableRow, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = c.element();
// Grab year, month, day, mean_temp from the row
Integer year = Integer.parseInt((String) row.get("year"));
Integer month = Integer.parseInt((String) row.get("month"));
Integer day = Integer.parseInt((String) row.get("day"));
Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
// Prepares the data for writing to BigQuery by building a TableRow object
TableRow outRow = new TableRow()
.set("year", year).set("month", month)
.set("day", day).set("mean_temp", meanTemp);
c.output(outRow);
}
}
/**
* Implements 'filter' functionality.
*
* <p>Examines each row in the input table. Outputs only rows whose month matches
* {@code monthFilter}, which is passed in as a parameter during construction of this DoFn.
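*
* <p>A minimal usage sketch (the step name here is illustrative):
* <pre>{@code
* PCollection<TableRow> julyRows =
*     rows.apply("FilterJuly", ParDo.of(new FilterSingleMonthDataFn(7)));
* }
* </pre>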
*/
static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
Integer monthFilter;
public FilterSingleMonthDataFn(Integer monthFilter) {
this.monthFilter = monthFilter;
}
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = c.element();
// ProjectionFn has already stored the "month" field as an Integer, so a cast is sufficient.
Integer month = (Integer) row.get("month");
if (month.equals(this.monthFilter)) {
c.output(row);
}
}
}
/**
* Examines each row (weather reading) in the input table. Outputs the temperature reading for
* that row ('mean_temp').
*/
static class ExtractTempFn extends DoFn<TableRow, Double> {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = c.element();
Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
c.output(meanTemp);
}
}
/**
* Finds the global mean of the mean_temp for each day/record, and outputs only data that has a
* mean temp smaller than this global mean.
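*
* <p>A minimal sketch of applying this composite transform (the input variable name is
* illustrative):
* <pre>{@code
* PCollection<TableRow> belowMean = projectedRows.apply(new BelowGlobalMean(monthFilter));
* }
* </pre>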
*/
static class BelowGlobalMean
extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
Integer monthFilter;
public BelowGlobalMean(Integer monthFilter) {
this.monthFilter = monthFilter;
}
@Override
public PCollection<TableRow> expand(PCollection<TableRow> rows) {
// Extract the mean_temp from each row.
PCollection<Double> meanTemps = rows.apply(
ParDo.of(new ExtractTempFn()));
// Find the global mean of all the mean_temp readings in the weather data,
// and prepare this singleton PCollectionView for use as a side input.
final PCollectionView<Double> globalMeanTemp =
meanTemps.apply(Mean.<Double>globally())
.apply(View.<Double>asSingleton());
// Rows filtered to remove all but a single month
PCollection<TableRow> monthFilteredRows = rows
.apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter)));
// Then, use the global mean as a side input, to further filter the weather data.
// By using a side input to pass in the filtering criteria, we can use a value
// that is computed earlier in pipeline execution.
// We'll only output readings with temperatures below this mean.
PCollection<TableRow> filteredRows = monthFilteredRows
.apply("ParseAndFilter", ParDo
.of(new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
Double gTemp = c.sideInput(globalMeanTemp);
if (meanTemp < gTemp) {
c.output(c.element());
}
}
}).withSideInputs(globalMeanTemp));
return filteredRows;
}
}
/**
* Options supported by {@link FilterExamples}.
*
* <p>Inherits standard configuration options.
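*
* <p>A minimal sketch of how these options are constructed from command-line arguments, as done
* in {@code main}:
* <pre>{@code
* Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
* }
* </pre>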
*/
private interface Options extends PipelineOptions {
@Description("Table to read from, specified as "
+ "<project_id>:<dataset_id>.<table_id>")
@Default.String(WEATHER_SAMPLES_TABLE)
String getInput();
void setInput(String value);
@Description("Table to write to, specified as "
+ "<project_id>:<dataset_id>.<table_id>. "
+ "The dataset_id must already exist")
@Validation.Required
String getOutput();
void setOutput(String value);
@Description("Numeric value of month to filter on")
@Default.Integer(MONTH_TO_FILTER)
Integer getMonthFilter();
void setMonthFilter(Integer value);
}
/**
* Helper method to build the table schema for the output table. The fields mirror those produced
* by {@link ProjectionFn}: year, month, and day as INTEGER, and mean_temp as FLOAT.
*/
private static TableSchema buildWeatherSchemaProjection() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("mean_temp").setType("FLOAT"));
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
TableSchema schema = buildWeatherSchemaProjection();
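// Build the pipeline: read the weather samples from BigQuery, project the fields of interest,
// keep only readings from the requested month that fall below the pipeline-derived global mean
// temperature, and write the results to the output BigQuery table.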
p.apply(BigQueryIO.read().from(options.getInput()))
.apply(ParDo.of(new ProjectionFn()))
.apply(new BelowGlobalMean(options.getMonthFilter()))
.apply(BigQueryIO.writeTableRows()
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run().waitUntilFinish();
}
}