blob: 46c7de0553e68602b81cc44cc1512f55b6a7c75e [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.runtime.etiao;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.edgent.execution.Job;
import org.apache.edgent.execution.services.RuntimeServices;
import org.apache.edgent.execution.services.ServiceContainer;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.graph.Graph;
import org.apache.edgent.oplet.Oplet;
import org.apache.edgent.runtime.etiao.graph.DirectGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Executes and provides runtime services to the executable graph
 * elements (oplets and functions).
 */
public class Executable implements RuntimeServices {

    private EtiaoJob job; // instantiated when topology is submitted
    private final ServiceContainer containerServices;
    private final ThreadFactory controlThreads;
    private final BiConsumer<Object, Throwable> completionHandler;
    private final ThreadFactoryTracker userThreads;
    private final TrackingScheduledExecutor controlScheduler;
    private final TrackingScheduledExecutor userScheduler;
    private Throwable lastError;

    private static final Logger logger = LoggerFactory.getLogger(Executable.class);

    /**
     * Services specific to this job.
     */
    private final ServiceContainer jobServices = new ServiceContainer();

    /**
     * Oplet invocations in the order they were added; an invocation's
     * position in this list is used to build its id.
     */
    private final List<Invocation<? extends Oplet<?, ?>, ?, ?>> invocations = new ArrayList<>();

    /**
     * Creates a new {@code Executable} with the specified name, using the
     * default thread factory for control threads.
     *
     * @param name the name of the executable
     * @param containerServices runtime services provided by the container
     */
    public Executable(String name, ServiceContainer containerServices) {
        this(name, containerServices, null);
    }

    /**
     * Creates a new {@code Executable} for the specified topology name, which uses the
     * given thread factory to create new threads for oplet execution.
     *
     * @param name the name of the executable
     * @param containerServices runtime services provided by the container
     * @param threads thread factory for executing the oplets; if {@code null},
     *        {@link Executors#defaultThreadFactory()} is used
     */
    public Executable(String name, ServiceContainer containerServices, ThreadFactory threads) {
        this.containerServices = containerServices;
        this.controlThreads = (threads != null) ? threads : Executors.defaultThreadFactory();
        this.completionHandler = new BiConsumer<Object, Throwable>() {
            private static final long serialVersionUID = 1L;

            /**
             * Handler invoked by userThreads, userScheduler, and controlScheduler,
             * upon handling an uncaught exception from a user task or when they
             * have completed all the tasks.
             *
             * @param source the tracker or scheduler reporting the event
             * @param t The uncaught exception; null when called because all
             *        tasks have completed.
             */
            @Override
            public void accept(Object source, Throwable t) {
                if (job == null)
                    throw new IllegalStateException("A job has not been instantiated");

                if (t != null) {
                    Executable.this.setLastError(t);
                    job.updateHealth(t);
                    cleanup();
                }
                else if (job.getCurrentState() == Job.State.RUNNING &&
                        (source == userScheduler || source == userThreads) &&
                        !hasActiveTasks()) {
                    logger.info("No more active user tasks");
                }
                notifyCompleter();
            }
        };

        this.userThreads = new ThreadFactoryTracker(name, controlThreads, completionHandler);
        this.controlScheduler = TrackingScheduledExecutor.newScheduler(controlThreads, completionHandler);
        this.userScheduler = TrackingScheduledExecutor.newScheduler(userThreads, completionHandler);
    }

    /** Returns the tracking thread factory used for user threads. */
    private ThreadFactory getThreads() {
        return userThreads;
    }

    /**
     * Returns the {@code ScheduledExecutorService} used for running
     * executable graph elements.
     *
     * @return the scheduler
     */
    public ScheduledExecutorService getScheduler() {
        return userScheduler;
    }

    /**
     * Acts as a service provider for executable elements in the graph, first
     * looking for a service specific to this job, and then one from the
     * container.
     */
    @Override
    public <T> T getService(Class<T> serviceClass) {
        T service = jobServices.getService(serviceClass);
        if (service != null)
            return service;
        return containerServices.getService(serviceClass);
    }

    /**
     * Creates a new {@code Invocation} associated with the specified oplet.
     *
     * @param <T> Oplet type
     * @param <I> Tuple type of input streams
     * @param <O> Tuple type of output streams
     * @param oplet the oplet
     * @param inputs the invocation's inputs
     * @param outputs the invocation's outputs
     * @return a new invocation for the given oplet
     */
    public <T extends Oplet<I, O>, I, O> Invocation<T, I, O> addOpletInvocation(T oplet, int inputs, int outputs) {
        Invocation<T, I, O> invocation = new Invocation<>(
                Invocation.ID_PREFIX + invocations.size(), oplet, inputs, outputs);
        invocations.add(invocation);
        return invocation;
    }

    /**
     * Initializes the invocations.
     * <p>
     * Registers the user thread factory and user scheduler as job services,
     * then initializes every invocation against the current job.
     */
    public void initialize() {
        jobServices.addService(ThreadFactory.class, getThreads());
        jobServices.addService(ScheduledExecutorService.class, getScheduler());
        invokeAction(invocation -> invocation.initialize(job, this));
    }

    /**
     * Starts all the invocations.
     */
    public void start() {
        invokeAction(invocation -> invocation.start());
    }

    /**
     * Shuts down the user scheduler and thread factory, closes all
     * invocations, then shuts down the control scheduler.
     * <p>
     * The control scheduler is shut down (and any waiter in
     * {@code complete} notified) even if closing the invocations fails.
     */
    public void close() {
        getScheduler().shutdownNow();
        userThreads.shutdownNow();
        try {
            invokeAction(invocation -> {
                try {
                    invocation.close();
                }
                catch (Throwable t) {
                    // Pass the throwable as the trailing argument, without a
                    // placeholder, so SLF4J logs its stack trace.
                    logger.debug("Exception caught while closing invocation {}", invocation.getId(), t);
                } finally {
                    jobServices.cleanOplet(job.getId(), invocation.getId());
                    job.getContainerServices().cleanOplet(job.getId(), invocation.getId());
                }
            });
        } finally {
            // invokeAction may throw on timeout; still wake any waiter and
            // release the control threads so they are not leaked.
            notifyCompleter();
            List<Runnable> unfinished = controlScheduler.shutdownNow();
            if (!unfinished.isEmpty()) {
                logger.warn("Scheduler could not finish {} tasks", unfinished.size());
            }
        }
    }

    /**
     * Adjusts a timeout for the build environment.
     *
     * @param timeout the nominal timeout
     * @param units the timeout's units (currently unused)
     * @return the timeout doubled when the {@code edgent.build.ci} system
     *         property is {@code true}, otherwise {@code timeout} unchanged
     */
    private static long getTimeoutValue(long timeout, TimeUnit units) {
        // try to protect the tests from timing out prematurely
        // in the face of overloaded/slow build/test servers.
        if (Boolean.getBoolean("edgent.build.ci")) {
            // could do something like base the decision of the current value of timeout and/or units
            return timeout * 2; // try to minimize
        }
        return timeout;
    }

    /**
     * Runs {@code action} on every invocation using the control scheduler and
     * waits for all of them to finish, then calls {@code job.onActionComplete()}.
     * <p>
     * Failures of individual actions are logged, not rethrown. If no action
     * completes within the per-task timeout, a {@code RuntimeException} wrapping
     * a {@link TimeoutException} is thrown. An interrupt received while waiting
     * does not abandon the wait; the thread's interrupt status is restored
     * before returning.
     *
     * @param action the action to apply to each invocation
     */
    private void invokeAction(Consumer<Invocation<?, ?, ?>> action) {
        ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>(controlScheduler);
        for (Invocation<?, ?, ?> invocation : invocations) {
            completionService.submit(() -> {
                action.accept(invocation);
                return true;
            });
        }

        final TimeUnit getFutureTimeoutUnits = TimeUnit.SECONDS;
        final long getFutureTimeout = getTimeoutValue(10, getFutureTimeoutUnits);

        boolean interrupted = false;
        try {
            int remainingTasks = invocations.size();
            while (remainingTasks > 0) {
                Future<Boolean> completed;
                try {
                    completed = completionService.poll(getFutureTimeout, getFutureTimeoutUnits);
                } catch (InterruptedException e) {
                    logger.error("Exception caught while waiting for future to complete", e);
                    // No task finished; remember the interrupt and keep waiting
                    // so onActionComplete() is not reported prematurely.
                    interrupted = true;
                    continue;
                }
                if (completed == null) {
                    // TODO during close log exception and wait on the next task to complete
                    throw new RuntimeException(new TimeoutException(
                            String.format("%d%s timeout", getFutureTimeout, getFutureTimeoutUnits.toString())));
                }
                try {
                    completed.get();
                }
                catch (ExecutionException | CancellationException e) {
                    logger.error("Exception caught while invoking action", e);
                }
                catch (InterruptedException e) {
                    logger.error("Exception caught while invoking action", e);
                    interrupted = true;
                }
                remainingTasks--;
            }
            job.onActionComplete();
        } finally {
            if (interrupted) {
                // Restore the interrupt status swallowed above.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Cleanup after failure.
     * <p>
     * Shuts down the user scheduler and user thread tracker.
     */
    private void cleanup() {
        userScheduler.shutdown();
        userThreads.shutdown();
    }

    /**
     * Check whether there are user tasks still active.
     * @return {@code true} if at least a user task is still active.
     */
    public boolean hasActiveTasks() {
        return userScheduler.hasActiveTasks() ||
               userThreads.hasActiveNonDaemonThreads();
    }

    /**
     * Returns the most recent uncaught error reported to the completion handler.
     *
     * @return the last error, or {@code null} if none has been reported
     */
    public synchronized Throwable getLastError() {
        return lastError;
    }

    /** Records the most recent uncaught error from a background activity. */
    private synchronized void setLastError(Throwable lastError) {
        this.lastError = lastError;
    }

    /**
     * Creates the job executed by this {@code Executable}.
     * <p>
     * The job is required by {@link #initialize()}, {@link #close()} and the
     * completion handler, so this must be called before those are used.
     *
     * @param graph the graph to execute; must be a {@code DirectGraph}
     * @param topologyName the topology name
     * @param jobName the job name
     * @return the newly created job
     */
    public Job createJob(Graph graph, String topologyName, String jobName) {
        this.job = new EtiaoJob((DirectGraph)graph, topologyName, jobName,
                containerServices);
        return this.job;
    }

    /**
     * The thread that is waiting for completion of the Executable's
     * asynchronous work, may be null. Guarded by {@code this}.
     */
    private Thread completer;

    /**
     * Set when the completer has been notified; guarded by the monitor of the
     * {@code completer} thread object.
     */
    private boolean completerNotify;

    /**
     * Waits for outstanding user threads or tasks.
     *
     * @param timeoutMillis maximum time to wait in milliseconds; values
     *        {@code <= 0} are replaced by 1000
     * @throws ExecutionException if the job execution threw an exception.
     *         Wraps the latest uncaught Exception thrown by a background
     *         activity.
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @return true if the {@code Executable} has completed, false if
     *         the wait timed out.
     */
    final boolean complete(long timeoutMillis)
            throws InterruptedException, ExecutionException {
        final long totalWait = (timeoutMillis <= 0) ? 1000 : timeoutMillis;
        final Thread self = Thread.currentThread();

        synchronized (this) {
            completer = self;
        }

        // Monotonic clock: wall-clock adjustments must not shorten or
        // stretch the wait.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalWait);
        boolean finished = false;
        try {
            while (true) {
                if (Thread.interrupted()) // Clears interrupted status
                    throw new InterruptedException();

                // Check for errors from background activities
                Throwable t = getLastError();
                if (t != null) {
                    throw executionException(t);
                }

                if (!hasActiveTasks()) {
                    finished = true;
                    break;
                }

                // Wait only for the time that is left. Stop once it rounds
                // down to zero, since wait(0) would block indefinitely.
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    break;
                }

                // Wait for notification that something interesting to us has
                // terminated.
                synchronized (self) {
                    if (!completerNotify) {
                        try {
                            self.wait(remainingMillis);
                        } catch (InterruptedException e) {
                            if (!completerNotify) {
                                // Interrupted, but not by a notification
                                throw e;
                            }
                            // A notification also arrived: do not lose the
                            // interrupt; the next loop iteration reports it.
                            Thread.currentThread().interrupt();
                        }
                    }
                    completerNotify = false;
                }
            }
        } finally {
            synchronized (this) {
                completer = null;
            }
        }
        return finished;
    }

    /**
     * Wakes the thread waiting in {@code complete}, if any.
     */
    private void notifyCompleter() {
        Thread waiter;
        synchronized (this) {
            waiter = this.completer;
        }
        if (waiter == null)
            return;

        synchronized (waiter) {
            completerNotify = true;
            waiter.notifyAll();
        }
    }

    /**
     * Wraps {@code t} in an {@link ExecutionException} unless it already is one.
     *
     * @param t the throwable to wrap
     * @return {@code t} itself if it is an {@code ExecutionException},
     *         otherwise a new {@code ExecutionException} whose cause is {@code t}
     */
    static ExecutionException executionException(Throwable t) {
        return (t instanceof ExecutionException) ?
                (ExecutionException) t : new ExecutionException(t);
    }
}