blob: 57c8c72be1f4898c59b11d1251955098f840fbea [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import com.google.common.base.Function;
import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@SuppressWarnings({"unchecked", "deprecation"})
public class VertexManager {
// Descriptor (class name and user payload) of the plugin hosted by this manager.
final VertexManagerPluginDescriptor pluginDesc;
// Identity whose doAs() wraps every queued plugin callback (see VertexManagerEvent.call()).
final UserGroupInformation dagUgi;
// Plugin instance, created reflectively in the constructor.
final VertexManagerPlugin plugin;
// Vertex on whose behalf the plugin acts.
final Vertex managedVertex;
// Context object handed to the plugin's constructor.
final VertexManagerPluginContextImpl pluginContext;
// Payload read from pluginDesc; returned to the plugin by getUserPayload().
final UserPayload payload;
final AppContext appContext;
// Root-input events added by the plugin; drained after onRootVertexInitialized succeeds.
final BlockingQueue<TezEvent> rootInputInitEventQueue;
final StateChangeNotifier stateChangeNotifier;
// Executor (obtained from appContext) on which queued plugin callbacks run.
private final ListeningExecutorService execService;
// Pending plugin callbacks; at most one of them is handed to execService at a time.
private final LinkedBlockingQueue<VertexManagerEvent> eventQueue;
// True from the moment a callback is claimed for execution until it completes successfully.
private final AtomicBoolean eventInFlight;
// Set when a plugin callback fails; blocks every further plugin invocation.
private final AtomicBoolean pluginFailed;
private static final Logger LOG = LoggerFactory.getLogger(VertexManager.class);
// Shared completion callback used by every event that does not supply its own.
private final VertexManagerCallback VM_CALLBACK = new VertexManagerCallback();
/**
 * Context object handed to the {@link VertexManagerPlugin}. Every method the plugin may call
 * first verifies (via {@link #checkAndThrowIfDone()}) that the plugin has neither reported done
 * nor failed, and is synchronized on this context so that the check and the action it guards
 * are atomic with respect to {@link #vertexManagerDone()}. {@link #onStateUpdated} is the one
 * exception: it is a notification callback that only enqueues work for the plugin.
 */
class VertexManagerPluginContextImpl implements VertexManagerPluginContext, VertexStateUpdateListener {

  /** Source metadata stamped on every root-input event added through this context. */
  private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
      managedVertex.getName(), "NULL_VERTEX", null);
  /** Destination metadata per input name, created lazily by {@link #getDestinationMetaData}. */
  private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
  /** Names of vertices registered for state updates; guarded by its own monitor. */
  private final List<String> notificationRegisteredVertices = Lists.newArrayList();
  /** Becomes true once {@link #vertexManagerDone()} has been called. */
  AtomicBoolean isComplete = new AtomicBoolean(false);

  /**
   * Rejects the call if the plugin already reported done or a plugin callback has failed.
   *
   * @throws TezUncheckedException in either of those two states
   */
  private void checkAndThrowIfDone() {
    if (isComplete()) {
      throw new TezUncheckedException("Cannot invoke context methods after reporting done");
    }
    if (pluginFailed.get()) {
      throw new TezUncheckedException("Cannot invoke context methods after throwing an exception");
    }
  }

  /** @return a new map from each input vertex's name to the property of its connecting edge */
  @Override
  public synchronized Map<String, EdgeProperty> getInputVertexEdgeProperties() {
    checkAndThrowIfDone();
    Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
    Map<String, EdgeProperty> vertexEdgeMap =
        Maps.newHashMapWithExpectedSize(inputs.size());
    for (Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
      vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
    }
    return vertexEdgeMap;
  }

  /** @return a new map from each output vertex's name to the property of its connecting edge */
  @Override
  public synchronized Map<String, EdgeProperty> getOutputVertexEdgeProperties() {
    checkAndThrowIfDone();
    Map<Vertex, Edge> outputs = managedVertex.getOutputVertices();
    Map<String, EdgeProperty> vertexEdgeMap =
        Maps.newHashMapWithExpectedSize(outputs.size());
    for (Map.Entry<Vertex, Edge> entry : outputs.entrySet()) {
      vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
    }
    return vertexEdgeMap;
  }

  /**
   * Looks up {@code vertexName} in the current DAG and returns its statistics. No null check is
   * made on the looked-up vertex.
   */
  @Override
  public synchronized VertexStatistics getVertexStatistics(String vertexName) {
    checkAndThrowIfDone();
    return appContext.getCurrentDAG().getVertex(vertexName).getStatistics();
  }

  /** @return the name of the managed vertex */
  @Override
  public synchronized String getVertexName() {
    checkAndThrowIfDone();
    return managedVertex.getName();
  }

  /**
   * Looks up {@code vertexName} in the current DAG and returns its total task count. No null
   * check is made on the looked-up vertex.
   */
  @Override
  public synchronized int getVertexNumTasks(String vertexName) {
    checkAndThrowIfDone();
    return appContext.getCurrentDAG().getVertex(vertexName).getTotalTasks();
  }

  /**
   * Forwards to {@code managedVertex.setParallelism(..., true)}.
   *
   * @throws TezUncheckedException wrapping any {@link AMUserCodeException} raised by the vertex
   */
  @Override
  public synchronized void setVertexParallelism(int parallelism, VertexLocationHint vertexLocationHint,
      Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
      Map<String, InputSpecUpdate> rootInputSpecUpdate) {
    checkAndThrowIfDone();
    try {
      managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers,
          rootInputSpecUpdate, true);
    } catch (AMUserCodeException e) {
      throw new TezUncheckedException(e);
    }
  }

  /**
   * Forwards to the four-argument {@code managedVertex.reconfigureVertex}.
   *
   * @throws TezUncheckedException wrapping any {@link AMUserCodeException} raised by the vertex
   */
  @Override
  public synchronized void reconfigureVertex(int parallelism, VertexLocationHint vertexLocationHint,
      Map<String, EdgeProperty> sourceEdgeProperties,
      Map<String, InputSpecUpdate> rootInputSpecUpdate) {
    checkAndThrowIfDone();
    try {
      managedVertex.reconfigureVertex(parallelism, vertexLocationHint, sourceEdgeProperties,
          rootInputSpecUpdate);
    } catch (AMUserCodeException e) {
      throw new TezUncheckedException(e);
    }
  }

  /**
   * Forwards to {@code managedVertex.reconfigureVertex(parallelism, locationHint,
   * sourceEdgeProperties)}.
   *
   * @throws TezUncheckedException wrapping any {@link AMUserCodeException} raised by the vertex
   */
  @Override
  public synchronized void reconfigureVertex(int parallelism,
      @Nullable VertexLocationHint locationHint,
      @Nullable Map<String, EdgeProperty> sourceEdgeProperties) {
    checkAndThrowIfDone();
    try {
      managedVertex.reconfigureVertex(parallelism, locationHint, sourceEdgeProperties);
    } catch (AMUserCodeException e) {
      throw new TezUncheckedException(e);
    }
  }

  /**
   * Forwards to {@code managedVertex.reconfigureVertex(rootInputSpecUpdate, parallelism,
   * locationHint)}. Synchronized like the other {@code reconfigureVertex} overloads so the
   * done/failed check cannot race with {@link #vertexManagerDone()}.
   *
   * @throws TezUncheckedException wrapping any {@link AMUserCodeException} raised by the vertex
   */
  @Override
  public synchronized void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate,
      @Nullable VertexLocationHint locationHint,
      int parallelism) {
    checkAndThrowIfDone();
    try {
      managedVertex.reconfigureVertex(rootInputSpecUpdate, parallelism, locationHint);
    } catch (AMUserCodeException e) {
      throw new TezUncheckedException(e);
    }
  }

  /** Hands the given scheduling requests to the managed vertex. */
  @Override
  public synchronized void scheduleTasks(List<ScheduleTaskRequest> tasks) {
    checkAndThrowIfDone();
    managedVertex.scheduleTasks(tasks);
  }

  /**
   * Converts each {@code TaskWithLocationHint} into a {@code ScheduleTaskRequest} carrying the
   * same task index and location hint, then delegates to {@link #scheduleTasks(List)}.
   */
  @Override
  public synchronized void scheduleVertexTasks(List<TaskWithLocationHint> tasks) {
    checkAndThrowIfDone();
    List<ScheduleTaskRequest> schedTasks = new ArrayList<>(tasks.size());
    for (TaskWithLocationHint task : tasks) {
      schedTasks.add(ScheduleTaskRequest.create(
          task.getTaskIndex(), task.getTaskLocationHint()));
    }
    scheduleTasks(schedTasks);
  }

  /**
   * @return the key set of the vertex's additional inputs, or {@code null} when
   *         {@code managedVertex.getAdditionalInputs()} returns {@code null}
   */
  @Nullable
  @Override
  public synchronized Set<String> getVertexInputNames() {
    checkAndThrowIfDone();
    Set<String> inputNames = null;
    Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
        inputs = managedVertex.getAdditionalInputs();
    if (inputs != null) {
      inputNames = inputs.keySet();
    }
    return inputNames;
  }

  /** @return the payload taken from the plugin descriptor */
  @Override
  public synchronized UserPayload getUserPayload() {
    checkAndThrowIfDone();
    return payload;
  }

  /**
   * Wraps each event in a {@link TezEvent} addressed to {@code inputName} and appends it to
   * {@code rootInputInitEventQueue}.
   *
   * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when
   *         {@code inputName} is not one of the vertex's additional inputs
   */
  @Override
  public synchronized void addRootInputEvents(final String inputName,
      Collection<InputDataInformationEvent> events) {
    checkAndThrowIfDone();
    verifyIsRootInput(inputName);
    final long currTime = appContext.getClock().getTime();
    // transform() yields a view; the TezEvents are materialized when addAll() iterates it below
    Collection<TezEvent> tezEvents = Collections2.transform(events,
        new Function<InputDataInformationEvent, TezEvent>() {
          @Override
          public TezEvent apply(InputDataInformationEvent riEvent) {
            TezEvent tezEvent = new TezEvent(riEvent, rootEventSourceMetadata, currTime);
            tezEvent.setDestinationInfo(getDestinationMetaData(inputName));
            return tezEvent;
          }
        });
    if (LOG.isDebugEnabled()) {
      LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; Added " + events.size() + " for input " +
          "name " + inputName);
    }
    rootInputInitEventQueue.addAll(tezEvents);
    // Recovery handling is taken care of by the Vertex.
  }

  /**
   * Routes the given processor events to task {@code taskId} of the managed vertex. A null or
   * empty collection is a no-op after argument validation. Synchronized like the other
   * plugin-facing methods so the done/failed check cannot race with
   * {@link #vertexManagerDone()}.
   *
   * @param taskId index of the target task; must be in {@code [0, totalTasks)}
   */
  @Override
  public synchronized void sendEventToProcessor(Collection<CustomProcessorEvent> events, int taskId) {
    checkAndThrowIfDone();
    Preconditions.checkArgument(taskId >= 0 && taskId < managedVertex.getTotalTasks(),
        "Invalid taskId " + taskId + "; " + "There are " + managedVertex.getTotalTasks()
            + " tasks in total.");
    if (events != null && events.size() > 0) {
      List<TezEvent> tezEvents = new ArrayList<>();
      for (CustomProcessorEvent event : events) {
        TezEvent tezEvent = new TezEvent(event, null);
        // use dummy task attempt id since this is not an task attempt specific event and task
        // attempt id won't be used anyway
        EventMetaData destinationMeta = new EventMetaData(EventProducerConsumerType.PROCESSOR,
            managedVertex.getName(), managedVertex.getName(),
            TezTaskAttemptID.getInstance(managedVertex.getTask(taskId).getTaskId(), -1));
        tezEvent.setDestinationInfo(destinationMeta);
        tezEvents.add(tezEvent);
      }
      appContext.getEventHandler().handle(
          new VertexEventRouteEvent(managedVertex.getVertexId(), tezEvents));
    }
  }

  /**
   * Sets the location hint on the managed vertex.
   *
   * @throws NullPointerException if {@code locationHint} is null
   */
  @Override
  public synchronized void setVertexLocationHint(VertexLocationHint locationHint) {
    checkAndThrowIfDone();
    Objects.requireNonNull(locationHint, "locationHint is null");
    managedVertex.setVertexLocationHint(locationHint);
  }

  /** @return the attempt id of the current application attempt */
  @Override
  public synchronized int getDAGAttemptNumber() {
    checkAndThrowIfDone();
    return appContext.getApplicationAttemptId().getAttemptId();
  }

  /** Checks that {@code inputName} is a key of the vertex's additional inputs. */
  private void verifyIsRootInput(String inputName) {
    Preconditions.checkState(managedVertex.getAdditionalInputs().get(inputName) != null,
        "Cannot add events for non-root inputs");
  }

  /** Returns the cached destination metadata for {@code inputName}, creating it on first use. */
  private EventMetaData getDestinationMetaData(String inputName) {
    EventMetaData destMeta = destinationEventMetadataMap.get(inputName);
    if (destMeta == null) {
      destMeta = new EventMetaData(EventProducerConsumerType.INPUT, managedVertex.getName(),
          inputName, null);
      destinationEventMetadataMap.put(inputName, destMeta);
    }
    return destMeta;
  }

  /** @return the task resource of the managed vertex */
  @Override
  public synchronized Resource getVertexTaskResource() {
    checkAndThrowIfDone();
    return managedVertex.getTaskResource();
  }

  /** @return the total resources reported by the task scheduler serving the managed vertex */
  @Override
  public synchronized Resource getTotalAvailableResource() {
    checkAndThrowIfDone();
    return appContext.getTaskScheduler().getTotalResources(managedVertex.getTaskSchedulerIdentifier());
  }

  /** @return the cluster node count reported by the task scheduler */
  @Override
  public synchronized int getNumClusterNodes() {
    checkAndThrowIfDone();
    return appContext.getTaskScheduler().getNumClusterNodes();
  }

  /**
   * Records {@code vertexName} so it can be unregistered later, then registers this context as
   * a listener for the given states of that vertex.
   */
  @Override
  public synchronized void registerForVertexStateUpdates(String vertexName, Set<VertexState> stateSet) {
    checkAndThrowIfDone();
    synchronized(notificationRegisteredVertices) {
      notificationRegisteredVertices.add(vertexName);
    }
    stateChangeNotifier.registerForVertexUpdates(vertexName, stateSet, this);
  }

  /** Unregisters this context from every vertex it registered for. */
  private void unregisterForVertexStateUpdates() {
    synchronized (notificationRegisteredVertices) {
      for (String vertexName : notificationRegisteredVertices) {
        stateChangeNotifier.unregisterForVertexUpdates(vertexName, this);
      }
    }
  }

  /** @return true once {@link #vertexManagerDone()} has been called */
  boolean isComplete() {
    return isComplete.get();
  }

  /**
   * Marks the plugin as done and drops its vertex state registrations. After this call every
   * context method guarded by {@link #checkAndThrowIfDone()} throws.
   */
  // TODO add later after TEZ-1714 @Override
  public synchronized void vertexManagerDone() {
    checkAndThrowIfDone();
    LOG.info("Vertex Manager reported done for : " + managedVertex.getLogIdentifier());
    this.isComplete.set(true);
    unregisterForVertexStateUpdates();
  }

  /** Forwards to {@code managedVertex.vertexReconfigurationPlanned()}. */
  @Override
  public synchronized void vertexReconfigurationPlanned() {
    checkAndThrowIfDone();
    managedVertex.vertexReconfigurationPlanned();
  }

  /** Forwards to {@code managedVertex.doneReconfiguringVertex()}. */
  @Override
  public synchronized void doneReconfiguringVertex() {
    checkAndThrowIfDone();
    managedVertex.doneReconfiguringVertex();
  }

  /**
   * Synchronized like the other plugin-facing methods so the done/failed check cannot race with
   * {@link #vertexManagerDone()}.
   *
   * @return a new map from group name to an unmodifiable view of that group's vertices; empty
   *         when the managed vertex has no group input spec list
   */
  @Override
  public synchronized Map<String, List<String>> getInputVertexGroups() {
    checkAndThrowIfDone();
    Map<String, List<String>> inputGroups = Maps.newHashMap();
    if (managedVertex.getGroupInputSpecList() != null) {
      for (GroupInputSpec group : managedVertex.getGroupInputSpecList()) {
        inputGroups.put(group.getGroupName(), Collections.unmodifiableList(group.getGroupVertices()));
      }
    }
    return inputGroups;
  }

  /** Queues a plugin callback for the received vertex state update. */
  @Override
  public void onStateUpdated(VertexStateUpdate event) {
    // this is not called by the vertex manager plugin.
    // no need to synchronize this. similar to other external notification methods
    enqueueAndScheduleNextEvent(new VertexManagerEventOnVertexStateUpdate(event));
  }
}
/**
 * Creates the manager and reflectively instantiates the plugin named by {@code pluginDesc},
 * passing it the plugin context as its single constructor argument.
 *
 * @param pluginDesc descriptor supplying the plugin class name and user payload; must not be null
 * @param dagUgi identity under which queued plugin callbacks are later run (not null-checked here)
 * @param managedVertex vertex this manager acts for; must not be null
 * @param appContext application context; must not be null
 * @param stateChangeNotifier notifier used for vertex state registrations; must not be null
 * @throws NullPointerException if any of the null-checked arguments is null
 * @throws TezException declared by this constructor; presumably raised by plugin instantiation
 */
