blob: c03583983c0ad05922247b754b28a0ee30206f89 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.options;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Options that can be used to configure the {@link DataflowRunner}. */
@Description("Options that configure the Dataflow pipeline.")
public interface DataflowPipelineOptions
    extends PipelineOptions,
        GcpOptions,
        ApplicationNameOptions,
        DataflowPipelineDebugOptions,
        DataflowPipelineWorkerPoolOptions,
        BigQueryOptions,
        GcsOptions,
        StreamingOptions,
        CloudDebuggerOptions,
        DataflowWorkerLoggingOptions,
        DataflowProfilingOptions,
        PubsubOptions {

  @Description(
      "Project id. Required when running a Dataflow in the cloud. "
          + "See https://cloud.google.com/storage/docs/projects for further details.")
  @Override
  @Validation.Required
  @Default.InstanceFactory(DefaultProjectFactory.class)
  String getProject();

  @Override
  void setProject(String value);

  /**
   * GCS path for staging local files, e.g. gs://bucket/object.
   *
   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://".
   *
   * <p>If {@link #getStagingLocation()} is not set, it defaults to the {@code staging} directory
   * under {@link GcpOptions#getGcpTempLocation()} (see {@link StagingLocationFactory}). In that
   * case {@link GcpOptions#getGcpTempLocation()} must be a valid GCS path.
   */
  @Description(
      "GCS path for staging local files, e.g. \"gs://bucket/object\". "
          + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
          + "If stagingLocation is unset, defaults to gcpTempLocation with \"/staging\" suffix.")
  @Default.InstanceFactory(StagingLocationFactory.class)
  String getStagingLocation();

  void setStagingLocation(String value);

  /** Whether to update the currently running pipeline with the same name as this one. */
  @Description(
      "If set, replace the existing pipeline with the name specified by --jobName with "
          + "this pipeline, preserving state.")
  boolean isUpdate();

  void setUpdate(boolean value);

  /** If set, the snapshot from which the job should be created. */
  @Hidden
  @Experimental
  @Description("If set, the snapshot from which the job should be created.")
  String getCreateFromSnapshot();

  void setCreateFromSnapshot(String value);

  /** Where the runner should generate a template file. Must either be local or Cloud Storage. */
  @Description(
      "Where the runner should generate a template file. "
          + "Must either be local or Cloud Storage.")
  String getTemplateLocation();

  void setTemplateLocation(String value);

  /** Run the job as a specific service account, instead of the default GCE robot. */
  @Hidden
  @Experimental
  @Description("Run the job as a specific service account, instead of the default GCE robot.")
  String getServiceAccount();

  void setServiceAccount(String value);

  /**
   * The Google Compute Engine <a
   * href="https://cloud.google.com/compute/docs/regions-zones/regions-zones">region</a> for
   * creating Dataflow jobs.
   *
   * <p>NOTE: Cloud Dataflow now also supports the region flag.
   */
  @Hidden
  @Experimental
  @Description(
      "The Google Compute Engine region for creating Dataflow jobs. See "
          + "https://cloud.google.com/compute/docs/regions-zones/regions-zones for a list of valid "
          + "options. Currently defaults to us-central1, but future releases of Beam will "
          + "require the user to set the region explicitly.")
  String getRegion();

  void setRegion(String region);

  /** Labels that will be applied to the billing records for this job. */
  @Description("Labels that will be applied to the billing records for this job.")
  Map<String, String> getLabels();

  void setLabels(Map<String, String> labels);

  /** The URL of the staged portable pipeline. */
  @Description("The URL of the staged portable pipeline")
  String getPipelineUrl();

  void setPipelineUrl(String urlString);

  /** The customized Dataflow worker jar. */
  @Description("The customized dataflow worker jar")
  String getDataflowWorkerJar();

  void setDataflowWorkerJar(String dataflowWorkerJar);

  /** Set of available Flexible Resource Scheduling goals. */
  enum FlexResourceSchedulingGoal {
    /** No goal specified. */
    UNSPECIFIED,

    /** Optimize for lower execution time. */
    SPEED_OPTIMIZED,

    /** Optimize for lower cost. */
    COST_OPTIMIZED,
  }

  /** This option controls Flexible Resource Scheduling mode. */
  @Description("Controls the Flexible Resource Scheduling mode.")
  @Default.Enum("UNSPECIFIED")
  FlexResourceSchedulingGoal getFlexRSGoal();

  void setFlexRSGoal(FlexResourceSchedulingGoal goal);

  /**
   * Returns a default staging location: the {@code staging} directory under {@link
   * GcpOptions#getGcpTempLocation}.
   */
  class StagingLocationFactory implements DefaultValueFactory<String> {
    private static final Logger LOG = LoggerFactory.getLogger(StagingLocationFactory.class);

    /**
     * Derives the staging location from gcpTempLocation.
     *
     * @param options the pipeline options; viewed as {@link GcsOptions} to read gcpTempLocation
     *     and the path validator
     * @return the {@code staging} directory resolved under gcpTempLocation
     * @throws IllegalArgumentException if gcpTempLocation cannot be retrieved, or if it is
     *     rejected by the configured path validator as an output file prefix
     */
    @Override
    public String create(PipelineOptions options) {
      GcsOptions gcsOptions = options.as(GcsOptions.class);
      // Registered as the @Default factory for stagingLocation, i.e. used when none was provided.
      LOG.info("No stagingLocation provided, falling back to gcpTempLocation");
      String gcpTempLocation;
      try {
        gcpTempLocation = gcsOptions.getGcpTempLocation();
      } catch (Exception e) {
        // Rewrap so the failure explains why stagingLocation could not be defaulted.
        throw new IllegalArgumentException(
            "Error constructing default value for stagingLocation: failed to retrieve gcpTempLocation. "
                + "Either stagingLocation must be set explicitly or a valid value must be provided "
                + "for gcpTempLocation.",
            e);
      }
      try {
        gcsOptions.getPathValidator().validateOutputFilePrefixSupported(gcpTempLocation);
      } catch (Exception e) {
        throw new IllegalArgumentException(
            String.format(
                "Error constructing default value for stagingLocation: gcpTempLocation is not"
                    + " a valid GCS path, %s. ",
                gcpTempLocation),
            e);
      }
      // Treat gcpTempLocation as a directory and append the "staging" subdirectory.
      return FileSystems.matchNewResource(gcpTempLocation, true /* isDirectory */)
          .resolve("staging", StandardResolveOptions.RESOLVE_DIRECTORY)
          .toString();
    }
  }
}