blob: 1518ccdaf18af6505b2f91563d96056c58b2c891 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.common;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ProgressHelper {
private static final Logger LOG =
LoggerFactory.getLogger(ProgressHelper.class);
private static final float MIN_PROGRESS_VAL = 0.0f;
private static final float MAX_PROGRESS_VAL = 1.0f;
private final String processorName;
protected final Map<String, LogicalInput> inputs;
private final ProcessorContext processorContext;
private final AtomicReference<ScheduledFuture<?>> periodicMonitorTaskRef;
private long monitorExecPeriod;
private volatile ScheduledExecutorService scheduledExecutorService;
public static final float processProgress(float val) {
return (Float.isNaN(val)) ? MIN_PROGRESS_VAL
: Math.max(MIN_PROGRESS_VAL, Math.min(MAX_PROGRESS_VAL, val));
}
public static final boolean isProgressWithinRange(float val) {
return (val <= MAX_PROGRESS_VAL && val >= MIN_PROGRESS_VAL);
}
public ProgressHelper(Map<String, LogicalInput> inputsParam,
ProcessorContext context, String processorName) {
this.periodicMonitorTaskRef = new AtomicReference<>(null);
this.inputs = inputsParam;
this.processorContext = context;
this.processorName = processorName;
}
public void scheduleProgressTaskService(long delay, long period) {
monitorExecPeriod = period;
scheduledExecutorService =
Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
"TaskProgressService{" + processorName + ":" + processorContext
.getTaskVertexName()
+ "} #%d").build());
try {
createPeriodicTask(delay);
} catch (RejectedExecutionException | IllegalArgumentException ex) {
LOG.error("Could not create periodic scheduled task for processor={}",
processorName, ex);
}
}
private Runnable createRunnableMonitor() {
return new Runnable() {
@Override
public void run() {
try {
float progSum = MIN_PROGRESS_VAL;
int invalidInput = 0;
float progressVal = MIN_PROGRESS_VAL;
if (inputs != null && !inputs.isEmpty()) {
for (LogicalInput input : inputs.values()) {
if (!(input instanceof AbstractLogicalInput)) {
/**
* According to javdoc in
* {@link org.apache.tez.runtime.api.AbstractLogicalInput} all
* implementations must extend AbstractLogicalInput.
*/
continue;
}
final float inputProgress =
((AbstractLogicalInput) input).getProgress();
if (!isProgressWithinRange(inputProgress)) {
final int invalidSnapshot = ++invalidInput;
if (LOG.isDebugEnabled()) {
LOG.debug(
"progress update: Incorrect value in progress helper in "
+ "processor={}, inputProgress={}, inputsSize={}, "
+ "invalidInput={}",
processorName, inputProgress, inputs.size(),
invalidSnapshot);
}
}
progSum += processProgress(inputProgress);
}
// No need to process the average within the valid range since the
// processorContext validates the value before being set.
progressVal = progSum / inputs.size();
}
// Report progress as 0.0f when if are errors.
processorContext.setProgress(progressVal);
} catch (Throwable th) {
if (LOG.isDebugEnabled()) {
LOG.debug("progress update: Encountered InterruptedException during"
+ " Processor={}", processorName, th);
}
if (th instanceof InterruptedException) {
// set interrupt flag to true sand exit
Thread.currentThread().interrupt();
return;
}
}
}
};
}
private boolean createPeriodicTask(long delay)
throws RejectedExecutionException, IllegalArgumentException {
stopPeriodicMonitor();
final Runnable runnableMonitor = createRunnableMonitor();
ScheduledFuture<?> futureTask = scheduledExecutorService
.scheduleWithFixedDelay(runnableMonitor, delay, monitorExecPeriod,
TimeUnit.MILLISECONDS);
periodicMonitorTaskRef.set(futureTask);
return true;
}
private void stopPeriodicMonitor() {
ScheduledFuture<?> scheduledMonitorRes =
this.periodicMonitorTaskRef.get();
if (scheduledMonitorRes != null && !scheduledMonitorRes.isCancelled()) {
scheduledMonitorRes.cancel(true);
this.periodicMonitorTaskRef.set(null);
}
}
public void shutDownProgressTaskService() {
stopPeriodicMonitor();
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
try {
if (!scheduledExecutorService.awaitTermination(monitorExecPeriod,
TimeUnit.MILLISECONDS)) {
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted exception while shutting down the "
+ "executor service for the processor name={}", processorName);
}
}
scheduledExecutorService.shutdownNow();
}
scheduledExecutorService = null;
}
}