public VertexManager(VertexManagerPluginDescriptor pluginDesc, UserGroupInformation dagUgi,
Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) throws TezException {
Objects.requireNonNull(pluginDesc, "pluginDesc is null");
Objects.requireNonNull(managedVertex, "managedVertex is null");
Objects.requireNonNull(appContext, "appContext is null");
Objects.requireNonNull(stateChangeNotifier, "notifier is null");
this.pluginDesc = pluginDesc;
this.dagUgi = dagUgi;
this.managedVertex = managedVertex;
this.appContext = appContext;
this.stateChangeNotifier = stateChangeNotifier;
// don't specify the size of rootInputInitEventQueue, otherwise it will fail when addAll
this.rootInputInitEventQueue = new LinkedBlockingQueue<TezEvent>();
// the context's field initializers read managedVertex, so it must already be assigned here
pluginContext = new VertexManagerPluginContextImpl();
payload = pluginDesc.getUserPayload();
// assigned before the plugin is constructed; every guarded context method reads pluginFailed
pluginFailed = new AtomicBoolean(false);
plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
new Class[] { VertexManagerPluginContext.class }, new Object[] { pluginContext });
execService = appContext.getExecService();
eventQueue = new LinkedBlockingQueue<VertexManagerEvent>();
eventInFlight = new AtomicBoolean(false);
}
/**
 * @return the plugin instance created by this manager's constructor
 */
