/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.beam.runners.dataflow; |
| |
| import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull; |
| |
| import com.google.api.client.googleapis.json.GoogleJsonResponseException; |
| import com.google.api.client.util.BackOff; |
| import com.google.api.client.util.BackOffUtils; |
| import com.google.api.client.util.NanoClock; |
| import com.google.api.client.util.Sleeper; |
| import com.google.api.services.dataflow.model.Job; |
| import com.google.api.services.dataflow.model.JobMessage; |
| import com.google.api.services.dataflow.model.MetricUpdate; |
| import java.io.IOException; |
| import java.net.SocketTimeoutException; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.FutureTask; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; |
| import org.apache.beam.runners.dataflow.util.MonitoringUtil; |
| import org.apache.beam.sdk.PipelineResult; |
| import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter; |
| import org.apache.beam.sdk.metrics.MetricResults; |
| import org.apache.beam.sdk.runners.AppliedPTransform; |
| import org.apache.beam.sdk.util.FluentBackoff; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashBiMap; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| import org.joda.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * A DataflowPipelineJob represents a job submitted to Dataflow using {@link DataflowRunner}.
 *
 * <p>It exposes the job's id, polls the Dataflow service for job status and job messages
 * ({@link #waitUntilFinish}), supports a single-shot {@link #cancel}, and serves metrics via
 * {@link #metrics}.
 */
public class DataflowPipelineJob implements PipelineResult {
  private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);

  /** The id for the job. */
  protected String jobId;

  /** The {@link DataflowPipelineOptions} for the job. */
  private final DataflowPipelineOptions dataflowOptions;

  /**
   * Client for the Dataflow service. This can be used to query the service for information about
   * the job.
   */
  private final DataflowClient dataflowClient;

  /**
   * MetricResults object for Dataflow Runner. It allows for querying of metrics from the Dataflow
   * service.
   */
  private final DataflowMetrics dataflowMetrics;

  /** The state the job terminated in or {@code null} if the job has not terminated. */
  @Nullable private State terminalState = null;

  /** The job that replaced this one or {@code null} if the job has not been replaced. */
  @Nullable private DataflowPipelineJob replacedByJob = null;

  /** Bidirectional mapping from {@link AppliedPTransform}s to their Dataflow step names. */
  protected BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames;

  /** The Metric Updates retrieved after the job was in a terminal state. */
  private List<MetricUpdate> terminalMetricUpdates;

  /**
   * The latest timestamp (epoch millis) up to which job messages have been retrieved; used to
   * avoid re-fetching messages already delivered to the handler in {@link #waitUntilFinish}.
   */
  private long lastTimestamp = Long.MIN_VALUE;

  /** The polling interval for job status and messages information. */
  static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2);

  /** The initial interval between successive job-status polls. */
  static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds(2);

  /** Exponent applied by {@link FluentBackoff} to grow the polling intervals between attempts. */
  static final double DEFAULT_BACKOFF_EXPONENT = 1.5;

  /** The amount of polling retries for job status and messages information. */
  static final int MESSAGES_POLLING_RETRIES = 11;

  /** The amount of polling retries for job status alone. */
  static final int STATUS_POLLING_RETRIES = 4;

  /** Backoff policy for the message-polling loop in {@link #waitUntilFinish}. */
  private static final FluentBackoff MESSAGES_BACKOFF_FACTORY =
      FluentBackoff.DEFAULT
          .withInitialBackoff(MESSAGES_POLLING_INTERVAL)
          .withMaxRetries(MESSAGES_POLLING_RETRIES)
          .withExponent(DEFAULT_BACKOFF_EXPONENT);

  /** Backoff policy for individual job-status requests; see {@link #getStateWithRetries}. */
  protected static final FluentBackoff STATUS_BACKOFF_FACTORY =
      FluentBackoff.DEFAULT
          .withInitialBackoff(STATUS_POLLING_INTERVAL)
          .withMaxRetries(STATUS_POLLING_RETRIES)
          .withExponent(DEFAULT_BACKOFF_EXPONENT);

  /**
   * Constructs the job.
   *
   * @param dataflowClient client used to query and update the job on the Dataflow service
   * @param jobId the job id
   * @param dataflowOptions used to configure the client for the Dataflow Service
   * @param transformStepNames a mapping from AppliedPTransforms to Step Names; may be null, in
   *     which case an empty mapping is used
   */
  public DataflowPipelineJob(
      DataflowClient dataflowClient,
      String jobId,
      DataflowPipelineOptions dataflowOptions,
      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
    this.dataflowClient = dataflowClient;
    this.jobId = jobId;
    this.dataflowOptions = dataflowOptions;
    this.transformStepNames = HashBiMap.create(firstNonNull(transformStepNames, ImmutableMap.of()));
    this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
  }

  /** Get the id of this job. */
  public String getJobId() {
    return jobId;
  }

  /** Get the project this job exists in. */
  public String getProjectId() {
    return dataflowOptions.getProject();
  }

  /** Get the {@link DataflowPipelineOptions} this job was created with. */
  public DataflowPipelineOptions getDataflowOptions() {
    return dataflowOptions;
  }

  /** Get the region this job exists in. */
  public String getRegion() {
    return dataflowOptions.getRegion();
  }

  /**
   * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
   *
   * @throws IllegalStateException if called before the job has terminated or if the job terminated
   *     but was not updated
   */
  public DataflowPipelineJob getReplacedByJob() {
    if (terminalState == null) {
      throw new IllegalStateException("getReplacedByJob() called before job terminated");
    }
    if (replacedByJob == null) {
      throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
    }
    return replacedByJob;
  }

  /**
   * Waits indefinitely for the pipeline to finish (a negative duration means no timeout).
   *
   * @return the final state of the job, or null if the thread is interrupted
   */
  @Override
  @Nullable
  public State waitUntilFinish() {
    return waitUntilFinish(Duration.millis(-1));
  }

  /**
   * Waits up to {@code duration} for the pipeline to finish, logging job messages as they arrive.
   *
   * <p>Checked exceptions from the underlying wait are rethrown wrapped in {@link
   * RuntimeException}; on {@link InterruptedException} the thread's interrupt flag is restored
   * before wrapping.
   *
   * @param duration the time to wait; a value less than 1 ms means an infinite wait
   * @return the final state of the job, or null on timeout or interruption
   */
  @Override
  @Nullable
  public State waitUntilFinish(Duration duration) {
    try {
      return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Restore the interrupt flag so callers up the stack can observe the interruption.
        Thread.currentThread().interrupt();
      }
      if (e instanceof RuntimeException) {
        throw (RuntimeException) e;
      }
      throw new RuntimeException(e);
    }
  }

  /**
   * Waits until the pipeline finishes and returns the final status.
   *
   * @param duration The time to wait for the job to finish. Provide a value less than 1 ms for an
   *     infinite wait.
   * @param messageHandler If non null this handler will be invoked for each batch of messages
   *     received.
   * @return The final state of the job or null on timeout or if the thread is interrupted.
   * @throws IOException If there is a persistent problem getting job information.
   */
  @Nullable
  @VisibleForTesting
  public State waitUntilFinish(Duration duration, MonitoringUtil.JobMessagesHandler messageHandler)
      throws IOException, InterruptedException {
    // We ignore the potential race condition here (Ctrl-C after job submission but before the
    // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture)
    // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a
    // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail,
    // etc. If the user wants to verify the job was cancelled they should look at the job status.
    Thread shutdownHook =
        new Thread(
            () ->
                LOG.warn(
                    "Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n"
                        + "To cancel the job in the cloud, run:\n> {}",
                    MonitoringUtil.getGcloudCancelCommand(dataflowOptions, getJobId())));

    try {
      Runtime.getRuntime().addShutdownHook(shutdownHook);
      return waitUntilFinish(
          duration,
          messageHandler,
          Sleeper.DEFAULT,
          NanoClock.SYSTEM,
          new MonitoringUtil(dataflowClient));
    } finally {
      // Always deregister the hook, whether the wait completed, timed out, or threw.
      Runtime.getRuntime().removeShutdownHook(shutdownHook);
    }
  }

  /**
   * Testing overload of {@link #waitUntilFinish(Duration, MonitoringUtil.JobMessagesHandler,
   * Sleeper, NanoClock, MonitoringUtil)} that constructs the {@link MonitoringUtil} from this
   * job's {@link DataflowClient}.
   */
  @Nullable
  @VisibleForTesting
  State waitUntilFinish(
      Duration duration,
      @Nullable MonitoringUtil.JobMessagesHandler messageHandler,
      Sleeper sleeper,
      NanoClock nanoClock)
      throws IOException, InterruptedException {
    return waitUntilFinish(
        duration, messageHandler, sleeper, nanoClock, new MonitoringUtil(dataflowClient));
  }

  /**
   * Waits until the pipeline finishes and returns the final status.
   *
   * @param duration The time to wait for the job to finish. Provide a value less than 1 ms for an
   *     infinite wait.
   * @param messageHandler If non null this handler will be invoked for each batch of messages
   *     received.
   * @param sleeper A sleeper to use to sleep between attempts.
   * @param nanoClock A nanoClock used to time the total time taken.
   * @param monitor source of job messages for {@code messageHandler}.
   * @return The final state of the job or null on timeout.
   * @throws IOException If there is a persistent problem getting job information.
   * @throws InterruptedException if the thread is interrupted.
   */
  @Nullable
  @VisibleForTesting
  State waitUntilFinish(
      Duration duration,
      @Nullable MonitoringUtil.JobMessagesHandler messageHandler,
      Sleeper sleeper,
      NanoClock nanoClock,
      MonitoringUtil monitor)
      throws IOException, InterruptedException {

    // Pick the initial backoff: unbounded cumulative sleep for an infinite wait, otherwise
    // capped at the requested duration.
    BackOff backoff;
    if (!duration.isLongerThan(Duration.ZERO)) {
      backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff());
    } else {
      backoff =
          BackOffAdapter.toGcpBackOff(
              MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff());
    }

    // This function tracks the cumulative time from the *first request* to enforce the wall-clock
    // limit. Any backoff instance could, at best, track the time since the first attempt at a
    // given request. Thus, we need to track the cumulative time ourselves.
    long startNanos = nanoClock.nanoTime();

    State state;
    do {
      // Get the state of the job before listing messages. This ensures we always fetch job
      // messages after the job finishes to ensure we have all them.
      state =
          getStateWithRetries(
              BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()),
              sleeper);
      boolean hasError = state == State.UNKNOWN;

      if (messageHandler != null && !hasError) {
        // Process all the job messages that have accumulated so far.
        try {
          List<JobMessage> allMessages = monitor.getJobMessages(getJobId(), lastTimestamp);

          if (!allMessages.isEmpty()) {
            // Advance the watermark to the newest message so the next poll only fetches new ones.
            lastTimestamp =
                fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis();
            messageHandler.process(allMessages);
          }
        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
          // Treat a transient message-fetch failure like a status failure: back off and retry.
          hasError = true;
          LOG.warn("There were problems getting current job messages: {}.", e.getMessage());
          LOG.debug("Exception information:", e);
        }
      }

      if (!hasError) {
        // We can stop if the job is done.
        if (state.isTerminal()) {
          switch (state) {
            case DONE:
            case CANCELLED:
              LOG.info("Job {} finished with status {}.", getJobId(), state);
              break;
            case UPDATED:
              LOG.info(
                  "Job {} has been updated and is running as the new job with id {}. "
                      + "To access the updated job on the Dataflow monitoring console, "
                      + "please navigate to {}",
                  getJobId(),
                  getReplacedByJob().getJobId(),
                  MonitoringUtil.getJobMonitoringPageURL(
                      getReplacedByJob().getProjectId(),
                      getRegion(),
                      getReplacedByJob().getJobId()));
              break;
            default:
              LOG.info("Job {} failed with status {}.", getJobId(), state);
          }
          return state;
        }

        // The job is not done, so we must keep polling.
        backoff.reset();

        // If a total duration for all backoff has been set, update the new cumulative sleep time to
        // be the remaining total backoff duration, stopping if we have already exceeded the
        // allotted time.
        if (duration.isLongerThan(Duration.ZERO)) {
          long nanosConsumed = nanoClock.nanoTime() - startNanos;
          // Round nanos up to whole millis so we never under-count consumed time.
          Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000);
          Duration remaining = duration.minus(consumed);
          if (remaining.isLongerThan(Duration.ZERO)) {
            backoff =
                BackOffAdapter.toGcpBackOff(
                    MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff());
          } else {
            // If there is no time remaining, don't bother backing off.
            backoff = BackOff.STOP_BACKOFF;
          }
        }
      }
    } while (BackOffUtils.next(sleeper, backoff));
    LOG.warn("No terminal state was returned. State value {}", state);
    return null; // Timed out.
  }

  /**
   * Holds the single cancellation attempt. Set at most once (via compareAndSet in {@link
   * #cancel()}) so concurrent and repeated cancel() calls all share one RPC and its result.
   */
  private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();

  /**
   * Requests cancellation of this job on the Dataflow service and returns the resulting state.
   *
   * @throws IOException if the cancel request fails and the job is not already terminated.
   */
  @Override
  public State cancel() throws IOException {
    // Enforce that a cancel() call on the job is done at most once - as
    // a workaround for Dataflow service's current bugs with multiple
    // cancellation, where it may sometimes return an error when cancelling
    // a job that was already cancelled, but still report the job state as
    // RUNNING.
    // To partially work around these issues, we absorb duplicate cancel()
    // calls. This, of course, doesn't address the case when the job terminates
    // externally almost concurrently to calling cancel(), but at least it
    // makes it possible to safely call cancel() multiple times and from
    // multiple threads in one program.
    FutureTask<State> tentativeCancelTask =
        new FutureTask<>(
            () -> {
              Job content = new Job();
              content.setProjectId(getProjectId());
              content.setId(jobId);
              content.setRequestedState("JOB_STATE_CANCELLED");
              try {
                Job job = dataflowClient.updateJob(getJobId(), content);
                return MonitoringUtil.toState(job.getCurrentState());
              } catch (IOException e) {
                State state = getState();
                if (state.isTerminal()) {
                  LOG.warn("Cancel failed because job is already terminated. State is {}", state);
                  return state;
                  // NOTE(review): e.getMessage() below may be null for some IOExceptions —
                  // confirm the Dataflow client always supplies a message here.
                } else if (e.getMessage().contains("has terminated")) {
                  // This handles the case where the getState() call above returns RUNNING but the
                  // cancel was rejected because the job is in fact done. Hopefully, someday we can
                  // delete this code if there is better consistency between the State and whether
                  // Cancel succeeds.
                  //
                  // Example message:
                  //    Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform
                  //    operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has
                  //    terminated in state SUCCESS: Workflow job:
                  //    2017-04-01_22_50_59-9269855660514862348 succeeded.
                  LOG.warn("Cancel failed because job is already terminated.", e);
                  return state;
                } else {
                  String errorMsg =
                      String.format(
                          "Failed to cancel job in state %s, "
                              + "please go to the Developers Console to cancel it manually: %s",
                          state,
                          MonitoringUtil.getJobMonitoringPageURL(
                              getProjectId(), getRegion(), getJobId()));
                  LOG.warn(errorMsg);
                  throw new IOException(errorMsg, e);
                }
              }
            });
    if (cancelState.compareAndSet(null, tentativeCancelTask)) {
      // This thread should perform cancellation, while others will
      // only wait for the result.
      cancelState.get().run();
    }
    try {
      return cancelState.get().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e);
    }
  }

  /**
   * Returns the job's current state, or the cached terminal state if it has already terminated.
   */
  @Override
  public State getState() {
    if (terminalState != null) {
      return terminalState;
    }

    return getStateWithRetries(
        BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.backoff()), Sleeper.DEFAULT);
  }

  /**
   * Attempts to get the state. Uses exponential backoff on failure until the supplied {@link
   * BackOff} is exhausted.
   *
   * @param attempts The {@link BackOff} controlling how many attempts to make and how long to
   *     sleep between them.
   * @param sleeper Object used to do the sleeps between attempts.
   * @return The state of the job or State.UNKNOWN in case of failure.
   */
  @VisibleForTesting
  State getStateWithRetries(BackOff attempts, Sleeper sleeper) {
    if (terminalState != null) {
      return terminalState;
    }
    try {
      Job job = getJobWithRetries(attempts, sleeper);
      return MonitoringUtil.toState(job.getCurrentState());
    } catch (IOException exn) {
      // The only IOException that getJobWithRetries is permitted to throw is the final IOException
      // that caused the failure of retry. Other exceptions are wrapped in an unchecked exceptions
      // and will propagate.
      return State.UNKNOWN;
    }
  }

  /**
   * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the
   * maximum number of passed in attempts.
   *
   * <p>As a side effect, when the fetched job is terminal this caches {@link #terminalState} and
   * populates {@link #replacedByJob} from the job's replaced-by id.
   *
   * @param backoff the {@link BackOff} used to control retries.
   * @param sleeper Object used to do the sleeps between attempts.
   * @return The underlying {@link Job} object.
   * @throws IOException When the maximum number of retries is exhausted, the last exception is
   *     thrown.
   */
  private Job getJobWithRetries(BackOff backoff, Sleeper sleeper) throws IOException {
    // Retry loop ends in return or throw
    while (true) {
      try {
        Job job = dataflowClient.getJob(getJobId());
        State currentState = MonitoringUtil.toState(job.getCurrentState());
        if (currentState.isTerminal()) {
          terminalState = currentState;
          replacedByJob =
              new DataflowPipelineJob(
                  dataflowClient, job.getReplacedByJobId(), dataflowOptions, transformStepNames);
        }
        return job;
      } catch (IOException exn) {
        LOG.warn("There were problems getting current job status: {}.", exn.getMessage());
        LOG.debug("Exception information:", exn);

        if (!nextBackOff(sleeper, backoff)) {
          // Backoff exhausted: surface the last failure to the caller.
          throw exn;
        }
      }
    }
  }

  /** Identical to {@link BackOffUtils#next} but without checked exceptions. */
  private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
    try {
      return BackOffUtils.next(sleeper, backoff);
    } catch (InterruptedException | IOException e) {
      if (e instanceof InterruptedException) {
        // Restore the interrupt flag before wrapping, per standard interruption handling.
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    }
  }

  /** Returns the {@link MetricResults} backed by the Dataflow service for this job. */
  @Override
  public MetricResults metrics() {
    return dataflowMetrics;
  }
}