| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.reef.bridge.driver.service.grpc; |
| |
| import com.google.protobuf.ByteString; |
| import io.grpc.ManagedChannel; |
| import io.grpc.ManagedChannelBuilder; |
| import io.grpc.Server; |
| import io.grpc.Status; |
| import io.grpc.netty.NettyServerBuilder; |
| import io.grpc.stub.StreamObserver; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.reef.annotations.audience.Private; |
| import org.apache.reef.bridge.driver.common.grpc.GRPCUtils; |
| import org.apache.reef.bridge.driver.common.grpc.ObserverCleanup; |
| import org.apache.reef.bridge.driver.service.DriverService; |
| import org.apache.reef.bridge.proto.*; |
| import org.apache.reef.bridge.proto.Void; |
| import org.apache.reef.bridge.service.parameters.DriverClientCommand; |
| import org.apache.reef.driver.context.ActiveContext; |
| import org.apache.reef.driver.context.ClosedContext; |
| import org.apache.reef.driver.context.ContextMessage; |
| import org.apache.reef.driver.context.FailedContext; |
| import org.apache.reef.driver.evaluator.*; |
| import org.apache.reef.driver.restart.DriverRestartCompleted; |
| import org.apache.reef.driver.restart.DriverRestarted; |
| import org.apache.reef.driver.task.*; |
| import org.apache.reef.exception.NonSerializableException; |
| import org.apache.reef.runtime.common.driver.context.EvaluatorContext; |
| import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl; |
| import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource; |
| import org.apache.reef.runtime.common.driver.idle.IdleMessage; |
| import org.apache.reef.runtime.common.files.REEFFileNames; |
| import org.apache.reef.runtime.common.utils.ExceptionCodec; |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.apache.reef.util.OSUtils; |
| import org.apache.reef.util.Optional; |
| import org.apache.reef.wake.EventHandler; |
| import org.apache.reef.wake.remote.ports.TcpPortProvider; |
| import org.apache.reef.wake.time.Clock; |
| import org.apache.reef.wake.time.event.Alarm; |
| import org.apache.reef.wake.time.event.StartTime; |
| import org.apache.reef.wake.time.event.StopTime; |
| |
| import javax.inject.Inject; |
| import java.io.*; |
| import java.net.InetSocketAddress; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
 * gRPC-based driver service that forwards REEF driver events to a driver client written in a higher-level language and serves that client's requests.
| */ |
| @Private |
| public final class GRPCDriverService implements DriverService, DriverIdlenessSource { |
| private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName()); |
| |
| private static final Void VOID = Void.newBuilder().build(); |
| |
| private Process driverProcess; |
| |
| private enum StreamType { STDOUT, STDERR } |
| |
| private Server server; |
| |
| private DriverClientGrpc.DriverClientFutureStub clientStub; |
| |
| private final Clock clock; |
| |
| private final REEFFileNames reefFileNames; |
| |
| private final ExceptionCodec exceptionCodec; |
| |
| private final EvaluatorRequestor evaluatorRequestor; |
| |
| private final JVMProcessFactory jvmProcessFactory; |
| |
| private final CLRProcessFactory clrProcessFactory; |
| |
| private final DotNetProcessFactory dotNetProcessFactory; |
| |
| private final TcpPortProvider tcpPortProvider; |
| |
| private final String driverClientCommand; |
| |
| private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>(); |
| |
| private final Map<String, ActiveContext> activeContextMap = new HashMap<>(); |
| |
| private final Map<String, RunningTask> runningTaskMap = new HashMap<>(); |
| |
| private boolean stopped = false; |
| |
/**
 * Injectable constructor; it only stores its collaborators. The server and the driver
 * client process are created later by start().
 *
 * @param clock driver clock, used for alarms and to stop the driver
 * @param reefFileNames source of the driver client stdout/stderr file names
 * @param evaluatorRequestor used to submit evaluator requests from the client
 * @param jvmProcessFactory factory for JVM evaluator processes
 * @param clrProcessFactory factory for CLR evaluator processes
 * @param dotNetProcessFactory factory for .NET evaluator processes
 * @param tcpPortProvider candidate ports for the gRPC server
 * @param exceptionCodec (de)serializer for exceptions exchanged with the client
 * @param driverClientCommand shell command that launches the driver client
 */