public VertexManagerPlugin getPlugin() {
  return this.plugin;
}
/**
 * Calls the plugin's {@code initialize()} on the calling thread while holding this manager's
 * monitor. Does nothing when the plugin context already reports completion.
 *
 * @throws AMUserCodeException wrapping any exception thrown while initializing
 */
public void initialize() throws AMUserCodeException {
  try {
    if (pluginContext.isComplete()) {
      // the plugin already reported done, so there is nothing to initialize
      return;
    }
    // TODO TEZ-2066 tracks moving this async.
    synchronized (this) {
      plugin.initialize();
    }
  } catch (Exception cause) {
    throw new AMUserCodeException(Source.VertexManager, cause);
  }
}
/**
 * Tells whether the plugin may still be invoked, i.e. it has neither failed nor reported done.
 * When it may not, the reason is logged at debug level, prefixed with {@code msg}.
 *
 * @param msg prefix describing what the caller is about to skip
 * @return true if plugin callbacks may be queued or run
 */
private boolean pluginInvocationAllowed(String msg) {
  final boolean failed = pluginFailed.get();
  if (!failed && !pluginContext.isComplete()) {
    return true;
  }
  if (LOG.isDebugEnabled()) {
    // a failure takes precedence over completion when choosing the logged reason
    final String reason = failed
        ? " . Manager failed. Vertex="
        : " . Manager complete. Not scheduling event. Vertex=";
    LOG.debug(msg + reason + managedVertex.getLogIdentifier());
  }
  return false;
}
/**
 * Appends the event to the pending queue and attempts to start processing. The event is
 * silently dropped when the plugin may no longer be invoked.
 */
