| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| package org.apache.edgent.runtime.etiao; |
| |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.edgent.execution.services.ControlService; |
| import org.apache.edgent.execution.services.JobRegistryService; |
| import org.apache.edgent.execution.services.ServiceContainer; |
| import org.apache.edgent.graph.spi.execution.AbstractGraphJob; |
| import org.apache.edgent.oplet.JobContext; |
| import org.apache.edgent.runtime.etiao.graph.DirectGraph; |
| import org.apache.edgent.runtime.etiao.mbeans.EtiaoJobBean; |
| |
| /** |
| * Etiao runtime implementation of the {@link org.apache.edgent.execution.Job} interface. |
| */ |
| public class EtiaoJob extends AbstractGraphJob implements JobContext { |
| /** Prefix used by job unique identifiers. */ |
| public static final String ID_PREFIX = "JOB_"; |
| |
| private final DirectGraph graph; |
| private final String id; |
| private final String topologyName; |
| private final String name; |
| private final ServiceContainer containerServices; |
| private final JobRegistryService jobs; |
| |
| private static final AtomicInteger jobID = new AtomicInteger(0); |
| |
| /** |
| * Creates a new {@code EtiaoJob} instance which controls the lifecycle |
| * of the specified org.apache.edgent.graph. |
| * |
| * @param graph org.apache.edgent.graph representation of the org.apache.edgent.org.apache.edgent.topology |
| * @param topologyName name of the org.apache.edgent.org.apache.edgent.topology |
| * @param jobName name of the Job. If null, a name of {@code topologyName_jobId} is assigned. |
| * @param container service container |
| */ |
| EtiaoJob(DirectGraph graph, String topologyName, String jobName, ServiceContainer container) { |
| this.graph = graph; |
| this.id = ID_PREFIX + String.valueOf(jobID.getAndIncrement()); |
| this.topologyName = topologyName; |
| if (jobName == null) |
| jobName = this.topologyName + "_" + this.id; |
| this.name = jobName; |
| this.containerServices = container; |
| |
| ControlService cs = container.getService(ControlService.class); |
| if (cs != null) { |
| EtiaoJobBean.registerControl(cs, this); |
| } |
| this.jobs = container.getService(JobRegistryService.class); |
| if (jobs != null) |
| jobs.addJob(this); |
| } |
| |
| @Override |
| public String getName() { |
| return name; |
| } |
| |
| @Override |
| public String getId() { |
| return id; |
| } |
| |
| ServiceContainer getContainerServices() { |
| return containerServices; |
| } |
| |
| @Override |
| public void stateChange(Action action) { |
| switch (action) { |
| case INITIALIZE: |
| setNext(State.INITIALIZED, action); |
| executable().initialize(); |
| break; |
| case START: |
| setNext(State.RUNNING, action); |
| executable().start(); |
| break; |
| case PAUSE: |
| case RESUME: |
| throw new UnsupportedOperationException(action.name()); |
| case CLOSE: |
| // idempotent |
| State s = setNext(State.CLOSED, action); |
| if (s != State.CLOSED) |
| executable().close(); |
| else |
| completeTransition(); |
| break; |
| default: |
| // TODO log Unsupported action |
| // log.severe("Unsupported action: " + action.name()); |
| throw new IllegalArgumentException(action.name()); |
| } |
| } |
| |
| Executable executable() { |
| return graph.executable(); |
| } |
| |
| /* State transitions map. Each entry associates a state with a set of |
| * reachable states the system can transition to. */ |
| static final HashMap<State, EnumSet<State>> stateMap; |
| static { |
| stateMap = new HashMap<State, EnumSet<State>>(); |
| |
| stateMap.put(State.CONSTRUCTED, EnumSet.of(State.INITIALIZED, State.CLOSED)); |
| stateMap.put(State.INITIALIZED, EnumSet.of(State.RUNNING, State.CLOSED)); |
| stateMap.put(State.RUNNING, EnumSet.of(State.PAUSED, State.CLOSED)); |
| stateMap.put(State.PAUSED, EnumSet.of(State.RUNNING, State.CLOSED)); |
| stateMap.put(State.CLOSED, EnumSet.of(State.CLOSED)); |
| } |
| |
| private synchronized State setNext(State desiredState, Action cause) { |
| if (!isReachable(desiredState)) |
| throw new IllegalArgumentException(cause.name()); |
| else { |
| setNextState(desiredState); |
| updateRegistry(); |
| return getCurrentState(); |
| } |
| } |
| |
| private boolean isReachable(State desiredState) { |
| return !inTransition() && stateMap.get(getCurrentState()).contains(desiredState); |
| } |
| |
| protected synchronized void completeTransition() { |
| super.completeTransition(); |
| updateRegistry(); |
| } |
| |
| void onActionComplete() { |
| completeTransition(); |
| } |
| |
| @Override |
| public void complete() throws ExecutionException, InterruptedException { |
| if (getCurrentState() != State.CLOSED && getNextState() != State.CLOSED) |
| awaitComplete(Long.MAX_VALUE); // TODO remove timeout |
| } |
| |
| @Override |
| public void complete(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException { |
| if (unit == null) |
| throw new NullPointerException(); |
| if (getCurrentState() != State.CLOSED && getNextState() != State.CLOSED && |
| !awaitComplete(unit.toMillis(timeout))) { |
| throw new TimeoutException(); |
| } |
| } |
| |
| /** |
| * Complete job closing. This method can be invoked after a job close |
| * has been initiated. |
| * @param timeout the time to wait |
| * @param unit TimeUnit |
| * @throws ExecutionException if the job execution threw an ExecutionException. |
| * @throws InterruptedException if the job execution threw an InterruptedException. |
| * @throws TimeoutException if the job execution threw an TimeoutException. |
| */ |
| public void completeClosing(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException { |
| if (unit == null) |
| throw new NullPointerException(); |
| if (getCurrentState() != State.CLOSED && |
| !awaitComplete(unit.toMillis(timeout))) { |
| throw new TimeoutException(); |
| } |
| } |
| |
| private boolean awaitComplete(long millis) throws ExecutionException, InterruptedException { |
| try { |
| return executable().complete(millis); |
| } |
| catch (InterruptedException e) { |
| throw e; |
| } |
| catch (Throwable t) { |
| throw Executable.executionException(t); |
| } |
| } |
| |
| public DirectGraph graph() { |
| return graph; |
| } |
| |
| private void updateRegistry() { |
| if (jobs != null) |
| jobs.updateJob(this); |
| } |
| |
| void updateHealth(Throwable t) { |
| if (t != null) { |
| setHealth(Health.UNHEALTHY); |
| setLastError(t.getMessage()); |
| } |
| updateRegistry(); |
| } |
| } |