blob: f7ecdeb2e22e91fd07b4585f7ffaea9918800f8a [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.runtime.etiao;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.execution.services.ControlService;
import org.apache.edgent.execution.services.JobRegistryService;
import org.apache.edgent.execution.services.ServiceContainer;
import org.apache.edgent.graph.spi.execution.AbstractGraphJob;
import org.apache.edgent.oplet.JobContext;
import org.apache.edgent.runtime.etiao.graph.DirectGraph;
import org.apache.edgent.runtime.etiao.mbeans.EtiaoJobBean;
/**
* Etiao runtime implementation of the {@link org.apache.edgent.execution.Job} interface.
*/
public class EtiaoJob extends AbstractGraphJob implements JobContext {
/** Prefix used by job unique identifiers. */
public static final String ID_PREFIX = "JOB_";
private final DirectGraph graph;
private final String id;
private final String topologyName;
private final String name;
private final ServiceContainer containerServices;
private final JobRegistryService jobs;
private static final AtomicInteger jobID = new AtomicInteger(0);
/**
* Creates a new {@code EtiaoJob} instance which controls the lifecycle
* of the specified org.apache.edgent.graph.
*
* @param graph org.apache.edgent.graph representation of the org.apache.edgent.org.apache.edgent.topology
* @param topologyName name of the org.apache.edgent.org.apache.edgent.topology
* @param jobName name of the Job. If null, a name of {@code topologyName_jobId} is assigned.
* @param container service container
*/
EtiaoJob(DirectGraph graph, String topologyName, String jobName, ServiceContainer container) {
this.graph = graph;
this.id = ID_PREFIX + String.valueOf(jobID.getAndIncrement());
this.topologyName = topologyName;
if (jobName == null)
jobName = this.topologyName + "_" + this.id;
this.name = jobName;
this.containerServices = container;
ControlService cs = container.getService(ControlService.class);
if (cs != null) {
EtiaoJobBean.registerControl(cs, this);
}
this.jobs = container.getService(JobRegistryService.class);
if (jobs != null)
jobs.addJob(this);
}
@Override
public String getName() {
return name;
}
@Override
public String getId() {
return id;
}
ServiceContainer getContainerServices() {
return containerServices;
}
@Override
public void stateChange(Action action) {
switch (action) {
case INITIALIZE:
setNext(State.INITIALIZED, action);
executable().initialize();
break;
case START:
setNext(State.RUNNING, action);
executable().start();
break;
case PAUSE:
case RESUME:
throw new UnsupportedOperationException(action.name());
case CLOSE:
// idempotent
State s = setNext(State.CLOSED, action);
if (s != State.CLOSED)
executable().close();
else
completeTransition();
break;
default:
// TODO log Unsupported action
// log.severe("Unsupported action: " + action.name());
throw new IllegalArgumentException(action.name());
}
}
Executable executable() {
return graph.executable();
}
/* State transitions map. Each entry associates a state with a set of
* reachable states the system can transition to. */
static final HashMap<State, EnumSet<State>> stateMap;
static {
stateMap = new HashMap<State, EnumSet<State>>();
stateMap.put(State.CONSTRUCTED, EnumSet.of(State.INITIALIZED, State.CLOSED));
stateMap.put(State.INITIALIZED, EnumSet.of(State.RUNNING, State.CLOSED));
stateMap.put(State.RUNNING, EnumSet.of(State.PAUSED, State.CLOSED));
stateMap.put(State.PAUSED, EnumSet.of(State.RUNNING, State.CLOSED));
stateMap.put(State.CLOSED, EnumSet.of(State.CLOSED));
}
private synchronized State setNext(State desiredState, Action cause) {
if (!isReachable(desiredState))
throw new IllegalArgumentException(cause.name());
else {
setNextState(desiredState);
updateRegistry();
return getCurrentState();
}
}
private boolean isReachable(State desiredState) {
return !inTransition() && stateMap.get(getCurrentState()).contains(desiredState);
}
protected synchronized void completeTransition() {
super.completeTransition();
updateRegistry();
}
void onActionComplete() {
completeTransition();
}
@Override
public void complete() throws ExecutionException, InterruptedException {
if (getCurrentState() != State.CLOSED && getNextState() != State.CLOSED)
awaitComplete(Long.MAX_VALUE); // TODO remove timeout
}
@Override
public void complete(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
if (unit == null)
throw new NullPointerException();
if (getCurrentState() != State.CLOSED && getNextState() != State.CLOSED &&
!awaitComplete(unit.toMillis(timeout))) {
throw new TimeoutException();
}
}
/**
* Complete job closing. This method can be invoked after a job close
* has been initiated.
* @param timeout the time to wait
* @param unit TimeUnit
* @throws ExecutionException if the job execution threw an ExecutionException.
* @throws InterruptedException if the job execution threw an InterruptedException.
* @throws TimeoutException if the job execution threw an TimeoutException.
*/
public void completeClosing(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
if (unit == null)
throw new NullPointerException();
if (getCurrentState() != State.CLOSED &&
!awaitComplete(unit.toMillis(timeout))) {
throw new TimeoutException();
}
}
private boolean awaitComplete(long millis) throws ExecutionException, InterruptedException {
try {
return executable().complete(millis);
}
catch (InterruptedException e) {
throw e;
}
catch (Throwable t) {
throw Executable.executionException(t);
}
}
public DirectGraph graph() {
return graph;
}
private void updateRegistry() {
if (jobs != null)
jobs.updateJob(this);
}
void updateHealth(Throwable t) {
if (t != null) {
setHealth(Health.UNHEALTHY);
setLastError(t.getMessage());
}
updateRegistry();
}
}