private void enqueueAndScheduleNextEvent(VertexManagerEvent e) {
  if (pluginInvocationAllowed("Dropping event")) {
    eventQueue.add(e);
    tryScheduleNextEvent();
  }
}
/**
 * Submits the next queued event to the executor if the plugin may still be invoked and no
 * other event is in flight. {@code eventInFlight} is claimed with a compare-and-set so that at
 * most one event runs at a time; the event's callback releases it and calls back in here.
 *
 * <p>The method loops after releasing a claim that found nothing to poll. Without the re-check,
 * a producer that enqueued while this thread held the flag would have failed its own
 * compare-and-set and returned, and its event would stay queued with nothing in flight until
 * some later event arrived.
 */
private void tryScheduleNextEvent() {
  while (pluginInvocationAllowed("Not scheduling") && !eventQueue.isEmpty()) {
    if (!eventInFlight.compareAndSet(false, true)) {
      // an event is already in flight; its callback schedules the next one
      return;
    }
    // no event was in flight
    // ensures only 1 event is in flight
    VertexManagerEvent e = eventQueue.poll();
    if (e != null) {
      ListenableFuture<Void> future = execService.submit(e);
      Futures.addCallback(future, e.getCallback(), GuavaShim.directExecutor());
      return;
    }
    // This may happen. Lets say Callback succeeded on threadA. It set eventInFlight to false
    // and called tryScheduleNextEvent() and found queue not empty but got paused before it
    // could check eventInFlight.compareAndSet(). Another thread managed to dequeue the event
    // and schedule a callback. That callback succeeded and set eventInFlight to false, found
    // the queue empty and completed. Now threadA woke up and successfully did compareAndSet()
    // tried to dequeue an event and got null.
    // This could also happen if there is a bug and we manage to schedule more than 1 callback
    // verify that is not the case
    Preconditions.checkState(eventInFlight.compareAndSet(true, false));
    // loop: pick up anything that was enqueued while this thread held the flag
  }
}
/**
 * Queues a call to the plugin's {@code onVertexStarted} with the given completions.
 */
