| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.examples.cookbook; |
| |
| import com.google.api.services.bigquery.model.TableRow; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.io.TextIO; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; |
| import org.apache.beam.sdk.options.Description; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.options.Validation; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.join.CoGbkResult; |
| import org.apache.beam.sdk.transforms.join.CoGroupByKey; |
| import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.TupleTag; |
| |
| /** |
| * This example shows how to do a join on two collections. |
| * It uses a sample of the GDELT 'world event' data (http://goo.gl/OB6oin), joining the event |
| * 'action' country code against a table that maps country codes to country names. |
| * |
| * <p>Concepts: Join operation; multiple input sources. |
| * |
| * <p>To execute this pipeline locally, specify a local output file or output prefix on GCS: |
| * <pre>{@code |
| * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] |
| * }</pre> |
| * |
| * <p>To change the runner, specify: |
| * <pre>{@code |
| * --runner=YOUR_SELECTED_RUNNER |
| * } |
| * </pre> |
| * See examples/java/README.md for instructions about how to configure different runners. |
| */ |
| public class JoinExamples { |
| |
| // A 1000-row sample of the GDELT data here: gdelt-bq:full.events. |
| private static final String GDELT_EVENTS_TABLE = |
| "clouddataflow-readonly:samples.gdelt_sample"; |
| // A table that maps country codes to country names. |
| private static final String COUNTRY_CODES = |
| "gdelt-bq:full.crosswalk_geocountrycodetohuman"; |
| |
| /** |
| * Join two collections, using country code as the key. |
| */ |
| static PCollection<String> joinEvents(PCollection<TableRow> eventsTable, |
| PCollection<TableRow> countryCodes) throws Exception { |
| |
| final TupleTag<String> eventInfoTag = new TupleTag<String>(); |
| final TupleTag<String> countryInfoTag = new TupleTag<String>(); |
| |
| // transform both input collections to tuple collections, where the keys are country |
| // codes in both cases. |
| PCollection<KV<String, String>> eventInfo = eventsTable.apply( |
| ParDo.of(new ExtractEventDataFn())); |
| PCollection<KV<String, String>> countryInfo = countryCodes.apply( |
| ParDo.of(new ExtractCountryInfoFn())); |
| |
| // country code 'key' -> CGBKR (<event info>, <country name>) |
| PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple |
| .of(eventInfoTag, eventInfo) |
| .and(countryInfoTag, countryInfo) |
| .apply(CoGroupByKey.<String>create()); |
| |
| // Process the CoGbkResult elements generated by the CoGroupByKey transform. |
| // country code 'key' -> string of <event info>, <country name> |
| PCollection<KV<String, String>> finalResultCollection = |
| kvpCollection.apply("Process", ParDo.of( |
| new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| KV<String, CoGbkResult> e = c.element(); |
| String countryCode = e.getKey(); |
| String countryName = "none"; |
| countryName = e.getValue().getOnly(countryInfoTag); |
| for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { |
| // Generate a string that combines information from both collection values |
| c.output(KV.of(countryCode, "Country name: " + countryName |
| + ", Event info: " + eventInfo)); |
| } |
| } |
| })); |
| |
| // write to GCS |
| PCollection<String> formattedResults = finalResultCollection |
| .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| String outputstring = "Country code: " + c.element().getKey() |
| + ", " + c.element().getValue(); |
| c.output(outputstring); |
| } |
| })); |
| return formattedResults; |
| } |
| |
| /** |
| * Examines each row (event) in the input table. Output a KV with the key the country |
| * code of the event, and the value a string encoding event information. |
| */ |
| static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| TableRow row = c.element(); |
| String countryCode = (String) row.get("ActionGeo_CountryCode"); |
| String sqlDate = (String) row.get("SQLDATE"); |
| String actor1Name = (String) row.get("Actor1Name"); |
| String sourceUrl = (String) row.get("SOURCEURL"); |
| String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl; |
| c.output(KV.of(countryCode, eventInfo)); |
| } |
| } |
| |
| |
| /** |
| * Examines each row (country info) in the input table. Output a KV with the key the country |
| * code, and the value the country name. |
| */ |
| static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| TableRow row = c.element(); |
| String countryCode = (String) row.get("FIPSCC"); |
| String countryName = (String) row.get("HumanName"); |
| c.output(KV.of(countryCode, countryName)); |
| } |
| } |
| |
| |
| /** |
| * Options supported by {@link JoinExamples}. |
| * |
| * <p>Inherits standard configuration options. |
| */ |
| private interface Options extends PipelineOptions { |
| @Description("Path of the file to write to") |
| @Validation.Required |
| String getOutput(); |
| void setOutput(String value); |
| } |
| |
| public static void main(String[] args) throws Exception { |
| Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); |
| Pipeline p = Pipeline.create(options); |
| // the following two 'applys' create multiple inputs to our pipeline, one for each |
| // of our two input sources. |
| PCollection<TableRow> eventsTable = p.apply( |
| BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE)); |
| PCollection<TableRow> countryCodes = p.apply( |
| BigQueryIO.readTableRows().from(COUNTRY_CODES)); |
| PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes); |
| formattedResults.apply(TextIO.write().to(options.getOutput())); |
| p.run().waitUntilFinish(); |
| } |
| |
| } |