| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker.util.common.worker; |
| |
| import com.google.api.client.util.Clock; |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import javax.annotation.concurrent.GuardedBy; |
| import javax.annotation.concurrent.NotThreadSafe; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * WorkProgressUpdater allows a work executor to send work progress updates to the worker service. |
| * The life-cycle of the WorkProgressUpdater is controlled externally through its {@link |
| * #startReportingProgress()} and {@link #stopReportingProgress()} methods. The updater queries the |
| * worker for progress updates and sends the updates to the worker service. The interval between two |
| * consecutive updates is controlled by the worker service through reporting interval hints sent |
| * back in the update response messages. To avoid update storms and monitoring staleness, the |
| * interval between two consecutive updates is also bound by {@link #getMinReportingInterval} and |
| * {@link #getMaxReportingInterval}. |
| */ |
| // Very likely real potential for bugs - https://issues.apache.org/jira/browse/BEAM-6561 |
| @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER") |
| @NotThreadSafe |
| public abstract class WorkProgressUpdater { |
| private static final Logger LOG = LoggerFactory.getLogger(WorkProgressUpdater.class); |
| |
| /** The default lease duration to request from the external worker service (3 minutes). */ |
| public static final long DEFAULT_LEASE_DURATION_MILLIS = 3 * 60 * 1000; |
| |
| /** The lease renewal RPC latency margin (5 seconds). */ |
| private static final long DEFAULT_LEASE_RENEWAL_LATENCY_MARGIN = 5000; |
| |
| /** |
| * The minimum period between two consecutive progress updates. Ensures the {@link |
| * WorkProgressUpdater} does not generate update storms (5 seconds). |
| */ |
| private static final long DEFAULT_MIN_REPORTING_INTERVAL_MILLIS = 5000; |
| |
| /** |
| * The maximum period between two consecutive progress updates. Ensures the {@link |
| * WorkProgressUpdater} does not cause monitoring staleness (10 minutes). |
| */ |
| private static final long DEFAULT_MAX_REPORTING_INTERVAL_MILLIS = 10 * 60 * 1000; |
| |
| /** |
| * Worker providing the work progress updates. This is a volatile variable because the worker |
| * thread sets it while the progress updater thread reads it. |
| */ |
| protected volatile WorkExecutor worker = null; |
| |
| /** Requested periodic checkpoint period. */ |
| private final int checkpointPeriodSec; |
| |
| /** |
| * The time when the next periodic checkpoint should occur. In the same units as {@code |
| * Clock.currentTimeMillis()}. |
| */ |
| private long nextPeriodicCheckpointTimeMs; |
| |
| /** Executor used to schedule work progress updates. */ |
| private final ScheduledExecutorService executor; |
| |
| /** Clock used to either provide real system time or mocked to virtualize time for testing. */ |
| protected final Clock clock; |
| |
| /** The lease duration to request from the external worker service. */ |
| protected long requestedLeaseDurationMs; |
| |
| /** The time period until the next work progress update. */ |
| protected long progressReportIntervalMs; |
| |
| /** The state of worker checkpointing. */ |
| protected enum CheckpointState { |
| /** No checkpoint has yet been requested. */ |
| CHECKPOINT_NOT_REQUESTED, |
| /** A checkpoint has been requested but not yet done successfully. */ |
| CHECKPOINT_REQUESTED, |
| /** A successful checkpoint has been done. */ |
| CHECKPOINT_SUCCESSFUL |
| } |
| |
| @GuardedBy("executor") |
| protected CheckpointState checkpointState = CheckpointState.CHECKPOINT_NOT_REQUESTED; |
| |
| /** |
| * The {@link NativeReader.DynamicSplitResult} to report to the service in the next progress |
| * update, or {@code null} if there is nothing to report (if no dynamic split happened since the |
| * last progress update). |
| */ |
| protected NativeReader.DynamicSplitResult dynamicSplitResultToReport; |
| |
| /** |
| * @param checkpointPeriodSec the desired amount of time in seconds between periodic checkpoints; |
| * if no periodic checkpoints are desired then pass {@link Integer#MAX_VALUE} |
| */ |
| public WorkProgressUpdater(WorkExecutor worker, int checkpointPeriodSec) { |
| this( |
| worker, |
| checkpointPeriodSec, |
| Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder() |
| .setDaemon(true) |
| .setNameFormat("WorkProgressUpdater-%d") |
| .build()), |
| Clock.SYSTEM); |
| } |
| |
| /** |
| * @param checkpointPeriodSec the desired amount of time in seconds between periodic checkpoints; |
| * if no periodic checkpoints are desired then pass {@link Integer#MAX_VALUE} |
| * @param executor the desired executor, can be used to inject a executor for testing |
| * @param clock the desired clock, can be used to inject a mock clock for testing |
| */ |
| @VisibleForTesting |
| protected WorkProgressUpdater( |
| WorkExecutor worker, |
| int checkpointPeriodSec, |
| ScheduledExecutorService executor, |
| Clock clock) { |
| this.worker = worker; |
| this.checkpointPeriodSec = checkpointPeriodSec; |
| this.executor = executor; |
| this.clock = clock; |
| } |
| |
| /** @param worker workexecutor for the updater. */ |
| public void setWorker(WorkExecutor worker) { |
| this.worker = worker; |
| } |
| |
| /** Starts sending work progress updates to the worker service. */ |
| public void startReportingProgress() { |
| // The initial work progress report is sent according to hints from the service if any. |
| // Otherwise the default is half-way through the lease. |
| long leaseRemainingTime = leaseRemainingTime(getWorkUnitLeaseExpirationTimestamp()); |
| progressReportIntervalMs = |
| nextProgressReportInterval(getWorkUnitSuggestedReportingInterval(), leaseRemainingTime); |
| requestedLeaseDurationMs = DEFAULT_LEASE_DURATION_MILLIS; |
| |
| nextPeriodicCheckpointTimeMs = clock.currentTimeMillis() + ((long) checkpointPeriodSec) * 1000; |
| |
| LOG.debug("Started reporting progress for work item: {}", workString()); |
| scheduleNextUpdate(); |
| } |
| |
| /** Requests that a checkpoint be done. */ |
| public void requestCheckpoint() { |
| synchronized (executor) { |
| LOG.debug("Asynchronous checkpoint for work item {}.", workString()); |
| if (checkpointState == CheckpointState.CHECKPOINT_NOT_REQUESTED) { |
| checkpointState = CheckpointState.CHECKPOINT_REQUESTED; |
| } |
| if (tryCheckpointIfNeeded()) { |
| reportProgress(); |
| } |
| } |
| } |
| |
| /** |
| * Stops sending work progress updates to the worker service. It may throw an exception if the |
| * final progress report fails to be sent for some reason. |
| */ |
| public void stopReportingProgress() throws Exception { |
| // Wait until there are no more progress updates in progress, then shut down. |
| synchronized (executor) { |
| executor.shutdownNow(); |
| |
| // We send a final progress report in case there was an unreported dynamic split. |
| if (dynamicSplitResultToReport != null) { |
| LOG.debug( |
| "Sending final progress update with unreported split: {} " + "for work item: {}", |
| dynamicSplitResultToReport, |
| workString()); |
| reportProgressHelper(); // This call can fail with an exception |
| } |
| } |
| |
| LOG.debug("Stopped reporting progress for work item: {}", workString()); |
| } |
| |
| /** |
| * Computes the time before sending the next work progress update making sure that it falls |
| * between the [{@link #getMinReportingInterval}, {@link #getMaxReportingInterval}] interval. |
| * Makes an attempt to bound the result by the remaining lease time, with an RPC latency margin of |
| * {@link #getLeaseRenewalLatencyMargin}. |
| * |
| * @param suggestedInterval the suggested progress report interval |
| * @param leaseRemainingTime milliseconds left before the work lease expires |
| * @return the time in milliseconds before sending the next progress update |
| */ |
| protected final long nextProgressReportInterval(long suggestedInterval, long leaseRemainingTime) { |
| // Try to send the next progress update before the next lease expiration |
| // allowing some RPC latency margin. |
| suggestedInterval = |
| Math.min(suggestedInterval, leaseRemainingTime - getLeaseRenewalLatencyMargin()); |
| |
| // Bound reporting interval to avoid staleness and progress update storms. |
| return Math.min( |
| Math.max(getMinReportingInterval(), suggestedInterval), getMaxReportingInterval()); |
| } |
| |
| /** Schedules the next work progress update or periodic checkpoint. */ |
| @SuppressWarnings("FutureReturnValueIgnored") |
| private void scheduleNextUpdate() { |
| if (executor.isShutdown()) { |
| return; |
| } |
| long delay = |
| Math.min( |
| progressReportIntervalMs, nextPeriodicCheckpointTimeMs - clock.currentTimeMillis()); |
| executor.schedule( |
| new Runnable() { |
| @Override |
| public void run() { |
| doNextUpdate(); |
| } |
| }, |
| delay, |
| TimeUnit.MILLISECONDS); |
| LOG.debug( |
| "Next work progress update for work item {} scheduled to occur in {} ms.", |
| workString(), |
| progressReportIntervalMs); |
| } |
| |
| /** Does the next work progress update or periodic checkpoint. */ |
| private void doNextUpdate() { |
| // Don't shut down while reporting progress. |
| synchronized (executor) { |
| if (executor.isShutdown()) { |
| return; |
| } |
| try { |
| checkForPeriodicCheckpoint(); |
| tryCheckpointIfNeeded(); |
| reportProgress(); |
| } finally { |
| scheduleNextUpdate(); |
| } |
| } |
| } |
| |
| /** If it is time for a periodic checkpoint then requests it. */ |
| @GuardedBy("executor") |
| private void checkForPeriodicCheckpoint() { |
| if (clock.currentTimeMillis() >= nextPeriodicCheckpointTimeMs) { |
| LOG.debug("Periodic checkpoint for work item {}.", workString()); |
| if (checkpointState == CheckpointState.CHECKPOINT_NOT_REQUESTED) { |
| checkpointState = CheckpointState.CHECKPOINT_REQUESTED; |
| } |
| nextPeriodicCheckpointTimeMs = Long.MAX_VALUE; |
| } |
| } |
| |
| /** |
| * If a checkpoint has been requested but not yet done, tries to do it. Returns whether a |
| * successful checkpoint was done. |
| */ |
| @GuardedBy("executor") |
| protected boolean tryCheckpointIfNeeded() { |
| if (checkpointState == CheckpointState.CHECKPOINT_REQUESTED && worker != null) { |
| LOG.debug("Trying to checkpoint for work item {}.", workString()); |
| try { |
| NativeReader.DynamicSplitResult checkpointPos = worker.requestCheckpoint(); |
| if (checkpointPos != null) { |
| LOG.debug("Successful checkpoint for work item {} at {}.", workString(), checkpointPos); |
| dynamicSplitResultToReport = checkpointPos; |
| checkpointState = CheckpointState.CHECKPOINT_SUCCESSFUL; |
| return true; |
| } |
| } catch (Throwable e) { |
| LOG.warn("Error trying to checkpoint the worker: ", e); |
| } |
| } |
| return false; |
| } |
| |
| /** Reports the current work progress to the worker service. */ |
| @GuardedBy("executor") |
| private void reportProgress() { |
| LOG.debug("Updating progress on work item {}", workString()); |
| try { |
| reportProgressHelper(); |
| } catch (InterruptedException e) { |
| LOG.info("Cancelling workitem execution: {}", workString(), e); |
| worker.abort(); |
| } catch (Throwable e) { |
| LOG.warn("Error reporting workitem progress update to Dataflow service: ", e); |
| } |
| } |
| |
| /** |
| * Computes the amount of time left, in milliseconds, before a lease with the specified expiration |
| * timestamp expires. Returns zero if the lease has already expired. |
| */ |
| protected long leaseRemainingTime(long leaseExpirationTimestamp) { |
| long now = clock.currentTimeMillis(); |
| if (leaseExpirationTimestamp < now) { |
| LOG.debug("Lease remaining time for {} is 0 ms.", workString()); |
| return 0; |
| } |
| LOG.debug( |
| "Lease remaining time for {} is {} ms.", workString(), leaseExpirationTimestamp - now); |
| return leaseExpirationTimestamp - now; |
| } |
| |
| @VisibleForTesting |
| public NativeReader.DynamicSplitResult getDynamicSplitResultToReport() { |
| return dynamicSplitResultToReport; |
| } |
| |
| /** |
| * Reports the current work progress to the worker service. Holds lock on executor during call so |
| * that checkpointState can be accessed. |
| * |
| * @throws an InterruptedException to indicate that the WorkItem has been aborted. |
| */ |
| @GuardedBy("executor") |
| protected abstract void reportProgressHelper() throws Exception; |
| |
| /** Returns the current work item's lease expiration timestamp. */ |
| protected abstract long getWorkUnitLeaseExpirationTimestamp(); |
| |
| /** Returns the current work item's suggested progress reporting interval. */ |
| protected long getWorkUnitSuggestedReportingInterval() { |
| return leaseRemainingTime(getWorkUnitLeaseExpirationTimestamp()) / 2; |
| } |
| |
| /** Returns the minimum allowed time between two periodic progress updates. */ |
| protected long getMinReportingInterval() { |
| return DEFAULT_MIN_REPORTING_INTERVAL_MILLIS; |
| } |
| |
| /** Returns the maximum allowed time between two periodic progress updates. */ |
| protected long getMaxReportingInterval() { |
| return DEFAULT_MAX_REPORTING_INTERVAL_MILLIS; |
| } |
| |
| /** |
| * Returns the maximum allowed time between a periodic progress update and the moment the current |
| * lease expires. |
| */ |
| protected long getLeaseRenewalLatencyMargin() { |
| return DEFAULT_LEASE_RENEWAL_LATENCY_MARGIN; |
| } |
| |
| /** |
| * Returns a string representation of the work item whose progress is being updated, for use in |
| * logging messages. |
| */ |
| protected abstract String workString(); |
| } |