public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws AMUserCodeException {
  final VertexManagerEvent startedEvent = new VertexManagerEventOnVertexStarted(completions);
  enqueueAndScheduleNextEvent(startedEvent);
}
/**
 * Queues a call to the plugin's {@code onSourceTaskCompleted} for the given attempt.
 */
public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws AMUserCodeException {
  final VertexManagerEvent completedEvent = new VertexManagerEventSourceTaskCompleted(attempt);
  enqueueAndScheduleNextEvent(completedEvent);
}
/**
 * Queues a call to the plugin's {@code onVertexManagerEventReceived} for the given runtime
 * event.
 */
public void onVertexManagerEventReceived(
    org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) throws AMUserCodeException {
  final VertexManagerEvent receivedEvent = new VertexManagerEventReceived(vmEvent);
  enqueueAndScheduleNextEvent(receivedEvent);
}
/**
 * Queues a call to the plugin's {@code onRootVertexInitialized} for the given input. The
 * current size of the root-input event queue is logged at debug level first.
 */
public void onRootVertexInitialized(String inputName,
    InputDescriptor inputDescriptor, List<Event> events) throws AMUserCodeException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("vertex:" + managedVertex.getLogIdentifier()
        + "; enqueueing onRootVertexInitialized on input:" + inputName
        + ", current task events size is " + rootInputInitEventQueue.size());
  }
  final VertexManagerEvent initializedEvent =
      new VertexManagerEventRootInputInitialized(inputName, inputDescriptor, events);
  enqueueAndScheduleNextEvent(initializedEvent);
}
/**
 * Completion callback attached to every plugin event submitted to the executor. On success it
 * releases the in-flight flag and schedules the next queued event; on failure it marks the
 * plugin as failed, clears the queue and reports a user code error for the vertex. Any
 * exception raised inside the callback itself is reported as a DAG internal error.
 */
