| /* |
| * Copyright (C) 2015 Google Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| * use this file except in compliance with the License. You may obtain a copy of |
| * the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package com.google.cloud.dataflow.sdk.runners; |
| |
| import static com.google.cloud.dataflow.sdk.util.StringUtils.approximateSimpleName; |
| |
| import com.google.api.client.googleapis.json.GoogleJsonResponseException; |
| import com.google.api.client.util.Joiner; |
| import com.google.api.services.dataflow.Dataflow; |
| import com.google.api.services.dataflow.model.DataflowPackage; |
| import com.google.api.services.dataflow.model.Job; |
| import com.google.api.services.dataflow.model.ListJobsResponse; |
| import com.google.cloud.dataflow.sdk.Pipeline; |
| import com.google.cloud.dataflow.sdk.PipelineResult.State; |
| import com.google.cloud.dataflow.sdk.annotations.Experimental; |
| import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; |
| import com.google.cloud.dataflow.sdk.coders.Coder; |
| import com.google.cloud.dataflow.sdk.coders.CoderException; |
| import com.google.cloud.dataflow.sdk.io.PubsubIO; |
| import com.google.cloud.dataflow.sdk.io.Read; |
| import com.google.cloud.dataflow.sdk.io.UnboundedSource; |
| import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; |
| import com.google.cloud.dataflow.sdk.options.PipelineOptions; |
| import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; |
| import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification; |
| import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat; |
| import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; |
| import com.google.cloud.dataflow.sdk.transforms.Aggregator; |
| import com.google.cloud.dataflow.sdk.transforms.Combine; |
| import com.google.cloud.dataflow.sdk.transforms.Create; |
| import com.google.cloud.dataflow.sdk.transforms.DoFn; |
| import com.google.cloud.dataflow.sdk.transforms.GroupByKey; |
| import com.google.cloud.dataflow.sdk.transforms.PTransform; |
| import com.google.cloud.dataflow.sdk.transforms.ParDo; |
| import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; |
| import com.google.cloud.dataflow.sdk.transforms.View; |
| import com.google.cloud.dataflow.sdk.transforms.WithKeys; |
| import com.google.cloud.dataflow.sdk.transforms.Write; |
| import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane; |
| import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; |
| import com.google.cloud.dataflow.sdk.transforms.windowing.Window; |
| import com.google.cloud.dataflow.sdk.util.CoderUtils; |
| import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo; |
| import com.google.cloud.dataflow.sdk.util.IOChannelUtils; |
| import com.google.cloud.dataflow.sdk.util.InstanceBuilder; |
| import com.google.cloud.dataflow.sdk.util.MonitoringUtil; |
| import com.google.cloud.dataflow.sdk.util.PCollectionViews; |
| import com.google.cloud.dataflow.sdk.util.PathValidator; |
| import com.google.cloud.dataflow.sdk.util.PropertyNames; |
| import com.google.cloud.dataflow.sdk.util.Reshuffle; |
| import com.google.cloud.dataflow.sdk.util.StreamingPCollectionViewWriterFn; |
| import com.google.cloud.dataflow.sdk.util.Transport; |
| import com.google.cloud.dataflow.sdk.util.ValueWithRecordId; |
| import com.google.cloud.dataflow.sdk.util.WindowingStrategy; |
| import com.google.cloud.dataflow.sdk.values.KV; |
| import com.google.cloud.dataflow.sdk.values.PCollection; |
| import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; |
| import com.google.cloud.dataflow.sdk.values.PCollectionView; |
| import com.google.cloud.dataflow.sdk.values.PDone; |
| import com.google.cloud.dataflow.sdk.values.PInput; |
| import com.google.cloud.dataflow.sdk.values.POutput; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.ImmutableMap; |
| |
| import org.joda.time.DateTimeUtils; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.Duration; |
| import org.joda.time.format.DateTimeFormat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| import java.net.URLClassLoader; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| |
| /** |
| * A {@link PipelineRunner} that executes the operations in the |
| * pipeline by first translating them to the Dataflow representation |
| * using the {@link DataflowPipelineTranslator} and then submitting |
| * them to a Dataflow service for execution. |
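| * |
| * <p>A minimal usage sketch; the project id and bucket path below are illustrative |
| * placeholders rather than values required by the runner: |
| * |
| * <pre>{@code |
| * DataflowPipelineOptions options = |
| *     PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); |
| * options.setProject("my-project");                       // placeholder project id |
| * options.setStagingLocation("gs://my-bucket/staging");   // placeholder GCS path |
| * options.setRunner(DataflowPipelineRunner.class); |
| * |
| * Pipeline p = Pipeline.create(options); |
| * // ... apply transforms to p ... |
| * |
| * // With this runner configured, run() returns a DataflowPipelineJob. |
| * DataflowPipelineJob job = (DataflowPipelineJob) p.run(); |
| * }</pre> |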
| */ |
| public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> { |
| private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineRunner.class); |
| |
| /** Provided configuration options. */ |
| private final DataflowPipelineOptions options; |
| |
| /** Client for the Dataflow service. This is used to actually submit jobs. */ |
| private final Dataflow dataflowClient; |
| |
| /** Translator for this DataflowPipelineRunner, based on options. */ |
| private final DataflowPipelineTranslator translator; |
| |
| /** Custom transform implementations for running in streaming mode. */ |
| private final Map<Class<?>, Class<?>> streamingOverrides; |
| |
| /** A set of user-defined functions to invoke at different points in execution. */ |
| private DataflowPipelineRunnerHooks hooks; |
| |
| // Environment version information |
| private static final String ENVIRONMENT_MAJOR_VERSION = "3"; |
| |
| /** |
| * Constructs a runner from the provided options. |
| * |
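| * <p>For example, a sketch using command-line flags (the contents of {@code args} are |
| * whatever the user passed on the command line): |
| * |
| * <pre>{@code |
| * DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args) |
| *     .withValidation() |
| *     .as(DataflowPipelineOptions.class); |
| * DataflowPipelineRunner runner = DataflowPipelineRunner.fromOptions(options); |
| * }</pre> |
| * |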
| * @param options Properties that configure the runner. |
| * @return The newly created runner. |
| */ |
| public static DataflowPipelineRunner fromOptions(PipelineOptions options) { |
| // (Re-)register standard IO factories. Clobbers any prior credentials. |
| IOChannelUtils.registerStandardIOFactories(options); |
| |
| DataflowPipelineOptions dataflowOptions = |
| PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options); |
| ArrayList<String> missing = new ArrayList<>(); |
| |
| if (dataflowOptions.getAppName() == null) { |
| missing.add("appName"); |
| } |
| if (missing.size() > 0) { |
| throw new IllegalArgumentException( |
| "Missing required values: " + Joiner.on(',').join(missing)); |
| } |
| |
| PathValidator validator = dataflowOptions.getPathValidator(); |
| if (dataflowOptions.getStagingLocation() != null) { |
| validator.verifyPath(dataflowOptions.getStagingLocation()); |
| } |
| if (dataflowOptions.getTempLocation() != null) { |
| validator.verifyPath(dataflowOptions.getTempLocation()); |
| } |
| if (Strings.isNullOrEmpty(dataflowOptions.getTempLocation())) { |
| dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation()); |
| } else if (Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())) { |
| try { |
| dataflowOptions.setStagingLocation( |
| IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging")); |
| } catch (IOException e) { |
| throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation " |
| + "from PipelineOptions.tempLocation. Please set the staging location explicitly.", e); |
| } |
| } |
| |
| if (dataflowOptions.getFilesToStage() == null) { |
| dataflowOptions.setFilesToStage(detectClassPathResourcesToStage( |
| DataflowPipelineRunner.class.getClassLoader())); |
| LOG.info("PipelineOptions.filesToStage was not specified. " |
| + "Defaulting to files from the classpath: will stage {} files. " |
| + "Enable logging at DEBUG level to see which files will be staged.", |
| dataflowOptions.getFilesToStage().size()); |
| LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage()); |
| } |
| |
| // Verify jobName according to service requirements. |
| String jobName = dataflowOptions.getJobName().toLowerCase(); |
| Preconditions.checkArgument( |
| jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), |
| "JobName invalid; the name must consist of only the characters " |
| + "[-a-z0-9], starting with a letter and ending with a letter " |
| + "or number"); |
| |
| return new DataflowPipelineRunner(dataflowOptions); |
| } |
| |
| @VisibleForTesting protected DataflowPipelineRunner(DataflowPipelineOptions options) { |
| this.options = options; |
| this.dataflowClient = options.getDataflowClient(); |
| this.translator = DataflowPipelineTranslator.fromOptions(options); |
| |
| this.streamingOverrides = ImmutableMap.<Class<?>, Class<?>>builder() |
| .put(Create.Values.class, StreamingCreate.class) |
| .put(View.AsMap.class, StreamingViewAsMap.class) |
| .put(View.AsMultimap.class, StreamingViewAsMultimap.class) |
| .put(View.AsSingleton.class, StreamingViewAsSingleton.class) |
| .put(View.AsIterable.class, StreamingViewAsIterable.class) |
| .put(Write.Bound.class, StreamingWrite.class) |
| .put(PubsubIO.Write.Bound.class, StreamingPubsubIOWrite.class) |
| .put(Read.Unbounded.class, StreamingUnboundedRead.class) |
| .build(); |
| } |
| |
| /** |
| * Applies the given transform to the input. For transforms with customized definitions |
| * for the Dataflow pipeline runner, the application is intercepted and modified here. |
| */ |
| @Override |
| public <OutputT extends POutput, InputT extends PInput> OutputT apply( |
| PTransform<InputT, OutputT> transform, InputT input) { |
| |
| if (Combine.GroupedValues.class.equals(transform.getClass()) |
| || GroupByKey.class.equals(transform.getClass())) { |
| |
| // For both Dataflow runners (streaming and batch), GroupByKey and GroupedValues are |
| // primitives. Returning a primitive output instead of the expanded definition |
| // signals to the translator that translation is necessary. |
| @SuppressWarnings("unchecked") |
| PCollection<?> pc = (PCollection<?>) input; |
| @SuppressWarnings("unchecked") |
| OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal( |
| pc.getPipeline(), |
| transform instanceof GroupByKey |
| ? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy()) |
| : pc.getWindowingStrategy(), |
| pc.isBounded()); |
| return outputT; |
| |
| } else if (options.isStreaming() && streamingOverrides.containsKey(transform.getClass())) { |
| // It is the responsibility of whoever constructs streamingOverrides |
| // to ensure this is type safe. |
| @SuppressWarnings("unchecked") |
| Class<PTransform<InputT, OutputT>> transformClass = |
| (Class<PTransform<InputT, OutputT>>) transform.getClass(); |
| |
| @SuppressWarnings("unchecked") |
| Class<PTransform<InputT, OutputT>> customTransformClass = |
| (Class<PTransform<InputT, OutputT>>) streamingOverrides.get(transform.getClass()); |
| |
| PTransform<InputT, OutputT> customTransform = |
| InstanceBuilder.ofType(customTransformClass) |
| .withArg(transformClass, transform) |
| .build(); |
| |
| return Pipeline.applyTransform(input, customTransform); |
| } else { |
| return super.apply(transform, input); |
| } |
| } |
| |
| @Override |
| public DataflowPipelineJob run(Pipeline pipeline) { |
| LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications " |
| + "related to Google Compute Engine usage and other Google Cloud Services."); |
| |
| List<DataflowPackage> packages = options.getStager().stageFiles(); |
| JobSpecification jobSpecification = translator.translate(pipeline, packages); |
| Job newJob = jobSpecification.getJob(); |
| |
| // Set a unique client_request_id in the CreateJob request. |
| // This is used to ensure idempotence of job creation across retried |
| // attempts to create a job. Specifically, if the service returns a job with |
| // a different client_request_id, it means the returned one is a different |
| // job previously created with the same job name, and that the job creation |
| // has been effectively rejected. The SDK should return |
| // Error::Already_Exists to the user in that case. |
| int randomNum = new Random().nextInt(9000) + 1000; |
| String requestId = DateTimeFormat.forPattern("yyyyMMddHHmmssSSS").withZone(DateTimeZone.UTC) |
| .print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum; |
| newJob.setClientRequestId(requestId); |
| |
| String version = DataflowReleaseInfo.getReleaseInfo().getVersion(); |
| System.out.println("Dataflow SDK version: " + version); |
| |
| newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo()); |
| // The Dataflow Service may write to the temporary directory directly, so |
| // it must be verified. |
| DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); |
| if (!Strings.isNullOrEmpty(options.getTempLocation())) { |
| newJob.getEnvironment().setTempStoragePrefix( |
| dataflowOptions.getPathValidator().verifyPath(options.getTempLocation())); |
| } |
| newJob.getEnvironment().setDataset(options.getTempDatasetId()); |
| newJob.getEnvironment().setClusterManagerApiService( |
| options.getClusterManagerApi().getApiServiceName()); |
| newJob.getEnvironment().setExperiments(options.getExperiments()); |
| |
| // Requirements about the service. |
| Map<String, Object> environmentVersion = new HashMap<>(); |
| environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, ENVIRONMENT_MAJOR_VERSION); |
| newJob.getEnvironment().setVersion(environmentVersion); |
| // Default jobType is DATA_PARALLEL, which is used for Java batch jobs. |
| String jobType = "DATA_PARALLEL"; |
| |
| if (options.isStreaming()) { |
| jobType = "STREAMING"; |
| } |
| environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType); |
| |
| if (hooks != null) { |
| hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment()); |
| } |
| |
| if (!Strings.isNullOrEmpty(options.getDataflowJobFile())) { |
| try (PrintWriter printWriter = new PrintWriter( |
| new File(options.getDataflowJobFile()))) { |
| String workSpecJson = DataflowPipelineTranslator.jobToString(newJob); |
| printWriter.print(workSpecJson); |
| LOG.info("Printed workflow specification to {}", options.getDataflowJobFile()); |
| } catch (IllegalStateException ex) { |
| LOG.warn("Cannot translate workflow spec to json for debug."); |
| } catch (FileNotFoundException ex) { |
| LOG.warn("Cannot create workflow spec output file."); |
| } |
| } |
| |
| String jobIdToUpdate = null; |
| if (options.getUpdate()) { |
| jobIdToUpdate = getJobIdFromName(options.getJobName()); |
| newJob.setTransformNameMapping(options.getTransformNameMapping()); |
| } |
| Job jobResult; |
| try { |
| Dataflow.Projects.Jobs.Create createRequest = |
| dataflowClient |
| .projects() |
| .jobs() |
| .create(options.getProject(), newJob); |
| if (jobIdToUpdate != null) { |
| createRequest.setReplaceJobId(jobIdToUpdate); |
| } |
| jobResult = createRequest.execute(); |
| } catch (GoogleJsonResponseException e) { |
| throw new RuntimeException("Failed to create a workflow job: " |
| + (e.getDetails() != null ? e.getDetails().getMessage() : e), e); |
| } catch (IOException e) { |
| throw new RuntimeException("Failed to create a workflow job", e); |
| } |
| // If the service returned a client request id, the SDK needs to compare it |
| // with the original id generated in the request; if they are not the same |
| // (i.e., the returned job was not created by this request), throw |
| // Error::Already_Exists. |
| if (jobResult.getClientRequestId() != null && !jobResult.getClientRequestId().isEmpty() |
| && !jobResult.getClientRequestId().equals(requestId)) { |
| // If updating a job. |
| if (options.getUpdate()) { |
| throw new RuntimeException( |
| "The job named " + newJob.getName() + " with id: " + jobIdToUpdate |
| + " has already been updated into job id: " + jobResult.getId() |
| + " and cannot be updated again. "); |
| } else { |
| throw new RuntimeException("There is already an active job named " + newJob.getName() |
| + " with id: " + jobResult.getId() |
| + ". If you want to submit a second job, try again by setting a " |
| + "different name using --jobName."); |
| } |
| } |
| |
| LOG.info("To access the Dataflow monitoring console, please navigate to {}", |
| MonitoringUtil.getJobMonitoringPageURL(options.getProject(), jobResult.getId())); |
| System.out.println("Submitted job: " + jobResult.getId()); |
| |
| LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}", |
| MonitoringUtil.getGcloudCancelCommand(options, jobResult.getId())); |
| |
| // Obtain all of the extractors from the PTransforms used in the pipeline so the |
| // DataflowPipelineJob has access to them. |
| AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(pipeline); |
| Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = |
| aggregatorExtractor.getAggregatorSteps(); |
| |
| DataflowAggregatorTransforms aggregatorTransforms = |
| new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames()); |
| |
| // Use a raw client for post-launch monitoring, as status calls may fail |
| // regularly and need not be retried automatically. |
| DataflowPipelineJob dataflowPipelineJob = |
| new DataflowPipelineJob(options.getProject(), jobResult.getId(), |
| Transport.newRawDataflowClient(options).build(), aggregatorTransforms); |
| |
| return dataflowPipelineJob; |
| } |
| |
| /** |
| * Returns the DataflowPipelineTranslator associated with this object. |
| */ |
| public DataflowPipelineTranslator getTranslator() { |
| return translator; |
| } |
| |
| /** |
| * Sets callbacks to invoke during execution; see {@link DataflowPipelineRunnerHooks}. |
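| * |
| * <p>A sketch of a hook that adjusts the job {@code Environment} before submission; the |
| * experiment name below is a hypothetical placeholder: |
| * |
| * <pre>{@code |
| * runner.setHooks(new DataflowPipelineRunnerHooks() { |
| *   public void modifyEnvironmentBeforeSubmission(Environment environment) { |
| *     environment.setExperiments(Arrays.asList("my_experiment"));  // hypothetical experiment |
| *   } |
| * }); |
| * }</pre> |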
| */ |
| @Experimental |
| public void setHooks(DataflowPipelineRunnerHooks hooks) { |
| this.hooks = hooks; |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Specialized (non-)implementation for {@link Write.Bound} for the Dataflow runner in streaming |
| * mode. |
| */ |
| private static class StreamingWrite<T> extends PTransform<PCollection<T>, PDone> { |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |
| public StreamingWrite(Write.Bound<T> transform) { } |
| |
| @Override |
| public PDone apply(PCollection<T> input) { |
| throw new UnsupportedOperationException( |
| "The Write transform is not supported by the Dataflow streaming runner."); |
| } |
| |
| @Override |
| protected String getKindString() { |
| return "StreamingWrite"; |
| } |
| } |
| |
| /** |
| * Specialized implementation for {@link PubsubIO.Write} for the Dataflow runner in streaming |
| * mode. |
| * |
| * <p>For internal use only. Subject to change at any time. |
| * |
| * <p>Public so the {@link com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator} |
| * can access it. |
| */ |
| public static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> { |
| private static final long serialVersionUID = 0L; |
| |
| private final PubsubIO.Write.Bound<T> transform; |
| |
| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |
| public StreamingPubsubIOWrite(PubsubIO.Write.Bound<T> transform) { |
| this.transform = transform; |
| } |
| |
| public PubsubIO.Write.Bound<T> getOverriddenTransform() { |
| return transform; |
| } |
| |
| @Override |
| public PDone apply(PCollection<T> input) { |
| return PDone.in(input.getPipeline()); |
| } |
| |
| @Override |
| protected String getKindString() { |
| return "StreamingPubsubIOWrite"; |
| } |
| } |
| |
| /** |
| * Specialized implementation for {@link Read.Unbounded} for the Dataflow runner in streaming |
| * mode. |
| * |
| * <p>In particular, if an {@link UnboundedSource} requires deduplication, features of |
| * WindmillSink are leveraged to perform the deduplication. |
| */ |
| private static class StreamingUnboundedRead<T> extends PTransform<PInput, PCollection<T>> { |
| private static final long serialVersionUID = 0L; |
| |
| private final UnboundedSource<T, ?> source; |
| |
| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |
| @SuppressWarnings("unused") // used via reflection in apply() |
| public StreamingUnboundedRead(Read.Unbounded<T> transform) { |
| this.source = transform.getSource(); |
| } |
| |
| @Override |
| protected Coder<T> getDefaultOutputCoder() { |
| return source.getDefaultOutputCoder(); |
| } |
| |
| @Override |
| public final PCollection<T> apply(PInput input) { |
| source.validate(); |
| |
| if (source.requiresDeduping()) { |
| return Pipeline.applyTransform(input, new ReadWithIds<T>(source)) |
| .apply(new Deduplicate<T>()); |
| } else { |
| return Pipeline.applyTransform(input, new ReadWithIds<T>(source)) |
| .apply(ValueWithRecordId.<T>stripIds()); |
| } |
| } |
| |
| /** |
| * {@link PTransform} that reads {@code (record,recordId)} pairs from an |
| * {@link UnboundedSource}. |
| */ |
| private static class ReadWithIds<T> |
| extends PTransform<PInput, PCollection<ValueWithRecordId<T>>> { |
| private static final long serialVersionUID = 0L; |
| private final UnboundedSource<T, ?> source; |
| |
| private ReadWithIds(UnboundedSource<T, ?> source) { |
| this.source = source; |
| } |
| |
| @Override |
| public final PCollection<ValueWithRecordId<T>> apply(PInput input) { |
| return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal( |
| input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED); |
| } |
| |
| @Override |
| protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() { |
| return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder()); |
| } |
| |
| public UnboundedSource<T, ?> getSource() { |
| return source; |
| } |
| } |
| |
| @Override |
| public String getKindString() { |
| return "Read(" + approximateSimpleName(source.getClass()) + ")"; |
| } |
| |
| static { |
| DataflowPipelineTranslator.registerTransformTranslator( |
| ReadWithIds.class, new ReadWithIdsTranslator()); |
| } |
| |
| private static class ReadWithIdsTranslator |
| implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> { |
| @Override |
| public void translate(ReadWithIds<?> transform, |
| DataflowPipelineTranslator.TranslationContext context) { |
| BasicSerializableSourceFormat.translateReadHelper( |
| transform.getSource(), transform, context); |
| } |
| } |
| } |
| |
| /** |
| * Remove values with duplicate ids. |
| */ |
| private static class Deduplicate<T> |
| extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> { |
| private static final long serialVersionUID = 0L; |
| // Use a finite set of keys to improve bundling. Without this, the key space |
| // will be the space of ids which is potentially very large, which results in much |
| // more per-key overhead. |
| private static final int NUM_RESHARD_KEYS = 10000; |
| @Override |
| public PCollection<T> apply(PCollection<ValueWithRecordId<T>> input) { |
| return input |
| .apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() { |
| private static final long serialVersionUID = 0L; |
| |
| @Override |
| public Integer apply(ValueWithRecordId<T> value) { |
| return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS; |
| } |
| })) |
| // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through |
| // WindmillSink. |
| .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of()) |
| .apply(ParDo.named("StripIds").of( |
| new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() { |
| private static final long serialVersionUID = 0L; |
| |
| @Override |
| public void processElement(ProcessContext c) { |
| c.output(c.element().getValue().getValue()); |
| } |
| })); |
| } |
| } |
| |
| /** |
| * Specialized implementation for {@link Create.Values} for the Dataflow runner in streaming mode. |
| */ |
| private static class StreamingCreate<T> extends PTransform<PInput, PCollection<T>> { |
| private static final long serialVersionUID = 0L; |
| |
| private final Create.Values<T> transform; |
| |
| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |
| @SuppressWarnings("unused") // used via reflection in apply() |
| public StreamingCreate(Create.Values<T> transform) { |
| this.transform = transform; |
| } |
| |
| /** |
| * {@link DoFn} that outputs a single {@code KV.of(null, null)} to kick off the {@link GroupByKey} |
| * in the streaming create implementation. |
| */ |
| private static class OutputNullKv extends DoFn<String, KV<Void, Void>> { |
| private static final long serialVersionUID = 0; |
| |
| @Override |
| public void processElement(DoFn<String, KV<Void, Void>>.ProcessContext c) throws Exception { |
| c.output(KV.of((Void) null, (Void) null)); |
| } |
| } |
| |
| /** |
| * A {@link DoFn} which outputs the specified elements by first encoding them to bytes using |
| * the specified {@link Coder} so that they are serialized as part of the {@link DoFn} but |
| * need not implement {@code Serializable}. |
| */ |
| private static class OutputElements<T> extends DoFn<Object, T> { |
| private static final long serialVersionUID = 0; |
| |
| private final Coder<T> coder; |
| private final List<byte[]> encodedElements; |
| |
| public OutputElements(Iterable<T> elems, Coder<T> coder) { |
| this.coder = coder; |
| this.encodedElements = new ArrayList<>(); |
| for (T t : elems) { |
| try { |
| encodedElements.add(CoderUtils.encodeToByteArray(coder, t)); |
| } catch (CoderException e) { |
| throw new IllegalArgumentException("Unable to encode value " + t |
| + " with coder " + coder, e); |
| } |
| } |
| } |
| |
| @Override |
| public void processElement(ProcessContext c) throws IOException { |
| for (byte[] encodedElement : encodedElements) { |
| c.output(CoderUtils.decodeFromByteArray(coder, encodedElement)); |
| } |
| } |
| } |
| |
| @Override |
| public PCollection<T> apply(PInput input) { |
| try { |
| Coder<T> coder = transform.getDefaultOutputCoder(input); |
| return Pipeline.applyTransform( |
| input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/")) |
| .apply(ParDo.of(new OutputNullKv())) |
| .apply("GlobalSingleton", Window.<KV<Void, Void>>into(new GlobalWindows()) |
| .triggering(AfterPane.elementCountAtLeast(1)) |
| .withAllowedLateness(Duration.ZERO) |
| .discardingFiredPanes()) |
| .apply(GroupByKey.<Void, Void>create()) |
| // Go back to the default windowing strategy, so that the allowed lateness we set here |
| // doesn't count as the user having set it. |
| .setWindowingStrategyInternal(WindowingStrategy.globalDefault()) |
| .apply(Window.<KV<Void, Iterable<Void>>>into(new GlobalWindows())) |
| .apply(ParDo.of(new OutputElements<>(transform.getElements(), coder))) |
| .setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED); |
| } catch (CannotProvideCoderException e) { |
| throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. " |
| + "Please set a coder by invoking Create.withCoder() explicitly.", e); |
| } |
| } |
| |
| @Override |
| protected String getKindString() { |
| return "StreamingCreate"; |
| } |
| } |
| |
| /** |
| * Specialized implementation for {@link View.AsMap} for the Dataflow runner in streaming mode. |
| */ |
| private static class StreamingViewAsMap<K, V> |
| extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { |
| private static final long serialVersionUID = 0L; |
| |
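| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |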
| @SuppressWarnings("unused") // used via reflection in apply() |
| public StreamingViewAsMap(View.AsMap<K, V> transform) { } |
| |
| @Override |
| public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) { |
| PCollectionView<Map<K, V>> view = |
| PCollectionViews.mapView( |
| input.getPipeline(), |
| input.getWindowingStrategy(), |
| input.getCoder()); |
| |
| return input |
| .apply(Combine.globally(new View.Concatenate<KV<K, V>>()).withoutDefaults()) |
| .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) |
| .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view)); |
| } |
| |
| @Override |
| protected String getKindString() { |
| return "StreamingViewAsMap"; |
| } |
| } |
| |
| /** |
| * Specialized expansion for {@link View.AsMultimap} for the Dataflow runner in streaming mode. |
| */ |
| private static class StreamingViewAsMultimap<K, V> |
| extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |
| @SuppressWarnings("unused") // used via reflection in apply() |
| public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) { } |
| |
| @Override |
| public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { |
| PCollectionView<Map<K, Iterable<V>>> view = |
| PCollectionViews.multimapView( |
| input.getPipeline(), |
| input.getWindowingStrategy(), |
| input.getCoder()); |
| |
| return input |
| .apply(Combine.globally(new View.Concatenate<KV<K, V>>()).withoutDefaults()) |
| .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) |
| .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); |
| } |
| |
| @Override |
| protected String getKindString() { |
| return "StreamingViewAsMultimap"; |
| } |
| } |
| |
| /** |
| * Specialized implementation for {@link View.AsIterable} for the Dataflow runner in streaming |
| * mode. |
| */ |
| private static class StreamingViewAsIterable<T> |
| extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |
| @SuppressWarnings("unused") // used via reflection in apply() |
| public StreamingViewAsIterable(View.AsIterable<T> transform) { } |
| |
| @Override |
| public PCollectionView<Iterable<T>> apply(PCollection<T> input) { |
| // Using Combine.globally(...).asSingletonView() allows automatic propagation of |
| // the CombineFn's default value as the default value of the SingletonView. |
| // |
| // Safe covariant cast List<T> -> Iterable<T>, |
| // not expressible in Java, even with unchecked casts. |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| Combine.GloballyAsSingletonView<T, Iterable<T>> concatAndView = |
| (Combine.GloballyAsSingletonView) |
| Combine.globally(new View.Concatenate<T>()).asSingletonView(); |
| return input.apply(concatAndView); |
| } |
| |
| @Override |
| protected String getKindString() { |
| return "StreamingViewAsIterable"; |
| } |
| } |
| |
| private static class WrapAsList<T> extends DoFn<T, List<T>> { |
| private static final long serialVersionUID = 0; |
| |
| @Override |
| public void processElement(ProcessContext c) { |
| c.output(Arrays.asList(c.element())); |
| } |
| } |
| |
| /** |
| * Specialized expansion for {@link View.AsSingleton} for the Dataflow runner in streaming mode. |
| */ |
| private static class StreamingViewAsSingleton<T> |
| extends PTransform<PCollection<T>, PCollectionView<T>> { |
| private static final long serialVersionUID = 0L; |
| |
| private View.AsSingleton<T> transform; |
| |
| /** |
| * Builds an instance of this class from the overridden transform. |
| */ |
| @SuppressWarnings("unused") // used via reflection in apply() |
| public StreamingViewAsSingleton(View.AsSingleton<T> transform) { |
| this.transform = transform; |
| } |
| |
| @Override |
| public PCollectionView<T> apply(PCollection<T> input) { |
| PCollectionView<T> view = PCollectionViews.singletonView( |
| input.getPipeline(), |
| input.getWindowingStrategy(), |
| transform.hasDefaultValue(), |
| transform.defaultValue(), |
| input.getCoder()); |
| return input |
| .apply(ParDo.of(new WrapAsList<T>())) |
| .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) |
| .apply(View.CreatePCollectionView.<T, T>of(view)); |
| } |
| |
| @Override |
| protected String getKindString() { |
| return "StreamingViewAsSingleton"; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "DataflowPipelineRunner#" + options.getJobName(); |
| } |
| |
| /** |
| * Attempts to detect all the resources the class loader has access to. This does not recurse |
| * to class loader parents, which stops it from pulling in resources from the system class loader. |
| * |
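| * <p>For example, a sketch mirroring how the runner stages its own classpath: |
| * <pre>{@code |
| * List<String> resources = |
| *     detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()); |
| * }</pre> |
| * |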
| * @param classLoader The URLClassLoader to use to detect resources to stage. |
| * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one |
| * of the resources the class loader exposes is not a file resource. |
| * @return A list of absolute paths to the resources the class loader uses. |
| */ |
| protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { |
| if (!(classLoader instanceof URLClassLoader)) { |
| String message = String.format("Unable to use ClassLoader to detect classpath elements. " |
| + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); |
| LOG.error(message); |
| throw new IllegalArgumentException(message); |
| } |
| |
| List<String> files = new ArrayList<>(); |
| for (URL url : ((URLClassLoader) classLoader).getURLs()) { |
| try { |
| files.add(new File(url.toURI()).getAbsolutePath()); |
| } catch (IllegalArgumentException | URISyntaxException e) { |
| String message = String.format("Unable to convert url (%s) to file.", url); |
| LOG.error(message); |
| throw new IllegalArgumentException(message, e); |
| } |
| } |
| return files; |
| } |
| |
| /** |
| * Finds the id for the running job of the given name. |
| */ |
| private String getJobIdFromName(String jobName) { |
| try { |
| ListJobsResponse listResult; |
| String token = null; |
| do { |
| listResult = dataflowClient.projects().jobs() |
| .list(options.getProject()) |
| .setPageToken(token) |
| .execute(); |
| token = listResult.getNextPageToken(); |
| for (Job job : listResult.getJobs()) { |
| if (job.getName().equals(jobName) |
| && MonitoringUtil.toState(job.getCurrentState()).equals(State.RUNNING)) { |
| return job.getId(); |
| } |
| } |
| } while (token != null); |
| } catch (GoogleJsonResponseException e) { |
| throw new RuntimeException( |
| "Got error while looking up jobs: " |
| + (e.getDetails() != null ? e.getDetails().getMessage() : e), e); |
| } catch (IOException e) { |
| throw new RuntimeException("Got error while looking up jobs: ", e); |
| } |
| |
| throw new IllegalArgumentException("Could not find running job named " + jobName); |
| } |
| } |