| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.examples.snippets; |
| |
| import com.google.api.services.bigquery.model.TableFieldSchema; |
| import com.google.api.services.bigquery.model.TableReference; |
| import com.google.api.services.bigquery.model.TableRow; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.api.services.bigquery.model.TimePartitioning; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.coders.AvroCoder; |
| import org.apache.beam.sdk.coders.DefaultCoder; |
| import org.apache.beam.sdk.coders.DoubleCoder; |
| import org.apache.beam.sdk.io.Compression; |
| import org.apache.beam.sdk.io.FileIO; |
| import org.apache.beam.sdk.io.GenerateSequence; |
| import org.apache.beam.sdk.io.TextIO; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; |
| import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; |
| import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; |
| import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.MapElements; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.Sum; |
| import org.apache.beam.sdk.transforms.View; |
| import org.apache.beam.sdk.transforms.Watch; |
| import org.apache.beam.sdk.transforms.join.CoGbkResult; |
| import org.apache.beam.sdk.transforms.join.CoGroupByKey; |
| import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; |
| import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindows; |
| import org.apache.beam.sdk.transforms.windowing.Repeatedly; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.sdk.values.TypeDescriptors; |
| import org.apache.beam.sdk.values.ValueInSingleWindow; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.joda.time.format.DateTimeFormat; |
| import org.joda.time.format.DateTimeFormatter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Code snippets used in webdocs. */ |
| public class Snippets { |
| |
| @DefaultCoder(AvroCoder.class) |
| static class Quote { |
| final String source; |
| final String quote; |
| |
| public Quote() { |
| this.source = ""; |
| this.quote = ""; |
| } |
| |
| public Quote(String source, String quote) { |
| this.source = source; |
| this.quote = quote; |
| } |
| } |
| |
| @DefaultCoder(AvroCoder.class) |
| static class WeatherData { |
| final long year; |
| final long month; |
| final long day; |
| final double maxTemp; |
| |
| public WeatherData() { |
| this.year = 0; |
| this.month = 0; |
| this.day = 0; |
      this.maxTemp = 0.0;
| } |
| |
| public WeatherData(long year, long month, long day, double maxTemp) { |
| this.year = year; |
| this.month = month; |
| this.day = day; |
| this.maxTemp = maxTemp; |
| } |
| } |
| |
  /** Using Read and Write transforms to read from and write to BigQuery. */
| public static void modelBigQueryIO(Pipeline p) { |
| modelBigQueryIO(p, "", "", ""); |
| } |
| |
| public static void modelBigQueryIO( |
| Pipeline p, String writeProject, String writeDataset, String writeTable) { |
| { |
| // [START BigQueryTableSpec] |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| // [END BigQueryTableSpec] |
| } |
| |
| { |
| // [START BigQueryTableSpecWithoutProject] |
| String tableSpec = "samples.weather_stations"; |
| // [END BigQueryTableSpecWithoutProject] |
| } |
| |
| { |
| // [START BigQueryTableSpecObject] |
| TableReference tableSpec = |
| new TableReference() |
| .setProjectId("clouddataflow-readonly") |
| .setDatasetId("samples") |
| .setTableId("weather_stations"); |
| // [END BigQueryTableSpecObject] |
| } |
| |
| { |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| // [START BigQueryReadTable] |
| PCollection<Double> maxTemperatures = |
| p.apply(BigQueryIO.readTableRows().from(tableSpec)) |
| // Each row is of type TableRow |
| .apply( |
| MapElements.into(TypeDescriptors.doubles()) |
| .via((TableRow row) -> (Double) row.get("max_temperature"))); |
| // [END BigQueryReadTable] |
| } |
| |
| { |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| // [START BigQueryReadFunction] |
| PCollection<Double> maxTemperatures = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature")) |
| .from(tableSpec) |
| .withCoder(DoubleCoder.of())); |
| // [END BigQueryReadFunction] |
| } |
| |
| { |
| // [START BigQueryReadQuery] |
| PCollection<Double> maxTemperatures = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature")) |
| .fromQuery( |
| "SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]") |
| .withCoder(DoubleCoder.of())); |
| // [END BigQueryReadQuery] |
| } |
| |
| { |
| // [START BigQueryReadQueryStdSQL] |
| PCollection<Double> maxTemperatures = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature")) |
| .fromQuery( |
| "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`") |
| .usingStandardSql() |
| .withCoder(DoubleCoder.of())); |
| // [END BigQueryReadQueryStdSQL] |
| } |
| |
| // [START BigQuerySchemaJson] |
| String tableSchemaJson = |
| "" |
| + "{" |
| + " \"fields\": [" |
| + " {" |
| + " \"name\": \"source\"," |
| + " \"type\": \"STRING\"," |
| + " \"mode\": \"NULLABLE\"" |
| + " }," |
| + " {" |
| + " \"name\": \"quote\"," |
| + " \"type\": \"STRING\"," |
| + " \"mode\": \"REQUIRED\"" |
| + " }" |
| + " ]" |
| + "}"; |
| // [END BigQuerySchemaJson] |
| |
| { |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| if (!writeProject.isEmpty() && !writeDataset.isEmpty() && !writeTable.isEmpty()) { |
| tableSpec = writeProject + ":" + writeDataset + "." + writeTable; |
| } |
| |
| // [START BigQuerySchemaObject] |
| TableSchema tableSchema = |
| new TableSchema() |
| .setFields( |
| ImmutableList.of( |
| new TableFieldSchema() |
| .setName("source") |
| .setType("STRING") |
| .setMode("NULLABLE"), |
| new TableFieldSchema() |
| .setName("quote") |
| .setType("STRING") |
| .setMode("REQUIRED"))); |
| // [END BigQuerySchemaObject] |
| |
| // [START BigQueryWriteInput] |
| /* |
| @DefaultCoder(AvroCoder.class) |
| static class Quote { |
| final String source; |
| final String quote; |
| |
| public Quote() { |
| this.source = ""; |
| this.quote = ""; |
| } |
| public Quote(String source, String quote) { |
| this.source = source; |
| this.quote = quote; |
| } |
| } |
| */ |
| |
| PCollection<Quote> quotes = |
| p.apply( |
| Create.of( |
| new Quote("Mahatma Gandhi", "My life is my message."), |
| new Quote("Yoda", "Do, or do not. There is no 'try'."))); |
| // [END BigQueryWriteInput] |
| |
| // [START BigQueryWriteTable] |
| quotes |
| .apply( |
| MapElements.into(TypeDescriptor.of(TableRow.class)) |
| .via( |
| (Quote elem) -> |
| new TableRow().set("source", elem.source).set("quote", elem.quote))) |
| .apply( |
| BigQueryIO.writeTableRows() |
| .to(tableSpec) |
| .withSchema(tableSchema) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteTable] |
| |
| // [START BigQueryWriteFunction] |
| quotes.apply( |
| BigQueryIO.<Quote>write() |
| .to(tableSpec) |
| .withSchema(tableSchema) |
| .withFormatFunction( |
| (Quote elem) -> |
| new TableRow().set("source", elem.source).set("quote", elem.quote)) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteFunction] |
| |
| // [START BigQueryWriteJsonSchema] |
| quotes.apply( |
| BigQueryIO.<Quote>write() |
| .to(tableSpec) |
| .withJsonSchema(tableSchemaJson) |
| .withFormatFunction( |
| (Quote elem) -> |
| new TableRow().set("source", elem.source).set("quote", elem.quote)) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteJsonSchema] |
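
      // A sketch beyond the published snippets: the insertion method can also be set explicitly.
      // By default BigQueryIO chooses between batch load jobs and streaming inserts based on
      // whether the input PCollection is bounded; FILE_LOADS forces batch load jobs.
      quotes.apply(
          BigQueryIO.<Quote>write()
              .to(tableSpec)
              .withSchema(tableSchema)
              .withFormatFunction(
                  (Quote elem) ->
                      new TableRow().set("source", elem.source).set("quote", elem.quote))
              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));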
| } |
| |
| { |
| // [START BigQueryWriteDynamicDestinations] |
| /* |
| @DefaultCoder(AvroCoder.class) |
| static class WeatherData { |
| final long year; |
| final long month; |
| final long day; |
| final double maxTemp; |
| |
| public WeatherData() { |
| this.year = 0; |
| this.month = 0; |
| this.day = 0; |
          this.maxTemp = 0.0;
| } |
| public WeatherData(long year, long month, long day, double maxTemp) { |
| this.year = year; |
| this.month = month; |
| this.day = day; |
| this.maxTemp = maxTemp; |
| } |
| } |
| */ |
| |
| PCollection<WeatherData> weatherData = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> { |
| GenericRecord record = elem.getRecord(); |
| return new WeatherData( |
| (Long) record.get("year"), |
| (Long) record.get("month"), |
| (Long) record.get("day"), |
| (Double) record.get("max_temperature")); |
| }) |
| .fromQuery( |
| "SELECT year, month, day, max_temperature " |
| + "FROM [clouddataflow-readonly:samples.weather_stations] " |
| + "WHERE year BETWEEN 2007 AND 2009") |
| .withCoder(AvroCoder.of(WeatherData.class))); |
| |
      // Write the weather data into a different table for each year.
| weatherData.apply( |
| BigQueryIO.<WeatherData>write() |
| .to( |
| new DynamicDestinations<WeatherData, Long>() { |
| @Override |
| public Long getDestination(ValueInSingleWindow<WeatherData> elem) { |
| return elem.getValue().year; |
| } |
| |
| @Override |
| public TableDestination getTable(Long destination) { |
| return new TableDestination( |
| new TableReference() |
| .setProjectId(writeProject) |
| .setDatasetId(writeDataset) |
| .setTableId(writeTable + "_" + destination), |
| "Table for year " + destination); |
| } |
| |
| @Override |
| public TableSchema getSchema(Long destination) { |
| return new TableSchema() |
| .setFields( |
| ImmutableList.of( |
| new TableFieldSchema() |
| .setName("year") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("month") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("day") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("maxTemp") |
| .setType("FLOAT") |
| .setMode("NULLABLE"))); |
| } |
| }) |
| .withFormatFunction( |
| (WeatherData elem) -> |
| new TableRow() |
| .set("year", elem.year) |
| .set("month", elem.month) |
| .set("day", elem.day) |
| .set("maxTemp", elem.maxTemp)) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteDynamicDestinations] |
| |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| if (!writeProject.isEmpty() && !writeDataset.isEmpty() && !writeTable.isEmpty()) { |
        tableSpec = writeProject + ":" + writeDataset + "." + writeTable;
| } |
| |
| TableSchema tableSchema = |
| new TableSchema() |
| .setFields( |
| ImmutableList.of( |
| new TableFieldSchema().setName("year").setType("INTEGER").setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("month") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema().setName("day").setType("INTEGER").setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("maxTemp") |
| .setType("FLOAT") |
| .setMode("NULLABLE"))); |
| |
| // [START BigQueryTimePartitioning] |
| weatherData.apply( |
| BigQueryIO.<WeatherData>write() |
| .to(tableSpec + "_partitioning") |
| .withSchema(tableSchema) |
| .withFormatFunction( |
| (WeatherData elem) -> |
| new TableRow() |
| .set("year", elem.year) |
| .set("month", elem.month) |
| .set("day", elem.day) |
| .set("maxTemp", elem.maxTemp)) |
              // NOTE: time partitioning cannot be enabled when writing to an existing table
              // that was created without it.
| .withTimePartitioning(new TimePartitioning().setType("DAY")) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryTimePartitioning] |
| } |
| } |
| |
  /**
   * Helper function to format results in {@link #coGroupByKeyTuple}. For example, name "amy" with
   * emails ["amy@example.com"] and phones ["111-222-3333"] formats as
   * "amy; ['amy@example.com']; ['111-222-3333']".
   */
| public static String formatCoGbkResults( |
| String name, Iterable<String> emails, Iterable<String> phones) { |
| |
| List<String> emailsList = new ArrayList<>(); |
| for (String elem : emails) { |
| emailsList.add("'" + elem + "'"); |
| } |
| Collections.sort(emailsList); |
| String emailsStr = "[" + String.join(", ", emailsList) + "]"; |
| |
| List<String> phonesList = new ArrayList<>(); |
| for (String elem : phones) { |
| phonesList.add("'" + elem + "'"); |
| } |
| Collections.sort(phonesList); |
| String phonesStr = "[" + String.join(", ", phonesList) + "]"; |
| |
| return name + "; " + emailsStr + "; " + phonesStr; |
| } |
| |
| /** Using a CoGroupByKey transform. */ |
| public static PCollection<String> coGroupByKeyTuple( |
| TupleTag<String> emailsTag, |
| TupleTag<String> phonesTag, |
| PCollection<KV<String, String>> emails, |
| PCollection<KV<String, String>> phones) { |
| |
| // [START CoGroupByKeyTuple] |
| PCollection<KV<String, CoGbkResult>> results = |
| KeyedPCollectionTuple.of(emailsTag, emails) |
| .and(phonesTag, phones) |
| .apply(CoGroupByKey.create()); |
| |
| PCollection<String> contactLines = |
| results.apply( |
| ParDo.of( |
| new DoFn<KV<String, CoGbkResult>, String>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| KV<String, CoGbkResult> e = c.element(); |
| String name = e.getKey(); |
| Iterable<String> emailsIter = e.getValue().getAll(emailsTag); |
| Iterable<String> phonesIter = e.getValue().getAll(phonesTag); |
| String formattedResult = |
| Snippets.formatCoGbkResults(name, emailsIter, phonesIter); |
| c.output(formattedResult); |
| } |
| })); |
| // [END CoGroupByKeyTuple] |
| return contactLines; |
| } |
| |
| public static void fileProcessPattern() throws Exception { |
| Pipeline p = Pipeline.create(); |
| |
| // [START FileProcessPatternProcessNewFilesSnip1] |
| // This produces PCollection<MatchResult.Metadata> |
| p.apply( |
| FileIO.match() |
| .filepattern("...") |
| .continuously( |
| Duration.standardSeconds(30), |
| Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)))); |
| // [END FileProcessPatternProcessNewFilesSnip1] |
| |
| // [START FileProcessPatternProcessNewFilesSnip2] |
| // This produces PCollection<String> |
| p.apply( |
| TextIO.read() |
| .from("<path-to-files>/*") |
| .watchForNewFiles( |
| // Check for new files every minute |
| Duration.standardMinutes(1), |
| // Stop watching the filepattern if no new files appear within an hour |
| Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)))); |
| // [END FileProcessPatternProcessNewFilesSnip2] |
| |
| // [START FileProcessPatternAccessMetadataSnip1] |
| p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz")) |
| // withCompression can be omitted - by default compression is detected from the filename. |
| .apply(FileIO.readMatches().withCompression(Compression.GZIP)) |
| .apply( |
| ParDo.of( |
| new DoFn<FileIO.ReadableFile, String>() { |
| @ProcessElement |
| public void process(@Element FileIO.ReadableFile file) { |
| // We now have access to the file and its metadata |
| LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId()); |
| } |
| })); |
| // [END FileProcessPatternAccessMetadataSnip1] |
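
    // A sketch beyond the published snippets: continuously matched files, as in the first
    // example above, can be read as text lines by chaining FileIO.readMatches with
    // TextIO.readFiles.
    p.apply(
            FileIO.match()
                .filepattern("...")
                .continuously(
                    Duration.standardSeconds(30),
                    Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles());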
| |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(Snippets.class); |
| |
| // [START SideInputPatternSlowUpdateGlobalWindowSnip1] |
| public static void sideInputPatterns() { |
    // This pipeline uses View.asSingleton to build a slowly updating side input, with a dummy
    // external service standing in for a real data source. Run in debug mode to see the output.
| Pipeline p = Pipeline.create(); |
| |
    // Create the slowly updating side input.
| |
| PCollectionView<Map<String, String>> map = |
| p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L))) |
| .apply( |
| Window.<Long>into(new GlobalWindows()) |
| .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())) |
| .discardingFiredPanes()) |
| .apply( |
| ParDo.of( |
| new DoFn<Long, Map<String, String>>() { |
| |
| @ProcessElement |
| public void process( |
| @Element Long input, OutputReceiver<Map<String, String>> o) { |
                      // Do any external reads needed here...
                      // We will make use of our dummy external service.
                      // Every time this triggers, the complete map will be replaced with the one
                      // read from the service.
| o.output(DummyExternalService.readDummyData()); |
| } |
| })) |
| .apply(View.asSingleton()); |
| |
    // ---- Consume the slowly updating side input.
| |
    // GenerateSequence is used here only to generate dummy data for this illustration.
    // In a real pipeline you would use an actual source such as PubSubIO or KafkaIO.
| p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L))) |
| .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) |
| .apply(Sum.longsGlobally().withoutDefaults()) |
| .apply( |
| ParDo.of( |
| new DoFn<Long, KV<Long, Long>>() { |
| |
| @ProcessElement |
| public void process(ProcessContext c) { |
| Map<String, String> keyMap = c.sideInput(map); |
| c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now()); |
| |
| LOG.debug( |
| "Value is {} key A is {} and key B is {}", |
| c.element(), |
| keyMap.get("Key_A"), |
| keyMap.get("Key_B")); |
| } |
| }) |
| .withSideInputs(map)); |
| } |
| |
  /** Dummy class standing in for an external service. */
| public static class DummyExternalService { |
| |
| public static Map<String, String> readDummyData() { |
| |
| Map<String, String> map = new HashMap<>(); |
| Instant now = Instant.now(); |
| |
| DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS"); |
| |
| map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf)); |
| map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString()); |
| |
| return map; |
| } |
| } |
| |
| // [END SideInputPatternSlowUpdateGlobalWindowSnip1] |
| |
| } |