private class VertexManagerCallback implements FutureCallback<Void> {
@Override
public void onFailure(Throwable t) {
try {
// a failing event must have been the one in flight
Preconditions.checkState(eventInFlight.get());
// stop further event processing
pluginFailed.set(true);
eventQueue.clear();
// catch real root cause of failure, it would throw UndeclaredThrowableException
// if using UGI.doAs
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
Preconditions.checkState(appContext != null);
Preconditions.checkState(managedVertex != null);
// state change must be triggered via an event transition
appContext.getEventHandler().handle(
new VertexEventManagerUserCodeError(managedVertex.getVertexId(),
new AMUserCodeException(Source.VertexManager, t)));
// enqueue no further events due to user code error
// note: eventInFlight is not reset on this path
} catch (Exception e) {
sendInternalError(e);
}
}
@Override
public void onSuccess(Void result) {
try {
// subclass hook runs while the event still counts as in flight
onSuccessDerived(result);
// release the in-flight flag; it must have been set by the scheduler
Preconditions.checkState(eventInFlight.compareAndSet(true, false));
tryScheduleNextEvent();
} catch (Exception e) {
sendInternalError(e);
}
}
// Hook for subclasses; called at the start of onSuccess. No-op here.
protected void onSuccessDerived(Void result) {
}
// Logs the error and sends a DAGEventInternalError for the DAG owning the managed vertex.
private void sendInternalError(Exception e) {
// fail the DAG so that we dont hang
// state change must be triggered via an event transition
LOG.error("Error after vertex manager callback " + managedVertex.getLogIdentifier(), e);
appContext.getEventHandler().handle(
(new DAGEventInternalError(managedVertex.getVertexId().getDAGId(),
"Error in VertexManager for vertex: " + managedVertex.getLogIdentifier()
+ ", error=" + ExceptionUtils.getStackTrace(e))));
}
}
/**
 * Callback for root-input-initialized events. After the plugin call succeeds, it drains every
 * event the plugin added to {@code rootInputInitEventQueue} and forwards them to the vertex in
 * a single {@link VertexEventInputDataInformation}.
 */
