// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Wordcount is an example using cross-language BigQuery transforms to read from and write to
// BigQuery. This example runs a batch pipeline that reads from the public "shakespeare" table
// described at https://cloud.google.com/bigquery/public-data#sample_tables. The input contains
// word counts per work; the pipeline aggregates these to find each word's total count across all
// works, as well as the average number of times the word appears in the works it appears in, and
// then writes all of that data to a given output table.
//
// This example is only expected to work on Dataflow, and requires a cross-language expansion
// service that can expand BigQuery read and write transforms. An address to a persistent expansion
// service can be provided as a flag, or if none is specified then the SDK will attempt to
// automatically start an appropriate expansion service.
//
// # Running an Expansion Server
//
// If the automatic expansion service functionality is not available for your environment, or if
// you want improved performance, you will need to start a persistent expansion service. These
// instructions cover running the Java SchemaIO Expansion Service, which requires a JDK
// installation in a version supported by Beam. Depending on whether you are running this from a
// numbered Beam release or from a development environment, there are two sources you may use for
// the expansion service.
//
// Numbered release: The expansion service jar is vendored as the module
// org.apache.beam:beam-sdks-java-io-google-cloud-platform-expansion-service in the Maven
// Repository. This jar can be executed directly with the following command:
//
// `java -jar <jar_name> <port_number>`
//
// Development env: This requires that the JAVA_HOME environment variable point to your JDK
// installation. From the root `beam/` directory of the Apache Beam repository, the jar can be
// built (or built and run) with the following commands:
//
// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
// ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:runExpansionService -PconstructionService.port=<port_num>
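//
// With the expansion service running, pass its address to this pipeline via the --expansion_addr
// flag, e.g.:
//
// --expansion_addr=localhost:<port_num>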
//
// # Running the Example on GCP
//
// An example command for executing this pipeline on GCP is as follows:
//
// export PROJECT="$(gcloud config get-value project)"
// export TEMP_LOCATION="gs://MY-BUCKET/temp"
// export STAGING_LOCATION="gs://MY-BUCKET/staging"
// export REGION="us-central1"
// export JOB_NAME="bigquery-wordcount-`date +%Y%m%d-%H%M%S`"
// export EXPANSION_ADDR="localhost:1234"
// export OUTPUT_TABLE="project_id:dataset_id.table_id"
// go run ./sdks/go/examples/xlang/bigquery/wordcount.go \
// --runner=DataflowRunner \
// --temp_location=$TEMP_LOCATION \
// --staging_location=$STAGING_LOCATION \
// --project=$PROJECT \
// --region=$REGION \
// --job_name="${JOB_NAME}" \
// --expansion_addr=$EXPANSION_ADDR \
// --out_table=$OUTPUT_TABLE
//
// # Running the Example From a Git Clone
//
// When running on a development environment, a custom container will likely need to be provided
// for the cross-language SDK. First this will require building and pushing the SDK container to a
// container repository, such as Docker Hub.
//
// export DOCKER_ROOT="Your Docker Repository Root"
// ./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
// docker push $DOCKER_ROOT/beam_java11_sdk:latest
//
// For runners in local mode, simply building the container with the default values for
// docker-repository-root and docker-tag is enough to make it accessible locally.
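//
// For example, a local-mode build with the defaults would simply omit the -P overrides from the
// same Gradle task shown above:
//
// ./gradlew :sdks:java:container:java11:docker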
//
// Additionally, you must provide the location of your custom container to the pipeline with the
// --sdk_harness_container_image_override flag for Java, or the --environment_config flag for Go.
// For example:
//
// --sdk_harness_container_image_override=".*java.*,${DOCKER_ROOT}/beam_java11_sdk:latest" \
// --environment_config=${DOCKER_ROOT}/beam_go_sdk:latest
package main

import (
	"context"
	"flag"
	"math"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	// Set this to the address of the expansion service to use for BigQuery read and write, or leave
	// unspecified to attempt to automatically start an expansion service.
	expansionAddr = flag.String("expansion_addr", "",
		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
	// Set this required option to specify where to write the output. If the table does not exist,
	// a new one will be created. If the table already exists, elements will be appended to it.
	outTable = flag.String("out_table", "", "Output table (required).")
)

func init() {
	// Register the CombineFn with the SDK so it can be executed efficiently on distributed runners.
	register.Combiner3[WordsAccum, ShakesRow, CountsRow](&WordsCombine{})
}

// ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
// be read properly, field names must match the column names in the BigQuery table, so some fields
// must include underscores.
type ShakesRow struct {
	Word        string `beam:"word"`
	Word_count  int64  `beam:"word_count"`
	Corpus      string `beam:"corpus"`
	Corpus_date int64  `beam:"corpus_date"`
}

// CountsRow is a struct corresponding to the schema of the output table. For writes, field names
// are derived from the Beam schema names specified below as struct tags.
type CountsRow struct {
	// Word is the word being counted.
	Word string `beam:"word"`
	// WordCount is the count of how many times the word appears in all works combined.
	WordCount int64 `beam:"word_count"`
	// CorpusCount is the count of how many works the word appears in.
	CorpusCount int64 `beam:"corpus_count"`
	// AvgCount is the average number of times a word appears in all works that it appears in. In
	// other words, this is equivalent to WordCount divided by CorpusCount.
	AvgCount float64 `beam:"avg_count"`
}

// WordsAccum is an accumulator for combining Shakespeare word counts in order to get averages of
// word counts.
type WordsAccum struct {
	// Word is the word being counted.
	Word string
	// Count is the number of times this word has appeared, or in other words the number of corpuses
	// it appears in (assuming that the input never repeats a word and corpus pair).
	Count int64
	// Sum is the sum of word counts from inputs.
	Sum int64
}

// WordsCombine is a CombineFn that adds up word counts and calculates average number of counts.
type WordsCombine struct{}

// CreateAccumulator creates a default WordsAccum.
func (fn *WordsCombine) CreateAccumulator() WordsAccum {
	return WordsAccum{}
}

// AddInput sums up word counts and increments the corpus count.
func (fn *WordsCombine) AddInput(a WordsAccum, row ShakesRow) WordsAccum {
	a.Word = row.Word
	a.Count++
	a.Sum += row.Word_count
	return a
}

// MergeAccumulators sums up the various counts being accumulated.
func (fn *WordsCombine) MergeAccumulators(a, v WordsAccum) WordsAccum {
	return WordsAccum{Word: a.Word, Count: a.Count + v.Count, Sum: a.Sum + v.Sum}
}

// ExtractOutput calculates the average and fills out the output rows.
func (fn *WordsCombine) ExtractOutput(a WordsAccum) CountsRow {
	row := CountsRow{
		Word:        a.Word,
		WordCount:   a.Sum,
		CorpusCount: a.Count,
	}
	if a.Count == 0 {
		row.AvgCount = math.NaN()
	} else {
		row.AvgCount = float64(a.Sum) / float64(a.Count)
	}
	return row
}

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	if *outTable == "" {
		// The output table is required; fail fast rather than submitting a job that cannot write.
		log.Fatal(ctx, "No output table specified. Use --out_table=project_id:dataset_id.table_id")
	}

	p := beam.NewPipeline()
	s := p.Root()

	// Read from the public BigQuery table.
	inType := reflect.TypeOf((*ShakesRow)(nil)).Elem()
	rows := bigqueryio.Read(s, inType,
		bigqueryio.FromTable("bigquery-public-data:samples.shakespeare"),
		bigqueryio.ReadExpansionAddr(*expansionAddr))

	// Combine the data per word.
	keyed := beam.ParDo(s, func(elm ShakesRow) (string, ShakesRow) {
		return elm.Word, elm
	}, rows)
	counts := beam.CombinePerKey(s, &WordsCombine{}, keyed)
	countVals := beam.DropKey(s, counts)

	// Write the data to the given BigQuery table destination, creating the table if needed.
	bigqueryio.Write(s, *outTable, countVals,
		bigqueryio.CreateDisposition(bigqueryio.CreateIfNeeded),
		bigqueryio.WriteExpansionAddr(*expansionAddr))

	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "Failed to execute job: %v", err)
	}
}