@Inject
private GRPCDriverService(
    final Clock clock,
    final REEFFileNames reefFileNames,
    final EvaluatorRequestor evaluatorRequestor,
    final JVMProcessFactory jvmProcessFactory,
    final CLRProcessFactory clrProcessFactory,
    final DotNetProcessFactory dotNetProcessFactory,
    final TcpPortProvider tcpPortProvider,
    final ExceptionCodec exceptionCodec,
    @Parameter(DriverClientCommand.class) final String driverClientCommand) {
  this.clock = clock;
  this.reefFileNames = reefFileNames;
  this.evaluatorRequestor = evaluatorRequestor;
  this.jvmProcessFactory = jvmProcessFactory;
  this.clrProcessFactory = clrProcessFactory;
  this.dotNetProcessFactory = dotNetProcessFactory;
  this.tcpPortProvider = tcpPortProvider;
  this.exceptionCodec = exceptionCodec;
  this.driverClientCommand = driverClientCommand;
}
| |
| private void start() throws IOException, InterruptedException { |
| for (final int port : this.tcpPortProvider) { |
| try { |
| this.server = NettyServerBuilder.forAddress(new InetSocketAddress("localhost", port)) |
| .addService(new DriverBridgeServiceImpl()) |
| .build() |
| .start(); |
| LOG.log(Level.INFO, "Server started, listening on port [{0}]", port); |
| break; |
| } catch (final IOException e) { |
| LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port); |
| } |
| } |
| if (this.server == null || this.server.isTerminated()) { |
| throw new IOException("Unable to start gRPC server"); |
| } |
| final String cmd = this.driverClientCommand + " " + this.server.getPort(); |
| final List<String> cmdOs = OSUtils.isWindows() ? |
| Arrays.asList("cmd.exe", "/c", cmd) : Arrays.asList("/bin/sh", "-c", cmd); |
| LOG.log(Level.INFO, "CMD: {0}", cmdOs); |
| this.driverProcess = new ProcessBuilder() |
| .command(cmdOs) |
| .redirectError(new File(this.reefFileNames.getDriverClientStderrFileName())) |
| .redirectOutput(new File(this.reefFileNames.getDriverClientStdoutFileName())) |
| .start(); |
| synchronized (this) { |
| int attempts = 60; // give it a minute |
| /* wait for driver client process to register |
| * Note: attempts and wait time have been given reasonable hardcoded values for a driver |
| * client to register with the driver service (us). Making these values configurable would |
| * require additions to the ClientProtocol buffer such that they can be passed to the |
| * GRPCDriverServiceConfigurationProvider and bound to the appropriate NamedParameters. It |
| * is the opinion at the time of this writing that a driver client should be able to register |
| * within a minute. |
| */ |
| while (!stopped && attempts-- > 0 && this.clientStub == null && driverProcessIsAlive()) { |
| LOG.log(Level.INFO, "waiting for driver process to register"); |
| this.wait(1000); // a second |
| } |
| } |
| if (!stopped && driverProcessIsAlive()) { |
| final Thread closeChildThread = new Thread() { |
| public void run() { |
| synchronized (GRPCDriverService.this) { |
| if (GRPCDriverService.this.driverProcess != null) { |
| GRPCDriverService.this.driverProcess.destroy(); |
| GRPCDriverService.this.driverProcess = null; |
| } |
| } |
| } |
| }; |
| // This is probably overkill since shutdown should be called in the stop handler. |
| Runtime.getRuntime().addShutdownHook(closeChildThread); |
| } |
| } |
| |
| private void stop() { |
| stop(null); |
| } |
| |
| private void stop(final Throwable t) { |
| LOG.log(Level.INFO, "STOP: gRPC Driver Service", t); |
| if (!stopped) { |
| try { |
| if (!clock.isClosed()) { |
| if (t != null) { |
| clock.stop(t); |
| } else { |
| clock.stop(); |
| } |
| } |
| if (server != null) { |
| LOG.log(Level.INFO, "Shutdown gRPC"); |
| this.server.shutdown(); |
| this.server = null; |
| } |
| if (this.driverProcess != null) { |
| LOG.log(Level.INFO, "shutdown driver process"); |
| dump(); |
| this.driverProcess.destroy(); |
| this.driverProcess = null; |
| } |
| } finally { |
| LOG.log(Level.INFO, "COMPLETED STOP: gRPC Driver Service"); |
| clientStub = null; |
| stopped = true; |
| } |
| } |
| } |
| |
| private void dump() { |
| if (!driverProcessIsAlive()) { |
| LOG.log(Level.INFO, "Exit code: {0}", this.driverProcess.exitValue()); |
| } |
| dumpStream(StreamType.STDOUT); |
| dumpStream(StreamType.STDERR); |
| } |
| |
| private void dumpStream(final StreamType type) { |
| final StringBuilder stringBuilder = new StringBuilder(); |
| |
| final String name; |
| final InputStream stream; |
| switch(type) { |
| case STDOUT: |
| name = "stdout"; |
| stream = this.driverProcess.getInputStream(); |
| break; |
| case STDERR: |
| name = "stderr"; |
| stream = this.driverProcess.getErrorStream(); |
| break; |
| default: |
| throw new RuntimeException("Invalid stream type value"); |
| } |
| |
| LOG.log(Level.INFO, "capturing driver process {0}", name); |
| try { |
| stringBuilder.append("\n==============================================\n"); |
| try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { |
| while (reader.ready()) { |
| stringBuilder.append(reader.readLine()).append('\n'); |
| } |
| } |
| stringBuilder.append("\n==============================================\n"); |
| } catch (final IOException e) { |
| LOG.log(Level.WARNING, "Error while capturing output stream", e); |
| } |
| LOG.log(Level.INFO, "{0}", stringBuilder); |
| } |
| |
| /** |
| * Determines if the driver process is still alive by |
| * testing for its exit value, which throws {@link IllegalThreadStateException} |
| * if process is still running. |
| * @return true if driver process is alive, false otherwise |
| */ |
| private boolean driverProcessIsAlive() { |
| if (this.driverProcess != null) { |
| try { |
| this.driverProcess.exitValue(); |
| } catch (final IllegalThreadStateException e) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| |
| @Override |
| public IdleMessage getIdleStatus() { |
| final String componentName = "Java Bridge DriverService"; |
| synchronized (this) { |
| if (this.clientStub != null) { |
| try { |
| LOG.log(Level.INFO, "{0} getting idle status", componentName); |
| final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(VOID).get(); |
| LOG.log(Level.INFO, "is idle: {0}", idleStatus.getIsIdle()); |
| return new IdleMessage( |
| componentName, |
| idleStatus.getReason(), |
| idleStatus.getIsIdle()); |
| } catch (final ExecutionException | InterruptedException e) { |
| stop(e); |
| } |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| return new IdleMessage( |
| componentName, |
| "stub not initialized", |
| true); |
| } |
| } |
| |
| @Override |
| public void startHandler(final StartTime startTime) { |
| try { |
| start(); |
| } catch (final IOException | InterruptedException e) { |
| throw new RuntimeException("unable to start driver client", e); |
| } |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.startHandler( |
| StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build()); |
| } else if (!stopped) { |
| stop(new RuntimeException("Unable to start driver client")); |
| } |
| } |
| } |
| |
| @Override |
| public void stopHandler(final StopTime stopTime) { |
| synchronized (this) { |
| if (clientStub != null) { |
| LOG.log(Level.INFO, "Stop handler called at {0}", stopTime); |
| final Future<ExceptionInfo> callCompletion = this.clientStub.stopHandler( |
| StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build()); |
| try { |
| final ExceptionInfo error = callCompletion.get(5L, TimeUnit.MINUTES); |
| if (!error.getNoError()) { |
| final Optional<Throwable> t = parseException(error); |
| if (t.isPresent()) { |
| throw new RuntimeException("driver stop exception", |
| t.get().getCause() != null ? t.get().getCause() : t.get()); |
| } else { |
| throw new RuntimeException(error.getName(), |
| new NonSerializableException(error.getMessage(), error.getData().toByteArray())); |
| } |
| } |
| } catch (final TimeoutException e) { |
| throw new RuntimeException("stop handler timed out", e); |
| } catch (final InterruptedException | ExecutionException e) { |
| throw new RuntimeException("error in stop handler", e); |
| } finally { |
| stop(); |
| } |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.allocatedEvaluatorMap.put(eval.getId(), eval); |
| this.clientStub.allocatedEvaluatorHandler( |
| EvaluatorInfo.newBuilder() |
| .setEvaluatorId(eval.getId()) |
| .setDescriptorInfo( |
| GRPCUtils.toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor())) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void completedEvaluatorHandler(final CompletedEvaluator eval) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.allocatedEvaluatorMap.remove(eval.getId()); |
| this.clientStub.completedEvaluatorHandler( |
| EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void failedEvaluatorHandler(final FailedEvaluator eval) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.allocatedEvaluatorMap.remove(eval.getId()); |
| this.clientStub.failedEvaluatorHandler( |
| EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void activeContextHandler(final ActiveContext context) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.activeContextMap.put(context.getId(), context); |
| this.clientStub.activeContextHandler(GRPCUtils.toContextInfo(context)); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void closedContextHandler(final ClosedContext context) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.activeContextMap.remove(context.getId()); |
| this.clientStub.closedContextHandler(GRPCUtils.toContextInfo(context)); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void failedContextHandler(final FailedContext context) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| final ExceptionInfo error; |
| if (context.getReason().isPresent()) { |
| final Throwable reason = context.getReason().get(); |
| error = GRPCUtils.createExceptionInfo(this.exceptionCodec, reason); |
| } else if (context.getData().isPresent()) { |
| error = ExceptionInfo.newBuilder() |
| .setName(context.toString()) |
| .setMessage(context.getDescription().orElse( |
| context.getMessage() != null ? context.getMessage() : "")) |
| .setData(ByteString.copyFrom(context.getData().get())) |
| .build(); |
| } else { |
| error = GRPCUtils.createExceptionInfo(this.exceptionCodec, context.asError()); |
| } |
| this.activeContextMap.remove(context.getId()); |
| this.clientStub.failedContextHandler(GRPCUtils.toContextInfo(context, error)); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void contextMessageHandler(final ContextMessage message) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.contextMessageHandler( |
| ContextMessageInfo.newBuilder() |
| .setContextId(message.getId()) |
| .setMessageSourceId(message.getMessageSourceID()) |
| .setSequenceNumber(message.getSequenceNumber()) |
| .setPayload(ByteString.copyFrom(message.get())) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void runningTaskHandler(final RunningTask task) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| final ActiveContext context = task.getActiveContext(); |
| if (!this.activeContextMap.containsKey(context.getId())) { |
| this.activeContextMap.put(context.getId(), context); |
| } |
| this.runningTaskMap.put(task.getId(), task); |
| this.clientStub.runningTaskHandler( |
| TaskInfo.newBuilder() |
| .setTaskId(task.getId()) |
| .setContext(GRPCUtils.toContextInfo(context)) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void failedTaskHandler(final FailedTask task) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| if (task.getActiveContext().isPresent() && |
| !this.activeContextMap.containsKey(task.getActiveContext().get().getId())) { |
| this.activeContextMap.put(task.getActiveContext().get().getId(), task.getActiveContext().get()); |
| } |
| final TaskInfo.Builder taskInfoBuilder = TaskInfo.newBuilder() |
| .setTaskId(task.getId()); |
| if (task.getActiveContext().isPresent()) { |
| taskInfoBuilder.setContext(GRPCUtils.toContextInfo(task.getActiveContext().get())); |
| } |
| if (task.getReason().isPresent()) { |
| LOG.log(Level.WARNING, "Task exception present", task.getReason().get()); |
| taskInfoBuilder.setException(GRPCUtils.createExceptionInfo(this.exceptionCodec, task.getReason().get())); |
| } else if (task.getData().isPresent()) { |
| LOG.log(Level.WARNING, "Not able to deserialize task exception {0}", task.getMessage()); |
| final Throwable reason = task.asError(); |
| taskInfoBuilder.setException(ExceptionInfo.newBuilder() |
| .setName(reason.toString()) |
| .setMessage(StringUtils.isNotEmpty(task.getMessage()) ? task.getMessage() : reason.toString()) |
| .setData(ByteString.copyFrom(task.getData().get())) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "Serialize generic error"); |
| taskInfoBuilder.setException(GRPCUtils.createExceptionInfo(this.exceptionCodec, task.asError())); |
| } |
| this.runningTaskMap.remove(task.getId()); |
| this.clientStub.failedTaskHandler(taskInfoBuilder.build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void completedTaskHandler(final CompletedTask task) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) { |
| this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext()); |
| } |
| this.runningTaskMap.remove(task.getId()); |
| this.clientStub.completedTaskHandler( |
| TaskInfo.newBuilder() |
| .setTaskId(task.getId()) |
| .setContext(GRPCUtils.toContextInfo(task.getActiveContext())) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void suspendedTaskHandler(final SuspendedTask task) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) { |
| this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext()); |
| } |
| this.runningTaskMap.remove(task.getId()); |
| this.clientStub.suspendedTaskHandler( |
| TaskInfo.newBuilder() |
| .setTaskId(task.getId()) |
| .setContext(GRPCUtils.toContextInfo(task.getActiveContext())) |
| .setResult(task.get() == null || task.get().length == 0 ? |
| null : ByteString.copyFrom(task.get())) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void taskMessageHandler(final TaskMessage message) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.taskMessageHandler( |
| TaskMessageInfo.newBuilder() |
| .setTaskId(message.getId()) |
| .setContextId(message.getContextId()) |
| .setMessageSourceId(message.getMessageSourceID()) |
| .setSequenceNumber(message.getSequenceNumber()) |
| .setPayload(ByteString.copyFrom(message.get())) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void clientMessageHandler(final byte[] message) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.clientMessageHandler( |
| ClientMessageInfo.newBuilder() |
| .setPayload(ByteString.copyFrom(message)) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void clientCloseHandler() { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.clientCloseHandler(VOID); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void clientCloseWithMessageHandler(final byte[] message) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.clientCloseWithMessageHandler( |
| ClientMessageInfo.newBuilder() |
| .setPayload(ByteString.copyFrom(message)) |
| .build()); |
| } else { |
| LOG.log(Level.WARNING, "client shutdown has already completed"); |
| } |
| } |
| } |
| |
| @Override |
| public void driverRestarted(final DriverRestarted restart) { |
| try { |
| start(); |
| } catch (final InterruptedException | IOException e) { |
| throw new RuntimeException("unable to start driver client", e); |
| } |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.driverRestartHandler(DriverRestartInfo.newBuilder() |
| .setResubmissionAttempts(restart.getResubmissionAttempts()) |
| .setStartTime(StartTimeInfo.newBuilder() |
| .setStartTime(restart.getStartTime().getTimestamp()).build()) |
| .addAllExpectedEvaluatorIds(restart.getExpectedEvaluatorIds()) |
| .build()); |
| } else { |
| throw new RuntimeException("client stub not running"); |
| } |
| } |
| } |
| |
| @Override |
| public void restartRunningTask(final RunningTask task) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| final ActiveContext context = task.getActiveContext(); |
| if (!this.activeContextMap.containsKey(context.getId())) { |
| this.activeContextMap.put(context.getId(), context); |
| } |
| this.runningTaskMap.put(task.getId(), task); |
| this.clientStub.driverRestartRunningTaskHandler( |
| TaskInfo.newBuilder() |
| .setTaskId(task.getId()) |
| .setContext(GRPCUtils.toContextInfo(context)) |
| .build()); |
| } else { |
| throw new RuntimeException("client stub not running"); |
| } |
| } |
| } |
| |
| @Override |
| public void restartActiveContext(final ActiveContext context) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.activeContextMap.put(context.getId(), context); |
| this.clientStub.driverRestartActiveContextHandler( |
| GRPCUtils.toContextInfo(context)); |
| } else { |
| throw new RuntimeException("client stub not running"); |
| } |
| } |
| } |
| |
| @Override |
| public void driverRestartCompleted(final DriverRestartCompleted restartCompleted) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.driverRestartCompletedHandler(DriverRestartCompletedInfo.newBuilder() |
| .setCompletionTime(StopTimeInfo.newBuilder() |
| .setStopTime(restartCompleted.getCompletedTime().getTimestamp()).build()) |
| .setIsTimedOut(restartCompleted.isTimedOut()) |
| .build()); |
| } else { |
| throw new RuntimeException("client stub not running"); |
| } |
| } |
| } |
| |
| @Override |
| public void restartFailedEvalautor(final FailedEvaluator evaluator) { |
| synchronized (this) { |
| if (this.clientStub != null) { |
| this.clientStub.driverRestartFailedEvaluatorHandler(EvaluatorInfo.newBuilder() |
| .setEvaluatorId(evaluator.getId()) |
| .setFailure(EvaluatorInfo.FailureInfo.newBuilder() |
| .setMessage(evaluator.getEvaluatorException() != null ? |
| evaluator.getEvaluatorException().getMessage() : "unknown failure during restart") |
| .build()) |
| .build()); |
| } else { |
| throw new RuntimeException("client stub not running"); |
| } |
| } |
| } |
| |
| private Optional<Throwable> parseException(final ExceptionInfo info) { |
| if (info.getData().isEmpty()) { |
| return Optional.empty(); |
| } else { |
| return exceptionCodec.fromBytes(info.getData().toByteArray()); |
| } |
| } |
| |
| private final class DriverBridgeServiceImpl |
| extends DriverServiceGrpc.DriverServiceImplBase { |
| |
| @Override |
| public void registerDriverClient( |
| final DriverClientRegistration request, |
| final StreamObserver<Void> responseObserver) { |
| LOG.log(Level.INFO, "driver client register"); |
| synchronized (GRPCDriverService.this) { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| if (request.hasException()) { |
| LOG.log(Level.SEVERE, "Driver client initialization exception"); |
| final Optional<Throwable> optionalEx = parseException(request.getException()); |
| final Throwable ex; |
| if (optionalEx.isPresent()) { |
| ex = optionalEx.get(); |
| } else if (!request.getException().getData().isEmpty()) { |
| ex = new NonSerializableException(request.getException().getMessage(), |
| request.getException().getData().toByteArray()); |
| } else { |
| ex = new RuntimeException(request.getException().getMessage()); |
| } |
| stop(ex); |
| } else { |
| final ManagedChannel channel = ManagedChannelBuilder |
| .forAddress(request.getHost(), request.getPort()) |
| .usePlaintext() |
| .build(); |
| GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel); |
| LOG.log(Level.INFO, "Driver has registered on port {0}", request.getPort()); |
| } |
| } finally { |
| GRPCDriverService.this.notifyAll(); |
| } |
| } |
| } |
| |
| @Override |
| public void requestResources( |
| final ResourceRequest request, |
| final StreamObserver<Void> responseObserver) { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| synchronized (GRPCDriverService.this) { |
| EvaluatorRequest.Builder requestBuilder = GRPCDriverService.this.evaluatorRequestor.newRequest(); |
| requestBuilder.setNumber(request.getResourceCount()); |
| requestBuilder.setNumberOfCores(request.getCores()); |
| requestBuilder.setMemory(request.getMemorySize()); |
| requestBuilder.setRelaxLocality(request.getRelaxLocality()); |
| requestBuilder.setRuntimeName(request.getRuntimeName()); |
| if (request.getNodeNameListCount() > 0) { |
| requestBuilder.addNodeNames(request.getNodeNameListList()); |
| } |
| if (request.getRackNameListCount() > 0) { |
| for (final String rackName : request.getRackNameListList()) { |
| requestBuilder.addRackName(rackName); |
| } |
| } |
| GRPCDriverService.this.evaluatorRequestor.submit(requestBuilder.build()); |
| } |
| } |
| } |
| |
| @Override |
| public void shutdown( |
| final ShutdownRequest request, |
| final StreamObserver<Void> responseObserver) { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| LOG.log(Level.INFO, "driver shutdown"); |
| if (request.hasException()) { |
| final Optional<Throwable> exception = parseException(request.getException()); |
| if (exception.isPresent()) { |
| LOG.log(Level.INFO, "driver exception", exception.get()); |
| GRPCDriverService.this.clock.stop(exception.get()); |
| } else { |
| // exception that cannot be parsed in java |
| GRPCDriverService.this.clock.stop( |
| new NonSerializableException( |
| request.getException().getMessage(), |
| request.getException().getData().toByteArray())); |
| } |
| } else { |
| LOG.log(Level.INFO, "clean shutdown"); |
| GRPCDriverService.this.clock.stop(); |
| } |
| } |
| } |
| |
| @Override |
| public void setAlarm( |
| final AlarmRequest request, |
| final StreamObserver<Void> responseObserver) { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| // do not synchronize when scheduling an alarm (or deadlock) |
| LOG.log(Level.INFO, "Set alarm {0} offset {1}", |
| new Object[] {request.getAlarmId(), request.getTimeoutMs()}); |
| LOG.log(Level.INFO, "Alarm class {0}", GRPCDriverService.this.clock.getClass()); |
| GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() { |
| @Override |
| public void onNext(final Alarm value) { |
| LOG.log(Level.INFO, "Trigger alarm {0}", request.getAlarmId()); |
| synchronized (GRPCDriverService.this) { |
| GRPCDriverService.this.clientStub.alarmTrigger( |
| AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build()); |
| LOG.log(Level.INFO, "DONE: trigger alarm {0}", request.getAlarmId()); |
| } |
| } |
| }); |
| LOG.log(Level.INFO, "Alarm {0} scheduled is idle? {1}", |
| new Object[] {request.getAlarmId(), clock.isIdle()}); |
| } |
| } |
| |
| @Override |
| public void allocatedEvaluatorOp( |
| final AllocatedEvaluatorRequest request, |
| final StreamObserver<Void> responseObserver) { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| synchronized (GRPCDriverService.this) { |
| final AllocatedEvaluator evaluator = |
| GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId()); |
| if (evaluator == null) { |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Unknown allocated evaluator " + request.getEvaluatorId()) |
| .asRuntimeException()); |
| return; |
| } |
| // Close evaluator? |
| if (request.getCloseEvaluator()) { |
| evaluator.close(); |
| return; |
| } |
| |
| // Ensure context and/or task |
| if (StringUtils.isEmpty(request.getContextConfiguration()) && |
| StringUtils.isEmpty(request.getTaskConfiguration())) { |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Context and/or Task configuration required") |
| .asRuntimeException()); |
| return; |
| } |
| for (final String file : request.getAddFilesList()) { |
| evaluator.addFile(new File(file)); |
| } |
| for (final String library : request.getAddLibrariesList()) { |
| evaluator.addLibrary(new File(library)); |
| } |
| if (request.hasSetProcess()) { |
| final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest = |
| request.getSetProcess(); |
| switch (processRequest.getProcessType()) { |
| case JVM: |
| LOG.log(Level.INFO, "Setting JVM Process"); |
| setJVMProcess(evaluator, processRequest); |
| break; |
| case CLR: |
| LOG.log(Level.INFO, "Setting CLR Process"); |
| setEvaluatorProcess(evaluator, clrProcessFactory.newEvaluatorProcess(), processRequest); |
| break; |
| case DOTNET: |
| LOG.log(Level.INFO, "Setting DOTNET Process"); |
| setEvaluatorProcess(evaluator, dotNetProcessFactory.newEvaluatorProcess(), processRequest); |
| break; |
| default: |
| throw new RuntimeException("Unknown evaluator process type"); |
| } |
| } |
| final String evaluatorConfiguration = StringUtils.isNotEmpty(request.getEvaluatorConfiguration()) ? |
| request.getEvaluatorConfiguration() : null; |
| final String contextConfiguration = StringUtils.isNotEmpty(request.getContextConfiguration()) ? |
| request.getContextConfiguration() : null; |
| final String serviceConfiguration = StringUtils.isNotEmpty(request.getServiceConfiguration()) ? |
| request.getServiceConfiguration() : null; |
| final String taskConfiguration = StringUtils.isNotEmpty(request.getTaskConfiguration()) ? |
| request.getTaskConfiguration() : null; |
| if (contextConfiguration != null && taskConfiguration != null) { |
| if (serviceConfiguration == null) { |
| LOG.log(Level.INFO, "Submitting evaluator with context and task"); |
| ((AllocatedEvaluatorImpl) evaluator).submitContextAndTask( |
| evaluatorConfiguration, |
| contextConfiguration, |
| taskConfiguration); |
| } else { |
| LOG.log(Level.INFO, "Submitting evaluator with context and service and task"); |
| ((AllocatedEvaluatorImpl) evaluator).submitContextAndServiceAndTask( |
| evaluatorConfiguration, |
| contextConfiguration, |
| serviceConfiguration, |
| taskConfiguration); |
| } |
| } else if (contextConfiguration != null) { |
| // submit context |
| if (serviceConfiguration == null) { |
| LOG.log(Level.INFO, "Submitting evaluator with context"); |
| ((AllocatedEvaluatorImpl) evaluator).submitContext(evaluatorConfiguration, contextConfiguration); |
| } else { |
| LOG.log(Level.INFO, "Submitting evaluator with context and service"); |
| ((AllocatedEvaluatorImpl) evaluator) |
| .submitContextAndService(evaluatorConfiguration, contextConfiguration, serviceConfiguration); |
| } |
| } else if (taskConfiguration != null) { |
| // submit task |
| if (serviceConfiguration != null) { |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Service must be accompanied by a context configuration") |
| .asRuntimeException()); |
| } else { |
| LOG.log(Level.INFO, "Submitting evaluator with task"); |
| ((AllocatedEvaluatorImpl) evaluator).submitTask(evaluatorConfiguration, taskConfiguration); |
| } |
| } else { |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Missing check for required evaluator configurations") |
| .asRuntimeException()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void activeContextOp( |
| final ActiveContextRequest request, |
| final StreamObserver<Void> responseObserver) { |
| LOG.log(Level.INFO, "Active context operation {0}", request.getOperationCase()); |
| synchronized (GRPCDriverService.this) { |
| LOG.log(Level.INFO, "i'm in"); |
| final String contextId = request.getContextId(); |
| final ActiveContext context = GRPCDriverService.this.activeContextMap.get(contextId); |
| if (context == null) { |
| LOG.log(Level.SEVERE, "Context does not exist with id {0}", contextId); |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Context does not exist with id " + contextId) |
| .asRuntimeException()); |
| return; |
| } |
| switch (request.getOperationCase()) { |
| case CLOSE_CONTEXT: |
| if (request.getCloseContext()) { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| LOG.log(Level.INFO, "closing context {0}", context.getId()); |
| context.close(); |
| } |
| } else { |
| LOG.log(Level.SEVERE, "Close context operation not set to true"); |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Close context operation not set to true") |
| .asRuntimeException()); |
| } |
| break; |
| case MESSAGE: |
| if (request.getMessage() != null) { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| LOG.log(Level.INFO, "send message to context {0}", context.getId()); |
| context.sendMessage(request.getMessage().toByteArray()); |
| } |
| } else { |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Empty message on operation send message").asRuntimeException()); |
| } |
| break; |
| case NEW_CONTEXT_REQUEST: |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| LOG.log(Level.INFO, "submitting child context to context {0}", context.getId()); |
| ((EvaluatorContext) context).submitContext(request.getNewContextRequest()); |
| } |
| break; |
| case NEW_TASK_REQUEST: |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| LOG.log(Level.INFO, "submitting task to context {0}", context.getId()); |
| ((EvaluatorContext) context).submitTask(request.getNewTaskRequest()); |
| } |
| break; |
| default: |
| throw new RuntimeException("Unknown operation " + request.getOperationCase()); |
| } |
| } |
| } |
| |
| @Override |
| public void runningTaskOp( |
| final RunningTaskRequest request, |
| final StreamObserver<Void> responseObserver) { |
| synchronized (GRPCDriverService.this) { |
| if (!GRPCDriverService.this.runningTaskMap.containsKey(request.getTaskId())) { |
| LOG.log(Level.WARNING, "Unknown task id {0}", request.getTaskId()); |
| responseObserver.onError(Status.INTERNAL |
| .withDescription("Task does not exist with id " + request.getTaskId()).asRuntimeException()); |
| } else { |
| try (ObserverCleanup cleanup = ObserverCleanup.of(responseObserver)) { |
| final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId()); |
| switch (request.getOperation()) { |
| case CLOSE: |
| LOG.log(Level.INFO, "close task {0}", task.getId()); |
| if (request.getMessage().isEmpty()) { |
| task.close(); |
| } else { |
| task.close(request.getMessage().toByteArray()); |
| } |
| break; |
| case SUSPEND: |
| LOG.log(Level.INFO, "suspend task {0}", task.getId()); |
| if (request.getMessage().isEmpty()) { |
| task.suspend(); |
| } else { |
| task.suspend(request.getMessage().toByteArray()); |
| } |
| break; |
| case SEND_MESSAGE: |
| LOG.log(Level.INFO, "send message to task {0}", task.getId()); |
| task.send(request.getMessage().toByteArray()); |
| break; |
| default: |
| throw new RuntimeException("Unknown operation " + request.getOperation()); |
| } |
| } |
| } |
| } |
| } |
| |
| private void setEvaluatorProcess( |
| final AllocatedEvaluator evaluator, |
| final EvaluatorProcess process, |
| final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) { |
| if (processRequest.getMemoryMb() > 0) { |
| process.setMemory(processRequest.getMemoryMb()); |
| } |
| if (StringUtils.isNotEmpty(processRequest.getConfigurationFileName())) { |
| process.setConfigurationFileName(processRequest.getConfigurationFileName()); |
| } |
| if (StringUtils.isNotEmpty(processRequest.getStandardOut())) { |
| process.setStandardOut(processRequest.getStandardOut()); |
| } |
| if (StringUtils.isNotEmpty(processRequest.getStandardErr())) { |
| process.setStandardErr(processRequest.getStandardErr()); |
| } |
| evaluator.setProcess(process); |
| } |
| |
| private void setJVMProcess( |
| final AllocatedEvaluator evaluator, |
| final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) { |
| final JVMProcess process = GRPCDriverService.this.jvmProcessFactory.newEvaluatorProcess(); |
| setEvaluatorProcess(evaluator, process, processRequest); |
| if (processRequest.getOptionsCount() > 0) { |
| for (final String option : processRequest.getOptionsList()) { |
| process.addOption(option); |
| } |
| } |
| evaluator.setProcess(process); |
| } |
| } |
| } |