blob: 5c2476b7cc2f46a354b8ba5b9b58ee27a10e1f96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.bridge.driver.service.grpc;
import com.google.protobuf.ByteString;
import io.grpc.*;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.StringUtils;
import org.apache.reef.bridge.driver.service.DriverClientException;
import org.apache.reef.bridge.driver.service.IDriverService;
import org.apache.reef.bridge.service.parameters.DriverClientCommand;
import org.apache.reef.bridge.proto.*;
import org.apache.reef.bridge.proto.Void;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.restart.DriverRestartCompleted;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
import org.apache.reef.runtime.common.driver.idle.IdleMessage;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.OSUtils;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* GRPC DriverBridgeService that interacts with higher-level languages.
*/
public final class GRPCDriverService implements IDriverService {
private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName());
private static final Void VOID = Void.newBuilder().build();
private Process driverProcess;
private enum StreamType { STDOUT, STDERR }
private Server server;
private DriverClientGrpc.DriverClientFutureStub clientStub;
private final Clock clock;
private final ExceptionCodec exceptionCodec;
private final ConfigurationSerializer configurationSerializer;
private final EvaluatorRequestor evaluatorRequestor;
private final JVMProcessFactory jvmProcessFactory;
private final CLRProcessFactory clrProcessFactory;
private final TcpPortProvider tcpPortProvider;
private final String driverClientCommand;
private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>();
private final Map<String, ActiveContext> activeContextMap = new HashMap<>();
private final Map<String, RunningTask> runningTaskMap = new HashMap<>();
private boolean stopped = false;
@Inject
private GRPCDriverService(
final Clock clock,
final EvaluatorRequestor evaluatorRequestor,
final ConfigurationSerializer configurationSerializer,
final JVMProcessFactory jvmProcessFactory,
final CLRProcessFactory clrProcessFactory,
final TcpPortProvider tcpPortProvider,
final ExceptionCodec exceptionCodec,
@Parameter(DriverClientCommand.class) final String driverClientCommand) {
this.clock = clock;
this.exceptionCodec = exceptionCodec;
this.configurationSerializer = configurationSerializer;
this.jvmProcessFactory = jvmProcessFactory;
this.clrProcessFactory = clrProcessFactory;
this.evaluatorRequestor = evaluatorRequestor;
this.driverClientCommand = driverClientCommand;
this.tcpPortProvider = tcpPortProvider;
}
private void start() throws IOException, InterruptedException {
for (final Integer port : this.tcpPortProvider) {
try {
this.server = ServerBuilder.forPort(port)
.addService(new DriverBridgeServiceImpl())
.build()
.start();
LOG.info("Server started, listening on " + port);
break;
} catch (IOException e) {
LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
}
}
if (this.server == null || this.server.isTerminated()) {
throw new IOException("Unable to start gRPC server");
} else {
final String cmd = this.driverClientCommand + " " + this.server.getPort();
final List<String> cmdOs = OSUtils.isWindows() ?
Arrays.asList("cmd.exe", "/c", cmd) : Arrays.asList("/bin/sh", "-c", cmd);
LOG.log(Level.INFO, "CMD: " + cmdOs);
this.driverProcess = new ProcessBuilder()
.command(cmdOs)
.redirectError(new File("driverclient.stderr"))
.redirectOutput(new File("driverclient.stdout"))
.directory(new File(System.getProperty("user.dir")))
.start();
synchronized (this) {
int attempts = 10; // give some time
// wait for driver client process to register
while (attempts-- > 0 && this.clientStub == null && driverProcessIsAlive()) {
LOG.log(Level.INFO, "waiting for driver process to register");
this.wait(1000); // a second
}
}
if (driverProcessIsAlive()) {
Thread closeChildThread = new Thread() {
public void run() {
synchronized (GRPCDriverService.this) {
if (GRPCDriverService.this.driverProcess != null) {
GRPCDriverService.this.driverProcess.destroy();
GRPCDriverService.this.driverProcess = null;
}
}
}
};
Runtime.getRuntime().addShutdownHook(closeChildThread);
}
}
}
private void stop() {
stop(null);
}
private void stop(final Throwable t) {
LOG.log(Level.INFO, "STOP: gRPC Driver Service", t);
if (!stopped) {
try {
if (!clock.isClosed()) {
if (t != null) {
clock.stop(t);
} else {
clock.stop();
}
}
if (server != null) {
LOG.log(Level.INFO, "Shutdown gRPC");
this.server.shutdown();
this.server = null;
}
if (this.driverProcess != null) {
LOG.log(Level.INFO, "shutdown driver process");
dump();
this.driverProcess.destroy();
this.driverProcess = null;
}
} finally {
LOG.log(Level.INFO, "COMPLETED STOP: gRPC Driver Service");
stopped = true;
}
}
}
private void dump() {
if (!driverProcessIsAlive()) {
LOG.log(Level.INFO, "Exit code: " + this.driverProcess.exitValue());
}
dumpStream(StreamType.STDOUT);
dumpStream(StreamType.STDERR);
}
private void dumpStream(final StreamType type) {
StringBuffer buffer = new StringBuffer();
String name = "";
InputStream stream = null;
switch(type) {
case STDOUT:
name = "stdout";
stream = this.driverProcess.getInputStream();
break;
case STDERR:
name = "stderr";
stream = this.driverProcess.getErrorStream();
break;
default:
LOG.log(Level.WARNING, "Invalid stream type value");
}
LOG.log(Level.INFO, "capturing driver process " + name);
try {
int nextChar;
buffer.append("\n==============================================\n");
while ((nextChar = stream.read()) != -1) {
buffer.append((char) nextChar);
}
buffer.append("\n==============================================\n");
} catch (IOException e) {
LOG.log(Level.WARNING, "Error while capturing output stream: " + e.getMessage());
}
LOG.log(Level.INFO, buffer.toString());
}
/**
* Determines if the driver process is still alive by
* testing for its exit value, which throws {@link IllegalThreadStateException}
* if process is still running.
* @return true if driver process is alive, false otherwise
*/
private boolean driverProcessIsAlive() {
if (this.driverProcess != null) {
try {
this.driverProcess.exitValue();
} catch (IllegalThreadStateException e) {
return true;
}
}
return false;
}
private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) {
if (descriptor == null) {
return null;
} else {
return EvaluatorDescriptorInfo.newBuilder()
.setCores(descriptor.getNumberOfCores())
.setMemory(descriptor.getMemory())
.setRuntimeName(descriptor.getRuntimeName())
.build();
}
}
@Override
public IdleMessage getIdleStatus() {
final String componentName = "Java Bridge DriverService";
if (this.clientStub != null) {
try {
final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(VOID).get();
LOG.log(Level.INFO, "is idle: " + idleStatus.getIsIdle());
return new IdleMessage(
componentName,
idleStatus.getReason(),
idleStatus.getIsIdle());
} catch (ExecutionException | InterruptedException e) {
stop(e);
}
}
return new IdleMessage(
componentName,
"stub not initialized",
true);
}
@Override
public void startHandler(final StartTime startTime) {
try {
start();
synchronized (this) {
if (this.clientStub != null) {
this.clientStub.startHandler(
StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build());
} else {
stop(new IllegalStateException("Unable to start driver client"));
}
}
} catch (IOException | InterruptedException e) {
stop(e);
}
}
@Override
public void stopHandler(final StopTime stopTime) {
synchronized (this) {
if (clientStub != null) {
final Future<ExceptionInfo> callCompletion = this.clientStub.stopHandler(
StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build());
try {
try {
final ExceptionInfo error = callCompletion.get(5L, TimeUnit.MINUTES);
if (!error.getNoError()) {
final Optional<Throwable> t = parseException(error);
if (t.isPresent()) {
throw new RuntimeException("driver stop exception",
t.get().getCause() != null ? t.get().getCause() : t.get());
} else {
throw new RuntimeException(error.getMessage() != null ? error.getMessage() : error.getName());
}
}
} catch (TimeoutException e) {
throw new RuntimeException("stop handler timed out", e);
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
stop();
}
}
}
}
@Override
public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) {
synchronized (this) {
this.allocatedEvaluatorMap.put(eval.getId(), eval);
this.clientStub.allocatedEvaluatorHandler(
EvaluatorInfo.newBuilder()
.setEvaluatorId(eval.getId())
.setDescriptorInfo(toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor()))
.build());
}
}
@Override
public void completedEvaluatorHandler(final CompletedEvaluator eval) {
synchronized (this) {
this.allocatedEvaluatorMap.remove(eval.getId());
this.clientStub.completedEvaluatorHandler(
EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
}
}
@Override
public void failedEvaluatorHandler(final FailedEvaluator eval) {
synchronized (this) {
this.allocatedEvaluatorMap.remove(eval.getId());
this.clientStub.failedEvaluatorHandler(
EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
}
}
@Override
public void activeContextHandler(final ActiveContext context) {
synchronized (this) {
this.activeContextMap.put(context.getId(), context);
this.clientStub.activeContextHandler(
ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(
context.getParentId().isPresent() ?
context.getParentId().get() : "")
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
context.getEvaluatorDescriptor()))
.build());
}
}
@Override
public void closedContextHandler(final ClosedContext context) {
synchronized (this) {
this.activeContextMap.remove(context.getId());
this.clientStub.closedContextHandler(
ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(context.getParentContext().getId())
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
context.getEvaluatorDescriptor()))
.build());
}
}
@Override
public void failedContextHandler(final FailedContext context) {
synchronized (this) {
final ContextInfo.Builder contextInfoBuilder =
ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(
context.getParentContext().isPresent() ?
context.getParentContext().get().getId() : "")
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
context.getEvaluatorDescriptor()));
if (context.getReason().isPresent()) {
final Throwable reason = context.getReason().get();
contextInfoBuilder.setException(ExceptionInfo.newBuilder()
.setName(reason.toString())
.setMessage(context.getMessage() != null ? context.getMessage() : "")
.setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
.build());
} else if (context.getData().isPresent()) {
contextInfoBuilder.setException(ExceptionInfo.newBuilder()
.setName(context.toString())
.setMessage(context.getDescription().isPresent() ?
context.getDescription().get() :
context.getMessage() != null ? context.getMessage() : "")
.setData(ByteString.copyFrom(context.getData().get()))
.build());
} else {
final Throwable reason = context.asError();
contextInfoBuilder.setException(ExceptionInfo.newBuilder()
.setName(reason.toString())
.setMessage(context.getMessage() != null ? context.getMessage() : "")
.setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
.build());
}
this.activeContextMap.remove(context.getId());
this.clientStub.failedContextHandler(contextInfoBuilder.build());
}
}
@Override
public void contextMessageHandler(final ContextMessage message) {
synchronized (this) {
this.clientStub.contextMessageHandler(
ContextMessageInfo.newBuilder()
.setContextId(message.getId())
.setMessageSourceId(message.getMessageSourceID())
.setSequenceNumber(message.getSequenceNumber())
.setPayload(ByteString.copyFrom(message.get()))
.build());
}
}
@Override
public void runningTaskHandler(final RunningTask task) {
synchronized (this) {
final ActiveContext context = task.getActiveContext();
if (!this.activeContextMap.containsKey(context.getId())) {
this.activeContextMap.put(context.getId(), context);
}
this.runningTaskMap.put(task.getId(), task);
this.clientStub.runningTaskHandler(
TaskInfo.newBuilder()
.setTaskId(task.getId())
.setContext(ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
task.getActiveContext().getEvaluatorDescriptor()))
.build())
.build());
}
}
@Override
public void failedTaskHandler(final FailedTask task) {
synchronized (this) {
if (task.getActiveContext().isPresent() &&
!this.activeContextMap.containsKey(task.getActiveContext().get().getId())) {
this.activeContextMap.put(task.getActiveContext().get().getId(), task.getActiveContext().get());
}
final TaskInfo.Builder taskInfoBuilder = TaskInfo.newBuilder()
.setTaskId(task.getId());
if (task.getActiveContext().isPresent()) {
taskInfoBuilder.setContext(ContextInfo.newBuilder()
.setContextId(task.getActiveContext().get().getId())
.setEvaluatorId(task.getActiveContext().get().getEvaluatorId())
.setParentId(task.getActiveContext().get().getParentId().isPresent() ?
task.getActiveContext().get().getParentId().get() : "")
.build());
}
if (task.getReason().isPresent()) {
final Throwable reason = task.getReason().get();
taskInfoBuilder.setException(ExceptionInfo.newBuilder()
.setName(reason.toString())
.setMessage(task.getMessage() != null ? task.getMessage() : "")
.setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
.build());
} else if (task.getData().isPresent()) {
final Throwable reason = task.asError();
taskInfoBuilder.setException(ExceptionInfo.newBuilder()
.setName(reason.toString())
.setMessage(task.getMessage() != null ? task.getMessage() : "")
.setData(ByteString.copyFrom(task.getData().get()))
.build());
} else {
final Throwable reason = task.asError();
taskInfoBuilder.setException(ExceptionInfo.newBuilder()
.setName(reason.toString())
.setMessage(task.getMessage() != null ? task.getMessage() : "")
.setData(ByteString.copyFrom(exceptionCodec.toBytes(reason)))
.build());
}
this.runningTaskMap.remove(task.getId());
this.clientStub.failedTaskHandler(taskInfoBuilder.build());
}
}
@Override
public void completedTaskHandler(final CompletedTask task) {
synchronized (this) {
if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
}
this.runningTaskMap.remove(task.getId());
this.clientStub.completedTaskHandler(
TaskInfo.newBuilder()
.setTaskId(task.getId())
.setContext(ContextInfo.newBuilder()
.setContextId(task.getActiveContext().getId())
.setEvaluatorId(task.getActiveContext().getEvaluatorId())
.setParentId(task.getActiveContext().getParentId().isPresent() ?
task.getActiveContext().getParentId().get() : "")
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
task.getActiveContext().getEvaluatorDescriptor()))
.build())
.build());
}
}
@Override
public void suspendedTaskHandler(final SuspendedTask task) {
synchronized (this) {
if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
}
this.runningTaskMap.remove(task.getId());
this.clientStub.suspendedTaskHandler(
TaskInfo.newBuilder()
.setTaskId(task.getId())
.setContext(ContextInfo.newBuilder()
.setContextId(task.getActiveContext().getId())
.setEvaluatorId(task.getActiveContext().getEvaluatorId())
.setParentId(task.getActiveContext().getParentId().isPresent() ?
task.getActiveContext().getParentId().get() : "")
.build())
.setResult(task.get() == null || task.get().length == 0 ?
null : ByteString.copyFrom(task.get()))
.build());
}
}
@Override
public void taskMessageHandler(final TaskMessage message) {
synchronized (this) {
this.clientStub.taskMessageHandler(
TaskMessageInfo.newBuilder()
.setTaskId(message.getId())
.setContextId(message.getContextId())
.setMessageSourceId(message.getMessageSourceID())
.setSequenceNumber(message.getSequenceNumber())
.setPayload(ByteString.copyFrom(message.get()))
.build());
}
}
@Override
public void clientMessageHandler(final byte[] message) {
synchronized (this) {
this.clientStub.clientMessageHandler(
ClientMessageInfo.newBuilder()
.setPayload(ByteString.copyFrom(message))
.build());
}
}
@Override
public void clientCloseHandler() {
synchronized (this) {
this.clientStub.clientCloseHandler(VOID);
}
}
@Override
public void clientCloseWithMessageHandler(final byte[] message) {
synchronized (this) {
this.clientStub.clientCloseWithMessageHandler(
ClientMessageInfo.newBuilder()
.setPayload(ByteString.copyFrom(message))
.build());
}
}
@Override
public void driverRestarted(final DriverRestarted restart) {
try {
start();
synchronized (this) {
if (this.clientStub != null) {
this.clientStub.driverRestartHandler(DriverRestartInfo.newBuilder()
.setResubmissionAttempts(restart.getResubmissionAttempts())
.setStartTime(StartTimeInfo.newBuilder()
.setStartTime(restart.getStartTime().getTimestamp()).build())
.addAllExpectedEvaluatorIds(restart.getExpectedEvaluatorIds())
.build());
} else {
stop(new DriverClientException("Failed to restart driver client"));
}
}
} catch (InterruptedException | IOException e) {
stop(e);
}
}
@Override
public void restartRunningTask(final RunningTask task) {
synchronized (this) {
final ActiveContext context = task.getActiveContext();
if (!this.activeContextMap.containsKey(context.getId())) {
this.activeContextMap.put(context.getId(), context);
}
this.runningTaskMap.put(task.getId(), task);
this.clientStub.driverRestartRunningTaskHandler(
TaskInfo.newBuilder()
.setTaskId(task.getId())
.setContext(ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(context.getEvaluatorDescriptor()))
.build())
.build());
}
}
@Override
public void restartActiveContext(final ActiveContext context) {
synchronized (this) {
this.activeContextMap.put(context.getId(), context);
this.clientStub.driverRestartActiveContextHandler(
ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(
context.getParentId().isPresent() ?
context.getParentId().get() : null)
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
context.getEvaluatorDescriptor()))
.build());
}
}
@Override
public void driverRestartCompleted(final DriverRestartCompleted restartCompleted) {
synchronized (this) {
this.clientStub.driverRestartCompletedHandler(DriverRestartCompletedInfo.newBuilder()
.setCompletionTime(StopTimeInfo.newBuilder()
.setStopTime(restartCompleted.getCompletedTime().getTimestamp()).build())
.setIsTimedOut(restartCompleted.isTimedOut())
.build());
}
}
@Override
public void restartFailedEvalautor(final FailedEvaluator evaluator) {
synchronized (this) {
this.clientStub.driverRestartFailedEvaluatorHandler(EvaluatorInfo.newBuilder()
.setEvaluatorId(evaluator.getId())
.setFailure(EvaluatorInfo.FailureInfo.newBuilder()
.setMessage(evaluator.getEvaluatorException() != null ?
evaluator.getEvaluatorException().getMessage() : "unknown failure during restart")
.build())
.build());
}
}
private Optional<Throwable> parseException(final ExceptionInfo info) {
if (info.getData() == null || info.getData().isEmpty()) {
return Optional.empty();
} else {
return exceptionCodec.fromBytes(info.getData().toByteArray());
}
}
private final class DriverBridgeServiceImpl
extends DriverServiceGrpc.DriverServiceImplBase {
@Override
public void registerDriverClient(
final DriverClientRegistration request,
final StreamObserver<Void> responseObserver) {
LOG.log(Level.INFO, "driver client register");
try {
if (request.hasException()) {
LOG.log(Level.SEVERE, "Driver client initialization exception");
final Optional<Throwable> ex = parseException(request.getException());
if (ex.isPresent()) {
GRPCDriverService.this.clock.stop(ex.get());
} else {
GRPCDriverService.this.clock.stop(new RuntimeException(
request.getException().getMessage() == null ?
request.getException().getName() :
request.getException().getMessage()
));
}
} else {
final ManagedChannel channel = ManagedChannelBuilder
.forAddress(request.getHost(), request.getPort())
.usePlaintext(true)
.build();
synchronized (GRPCDriverService.this) {
GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel);
GRPCDriverService.this.notifyAll();
}
LOG.log(Level.INFO, "Driver has registered on port " + request.getPort());
}
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
}
@Override
public void requestResources(
final ResourceRequest request,
final StreamObserver<Void> responseObserver) {
try {
synchronized (GRPCDriverService.this) {
EvaluatorRequest.Builder requestBuilder = GRPCDriverService.this.evaluatorRequestor.newRequest();
requestBuilder.setNumber(request.getResourceCount());
requestBuilder.setNumberOfCores(request.getCores());
requestBuilder.setMemory(request.getMemorySize());
requestBuilder.setRelaxLocality(request.getRelaxLocality());
requestBuilder.setRuntimeName(request.getRuntimeName());
if (request.getNodeNameListCount() > 0) {
requestBuilder.addNodeNames(request.getNodeNameListList());
}
if (request.getRackNameListCount() > 0) {
for (final String rackName : request.getRackNameListList()) {
requestBuilder.addRackName(rackName);
}
}
GRPCDriverService.this.evaluatorRequestor.submit(requestBuilder.build());
}
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
}
@Override
public void shutdown(
final ShutdownRequest request,
final StreamObserver<Void> responseObserver) {
try {
LOG.log(Level.INFO, "driver shutdown");
if (request.hasException()) {
final Optional<Throwable> exception = parseException(request.getException());
if (exception.isPresent()) {
LOG.log(Level.INFO, "driver exception: " + exception.get().toString());
GRPCDriverService.this.clock.stop(exception.get());
} else {
// exception that cannot be parsed in java
GRPCDriverService.this.clock.stop(
new DriverClientException(request.getException().getMessage()));
}
} else {
LOG.log(Level.INFO, "clean shutdown");
GRPCDriverService.this.clock.stop();
}
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
}
@Override
public void setAlarm(
final AlarmRequest request,
final StreamObserver<Void> responseObserver) {
try {
// do not synchronize when scheduling an alarm (or deadlock)
LOG.log(Level.INFO, "Set alarm {0} offset {1}",
new Object[] {request.getAlarmId(), request.getTimeoutMs()});
LOG.log(Level.INFO, "Alarm class " + GRPCDriverService.this.clock.getClass());
GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm value) {
LOG.log(Level.INFO, "Trigger alarm {0}", request.getAlarmId());
synchronized (GRPCDriverService.this) {
GRPCDriverService.this.clientStub.alarmTrigger(
AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
LOG.log(Level.INFO, "DONE: trigger alarm {0}", request.getAlarmId());
}
}
});
LOG.log(Level.INFO, "Alarm {0} scheduled is idle? {1}",
new Object[] {request.getAlarmId(), clock.isIdle()});
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
}
@Override
public void allocatedEvaluatorOp(
final AllocatedEvaluatorRequest request,
final StreamObserver<Void> responseObserver) {
try {
if (request.getEvaluatorConfiguration() == null) {
responseObserver.onError(Status.INTERNAL
.withDescription("Evaluator configuration required")
.asRuntimeException());
} else if (request.getContextConfiguration() == null && request.getTaskConfiguration() == null) {
responseObserver.onError(Status.INTERNAL
.withDescription("Context and/or Task configuration required")
.asRuntimeException());
} else {
synchronized (GRPCDriverService.this) {
if (!GRPCDriverService.this.allocatedEvaluatorMap.containsKey(request.getEvaluatorId())) {
responseObserver.onError(Status.INTERNAL
.withDescription("Unknown allocated evaluator " + request.getEvaluatorId())
.asRuntimeException());
}
final AllocatedEvaluator evaluator =
GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId());
if (request.getCloseEvaluator()) {
evaluator.close();
} else {
if (request.getAddFilesCount() > 0) {
for (final String file : request.getAddFilesList()) {
evaluator.addFile(new File(file));
}
}
if (request.getAddLibrariesCount() > 0) {
for (final String library : request.getAddLibrariesList()) {
evaluator.addLibrary(new File(library));
}
}
if (request.getSetProcess() != null) {
final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest =
request.getSetProcess();
switch (evaluator.getEvaluatorDescriptor().getProcess().getType()) {
case JVM:
setJVMProcess(evaluator, processRequest);
break;
case CLR:
setCLRProcess(evaluator, processRequest);
break;
default:
throw new RuntimeException("Unknown evaluator process type");
}
}
if (StringUtils.isEmpty(request.getEvaluatorConfiguration())) {
// Assume that we are running Java driver client, but this assumption could be a bug so log a warning
LOG.log(Level.WARNING, "No evaluator configuration detected. Assuming a Java driver client.");
if (StringUtils.isNotEmpty(request.getContextConfiguration()) &&
StringUtils.isNotEmpty(request.getTaskConfiguration())) {
// submit context and task
try {
evaluator.submitContextAndTask(
configurationSerializer.fromString(request.getContextConfiguration()),
configurationSerializer.fromString(request.getTaskConfiguration()));
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (StringUtils.isNotEmpty(request.getContextConfiguration())) {
// submit context
try {
evaluator.submitContext(configurationSerializer.fromString(request.getContextConfiguration()));
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) {
// submit task
try {
evaluator.submitTask(configurationSerializer.fromString(request.getTaskConfiguration()));
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException("Missing check for required evaluator configurations");
}
} else {
if (StringUtils.isNotEmpty(request.getContextConfiguration()) &&
StringUtils.isNotEmpty(request.getTaskConfiguration())) {
// submit context and task
((AllocatedEvaluatorImpl) evaluator).submitContextAndTask(
request.getEvaluatorConfiguration(),
request.getContextConfiguration(),
request.getTaskConfiguration());
} else if (StringUtils.isNotEmpty(request.getContextConfiguration())) {
// submit context
((AllocatedEvaluatorImpl) evaluator).submitContext(
request.getEvaluatorConfiguration(),
request.getContextConfiguration());
} else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) {
// submit task
((AllocatedEvaluatorImpl) evaluator).submitTask(
request.getEvaluatorConfiguration(),
request.getTaskConfiguration());
} else {
throw new RuntimeException("Missing check for required evaluator configurations");
}
}
}
}
}
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
}
@Override
public void activeContextOp(
final ActiveContextRequest request,
final StreamObserver<Void> responseObserver) {
synchronized (GRPCDriverService.this) {
if (!GRPCDriverService.this.activeContextMap.containsKey(request.getContextId())) {
LOG.log(Level.SEVERE, "Context does not exist with id " + request.getContextId());
responseObserver.onError(Status.INTERNAL
.withDescription("Context does not exist with id " + request.getContextId())
.asRuntimeException());
} else {
final ActiveContext context = GRPCDriverService.this.activeContextMap.get(request.getContextId());
if (request.getOperationCase() == ActiveContextRequest.OperationCase.CLOSE_CONTEXT) {
if (request.getCloseContext()) {
try {
LOG.log(Level.INFO, "closing context " + context.getId());
context.close();
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
} else {
LOG.log(Level.SEVERE, "Close context operation not set to true");
responseObserver.onError(Status.INTERNAL
.withDescription("Close context operation not set to true")
.asRuntimeException());
}
} else if (request.getOperationCase() == ActiveContextRequest.OperationCase.MESSAGE) {
if (request.getMessage() != null) {
try {
LOG.log(Level.INFO, "send message to context " + context.getId());
context.sendMessage(request.getMessage().toByteArray());
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
} else {
responseObserver.onError(Status.INTERNAL
.withDescription("Empty message on operation send message").asRuntimeException());
}
} else if (request.getOperationCase() == ActiveContextRequest.OperationCase.NEW_CONTEXT_REQUEST) {
try {
LOG.log(Level.INFO, "submitting child context to context " + context.getId());
((EvaluatorContext) context).submitContext(request.getNewContextRequest());
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
} else if (request.getOperationCase() == ActiveContextRequest.OperationCase.NEW_TASK_REQUEST) {
try {
LOG.log(Level.INFO, "submitting task to context " + context.getId());
((EvaluatorContext) context).submitTask(request.getNewTaskRequest());
} finally {
responseObserver.onNext(null);
responseObserver.onCompleted();
}
}
}
}
}
@Override
public void runningTaskOp(
final RunningTaskRequest request,
final StreamObserver<Void> responseObserver) {
synchronized (GRPCDriverService.this) {
if (!GRPCDriverService.this.runningTaskMap.containsKey(request.getTaskId())) {
LOG.log(Level.WARNING, "Unknown task id {0}", request.getTaskId());
responseObserver.onError(Status.INTERNAL
.withDescription("Task does not exist with id " + request.getTaskId()).asRuntimeException());
} else {
try {
final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId());
switch (request.getOperation()) {
case CLOSE:
LOG.log(Level.INFO, "close task {0}", task.getId());
if (request.getMessage().isEmpty()) {
task.close();
} else {
task.close(request.getMessage().toByteArray());
}
break;
case SUSPEND:
LOG.log(Level.INFO, "suspend task {0}", task.getId());
if (request.getMessage().isEmpty()) {
task.suspend();
} else {
task.suspend(request.getMessage().toByteArray());
}
break;
case SEND_MESSAGE:
LOG.log(Level.INFO, "send message to task {0}", task.getId());
task.send(request.getMessage().toByteArray());
break;
default:
throw new RuntimeException("Unknown operation " + request.getOperation());
}
responseObserver.onNext(null);
} finally {
responseObserver.onCompleted();
}
}
}
}
private void setCLRProcess(
final AllocatedEvaluator evaluator,
final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
final CLRProcess process = GRPCDriverService.this.clrProcessFactory.newEvaluatorProcess();
if (processRequest.getMemoryMb() > 0) {
process.setMemory(processRequest.getMemoryMb());
}
if (processRequest.getConfigurationFileName() != null) {
process.setConfigurationFileName(processRequest.getConfigurationFileName());
}
if (processRequest.getStandardOut() != null) {
process.setStandardOut(processRequest.getStandardOut());
}
if (processRequest.getStandardErr() != null) {
process.setStandardErr(processRequest.getStandardErr());
}
evaluator.setProcess(process);
}
private void setJVMProcess(
final AllocatedEvaluator evaluator,
final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
final JVMProcess process = GRPCDriverService.this.jvmProcessFactory.newEvaluatorProcess();
if (processRequest.getMemoryMb() > 0) {
process.setMemory(processRequest.getMemoryMb());
}
if (processRequest.getConfigurationFileName() != null) {
process.setConfigurationFileName(processRequest.getConfigurationFileName());
}
if (processRequest.getStandardOut() != null) {
process.setStandardOut(processRequest.getStandardOut());
}
if (processRequest.getStandardErr() != null) {
process.setStandardErr(processRequest.getStandardErr());
}
if (processRequest.getOptionsCount() > 0) {
for (final String option : processRequest.getOptionsList()) {
process.addOption(option);
}
}
evaluator.setProcess(process);
}
}
}