private class VertexManagerRootInputInitializedCallback extends VertexManagerCallback {
  @Override
  protected void onSuccessDerived(Void result) {
    if (LOG.isDebugEnabled()) {
      // the input name is not available in this callback, so the message does not mention it
      LOG.debug("vertex:" + managedVertex.getLogIdentifier()
          + "; after call of VertexManagerPlugin.onRootVertexInitialized"
          + ", current task events size is " + rootInputInitEventQueue.size());
    }
    List<TezEvent> resultEvents = new ArrayList<TezEvent>();
    rootInputInitEventQueue.drainTo(resultEvents);
    appContext.getEventHandler().handle(
        new VertexEventInputDataInformation(managedVertex.getVertexId(), resultEvents));
  }
}
/** Queued call to the plugin's {@code onVertexStateUpdated}. */
class VertexManagerEventOnVertexStateUpdate extends VertexManagerEvent {

  private final VertexStateUpdate stateUpdate;

  public VertexManagerEventOnVertexStateUpdate(VertexStateUpdate event) {
    super();
    this.stateUpdate = event;
  }

  @Override
  public void invoke() throws Exception {
    plugin.onVertexStateUpdated(stateUpdate);
  }
}
/** Queued call to the plugin's {@code onVertexStarted}. */
class VertexManagerEventOnVertexStarted extends VertexManagerEvent {

