| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.examples.common; |
| |
| import com.google.api.client.googleapis.json.GoogleJsonResponseException; |
| import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; |
| import com.google.api.client.http.HttpRequestInitializer; |
| import com.google.api.services.bigquery.Bigquery; |
| import com.google.api.services.bigquery.Bigquery.Datasets; |
| import com.google.api.services.bigquery.Bigquery.Tables; |
| import com.google.api.services.bigquery.model.Dataset; |
| import com.google.api.services.bigquery.model.DatasetReference; |
| import com.google.api.services.bigquery.model.Table; |
| import com.google.api.services.bigquery.model.TableReference; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.api.services.pubsub.Pubsub; |
| import com.google.api.services.pubsub.model.Subscription; |
| import com.google.api.services.pubsub.model.Topic; |
| import com.google.auth.Credentials; |
| import com.google.auth.http.HttpCredentialsAdapter; |
| import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.beam.sdk.PipelineResult; |
| import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; |
| import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; |
| import org.apache.beam.sdk.extensions.gcp.util.Transport; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; |
| import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.util.BackOff; |
| import org.apache.beam.sdk.util.BackOffUtils; |
| import org.apache.beam.sdk.util.FluentBackoff; |
| import org.apache.beam.sdk.util.Sleeper; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles; |
| import org.joda.time.Duration; |
| |
| /** |
| * The utility class that sets up and tears down external resources, and cancels the streaming |
| * pipelines once the program terminates. |
| * |
| * <p>It is used to run Beam examples. |
| */ |
| public class ExampleUtils { |
| |
/** HTTP 404 (Not Found); responses carrying this status are treated as "resource is absent". */
private static final int SC_NOT_FOUND = 404;

/**
 * Regular expression that matches one or more consecutive characters that are not Unicode
 * letters ({@code \p{L}} is the Unicode letter category).
 *
 * <p>The wordcount examples use it to tokenize strings.
 */
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

/** Pipeline options from which resource names and client settings are read. */
private final PipelineOptions options;

/** Pipeline results registered through {@code waitToFinish}. */
private final Set<PipelineResult> pipelinesToCancel = Sets.newHashSet();

/** Status messages collected during setup and tear-down and printed together later. */
private final List<String> pendingMessages = Lists.newArrayList();

/** BigQuery client, created on first use; {@code null} until then. */
private Bigquery bigQueryClient;

/** Pub/Sub client, created on first use; {@code null} until then. */
private Pubsub pubsubClient;

/**
 * Creates a helper that reads its configuration from the given pipeline options.
 *
 * <p>The constructor only stores the options; no external resource is created here.
 *
 * @param options the pipeline options of the example being run
 */
public ExampleUtils(PipelineOptions options) {
  this.options = options;
}
| |
| /** |
| * Sets up external resources that are required by the example, such as Pub/Sub topics and |
| * BigQuery tables. |
| * |
| * @throws IOException if there is a problem setting up the resources |
| */ |
| public void setup() throws IOException { |
| Sleeper sleeper = Sleeper.DEFAULT; |
| BackOff backOff = |
| FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff(); |
| Throwable lastException = null; |
| try { |
| do { |
| try { |
| setupPubsub(); |
| setupBigQueryTable(); |
| return; |
| } catch (GoogleJsonResponseException e) { |
| lastException = e; |
| } |
| } while (BackOffUtils.next(sleeper, backOff)); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| // Ignore InterruptedException |
| } |
| throw new RuntimeException(lastException); |
| } |
| |
| /** |
| * Sets up the Google Cloud Pub/Sub topic. |
| * |
| * <p>If the topic doesn't exist, a new topic with the given name will be created. |
| * |
| * @throws IOException if there is a problem setting up the Pub/Sub topic |
| */ |
| public void setupPubsub() throws IOException { |
| ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = |
| options.as(ExamplePubsubTopicAndSubscriptionOptions.class); |
| if (!pubsubOptions.getPubsubTopic().isEmpty()) { |
| pendingMessages.add("**********************Set Up Pubsub************************"); |
| setupPubsubTopic(pubsubOptions.getPubsubTopic()); |
| pendingMessages.add( |
| "The Pub/Sub topic has been set up for this example: " + pubsubOptions.getPubsubTopic()); |
| |
| if (!pubsubOptions.getPubsubSubscription().isEmpty()) { |
| setupPubsubSubscription( |
| pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription()); |
| pendingMessages.add( |
| "The Pub/Sub subscription has been set up for this example: " |
| + pubsubOptions.getPubsubSubscription()); |
| } |
| } |
| } |
| |
| /** |
| * Sets up the BigQuery table with the given schema. |
| * |
| * <p>If the table already exists, the schema has to match the given one. Otherwise, the example |
| * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema |
| * will be created. |
| * |
| * @throws IOException if there is a problem setting up the BigQuery table |
| */ |
| public void setupBigQueryTable() throws IOException { |
| ExampleBigQueryTableOptions bigQueryTableOptions = |
| options.as(ExampleBigQueryTableOptions.class); |
| if (bigQueryTableOptions.getBigQueryDataset() != null |
| && bigQueryTableOptions.getBigQueryTable() != null |
| && bigQueryTableOptions.getBigQuerySchema() != null) { |
| pendingMessages.add("******************Set Up Big Query Table*******************"); |
| setupBigQueryTable( |
| bigQueryTableOptions.getProject(), |
| bigQueryTableOptions.getBigQueryDataset(), |
| bigQueryTableOptions.getBigQueryTable(), |
| bigQueryTableOptions.getBigQuerySchema()); |
| pendingMessages.add( |
| "The BigQuery table has been set up for this example: " |
| + bigQueryTableOptions.getProject() |
| + ":" |
| + bigQueryTableOptions.getBigQueryDataset() |
| + "." |
| + bigQueryTableOptions.getBigQueryTable()); |
| } |
| } |
| |
| /** Tears down external resources that can be deleted upon the example's completion. */ |
| private void tearDown() { |
| pendingMessages.add("*************************Tear Down*************************"); |
| ExamplePubsubTopicAndSubscriptionOptions pubsubOptions = |
| options.as(ExamplePubsubTopicAndSubscriptionOptions.class); |
| if (!pubsubOptions.getPubsubTopic().isEmpty()) { |
| try { |
| deletePubsubTopic(pubsubOptions.getPubsubTopic()); |
| pendingMessages.add( |
| "The Pub/Sub topic has been deleted: " + pubsubOptions.getPubsubTopic()); |
| } catch (IOException e) { |
| pendingMessages.add( |
| "Failed to delete the Pub/Sub topic : " + pubsubOptions.getPubsubTopic()); |
| } |
| if (!pubsubOptions.getPubsubSubscription().isEmpty()) { |
| try { |
| deletePubsubSubscription(pubsubOptions.getPubsubSubscription()); |
| pendingMessages.add( |
| "The Pub/Sub subscription has been deleted: " |
| + pubsubOptions.getPubsubSubscription()); |
| } catch (IOException e) { |
| pendingMessages.add( |
| "Failed to delete the Pub/Sub subscription : " |
| + pubsubOptions.getPubsubSubscription()); |
| } |
| } |
| } |
| |
| ExampleBigQueryTableOptions bigQueryTableOptions = |
| options.as(ExampleBigQueryTableOptions.class); |
| if (bigQueryTableOptions.getBigQueryDataset() != null |
| && bigQueryTableOptions.getBigQueryTable() != null |
| && bigQueryTableOptions.getBigQuerySchema() != null) { |
| pendingMessages.add( |
| "The BigQuery table might contain the example's output, " |
| + "and it is not deleted automatically: " |
| + bigQueryTableOptions.getProject() |
| + ":" |
| + bigQueryTableOptions.getBigQueryDataset() |
| + "." |
| + bigQueryTableOptions.getBigQueryTable()); |
| pendingMessages.add( |
| "Please go to the Developers Console to delete it manually." |
| + " Otherwise, you may be charged for its usage."); |
| } |
| } |
| |
| /** Returns a BigQuery client builder using the specified {@link BigQueryOptions}. */ |
| private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) { |
| return new Bigquery.Builder( |
| Transport.getTransport(), |
| Transport.getJsonFactory(), |
| chainHttpRequestInitializer( |
| options.getGcpCredential(), |
| // Do not log 404. It clutters the output and is possibly even required by the |
| // caller. |
| new RetryHttpRequestInitializer(ImmutableList.of(404)))) |
| .setApplicationName(options.getAppName()) |
| .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); |
| } |
| |
| /** Returns a Pubsub client builder using the specified {@link PubsubOptions}. */ |
| private static Pubsub.Builder newPubsubClient(PubsubOptions options) { |
| return new Pubsub.Builder( |
| Transport.getTransport(), |
| Transport.getJsonFactory(), |
| chainHttpRequestInitializer( |
| options.getGcpCredential(), |
| // Do not log 404. It clutters the output and is possibly even required by the |
| // caller. |
| new RetryHttpRequestInitializer(ImmutableList.of(404)))) |
| .setRootUrl(options.getPubsubRootUrl()) |
| .setApplicationName(options.getAppName()) |
| .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); |
| } |
| |
| private static HttpRequestInitializer chainHttpRequestInitializer( |
| Credentials credential, HttpRequestInitializer httpRequestInitializer) { |
| if (credential == null) { |
| return new ChainingHttpRequestInitializer( |
| new NullCredentialInitializer(), httpRequestInitializer); |
| } else { |
| return new ChainingHttpRequestInitializer( |
| new HttpCredentialsAdapter(credential), httpRequestInitializer); |
| } |
| } |
| |
| private void setupBigQueryTable( |
| String projectId, String datasetId, String tableId, TableSchema schema) throws IOException { |
| if (bigQueryClient == null) { |
| bigQueryClient = newBigQueryClient(options.as(BigQueryOptions.class)).build(); |
| } |
| |
| Datasets datasetService = bigQueryClient.datasets(); |
| if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) { |
| Dataset newDataset = |
| new Dataset() |
| .setDatasetReference( |
| new DatasetReference().setProjectId(projectId).setDatasetId(datasetId)); |
| datasetService.insert(projectId, newDataset).execute(); |
| } |
| |
| Tables tableService = bigQueryClient.tables(); |
| Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId)); |
| if (table == null) { |
| Table newTable = |
| new Table() |
| .setSchema(schema) |
| .setTableReference( |
| new TableReference() |
| .setProjectId(projectId) |
| .setDatasetId(datasetId) |
| .setTableId(tableId)); |
| tableService.insert(projectId, datasetId, newTable).execute(); |
| } else if (!table.getSchema().equals(schema)) { |
| throw new RuntimeException( |
| "Table exists and schemas do not match, expecting: " |
| + schema.toPrettyString() |
| + ", actual: " |
| + table.getSchema().toPrettyString()); |
| } |
| } |
| |
| private void setupPubsubTopic(String topic) throws IOException { |
| if (pubsubClient == null) { |
| pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build(); |
| } |
| if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { |
| pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); |
| } |
| } |
| |
| private void setupPubsubSubscription(String topic, String subscription) throws IOException { |
| if (pubsubClient == null) { |
| pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build(); |
| } |
| if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) { |
| Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic); |
| pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); |
| } |
| } |
| |
| /** |
| * Deletes the Google Cloud Pub/Sub topic. |
| * |
| * @throws IOException if there is a problem deleting the Pub/Sub topic |
| */ |
| private void deletePubsubTopic(String topic) throws IOException { |
| if (pubsubClient == null) { |
| pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build(); |
| } |
| if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { |
| pubsubClient.projects().topics().delete(topic).execute(); |
| } |
| } |
| |
| /** |
| * Deletes the Google Cloud Pub/Sub subscription. |
| * |
| * @throws IOException if there is a problem deleting the Pub/Sub subscription |
| */ |
| private void deletePubsubSubscription(String subscription) throws IOException { |
| if (pubsubClient == null) { |
| pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build(); |
| } |
| if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) { |
| pubsubClient.projects().subscriptions().delete(subscription).execute(); |
| } |
| } |
| |
| /** Waits for the pipeline to finish and cancels it before the program exists. */ |
| public void waitToFinish(PipelineResult result) { |
| pipelinesToCancel.add(result); |
| if (!options.as(ExampleOptions.class).getKeepJobsRunning()) { |
| addShutdownHook(pipelinesToCancel); |
| } |
| try { |
| result.waitUntilFinish(); |
| } catch (UnsupportedOperationException e) { |
| // Do nothing if the given PipelineResult doesn't support waitUntilFinish(), |
| // such as EvaluationResults returned by DirectRunner. |
| tearDown(); |
| printPendingMessages(); |
| } catch (Exception e) { |
| throw new RuntimeException("Failed to wait the pipeline until finish: " + result); |
| } |
| } |
| |
| private void addShutdownHook(final Collection<PipelineResult> pipelineResults) { |
| Runtime.getRuntime() |
| .addShutdownHook( |
| new Thread( |
| () -> { |
| tearDown(); |
| printPendingMessages(); |
| for (PipelineResult pipelineResult : pipelineResults) { |
| try { |
| pipelineResult.cancel(); |
| } catch (IOException e) { |
| System.out.println("Failed to cancel the job."); |
| System.out.println(e.getMessage()); |
| } |
| } |
| |
| for (PipelineResult pipelineResult : pipelineResults) { |
| boolean cancellationVerified = false; |
| for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) { |
| if (pipelineResult.getState().isTerminal()) { |
| cancellationVerified = true; |
| break; |
| } else { |
| System.out.println( |
| "The example pipeline is still running. Verifying the cancellation."); |
| } |
| Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); |
| } |
| if (!cancellationVerified) { |
| System.out.println( |
| "Failed to verify the cancellation for job: " + pipelineResult); |
| } |
| } |
| })); |
| } |
| |
| private void printPendingMessages() { |
| System.out.println(); |
| System.out.println("***********************************************************"); |
| System.out.println("***********************************************************"); |
| for (String message : pendingMessages) { |
| System.out.println(message); |
| } |
| System.out.println("***********************************************************"); |
| System.out.println("***********************************************************"); |
| } |
| |
| private static <T> T executeNullIfNotFound(AbstractGoogleClientRequest<T> request) |
| throws IOException { |
| try { |
| return request.execute(); |
| } catch (GoogleJsonResponseException e) { |
| if (e.getStatusCode() == SC_NOT_FOUND) { |
| return null; |
| } else { |
| throw e; |
| } |
| } |
| } |
| } |