blob: 44c388120741471685e3c7bdb3cb28bd021d826e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.common.evaluator.context;
import com.google.protobuf.ByteString;
import org.apache.reef.annotations.Provided;
import org.apache.reef.annotations.audience.EvaluatorSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.evaluator.context.ContextMessage;
import org.apache.reef.evaluator.context.ContextMessageSource;
import org.apache.reef.evaluator.context.parameters.Services;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
import org.apache.reef.runtime.common.evaluator.task.TaskRuntime;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The evaluator side resourcemanager for Contexts.
*/
@Provided
@Private
@EvaluatorSide
public final class ContextRuntime {
private static final Logger LOG = Logger.getLogger(ContextRuntime.class.getName());
/**
* Context-local injector. This contains information that will not be available in child injectors.
*/
private final Injector contextInjector;
/**
* Service injector. State in this injector moves to child injectors.
*/
private final Injector serviceInjector;
/**
* Convenience class to hold all the event handlers for the context as well as the service instances.
*/
private final ContextLifeCycle contextLifeCycle;
/**
* The parent context, if there is any.
*/
private final Optional<ContextRuntime> parentContext; // guarded by this
/**
* The child context, if there is any.
*/
private Optional<ContextRuntime> childContext = Optional.empty(); // guarded by this
/**
* The currently running task, if there is any.
*/
private Optional<TaskRuntime> task = Optional.empty(); // guarded by this
private Thread taskRuntimeThread = null;
// TODO[JIRA REEF-835]: Which lock guards this?
private ReefServiceProtos.ContextStatusProto.State contextState =
ReefServiceProtos.ContextStatusProto.State.READY;
/**
* Create a new ContextRuntime.
*
* @param serviceInjector the serviceInjector to be used.
* @param contextConfiguration the Configuration for this context.
* @throws ContextClientCodeException if the context cannot be instantiated.
*/
ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration,
final Optional<ContextRuntime> parentContext) throws ContextClientCodeException {
this.serviceInjector = serviceInjector;
this.parentContext = parentContext;
// Trigger the instantiation of the services
try {
final Set<Object> services = serviceInjector.getNamedInstance(Services.class);
this.contextInjector = serviceInjector.forkInjector(contextConfiguration);
this.contextLifeCycle = this.contextInjector.getInstance(ContextLifeCycle.class);
} catch (BindException | InjectionException e) {
final Optional<String> parentID = this.getParentContext().isPresent() ?
Optional.of(this.getParentContext().get().getIdentifier()) :
Optional.<String>empty();
throw new ContextClientCodeException(
ContextClientCodeException.getIdentifier(contextConfiguration),
parentID, "Unable to spawn context", e);
}
// Trigger the context start events on contextInjector.
this.contextLifeCycle.start();
}
/**
* Create a new ContextRuntime for the root context.
*
* @param serviceInjector the serviceInjector to be used.
* @param contextConfiguration the Configuration for this context.
* @throws ContextClientCodeException if the context cannot be instantiated.
*/
ContextRuntime(final Injector serviceInjector,
final Configuration contextConfiguration) throws ContextClientCodeException {
this(serviceInjector, contextConfiguration, Optional.<ContextRuntime>empty());
LOG.log(Level.FINEST, "Instantiating root context");
}
/**
* Spawns a new context.
* <p>
* The new context will have a serviceInjector that is created by forking the one in this object with the given
* serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector.
*
* @param contextConfiguration the new context's context (local) Configuration.
* @param serviceConfiguration the new context's service Configuration.
* @return a child context.
* @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues
* @throws IllegalStateException If this method is called when there is either a task or child context already
* present.
*/
ContextRuntime spawnChildContext(
final Configuration contextConfiguration,
final Configuration serviceConfiguration) throws ContextClientCodeException {
synchronized (this.contextLifeCycle) {
if (this.task.isPresent()) {
throw new IllegalStateException(
"Attempting to spawn a child context when a Task with id '" +
this.task.get().getId() + "' is running.");
}
if (this.childContext.isPresent()) {
throw new IllegalStateException(
"Attempting to instantiate a child context on a context that is not the topmost active context");
}
try {
final Injector childServiceInjector =
this.serviceInjector.forkInjector(serviceConfiguration);
final ContextRuntime newChildContext =
new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
this.childContext = Optional.of(newChildContext);
return newChildContext;
} catch (final BindException e) {
final Optional<String> parentID = this.getParentContext().isPresent() ?
Optional.of(this.getParentContext().get().getIdentifier()) :
Optional.<String>empty();
throw new ContextClientCodeException(
ContextClientCodeException.getIdentifier(contextConfiguration),
parentID, "Unable to spawn context", e);
}
}
}
/**
* Spawns a new context without services of its own.
* <p>
* The new context will have a serviceInjector that is created by forking the one in this object. The
* contextConfiguration is used to fork the contextInjector from that new serviceInjector.
*
* @param contextConfiguration the new context's context (local) Configuration.
* @return a child context.
* @throws ContextClientCodeException If the context can't be instantiate due to user code / configuration issues.
* @throws IllegalStateException If this method is called when there is either a task or child context already
* present.
*/
ContextRuntime spawnChildContext(
final Configuration contextConfiguration) throws ContextClientCodeException {
synchronized (this.contextLifeCycle) {
if (this.task.isPresent()) {
throw new IllegalStateException(
"Attempting to to spawn a child context while a Task with id '" +
this.task.get().getId() + "' is running.");
}
if (this.childContext.isPresent()) {
throw new IllegalStateException(
"Attempting to spawn a child context on a context that is not the topmost active context");
}
final Injector childServiceInjector = this.serviceInjector.forkInjector();
final ContextRuntime newChildContext =
new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
this.childContext = Optional.of(newChildContext);
return newChildContext;
}
}
/**
* Launches a Task on this context.
*
* @param taskConfig the configuration to be used for the task.
* @throws org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException If the Task cannot be instantiated
* due to user code / configuration issues.
* @throws IllegalStateException If this method is called when
* there is either a task or child context already present.
*/
@SuppressWarnings("checkstyle:illegalcatch")
void startTask(final Configuration taskConfig) throws TaskClientCodeException {
synchronized (this.contextLifeCycle) {
if (this.task.isPresent() && this.task.get().hasEnded()) {
// clean up state
this.task = Optional.empty();
}
if (this.task.isPresent()) {
throw new IllegalStateException("Attempting to start a Task when a Task with id '" +
this.task.get().getId() + "' is running.");
}
if (this.childContext.isPresent()) {
throw new IllegalStateException(
"Attempting to start a Task on a context that is not the topmost active context");
}
try {
final Injector taskInjector = this.contextInjector.forkInjector(taskConfig);
final TaskRuntime taskRuntime = taskInjector.getInstance(TaskRuntime.class);
taskRuntime.initialize();
this.taskRuntimeThread = new Thread(taskRuntime, taskRuntime.getId());
this.taskRuntimeThread.start();
this.task = Optional.of(taskRuntime);
LOG.log(Level.FINEST, "Started task: {0}", taskRuntime.getTaskId());
} catch (final BindException | InjectionException e) {
throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
this.getIdentifier(),
"Unable to instantiate the new task", e);
} catch (final Throwable t) {
throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
this.getIdentifier(),
"Unable to start the new task", t);
}
}
}
/**
* Close this context. If there is a child context, this recursively closes it before closing this context. If
* there is a Task currently running, that will be closed.
*/
void close() {
synchronized (this.contextLifeCycle) {
this.contextState = ReefServiceProtos.ContextStatusProto.State.DONE;
if (this.task.isPresent()) {
LOG.log(Level.WARNING, "Shutting down a task because the underlying context is being closed.");
this.task.get().close(null);
}
if (this.childContext.isPresent()) {
LOG.log(Level.WARNING, "Closing a context because its parent context is being closed.");
this.childContext.get().close();
}
this.contextLifeCycle.close();
if (this.parentContext.isPresent()) {
this.parentContext.get().resetChildContext();
}
}
}
/**
* @return the parent context, if there is one.
*/
Optional<ContextRuntime> getParentContext() {
return this.parentContext;
}
/**
* Deliver the given message to the Task.
* <p>
* Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
* in the log.
*
* @param message the suspend message to deliver or null if there is none.
*/
void suspendTask(final byte[] message) {
synchronized (this.contextLifeCycle) {
if (!this.task.isPresent()) {
LOG.log(Level.WARNING, "Received a suspend task while there was no task running. Ignoring.");
} else {
this.task.get().suspend(message);
}
}
}
/**
* Issue a close call to the Task
* <p>
* Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
* in the log.
*
* @param message the close message to deliver or null if there is none.
*/
void closeTask(final byte[] message) {
synchronized (this.contextLifeCycle) {
if (!this.task.isPresent()) {
LOG.log(Level.WARNING, "Received a close task while there was no task running. Ignoring.");
} else {
this.task.get().close(message);
}
}
}
/**
* Deliver a message to the Task
* <p>
* Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
* in the log.
*
* @param message the close message to deliver or null if there is none.
*/
void deliverTaskMessage(final byte[] message) {
synchronized (this.contextLifeCycle) {
if (!this.task.isPresent()) {
LOG.log(Level.WARNING, "Received a task message while there was no task running. Ignoring.");
} else {
this.task.get().deliver(message);
}
}
}
/**
* @return the identifier of this context.
*/
String getIdentifier() {
return this.contextLifeCycle.getIdentifier();
}
/**
* Handle the context message.
*
* @param message sent by the driver
*/
void handleContextMessage(final byte[] message) {
this.contextLifeCycle.handleContextMessage(message);
}
/**
* @return the state of the running Task, if one is running.
*/
Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
synchronized (this.contextLifeCycle) {
if (this.task.isPresent()) {
if (this.task.get().hasEnded()) {
this.task = Optional.empty();
return Optional.empty();
} else {
return Optional.of(this.task.get().getStatusProto());
}
} else {
return Optional.empty();
}
}
}
/**
* Called by the child context when it has been closed.
*/
private void resetChildContext() {
synchronized (this.contextLifeCycle) {
if (this.childContext.isPresent()) {
this.childContext = Optional.empty();
} else {
throw new IllegalStateException("no child context set");
}
}
}
/**
* @return this context's status in protocol buffer form.
*/
ReefServiceProtos.ContextStatusProto getContextStatus() {
synchronized (this.contextLifeCycle) {
final ReefServiceProtos.ContextStatusProto.Builder builder =
ReefServiceProtos.ContextStatusProto.newBuilder()
.setContextId(this.getIdentifier())
.setContextState(this.contextState);
if (this.parentContext.isPresent()) {
builder.setParentId(this.parentContext.get().getIdentifier());
}
for (final ContextMessageSource contextMessageSource : this.contextLifeCycle.getContextMessageSources()) {
final Optional<ContextMessage> contextMessageOptional = contextMessageSource.getMessage();
if (contextMessageOptional.isPresent()) {
builder.addContextMessage(ReefServiceProtos.ContextStatusProto.ContextMessageProto.newBuilder()
.setSourceId(contextMessageOptional.get().getMessageSourceID())
.setMessage(ByteString.copyFrom(contextMessageOptional.get().get()))
.build());
}
}
return builder.build();
}
}
}