  private final List<TaskAttemptIdentifier> completions;

  public VertexManagerEventOnVertexStarted(List<TaskAttemptIdentifier> pluginCompletions) {
    super();
    this.completions = pluginCompletions;
  }

  @Override
  public void invoke() throws Exception {
    plugin.onVertexStarted(completions);
  }
}
/** Queued call to the plugin's {@code onSourceTaskCompleted}. */
class VertexManagerEventSourceTaskCompleted extends VertexManagerEvent {

  private final TaskAttemptIdentifier completedAttempt;

  public VertexManagerEventSourceTaskCompleted(TaskAttemptIdentifier attempt) {
    super();
    this.completedAttempt = attempt;
  }

  @Override
  public void invoke() throws Exception {
    plugin.onSourceTaskCompleted(completedAttempt);
  }
}
/** Queued call to the plugin's {@code onVertexManagerEventReceived}. */
class VertexManagerEventReceived extends VertexManagerEvent {

  private final org.apache.tez.runtime.api.events.VertexManagerEvent runtimeEvent;

  public VertexManagerEventReceived(org.apache.tez.runtime.api.events.VertexManagerEvent vmEvent) {
    super();
    this.runtimeEvent = vmEvent;
  }

  @Override
  public void invoke() throws Exception {
    plugin.onVertexManagerEventReceived(runtimeEvent);
  }
}
/**
 * Queued call to the plugin's {@code onRootVertexInitialized}. Uses its own
 * {@link VertexManagerRootInputInitializedCallback} instance rather than the shared callback.
 */
class VertexManagerEventRootInputInitialized extends VertexManagerEvent {

  private final String rootInputName;
  private final InputDescriptor rootInputDescriptor;
  private final List<Event> initializerEvents;

  public VertexManagerEventRootInputInitialized(String inputName,
      InputDescriptor inputDescriptor, List<Event> events) {
    super(new VertexManagerRootInputInitializedCallback());
    this.rootInputName = inputName;
    this.rootInputDescriptor = inputDescriptor;
    this.initializerEvents = events;
  }

  @Override
  public void invoke() throws Exception {
    plugin.onRootVertexInitialized(rootInputName, rootInputDescriptor, initializerEvents);
  }
}
/**
 * Base class for queued plugin callbacks. {@link #call()} runs {@link #invoke()} as the DAG
 * user while holding the enclosing manager's monitor, and skips it when the plugin may no
 * longer be invoked.
 */
abstract class VertexManagerEvent extends CallableEvent {

  /** Creates an event completed through the manager's shared callback. */
  public VertexManagerEvent() {
    this(VM_CALLBACK);
  }

  /** Creates an event completed through the given callback. */
  public VertexManagerEvent(VertexManagerCallback callback) {
    super(callback);
  }

  @Override
  public Void call() throws Exception {
    final VertexManager owner = VertexManager.this;
    final PrivilegedExceptionAction<Void> guardedInvoke = new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        synchronized (owner) {
          if (!owner.pluginInvocationAllowed("Not invoking")) {
            return null;
          }
          invoke();
          return null;
        }
      }
    };
    owner.dagUgi.doAs(guardedInvoke);
    return null;
  }

  /** Performs the actual plugin call for this event. */
  public abstract void invoke() throws Exception;
}
}