| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.examples.snippets; |
| |
| import com.google.api.services.bigquery.model.TableFieldSchema; |
| import com.google.api.services.bigquery.model.TableReference; |
| import com.google.api.services.bigquery.model.TableRow; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.api.services.bigquery.model.TimePartitioning; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Base64; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.coders.AvroCoder; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.DefaultCoder; |
| import org.apache.beam.sdk.coders.DoubleCoder; |
| import org.apache.beam.sdk.io.Compression; |
| import org.apache.beam.sdk.io.FileIO; |
| import org.apache.beam.sdk.io.GenerateSequence; |
| import org.apache.beam.sdk.io.TextIO; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; |
| import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; |
| import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; |
| import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; |
| import org.apache.beam.sdk.options.Default; |
| import org.apache.beam.sdk.options.Description; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.MapElements; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.Sum; |
| import org.apache.beam.sdk.transforms.View; |
| import org.apache.beam.sdk.transforms.Watch; |
| import org.apache.beam.sdk.transforms.join.CoGbkResult; |
| import org.apache.beam.sdk.transforms.join.CoGroupByKey; |
| import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; |
| import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindows; |
| import org.apache.beam.sdk.transforms.windowing.IntervalWindow; |
| import org.apache.beam.sdk.transforms.windowing.Repeatedly; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.transforms.windowing.WindowFn; |
| import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext; |
| import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.TimestampedValue; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.sdk.values.TypeDescriptors; |
| import org.apache.beam.sdk.values.ValueInSingleWindow; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.joda.time.format.DateTimeFormat; |
| import org.joda.time.format.DateTimeFormatter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Code snippets used in webdocs. */ |
| public class Snippets { |
| |
| @DefaultCoder(AvroCoder.class) |
| static class Quote { |
| final String source; |
| final String quote; |
| |
| public Quote() { |
| this.source = ""; |
| this.quote = ""; |
| } |
| |
| public Quote(String source, String quote) { |
| this.source = source; |
| this.quote = quote; |
| } |
| } |
| |
| @DefaultCoder(AvroCoder.class) |
| static class WeatherData { |
| final long year; |
| final long month; |
| final long day; |
| final double maxTemp; |
| |
| public WeatherData() { |
| this.year = 0; |
| this.month = 0; |
| this.day = 0; |
| this.maxTemp = 0.0f; |
| } |
| |
| public WeatherData(long year, long month, long day, double maxTemp) { |
| this.year = year; |
| this.month = month; |
| this.day = day; |
| this.maxTemp = maxTemp; |
| } |
| } |
| |
| /** Using a Read and Write transform to read/write from/to BigQuery. */ |
| public static void modelBigQueryIO(Pipeline p) { |
| modelBigQueryIO(p, "", "", ""); |
| } |
| |
| public static void modelBigQueryIO( |
| Pipeline p, String writeProject, String writeDataset, String writeTable) { |
| { |
| // [START BigQueryTableSpec] |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| // [END BigQueryTableSpec] |
| } |
| |
| { |
| // [START BigQueryTableSpecWithoutProject] |
| String tableSpec = "samples.weather_stations"; |
| // [END BigQueryTableSpecWithoutProject] |
| } |
| |
| { |
| // [START BigQueryTableSpecObject] |
| TableReference tableSpec = |
| new TableReference() |
| .setProjectId("clouddataflow-readonly") |
| .setDatasetId("samples") |
| .setTableId("weather_stations"); |
| // [END BigQueryTableSpecObject] |
| } |
| |
| { |
| // [START BigQueryDataTypes] |
| TableRow row = new TableRow(); |
| row.set("string", "abc"); |
| byte[] rawbytes = {(byte) 0xab, (byte) 0xac}; |
| row.set("bytes", new String(Base64.getEncoder().encodeToString(rawbytes))); |
| row.set("integer", 5); |
| row.set("float", 0.5); |
| row.set("numeric", 5); |
| row.set("boolean", true); |
| row.set("timestamp", "2018-12-31 12:44:31.744957 UTC"); |
| row.set("date", "2018-12-31"); |
| row.set("time", "12:44:31"); |
| row.set("datetime", "2019-06-11T14:44:31"); |
| row.set("geography", "POINT(30 10)"); |
| // [END BigQueryDataTypes] |
| } |
| |
| { |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| // [START BigQueryReadTable] |
| PCollection<Double> maxTemperatures = |
| p.apply(BigQueryIO.readTableRows().from(tableSpec)) |
| // Each row is of type TableRow |
| .apply( |
| MapElements.into(TypeDescriptors.doubles()) |
| .via((TableRow row) -> (Double) row.get("max_temperature"))); |
| // [END BigQueryReadTable] |
| } |
| |
| { |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| // [START BigQueryReadFunction] |
| PCollection<Double> maxTemperatures = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature")) |
| .from(tableSpec) |
| .withCoder(DoubleCoder.of())); |
| // [END BigQueryReadFunction] |
| } |
| |
| { |
| // [START BigQueryReadQuery] |
| PCollection<Double> maxTemperatures = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature")) |
| .fromQuery( |
| "SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]") |
| .withCoder(DoubleCoder.of())); |
| // [END BigQueryReadQuery] |
| } |
| |
| { |
| // [START BigQueryReadQueryStdSQL] |
| PCollection<Double> maxTemperatures = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature")) |
| .fromQuery( |
| "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`") |
| .usingStandardSql() |
| .withCoder(DoubleCoder.of())); |
| // [END BigQueryReadQueryStdSQL] |
| } |
| |
| // [START BigQuerySchemaJson] |
| String tableSchemaJson = |
| "" |
| + "{" |
| + " \"fields\": [" |
| + " {" |
| + " \"name\": \"source\"," |
| + " \"type\": \"STRING\"," |
| + " \"mode\": \"NULLABLE\"" |
| + " }," |
| + " {" |
| + " \"name\": \"quote\"," |
| + " \"type\": \"STRING\"," |
| + " \"mode\": \"REQUIRED\"" |
| + " }" |
| + " ]" |
| + "}"; |
| // [END BigQuerySchemaJson] |
| |
| { |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| if (!writeProject.isEmpty() && !writeDataset.isEmpty() && !writeTable.isEmpty()) { |
| tableSpec = writeProject + ":" + writeDataset + "." + writeTable; |
| } |
| |
| // [START BigQuerySchemaObject] |
| TableSchema tableSchema = |
| new TableSchema() |
| .setFields( |
| ImmutableList.of( |
| new TableFieldSchema() |
| .setName("source") |
| .setType("STRING") |
| .setMode("NULLABLE"), |
| new TableFieldSchema() |
| .setName("quote") |
| .setType("STRING") |
| .setMode("REQUIRED"))); |
| // [END BigQuerySchemaObject] |
| |
| // [START BigQueryWriteInput] |
| /* |
| @DefaultCoder(AvroCoder.class) |
| static class Quote { |
| final String source; |
| final String quote; |
| |
| public Quote() { |
| this.source = ""; |
| this.quote = ""; |
| } |
| public Quote(String source, String quote) { |
| this.source = source; |
| this.quote = quote; |
| } |
| } |
| */ |
| |
| PCollection<Quote> quotes = |
| p.apply( |
| Create.of( |
| new Quote("Mahatma Gandhi", "My life is my message."), |
| new Quote("Yoda", "Do, or do not. There is no 'try'."))); |
| // [END BigQueryWriteInput] |
| |
| // [START BigQueryWriteTable] |
| quotes |
| .apply( |
| MapElements.into(TypeDescriptor.of(TableRow.class)) |
| .via( |
| (Quote elem) -> |
| new TableRow().set("source", elem.source).set("quote", elem.quote))) |
| .apply( |
| BigQueryIO.writeTableRows() |
| .to(tableSpec) |
| .withSchema(tableSchema) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteTable] |
| |
| // [START BigQueryWriteFunction] |
| quotes.apply( |
| BigQueryIO.<Quote>write() |
| .to(tableSpec) |
| .withSchema(tableSchema) |
| .withFormatFunction( |
| (Quote elem) -> |
| new TableRow().set("source", elem.source).set("quote", elem.quote)) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteFunction] |
| |
| // [START BigQueryWriteJsonSchema] |
| quotes.apply( |
| BigQueryIO.<Quote>write() |
| .to(tableSpec) |
| .withJsonSchema(tableSchemaJson) |
| .withFormatFunction( |
| (Quote elem) -> |
| new TableRow().set("source", elem.source).set("quote", elem.quote)) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteJsonSchema] |
| } |
| |
| { |
| // [START BigQueryWriteDynamicDestinations] |
| /* |
| @DefaultCoder(AvroCoder.class) |
| static class WeatherData { |
| final long year; |
| final long month; |
| final long day; |
| final double maxTemp; |
| |
| public WeatherData() { |
| this.year = 0; |
| this.month = 0; |
| this.day = 0; |
| this.maxTemp = 0.0f; |
| } |
| public WeatherData(long year, long month, long day, double maxTemp) { |
| this.year = year; |
| this.month = month; |
| this.day = day; |
| this.maxTemp = maxTemp; |
| } |
| } |
| */ |
| |
| PCollection<WeatherData> weatherData = |
| p.apply( |
| BigQueryIO.read( |
| (SchemaAndRecord elem) -> { |
| GenericRecord record = elem.getRecord(); |
| return new WeatherData( |
| (Long) record.get("year"), |
| (Long) record.get("month"), |
| (Long) record.get("day"), |
| (Double) record.get("max_temperature")); |
| }) |
| .fromQuery( |
| "SELECT year, month, day, max_temperature " |
| + "FROM [clouddataflow-readonly:samples.weather_stations] " |
| + "WHERE year BETWEEN 2007 AND 2009") |
| .withCoder(AvroCoder.of(WeatherData.class))); |
| |
| // We will send the weather data into different tables for every year. |
| weatherData.apply( |
| BigQueryIO.<WeatherData>write() |
| .to( |
| new DynamicDestinations<WeatherData, Long>() { |
| @Override |
| public Long getDestination(ValueInSingleWindow<WeatherData> elem) { |
| return elem.getValue().year; |
| } |
| |
| @Override |
| public TableDestination getTable(Long destination) { |
| return new TableDestination( |
| new TableReference() |
| .setProjectId(writeProject) |
| .setDatasetId(writeDataset) |
| .setTableId(writeTable + "_" + destination), |
| "Table for year " + destination); |
| } |
| |
| @Override |
| public TableSchema getSchema(Long destination) { |
| return new TableSchema() |
| .setFields( |
| ImmutableList.of( |
| new TableFieldSchema() |
| .setName("year") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("month") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("day") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("maxTemp") |
| .setType("FLOAT") |
| .setMode("NULLABLE"))); |
| } |
| }) |
| .withFormatFunction( |
| (WeatherData elem) -> |
| new TableRow() |
| .set("year", elem.year) |
| .set("month", elem.month) |
| .set("day", elem.day) |
| .set("maxTemp", elem.maxTemp)) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryWriteDynamicDestinations] |
| |
| String tableSpec = "clouddataflow-readonly:samples.weather_stations"; |
| if (!writeProject.isEmpty() && !writeDataset.isEmpty() && !writeTable.isEmpty()) { |
| tableSpec = writeProject + ":" + writeDataset + "." + writeTable + "_partitioning"; |
| } |
| |
| TableSchema tableSchema = |
| new TableSchema() |
| .setFields( |
| ImmutableList.of( |
| new TableFieldSchema().setName("year").setType("INTEGER").setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("month") |
| .setType("INTEGER") |
| .setMode("REQUIRED"), |
| new TableFieldSchema().setName("day").setType("INTEGER").setMode("REQUIRED"), |
| new TableFieldSchema() |
| .setName("maxTemp") |
| .setType("FLOAT") |
| .setMode("NULLABLE"))); |
| |
| // [START BigQueryTimePartitioning] |
| weatherData.apply( |
| BigQueryIO.<WeatherData>write() |
| .to(tableSpec + "_partitioning") |
| .withSchema(tableSchema) |
| .withFormatFunction( |
| (WeatherData elem) -> |
| new TableRow() |
| .set("year", elem.year) |
| .set("month", elem.month) |
| .set("day", elem.day) |
| .set("maxTemp", elem.maxTemp)) |
| // NOTE: an existing table without time partitioning set up will not work |
| .withTimePartitioning(new TimePartitioning().setType("DAY")) |
| .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) |
| .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); |
| // [END BigQueryTimePartitioning] |
| } |
| } |
| |
| /** Helper function to format results in coGroupByKeyTuple. */ |
| public static String formatCoGbkResults( |
| String name, Iterable<String> emails, Iterable<String> phones) { |
| |
| List<String> emailsList = new ArrayList<>(); |
| for (String elem : emails) { |
| emailsList.add("'" + elem + "'"); |
| } |
| Collections.sort(emailsList); |
| String emailsStr = "[" + String.join(", ", emailsList) + "]"; |
| |
| List<String> phonesList = new ArrayList<>(); |
| for (String elem : phones) { |
| phonesList.add("'" + elem + "'"); |
| } |
| Collections.sort(phonesList); |
| String phonesStr = "[" + String.join(", ", phonesList) + "]"; |
| |
| return name + "; " + emailsStr + "; " + phonesStr; |
| } |
| |
| /** Using a CoGroupByKey transform. */ |
| public static PCollection<String> coGroupByKeyTuple( |
| TupleTag<String> emailsTag, |
| TupleTag<String> phonesTag, |
| PCollection<KV<String, String>> emails, |
| PCollection<KV<String, String>> phones) { |
| |
| // [START CoGroupByKeyTuple] |
| PCollection<KV<String, CoGbkResult>> results = |
| KeyedPCollectionTuple.of(emailsTag, emails) |
| .and(phonesTag, phones) |
| .apply(CoGroupByKey.create()); |
| |
| PCollection<String> contactLines = |
| results.apply( |
| ParDo.of( |
| new DoFn<KV<String, CoGbkResult>, String>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| KV<String, CoGbkResult> e = c.element(); |
| String name = e.getKey(); |
| Iterable<String> emailsIter = e.getValue().getAll(emailsTag); |
| Iterable<String> phonesIter = e.getValue().getAll(phonesTag); |
| String formattedResult = |
| Snippets.formatCoGbkResults(name, emailsIter, phonesIter); |
| c.output(formattedResult); |
| } |
| })); |
| // [END CoGroupByKeyTuple] |
| return contactLines; |
| } |
| |
| public static void fileProcessPattern() throws Exception { |
| Pipeline p = Pipeline.create(); |
| |
| // [START FileProcessPatternProcessNewFilesSnip1] |
| // This produces PCollection<MatchResult.Metadata> |
| p.apply( |
| FileIO.match() |
| .filepattern("...") |
| .continuously( |
| Duration.standardSeconds(30), |
| Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)))); |
| // [END FileProcessPatternProcessNewFilesSnip1] |
| |
| // [START FileProcessPatternProcessNewFilesSnip2] |
| // This produces PCollection<String> |
| p.apply( |
| TextIO.read() |
| .from("<path-to-files>/*") |
| .watchForNewFiles( |
| // Check for new files every minute. |
| Duration.standardMinutes(1), |
| // Stop watching the file pattern if no new files appear for an hour. |
| Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)))); |
| // [END FileProcessPatternProcessNewFilesSnip2] |
| |
| // [START FileProcessPatternAccessMetadataSnip1] |
| p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz")) |
| // The withCompression method is optional. By default, the Beam SDK detects compression from |
| // the filename. |
| .apply(FileIO.readMatches().withCompression(Compression.GZIP)) |
| .apply( |
| ParDo.of( |
| new DoFn<FileIO.ReadableFile, String>() { |
| @ProcessElement |
| public void process(@Element FileIO.ReadableFile file) { |
| // We can now access the file and its metadata. |
| LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId()); |
| } |
| })); |
| // [END FileProcessPatternAccessMetadataSnip1] |
| |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(Snippets.class); |
| |
| // [START SideInputPatternSlowUpdateGlobalWindowSnip1] |
| public static void sideInputPatterns() { |
| // This pipeline uses View.asSingleton for a placeholder external service. |
| // Run in debug mode to see the output. |
| Pipeline p = Pipeline.create(); |
| |
| // Create a side input that updates each second. |
| PCollectionView<Map<String, String>> map = |
| p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L))) |
| .apply( |
| Window.<Long>into(new GlobalWindows()) |
| .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())) |
| .discardingFiredPanes()) |
| .apply( |
| ParDo.of( |
| new DoFn<Long, Map<String, String>>() { |
| |
| @ProcessElement |
| public void process( |
| @Element Long input, OutputReceiver<Map<String, String>> o) { |
| // Replace map with test data from the placeholder external service. |
| // Add external reads here. |
| o.output(PlaceholderExternalService.readTestData()); |
| } |
| })) |
| .apply(View.asSingleton()); |
| |
| // Consume side input. GenerateSequence generates test data. |
| // Use a real source (like PubSubIO or KafkaIO) in production. |
| p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L))) |
| .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))) |
| .apply(Sum.longsGlobally().withoutDefaults()) |
| .apply( |
| ParDo.of( |
| new DoFn<Long, KV<Long, Long>>() { |
| |
| @ProcessElement |
| public void process(ProcessContext c) { |
| Map<String, String> keyMap = c.sideInput(map); |
| c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now()); |
| |
| LOG.debug( |
| "Value is {}, key A is {}, and key B is {}.", |
| c.element(), |
| keyMap.get("Key_A"), |
| keyMap.get("Key_B")); |
| } |
| }) |
| .withSideInputs(map)); |
| } |
| |
| /** Placeholder class that represents an external service generating test data. */ |
| public static class PlaceholderExternalService { |
| |
| public static Map<String, String> readTestData() { |
| |
| Map<String, String> map = new HashMap<>(); |
| Instant now = Instant.now(); |
| |
| DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS"); |
| |
| map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf)); |
| map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString()); |
| |
| return map; |
| } |
| } |
| |
| // [END SideInputPatternSlowUpdateGlobalWindowSnip1] |
| |
| // [START AccessingValueProviderInfoAfterRunSnip1] |
| |
| /** Sample of PipelineOptions with a ValueProvider option argument. */ |
| public interface MyOptions extends PipelineOptions { |
| @Description("My option") |
| @Default.String("Hello world!") |
| ValueProvider<String> getStringValue(); |
| |
| void setStringValue(ValueProvider<String> value); |
| } |
| |
| public static void accessingValueProviderInfoAfterRunSnip1(String[] args) { |
| |
| MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class); |
| |
| // Create pipeline. |
| Pipeline p = Pipeline.create(options); |
| |
| // Add a branch for logging the ValueProvider value. |
| p.apply(Create.of(1)) |
| .apply( |
| ParDo.of( |
| new DoFn<Integer, Integer>() { |
| |
| // Define the DoFn that logs the ValueProvider value. |
| @ProcessElement |
| public void process(ProcessContext c) { |
| |
| MyOptions ops = c.getPipelineOptions().as(MyOptions.class); |
| // This example logs the ValueProvider value, but you could store it by |
| // pushing it to an external database. |
| |
| LOG.info("Option StringValue was {}", ops.getStringValue()); |
| } |
| })); |
| |
| // The main pipeline. |
| p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally()); |
| |
| p.run(); |
| } |
| |
| // [END AccessingValueProviderInfoAfterRunSnip1] |
| |
| private static final Duration gapDuration = Duration.standardSeconds(10L); |
| |
| // [START CustomSessionWindow1] |
| |
| public Collection<IntervalWindow> assignWindows(WindowFn.AssignContext c) { |
| |
| // Assign each element into a window from its timestamp until gapDuration in the |
| // future. Overlapping windows (representing elements within gapDuration of |
| // each other) will be merged. |
| return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration)); |
| } |
| // [END CustomSessionWindow1] |
| |
| // [START CustomSessionWindow2] |
| public static class DynamicSessions extends WindowFn<TableRow, IntervalWindow> { |
| /** Duration of the gaps between sessions. */ |
| private final Duration gapDuration; |
| |
| /** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */ |
| private DynamicSessions(Duration gapDuration) { |
| this.gapDuration = gapDuration; |
| } |
| |
| // [END CustomSessionWindow2] |
| |
| // [START CustomSessionWindow3] |
| @Override |
| public Collection<IntervalWindow> assignWindows(AssignContext c) { |
| // Assign each element into a window from its timestamp until gapDuration in the |
| // future. Overlapping windows (representing elements within gapDuration of |
| // each other) will be merged. |
| Duration dataDrivenGap; |
| TableRow message = c.element(); |
| |
| try { |
| dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.get("gap").toString())); |
| } catch (Exception e) { |
| dataDrivenGap = gapDuration; |
| } |
| return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap)); |
| } |
| // [END CustomSessionWindow3] |
| |
| // [START CustomSessionWindow4] |
| /** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */ |
| public static DynamicSessions withDefaultGapDuration(Duration gapDuration) { |
| return new DynamicSessions(gapDuration); |
| } |
| |
| // [END CustomSessionWindow4] |
| |
| @Override |
| public void mergeWindows(MergeContext c) throws Exception {} |
| |
| @Override |
| public boolean isCompatible(WindowFn<?, ?> other) { |
| return false; |
| } |
| |
| @Override |
| public Coder<IntervalWindow> windowCoder() { |
| return null; |
| } |
| |
| @Override |
| public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { |
| return null; |
| } |
| } |
| |
| public static class CustomSessionPipeline { |
| |
| public static void main(String[] args) { |
| |
| // [START CustomSessionWindow5] |
| |
| PCollection<TableRow> p = |
| Pipeline.create() |
| .apply( |
| "Create data", |
| Create.timestamped( |
| TimestampedValue.of( |
| new TableRow().set("user", "mobile").set("score", 12).set("gap", 5), |
| new Instant()), |
| TimestampedValue.of( |
| new TableRow().set("user", "desktop").set("score", 4), new Instant()), |
| TimestampedValue.of( |
| new TableRow().set("user", "mobile").set("score", -3).set("gap", 5), |
| new Instant().plus(2000)), |
| TimestampedValue.of( |
| new TableRow().set("user", "mobile").set("score", 2).set("gap", 5), |
| new Instant().plus(9000)), |
| TimestampedValue.of( |
| new TableRow().set("user", "mobile").set("score", 7).set("gap", 5), |
| new Instant().plus(12000)), |
| TimestampedValue.of( |
| new TableRow().set("user", "desktop").set("score", 10), |
| new Instant().plus(12000)))); |
| // [END CustomSessionWindow5] |
| |
| // [START CustomSessionWindow6] |
| p.apply( |
| "Window into sessions", |
| Window.<TableRow>into( |
| DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10)))); |
| // [END CustomSessionWindow6] |
| |
| } |
| } |
| } |